summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc129
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h17
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.i2
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_pdu.py21
4 files changed, 127 insertions, 42 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 2c77c1c0c..0f7875a12 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -77,56 +77,101 @@ gr_basic_block::set_block_alias(std::string name)
// ** Message passing interface **
// - register a new input message port
-void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){
- msg_queue[port_id] = msg_queue_t();
- msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
- }
+void
+gr_basic_block::message_port_register_in(pmt::pmt_t port_id)
+{
+ if(!pmt::pmt_is_symbol(port_id)) {
+ throw std::runtime_error("message_port_register_in: bad port id");
+ }
+ msg_queue[port_id] = msg_queue_t();
+ msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
+}
+
+pmt::pmt_t
+gr_basic_block::message_ports_in()
+{
+ pmt::pmt_t port_names = pmt::pmt_make_vector(msg_queue.size(), pmt::PMT_NIL);
+ msg_queue_map_itr itr = msg_queue.begin();
+ for(size_t i = 0; i < msg_queue.size(); i++) {
+ pmt::pmt_vector_set(port_names, i, (*itr).first);
+ itr++;
+ }
+ return port_names;
+}
// - register a new output message port
-void gr_basic_block::message_port_register_out(pmt::pmt_t port_id){
- if(!pmt::pmt_is_symbol(port_id)){ throw std::runtime_error("bad port id"); }
- if(pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port already in use"); }
- message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL);
- }
+void
+gr_basic_block::message_port_register_out(pmt::pmt_t port_id)
+{
+ if(!pmt::pmt_is_symbol(port_id)) {
+ throw std::runtime_error("message_port_register_out: bad port id");
+ }
+ if(pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+ throw std::runtime_error("message_port_register_out: port already in use");
+ }
+ message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL);
+}
+
+pmt::pmt_t
+gr_basic_block::message_ports_out()
+{
+ size_t len = pmt::pmt_length(message_subscribers);
+ pmt::pmt_t port_names = pmt::pmt_make_vector(len, pmt::PMT_NIL);
+ pmt::pmt_t keys = pmt::pmt_dict_keys(message_subscribers);
+ for(size_t i = 0; i < len; i++) {
+ pmt::pmt_vector_set(port_names, i, pmt::pmt_nth(i, keys));
+ }
+ return port_names;
+}
// - publish a message on a message port
-void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port does not exist"); }
- pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
- // iterate through subscribers on port
- while( pmt::pmt_is_pair(currlist) ){
- pmt::pmt_t target = pmt::pmt_car(currlist);
-
- pmt::pmt_t block = pmt::pmt_car(target);
- pmt::pmt_t port = pmt::pmt_cdr(target);
+void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg)
+{
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+ throw std::runtime_error("port does not exist");
+ }
+
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers, port_id, pmt::PMT_NIL);
+ // iterate through subscribers on port
+ while(pmt::pmt_is_pair(currlist)) {
+ pmt::pmt_t target = pmt::pmt_car(currlist);
+
+ pmt::pmt_t block = pmt::pmt_car(target);
+ pmt::pmt_t port = pmt::pmt_cdr(target);
- currlist = pmt::pmt_cdr(currlist);
- gr_basic_block_sptr blk = global_block_registry.block_lookup(block);
- //blk->post(msg);
- blk->post(port, msg);
- }
- }
+ currlist = pmt::pmt_cdr(currlist);
+ gr_basic_block_sptr blk = global_block_registry.block_lookup(block);
+ //blk->post(msg);
+ blk->post(port, msg);
+ }
+}
// - subscribe to a message port
-void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
- std::stringstream ss;
- ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
- throw std::runtime_error(ss.str());
- }
- pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
- message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
- }
+void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target)
+{
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+ std::stringstream ss;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
+ << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ throw std::runtime_error(ss.str());
+ }
+
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
+}
-void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
- std::stringstream ss;
- ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
- throw std::runtime_error(ss.str());
- }
- pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
- message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
- }
+void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target)
+{
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+ std::stringstream ss;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
+ << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ throw std::runtime_error(ss.str());
+ }
+
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
+}
void
gr_basic_block::_post(pmt_t which_port, pmt_t msg)
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index e0fd5d2af..00e9c2192 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -72,6 +72,7 @@ private:
typedef std::deque<pmt::pmt_t> msg_queue_t;
typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
msg_queue_map_t msg_queue;
// boost::condition_variable msg_queue_ready;
std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
@@ -143,6 +144,22 @@ public:
void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
/*!
+ * \brief Get input message port names.
+ *
+ * Returns the available input message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_in();
+
+ /*!
+ * \brief Get output message port names.
+ *
+ * Returns the available output message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_out();
+
+ /*!
* Accept msg, place in queue, arrange for thread to be awakened if it's not already.
*/
void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i
index d6d6c3d16..62f16462d 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i
@@ -48,6 +48,8 @@ public:
std::string alias();
void set_block_alias(std::string name);
void _post(pmt_t which_port, pmt_t msg);
+ pmt_t message_ports_in();
+ pmt_t message_ports_out();
};
%rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated;
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
index da1331d96..bf02d12c1 100755
--- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
@@ -43,6 +43,27 @@ class test_pdu(gr_unittest.TestCase):
dbg = gr.message_debug()
+ # Test that the right number of ports exist.
+ pi = dbg.message_ports_in()
+ po = dbg.message_ports_out()
+ self.assertEqual(pmt.pmt_length(pi), 2)
+ self.assertEqual(pmt.pmt_length(po), 0)
+
+ pi = snk3.message_ports_in()
+ po = snk3.message_ports_out()
+ self.assertEqual(pmt.pmt_length(pi), 0)
+ self.assertEqual(pmt.pmt_length(po), 1)
+
+ #print "Message Debug input ports: "
+ #pmt.pmt_print(pi)
+ #print "Message Debug output ports: "
+ #pmt.pmt_print(po)
+
+ #print "Stream to PDU input ports: "
+ #pmt.pmt_print(pi)
+ #print "Stream to PDU output ports: "
+ #pmt.pmt_print(po)
+
self.tb.connect(src, snk)
self.tb.connect(src, snk2)
self.tb.connect(src, snk3)