diff options
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.cc | 29 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 9 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.cc | 9 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.h | 2 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_flowgraph.cc | 9 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_flowgraph.h | 3 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc | 9 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc | 9 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc | 5 |
9 files changed, 71 insertions, 13 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 7d2f275e8..69f2e09f9 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2006 Free Software Foundation, Inc. + * Copyright 2006,2012 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -28,6 +28,7 @@ #include <gr_block_registry.h> #include <stdexcept> #include <sstream> +#include <iostream> using namespace pmt; @@ -79,6 +80,7 @@ gr_basic_block::set_block_alias(std::string name) // - register a new input message port void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){ msg_queue[port_id] = msg_queue_t(); + msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable()); } // - register a new output message port @@ -131,7 +133,6 @@ void gr_basic_block::_post(pmt_t which_port, pmt_t msg) { insert_tail(which_port, msg); - global_block_registry.notify_blk(alias()); } void @@ -139,12 +140,16 @@ gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg) { gruel::scoped_lock guard(mutex); + if( (msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())){ + std::cout << "target port = " << pmt::pmt_symbol_to_string(which_port) << std::endl; + throw std::runtime_error("attempted to insert_tail on invalid queue!"); + } + msg_queue[which_port].push_back(msg); + msg_queue_ready[which_port]->notify_one(); // wake up thread if BLKD_IN or BLKD_OUT - //input_cond.notify_one(); - //output_cond.notify_one(); - // TODO: reconsider the need for notification of input and output conditions! + global_block_registry.notify_blk(alias()); } pmt_t @@ -162,4 +167,18 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port) return m; } +pmt_t +gr_basic_block::delete_head_blocking(pmt::pmt_t which_port) +{ + gruel::scoped_lock guard(mutex); + + while (empty_p(which_port)){ + msg_queue_ready[which_port]->wait(guard); + } + + pmt_t m(msg_queue[which_port].front()); + msg_queue[which_port].pop_front(); + return m; +} + diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index 2ee8161c1..e0fd5d2af 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -35,6 +35,7 @@ #include <gr_io_signature.h> #include <gruel/thread.h> #include <boost/foreach.hpp> +#include <boost/thread/condition_variable.hpp> /*! * \brief The abstract base class for all signal processing blocks. @@ -72,6 +73,9 @@ private: typedef std::deque<pmt::pmt_t> msg_queue_t; typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; msg_queue_map_t msg_queue; +// boost::condition_variable msg_queue_ready; + std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready; + gruel::mutex mutex; //< protects all vars @@ -163,6 +167,11 @@ public: */ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port); + msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ return msg_queue[which_port].begin(); } diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index dc77128a3..43aebf0bf 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -251,3 +251,12 @@ operator << (std::ostream& os, const gr_block *m) return os; } +int +gr_block::general_work(int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) +{ + throw std::runtime_error("gr_block::general_work() not implemented"); + return 0; +} diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h index 98339080d..57e3fda90 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.h +++ b/gnuradio-core/src/lib/runtime/gr_block.h @@ -124,7 +124,7 @@ class GR_CORE_API gr_block : public gr_basic_block { virtual int general_work (int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) = 0; + gr_vector_void_star &output_items); /*! * \brief Called to enable drivers, etc for i/o devices. diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc index 78e1bc99a..69c304a3d 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc @@ -180,6 +180,11 @@ gr_flowgraph::calc_used_blocks() { gr_basic_block_vector_t tmp; + // make sure free standing message blocks are included + for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){ + tmp.push_back(*it); + } + // Collect all blocks in the edge list for (gr_edge_viter_t p = d_edges.begin(); p != d_edges.end(); p++) { tmp.push_back(p->src().block()); @@ -472,3 +477,7 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve output.push_back(block); } +void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){ + d_msgblocks.push_back(blk); +} + diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h index a2c1580eb..860cb0ff1 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h @@ -110,6 +110,8 @@ public: void disconnect(gr_basic_block_sptr src_block, int src_port, gr_basic_block_sptr dst_block, int dst_port); + void add_msg_block(gr_basic_block_sptr blk); + // Validate connectivity, raise exception if invalid void validate(); @@ -128,6 +130,7 @@ public: // Return vector of vectors of disjointly connected blocks, topologically // sorted. std::vector<gr_basic_block_vector_t> partition(); + gr_basic_block_vector_t d_msgblocks; protected: gr_basic_block_vector_t d_blocks; diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc index 099b2f8e8..ff2a5db8c 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc @@ -152,6 +152,14 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, // register the subscription src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); + + // add block uniquely to list to internal blocks + if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){ + d_blocks.push_back(dst); + } + + // make sure we instantiate a thread for this block + d_fg->add_msg_block(dst); } void @@ -449,6 +457,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const } } } + sfg->d_msgblocks = d_fg->d_msgblocks; // Construct unique list of blocks used either in edges, inputs, // outputs, or by themselves. I still hate STL. diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index ff2afca10..9f17a48a8 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -32,7 +32,7 @@ using namespace pmt; gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items) : d_exec(block, max_noutput_items) { - // std::cerr << "gr_tpb_thread_body: " << block << std::endl; + //std::cerr << "gr_tpb_thread_body: " << block << std::endl; gr_block_detail *d = block->detail().get(); gr_block_executor::state s; @@ -53,7 +53,12 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item } d->d_tpb.clear_changed(); - s = d_exec.run_one_iteration(); + // run one iteration if we are a connected stream block + if(d->noutputs() >0 || d->ninputs()>0){ + s = d_exec.run_one_iteration(); + } else { + s = gr_block_executor::BLKD_IN; + } switch(s){ case gr_block_executor::READY: // Tell neighbors we made progress. diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc index dc8f0f8a9..c84a219bd 100644 --- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc +++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc @@ -70,11 +70,6 @@ void qa_set_msg_handler::t0() send(nop, port, mp(mp("example-msg"), mp(i))); } - // And send a message to null_source to confirm that the default - // message handling action (which should be a nop) doesn't dump - // core. - send(src, port, mp(mp("example-msg"), mp(0))); - // Give the messages a chance to be processed boost::this_thread::sleep(boost::posix_time::milliseconds(100)); |