summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.cc10
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.h6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc35
5 files changed, 40 insertions, 15 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 5d5b8cbc7..b8797fdc6 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -40,7 +40,7 @@
* signal processing functions.
*/
-class gr_basic_block : gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
+class gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
{
protected:
friend class gr_flowgraph;
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
index ebe11870a..50b41df88 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
@@ -45,12 +45,12 @@ gr_msg_accepter::post(pmt_t msg)
// Let parent class do whatever it would have
gruel::msg_accepter_msgq::post(msg);
- // Notify this block's scheduler a message is pending
+ // Notify derived class, handled case by case
gr_block *p = dynamic_cast<gr_block *>(this);
- if (p)
+ if (p) {
p->detail()->d_tpb.notify_msg();
- else {
- // got here somehow with a non-gr_block
- throw std::runtime_error("gr_msg_accepter::post() - invalid derived class");
+ return;
}
+
+ // Test for other derived classes and handle
}
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
index 8ce8d1d9e..2073e7ff1 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
@@ -28,7 +28,7 @@
* \brief Accepts messages and inserts them into a message queue, then notifies
* subclass gr_basic_block there is a message pending.
*/
-class gr_msg_accepter : gruel::msg_accepter_msgq
+class gr_msg_accepter : public gruel::msg_accepter_msgq
{
public:
gr_msg_accepter(gruel::msg_queue_sptr msgq);
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
index 29101d730..a1df55806 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -35,10 +35,9 @@ struct gr_tpb_detail {
gruel::condition_variable input_cond;
bool output_changed;
gruel::condition_variable output_cond;
- bool msg_pending;
gr_tpb_detail()
- : input_changed(false), output_changed(false), msg_pending(false) { }
+ : input_changed(false), output_changed(false) { }
//! Called by us to tell all our upstream blocks that their output may have changed.
void notify_upstream(gr_block_detail *d);
@@ -61,7 +60,8 @@ struct gr_tpb_detail {
void notify_msg()
{
gruel::scoped_lock guard(mutex);
- msg_pending = true;
+
+ // Just wake up thread if BLKD_IN or BLKD_OUT
input_cond.notify_one();
output_cond.notify_one();
}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 458b16d64..c601b588c 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -24,17 +24,23 @@
#include <gr_tpb_thread_body.h>
#include <iostream>
#include <boost/thread.hpp>
+#include <gruel/pmt.h>
+
+using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
: d_exec(block)
{
// std::cerr << "gr_tpb_thread_body: " << block << std::endl;
- gr_block_detail *d = block->detail().get();
+ gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
while (1){
boost::this_thread::interruption_point();
+
+ while (!block->msg_queue()->empty_p())
+ block->handle_msg(block->msg_queue()->delete_head_nowait());
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();
@@ -53,18 +59,37 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
return;
case gr_block_executor::BLKD_IN: // Wait for input.
+ while (!d->d_tpb.input_changed)
{
+ boost::this_thread::interruption_point();
gruel::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.input_changed)
- d->d_tpb.input_cond.wait(guard);
+
+ // Block then wake on input_changed or msg arrived
+ while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p())
+ d->d_tpb.input_cond.wait(guard);
+
+ // Run msgq while unlocked
+ guard.unlock();
+ while (!block->msg_queue()->empty_p())
+ block->handle_msg(block->msg_queue()->delete_head_nowait());
}
break;
+
case gr_block_executor::BLKD_OUT: // Wait for output buffer space.
+ while (!d->d_tpb.output_changed)
{
+ boost::this_thread::interruption_point();
gruel::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.output_changed)
- d->d_tpb.output_cond.wait(guard);
+
+ // Block then wake on output_changed or msg arrived
+ while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p())
+ d->d_tpb.output_cond.wait(guard);
+
+ // Run msgq while unlocked
+ guard.unlock();
+ while (!block->msg_queue()->empty_p())
+ block->handle_msg(block->msg_queue()->delete_head_nowait());
}
break;