diff options
author | Josh Blum | 2012-09-11 19:24:52 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-11 19:24:52 -0700 |
commit | 3e820aba9c4ef292508c00b686d54eaca040c200 (patch) | |
tree | 1ab8bee0b59d22a1f267f9c7e6bba69bf0d7bd65 | |
parent | 02f896e5a66575df4f16593584a7bc6892adf81c (diff) | |
download | sandhi-3e820aba9c4ef292508c00b686d54eaca040c200.tar.gz sandhi-3e820aba9c4ef292508c00b686d54eaca040c200.tar.bz2 sandhi-3e820aba9c4ef292508c00b686d54eaca040c200.zip |
interruptible_thread working on tests including udp source
-rw-r--r-- | lib/block_handlers.cpp | 10 | ||||
-rw-r--r-- | lib/block_task.cpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/interruptible_thread.hpp | 59 | ||||
-rw-r--r-- | lib/top_block.cpp | 9 |
4 files changed, 54 insertions, 30 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 14cc2c8..3e861be 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -16,6 +16,7 @@ #include "element_impl.hpp" #include <gras_impl/vector_utils.hpp> +#include <boost/make_shared.hpp> using namespace gnuradio; @@ -59,10 +60,17 @@ void ElementImpl::handle_block_msg( } //store the topology's thread group + //erase any potentially old lingering threads + //spawn a new thread if this block is a source if (msg.type() == typeid(SharedThreadGroup)) { this->thread_group = msg.cast<SharedThreadGroup>(); - this->interruptible_thread = boost::shared_ptr<InterruptibleThread>(new InterruptibleThread(this->thread_group)); + this->interruptible_thread.reset(); //erase old one + if (task_iface.get_num_inputs() == 0) //its a source + { + this->interruptible_thread = + boost::make_shared<InterruptibleThread>(this->thread_group); + } return; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 33ffbc1..3556bac 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -86,7 +86,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); - const bool is_source = (num_inputs == 0); + //const bool is_source = (num_inputs == 0); //const bool is_sink = (num_outputs == 0); this->work_io_ptr_mask = 0; //reset @@ -182,8 +182,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) if (this->enable_fixed_rate) work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); this->work_task_iface = task_iface; - int ret = 0; - if (is_source) + int ret = -1; + if (this->interruptible_thread) { this->interruptible_thread->block = block_ptr; this->interruptible_thread->ret = &ret; diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp index 10b1286..a480e78 100644 --- a/lib/gras_impl/interruptible_thread.hpp +++ b/lib/gras_impl/interruptible_thread.hpp @@ -24,6 +24,12 @@ #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> +//--------------------------- ATTENTION !!! --------------------------// +//-- The author does not intend to have 2 threading platforms. +//-- This file and its invocations should be removed when source blocks +//-- in the tree can yield the thread context without producing. +//--------------------------------------------------------------------// + /*! * This is the only place you will find any threading stuff. * The entire point here is that the source's in gnuradio @@ -47,7 +53,7 @@ namespace gnuradio { _done = false; _wait_msg = true; - _wait_ack = false; + _wait_ack = true; _mutex.lock(); _thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this)); _mutex.lock(); @@ -67,13 +73,9 @@ namespace gnuradio inline void call(void) { boost::mutex::scoped_lock lock(_mutex); - if (_done) - { - *ret = -1; - return; - } + if (_done) return; _wait_msg = false; - _cond.notify_one(); + _notify(lock); while (_wait_ack) _cond.wait(lock); _wait_ack = true; } @@ -82,31 +84,40 @@ namespace gnuradio { _mutex.unlock(); //spawn barrier unlock boost::mutex::scoped_lock lock(_mutex); - while (not _done and not boost::this_thread::interruption_requested()) + try { - try + while (not boost::this_thread::interruption_requested()) { while (_wait_msg) _cond.wait(lock); _wait_msg = true; *ret = block->Work(*input_items, *output_items); + _wait_ack = false; + _notify(lock); } - catch(const std::exception &ex) - { - std::cerr << "InterruptibleThread threw " << ex.what() << std::endl; - _done = true; - } - catch(const boost::thread_interrupted &ex) - { - _done = true; - } - catch(...) - { - _done = true; - } - _wait_ack = false; - _cond.notify_one(); + } + catch(const std::exception &ex) + { + std::cerr << "InterruptibleThread threw " << ex.what() << std::endl; + } + catch(const boost::thread_interrupted &) + { + //normal exit is thread_interrupted + } + catch(...) + { + std::cerr << "InterruptibleThread threw unknown exception" << std::endl; } _done = true; + _wait_ack = false; + _notify(lock); + } + + template <typename Lock> + void _notify(Lock &lock) + { + lock.unlock(); + _cond.notify_one(); + lock.lock(); } //shared work variables diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 440b7fa..306e152 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -91,7 +91,10 @@ void TopBlock::start(void) void TopBlock::stop(void) { + //interrupt these "special" threads (*this)->thread_group->interrupt_all(); + + //message all blocks to mark done TopBlockMessage event; event.what = TopBlockMessage::INERT; (*this)->executor.post_msg(event); @@ -105,11 +108,13 @@ void TopBlock::run(void) void TopBlock::wait(void) { + //We do not need to join "special" threads; + //the token mechainism will do just fine. //(*this)->thread_group->join_all(); + + //wait for all blocks to release the token while (not (*this)->token.unique()) { boost::this_thread::yield(); - //sleep(1); - //VAR((*this)->token.use_count()); } } |