diff options
-rw-r--r-- | gnuradio-core/gnuradio-core.conf | 4 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 23 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc | 17 | ||||
-rwxr-xr-x | gnuradio-core/src/python/gnuradio/gr/qa_pdu.py | 3 |
4 files changed, 41 insertions, 6 deletions
diff --git a/gnuradio-core/gnuradio-core.conf b/gnuradio-core/gnuradio-core.conf index 70eb00236..d575d1dc8 100644 --- a/gnuradio-core/gnuradio-core.conf +++ b/gnuradio-core/gnuradio-core.conf @@ -6,5 +6,9 @@ verbose = False +# The maximum number of messages a block will store up before pruning +# the queue by popping messages from the front. +max_messages = 100 + [PerfCounters] on = False diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index b4935d8ac..024159c4c 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -106,6 +106,13 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ void set_color(vcolor color) { d_color = color; } vcolor color() const { return d_color; } + /*! + * \brief Tests if there is a handler attached to port \p which_port + */ + bool has_msg_handler(pmt::pmt_t which_port) { + return (d_msg_handlers.find(which_port) != d_msg_handlers.end()); + } + /* * This function is called by the runtime system to dispatch messages. * @@ -115,9 +122,10 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) { // AA Update this - if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? + if(has_msg_handler(which_port)) { // Is there a handler? d_msg_handlers[which_port](msg); // Yes, invoke it. - }; + } + } // Message passing interface pmt::pmt_t message_subscribers; @@ -177,9 +185,18 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ } bool empty_p() { bool rv = true; - BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); } + BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) { + rv &= msg_queue[i.first].empty(); + } return rv; } + + //! How many messages in the queue? + size_t nmsgs(pmt::pmt_t which_port) { + if(msg_queue.find(which_port) == msg_queue.end()) + throw std::runtime_error("port does not exist!"); + return msg_queue[which_port].size(); + } //| Acquires and release the mutex void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index cea374fac..679fd1512 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -22,6 +22,7 @@ #include <config.h> #endif #include <gr_tpb_thread_body.h> +#include <gr_prefs.h> #include <iostream> #include <boost/thread.hpp> #include <gruel/pmt.h> @@ -41,6 +42,9 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item d->threaded = true; d->thread = gruel::get_current_thread_id(); + gr_prefs *p = gr_prefs::singleton(); + size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100)); + // Set thread affinity if it was set before fg was started. if(block->processor_affinity().size() > 0) { gruel::thread_bind_to_processor(d->thread, block->processor_affinity()); @@ -54,9 +58,20 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) { + // Check if we have a message handler attached before getting + // any messages. This is mostly a protection for the unknown + // startup sequence of the threads. + if(block->has_msg_handler(i.first)) { while ((msg = block->delete_head_nowait(i.first))){ - block->dispatch_msg(i.first,msg); + block->dispatch_msg(i.first,msg); } + } + else { + // If we don't have a handler but are building up messages, + // prune the queue from the front to keep memory in check. + if(block->nmsgs(i.first) > max_nmsgs) + msg = block->delete_head_nowait(i.first); + } } d->d_tpb.clear_changed(); diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py index c1110c10b..098aabb4a 100755 --- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py +++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py @@ -63,7 +63,6 @@ class test_pdu(gr_unittest.TestCase): #pmt.pmt_print(pi) #print "Stream to PDU output ports: " #pmt.pmt_print(po) - time.sleep(0.1) self.tb.connect(src, snk) self.tb.connect(src, snk2) @@ -84,7 +83,7 @@ class test_pdu(gr_unittest.TestCase): src.to_basic_block()._post( port, msg ) while(dbg.num_messages() < 1): - time.sleep(0.5) + time.sleep(0.1) self.tb.stop() self.tb.wait() |