diff options
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 2 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_msg_accepter.cc | 10 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_msg_accepter.h | 2 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_tpb_detail.h | 6 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc | 35 |
5 files changed, 40 insertions, 15 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index 5d5b8cbc7..b8797fdc6 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -40,7 +40,7 @@ * signal processing functions. */ -class gr_basic_block : gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block> +class gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block> { protected: friend class gr_flowgraph; diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc index ebe11870a..50b41df88 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc @@ -45,12 +45,12 @@ gr_msg_accepter::post(pmt_t msg) // Let parent class do whatever it would have gruel::msg_accepter_msgq::post(msg); - // Notify this block's scheduler a message is pending + // Notify derived class, handled case by case gr_block *p = dynamic_cast<gr_block *>(this); - if (p) + if (p) { p->detail()->d_tpb.notify_msg(); - else { - // got here somehow with a non-gr_block - throw std::runtime_error("gr_msg_accepter::post() - invalid derived class"); + return; } + + // Test for other derived classes and handle } diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h index 8ce8d1d9e..2073e7ff1 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h @@ -28,7 +28,7 @@ * \brief Accepts messages and inserts them into a message queue, then notifies * subclass gr_basic_block there is a message pending. */ -class gr_msg_accepter : gruel::msg_accepter_msgq +class gr_msg_accepter : public gruel::msg_accepter_msgq { public: gr_msg_accepter(gruel::msg_queue_sptr msgq); diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h index 29101d730..a1df55806 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h @@ -35,10 +35,9 @@ struct gr_tpb_detail { gruel::condition_variable input_cond; bool output_changed; gruel::condition_variable output_cond; - bool msg_pending; gr_tpb_detail() - : input_changed(false), output_changed(false), msg_pending(false) { } + : input_changed(false), output_changed(false) { } //! Called by us to tell all our upstream blocks that their output may have changed. void notify_upstream(gr_block_detail *d); @@ -61,7 +60,8 @@ struct gr_tpb_detail { void notify_msg() { gruel::scoped_lock guard(mutex); - msg_pending = true; + + // Just wake up thread if BLKD_IN or BLKD_OUT input_cond.notify_one(); output_cond.notify_one(); } diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index 458b16d64..c601b588c 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -24,17 +24,23 @@ #include <gr_tpb_thread_body.h> #include <iostream> #include <boost/thread.hpp> +#include <gruel/pmt.h> + +using namespace pmt; gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block) : d_exec(block) { // std::cerr << "gr_tpb_thread_body: " << block << std::endl; - gr_block_detail *d = block->detail().get(); + gr_block_detail *d = block->detail().get(); gr_block_executor::state s; while (1){ boost::this_thread::interruption_point(); + + while (!block->msg_queue()->empty_p()) + block->handle_msg(block->msg_queue()->delete_head_nowait()); d->d_tpb.clear_changed(); s = d_exec.run_one_iteration(); @@ -53,18 +59,37 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block) return; case gr_block_executor::BLKD_IN: // Wait for input. + while (!d->d_tpb.input_changed) { + boost::this_thread::interruption_point(); gruel::scoped_lock guard(d->d_tpb.mutex); - while(!d->d_tpb.input_changed) - d->d_tpb.input_cond.wait(guard); + + // Block then wake on input_changed or msg arrived + while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p()) + d->d_tpb.input_cond.wait(guard); + + // Run msgq while unlocked + guard.unlock(); + while (!block->msg_queue()->empty_p()) + block->handle_msg(block->msg_queue()->delete_head_nowait()); } break; + case gr_block_executor::BLKD_OUT: // Wait for output buffer space. + while (!d->d_tpb.output_changed) { + boost::this_thread::interruption_point(); gruel::scoped_lock guard(d->d_tpb.mutex); - while(!d->d_tpb.output_changed) - d->d_tpb.output_cond.wait(guard); + + // Block then wake on output_changed or msg arrived + while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p()) + d->d_tpb.output_cond.wait(guard); + + // Run msgq while unlocked + guard.unlock(); + while (!block->msg_queue()->empty_p()) + block->handle_msg(block->msg_queue()->delete_head_nowait()); } break; |