summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-09-11 19:44:43 -0700
committerJosh Blum2012-09-11 19:44:43 -0700
commit80ad579996caebe4150a00b727fcd568730be53f (patch)
treea1b5fdf869ff54e62c703f294af4a2c6a43c7e4b
parent3e820aba9c4ef292508c00b686d54eaca040c200 (diff)
downloadsandhi-80ad579996caebe4150a00b727fcd568730be53f.tar.gz
sandhi-80ad579996caebe4150a00b727fcd568730be53f.tar.bz2
sandhi-80ad579996caebe4150a00b727fcd568730be53f.zip
simplify interruptible_thread use, give it a bound callable object
-rw-r--r--lib/block_handlers.cpp6
-rw-r--r--lib/block_task.cpp16
-rw-r--r--lib/element_impl.hpp7
-rw-r--r--lib/gras_impl/interruptible_thread.hpp25
4 files changed, 27 insertions, 27 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 3e861be..5811acc 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -17,6 +17,7 @@
#include "element_impl.hpp"
#include <gras_impl/vector_utils.hpp>
#include <boost/make_shared.hpp>
+#include <boost/bind.hpp>
using namespace gnuradio;
@@ -68,8 +69,9 @@ void ElementImpl::handle_block_msg(
this->interruptible_thread.reset(); //erase old one
if (task_iface.get_num_inputs() == 0) //its a source
{
- this->interruptible_thread =
- boost::make_shared<InterruptibleThread>(this->thread_group);
+ this->interruptible_thread = boost::make_shared<InterruptibleThread>(
+ this->thread_group, boost::bind(&ElementImpl::task_work, this)
+ );
}
return;
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 3556bac..b0fa45a 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -182,23 +182,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
if (this->enable_fixed_rate) work_noutput_items = std::min(
work_noutput_items, myulround((num_input_items)*this->relative_rate));
this->work_task_iface = task_iface;
- int ret = -1;
+ this->work_ret = -1;
if (this->interruptible_thread)
{
- this->interruptible_thread->block = block_ptr;
- this->interruptible_thread->ret = &ret;
- this->interruptible_thread->input_items = &this->input_items;
- this->interruptible_thread->output_items = &this->output_items;
this->interruptible_thread->call();
}
else
{
- ret = block_ptr->Work(this->input_items, this->output_items);
+ this->task_work();
}
this->work_task_iface.reset();
- const size_t noutput_items = size_t(ret);
+ const size_t noutput_items = size_t(work_ret);
- if (ret == Block::WORK_DONE)
+ if (work_ret == Block::WORK_DONE)
{
this->mark_done(task_iface);
return;
@@ -210,7 +206,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
bool input_allows_flush = true;
for (size_t i = 0; i < num_inputs; i++)
{
- ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE);
+ ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
const size_t items = (enable_fixed_rate)? (myulround((noutput_items/this->relative_rate))) : this->consume_items[i];
this->consume_items[i] = 0;
@@ -224,7 +220,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
- const size_t items = (ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
+ const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
this->produce_items[i] = 0;
if (items == 0) continue;
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index b4f2c1b..a9f272b 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -121,6 +121,13 @@ struct ElementImpl
void conclusion(const tsbe::TaskInterface &task_iface, const bool);
void buffer_returner(const size_t index, SBuffer &buffer);
+ //work helpers
+ int work_ret;
+ inline void task_work(void)
+ {
+ this->work_ret = block_ptr->Work(this->input_items, this->output_items);
+ }
+
//is the fg running?
enum
{
diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp
index a480e78..1e99111 100644
--- a/lib/gras_impl/interruptible_thread.hpp
+++ b/lib/gras_impl/interruptible_thread.hpp
@@ -17,9 +17,9 @@
#ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
#define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
-#include <gnuradio/block.hpp>
#include <gras_impl/debug.hpp>
#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
@@ -47,11 +47,12 @@ namespace gnuradio
struct InterruptibleThread
{
+ typedef boost::function<void(void)> Callable;
- InterruptibleThread(SharedThreadGroup thread_group):
- _thread_group(thread_group)
+ InterruptibleThread(SharedThreadGroup thread_group, Callable callable):
+ _thread_group(thread_group),
+ _callable(callable)
{
- _done = false;
_wait_msg = true;
_wait_ack = true;
_mutex.lock();
@@ -64,7 +65,7 @@ namespace gnuradio
{
{
boost::mutex::scoped_lock lock(_mutex);
- _done = true;
+ _callable = Callable();
}
_thread->interrupt();
_thread->join();
@@ -73,7 +74,7 @@ namespace gnuradio
inline void call(void)
{
boost::mutex::scoped_lock lock(_mutex);
- if (_done) return;
+ if (not _callable) return;
_wait_msg = false;
_notify(lock);
while (_wait_ack) _cond.wait(lock);
@@ -90,7 +91,7 @@ namespace gnuradio
{
while (_wait_msg) _cond.wait(lock);
_wait_msg = true;
- *ret = block->Work(*input_items, *output_items);
+ _callable();
_wait_ack = false;
_notify(lock);
}
@@ -107,7 +108,7 @@ namespace gnuradio
{
std::cerr << "InterruptibleThread threw unknown exception" << std::endl;
}
- _done = true;
+ _callable = Callable();
_wait_ack = false;
_notify(lock);
}
@@ -120,19 +121,13 @@ namespace gnuradio
lock.lock();
}
- //shared work variables
- Block *block;
- int *ret;
- Block::InputItems *input_items;
- Block::OutputItems *output_items;
-
//thread locking mechanisms
- bool _done;
bool _wait_msg;
bool _wait_ack;
boost::mutex _mutex;
boost::condition_variable _cond;
SharedThreadGroup _thread_group;
+ Callable _callable;
boost::thread *_thread;
};