summaryrefslogtreecommitdiff
path: root/gnuradio-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src')
-rw-r--r--gnuradio-core/src/lib/general/gr_nop.cc3
-rw-r--r--gnuradio-core/src/lib/runtime/CMakeLists.txt2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc117
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h85
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.i3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_registry.cc58
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_registry.h36
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.cc30
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.h15
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.i15
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc22
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.cc4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.cc40
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.h26
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc41
-rw-r--r--gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc5
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/top_block.py6
21 files changed, 427 insertions, 98 deletions
diff --git a/gnuradio-core/src/lib/general/gr_nop.cc b/gnuradio-core/src/lib/general/gr_nop.cc
index ca5983c39..edfe1d76d 100644
--- a/gnuradio-core/src/lib/general/gr_nop.cc
+++ b/gnuradio-core/src/lib/general/gr_nop.cc
@@ -40,7 +40,8 @@ gr_nop::gr_nop (size_t sizeof_stream_item)
d_nmsgs_recvd(0)
{
// Arrange to have count_received_msgs called when messages are received.
- set_msg_handler(boost::bind(&gr_nop::count_received_msgs, this, _1));
+ message_port_register_in(pmt::mp("port"));
+ set_msg_handler(pmt::mp("port"), boost::bind(&gr_nop::count_received_msgs, this, _1));
}
// Trivial message handler that just counts them.
diff --git a/gnuradio-core/src/lib/runtime/CMakeLists.txt b/gnuradio-core/src/lib/runtime/CMakeLists.txt
index 5f3672dde..70938a0f1 100644
--- a/gnuradio-core/src/lib/runtime/CMakeLists.txt
+++ b/gnuradio-core/src/lib/runtime/CMakeLists.txt
@@ -54,6 +54,7 @@ list(APPEND gnuradio_core_sources
${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.cc
@@ -116,6 +117,7 @@ install(FILES
${CMAKE_CURRENT_SOURCE_DIR}/gr_block.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.h
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index d7263b92d..3d08b63d1 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -25,7 +25,9 @@
#endif
#include <gr_basic_block.h>
+#include <gr_block_registry.h>
#include <stdexcept>
+#include <sstream>
using namespace pmt;
@@ -45,7 +47,10 @@ gr_basic_block::gr_basic_block(const std::string &name,
d_input_signature(input_signature),
d_output_signature(output_signature),
d_unique_id(s_next_id++),
- d_color(WHITE)
+ d_symbolic_id(global_block_registry.block_register(this)),
+ d_symbol_name(global_block_registry.register_symbolic_name(this)),
+ d_color(WHITE),
+ message_subscribers(pmt::pmt_make_dict())
{
s_ncurrently_allocated++;
}
@@ -53,6 +58,7 @@ gr_basic_block::gr_basic_block(const std::string &name,
gr_basic_block::~gr_basic_block()
{
s_ncurrently_allocated--;
+ global_block_registry.block_unregister(this);
}
gr_basic_block_sptr
@@ -60,3 +66,112 @@ gr_basic_block::to_basic_block()
{
return shared_from_this();
}
+
+void
+gr_basic_block::set_block_alias(std::string name)
+{
+ global_block_registry.register_symbolic_name(this, name);
+}
+
+// ** Message passing interface **
+
+// - register a new input message port
+void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){
+ msg_queue[port_id] = msg_queue_t();
+ }
+
+// - register a new output message port
+void gr_basic_block::message_port_register_out(pmt::pmt_t port_id){
+ if(!pmt::pmt_is_symbol(port_id)){ throw std::runtime_error("bad port id"); }
+ if(pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port already in use"); }
+ message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL);
+ }
+
+// - publish a message on a message port
+void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port does not exist"); }
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ // iterate through subscribers on port
+ while( pmt::pmt_is_pair(currlist) ){
+ pmt::pmt_t target = pmt::pmt_car(currlist);
+
+ pmt::pmt_t block = pmt::pmt_car(target);
+ pmt::pmt_t port = pmt::pmt_cdr(target);
+
+ currlist = pmt::pmt_cdr(currlist);
+ gr_basic_block_sptr blk = global_block_registry.block_lookup(block);
+ //blk->post(msg);
+ blk->post(port, msg);
+ }
+ }
+
+// - subscribe to a message port
+void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
+ std::stringstream ss;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ throw std::runtime_error(ss.str());
+ }
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
+ }
+
+void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
+ std::stringstream ss;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ throw std::runtime_error(ss.str());
+ }
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
+ }
+
+void
+gr_basic_block::_post(pmt_t which_port, pmt_t msg)
+{
+ insert_tail(which_port, msg);
+ //notify_msg();
+}
+
+void
+gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
+{
+ gruel::scoped_lock guard(mutex);
+
+ msg_queue[which_port].push_back(msg);
+
+ // wake up thread if BLKD_IN or BLKD_OUT
+ //input_cond.notify_one();
+ //output_cond.notify_one();
+ // TODO: reconsider the need for notification of input and output conditions!
+}
+
+pmt_t
+gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
+{
+ gruel::scoped_lock guard(mutex);
+
+ if (empty_p(which_port))
+ return pmt_t();
+
+ pmt_t m(msg_queue[which_port].front());
+ msg_queue[which_port].pop_front();
+
+ return m;
+}
+
+/*
+ * Caller must already be holding the mutex
+ */
+pmt_t
+gr_basic_block::delete_head_nowait_already_holding_mutex(pmt::pmt_t which_port)
+{
+ if (empty_p(which_port))
+ return pmt_t();
+
+ pmt_t m(msg_queue[which_port].front());
+ msg_queue[which_port].pop_front();
+
+ return m;
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index cb6a983c4..31026a2e4 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -30,7 +30,11 @@
#include <boost/function.hpp>
#include <gr_msg_accepter.h>
#include <string>
+#include <deque>
+#include <map>
#include <gr_io_signature.h>
+#include <gruel/thread.h>
+#include <boost/foreach.hpp>
/*!
* \brief The abstract base class for all signal processing blocks.
@@ -54,14 +58,23 @@ private:
* The thread-safety guarantees mentioned in set_msg_handler are implemented
* by the callers of this method.
*/
- void dispatch_msg(pmt::pmt_t msg)
+ void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
{
- if (d_msg_handler) // Is there a handler?
- d_msg_handler(msg); // Yes, invoke it.
+ // AA Update this
+ if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
};
- msg_handler_t d_msg_handler;
+ //msg_handler_t d_msg_handler;
+ typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
+ d_msg_handlers_t d_msg_handlers;
+ typedef std::deque<pmt::pmt_t> msg_queue_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
+ msg_queue_map_t msg_queue;
+ gruel::mutex mutex; //< protects all vars
+
+
protected:
friend class gr_flowgraph;
friend class gr_flat_flowgraph; // TODO: will be redundant
@@ -73,6 +86,9 @@ protected:
gr_io_signature_sptr d_input_signature;
gr_io_signature_sptr d_output_signature;
long d_unique_id;
+ long d_symbolic_id;
+ std::string d_symbol_name;
+ std::string d_symbol_alias;
vcolor d_color;
gr_basic_block(void){} //allows pure virtual interface sub-classes
@@ -98,13 +114,65 @@ protected:
void set_color(vcolor color) { d_color = color; }
vcolor color() const { return d_color; }
+ // Message passing interface
+ std::vector<pmt::pmt_t> message_inputs;
+ pmt::pmt_t message_subscribers;
+
public:
virtual ~gr_basic_block();
long unique_id() const { return d_unique_id; }
+ long symbolic_id() const { return d_symbolic_id; }
std::string name() const { return d_name; }
+ std::string symbol_name() const { return d_symbol_name; }
gr_io_signature_sptr input_signature() const { return d_input_signature; }
gr_io_signature_sptr output_signature() const { return d_output_signature; }
gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
+ bool alias_set() { return !d_symbol_alias.empty(); }
+ std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
+ pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
+ void set_block_alias(std::string name);
+
+ // ** Message passing interface **
+ void message_port_register_in(pmt::pmt_t port_id);
+ void message_port_register_out(pmt::pmt_t port_id);
+ void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
+ void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
+ void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
+
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
+
+
+ //! is the queue empty?
+ //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
+ bool empty_p(pmt::pmt_t which_port) { return msg_queue[which_port].empty(); }
+ bool empty_p() {
+ bool rv = true;
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+ return rv;
+ }
+
+ //| Acquires and release the mutex
+ void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ * Caller must already be holding the mutex
+ */
+ pmt::pmt_t delete_head_nowait_already_holding_mutex( pmt::pmt_t which_port);
+
+ msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
+ return msg_queue[which_port].begin();
+ }
+ void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
+ msg_queue[which_port].erase(it);
+ }
+
/*!
* \brief Confirm that ninputs and noutputs is an acceptable combination.
@@ -147,8 +215,13 @@ public:
* If the block inherits from gr_hier_block2, the runtime system will
* ensure that no reentrant calls are made to msg_handler.
*/
- template <typename T> void set_msg_handler(T msg_handler){
- d_msg_handler = msg_handler_t(msg_handler);
+ //template <typename T> void set_msg_handler(T msg_handler){
+ // d_msg_handler = msg_handler_t(msg_handler);
+ //}
+ template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
+ if(msg_queue.find(which_port) == msg_queue.end()){
+ throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
+ d_msg_handlers[which_port] = msg_handler_t(msg_handler);
}
};
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i
index e43cc114c..0a8473ba2 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i
@@ -37,11 +37,14 @@ protected:
public:
virtual ~gr_basic_block();
std::string name() const;
+ std::string symbol_name() const;
gr_io_signature_sptr input_signature() const;
gr_io_signature_sptr output_signature() const;
long unique_id() const;
gr_basic_block_sptr to_basic_block();
bool check_topology (int ninputs, int noutputs);
+ std::string alias();
+ void set_block_alias(std::string name);
};
%rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated;
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index 2792cd471..337c9518e 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -127,12 +127,6 @@ gr_block_detail::produce_each (int how_many_items)
}
-void
-gr_block_detail::_post(pmt_t msg)
-{
- d_tpb.insert_tail(msg);
-}
-
uint64_t
gr_block_detail::nitems_read(unsigned int which_input)
{
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index c96f00db8..16d9f4d42 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -85,11 +85,6 @@ class GR_CORE_API gr_block_detail {
*/
void produce_each (int how_many_items);
- /*!
- * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
- */
- void _post(pmt::pmt_t msg);
-
// Return the number of items read on input stream which_input
uint64_t nitems_read(unsigned int which_input);
diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.cc b/gnuradio-core/src/lib/runtime/gr_block_registry.cc
new file mode 100644
index 000000000..2478e0019
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_registry.cc
@@ -0,0 +1,58 @@
+#include <gr_basic_block.h>
+#include <gr_block_registry.h>
+
+gr_block_registry global_block_registry;
+
+gr_block_registry::gr_block_registry(){
+ d_ref_map = pmt::pmt_make_dict();
+}
+
+long gr_block_registry::block_register(gr_basic_block* block){
+ if(d_map.find(block->name()) == d_map.end()){
+ d_map[block->name()] = blocksubmap_t();
+ d_map[block->name()][0] = block;
+ return 0;
+ } else {
+ for(size_t i=0; i<=d_map[block->name()].size(); i++){
+ if(d_map[block->name()].find(i) == d_map[block->name()].end()){
+ d_map[block->name()][i] = block;
+ return i;
+ }
+ }
+ }
+ throw std::runtime_error("should not reach this");
+}
+
+void gr_block_registry::block_unregister(gr_basic_block* block){
+ d_map[block->name()].erase( d_map[block->name()].find(block->symbolic_id()));
+ d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->symbol_name()));
+ if(block->alias_set()){
+ d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->alias()));
+ }
+}
+
+std::string gr_block_registry::register_symbolic_name(gr_basic_block* block){
+ std::stringstream ss;
+ ss << block->name() << block->symbolic_id();
+ //std::cout << "register_symbolic_name: " << ss.str() << std::endl;
+ register_symbolic_name(block, ss.str());
+ return ss.str();
+}
+
+void gr_block_registry::register_symbolic_name(gr_basic_block* block, std::string name){
+ if(pmt_dict_has_key(d_ref_map, pmt::pmt_intern(name))){
+ throw std::runtime_error("symbol already exists, can not re-use!");
+ }
+ d_ref_map = pmt_dict_add(d_ref_map, pmt::pmt_intern(name), pmt::pmt_make_any(block));
+}
+
+gr_basic_block_sptr gr_block_registry::block_lookup(pmt::pmt_t symbol){
+ pmt::pmt_t ref = pmt_dict_ref(d_ref_map, symbol, pmt::PMT_NIL);
+ if(pmt::pmt_eq(ref, pmt::PMT_NIL)){
+ throw std::runtime_error("block lookup failed! block not found!");
+ }
+ gr_basic_block* blk = boost::any_cast<gr_basic_block*>( pmt::pmt_any_ref(ref) );
+ return blk->shared_from_this();
+}
+
+
diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.h b/gnuradio-core/src/lib/runtime/gr_block_registry.h
new file mode 100644
index 000000000..8f1982984
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_registry.h
@@ -0,0 +1,36 @@
+#ifndef GR_BLOCK_REGISTRY_H
+#define GR_BLOCK_REGISTRY_H
+
+#include <map>
+
+#ifndef GR_BASIC_BLOCK_H
+class gr_basic_block;
+#endif
+
+class gr_block_registry {
+ public:
+ gr_block_registry();
+
+ long block_register(gr_basic_block* block);
+ void block_unregister(gr_basic_block* block);
+
+ std::string register_symbolic_name(gr_basic_block* block);
+ void register_symbolic_name(gr_basic_block* block, std::string name);
+
+ gr_basic_block_sptr block_lookup(pmt::pmt_t symbol);
+
+ private:
+
+ //typedef std::map< long, gr_basic_block_sptr > blocksubmap_t;
+ typedef std::map< long, gr_basic_block* > blocksubmap_t;
+ typedef std::map< std::string, blocksubmap_t > blockmap_t;
+
+ blockmap_t d_map;
+ pmt::pmt_t d_ref_map;
+
+};
+
+extern gr_block_registry global_block_registry;
+
+#endif
+
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
index 756852df8..a19bfe195 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
@@ -81,6 +81,36 @@ gr_hier_block2::connect(gr_basic_block_sptr src, int src_port,
}
void
+gr_hier_block2::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); }
+ d_detail->msg_connect(src, srcport, dst, dstport);
+}
+
+void
+gr_hier_block2::msg_connect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+{
+ d_detail->msg_connect(src, pmt::mp(srcport), dst, pmt::mp(dstport));
+}
+
+void
+gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); }
+ d_detail->msg_disconnect(src, srcport, dst, dstport);
+}
+
+void
+gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+{
+ d_detail->msg_disconnect(src, pmt::mp(srcport), dst, pmt::mp(dstport));
+}
+
+void
gr_hier_block2::disconnect(gr_basic_block_sptr block)
{
d_detail->disconnect(block);
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
index 123178724..e8364a740 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
@@ -98,6 +98,21 @@ public:
gr_basic_block_sptr dst, int dst_port);
/*!
+ * \brief Add gr-blocks or hierarchical blocks to internal graph and wire together
+ *
+ * This adds (if not done earlier by another connect) a pair of gr-blocks or
+ * hierarchical blocks to the internal message port subscription
+ */
+ void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
+ void msg_connect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport);
+ void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
+ void msg_disconnect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport);
+
+ /*!
* \brief Remove a gr-block or hierarchical block from the internal flowgraph.
*
* This removes a gr-block or hierarchical block from the internal flowgraph,
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
index eefb965b4..7c0e62f28 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
@@ -38,6 +38,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name,
// better interface in scripting land.
%rename(primitive_connect) gr_hier_block2::connect;
%rename(primitive_disconnect) gr_hier_block2::disconnect;
+%rename(primitive_msg_connect) gr_hier_block2::msg_connect;
+%rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect;
class gr_hier_block2 : public gr_basic_block
{
@@ -54,6 +56,19 @@ public:
void connect(gr_basic_block_sptr src, int src_port,
gr_basic_block_sptr dst, int dst_port)
throw (std::invalid_argument);
+ void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+ throw (std::runtime_error);
+ void msg_connect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+ throw (std::runtime_error);
+ void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+ throw (std::runtime_error);
+ void msg_disconnect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+ throw (std::runtime_error);
+
void disconnect(gr_basic_block_sptr block)
throw (std::invalid_argument);
void disconnect(gr_basic_block_sptr src, int src_port,
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 76c5ce06f..099b2f8e8 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -144,6 +144,28 @@ gr_hier_block2_detail::connect(gr_basic_block_sptr src, int src_port,
}
void
+gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "connecting message port..." << std::endl;
+
+ // register the subscription
+ src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+}
+
+void
+gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "disconnecting message port..." << std::endl;
+
+ // register the subscription
+ src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+}
+
+void
gr_hier_block2_detail::disconnect(gr_basic_block_sptr block)
{
// Check on singleton list
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
index f4f950e9d..f2d2b3c4e 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
@@ -39,6 +39,10 @@ public:
void connect(gr_basic_block_sptr block);
void connect(gr_basic_block_sptr src, int src_port,
gr_basic_block_sptr dst, int dst_port);
+ void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
+ void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
void disconnect(gr_basic_block_sptr block);
void disconnect(gr_basic_block_sptr, int src_port,
gr_basic_block_sptr, int dst_port);
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
index 5018ee9e6..93d5fb20e 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
@@ -41,12 +41,12 @@ gr_msg_accepter::~gr_msg_accepter()
}
void
-gr_msg_accepter::post(pmt_t msg)
+gr_msg_accepter::post(pmt_t which_port, pmt_t msg)
{
// Notify derived class, handled case by case
gr_block *p = dynamic_cast<gr_block *>(this);
if (p) {
- p->detail()->_post(msg);
+ p->_post(which_port,msg);
return;
}
gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this);
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
index 3e5c97638..a497ba6e7 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
@@ -36,7 +36,7 @@ public:
gr_msg_accepter();
~gr_msg_accepter();
- void post(pmt::pmt_t msg);
+ void post(pmt::pmt_t which_port, pmt::pmt_t msg);
};
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index 46b33d91f..46eb6bbe0 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -68,43 +68,3 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d)
notify_upstream(d);
}
-void
-gr_tpb_detail::insert_tail(pmt::pmt_t msg)
-{
- gruel::scoped_lock guard(mutex);
-
- msg_queue.push_back(msg);
-
- // wake up thread if BLKD_IN or BLKD_OUT
- input_cond.notify_one();
- output_cond.notify_one();
-}
-
-pmt_t
-gr_tpb_detail::delete_head_nowait()
-{
- gruel::scoped_lock guard(mutex);
-
- if (empty_p())
- return pmt_t();
-
- pmt_t m(msg_queue.front());
- msg_queue.pop_front();
-
- return m;
-}
-
-/*
- * Caller must already be holding the mutex
- */
-pmt_t
-gr_tpb_detail::delete_head_nowait_already_holding_mutex()
-{
- if (empty_p())
- return pmt_t();
-
- pmt_t m(msg_queue.front());
- msg_queue.pop_front();
-
- return m;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
index b6e342dee..69feb6007 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -39,9 +39,6 @@ struct GR_CORE_API gr_tpb_detail {
bool output_changed;
gruel::condition_variable output_cond;
-private:
- std::deque<pmt::pmt_t> msg_queue;
-
public:
gr_tpb_detail()
: input_changed(false), output_changed(false) { }
@@ -55,6 +52,12 @@ public:
//! Called by us to notify both upstream and downstream
void notify_neighbors(gr_block_detail *d);
+ //! Called by pmt msg posters
+ void notify_msg(){
+ input_cond.notify_one();
+ output_cond.notify_one();
+ }
+
//! Called by us
void clear_changed()
{
@@ -63,23 +66,6 @@ public:
output_changed = false;
}
- //! is the queue empty?
- bool empty_p() const { return msg_queue.empty(); }
-
- //| Acquires and release the mutex
- void insert_tail(pmt::pmt_t msg);
-
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- */
- pmt::pmt_t delete_head_nowait();
-
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- * Caller must already be holding the mutex
- */
- pmt::pmt_t delete_head_nowait_already_holding_mutex();
-
private:
//! Used by notify_downstream
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index a5aabb379..1bd3014ad 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -25,6 +25,7 @@
#include <iostream>
#include <boost/thread.hpp>
#include <gruel/pmt.h>
+#include <boost/foreach.hpp>
using namespace pmt;
@@ -42,8 +43,14 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
boost::this_thread::interruption_point();
// handle any queued up messages
- while ((msg = d->d_tpb.delete_head_nowait()))
- block->dispatch_msg(msg);
+ //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() )
+
+ BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
+ {
+ while ((msg = block->delete_head_nowait(i.first))){
+ block->dispatch_msg(i.first,msg);
+ }
+ }
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();
@@ -67,15 +74,18 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
while (!d->d_tpb.input_changed){
// wait for input or message
- while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+ while(!d->d_tpb.input_changed && block->empty_p())
d->d_tpb.input_cond.wait(guard);
// handle all pending messages
- while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(msg);
- guard.lock();
- }
+ BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
+ {
+ while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first, msg);
+ guard.lock();
+ }
+ }
}
}
break;
@@ -87,15 +97,18 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
while (!d->d_tpb.output_changed){
// wait for output room or message
- while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+ while(!d->d_tpb.output_changed && block->empty_p())
d->d_tpb.output_cond.wait(guard);
// handle all pending messages
- while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(msg);
- guard.lock();
- }
+ BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
+ {
+ while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first,msg);
+ guard.lock();
+ }
+ }
}
}
break;
diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
index 25ae0b1e1..dc8f0f8a9 100644
--- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
+++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
@@ -65,14 +65,15 @@ void qa_set_msg_handler::t0()
tb->start();
// Send them...
+ pmt_t port(pmt_intern("port"));
for (int i = 0; i < NMSGS; i++){
- send(nop, mp(mp("example-msg"), mp(i)));
+ send(nop, port, mp(mp("example-msg"), mp(i)));
}
// And send a message to null_source to confirm that the default
// message handling action (which should be a nop) doesn't dump
// core.
- send(src, mp(mp("example-msg"), mp(0)));
+ send(src, port, mp(mp("example-msg"), mp(0)));
// Give the messages a chance to be processed
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
diff --git a/gnuradio-core/src/python/gnuradio/gr/top_block.py b/gnuradio-core/src/python/gnuradio/gr/top_block.py
index 43af8073b..dc1f443aa 100644
--- a/gnuradio-core/src/python/gnuradio/gr/top_block.py
+++ b/gnuradio-core/src/python/gnuradio/gr/top_block.py
@@ -123,6 +123,12 @@ class top_block(object):
for i in range (1, len (points)):
self._connect(points[i-1], points[i])
+ def msg_connect(self, src, srcport, dst, dstport):
+ self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport);
+
+ def msg_disconnect(self, src, srcport, dst, dstport):
+ self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport);
+
def _connect(self, src, dst):
(src_block, src_port) = self._coerce_endpoint(src)
(dst_block, dst_port) = self._coerce_endpoint(dst)