diff options
author | Johnathan Corgan | 2012-11-29 19:46:29 -0800 |
---|---|---|
committer | Johnathan Corgan | 2012-11-29 19:46:29 -0800 |
commit | 4478cb86a5dc9fccf66c2cdd5806419b70c3837e (patch) | |
tree | 8ba82695b249012761aff88c32c83731393a302c /gnuradio-core/src/lib/runtime | |
parent | 92cfb0240005675f4e7a55a81552f4c7a5128cd8 (diff) | |
download | gnuradio-4478cb86a5dc9fccf66c2cdd5806419b70c3837e.tar.gz gnuradio-4478cb86a5dc9fccf66c2cdd5806419b70c3837e.tar.bz2 gnuradio-4478cb86a5dc9fccf66c2cdd5806419b70c3837e.zip |
Adding PDU to tagged stream and tagged stream to PDU blocks along with QA python
non-stream-connected blocks still need a new thread context
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.cc | 22 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 21 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.i | 3 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.cc | 3 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block_registry.cc | 18 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block_registry.h | 10 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc | 4 |
7 files changed, 48 insertions, 33 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 3d08b63d1..7d2f275e8 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -52,6 +52,7 @@ gr_basic_block::gr_basic_block(const std::string &name, d_color(WHITE), message_subscribers(pmt::pmt_make_dict()) { + mutex.unlock(); s_ncurrently_allocated++; } @@ -130,7 +131,7 @@ void gr_basic_block::_post(pmt_t which_port, pmt_t msg) { insert_tail(which_port, msg); - //notify_msg(); + global_block_registry.notify_blk(alias()); } void @@ -151,8 +152,9 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port) { gruel::scoped_lock guard(mutex); - if (empty_p(which_port)) - return pmt_t(); + if (empty_p(which_port)){ + return pmt::pmt_t(); + } pmt_t m(msg_queue[which_port].front()); msg_queue[which_port].pop_front(); @@ -160,18 +162,4 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port) return m; } -/* - * Caller must already be holding the mutex - */ -pmt_t -gr_basic_block::delete_head_nowait_already_holding_mutex(pmt::pmt_t which_port) -{ - if (empty_p(which_port)) - return pmt_t(); - - pmt_t m(msg_queue[which_port].front()); - msg_queue[which_port].pop_front(); - - return m; -} diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index 31026a2e4..2ee8161c1 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -115,7 +115,6 @@ protected: vcolor color() const { return d_color; } // Message passing interface - std::vector<pmt::pmt_t> message_inputs; pmt::pmt_t message_subscribers; public: @@ -139,15 +138,18 @@ public: void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); - /*! - * Accept msg, place in queue, arrange for thread to be awakened if it's not already. - */ - void _post(pmt::pmt_t which_port, pmt::pmt_t msg); - + /*! + * Accept msg, place in queue, arrange for thread to be awakened if it's not already. + */ + void _post(pmt::pmt_t which_port, pmt::pmt_t msg); //! is the queue empty? //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } - bool empty_p(pmt::pmt_t which_port) { return msg_queue[which_port].empty(); } + bool empty_p(pmt::pmt_t which_port) { + if(msg_queue.find(which_port) == msg_queue.end()) + throw std::runtime_error("port does not exist!"); + return msg_queue[which_port].empty(); + } bool empty_p() { bool rv = true; BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); } @@ -160,11 +162,6 @@ public: * \returns returns pmt at head of queue or pmt_t() if empty. */ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - * Caller must already be holding the mutex - */ - pmt::pmt_t delete_head_nowait_already_holding_mutex( pmt::pmt_t which_port); msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ return msg_queue[which_port].begin(); diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i index 0a8473ba2..d6d6c3d16 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.i +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i @@ -23,6 +23,8 @@ class gr_basic_block; typedef boost::shared_ptr<gr_basic_block> gr_basic_block_sptr; %template(gr_basic_block_sptr) boost::shared_ptr<gr_basic_block>; +%include "pmt_swig.i" +using namespace pmt; // support vectors of these... namespace std { @@ -45,6 +47,7 @@ public: bool check_topology (int ninputs, int noutputs); std::string alias(); void set_block_alias(std::string name); + void _post(pmt_t which_port, pmt_t msg); }; %rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated; diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index a88052ee0..dc77128a3 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -28,6 +28,7 @@ #include <gr_block_detail.h> #include <stdexcept> #include <iostream> +#include <gr_block_registry.h> gr_block::gr_block (const std::string &name, gr_io_signature_sptr input_signature, @@ -46,10 +47,12 @@ gr_block::gr_block (const std::string &name, d_max_output_buffer(std::max(output_signature->max_streams(),1), -1), d_min_output_buffer(std::max(output_signature->max_streams(),1), -1) { + global_block_registry.register_primitive(alias(), this); } gr_block::~gr_block () { + global_block_registry.unregister_primitive(alias()); } // stub implementation: 1:1 diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.cc b/gnuradio-core/src/lib/runtime/gr_block_registry.cc index 2478e0019..ff23d97eb 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_registry.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_registry.cc @@ -1,5 +1,9 @@ #include <gr_basic_block.h> #include <gr_block_registry.h> +#include <gr_tpb_detail.h> +#include <gr_block_detail.h> +#include <gr_block.h> +#include <stdio.h> gr_block_registry global_block_registry; @@ -56,3 +60,17 @@ gr_basic_block_sptr gr_block_registry::block_lookup(pmt::pmt_t symbol){ } +void gr_block_registry::register_primitive(std::string blk, gr_block* ref){ + primitive_map[blk] = ref; +} + +void gr_block_registry::unregister_primitive(std::string blk){ + primitive_map.erase(primitive_map.find(blk)); +} + +void gr_block_registry::notify_blk(std::string blk){ + if(primitive_map.find(blk) == primitive_map.end()){ return; } + if(primitive_map[blk]->detail().get()) + primitive_map[blk]->detail()->d_tpb.notify_msg(); +} + diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.h b/gnuradio-core/src/lib/runtime/gr_block_registry.h index 8f1982984..6a2d939e5 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_registry.h +++ b/gnuradio-core/src/lib/runtime/gr_block_registry.h @@ -5,6 +5,7 @@ #ifndef GR_BASIC_BLOCK_H class gr_basic_block; +class gr_block; #endif class gr_block_registry { @@ -18,7 +19,11 @@ class gr_block_registry { void register_symbolic_name(gr_basic_block* block, std::string name); gr_basic_block_sptr block_lookup(pmt::pmt_t symbol); - + + void register_primitive(std::string blk, gr_block* ref); + void unregister_primitive(std::string blk); + void notify_blk(std::string blk); + private: //typedef std::map< long, gr_basic_block_sptr > blocksubmap_t; @@ -27,7 +32,8 @@ class gr_block_registry { blockmap_t d_map; pmt::pmt_t d_ref_map; - + std::map< std::string, gr_block*> primitive_map; + }; extern gr_block_registry global_block_registry; diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index 1bd3014ad..ff2afca10 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -80,7 +80,7 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item // handle all pending messages BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) { - while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){ + while ((msg = block->delete_head_nowait(i.first))){ guard.unlock(); // release lock while processing msg block->dispatch_msg(i.first, msg); guard.lock(); @@ -103,7 +103,7 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item // handle all pending messages BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) { - while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){ + while ((msg = block->delete_head_nowait(i.first))){ guard.unlock(); // release lock while processing msg block->dispatch_msg(i.first,msg); guard.lock(); |