summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc29
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.h3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc5
9 files changed, 71 insertions, 13 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 7d2f275e8..69f2e09f9 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2006 Free Software Foundation, Inc.
+ * Copyright 2006,2012 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -28,6 +28,7 @@
#include <gr_block_registry.h>
#include <stdexcept>
#include <sstream>
+#include <iostream>
using namespace pmt;
@@ -79,6 +80,7 @@ gr_basic_block::set_block_alias(std::string name)
// - register a new input message port
void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){
msg_queue[port_id] = msg_queue_t();
+ msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
}
// - register a new output message port
@@ -131,7 +133,6 @@ void
gr_basic_block::_post(pmt_t which_port, pmt_t msg)
{
insert_tail(which_port, msg);
- global_block_registry.notify_blk(alias());
}
void
@@ -139,12 +140,16 @@ gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
{
gruel::scoped_lock guard(mutex);
+ if( (msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())){
+ std::cout << "target port = " << pmt::pmt_symbol_to_string(which_port) << std::endl;
+ throw std::runtime_error("attempted to insert_tail on invalid queue!");
+ }
+
msg_queue[which_port].push_back(msg);
+ msg_queue_ready[which_port]->notify_one();
// wake up thread if BLKD_IN or BLKD_OUT
- //input_cond.notify_one();
- //output_cond.notify_one();
- // TODO: reconsider the need for notification of input and output conditions!
+ global_block_registry.notify_blk(alias());
}
pmt_t
@@ -162,4 +167,18 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
return m;
}
+pmt_t
+gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
+{
+ gruel::scoped_lock guard(mutex);
+
+ while (empty_p(which_port)){
+ msg_queue_ready[which_port]->wait(guard);
+ }
+
+ pmt_t m(msg_queue[which_port].front());
+ msg_queue[which_port].pop_front();
+ return m;
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 2ee8161c1..e0fd5d2af 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -35,6 +35,7 @@
#include <gr_io_signature.h>
#include <gruel/thread.h>
#include <boost/foreach.hpp>
+#include <boost/thread/condition_variable.hpp>
/*!
* \brief The abstract base class for all signal processing blocks.
@@ -72,6 +73,9 @@ private:
typedef std::deque<pmt::pmt_t> msg_queue_t;
typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
msg_queue_map_t msg_queue;
+// boost::condition_variable msg_queue_ready;
+ std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+
gruel::mutex mutex; //< protects all vars
@@ -163,6 +167,11 @@ public:
*/
pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+
msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
return msg_queue[which_port].begin();
}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index dc77128a3..43aebf0bf 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -251,3 +251,12 @@ operator << (std::ostream& os, const gr_block *m)
return os;
}
+int
+gr_block::general_work(int noutput_items,
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+{
+ throw std::runtime_error("gr_block::general_work() not implemented");
+ return 0;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 98339080d..57e3fda90 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -124,7 +124,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
virtual int general_work (int noutput_items,
gr_vector_int &ninput_items,
gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items) = 0;
+ gr_vector_void_star &output_items);
/*!
* \brief Called to enable drivers, etc for i/o devices.
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 78e1bc99a..69c304a3d 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -180,6 +180,11 @@ gr_flowgraph::calc_used_blocks()
{
gr_basic_block_vector_t tmp;
+ // make sure free standing message blocks are included
+ for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
+ tmp.push_back(*it);
+ }
+
// Collect all blocks in the edge list
for (gr_edge_viter_t p = d_edges.begin(); p != d_edges.end(); p++) {
tmp.push_back(p->src().block());
@@ -472,3 +477,7 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
output.push_back(block);
}
+void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
+ d_msgblocks.push_back(blk);
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index a2c1580eb..860cb0ff1 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -110,6 +110,8 @@ public:
void disconnect(gr_basic_block_sptr src_block, int src_port,
gr_basic_block_sptr dst_block, int dst_port);
+ void add_msg_block(gr_basic_block_sptr blk);
+
// Validate connectivity, raise exception if invalid
void validate();
@@ -128,6 +130,7 @@ public:
// Return vector of vectors of disjointly connected blocks, topologically
// sorted.
std::vector<gr_basic_block_vector_t> partition();
+ gr_basic_block_vector_t d_msgblocks;
protected:
gr_basic_block_vector_t d_blocks;
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 099b2f8e8..ff2a5db8c 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -152,6 +152,14 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
// register the subscription
src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+ // add block uniquely to list to internal blocks
+ if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+ d_blocks.push_back(dst);
+ }
+
+ // make sure we instantiate a thread for this block
+ d_fg->add_msg_block(dst);
}
void
@@ -449,6 +457,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
}
}
}
+ sfg->d_msgblocks = d_fg->d_msgblocks;
// Construct unique list of blocks used either in edges, inputs,
// outputs, or by themselves. I still hate STL.
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index ff2afca10..9f17a48a8 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -32,7 +32,7 @@ using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items)
: d_exec(block, max_noutput_items)
{
- // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+ //std::cerr << "gr_tpb_thread_body: " << block << std::endl;
gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
@@ -53,7 +53,12 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
}
d->d_tpb.clear_changed();
- s = d_exec.run_one_iteration();
+ // run one iteration if we are a connected stream block
+ if(d->noutputs() >0 || d->ninputs()>0){
+ s = d_exec.run_one_iteration();
+ } else {
+ s = gr_block_executor::BLKD_IN;
+ }
switch(s){
case gr_block_executor::READY: // Tell neighbors we made progress.
diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
index dc8f0f8a9..c84a219bd 100644
--- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
+++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
@@ -70,11 +70,6 @@ void qa_set_msg_handler::t0()
send(nop, port, mp(mp("example-msg"), mp(i)));
}
- // And send a message to null_source to confirm that the default
- // message handling action (which should be a nop) doesn't dump
- // core.
- send(src, port, mp(mp("example-msg"), mp(0)));
-
// Give the messages a chance to be processed
boost::this_thread::sleep(boost::posix_time::milliseconds(100));