summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
authorTim O'Shea2012-12-07 09:28:41 -0800
committerJohnathan Corgan2012-12-07 09:36:19 -0800
commit52ca5e2765b7a4532d26502b5b76b7c85c5019d7 (patch)
tree27901f998f00d4c824c060723fab9d397931aeb3 /gnuradio-core/src/lib/runtime
parent69990c3fb6d4c7a0daee0229407241aa1959095a (diff)
downloadgnuradio-52ca5e2765b7a4532d26502b5b76b7c85c5019d7.tar.gz
gnuradio-52ca5e2765b7a4532d26502b5b76b7c85c5019d7.tar.bz2
gnuradio-52ca5e2765b7a4532d26502b5b76b7c85c5019d7.zip
core: added gr_tuntap_pdu, gr_socket_pdu, and msg passing enhancements
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc41
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h406
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc34
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.cc40
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.h61
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.cc5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.h33
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.i6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc90
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h1
11 files changed, 488 insertions, 233 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 0f7875a12..6ff57a1d6 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -38,7 +38,7 @@ static long s_ncurrently_allocated = 0;
long
gr_basic_block_ncurrently_allocated()
{
- return s_ncurrently_allocated;
+ return s_ncurrently_allocated;
}
gr_basic_block::gr_basic_block(const std::string &name,
@@ -53,25 +53,25 @@ gr_basic_block::gr_basic_block(const std::string &name,
d_color(WHITE),
message_subscribers(pmt::pmt_make_dict())
{
- s_ncurrently_allocated++;
+ s_ncurrently_allocated++;
}
gr_basic_block::~gr_basic_block()
{
- s_ncurrently_allocated--;
- global_block_registry.block_unregister(this);
+ s_ncurrently_allocated--;
+ global_block_registry.block_unregister(this);
}
gr_basic_block_sptr
gr_basic_block::to_basic_block()
{
- return shared_from_this();
+ return shared_from_this();
}
void
gr_basic_block::set_block_alias(std::string name)
{
- global_block_registry.register_symbolic_name(this, name);
+ global_block_registry.register_symbolic_name(this, name);
}
// ** Message passing interface **
@@ -147,28 +147,29 @@ void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg)
}
// - subscribe to a message port
-void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target)
-{
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+void
+gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
std::stringstream ss;
- ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
- << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
throw std::runtime_error(ss.str());
}
-
pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
- message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
+
+ // ignore re-adds of the same target
+ if(!pmt::pmt_list_has(currlist, target))
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
}
-void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target)
-{
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+void
+gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
std::stringstream ss;
- ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
- << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
throw std::runtime_error(ss.str());
}
-
+
+ // ignore unsubs of unknown targets
pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
}
@@ -224,5 +225,3 @@ gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
msg_queue[which_port].pop_front();
return m;
}
-
-
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 00e9c2192..f3b7b835b 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -36,6 +36,7 @@
#include <gruel/thread.h>
#include <boost/foreach.hpp>
#include <boost/thread/condition_variable.hpp>
+#include <iostream>
/*!
* \brief The abstract base class for all signal processing blocks.
@@ -50,202 +51,215 @@
class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
{
- typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
-
-private:
- /*
- * This function is called by the runtime system to dispatch messages.
- *
- * The thread-safety guarantees mentioned in set_msg_handler are implemented
- * by the callers of this method.
- */
- void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
- {
- // AA Update this
- if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
- d_msg_handlers[which_port](msg); // Yes, invoke it.
- };
-
- //msg_handler_t d_msg_handler;
- typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
- d_msg_handlers_t d_msg_handlers;
-
- typedef std::deque<pmt::pmt_t> msg_queue_t;
- typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
- typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
- msg_queue_map_t msg_queue;
-// boost::condition_variable msg_queue_ready;
- std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
-
- gruel::mutex mutex; //< protects all vars
-
-
-protected:
- friend class gr_flowgraph;
- friend class gr_flat_flowgraph; // TODO: will be redundant
- friend class gr_tpb_thread_body;
-
- enum vcolor { WHITE, GREY, BLACK };
-
- std::string d_name;
- gr_io_signature_sptr d_input_signature;
- gr_io_signature_sptr d_output_signature;
- long d_unique_id;
- long d_symbolic_id;
- std::string d_symbol_name;
- std::string d_symbol_alias;
- vcolor d_color;
-
- gr_basic_block(void){} //allows pure virtual interface sub-classes
-
- //! Protected constructor prevents instantiation by non-derived classes
- gr_basic_block(const std::string &name,
- gr_io_signature_sptr input_signature,
- gr_io_signature_sptr output_signature);
-
- //! may only be called during constructor
- void set_input_signature(gr_io_signature_sptr iosig) {
- d_input_signature = iosig;
+ typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
+
+ private:
+ /*
+ * This function is called by the runtime system to dispatch messages.
+ *
+ * The thread-safety guarantees mentioned in set_msg_handler are implemented
+ * by the callers of this method.
+ */
+ void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+ {
+ // AA Update this
+ if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
+ };
+
+ //msg_handler_t d_msg_handler;
+ typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
+ d_msg_handlers_t d_msg_handlers;
+
+ typedef std::deque<pmt::pmt_t> msg_queue_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
+ std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+
+ gruel::mutex mutex; //< protects all vars
+
+ protected:
+ friend class gr_flowgraph;
+ friend class gr_flat_flowgraph; // TODO: will be redundant
+ friend class gr_tpb_thread_body;
+
+ enum vcolor { WHITE, GREY, BLACK };
+
+ std::string d_name;
+ gr_io_signature_sptr d_input_signature;
+ gr_io_signature_sptr d_output_signature;
+ long d_unique_id;
+ long d_symbolic_id;
+ std::string d_symbol_name;
+ std::string d_symbol_alias;
+ vcolor d_color;
+ msg_queue_map_t msg_queue;
+
+ gr_basic_block(void){} //allows pure virtual interface sub-classes
+
+ //! Protected constructor prevents instantiation by non-derived classes
+ gr_basic_block(const std::string &name,
+ gr_io_signature_sptr input_signature,
+ gr_io_signature_sptr output_signature);
+
+ //! may only be called during constructor
+ void set_input_signature(gr_io_signature_sptr iosig) {
+ d_input_signature = iosig;
+ }
+
+ //! may only be called during constructor
+ void set_output_signature(gr_io_signature_sptr iosig) {
+ d_output_signature = iosig;
+ }
+
+ /*!
+ * \brief Allow the flowgraph to set for sorting and partitioning
+ */
+ void set_color(vcolor color) { d_color = color; }
+ vcolor color() const { return d_color; }
+
+ // Message passing interface
+ pmt::pmt_t message_subscribers;
+
+ public:
+ virtual ~gr_basic_block();
+ long unique_id() const { return d_unique_id; }
+ long symbolic_id() const { return d_symbolic_id; }
+ std::string name() const { return d_name; }
+ std::string symbol_name() const { return d_symbol_name; }
+ gr_io_signature_sptr input_signature() const { return d_input_signature; }
+ gr_io_signature_sptr output_signature() const { return d_output_signature; }
+ gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
+ bool alias_set() { return !d_symbol_alias.empty(); }
+ std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
+ pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
+ void set_block_alias(std::string name);
+
+ // ** Message passing interface **
+ void message_port_register_in(pmt::pmt_t port_id);
+ void message_port_register_out(pmt::pmt_t port_id);
+ void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
+ void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
+ void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
+
+ virtual bool message_port_is_hier(pmt::pmt_t port_id) { std::cout << "is_hier\n"; return false; }
+ virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { std::cout << "is_hier_in\n"; return false; }
+ virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { std::cout << "is_hier_out\n"; return false; }
+
+ /*!
+ * \brief Get input message port names.
+ *
+ * Returns the available input message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_in();
+
+ /*!
+ * \brief Get output message port names.
+ *
+ * Returns the available output message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_out();
+
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
+
+ //! is the queue empty?
+ //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
+ bool empty_p(pmt::pmt_t which_port) {
+ if(msg_queue.find(which_port) == msg_queue.end())
+ throw std::runtime_error("port does not exist!");
+ return msg_queue[which_port].empty();
+ }
+ bool empty_p() {
+ bool rv = true;
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+ return rv;
+ }
+
+ //| Acquires and release the mutex
+ void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+
+ msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
+ return msg_queue[which_port].begin();
+ }
+
+ void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
+ msg_queue[which_port].erase(it);
+ }
+
+ virtual bool has_msg_port(pmt::pmt_t which_port){
+ if(msg_queue.find(which_port) != msg_queue.end()){
+ return true;
}
-
- //! may only be called during constructor
- void set_output_signature(gr_io_signature_sptr iosig) {
- d_output_signature = iosig;
- }
-
- /*!
- * \brief Allow the flowgraph to set for sorting and partitioning
- */
- void set_color(vcolor color) { d_color = color; }
- vcolor color() const { return d_color; }
-
- // Message passing interface
- pmt::pmt_t message_subscribers;
-
-public:
- virtual ~gr_basic_block();
- long unique_id() const { return d_unique_id; }
- long symbolic_id() const { return d_symbolic_id; }
- std::string name() const { return d_name; }
- std::string symbol_name() const { return d_symbol_name; }
- gr_io_signature_sptr input_signature() const { return d_input_signature; }
- gr_io_signature_sptr output_signature() const { return d_output_signature; }
- gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
- bool alias_set() { return !d_symbol_alias.empty(); }
- std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
- pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
- void set_block_alias(std::string name);
-
- // ** Message passing interface **
- void message_port_register_in(pmt::pmt_t port_id);
- void message_port_register_out(pmt::pmt_t port_id);
- void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
- void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
- void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
-
- /*!
- * \brief Get input message port names.
- *
- * Returns the available input message ports for a block. The
- * return object is a PMT vector that is filled with PMT symbols.
- */
- pmt::pmt_t message_ports_in();
-
- /*!
- * \brief Get output message port names.
- *
- * Returns the available output message ports for a block. The
- * return object is a PMT vector that is filled with PMT symbols.
- */
- pmt::pmt_t message_ports_out();
-
- /*!
- * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
- */
- void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
-
- //! is the queue empty?
- //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
- bool empty_p(pmt::pmt_t which_port) {
- if(msg_queue.find(which_port) == msg_queue.end())
- throw std::runtime_error("port does not exist!");
- return msg_queue[which_port].empty();
- }
- bool empty_p() {
- bool rv = true;
- BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
- return rv;
- }
-
- //| Acquires and release the mutex
- void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- */
- pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
-
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- */
- pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
-
- msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
- return msg_queue[which_port].begin();
- }
- void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
- msg_queue[which_port].erase(it);
- }
-
-
- /*!
- * \brief Confirm that ninputs and noutputs is an acceptable combination.
- *
- * \param ninputs number of input streams connected
- * \param noutputs number of output streams connected
- *
- * \returns true if this is a valid configuration for this block.
- *
- * This function is called by the runtime system whenever the
- * topology changes. Most classes do not need to override this.
- * This check is in addition to the constraints specified by the input
- * and output gr_io_signatures.
- */
- virtual bool check_topology(int ninputs, int noutputs) { return true; }
-
- /*!
- * \brief Set the callback that is fired when messages are available.
- *
- * \p msg_handler can be any kind of function pointer or function object
- * that has the signature:
- * <pre>
- * void msg_handler(pmt::pmt msg);
- * </pre>
- *
- * (You may want to use boost::bind to massage your callable into the
- * correct form. See gr_nop.{h,cc} for an example that sets up a class
- * method as the callback.)
- *
- * Blocks that desire to handle messages must call this method in their
- * constructors to register the handler that will be invoked when messages
- * are available.
- *
- * If the block inherits from gr_block, the runtime system will ensure that
- * msg_handler is called in a thread-safe manner, such that work and
- * msg_handler will never be called concurrently. This allows msg_handler
- * to update state variables without having to worry about thread-safety
- * issues with work, general_work or another invocation of msg_handler.
- *
- * If the block inherits from gr_hier_block2, the runtime system will
- * ensure that no reentrant calls are made to msg_handler.
- */
- //template <typename T> void set_msg_handler(T msg_handler){
- // d_msg_handler = msg_handler_t(msg_handler);
- //}
- template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
- if(msg_queue.find(which_port) == msg_queue.end()){
- throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
- d_msg_handlers[which_port] = msg_handler_t(msg_handler);
+ if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
+ return true;
}
+ return false;
+ }
+
+
+ /*!
+ * \brief Confirm that ninputs and noutputs is an acceptable combination.
+ *
+ * \param ninputs number of input streams connected
+ * \param noutputs number of output streams connected
+ *
+ * \returns true if this is a valid configuration for this block.
+ *
+ * This function is called by the runtime system whenever the
+ * topology changes. Most classes do not need to override this.
+ * This check is in addition to the constraints specified by the input
+ * and output gr_io_signatures.
+ */
+ virtual bool check_topology(int ninputs, int noutputs) { return true; }
+
+ /*!
+ * \brief Set the callback that is fired when messages are available.
+ *
+ * \p msg_handler can be any kind of function pointer or function object
+ * that has the signature:
+ * <pre>
+ * void msg_handler(pmt::pmt msg);
+ * </pre>
+ *
+ * (You may want to use boost::bind to massage your callable into the
+ * correct form. See gr_nop.{h,cc} for an example that sets up a class
+ * method as the callback.)
+ *
+ * Blocks that desire to handle messages must call this method in their
+ * constructors to register the handler that will be invoked when messages
+ * are available.
+ *
+ * If the block inherits from gr_block, the runtime system will ensure that
+ * msg_handler is called in a thread-safe manner, such that work and
+ * msg_handler will never be called concurrently. This allows msg_handler
+ * to update state variables without having to worry about thread-safety
+ * issues with work, general_work or another invocation of msg_handler.
+ *
+ * If the block inherits from gr_hier_block2, the runtime system will
+ * ensure that no reentrant calls are made to msg_handler.
+ */
+ //template <typename T> void set_msg_handler(T msg_handler){
+ // d_msg_handler = msg_handler_t(msg_handler);
+ //}
+ template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
+ if(msg_queue.find(which_port) == msg_queue.end()){
+ throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
+ d_msg_handlers[which_port] = msg_handler_t(msg_handler);
+ }
};
inline bool operator<(gr_basic_block_sptr lhs, gr_basic_block_sptr rhs)
@@ -260,8 +274,8 @@ GR_CORE_API long gr_basic_block_ncurrently_allocated();
inline std::ostream &operator << (std::ostream &os, gr_basic_block_sptr basic_block)
{
- os << basic_block->name() << "(" << basic_block->unique_id() << ")";
- return os;
+ os << basic_block->name() << "(" << basic_block->unique_id() << ")";
+ return os;
}
#endif /* INCLUDED_GR_BASIC_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index e04deb948..c19863f34 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -31,8 +31,9 @@
#include <volk/volk.h>
#include <iostream>
#include <map>
+#include <boost/format.hpp>
-#define GR_FLAT_FLOWGRAPH_DEBUG 0
+#define GR_FLAT_FLOWGRAPH_DEBUG 0
// 32Kbyte buffer size between blocks
#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
@@ -71,6 +72,15 @@ gr_flat_flowgraph::setup_connections()
block->set_is_unaligned(false);
}
+ // Connect message ports connetions
+ for(gr_msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++){
+ if(GR_FLAT_FLOWGRAPH_DEBUG)
+ std::cout << boost::format("flat_fg connecting msg primitives: (%s, %s)->(%s, %s)\n") %
+ i->src().block() % i->src().port() %
+ i->dst().block() % i->dst().port();
+ i->src().block()->message_port_sub( i->src().port(), pmt::pmt_cons(i->dst().block()->alias_pmt(), i->dst().port()) );
+ }
+
}
gr_block_detail_sptr
@@ -350,3 +360,25 @@ gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks)
return result;
}
+
+
+void gr_flat_flowgraph::replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src){
+ size_t n_replr(0);
+ if(GR_FLAT_FLOWGRAPH_DEBUG)
+ std::cout << boost::format("gr_flat_flowgraph::replace_endpoint( %s, %s, %d )\n") % e.block()% r.block()% is_src;
+ for(size_t i=0; i<d_msg_edges.size(); i++){
+ if(is_src){
+ if(d_msg_edges[i].src() == e){
+ d_msg_edges[i] = gr_msg_edge(r, d_msg_edges[i].dst() );
+ n_replr++;
+ }
+ } else {
+ if(d_msg_edges[i].dst() == e){
+ d_msg_edges[i] = gr_msg_edge(d_msg_edges[i].src(), r );
+ n_replr++;
+ }
+ }
+ }
+// std::cout << "n_repl = " << n_repl <<"\n";
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 0926bcc8f..52f202334 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -46,7 +46,7 @@ public:
// Wire gr_blocks together in new flat_flowgraph
void setup_connections();
-
+
// Merge applicable connections from existing flat flowgraph
void merge_connections(gr_flat_flowgraph_sptr sfg);
@@ -57,6 +57,8 @@ public:
*/
static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks);
+ void replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src);
+
private:
gr_flat_flowgraph();
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 69c304a3d..63a208480 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -149,6 +149,16 @@ gr_flowgraph::check_valid_port(gr_io_signature_sptr sig, int port)
}
}
+void
+gr_flowgraph::check_valid_port(const gr_msg_endpoint &e)
+{
+ if (GR_FLOWGRAPH_DEBUG)
+ std::cout << "check_valid_port( " << e.block() << ", " << e.port() << ")\n";
+
+ if(!e.block()->has_msg_port(e.port()))
+ throw std::invalid_argument("invalid msg port in connect() or disconnect()");
+}
+
void
gr_flowgraph::check_dst_not_used(const gr_endpoint &dst)
{
@@ -181,8 +191,10 @@ gr_flowgraph::calc_used_blocks()
gr_basic_block_vector_t tmp;
// make sure free standing message blocks are included
- for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
- tmp.push_back(*it);
+ for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+// for now only blocks receiving messages get a thread context - uncomment to allow senders to also obtain one
+// tmp.push_back(p->src().block());
+ tmp.push_back(p->dst().block());
}
// Collect all blocks in the edge list
@@ -477,7 +489,27 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
output.push_back(block);
}
-void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
- d_msgblocks.push_back(blk);
+void gr_flowgraph::connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){
+ check_valid_port(src);
+ check_valid_port(dst);
+ for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+ if(p->src() == src && p->dst() == dst){
+ throw std::runtime_error("connect called on already connected edge!");
+ }
+ }
+ d_msg_edges.push_back(gr_msg_edge(src,dst));
}
+void gr_flowgraph::disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){
+ check_valid_port(src);
+ check_valid_port(dst);
+ for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+ if(p->src() == src && p->dst() == dst){
+ d_msg_edges.erase(p);
+ return;
+ }
+ }
+ throw std::runtime_error("disconnect called on non-connected edge!");
+}
+
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index 860cb0ff1..bef70f626 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -52,6 +52,31 @@ inline bool gr_endpoint::operator==(const gr_endpoint &other) const
d_port == other.d_port);
}
+class GR_CORE_API gr_msg_endpoint
+{
+private:
+ gr_basic_block_sptr d_basic_block;
+ pmt::pmt_t d_port;
+ bool d_is_hier;
+public:
+ gr_msg_endpoint() : d_basic_block(), d_port(pmt::PMT_NIL) { }
+ gr_msg_endpoint(gr_basic_block_sptr block, pmt::pmt_t port, bool is_hier=false){ d_basic_block = block; d_port = port; d_is_hier = is_hier;}
+ gr_basic_block_sptr block() const { return d_basic_block; }
+ pmt::pmt_t port() const { return d_port; }
+ bool is_hier() const { return d_is_hier; }
+ void set_hier(bool h) { d_is_hier = h; }
+
+ bool operator==(const gr_msg_endpoint &other) const;
+
+};
+
+inline bool gr_msg_endpoint::operator==(const gr_msg_endpoint &other) const
+{
+ return (d_basic_block == other.d_basic_block &&
+ pmt::pmt_equal(d_port, other.d_port));
+}
+
+
// Hold vectors of gr_endpoint objects
typedef std::vector<gr_endpoint> gr_endpoint_vector_t;
typedef std::vector<gr_endpoint>::iterator gr_endpoint_viter_t;
@@ -75,11 +100,35 @@ private:
gr_endpoint d_dst;
};
+
// Hold vectors of gr_edge objects
typedef std::vector<gr_edge> gr_edge_vector_t;
typedef std::vector<gr_edge>::iterator gr_edge_viter_t;
+/*!
+ *\brief Class representing a msg connection between to graph msg endpoints
+ *
+ */
+class GR_CORE_API gr_msg_edge
+{
+public:
+ gr_msg_edge() : d_src(), d_dst() { };
+ gr_msg_edge(const gr_msg_endpoint &src, const gr_msg_endpoint &dst) : d_src(src), d_dst(dst) { }
+ ~gr_msg_edge() {}
+
+ const gr_msg_endpoint &src() const { return d_src; }
+ const gr_msg_endpoint &dst() const { return d_dst; }
+
+private:
+ gr_msg_endpoint d_src;
+ gr_msg_endpoint d_dst;
+};
+
+// Hold vectors of gr_edge objects
+typedef std::vector<gr_msg_edge> gr_msg_edge_vector_t;
+typedef std::vector<gr_msg_edge>::iterator gr_msg_edge_viter_t;
+
// Create a shared pointer to a heap allocated flowgraph
// (types defined in gr_runtime_types.h)
GR_CORE_API gr_flowgraph_sptr gr_make_flowgraph();
@@ -110,7 +159,11 @@ public:
void disconnect(gr_basic_block_sptr src_block, int src_port,
gr_basic_block_sptr dst_block, int dst_port);
- void add_msg_block(gr_basic_block_sptr blk);
+ // Connect two msg endpoints
+ void connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst);
+
+ // Disconnect two msg endpoints
+ void disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst);
// Validate connectivity, raise exception if invalid
void validate();
@@ -120,6 +173,9 @@ public:
// Return vector of edges
const gr_edge_vector_t &edges() const { return d_edges; }
+
+ // Return vector of msg edges
+ const gr_msg_edge_vector_t &msg_edges() const { return d_msg_edges; }
// Return vector of connected blocks
gr_basic_block_vector_t calc_used_blocks();
@@ -130,11 +186,11 @@ public:
// Return vector of vectors of disjointly connected blocks, topologically
// sorted.
std::vector<gr_basic_block_vector_t> partition();
- gr_basic_block_vector_t d_msgblocks;
protected:
gr_basic_block_vector_t d_blocks;
gr_edge_vector_t d_edges;
+ gr_msg_edge_vector_t d_msg_edges;
gr_flowgraph();
std::vector<int> calc_used_ports(gr_basic_block_sptr block, bool check_inputs);
@@ -146,6 +202,7 @@ protected:
private:
void check_valid_port(gr_io_signature_sptr sig, int port);
+ void check_valid_port(const gr_msg_endpoint &e);
void check_dst_not_used(const gr_endpoint &dst);
void check_type_match(const gr_endpoint &src, const gr_endpoint &dst);
gr_edge_vector_t calc_connections(gr_basic_block_sptr block, bool check_inputs); // false=use outputs
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
index a19bfe195..8c2794c63 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
@@ -44,7 +44,9 @@ gr_hier_block2::gr_hier_block2(const std::string &name,
gr_io_signature_sptr input_signature,
gr_io_signature_sptr output_signature)
: gr_basic_block(name, input_signature, output_signature),
- d_detail(new gr_hier_block2_detail(this))
+ d_detail(new gr_hier_block2_detail(this)),
+ hier_message_ports_in(pmt::PMT_NIL),
+ hier_message_ports_out(pmt::PMT_NIL)
{
// This bit of magic ensures that self() works in the constructors of derived classes.
gnuradio::detail::sptr_magic::create_and_stash_initial_sptr(this);
@@ -141,6 +143,7 @@ gr_hier_block2::unlock()
d_detail->unlock();
}
+
gr_flat_flowgraph_sptr
gr_hier_block2::flatten() const
{
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
index e8364a740..f80dd73e4 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
@@ -166,6 +166,39 @@ public:
gr_flat_flowgraph_sptr flatten() const;
gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion
+
+ bool has_msg_port(pmt::pmt_t which_port){
+ return message_port_is_hier(which_port) || gr_basic_block::has_msg_port(which_port);
+ }
+
+ bool message_port_is_hier(pmt::pmt_t port_id){
+ return message_port_is_hier_in(port_id) || message_port_is_hier_out(port_id);
+ }
+ bool message_port_is_hier_in(pmt::pmt_t port_id){
+ return pmt::pmt_list_has(hier_message_ports_in, port_id);
+ }
+ bool message_port_is_hier_out(pmt::pmt_t port_id){
+ return pmt::pmt_list_has(hier_message_ports_out, port_id);
+ }
+
+ pmt::pmt_t hier_message_ports_in;
+ pmt::pmt_t hier_message_ports_out;
+
+ void message_port_register_hier_in(pmt::pmt_t port_id){
+ if(pmt::pmt_list_has(hier_message_ports_in, port_id))
+ throw std::invalid_argument("hier msg in port by this name already registered");
+ if(msg_queue.find(port_id) != msg_queue.end())
+ throw std::invalid_argument("block already has a primitive input port by this name");
+ hier_message_ports_in = pmt::pmt_list_add(hier_message_ports_in, port_id);
+ }
+ void message_port_register_hier_out(pmt::pmt_t port_id){
+ if(pmt::pmt_list_has(hier_message_ports_out, port_id))
+ throw std::invalid_argument("hier msg out port by this name already registered");
+ if(pmt::pmt_dict_has_key(message_subscribers, port_id))
+ throw std::invalid_argument("block already has a primitive output port by this name");
+ hier_message_ports_out = pmt::pmt_list_add(hier_message_ports_out, port_id);
+ }
+
};
inline gr_hier_block2_sptr cast_to_hier_block2_sptr(gr_basic_block_sptr block) {
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
index 7c0e62f28..a857394ca 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
@@ -40,6 +40,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name,
%rename(primitive_disconnect) gr_hier_block2::disconnect;
%rename(primitive_msg_connect) gr_hier_block2::msg_connect;
%rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect;
+%rename(primitive_message_port_register_hier_in) gr_hier_block2::message_port_register_hier_in;
+%rename(primitive_message_port_register_hier_out) gr_hier_block2::message_port_register_hier_out;
class gr_hier_block2 : public gr_basic_block
{
@@ -78,5 +80,9 @@ public:
void lock();
void unlock();
+ void message_port_register_hier_in(pmt::pmt_t port_id);
+ void message_port_register_hier_out(pmt::pmt_t port_id);
+
+
gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion
};
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index ff2a5db8c..e70553ddc 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -27,6 +27,7 @@
#include <gr_io_signature.h>
#include <stdexcept>
#include <sstream>
+#include <boost/format.hpp>
#define GR_HIER_BLOCK2_DETAIL_DEBUG 0
@@ -53,6 +54,7 @@ gr_hier_block2_detail::gr_hier_block2_detail(gr_hier_block2 *owner) :
d_outputs = gr_endpoint_vector_t(max_outputs);
}
+
gr_hier_block2_detail::~gr_hier_block2_detail()
{
d_owner = 0; // Don't use delete, we didn't allocate
@@ -151,15 +153,39 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
std::cout << "connecting message port..." << std::endl;
// register the subscription
- src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+// this is done later...
+// src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
// add block uniquely to list to internal blocks
if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+ d_blocks.push_back(src);
d_blocks.push_back(dst);
}
- // make sure we instantiate a thread for this block
- d_fg->add_msg_block(dst);
+ bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);;
+ bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport);
+
+ gr_hier_block2_sptr src_block(cast_to_hier_block2_sptr(src));
+ gr_hier_block2_sptr dst_block(cast_to_hier_block2_sptr(dst));
+
+ if (src_block && src.get() != d_owner) {
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "connect: src is hierarchical, setting parent to " << this << std::endl;
+ src_block->d_detail->d_parent_detail = this;
+ }
+
+ if (dst_block && dst.get() != d_owner) {
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "connect: dst is hierarchical, setting parent to " << this << std::endl;
+ dst_block->d_detail->d_parent_detail = this;
+ }
+
+ // add edge for this message connection
+ if(GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format("connect( (%s, %s, %d), (%s, %s, %d) )\n") %
+ src % srcport % hier_out %
+ dst % dstport % hier_in;
+ d_fg->connect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in));
}
void
@@ -169,8 +195,13 @@ gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcpor
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
std::cout << "disconnecting message port..." << std::endl;
- // register the subscription
+ // unregister the subscription - if already subscribed
src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+ // remove edge for this message connection
+ bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);;
+ bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport);
+ d_fg->disconnect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in));
}
void
@@ -435,11 +466,16 @@ void
gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
{
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
- std::cout << "Flattening " << d_owner->name() << std::endl;
+ std::cout << " ** Flattening " << d_owner->name() << std::endl;
// Add my edges to the flow graph, resolving references to actual endpoints
gr_edge_vector_t edges = d_fg->edges();
+ gr_msg_edge_vector_t msg_edges = d_fg->msg_edges();
gr_edge_viter_t p;
+ gr_msg_edge_viter_t q,u;
+
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "Flattening stream connections: " << std::endl;
for (p = edges.begin(); p != edges.end(); p++) {
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
@@ -457,7 +493,46 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
}
}
}
- sfg->d_msgblocks = d_fg->d_msgblocks;
+
+ // loop through flattening hierarchical connections
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "Flattening msg connections: " << std::endl;
+
+ for(q = msg_edges.begin(); q != msg_edges.end(); q++) {
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format(" flattening edge ( %s, %s, %d) -> ( %s, %s, %d)\n") % q->src().block() % q->src().port() % q->src().is_hier() % q->dst().block() % q->dst().port() % q->dst().is_hier();
+
+ bool normal_connection = true;
+
+ // resolve existing connections to hier ports
+ if(q->dst().is_hier()){
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format(" resolve hier output (%s, %s)") % q->dst().block() % q->dst().port() << std::endl;
+ sfg->replace_endpoint( q->dst(), q->src(), true );
+ normal_connection = false;
+ }
+
+ if(q->src().is_hier()){
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format(" resolve hier input (%s, %s)") % q->src().block() % q->src().port() << std::endl;
+ sfg->replace_endpoint( q->src(), q->dst(), false );
+ normal_connection = false;
+ }
+
+ // propogate non hier connections through
+ if(normal_connection){
+ sfg->connect( q->src(), q->dst() );
+ }
+ }
+
+/* // connect primitive edges in the new fg
+ for(q = msg_edges.begin(); q != msg_edges.end(); q++) {
+ if( (!q->src().is_hier()) && (!q->dst().is_hier()) ){
+ sfg->connect( q->src(), q->dst() );
+ } else {
+ std::cout << "not connecting hier connection!" << std::endl;
+ }
+ }*/
// Construct unique list of blocks used either in edges, inputs,
// outputs, or by themselves. I still hate STL.
@@ -499,7 +574,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
// Recurse hierarchical children
for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
gr_hier_block2_sptr hier_block2(cast_to_hier_block2_sptr(*p));
- if (hier_block2) {
+ if (hier_block2 && (hier_block2.get() != d_owner)) {
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
std::cout << "flatten_aux: recursing into hierarchical block " << hier_block2 << std::endl;
hier_block2->d_detail->flatten_aux(sfg);
@@ -530,3 +605,4 @@ gr_hier_block2_detail::unlock()
else
d_owner->unlock();
}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
index f2d2b3c4e..b38dae301 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
@@ -61,6 +61,7 @@ private:
gr_endpoint_vector_t d_outputs; // Single internal endpoint per external output
gr_basic_block_vector_t d_blocks;
+
void connect_input(int my_port, int port, gr_basic_block_sptr block);
void connect_output(int my_port, int port, gr_basic_block_sptr block);
void disconnect_input(int my_port, int port, gr_basic_block_sptr block);