summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-core/gnuradio-core.conf4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h23
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc17
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_pdu.py3
4 files changed, 41 insertions, 6 deletions
diff --git a/gnuradio-core/gnuradio-core.conf b/gnuradio-core/gnuradio-core.conf
index 70eb00236..d575d1dc8 100644
--- a/gnuradio-core/gnuradio-core.conf
+++ b/gnuradio-core/gnuradio-core.conf
@@ -6,5 +6,9 @@
verbose = False
+# The maximum number of messages a block will store up before pruning
+# the queue by popping messages from the front.
+max_messages = 100
+
[PerfCounters]
on = False
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index b4935d8ac..024159c4c 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -106,6 +106,13 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
void set_color(vcolor color) { d_color = color; }
vcolor color() const { return d_color; }
+ /*!
+ * \brief Tests if there is a handler attached to port \p which_port
+ */
+ bool has_msg_handler(pmt::pmt_t which_port) {
+ return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
+ }
+
/*
* This function is called by the runtime system to dispatch messages.
*
@@ -115,9 +122,10 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
{
// AA Update this
- if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ if(has_msg_handler(which_port)) { // Is there a handler?
d_msg_handlers[which_port](msg); // Yes, invoke it.
- };
+ }
+ }
// Message passing interface
pmt::pmt_t message_subscribers;
@@ -177,9 +185,18 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
}
bool empty_p() {
bool rv = true;
- BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
+ rv &= msg_queue[i.first].empty();
+ }
return rv;
}
+
+ //! How many messages in the queue?
+ size_t nmsgs(pmt::pmt_t which_port) {
+ if(msg_queue.find(which_port) == msg_queue.end())
+ throw std::runtime_error("port does not exist!");
+ return msg_queue[which_port].size();
+ }
//| Acquires and release the mutex
void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index cea374fac..679fd1512 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -22,6 +22,7 @@
#include <config.h>
#endif
#include <gr_tpb_thread_body.h>
+#include <gr_prefs.h>
#include <iostream>
#include <boost/thread.hpp>
#include <gruel/pmt.h>
@@ -41,6 +42,9 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
d->threaded = true;
d->thread = gruel::get_current_thread_id();
+ gr_prefs *p = gr_prefs::singleton();
+ size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100));
+
// Set thread affinity if it was set before fg was started.
if(block->processor_affinity().size() > 0) {
gruel::thread_bind_to_processor(d->thread, block->processor_affinity());
@@ -54,9 +58,20 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
{
+ // Check if we have a message handler attached before getting
+ // any messages. This is mostly a protection for the unknown
+ // startup sequence of the threads.
+ if(block->has_msg_handler(i.first)) {
while ((msg = block->delete_head_nowait(i.first))){
- block->dispatch_msg(i.first,msg);
+ block->dispatch_msg(i.first,msg);
}
+ }
+ else {
+ // If we don't have a handler but are building up messages,
+ // prune the queue from the front to keep memory in check.
+ if(block->nmsgs(i.first) > max_nmsgs)
+ msg = block->delete_head_nowait(i.first);
+ }
}
d->d_tpb.clear_changed();
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
index c1110c10b..098aabb4a 100755
--- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
@@ -63,7 +63,6 @@ class test_pdu(gr_unittest.TestCase):
#pmt.pmt_print(pi)
#print "Stream to PDU output ports: "
#pmt.pmt_print(po)
- time.sleep(0.1)
self.tb.connect(src, snk)
self.tb.connect(src, snk2)
@@ -84,7 +83,7 @@ class test_pdu(gr_unittest.TestCase):
src.to_basic_block()._post( port, msg )
while(dbg.num_messages() < 1):
- time.sleep(0.5)
+ time.sleep(0.1)
self.tb.stop()
self.tb.wait()