diff options
31 files changed, 492 insertions, 112 deletions
diff --git a/gnuradio-core/src/lib/general/gr_nop.cc b/gnuradio-core/src/lib/general/gr_nop.cc index ca5983c39..edfe1d76d 100644 --- a/gnuradio-core/src/lib/general/gr_nop.cc +++ b/gnuradio-core/src/lib/general/gr_nop.cc @@ -40,7 +40,8 @@ gr_nop::gr_nop (size_t sizeof_stream_item) d_nmsgs_recvd(0) { // Arrange to have count_received_msgs called when messages are received. - set_msg_handler(boost::bind(&gr_nop::count_received_msgs, this, _1)); + message_port_register_in(pmt::mp("port")); + set_msg_handler(pmt::mp("port"), boost::bind(&gr_nop::count_received_msgs, this, _1)); } // Trivial message handler that just counts them. diff --git a/gnuradio-core/src/lib/runtime/CMakeLists.txt b/gnuradio-core/src/lib/runtime/CMakeLists.txt index 5f3672dde..70938a0f1 100644 --- a/gnuradio-core/src/lib/runtime/CMakeLists.txt +++ b/gnuradio-core/src/lib/runtime/CMakeLists.txt @@ -54,6 +54,7 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.cc + ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.cc @@ -116,6 +117,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.h + ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.h diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index d7263b92d..3d08b63d1 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -25,7 +25,9 @@ #endif #include <gr_basic_block.h> +#include <gr_block_registry.h> #include <stdexcept> +#include <sstream> using namespace pmt; @@ -45,7 +47,10 @@ gr_basic_block::gr_basic_block(const std::string &name, d_input_signature(input_signature), d_output_signature(output_signature), d_unique_id(s_next_id++), - d_color(WHITE) + d_symbolic_id(global_block_registry.block_register(this)), + d_symbol_name(global_block_registry.register_symbolic_name(this)), + d_color(WHITE), + message_subscribers(pmt::pmt_make_dict()) { s_ncurrently_allocated++; } @@ -53,6 +58,7 @@ gr_basic_block::gr_basic_block(const std::string &name, gr_basic_block::~gr_basic_block() { s_ncurrently_allocated--; + global_block_registry.block_unregister(this); } gr_basic_block_sptr @@ -60,3 +66,112 @@ gr_basic_block::to_basic_block() { return shared_from_this(); } + +void +gr_basic_block::set_block_alias(std::string name) +{ + global_block_registry.register_symbolic_name(this, name); +} + +// ** Message passing interface ** + +// - register a new input message port +void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){ + msg_queue[port_id] = msg_queue_t(); + } + +// - register a new output message port +void gr_basic_block::message_port_register_out(pmt::pmt_t port_id){ + if(!pmt::pmt_is_symbol(port_id)){ throw std::runtime_error("bad port id"); } + if(pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port already in use"); } + message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL); + } + +// - publish a message on a message port +void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port does not exist"); } + pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); + // iterate through subscribers on port + while( pmt::pmt_is_pair(currlist) ){ + pmt::pmt_t target = pmt::pmt_car(currlist); + + pmt::pmt_t block = pmt::pmt_car(target); + pmt::pmt_t port = pmt::pmt_cdr(target); + + currlist = pmt::pmt_cdr(currlist); + gr_basic_block_sptr blk = global_block_registry.block_lookup(block); + //blk->post(msg); + blk->post(port, msg); + } + } + +// - subscribe to a message port +void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ + std::stringstream ss; + ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; + throw std::runtime_error(ss.str()); + } + pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); + message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target)); + } + +void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ + std::stringstream ss; + ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; + throw std::runtime_error(ss.str()); + } + pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); + message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target)); + } + +void +gr_basic_block::_post(pmt_t which_port, pmt_t msg) +{ + insert_tail(which_port, msg); + //notify_msg(); +} + +void +gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg) +{ + gruel::scoped_lock guard(mutex); + + msg_queue[which_port].push_back(msg); + + // wake up thread if BLKD_IN or BLKD_OUT + //input_cond.notify_one(); + //output_cond.notify_one(); + // TODO: reconsider the need for notification of input and output conditions! +} + +pmt_t +gr_basic_block::delete_head_nowait(pmt::pmt_t which_port) +{ + gruel::scoped_lock guard(mutex); + + if (empty_p(which_port)) + return pmt_t(); + + pmt_t m(msg_queue[which_port].front()); + msg_queue[which_port].pop_front(); + + return m; +} + +/* + * Caller must already be holding the mutex + */ +pmt_t +gr_basic_block::delete_head_nowait_already_holding_mutex(pmt::pmt_t which_port) +{ + if (empty_p(which_port)) + return pmt_t(); + + pmt_t m(msg_queue[which_port].front()); + msg_queue[which_port].pop_front(); + + return m; +} + diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index cb6a983c4..31026a2e4 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -30,7 +30,11 @@ #include <boost/function.hpp> #include <gr_msg_accepter.h> #include <string> +#include <deque> +#include <map> #include <gr_io_signature.h> +#include <gruel/thread.h> +#include <boost/foreach.hpp> /*! * \brief The abstract base class for all signal processing blocks. @@ -54,14 +58,23 @@ private: * The thread-safety guarantees mentioned in set_msg_handler are implemented * by the callers of this method. */ - void dispatch_msg(pmt::pmt_t msg) + void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) { - if (d_msg_handler) // Is there a handler? - d_msg_handler(msg); // Yes, invoke it. + // AA Update this + if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? + d_msg_handlers[which_port](msg); // Yes, invoke it. }; - msg_handler_t d_msg_handler; + //msg_handler_t d_msg_handler; + typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t; + d_msg_handlers_t d_msg_handlers; + typedef std::deque<pmt::pmt_t> msg_queue_t; + typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; + msg_queue_map_t msg_queue; + gruel::mutex mutex; //< protects all vars + + protected: friend class gr_flowgraph; friend class gr_flat_flowgraph; // TODO: will be redundant @@ -73,6 +86,9 @@ protected: gr_io_signature_sptr d_input_signature; gr_io_signature_sptr d_output_signature; long d_unique_id; + long d_symbolic_id; + std::string d_symbol_name; + std::string d_symbol_alias; vcolor d_color; gr_basic_block(void){} //allows pure virtual interface sub-classes @@ -98,13 +114,65 @@ protected: void set_color(vcolor color) { d_color = color; } vcolor color() const { return d_color; } + // Message passing interface + std::vector<pmt::pmt_t> message_inputs; + pmt::pmt_t message_subscribers; + public: virtual ~gr_basic_block(); long unique_id() const { return d_unique_id; } + long symbolic_id() const { return d_symbolic_id; } std::string name() const { return d_name; } + std::string symbol_name() const { return d_symbol_name; } gr_io_signature_sptr input_signature() const { return d_input_signature; } gr_io_signature_sptr output_signature() const { return d_output_signature; } gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion + bool alias_set() { return !d_symbol_alias.empty(); } + std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); } + pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); } + void set_block_alias(std::string name); + + // ** Message passing interface ** + void message_port_register_in(pmt::pmt_t port_id); + void message_port_register_out(pmt::pmt_t port_id); + void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); + void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); + void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); + + /*! + * Accept msg, place in queue, arrange for thread to be awakened if it's not already. + */ + void _post(pmt::pmt_t which_port, pmt::pmt_t msg); + + + //! is the queue empty? + //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } + bool empty_p(pmt::pmt_t which_port) { return msg_queue[which_port].empty(); } + bool empty_p() { + bool rv = true; + BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); } + return rv; + } + + //| Acquires and release the mutex + void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + * Caller must already be holding the mutex + */ + pmt::pmt_t delete_head_nowait_already_holding_mutex( pmt::pmt_t which_port); + + msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ + return msg_queue[which_port].begin(); + } + void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){ + msg_queue[which_port].erase(it); + } + /*! * \brief Confirm that ninputs and noutputs is an acceptable combination. @@ -147,8 +215,13 @@ public: * If the block inherits from gr_hier_block2, the runtime system will * ensure that no reentrant calls are made to msg_handler. */ - template <typename T> void set_msg_handler(T msg_handler){ - d_msg_handler = msg_handler_t(msg_handler); + //template <typename T> void set_msg_handler(T msg_handler){ + // d_msg_handler = msg_handler_t(msg_handler); + //} + template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){ + if(msg_queue.find(which_port) == msg_queue.end()){ + throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); } + d_msg_handlers[which_port] = msg_handler_t(msg_handler); } }; diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i index e43cc114c..0a8473ba2 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.i +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i @@ -37,11 +37,14 @@ protected: public: virtual ~gr_basic_block(); std::string name() const; + std::string symbol_name() const; gr_io_signature_sptr input_signature() const; gr_io_signature_sptr output_signature() const; long unique_id() const; gr_basic_block_sptr to_basic_block(); bool check_topology (int ninputs, int noutputs); + std::string alias(); + void set_block_alias(std::string name); }; %rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated; diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc index 2792cd471..337c9518e 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc @@ -127,12 +127,6 @@ gr_block_detail::produce_each (int how_many_items) } -void -gr_block_detail::_post(pmt_t msg) -{ - d_tpb.insert_tail(msg); -} - uint64_t gr_block_detail::nitems_read(unsigned int which_input) { diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h index c96f00db8..16d9f4d42 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h @@ -85,11 +85,6 @@ class GR_CORE_API gr_block_detail { */ void produce_each (int how_many_items); - /*! - * Accept msg, place in queue, arrange for thread to be awakened if it's not already. - */ - void _post(pmt::pmt_t msg); - // Return the number of items read on input stream which_input uint64_t nitems_read(unsigned int which_input); diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.cc b/gnuradio-core/src/lib/runtime/gr_block_registry.cc new file mode 100644 index 000000000..2478e0019 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_block_registry.cc @@ -0,0 +1,58 @@ +#include <gr_basic_block.h> +#include <gr_block_registry.h> + +gr_block_registry global_block_registry; + +gr_block_registry::gr_block_registry(){ + d_ref_map = pmt::pmt_make_dict(); +} + +long gr_block_registry::block_register(gr_basic_block* block){ + if(d_map.find(block->name()) == d_map.end()){ + d_map[block->name()] = blocksubmap_t(); + d_map[block->name()][0] = block; + return 0; + } else { + for(size_t i=0; i<=d_map[block->name()].size(); i++){ + if(d_map[block->name()].find(i) == d_map[block->name()].end()){ + d_map[block->name()][i] = block; + return i; + } + } + } + throw std::runtime_error("should not reach this"); +} + +void gr_block_registry::block_unregister(gr_basic_block* block){ + d_map[block->name()].erase( d_map[block->name()].find(block->symbolic_id())); + d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->symbol_name())); + if(block->alias_set()){ + d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->alias())); + } +} + +std::string gr_block_registry::register_symbolic_name(gr_basic_block* block){ + std::stringstream ss; + ss << block->name() << block->symbolic_id(); + //std::cout << "register_symbolic_name: " << ss.str() << std::endl; + register_symbolic_name(block, ss.str()); + return ss.str(); +} + +void gr_block_registry::register_symbolic_name(gr_basic_block* block, std::string name){ + if(pmt_dict_has_key(d_ref_map, pmt::pmt_intern(name))){ + throw std::runtime_error("symbol already exists, can not re-use!"); + } + d_ref_map = pmt_dict_add(d_ref_map, pmt::pmt_intern(name), pmt::pmt_make_any(block)); +} + +gr_basic_block_sptr gr_block_registry::block_lookup(pmt::pmt_t symbol){ + pmt::pmt_t ref = pmt_dict_ref(d_ref_map, symbol, pmt::PMT_NIL); + if(pmt::pmt_eq(ref, pmt::PMT_NIL)){ + throw std::runtime_error("block lookup failed! block not found!"); + } + gr_basic_block* blk = boost::any_cast<gr_basic_block*>( pmt::pmt_any_ref(ref) ); + return blk->shared_from_this(); +} + + diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.h b/gnuradio-core/src/lib/runtime/gr_block_registry.h new file mode 100644 index 000000000..8f1982984 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_block_registry.h @@ -0,0 +1,36 @@ +#ifndef GR_BLOCK_REGISTRY_H +#define GR_BLOCK_REGISTRY_H + +#include <map> + +#ifndef GR_BASIC_BLOCK_H +class gr_basic_block; +#endif + +class gr_block_registry { + public: + gr_block_registry(); + + long block_register(gr_basic_block* block); + void block_unregister(gr_basic_block* block); + + std::string register_symbolic_name(gr_basic_block* block); + void register_symbolic_name(gr_basic_block* block, std::string name); + + gr_basic_block_sptr block_lookup(pmt::pmt_t symbol); + + private: + + //typedef std::map< long, gr_basic_block_sptr > blocksubmap_t; + typedef std::map< long, gr_basic_block* > blocksubmap_t; + typedef std::map< std::string, blocksubmap_t > blockmap_t; + + blockmap_t d_map; + pmt::pmt_t d_ref_map; + +}; + +extern gr_block_registry global_block_registry; + +#endif + diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc index 756852df8..a19bfe195 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc @@ -81,6 +81,36 @@ gr_hier_block2::connect(gr_basic_block_sptr src, int src_port, } void +gr_hier_block2::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport) +{ + if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); } + d_detail->msg_connect(src, srcport, dst, dstport); +} + +void +gr_hier_block2::msg_connect(gr_basic_block_sptr src, std::string srcport, + gr_basic_block_sptr dst, std::string dstport) +{ + d_detail->msg_connect(src, pmt::mp(srcport), dst, pmt::mp(dstport)); +} + +void +gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport) +{ + if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); } + d_detail->msg_disconnect(src, srcport, dst, dstport); +} + +void +gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, std::string srcport, + gr_basic_block_sptr dst, std::string dstport) +{ + d_detail->msg_disconnect(src, pmt::mp(srcport), dst, pmt::mp(dstport)); +} + +void gr_hier_block2::disconnect(gr_basic_block_sptr block) { d_detail->disconnect(block); diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h index 123178724..e8364a740 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h @@ -98,6 +98,21 @@ public: gr_basic_block_sptr dst, int dst_port); /*! + * \brief Add gr-blocks or hierarchical blocks to internal graph and wire together + * + * This adds (if not done earlier by another connect) a pair of gr-blocks or + * hierarchical blocks to the internal message port subscription + */ + void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport); + void msg_connect(gr_basic_block_sptr src, std::string srcport, + gr_basic_block_sptr dst, std::string dstport); + void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport); + void msg_disconnect(gr_basic_block_sptr src, std::string srcport, + gr_basic_block_sptr dst, std::string dstport); + + /*! * \brief Remove a gr-block or hierarchical block from the internal flowgraph. * * This removes a gr-block or hierarchical block from the internal flowgraph, diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i index eefb965b4..7c0e62f28 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i @@ -38,6 +38,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name, // better interface in scripting land. %rename(primitive_connect) gr_hier_block2::connect; %rename(primitive_disconnect) gr_hier_block2::disconnect; +%rename(primitive_msg_connect) gr_hier_block2::msg_connect; +%rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect; class gr_hier_block2 : public gr_basic_block { @@ -54,6 +56,19 @@ public: void connect(gr_basic_block_sptr src, int src_port, gr_basic_block_sptr dst, int dst_port) throw (std::invalid_argument); + void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport) + throw (std::runtime_error); + void msg_connect(gr_basic_block_sptr src, std::string srcport, + gr_basic_block_sptr dst, std::string dstport) + throw (std::runtime_error); + void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport) + throw (std::runtime_error); + void msg_disconnect(gr_basic_block_sptr src, std::string srcport, + gr_basic_block_sptr dst, std::string dstport) + throw (std::runtime_error); + void disconnect(gr_basic_block_sptr block) throw (std::invalid_argument); void disconnect(gr_basic_block_sptr src, int src_port, diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc index 76c5ce06f..099b2f8e8 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc @@ -144,6 +144,28 @@ gr_hier_block2_detail::connect(gr_basic_block_sptr src, int src_port, } void +gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport) +{ + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << "connecting message port..." << std::endl; + + // register the subscription + src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); +} + +void +gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport) +{ + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << "disconnecting message port..." << std::endl; + + // register the subscription + src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); +} + +void gr_hier_block2_detail::disconnect(gr_basic_block_sptr block) { // Check on singleton list diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h index f4f950e9d..f2d2b3c4e 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h @@ -39,6 +39,10 @@ public: void connect(gr_basic_block_sptr block); void connect(gr_basic_block_sptr src, int src_port, gr_basic_block_sptr dst, int dst_port); + void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport); + void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, + gr_basic_block_sptr dst, pmt::pmt_t dstport); void disconnect(gr_basic_block_sptr block); void disconnect(gr_basic_block_sptr, int src_port, gr_basic_block_sptr, int dst_port); diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc index 5018ee9e6..93d5fb20e 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc @@ -41,12 +41,12 @@ gr_msg_accepter::~gr_msg_accepter() } void -gr_msg_accepter::post(pmt_t msg) +gr_msg_accepter::post(pmt_t which_port, pmt_t msg) { // Notify derived class, handled case by case gr_block *p = dynamic_cast<gr_block *>(this); if (p) { - p->detail()->_post(msg); + p->_post(which_port,msg); return; } gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this); diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h index 3e5c97638..a497ba6e7 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h @@ -36,7 +36,7 @@ public: gr_msg_accepter(); ~gr_msg_accepter(); - void post(pmt::pmt_t msg); + void post(pmt::pmt_t which_port, pmt::pmt_t msg); }; diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc index 46b33d91f..46eb6bbe0 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc @@ -68,43 +68,3 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d) notify_upstream(d); } -void -gr_tpb_detail::insert_tail(pmt::pmt_t msg) -{ - gruel::scoped_lock guard(mutex); - - msg_queue.push_back(msg); - - // wake up thread if BLKD_IN or BLKD_OUT - input_cond.notify_one(); - output_cond.notify_one(); -} - -pmt_t -gr_tpb_detail::delete_head_nowait() -{ - gruel::scoped_lock guard(mutex); - - if (empty_p()) - return pmt_t(); - - pmt_t m(msg_queue.front()); - msg_queue.pop_front(); - - return m; -} - -/* - * Caller must already be holding the mutex - */ -pmt_t -gr_tpb_detail::delete_head_nowait_already_holding_mutex() -{ - if (empty_p()) - return pmt_t(); - - pmt_t m(msg_queue.front()); - msg_queue.pop_front(); - - return m; -} diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h index b6e342dee..69feb6007 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h @@ -39,9 +39,6 @@ struct GR_CORE_API gr_tpb_detail { bool output_changed; gruel::condition_variable output_cond; -private: - std::deque<pmt::pmt_t> msg_queue; - public: gr_tpb_detail() : input_changed(false), output_changed(false) { } @@ -55,6 +52,12 @@ public: //! Called by us to notify both upstream and downstream void notify_neighbors(gr_block_detail *d); + //! Called by pmt msg posters + void notify_msg(){ + input_cond.notify_one(); + output_cond.notify_one(); + } + //! Called by us void clear_changed() { @@ -63,23 +66,6 @@ public: output_changed = false; } - //! is the queue empty? - bool empty_p() const { return msg_queue.empty(); } - - //| Acquires and release the mutex - void insert_tail(pmt::pmt_t msg); - - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - */ - pmt::pmt_t delete_head_nowait(); - - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - * Caller must already be holding the mutex - */ - pmt::pmt_t delete_head_nowait_already_holding_mutex(); - private: //! Used by notify_downstream diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index a5aabb379..1bd3014ad 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -25,6 +25,7 @@ #include <iostream> #include <boost/thread.hpp> #include <gruel/pmt.h> +#include <boost/foreach.hpp> using namespace pmt; @@ -42,8 +43,14 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item boost::this_thread::interruption_point(); // handle any queued up messages - while ((msg = d->d_tpb.delete_head_nowait())) - block->dispatch_msg(msg); + //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() ) + + BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) + { + while ((msg = block->delete_head_nowait(i.first))){ + block->dispatch_msg(i.first,msg); + } + } d->d_tpb.clear_changed(); s = d_exec.run_one_iteration(); @@ -67,15 +74,18 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item while (!d->d_tpb.input_changed){ // wait for input or message - while(!d->d_tpb.input_changed && d->d_tpb.empty_p()) + while(!d->d_tpb.input_changed && block->empty_p()) d->d_tpb.input_cond.wait(guard); // handle all pending messages - while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){ - guard.unlock(); // release lock while processing msg - block->dispatch_msg(msg); - guard.lock(); - } + BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) + { + while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){ + guard.unlock(); // release lock while processing msg + block->dispatch_msg(i.first, msg); + guard.lock(); + } + } } } break; @@ -87,15 +97,18 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item while (!d->d_tpb.output_changed){ // wait for output room or message - while(!d->d_tpb.output_changed && d->d_tpb.empty_p()) + while(!d->d_tpb.output_changed && block->empty_p()) d->d_tpb.output_cond.wait(guard); // handle all pending messages - while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){ - guard.unlock(); // release lock while processing msg - block->dispatch_msg(msg); - guard.lock(); - } + BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) + { + while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){ + guard.unlock(); // release lock while processing msg + block->dispatch_msg(i.first,msg); + guard.lock(); + } + } } } break; diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc index 25ae0b1e1..dc8f0f8a9 100644 --- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc +++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc @@ -65,14 +65,15 @@ void qa_set_msg_handler::t0() tb->start(); // Send them... + pmt_t port(pmt_intern("port")); for (int i = 0; i < NMSGS; i++){ - send(nop, mp(mp("example-msg"), mp(i))); + send(nop, port, mp(mp("example-msg"), mp(i))); } // And send a message to null_source to confirm that the default // message handling action (which should be a nop) doesn't dump // core. - send(src, mp(mp("example-msg"), mp(0))); + send(src, port, mp(mp("example-msg"), mp(0))); // Give the messages a chance to be processed boost::this_thread::sleep(boost::posix_time::milliseconds(100)); diff --git a/gnuradio-core/src/python/gnuradio/gr/top_block.py b/gnuradio-core/src/python/gnuradio/gr/top_block.py index 43af8073b..dc1f443aa 100644 --- a/gnuradio-core/src/python/gnuradio/gr/top_block.py +++ b/gnuradio-core/src/python/gnuradio/gr/top_block.py @@ -123,6 +123,12 @@ class top_block(object): for i in range (1, len (points)): self._connect(points[i-1], points[i]) + def msg_connect(self, src, srcport, dst, dstport): + self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def msg_disconnect(self, src, srcport, dst, dstport): + self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + def _connect(self, src, dst): (src_block, src_port) = self._coerce_endpoint(src) (dst_block, dst_port) = self._coerce_endpoint(dst) diff --git a/grc/python/Connection.py b/grc/python/Connection.py index 218baf074..341dd2d82 100644 --- a/grc/python/Connection.py +++ b/grc/python/Connection.py @@ -31,6 +31,9 @@ class Connection(_Connection, _GUIConnection): def is_msg(self): return self.get_source().get_type() == self.get_sink().get_type() == 'msg' + def is_message(self): + return self.get_source().get_type() == self.get_sink().get_type() == 'message' + def validate(self): """ Validate the connections. diff --git a/grc/python/Constants.py b/grc/python/Constants.py index 1a65caf1c..09c308196 100644 --- a/grc/python/Constants.py +++ b/grc/python/Constants.py @@ -58,6 +58,7 @@ CORE_TYPES = ( #name, key, sizeof, color ('Integer 16', 's16', 2, '#FFFF66'), ('Integer 8', 's8', 1, '#FF66FF'), ('Message Queue', 'msg', 0, '#777777'), + ('Async Message', 'message', 0, '#777777'), ('Wildcard', '', 0, '#FFFFFF'), ) diff --git a/grc/python/Generator.py b/grc/python/Generator.py index 2a6fe51d5..616ea00fc 100644 --- a/grc/python/Generator.py +++ b/grc/python/Generator.py @@ -116,8 +116,9 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''') #list of regular blocks (all blocks minus the special ones) blocks = filter(lambda b: b not in (imports + parameters), blocks) #list of connections where each endpoint is enabled - connections = filter(lambda c: not c.is_msg(), self._flow_graph.get_enabled_connections()) + connections = filter(lambda c: not (c.is_msg() or c.is_message()), self._flow_graph.get_enabled_connections()) messages = filter(lambda c: c.is_msg(), self._flow_graph.get_enabled_connections()) + messages2 = filter(lambda c: c.is_message(), self._flow_graph.get_enabled_connections()) #list of variable names var_ids = [var.get_id() for var in parameters + variables] #prepend self. @@ -142,6 +143,7 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''') 'blocks': blocks, 'connections': connections, 'messages': messages, + 'messages2': messages2, 'generate_options': self._generate_options, 'var_id2cbs': var_id2cbs, } diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl index 17feb01f6..af55ad641 100644 --- a/grc/python/flow_graph.tmpl +++ b/grc/python/flow_graph.tmpl @@ -189,6 +189,17 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))]) self.connect($make_port_sig($source), $make_port_sig($sink)) #end if #end for +######################################################## +##Create Asynch Message Connections +######################################################## +#if $messages2 + $DIVIDER + # Asynch Message Connections + $DIVIDER +#end if +#for $msg in $messages2 + self.msg_connect(self.$msg.get_source().get_parent().get_id(), "$msg.get_source().get_name()", self.$msg.get_sink().get_parent().get_id(), "$msg.get_sink().get_name()") +#end for ######################################################## ##Create Callbacks diff --git a/gruel/src/include/gruel/msg_accepter.h b/gruel/src/include/gruel/msg_accepter.h index 2dc1a6859..65abd5a6b 100644 --- a/gruel/src/include/gruel/msg_accepter.h +++ b/gruel/src/include/gruel/msg_accepter.h @@ -43,7 +43,7 @@ namespace gruel { * call will not wait for the message either to arrive at the * destination or to be received. */ - virtual void post(pmt::pmt_t msg) = 0; + virtual void post(pmt::pmt_t which_port, pmt::pmt_t msg) = 0; }; typedef boost::shared_ptr<msg_accepter> msg_accepter_sptr; diff --git a/gruel/src/include/gruel/msg_passing.h b/gruel/src/include/gruel/msg_passing.h index 0cc0cd111..7230dfc5b 100644 --- a/gruel/src/include/gruel/msg_passing.h +++ b/gruel/src/include/gruel/msg_passing.h @@ -45,9 +45,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(msg_accepter_sptr accepter, const pmt::pmt_t &msg) + send(msg_accepter_sptr accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) { - accepter->post(msg); + accepter->post(which_port, msg); return msg; } @@ -64,9 +64,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(msg_accepter *accepter, const pmt::pmt_t &msg) + send(msg_accepter *accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) { - accepter->post(msg); + accepter->post(which_port, msg); return msg; } @@ -83,9 +83,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(msg_accepter &accepter, const pmt::pmt_t &msg) + send(msg_accepter &accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) { - accepter.post(msg); + accepter.post(which_port, msg); return msg; } @@ -102,9 +102,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(pmt::pmt_t accepter, const pmt::pmt_t &msg) + send(pmt::pmt_t accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) { - return send(pmt_msg_accepter_ref(accepter), msg); + return send(pmt_msg_accepter_ref(accepter), which_port, msg); } } /* namespace gruel */ diff --git a/gruel/src/include/gruel/pmt.h b/gruel/src/include/gruel/pmt.h index 1e8b38627..a462155c5 100644 --- a/gruel/src/include/gruel/pmt.h +++ b/gruel/src/include/gruel/pmt.h @@ -729,6 +729,10 @@ GRUEL_API pmt_t pmt_list6(const pmt_t& x1, const pmt_t& x2, const pmt_t& x3, con */ GRUEL_API pmt_t pmt_list_add(pmt_t list, const pmt_t& item); +/*! + * \brief Return \p list with \p item removed from it. + */ +GRUEL_API pmt_t pmt_list_rm(pmt_t list, const pmt_t& item); /* * ------------------------------------------------------------------------ @@ -805,6 +809,15 @@ GRUEL_API std::string pmt_serialize_str(pmt_t obj); */ GRUEL_API pmt_t pmt_deserialize_str(std::string str); +/*! + * \brief Provide a comparator function object to allow pmt use in stl types + */ +class pmt_comperator { + public: + bool operator()(pmt::pmt_t const& p1, pmt::pmt_t const& p2) const + { return pmt::pmt_eqv(p1,p2)?false:p1.get()>p2.get(); } + }; + } /* namespace pmt */ #include <gruel/pmt_sugar.h> diff --git a/gruel/src/lib/pmt/pmt.cc b/gruel/src/lib/pmt/pmt.cc index 1d9125d4e..3eb39ed7b 100644 --- a/gruel/src/lib/pmt/pmt.cc +++ b/gruel/src/lib/pmt/pmt.cc @@ -1325,6 +1325,22 @@ pmt_list_add(pmt_t list, const pmt_t& item) } pmt_t +pmt_list_rm(pmt_t list, const pmt_t& item) +{ + if(pmt_is_pair(list)){ + pmt_t left = pmt_car(list); + pmt_t right = pmt_cdr(list); + if(!pmt_equal(left, item)){ + return pmt_cons(left, pmt_list_rm(right, item)); + } else { + return pmt_list_rm(right, item); + } + } else { + return list; + } +} + +pmt_t pmt_caar(pmt_t pair) { return (pmt_car(pmt_car(pair))); diff --git a/gruel/src/lib/pmt/qa_pmt_prims.cc b/gruel/src/lib/pmt/qa_pmt_prims.cc index 6212b8ea4..1bf5fcfb1 100644 --- a/gruel/src/lib/pmt/qa_pmt_prims.cc +++ b/gruel/src/lib/pmt/qa_pmt_prims.cc @@ -472,7 +472,7 @@ class qa_pmt_msg_accepter_nop : public gruel::msg_accepter { public: qa_pmt_msg_accepter_nop(){}; ~qa_pmt_msg_accepter_nop(); - void post(pmt_t){}; + void post(pmt_t,pmt_t){}; }; qa_pmt_msg_accepter_nop::~qa_pmt_msg_accepter_nop(){} @@ -495,9 +495,10 @@ qa_pmt_prims::test_msg_accepter() CPPUNIT_ASSERT_THROW(pmt_msg_accepter_ref(p0), pmt_wrong_type); // just confirm interfaces on send are OK - gruel::send(ma0.get(), sym); - gruel::send(ma0, sym); - gruel::send(p1, sym); + pmt_t port(pmt_intern("port")); + gruel::send(ma0.get(), port, sym); + gruel::send(ma0, port, sym); + gruel::send(p1, port, sym); } diff --git a/gruel/src/swig/pmt_swig.i b/gruel/src/swig/pmt_swig.i index 45cfceadc..d46143424 100644 --- a/gruel/src/swig/pmt_swig.i +++ b/gruel/src/swig/pmt_swig.i @@ -696,6 +696,10 @@ pmt_t pmt_list6(const pmt_t& x1, const pmt_t& x2, const pmt_t& x3, const pmt_t& */ pmt_t pmt_list_add(pmt_t list, const pmt_t& item); +/*! + * \brief Return \p list with \p item removed + */ +pmt_t pmt_list_rm(pmt_t list, const pmt_t& item); /* * ------------------------------------------------------------------------ |