diff options
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.cc | 129 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 17 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.i | 2 | ||||
-rwxr-xr-x | gnuradio-core/src/python/gnuradio/gr/qa_pdu.py | 21 |
4 files changed, 127 insertions, 42 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 2c77c1c0c..0f7875a12 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -77,56 +77,101 @@ gr_basic_block::set_block_alias(std::string name) // ** Message passing interface ** // - register a new input message port -void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){ - msg_queue[port_id] = msg_queue_t(); - msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable()); - } +void +gr_basic_block::message_port_register_in(pmt::pmt_t port_id) +{ + if(!pmt::pmt_is_symbol(port_id)) { + throw std::runtime_error("message_port_register_in: bad port id"); + } + msg_queue[port_id] = msg_queue_t(); + msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable()); +} + +pmt::pmt_t +gr_basic_block::message_ports_in() +{ + pmt::pmt_t port_names = pmt::pmt_make_vector(msg_queue.size(), pmt::PMT_NIL); + msg_queue_map_itr itr = msg_queue.begin(); + for(size_t i = 0; i < msg_queue.size(); i++) { + pmt::pmt_vector_set(port_names, i, (*itr).first); + itr++; + } + return port_names; +} // - register a new output message port -void gr_basic_block::message_port_register_out(pmt::pmt_t port_id){ - if(!pmt::pmt_is_symbol(port_id)){ throw std::runtime_error("bad port id"); } - if(pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port already in use"); } - message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL); - } +void +gr_basic_block::message_port_register_out(pmt::pmt_t port_id) +{ + if(!pmt::pmt_is_symbol(port_id)) { + throw std::runtime_error("message_port_register_out: bad port id"); + } + if(pmt::pmt_dict_has_key(message_subscribers, port_id)) { + throw std::runtime_error("message_port_register_out: port already in use"); + } + message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL); +} + +pmt::pmt_t +gr_basic_block::message_ports_out() +{ + size_t len = pmt::pmt_length(message_subscribers); + pmt::pmt_t port_names = pmt::pmt_make_vector(len, pmt::PMT_NIL); + pmt::pmt_t keys = pmt::pmt_dict_keys(message_subscribers); + for(size_t i = 0; i < len; i++) { + pmt::pmt_vector_set(port_names, i, pmt::pmt_nth(i, keys)); + } + return port_names; +} // - publish a message on a message port -void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port does not exist"); } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - // iterate through subscribers on port - while( pmt::pmt_is_pair(currlist) ){ - pmt::pmt_t target = pmt::pmt_car(currlist); - - pmt::pmt_t block = pmt::pmt_car(target); - pmt::pmt_t port = pmt::pmt_cdr(target); +void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg) +{ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) { + throw std::runtime_error("port does not exist"); + } + + pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers, port_id, pmt::PMT_NIL); + // iterate through subscribers on port + while(pmt::pmt_is_pair(currlist)) { + pmt::pmt_t target = pmt::pmt_car(currlist); + + pmt::pmt_t block = pmt::pmt_car(target); + pmt::pmt_t port = pmt::pmt_cdr(target); - currlist = pmt::pmt_cdr(currlist); - gr_basic_block_sptr blk = global_block_registry.block_lookup(block); - //blk->post(msg); - blk->post(port, msg); - } - } + currlist = pmt::pmt_cdr(currlist); + gr_basic_block_sptr blk = global_block_registry.block_lookup(block); + //blk->post(msg); + blk->post(port, msg); + } +} // - subscribe to a message port -void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ - std::stringstream ss; - ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; - throw std::runtime_error(ss.str()); - } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target)); - } +void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target) +{ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) { + std::stringstream ss; + ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) + << "\" on block: " << pmt::pmt_write_string(target) << std::endl; + throw std::runtime_error(ss.str()); + } + + pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); + message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target)); +} -void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ - std::stringstream ss; - ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; - throw std::runtime_error(ss.str()); - } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target)); - } +void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target) +{ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) { + std::stringstream ss; + ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) + << "\" on block: " << pmt::pmt_write_string(target) << std::endl; + throw std::runtime_error(ss.str()); + } + + pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); + message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target)); +} void gr_basic_block::_post(pmt_t which_port, pmt_t msg) diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index e0fd5d2af..00e9c2192 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -72,6 +72,7 @@ private: typedef std::deque<pmt::pmt_t> msg_queue_t; typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; + typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr; msg_queue_map_t msg_queue; // boost::condition_variable msg_queue_ready; std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready; @@ -143,6 +144,22 @@ public: void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); /*! + * \brief Get input message port names. + * + * Returns the available input message ports for a block. The + * return object is a PMT vector that is filled with PMT symbols. + */ + pmt::pmt_t message_ports_in(); + + /*! + * \brief Get output message port names. + * + * Returns the available output message ports for a block. The + * return object is a PMT vector that is filled with PMT symbols. + */ + pmt::pmt_t message_ports_out(); + + /*! * Accept msg, place in queue, arrange for thread to be awakened if it's not already. */ void _post(pmt::pmt_t which_port, pmt::pmt_t msg); diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i index d6d6c3d16..62f16462d 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.i +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i @@ -48,6 +48,8 @@ public: std::string alias(); void set_block_alias(std::string name); void _post(pmt_t which_port, pmt_t msg); + pmt_t message_ports_in(); + pmt_t message_ports_out(); }; %rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated; diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py index da1331d96..bf02d12c1 100755 --- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py +++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py @@ -43,6 +43,27 @@ class test_pdu(gr_unittest.TestCase): dbg = gr.message_debug() + # Test that the right number of ports exist. + pi = dbg.message_ports_in() + po = dbg.message_ports_out() + self.assertEqual(pmt.pmt_length(pi), 2) + self.assertEqual(pmt.pmt_length(po), 0) + + pi = snk3.message_ports_in() + po = snk3.message_ports_out() + self.assertEqual(pmt.pmt_length(pi), 0) + self.assertEqual(pmt.pmt_length(po), 1) + + #print "Message Debug input ports: " + #pmt.pmt_print(pi) + #print "Message Debug output ports: " + #pmt.pmt_print(po) + + #print "Stream to PDU input ports: " + #pmt.pmt_print(pi) + #print "Stream to PDU output ports: " + #pmt.pmt_print(po) + self.tb.connect(src, snk) self.tb.connect(src, snk2) self.tb.connect(src, snk3) |