diff options
author | Josh Blum | 2012-09-11 19:44:43 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-11 19:44:43 -0700 |
commit | 80ad579996caebe4150a00b727fcd568730be53f (patch) | |
tree | a1b5fdf869ff54e62c703f294af4a2c6a43c7e4b | |
parent | 3e820aba9c4ef292508c00b686d54eaca040c200 (diff) | |
download | sandhi-80ad579996caebe4150a00b727fcd568730be53f.tar.gz sandhi-80ad579996caebe4150a00b727fcd568730be53f.tar.bz2 sandhi-80ad579996caebe4150a00b727fcd568730be53f.zip |
simplify interruptible_thread use, give it a bound callable object
-rw-r--r-- | lib/block_handlers.cpp | 6 | ||||
-rw-r--r-- | lib/block_task.cpp | 16 | ||||
-rw-r--r-- | lib/element_impl.hpp | 7 | ||||
-rw-r--r-- | lib/gras_impl/interruptible_thread.hpp | 25 |
4 files changed, 27 insertions, 27 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 3e861be..5811acc 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -17,6 +17,7 @@ #include "element_impl.hpp" #include <gras_impl/vector_utils.hpp> #include <boost/make_shared.hpp> +#include <boost/bind.hpp> using namespace gnuradio; @@ -68,8 +69,9 @@ void ElementImpl::handle_block_msg( this->interruptible_thread.reset(); //erase old one if (task_iface.get_num_inputs() == 0) //its a source { - this->interruptible_thread = - boost::make_shared<InterruptibleThread>(this->thread_group); + this->interruptible_thread = boost::make_shared<InterruptibleThread>( + this->thread_group, boost::bind(&ElementImpl::task_work, this) + ); } return; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 3556bac..b0fa45a 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -182,23 +182,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) if (this->enable_fixed_rate) work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); this->work_task_iface = task_iface; - int ret = -1; + this->work_ret = -1; if (this->interruptible_thread) { - this->interruptible_thread->block = block_ptr; - this->interruptible_thread->ret = &ret; - this->interruptible_thread->input_items = &this->input_items; - this->interruptible_thread->output_items = &this->output_items; this->interruptible_thread->call(); } else { - ret = block_ptr->Work(this->input_items, this->output_items); + this->task_work(); } this->work_task_iface.reset(); - const size_t noutput_items = size_t(ret); + const size_t noutput_items = size_t(work_ret); - if (ret == Block::WORK_DONE) + if (work_ret == Block::WORK_DONE) { this->mark_done(task_iface); return; @@ -210,7 +206,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) bool input_allows_flush = true; for (size_t i = 0; i < num_inputs; i++) { - ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE); + ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); const size_t items = (enable_fixed_rate)? (myulround((noutput_items/this->relative_rate))) : this->consume_items[i]; this->consume_items[i] = 0; @@ -224,7 +220,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { - const size_t items = (ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; + const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; this->produce_items[i] = 0; if (items == 0) continue; diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index b4f2c1b..a9f272b 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -121,6 +121,13 @@ struct ElementImpl void conclusion(const tsbe::TaskInterface &task_iface, const bool); void buffer_returner(const size_t index, SBuffer &buffer); + //work helpers + int work_ret; + inline void task_work(void) + { + this->work_ret = block_ptr->Work(this->input_items, this->output_items); + } + //is the fg running? enum { diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp index a480e78..1e99111 100644 --- a/lib/gras_impl/interruptible_thread.hpp +++ b/lib/gras_impl/interruptible_thread.hpp @@ -17,9 +17,9 @@ #ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP #define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP -#include <gnuradio/block.hpp> #include <gras_impl/debug.hpp> #include <boost/bind.hpp> +#include <boost/function.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> @@ -47,11 +47,12 @@ namespace gnuradio struct InterruptibleThread { + typedef boost::function<void(void)> Callable; - InterruptibleThread(SharedThreadGroup thread_group): - _thread_group(thread_group) + InterruptibleThread(SharedThreadGroup thread_group, Callable callable): + _thread_group(thread_group), + _callable(callable) { - _done = false; _wait_msg = true; _wait_ack = true; _mutex.lock(); @@ -64,7 +65,7 @@ namespace gnuradio { { boost::mutex::scoped_lock lock(_mutex); - _done = true; + _callable = Callable(); } _thread->interrupt(); _thread->join(); @@ -73,7 +74,7 @@ namespace gnuradio inline void call(void) { boost::mutex::scoped_lock lock(_mutex); - if (_done) return; + if (not _callable) return; _wait_msg = false; _notify(lock); while (_wait_ack) _cond.wait(lock); @@ -90,7 +91,7 @@ namespace gnuradio { while (_wait_msg) _cond.wait(lock); _wait_msg = true; - *ret = block->Work(*input_items, *output_items); + _callable(); _wait_ack = false; _notify(lock); } @@ -107,7 +108,7 @@ namespace gnuradio { std::cerr << "InterruptibleThread threw unknown exception" << std::endl; } - _done = true; + _callable = Callable(); _wait_ack = false; _notify(lock); } @@ -120,19 +121,13 @@ namespace gnuradio lock.lock(); } - //shared work variables - Block *block; - int *ret; - Block::InputItems *input_items; - Block::OutputItems *output_items; - //thread locking mechanisms - bool _done; bool _wait_msg; bool _wait_ack; boost::mutex _mutex; boost::condition_variable _cond; SharedThreadGroup _thread_group; + Callable _callable; boost::thread *_thread; }; |