summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/CMakeLists.txt2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc126
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h92
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.i6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc47
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h115
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.i14
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.cc6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_registry.cc76
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_registry.h42
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc25
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.h3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.cc30
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.h15
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.i15
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc31
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.cc4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.cc40
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.h26
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc50
-rw-r--r--gnuradio-core/src/lib/runtime/qa_gr_top_block.cc143
-rw-r--r--gnuradio-core/src/lib/runtime/qa_gr_top_block.h8
-rw-r--r--gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc8
29 files changed, 847 insertions, 108 deletions
diff --git a/gnuradio-core/src/lib/runtime/CMakeLists.txt b/gnuradio-core/src/lib/runtime/CMakeLists.txt
index 5f3672dde..70938a0f1 100644
--- a/gnuradio-core/src/lib/runtime/CMakeLists.txt
+++ b/gnuradio-core/src/lib/runtime/CMakeLists.txt
@@ -54,6 +54,7 @@ list(APPEND gnuradio_core_sources
${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.cc
@@ -116,6 +117,7 @@ install(FILES
${CMAKE_CURRENT_SOURCE_DIR}/gr_block.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.h
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index d7263b92d..69f2e09f9 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2006 Free Software Foundation, Inc.
+ * Copyright 2006,2012 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -25,7 +25,10 @@
#endif
#include <gr_basic_block.h>
+#include <gr_block_registry.h>
#include <stdexcept>
+#include <sstream>
+#include <iostream>
using namespace pmt;
@@ -45,14 +48,19 @@ gr_basic_block::gr_basic_block(const std::string &name,
d_input_signature(input_signature),
d_output_signature(output_signature),
d_unique_id(s_next_id++),
- d_color(WHITE)
+ d_symbolic_id(global_block_registry.block_register(this)),
+ d_symbol_name(global_block_registry.register_symbolic_name(this)),
+ d_color(WHITE),
+ message_subscribers(pmt::pmt_make_dict())
{
+ mutex.unlock();
s_ncurrently_allocated++;
}
gr_basic_block::~gr_basic_block()
{
s_ncurrently_allocated--;
+ global_block_registry.block_unregister(this);
}
gr_basic_block_sptr
@@ -60,3 +68,117 @@ gr_basic_block::to_basic_block()
{
return shared_from_this();
}
+
+void
+gr_basic_block::set_block_alias(std::string name)
+{
+ global_block_registry.register_symbolic_name(this, name);
+}
+
+// ** Message passing interface **
+
+// - register a new input message port
+void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){
+ msg_queue[port_id] = msg_queue_t();
+ msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
+ }
+
+// - register a new output message port
+void gr_basic_block::message_port_register_out(pmt::pmt_t port_id){
+ if(!pmt::pmt_is_symbol(port_id)){ throw std::runtime_error("bad port id"); }
+ if(pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port already in use"); }
+ message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL);
+ }
+
+// - publish a message on a message port
+void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port does not exist"); }
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ // iterate through subscribers on port
+ while( pmt::pmt_is_pair(currlist) ){
+ pmt::pmt_t target = pmt::pmt_car(currlist);
+
+ pmt::pmt_t block = pmt::pmt_car(target);
+ pmt::pmt_t port = pmt::pmt_cdr(target);
+
+ currlist = pmt::pmt_cdr(currlist);
+ gr_basic_block_sptr blk = global_block_registry.block_lookup(block);
+ //blk->post(msg);
+ blk->post(port, msg);
+ }
+ }
+
+// - subscribe to a message port
+void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
+ std::stringstream ss;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ throw std::runtime_error(ss.str());
+ }
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
+ }
+
+void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
+ std::stringstream ss;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ throw std::runtime_error(ss.str());
+ }
+ pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
+ }
+
+void
+gr_basic_block::_post(pmt_t which_port, pmt_t msg)
+{
+ insert_tail(which_port, msg);
+}
+
+void
+gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
+{
+ gruel::scoped_lock guard(mutex);
+
+ if( (msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())){
+ std::cout << "target port = " << pmt::pmt_symbol_to_string(which_port) << std::endl;
+ throw std::runtime_error("attempted to insert_tail on invalid queue!");
+ }
+
+ msg_queue[which_port].push_back(msg);
+ msg_queue_ready[which_port]->notify_one();
+
+ // wake up thread if BLKD_IN or BLKD_OUT
+ global_block_registry.notify_blk(alias());
+}
+
+pmt_t
+gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
+{
+ gruel::scoped_lock guard(mutex);
+
+ if (empty_p(which_port)){
+ return pmt::pmt_t();
+ }
+
+ pmt_t m(msg_queue[which_port].front());
+ msg_queue[which_port].pop_front();
+
+ return m;
+}
+
+pmt_t
+gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
+{
+ gruel::scoped_lock guard(mutex);
+
+ while (empty_p(which_port)){
+ msg_queue_ready[which_port]->wait(guard);
+ }
+
+ pmt_t m(msg_queue[which_port].front());
+ msg_queue[which_port].pop_front();
+ return m;
+}
+
+
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 4d03b878e..e0fd5d2af 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -30,6 +30,12 @@
#include <boost/function.hpp>
#include <gr_msg_accepter.h>
#include <string>
+#include <deque>
+#include <map>
+#include <gr_io_signature.h>
+#include <gruel/thread.h>
+#include <boost/foreach.hpp>
+#include <boost/thread/condition_variable.hpp>
/*!
* \brief The abstract base class for all signal processing blocks.
@@ -53,13 +59,25 @@ private:
* The thread-safety guarantees mentioned in set_msg_handler are implemented
* by the callers of this method.
*/
- void dispatch_msg(pmt::pmt_t msg)
+ void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
{
- if (d_msg_handler) // Is there a handler?
- d_msg_handler(msg); // Yes, invoke it.
+ // AA Update this
+ if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
};
- msg_handler_t d_msg_handler;
+ //msg_handler_t d_msg_handler;
+ typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
+ d_msg_handlers_t d_msg_handlers;
+
+ typedef std::deque<pmt::pmt_t> msg_queue_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
+ msg_queue_map_t msg_queue;
+// boost::condition_variable msg_queue_ready;
+ std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+
+ gruel::mutex mutex; //< protects all vars
+
protected:
friend class gr_flowgraph;
@@ -72,6 +90,9 @@ protected:
gr_io_signature_sptr d_input_signature;
gr_io_signature_sptr d_output_signature;
long d_unique_id;
+ long d_symbolic_id;
+ std::string d_symbol_name;
+ std::string d_symbol_alias;
vcolor d_color;
gr_basic_block(void){} //allows pure virtual interface sub-classes
@@ -97,13 +118,67 @@ protected:
void set_color(vcolor color) { d_color = color; }
vcolor color() const { return d_color; }
+ // Message passing interface
+ pmt::pmt_t message_subscribers;
+
public:
virtual ~gr_basic_block();
long unique_id() const { return d_unique_id; }
+ long symbolic_id() const { return d_symbolic_id; }
std::string name() const { return d_name; }
+ std::string symbol_name() const { return d_symbol_name; }
gr_io_signature_sptr input_signature() const { return d_input_signature; }
gr_io_signature_sptr output_signature() const { return d_output_signature; }
gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
+ bool alias_set() { return !d_symbol_alias.empty(); }
+ std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
+ pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
+ void set_block_alias(std::string name);
+
+ // ** Message passing interface **
+ void message_port_register_in(pmt::pmt_t port_id);
+ void message_port_register_out(pmt::pmt_t port_id);
+ void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
+ void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
+ void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
+
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
+
+ //! is the queue empty?
+ //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
+ bool empty_p(pmt::pmt_t which_port) {
+ if(msg_queue.find(which_port) == msg_queue.end())
+ throw std::runtime_error("port does not exist!");
+ return msg_queue[which_port].empty();
+ }
+ bool empty_p() {
+ bool rv = true;
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+ return rv;
+ }
+
+ //| Acquires and release the mutex
+ void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+
+ msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
+ return msg_queue[which_port].begin();
+ }
+ void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
+ msg_queue[which_port].erase(it);
+ }
+
/*!
* \brief Confirm that ninputs and noutputs is an acceptable combination.
@@ -146,8 +221,13 @@ public:
* If the block inherits from gr_hier_block2, the runtime system will
* ensure that no reentrant calls are made to msg_handler.
*/
- template <typename T> void set_msg_handler(T msg_handler){
- d_msg_handler = msg_handler_t(msg_handler);
+ //template <typename T> void set_msg_handler(T msg_handler){
+ // d_msg_handler = msg_handler_t(msg_handler);
+ //}
+ template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
+ if(msg_queue.find(which_port) == msg_queue.end()){
+ throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
+ d_msg_handlers[which_port] = msg_handler_t(msg_handler);
}
};
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i
index e43cc114c..d6d6c3d16 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i
@@ -23,6 +23,8 @@
class gr_basic_block;
typedef boost::shared_ptr<gr_basic_block> gr_basic_block_sptr;
%template(gr_basic_block_sptr) boost::shared_ptr<gr_basic_block>;
+%include "pmt_swig.i"
+using namespace pmt;
// support vectors of these...
namespace std {
@@ -37,11 +39,15 @@ protected:
public:
virtual ~gr_basic_block();
std::string name() const;
+ std::string symbol_name() const;
gr_io_signature_sptr input_signature() const;
gr_io_signature_sptr output_signature() const;
long unique_id() const;
gr_basic_block_sptr to_basic_block();
bool check_topology (int ninputs, int noutputs);
+ std::string alias();
+ void set_block_alias(std::string name);
+ void _post(pmt_t which_port, pmt_t msg);
};
%rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated;
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 9a5255a93..43aebf0bf 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -28,6 +28,7 @@
#include <gr_block_detail.h>
#include <stdexcept>
#include <iostream>
+#include <gr_block_registry.h>
gr_block::gr_block (const std::string &name,
gr_io_signature_sptr input_signature,
@@ -40,12 +41,18 @@ gr_block::gr_block (const std::string &name,
d_relative_rate (1.0),
d_history(1),
d_fixed_rate(false),
- d_tag_propagation_policy(TPP_ALL_TO_ALL)
+ d_max_noutput_items_set(false),
+ d_max_noutput_items(0),
+ d_tag_propagation_policy(TPP_ALL_TO_ALL),
+ d_max_output_buffer(std::max(output_signature->max_streams(),1), -1),
+ d_min_output_buffer(std::max(output_signature->max_streams(),1), -1)
{
+ global_block_registry.register_primitive(alias(), this);
}
gr_block::~gr_block ()
{
+ global_block_registry.unregister_primitive(alias());
}
// stub implementation: 1:1
@@ -208,6 +215,35 @@ gr_block::set_tag_propagation_policy(tag_propagation_policy_t p)
d_tag_propagation_policy = p;
}
+
+int
+gr_block::max_noutput_items()
+{
+ return d_max_noutput_items;
+}
+
+void
+gr_block::set_max_noutput_items(int m)
+{
+ if(m <= 0)
+ throw std::runtime_error("gr_block::set_max_noutput_items: value for max_noutput_items must be greater than 0.\n");
+
+ d_max_noutput_items = m;
+ d_max_noutput_items_set = true;
+}
+
+void
+gr_block::unset_max_noutput_items()
+{
+ d_max_noutput_items_set = false;
+}
+
+bool
+gr_block::is_set_max_noutput_items()
+{
+ return d_max_noutput_items_set;
+}
+
std::ostream&
operator << (std::ostream& os, const gr_block *m)
{
@@ -215,3 +251,12 @@ operator << (std::ostream& os, const gr_block *m)
return os;
}
+int
+gr_block::general_work(int noutput_items,
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+{
+ throw std::runtime_error("gr_block::general_work() not implemented");
+ return 0;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 71ac8eee6..57e3fda90 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -124,7 +124,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
virtual int general_work (int noutput_items,
gr_vector_int &ninput_items,
gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items) = 0;
+ gr_vector_void_star &output_items);
/*!
* \brief Called to enable drivers, etc for i/o devices.
@@ -251,6 +251,113 @@ class GR_CORE_API gr_block : public gr_basic_block {
*/
void set_tag_propagation_policy(tag_propagation_policy_t p);
+ /*!
+ * \brief Return the maximum number of output items this block will
+ * handle during a call to work.
+ */
+ int max_noutput_items();
+
+ /*!
+ * \brief Set the maximum number of ouput items htis block will
+ * handle during a call to work.
+ *
+ * \param m the maximum noutput_items this block will handle.
+ */
+ void set_max_noutput_items(int m);
+
+ /*!
+ * \brief Clear the switch for using the max_noutput_items value of this block.
+ *
+ * When is_set_max_noutput_items() returns 'true', the scheduler
+ * will use the value returned by max_noutput_items() to limit the
+ * size of the number of items possible for this block's work
+ * function. If is_set_max_notput_items() returns 'false', then the
+ * scheduler ignores the internal value and uses the value set
+ * globally in the top_block.
+ *
+ * Use this value to clear the 'is_set' flag so the scheduler will
+ * ignore this. Use the set_max_noutput_items(m) call to both set a
+ * new value for max_noutput_items and to reenable its use in the
+ * scheduler.
+ */
+ void unset_max_noutput_items();
+
+ /*!
+ * \brief Ask the block if the flag is or is not set to use the
+ * internal value of max_noutput_items during a call to work.
+ */
+ bool is_set_max_noutput_items();
+
+ /*
+ * Used to expand the vectors that hold the min/max buffer sizes.
+ *
+ * Specifically, when -1 is used, the vectors are just initialized
+ * with 1 value; this is used by the flat_flowgraph to expand when
+ * required to add a new value for new ports on these blocks.
+ */
+ void expand_minmax_buffer(int port) {
+ if((size_t)port >= d_max_output_buffer.size())
+ set_max_output_buffer(port, -1);
+ if((size_t)port >= d_min_output_buffer.size())
+ set_min_output_buffer(port, -1);
+ }
+
+ /*!
+ * \brief Returns max buffer size on output port \p i.
+ */
+ long max_output_buffer(size_t i) {
+ if(i >= d_max_output_buffer.size())
+ throw std::invalid_argument("gr_basic_block::max_output_buffer: port out of range.");
+ return d_max_output_buffer[i];
+ }
+
+ /*!
+ * \brief Sets max buffer size on all output ports.
+ */
+ void set_max_output_buffer(long max_output_buffer) {
+ for(int i = 0; i < output_signature()->max_streams(); i++) {
+ set_max_output_buffer(i, max_output_buffer);
+ }
+ }
+
+ /*!
+ * \brief Sets max buffer size on output port \p port.
+ */
+ void set_max_output_buffer(int port, long max_output_buffer) {
+ if((size_t)port >= d_max_output_buffer.size())
+ d_max_output_buffer.push_back(max_output_buffer);
+ else
+ d_max_output_buffer[port] = max_output_buffer;
+ }
+
+ /*!
+ * \brief Returns min buffer size on output port \p i.
+ */
+ long min_output_buffer(size_t i) {
+ if(i >= d_min_output_buffer.size())
+ throw std::invalid_argument("gr_basic_block::min_output_buffer: port out of range.");
+ return d_min_output_buffer[i];
+ }
+
+ /*!
+ * \brief Sets min buffer size on all output ports.
+ */
+ void set_min_output_buffer(long min_output_buffer) {
+ for(int i=0; i<output_signature()->max_streams(); i++) {
+ set_min_output_buffer(i, min_output_buffer);
+ }
+ }
+
+ /*!
+ * \brief Sets min buffer size on output port \p port.
+ */
+ void set_min_output_buffer(int port, long min_output_buffer) {
+ if((size_t)port >= d_min_output_buffer.size())
+ d_min_output_buffer.push_back(min_output_buffer);
+ else
+ d_min_output_buffer[port] = min_output_buffer;
+ }
+
// ----------------------------------------------------------------------------
private:
@@ -263,6 +370,8 @@ class GR_CORE_API gr_block : public gr_basic_block {
gr_block_detail_sptr d_detail; // implementation details
unsigned d_history;
bool d_fixed_rate;
+ bool d_max_noutput_items_set; // if d_max_noutput_items is valid
+ int d_max_noutput_items; // value of max_noutput_items for this block
tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
protected:
@@ -345,6 +454,10 @@ class GR_CORE_API gr_block : public gr_basic_block {
uint64_t abs_end,
const pmt::pmt_t &key);
+ std::vector<long> d_max_output_buffer;
+ std::vector<long> d_min_output_buffer;
+
+
// These are really only for internal use, but leaving them public avoids
// having to work up an ever-varying list of friend GR_CORE_APIs
diff --git a/gnuradio-core/src/lib/runtime/gr_block.i b/gnuradio-core/src/lib/runtime/gr_block.i
index 4cc260bfe..db6c1d04a 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_block.i
@@ -52,6 +52,20 @@ class gr_block : public gr_basic_block {
uint64_t nitems_read(unsigned int which_input);
uint64_t nitems_written(unsigned int which_output);
+ // Methods to manage the block's max_noutput_items size.
+ int max_noutput_items();
+ void set_max_noutput_items(int m);
+ void unset_max_noutput_items();
+ bool is_set_max_noutput_items();
+
+ // Methods to manage block's min/max buffer sizes.
+ long max_output_buffer(int i);
+ void set_max_output_buffer(long max_output_buffer);
+ void set_max_output_buffer(int port, long max_output_buffer);
+ long min_output_buffer(int i);
+ void set_min_output_buffer(long min_output_buffer);
+ void set_min_output_buffer(int port, long min_output_buffer);
+
// internal use
gr_block_detail_sptr detail () const { return d_detail; }
void set_detail (gr_block_detail_sptr detail) { d_detail = detail; }
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index 2792cd471..337c9518e 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -127,12 +127,6 @@ gr_block_detail::produce_each (int how_many_items)
}
-void
-gr_block_detail::_post(pmt_t msg)
-{
- d_tpb.insert_tail(msg);
-}
-
uint64_t
gr_block_detail::nitems_read(unsigned int which_input)
{
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index c96f00db8..16d9f4d42 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -85,11 +85,6 @@ class GR_CORE_API gr_block_detail {
*/
void produce_each (int how_many_items);
- /*!
- * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
- */
- void _post(pmt::pmt_t msg);
-
// Return the number of items read on input stream which_input
uint64_t nitems_read(unsigned int which_input);
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
index 6fea14613..375b58f56 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -206,7 +206,7 @@ gr_block_executor::run_one_iteration()
// determine the minimum available output space
noutput_items = min_available_space (d, m->output_multiple ());
- noutput_items = std::min(noutput_items, d_max_noutput_items);
+ noutput_items = std::min(noutput_items, max_noutput_items);
LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
if (noutput_items == -1) // we're done
goto were_done;
@@ -251,7 +251,7 @@ gr_block_executor::run_one_iteration()
// take a swag at how much output we can sink
noutput_items = (int) (max_items_avail * m->relative_rate ());
noutput_items = round_down (noutput_items, m->output_multiple ());
- noutput_items = std::min(noutput_items, d_max_noutput_items);
+ noutput_items = std::min(noutput_items, max_noutput_items);
LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
@@ -449,6 +449,7 @@ gr_block_executor::run_one_iteration()
// We didn't produce any output even though we called general_work.
// We have (most likely) consumed some input.
+ /*
// If this is a source, it's broken.
if (d->source_p()){
std::cerr << "gr_block_executor: source " << m
@@ -456,6 +457,7 @@ gr_block_executor::run_one_iteration()
// FIXME maybe we ought to raise an exception...
goto were_done;
}
+ */
// Have the caller try again...
return READY_NO_OUTPUT;
diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.cc b/gnuradio-core/src/lib/runtime/gr_block_registry.cc
new file mode 100644
index 000000000..ff23d97eb
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_registry.cc
@@ -0,0 +1,76 @@
+#include <gr_basic_block.h>
+#include <gr_block_registry.h>
+#include <gr_tpb_detail.h>
+#include <gr_block_detail.h>
+#include <gr_block.h>
+#include <stdio.h>
+
+gr_block_registry global_block_registry;
+
+gr_block_registry::gr_block_registry(){
+ d_ref_map = pmt::pmt_make_dict();
+}
+
+long gr_block_registry::block_register(gr_basic_block* block){
+ if(d_map.find(block->name()) == d_map.end()){
+ d_map[block->name()] = blocksubmap_t();
+ d_map[block->name()][0] = block;
+ return 0;
+ } else {
+ for(size_t i=0; i<=d_map[block->name()].size(); i++){
+ if(d_map[block->name()].find(i) == d_map[block->name()].end()){
+ d_map[block->name()][i] = block;
+ return i;
+ }
+ }
+ }
+ throw std::runtime_error("should not reach this");
+}
+
+void gr_block_registry::block_unregister(gr_basic_block* block){
+ d_map[block->name()].erase( d_map[block->name()].find(block->symbolic_id()));
+ d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->symbol_name()));
+ if(block->alias_set()){
+ d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->alias()));
+ }
+}
+
+std::string gr_block_registry::register_symbolic_name(gr_basic_block* block){
+ std::stringstream ss;
+ ss << block->name() << block->symbolic_id();
+ //std::cout << "register_symbolic_name: " << ss.str() << std::endl;
+ register_symbolic_name(block, ss.str());
+ return ss.str();
+}
+
+void gr_block_registry::register_symbolic_name(gr_basic_block* block, std::string name){
+ if(pmt_dict_has_key(d_ref_map, pmt::pmt_intern(name))){
+ throw std::runtime_error("symbol already exists, can not re-use!");
+ }
+ d_ref_map = pmt_dict_add(d_ref_map, pmt::pmt_intern(name), pmt::pmt_make_any(block));
+}
+
+gr_basic_block_sptr gr_block_registry::block_lookup(pmt::pmt_t symbol){
+ pmt::pmt_t ref = pmt_dict_ref(d_ref_map, symbol, pmt::PMT_NIL);
+ if(pmt::pmt_eq(ref, pmt::PMT_NIL)){
+ throw std::runtime_error("block lookup failed! block not found!");
+ }
+ gr_basic_block* blk = boost::any_cast<gr_basic_block*>( pmt::pmt_any_ref(ref) );
+ return blk->shared_from_this();
+}
+
+
+void gr_block_registry::register_primitive(std::string blk, gr_block* ref){
+ primitive_map[blk] = ref;
+}
+
+void gr_block_registry::unregister_primitive(std::string blk){
+ primitive_map.erase(primitive_map.find(blk));
+}
+
+void gr_block_registry::notify_blk(std::string blk){
+ if(primitive_map.find(blk) == primitive_map.end()){ return; }
+ if(primitive_map[blk]->detail().get())
+ primitive_map[blk]->detail()->d_tpb.notify_msg();
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.h b/gnuradio-core/src/lib/runtime/gr_block_registry.h
new file mode 100644
index 000000000..6a2d939e5
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_registry.h
@@ -0,0 +1,42 @@
+#ifndef GR_BLOCK_REGISTRY_H
+#define GR_BLOCK_REGISTRY_H
+
+#include <map>
+
+#ifndef GR_BASIC_BLOCK_H
+class gr_basic_block;
+class gr_block;
+#endif
+
+class gr_block_registry {
+ public:
+ gr_block_registry();
+
+ long block_register(gr_basic_block* block);
+ void block_unregister(gr_basic_block* block);
+
+ std::string register_symbolic_name(gr_basic_block* block);
+ void register_symbolic_name(gr_basic_block* block, std::string name);
+
+ gr_basic_block_sptr block_lookup(pmt::pmt_t symbol);
+
+ void register_primitive(std::string blk, gr_block* ref);
+ void unregister_primitive(std::string blk);
+ void notify_blk(std::string blk);
+
+ private:
+
+ //typedef std::map< long, gr_basic_block_sptr > blocksubmap_t;
+ typedef std::map< long, gr_basic_block* > blocksubmap_t;
+ typedef std::map< std::string, blocksubmap_t > blockmap_t;
+
+ blockmap_t d_map;
+ pmt::pmt_t d_ref_map;
+ std::map< std::string, gr_block*> primitive_map;
+
+};
+
+extern gr_block_registry global_block_registry;
+
+#endif
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index 9005cd339..e04deb948 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -80,14 +80,23 @@ gr_flat_flowgraph::allocate_block_detail(gr_basic_block_sptr block)
int noutputs = calc_used_ports(block, false).size();
gr_block_detail_sptr detail = gr_make_block_detail(ninputs, noutputs);
+ gr_block_sptr grblock = cast_to_block_sptr(block);
+ if(!grblock)
+ throw std::runtime_error("allocate_block_detail found non-gr_block");
+
if (GR_FLAT_FLOWGRAPH_DEBUG)
std::cout << "Creating block detail for " << block << std::endl;
for (int i = 0; i < noutputs; i++) {
+ grblock->expand_minmax_buffer(i);
+
gr_buffer_sptr buffer = allocate_buffer(block, i);
if (GR_FLAT_FLOWGRAPH_DEBUG)
std::cout << "Allocated buffer for output " << block << ":" << i << std::endl;
detail->set_output(i, buffer);
+
+ // Update the block's max_output_buffer based on what was actually allocated.
+ grblock->set_max_output_buffer(i, buffer->bufsize());
}
return detail;
@@ -114,6 +123,21 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
// ensure we have a buffer at least twice their decimation factor*output_multiple
gr_basic_block_vector_t blocks = calc_downstream_blocks(block, port);
+ // limit buffer size if indicated
+ if(grblock->max_output_buffer(port) > 0) {
+// std::cout << "constraining output items to " << block->max_output_buffer(port) << "\n";
+ nitems = std::min((long)nitems, (long)grblock->max_output_buffer(port));
+ nitems -= nitems%grblock->output_multiple();
+ if( nitems < 1 )
+ throw std::runtime_error("problems allocating a buffer with the given max output buffer constraint!");
+ }
+ else if(grblock->min_output_buffer(port) > 0) {
+ nitems = std::max((long)nitems, (long)grblock->min_output_buffer(port));
+ nitems -= nitems%grblock->output_multiple();
+ if( nitems < 1 )
+ throw std::runtime_error("problems allocating a buffer with the given min output buffer constraint!");
+ }
+
for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
gr_block_sptr dgrblock = cast_to_block_sptr(*p);
if (!dgrblock)
@@ -125,6 +149,7 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
nitems = std::max(nitems, static_cast<int>(2*(decimation*multiple+history)));
}
+// std::cout << "gr_make_buffer(" << nitems << ", " << item_size << ", " << grblock << "\n";
return gr_make_buffer(nitems, item_size, grblock);
}
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 78e1bc99a..69c304a3d 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -180,6 +180,11 @@ gr_flowgraph::calc_used_blocks()
{
gr_basic_block_vector_t tmp;
+ // make sure free standing message blocks are included
+ for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
+ tmp.push_back(*it);
+ }
+
// Collect all blocks in the edge list
for (gr_edge_viter_t p = d_edges.begin(); p != d_edges.end(); p++) {
tmp.push_back(p->src().block());
@@ -472,3 +477,7 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
output.push_back(block);
}
+void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
+ d_msgblocks.push_back(blk);
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index a2c1580eb..860cb0ff1 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -110,6 +110,8 @@ public:
void disconnect(gr_basic_block_sptr src_block, int src_port,
gr_basic_block_sptr dst_block, int dst_port);
+ void add_msg_block(gr_basic_block_sptr blk);
+
// Validate connectivity, raise exception if invalid
void validate();
@@ -128,6 +130,7 @@ public:
// Return vector of vectors of disjointly connected blocks, topologically
// sorted.
std::vector<gr_basic_block_vector_t> partition();
+ gr_basic_block_vector_t d_msgblocks;
protected:
gr_basic_block_vector_t d_blocks;
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
index 756852df8..a19bfe195 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
@@ -81,6 +81,36 @@ gr_hier_block2::connect(gr_basic_block_sptr src, int src_port,
}
void
+gr_hier_block2::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); }
+ d_detail->msg_connect(src, srcport, dst, dstport);
+}
+
+void
+gr_hier_block2::msg_connect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+{
+ d_detail->msg_connect(src, pmt::mp(srcport), dst, pmt::mp(dstport));
+}
+
+void
+gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); }
+ d_detail->msg_disconnect(src, srcport, dst, dstport);
+}
+
+void
+gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+{
+ d_detail->msg_disconnect(src, pmt::mp(srcport), dst, pmt::mp(dstport));
+}
+
+void
gr_hier_block2::disconnect(gr_basic_block_sptr block)
{
d_detail->disconnect(block);
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
index 123178724..e8364a740 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
@@ -98,6 +98,21 @@ public:
gr_basic_block_sptr dst, int dst_port);
/*!
+ * \brief Add gr-blocks or hierarchical blocks to internal graph and wire together
+ *
+ * This adds (if not done earlier by another connect) a pair of gr-blocks or
+ * hierarchical blocks to the internal message port subscription
+ */
+ void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
+ void msg_connect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport);
+ void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
+ void msg_disconnect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport);
+
+ /*!
* \brief Remove a gr-block or hierarchical block from the internal flowgraph.
*
* This removes a gr-block or hierarchical block from the internal flowgraph,
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
index eefb965b4..7c0e62f28 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
@@ -38,6 +38,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name,
// better interface in scripting land.
%rename(primitive_connect) gr_hier_block2::connect;
%rename(primitive_disconnect) gr_hier_block2::disconnect;
+%rename(primitive_msg_connect) gr_hier_block2::msg_connect;
+%rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect;
class gr_hier_block2 : public gr_basic_block
{
@@ -54,6 +56,19 @@ public:
void connect(gr_basic_block_sptr src, int src_port,
gr_basic_block_sptr dst, int dst_port)
throw (std::invalid_argument);
+ void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+ throw (std::runtime_error);
+ void msg_connect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+ throw (std::runtime_error);
+ void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+ throw (std::runtime_error);
+ void msg_disconnect(gr_basic_block_sptr src, std::string srcport,
+ gr_basic_block_sptr dst, std::string dstport)
+ throw (std::runtime_error);
+
void disconnect(gr_basic_block_sptr block)
throw (std::invalid_argument);
void disconnect(gr_basic_block_sptr src, int src_port,
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 76c5ce06f..ff2a5db8c 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -144,6 +144,36 @@ gr_hier_block2_detail::connect(gr_basic_block_sptr src, int src_port,
}
void
+gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "connecting message port..." << std::endl;
+
+ // register the subscription
+ src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+ // add block uniquely to list to internal blocks
+ if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+ d_blocks.push_back(dst);
+ }
+
+ // make sure we instantiate a thread for this block
+ d_fg->add_msg_block(dst);
+}
+
+void
+gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport)
+{
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "disconnecting message port..." << std::endl;
+
+ // register the subscription
+ src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+}
+
+void
gr_hier_block2_detail::disconnect(gr_basic_block_sptr block)
{
// Check on singleton list
@@ -427,6 +457,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
}
}
}
+ sfg->d_msgblocks = d_fg->d_msgblocks;
// Construct unique list of blocks used either in edges, inputs,
// outputs, or by themselves. I still hate STL.
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
index f4f950e9d..f2d2b3c4e 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
@@ -39,6 +39,10 @@ public:
void connect(gr_basic_block_sptr block);
void connect(gr_basic_block_sptr src, int src_port,
gr_basic_block_sptr dst, int dst_port);
+ void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
+ void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport,
+ gr_basic_block_sptr dst, pmt::pmt_t dstport);
void disconnect(gr_basic_block_sptr block);
void disconnect(gr_basic_block_sptr, int src_port,
gr_basic_block_sptr, int dst_port);
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
index 5018ee9e6..93d5fb20e 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
@@ -41,12 +41,12 @@ gr_msg_accepter::~gr_msg_accepter()
}
void
-gr_msg_accepter::post(pmt_t msg)
+gr_msg_accepter::post(pmt_t which_port, pmt_t msg)
{
// Notify derived class, handled case by case
gr_block *p = dynamic_cast<gr_block *>(this);
if (p) {
- p->detail()->_post(msg);
+ p->_post(which_port,msg);
return;
}
gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this);
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
index 3e5c97638..a497ba6e7 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
@@ -36,7 +36,7 @@ public:
gr_msg_accepter();
~gr_msg_accepter();
- void post(pmt::pmt_t msg);
+ void post(pmt::pmt_t which_port, pmt::pmt_t msg);
};
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
index 131ddd19c..2824eb1b3 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
@@ -74,6 +74,11 @@ gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg, int max_noutput_i
for (size_t i = 0; i < blocks.size(); i++){
std::stringstream name;
name << "thread-per-block[" << i << "]: " << blocks[i];
+
+ // If set, use internal value instead of global value
+ if(blocks[i]->is_set_max_noutput_items())
+ max_noutput_items = blocks[i]->max_noutput_items();
+
d_threads.create_thread(
gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i], max_noutput_items),
name.str()));
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index 46b33d91f..46eb6bbe0 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -68,43 +68,3 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d)
notify_upstream(d);
}
-void
-gr_tpb_detail::insert_tail(pmt::pmt_t msg)
-{
- gruel::scoped_lock guard(mutex);
-
- msg_queue.push_back(msg);
-
- // wake up thread if BLKD_IN or BLKD_OUT
- input_cond.notify_one();
- output_cond.notify_one();
-}
-
-pmt_t
-gr_tpb_detail::delete_head_nowait()
-{
- gruel::scoped_lock guard(mutex);
-
- if (empty_p())
- return pmt_t();
-
- pmt_t m(msg_queue.front());
- msg_queue.pop_front();
-
- return m;
-}
-
-/*
- * Caller must already be holding the mutex
- */
-pmt_t
-gr_tpb_detail::delete_head_nowait_already_holding_mutex()
-{
- if (empty_p())
- return pmt_t();
-
- pmt_t m(msg_queue.front());
- msg_queue.pop_front();
-
- return m;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
index b6e342dee..69feb6007 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -39,9 +39,6 @@ struct GR_CORE_API gr_tpb_detail {
bool output_changed;
gruel::condition_variable output_cond;
-private:
- std::deque<pmt::pmt_t> msg_queue;
-
public:
gr_tpb_detail()
: input_changed(false), output_changed(false) { }
@@ -55,6 +52,12 @@ public:
//! Called by us to notify both upstream and downstream
void notify_neighbors(gr_block_detail *d);
+ //! Called by pmt msg posters
+ void notify_msg(){
+ input_cond.notify_one();
+ output_cond.notify_one();
+ }
+
//! Called by us
void clear_changed()
{
@@ -63,23 +66,6 @@ public:
output_changed = false;
}
- //! is the queue empty?
- bool empty_p() const { return msg_queue.empty(); }
-
- //| Acquires and release the mutex
- void insert_tail(pmt::pmt_t msg);
-
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- */
- pmt::pmt_t delete_head_nowait();
-
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- * Caller must already be holding the mutex
- */
- pmt::pmt_t delete_head_nowait_already_holding_mutex();
-
private:
//! Used by notify_downstream
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index a5aabb379..9f17a48a8 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -25,13 +25,14 @@
#include <iostream>
#include <boost/thread.hpp>
#include <gruel/pmt.h>
+#include <boost/foreach.hpp>
using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items)
: d_exec(block, max_noutput_items)
{
- // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+ //std::cerr << "gr_tpb_thread_body: " << block << std::endl;
gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
@@ -42,11 +43,22 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
boost::this_thread::interruption_point();
// handle any queued up messages
- while ((msg = d->d_tpb.delete_head_nowait()))
- block->dispatch_msg(msg);
+ //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() )
+
+ BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
+ {
+ while ((msg = block->delete_head_nowait(i.first))){
+ block->dispatch_msg(i.first,msg);
+ }
+ }
d->d_tpb.clear_changed();
- s = d_exec.run_one_iteration();
+ // run one iteration if we are a connected stream block
+ if(d->noutputs() >0 || d->ninputs()>0){
+ s = d_exec.run_one_iteration();
+ } else {
+ s = gr_block_executor::BLKD_IN;
+ }
switch(s){
case gr_block_executor::READY: // Tell neighbors we made progress.
@@ -67,15 +79,18 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
while (!d->d_tpb.input_changed){
// wait for input or message
- while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+ while(!d->d_tpb.input_changed && block->empty_p())
d->d_tpb.input_cond.wait(guard);
// handle all pending messages
- while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(msg);
- guard.lock();
- }
+ BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
+ {
+ while ((msg = block->delete_head_nowait(i.first))){
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first, msg);
+ guard.lock();
+ }
+ }
}
}
break;
@@ -87,15 +102,18 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
while (!d->d_tpb.output_changed){
// wait for output room or message
- while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+ while(!d->d_tpb.output_changed && block->empty_p())
d->d_tpb.output_cond.wait(guard);
// handle all pending messages
- while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(msg);
- guard.lock();
- }
+ BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
+ {
+ while ((msg = block->delete_head_nowait(i.first))){
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first,msg);
+ guard.lock();
+ }
+ }
}
}
break;
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc b/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc
index cc7b7c720..a0b4755a8 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc
+++ b/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc
@@ -27,6 +27,7 @@
#include <qa_gr_top_block.h>
#include <gr_top_block.h>
#include <gr_head.h>
+#include <gr_nop.h>
#include <gr_null_source.h>
#include <gr_null_sink.h>
#include <iostream>
@@ -119,3 +120,145 @@ void qa_gr_top_block::t4_reconfigure()
// Wait for flowgraph to end on its own
tb->wait();
}
+
+
+void qa_gr_top_block::t5_max_noutputs()
+{
+ if (VERBOSE) std::cout << "qa_gr_top_block::t5()\n";
+
+ gr_top_block_sptr tb = gr_make_top_block("top");
+
+ gr_block_sptr src = gr_make_null_source(sizeof(int));
+ gr_block_sptr head = gr_make_head(sizeof(int), 100000);
+ gr_block_sptr dst = gr_make_null_sink(sizeof(int));
+
+ // Start infinite flowgraph
+ tb->connect(src, 0, head, 0);
+ tb->connect(head, 0, dst, 0);
+ tb->start(100);
+ tb->wait();
+}
+
+void qa_gr_top_block::t6_reconfig_max_noutputs()
+{
+ if (VERBOSE) std::cout << "qa_gr_top_block::t6()\n";
+
+ gr_top_block_sptr tb = gr_make_top_block("top");
+
+ gr_block_sptr src = gr_make_null_source(sizeof(int));
+ gr_block_sptr head = gr_make_head(sizeof(int), 100000);
+ gr_block_sptr dst = gr_make_null_sink(sizeof(int));
+
+ // Start infinite flowgraph
+ tb->connect(src, 0, dst, 0);
+ tb->start(100);
+
+ // Reconfigure with gr_head in the middle
+ tb->lock();
+ tb->disconnect(src, 0, dst, 0);
+ tb->connect(src, 0, head, 0);
+ tb->connect(head, 0, dst, 0);
+ tb->set_max_noutput_items(1000);
+ head->set_max_noutput_items(500);
+ tb->unlock();
+
+ // Wait for flowgraph to end on its own
+ tb->wait();
+}
+
+void qa_gr_top_block::t7_max_noutputs_per_block()
+{
+ if (VERBOSE) std::cout << "qa_gr_top_block::t7()\n";
+
+ gr_top_block_sptr tb = gr_make_top_block("top");
+
+ gr_block_sptr src = gr_make_null_source(sizeof(int));
+ gr_block_sptr head = gr_make_head(sizeof(int), 100000);
+ gr_block_sptr dst = gr_make_null_sink(sizeof(int));
+
+ head->set_max_noutput_items(100);
+
+ // Start infinite flowgraph
+ tb->connect(src, 0, head, 0);
+ tb->connect(head, 0, dst, 0);
+ tb->start();
+ tb->wait();
+}
+
+void qa_gr_top_block::t8_reconfig_max_noutputs_per_block()
+{
+ if (VERBOSE) std::cout << "qa_gr_top_block::t8()\n";
+
+ gr_top_block_sptr tb = gr_make_top_block("top");
+
+ gr_block_sptr src = gr_make_null_source(sizeof(int));
+ gr_block_sptr head = gr_make_head(sizeof(int), 100000);
+ gr_block_sptr dst = gr_make_null_sink(sizeof(int));
+
+ head->set_max_noutput_items(99);
+
+ // Start infinite flowgraph
+ tb->connect(src, 0, dst, 0);
+ tb->start(201);
+
+ // Reconfigure with gr_head in the middle
+ tb->lock();
+ tb->disconnect(src, 0, dst, 0);
+ tb->connect(src, 0, head, 0);
+ tb->connect(head, 0, dst, 0);
+ tb->set_max_noutput_items(1023);
+ head->set_max_noutput_items(513);
+ tb->unlock();
+
+ // Wait for flowgraph to end on its own
+ tb->wait();
+}
+
+void qa_gr_top_block::t9_max_output_buffer()
+{
+ if (VERBOSE) std::cout << "qa_gr_top_block::t9()\n";
+
+ gr_top_block_sptr tb = gr_make_top_block("top");
+
+ gr_block_sptr src = gr_make_null_source(sizeof(int));
+ gr_block_sptr head = gr_make_head(sizeof(int), 100000);
+ gr_block_sptr dst = gr_make_null_sink(sizeof(int));
+
+ head->set_max_output_buffer(1024);
+
+ // Start infinite flowgraph
+ tb->connect(src, 0, head, 0);
+ tb->connect(head, 0, dst, 0);
+ tb->start();
+ tb->wait();
+}
+
+void qa_gr_top_block::t10_reconfig_max_output_buffer()
+{
+ if (VERBOSE) std::cout << "qa_gr_top_block::t10()\n";
+
+ gr_top_block_sptr tb = gr_make_top_block("top");
+
+ gr_block_sptr src = gr_make_null_source(sizeof(int));
+ gr_block_sptr head = gr_make_head(sizeof(int), 100000);
+ gr_block_sptr dst = gr_make_null_sink(sizeof(int));
+
+ head->set_max_output_buffer(1000);
+
+ // Start infinite flowgraph
+ tb->connect(src, 0, dst, 0);
+ tb->start(201);
+
+ // Reconfigure with gr_head in the middle
+ tb->lock();
+ gr_block_sptr nop = gr_make_nop(sizeof(int));
+ nop->set_max_output_buffer(4000);
+ tb->disconnect(src, 0, dst, 0);
+ tb->connect(src, 0, head, 0);
+ tb->connect(head, 0, nop, 0);
+ tb->connect(nop, 0, dst, 0);
+ tb->unlock();
+
+ // Wait for flowgraph to end on its own
+ tb->wait();
+}
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_top_block.h b/gnuradio-core/src/lib/runtime/qa_gr_top_block.h
index b223633e5..bb891abca 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_top_block.h
+++ b/gnuradio-core/src/lib/runtime/qa_gr_top_block.h
@@ -36,6 +36,8 @@ class qa_gr_top_block : public CppUnit::TestCase
CPPUNIT_TEST(t2_start_stop_wait);
CPPUNIT_TEST(t3_lock_unlock);
CPPUNIT_TEST(t4_reconfigure); // triggers 'join never returns' bug
+ CPPUNIT_TEST(t5_max_noutputs);
+ CPPUNIT_TEST(t6_reconfig_max_noutputs);
CPPUNIT_TEST_SUITE_END();
@@ -46,6 +48,12 @@ private:
void t2_start_stop_wait();
void t3_lock_unlock();
void t4_reconfigure();
+ void t5_max_noutputs();
+ void t6_reconfig_max_noutputs();
+ void t7_max_noutputs_per_block();
+ void t8_reconfig_max_noutputs_per_block();
+ void t9_max_output_buffer();
+ void t10_reconfig_max_output_buffer();
};
#endif /* INCLUDED_QA_GR_TOP_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
index 25ae0b1e1..c84a219bd 100644
--- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
+++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
@@ -65,15 +65,11 @@ void qa_set_msg_handler::t0()
tb->start();
// Send them...
+ pmt_t port(pmt_intern("port"));
for (int i = 0; i < NMSGS; i++){
- send(nop, mp(mp("example-msg"), mp(i)));
+ send(nop, port, mp(mp("example-msg"), mp(i)));
}
- // And send a message to null_source to confirm that the default
- // message handling action (which should be a nop) doesn't dump
- // core.
- send(src, mp(mp("example-msg"), mp(0)));
-
// Give the messages a chance to be processed
boost::this_thread::sleep(boost::posix_time::milliseconds(100));