summaryrefslogtreecommitdiff
path: root/gnuradio-core
diff options
context:
space:
mode:
authorTim O'Shea2012-11-30 22:31:43 -0800
committerJohnathan Corgan2012-11-30 22:41:46 -0800
commit6cc818260128df57c51a41e4e6aa459de5faf4fe (patch)
treeb79aaa3f6c584c13bf203406367c756a98c94bdf /gnuradio-core
parent4478cb86a5dc9fccf66c2cdd5806419b70c3837e (diff)
downloadgnuradio-6cc818260128df57c51a41e4e6aa459de5faf4fe.tar.gz
gnuradio-6cc818260128df57c51a41e4e6aa459de5faf4fe.tar.bz2
gnuradio-6cc818260128df57c51a41e4e6aa459de5faf4fe.zip
core: gr_blocks can now have only message ports with no general_work()
* msg only blocks now get thread context * added blocking msg queue delete call * added gr_message_strobe block * added grc definitions for message_debug, message_strobe, pdu_to_tagged_stream, tagged_stream_to_pdu. * allow message fan-in connections in GRC
Diffstat (limited to 'gnuradio-core')
-rw-r--r--gnuradio-core/src/lib/general/CMakeLists.txt1
-rw-r--r--gnuradio-core/src/lib/general/general.i2
-rw-r--r--gnuradio-core/src/lib/general/gr_message_strobe.cc83
-rw-r--r--gnuradio-core/src/lib/general/gr_message_strobe.h65
-rw-r--r--gnuradio-core/src/lib/general/gr_message_strobe.i30
-rw-r--r--gnuradio-core/src/lib/io/gr_message_debug.cc4
-rw-r--r--gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc7
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc29
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.h3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc9
-rw-r--r--gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc5
16 files changed, 259 insertions, 17 deletions
diff --git a/gnuradio-core/src/lib/general/CMakeLists.txt b/gnuradio-core/src/lib/general/CMakeLists.txt
index 074f583a7..4c99acfc3 100644
--- a/gnuradio-core/src/lib/general/CMakeLists.txt
+++ b/gnuradio-core/src/lib/general/CMakeLists.txt
@@ -299,6 +299,7 @@ set(gr_core_general_triple_threats
gr_burst_tagger
gr_correlate_access_code_tag_bb
gr_tag_debug
+ gr_message_strobe
)
foreach(file_tt ${gr_core_general_triple_threats})
diff --git a/gnuradio-core/src/lib/general/general.i b/gnuradio-core/src/lib/general/general.i
index e5a9e970d..1446088a2 100644
--- a/gnuradio-core/src/lib/general/general.i
+++ b/gnuradio-core/src/lib/general/general.i
@@ -143,6 +143,7 @@
#include <gr_add_ff.h>
#include <gr_vector_map.h>
#include <gr_tag_debug.h>
+#include <gr_message_strobe.h>
%}
%include "gri_control_loop.i"
@@ -267,3 +268,4 @@
%include "gr_vector_map.i"
%include "gr_tag_debug.i"
%include "gr_block_gateway.i"
+%include "gr_message_strobe.i"
diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.cc b/gnuradio-core/src/lib/general/gr_message_strobe.cc
new file mode 100644
index 000000000..371f472ef
--- /dev/null
+++ b/gnuradio-core/src/lib/general/gr_message_strobe.cc
@@ -0,0 +1,83 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005,2010 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_message_strobe.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+
+// public constructor that returns a shared_ptr
+
+gr_message_strobe_sptr
+gr_make_message_strobe (pmt::pmt_t msg, float period_ms)
+{
+ return gnuradio::get_initial_sptr(new gr_message_strobe(msg, period_ms));
+}
+
+gr_message_strobe::gr_message_strobe (pmt::pmt_t msg, float period_ms)
+ : gr_sync_block("message_strobe",
+ gr_make_io_signature(0, 0, 0),
+ gr_make_io_signature(0, 0, 0)),
+ d_finished(false),
+ d_period_ms(period_ms),
+ d_msg(msg)
+{
+ message_port_register_out(pmt::mp("strobe"));
+ d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_message_strobe::run, this)));
+
+ message_port_register_in(pmt::mp("set_msg"));
+ set_msg_handler(pmt::mp("set_msg"), boost::bind(&gr_message_strobe::set_msg, this, _1));
+}
+
+gr_message_strobe::~gr_message_strobe()
+{
+ d_finished = true;
+ d_thread->interrupt();
+ d_thread->join();
+}
+
+void gr_message_strobe::run(){
+ while(!d_finished) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(d_period_ms));
+ if(d_finished){ return; }
+// std::cout << "strobing...\n";
+ message_port_pub( pmt::mp("strobe"), d_msg );
+ }
+}
+
+int
+gr_message_strobe::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+{
+ return 0; // FIXME: replace with default NOP work function in gr_block
+}
diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.h b/gnuradio-core/src/lib/general/gr_message_strobe.h
new file mode 100644
index 000000000..a5151a30b
--- /dev/null
+++ b/gnuradio-core/src/lib/general/gr_message_strobe.h
@@ -0,0 +1,65 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_MESSAGE_STROBE_H
+#define INCLUDED_GR_MESSAGE_STROBE_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+
+class gr_message_strobe;
+typedef boost::shared_ptr<gr_message_strobe> gr_message_strobe_sptr;
+
+GR_CORE_API gr_message_strobe_sptr gr_make_message_strobe (pmt::pmt_t msg, float period_ms);
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_message_strobe : public gr_sync_block
+{
+ private:
+ friend GR_CORE_API gr_message_strobe_sptr
+ gr_make_message_strobe(pmt::pmt_t msg, float period_ms);
+ boost::shared_ptr<boost::thread> d_thread;
+ bool d_finished;
+ float d_period_ms;
+ pmt::pmt_t d_msg;
+
+ void run();
+
+ protected:
+ gr_message_strobe (pmt::pmt_t msg, float period_ms);
+
+ public:
+ ~gr_message_strobe ();
+
+ void set_msg(pmt::pmt_t msg){ d_msg = msg; }
+
+ int work (int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+};
+
+#endif /* INCLUDED_GR_MESSAGE_STROBE_H */
diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.i b/gnuradio-core/src/lib/general/gr_message_strobe.i
new file mode 100644
index 000000000..490aa8e8a
--- /dev/null
+++ b/gnuradio-core/src/lib/general/gr_message_strobe.i
@@ -0,0 +1,30 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+GR_SWIG_BLOCK_MAGIC(gr,message_strobe);
+
+%{
+#include <gr_message_strobe.h>
+%}
+
+%include "gr_message_strobe.h"
+
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc
index 84c11c46e..99f4a1f7b 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.cc
+++ b/gnuradio-core/src/lib/io/gr_message_debug.cc
@@ -44,8 +44,9 @@ gr_make_message_debug ()
}
void gr_message_debug::print(pmt::pmt_t msg){
- std::cout << "******* DEBUG PRINT ********\n";
+ std::cout << "******* MESSAGE DEBUG PRINT ********\n";
pmt::pmt_print(msg);
+ std::cout << "************************************\n";
}
@@ -67,5 +68,6 @@ gr_message_debug::work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
+ printf("gr_message_debug::work\n");
return 0; // FIXME: replace with default NOP work function in gr_block
}
diff --git a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
index 26c1babd6..06a1c9596 100644
--- a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
+++ b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
@@ -77,7 +77,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items,
if(noutput_items > 0){
// grab a message if one exists
- pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) );
+ //pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) );
+ pmt::pmt_t msg( delete_head_blocking( pdu_port_id ) );
if(msg.get() == NULL ){
return nout;
}
@@ -87,8 +88,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items,
throw std::runtime_error("received a malformed pdu message!");
}
- printf("got a msg\n");
- pmt::pmt_print(msg);
+// printf("got a msg\n");
+// pmt::pmt_print(msg);
// grab the components of the pdu message
pmt::pmt_t meta(pmt::pmt_car(msg)); // make sure this is NIL || Dict ?
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 7d2f275e8..69f2e09f9 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2006 Free Software Foundation, Inc.
+ * Copyright 2006,2012 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -28,6 +28,7 @@
#include <gr_block_registry.h>
#include <stdexcept>
#include <sstream>
+#include <iostream>
using namespace pmt;
@@ -79,6 +80,7 @@ gr_basic_block::set_block_alias(std::string name)
// - register a new input message port
void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){
msg_queue[port_id] = msg_queue_t();
+ msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
}
// - register a new output message port
@@ -131,7 +133,6 @@ void
gr_basic_block::_post(pmt_t which_port, pmt_t msg)
{
insert_tail(which_port, msg);
- global_block_registry.notify_blk(alias());
}
void
@@ -139,12 +140,16 @@ gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
{
gruel::scoped_lock guard(mutex);
+ if( (msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())){
+ std::cout << "target port = " << pmt::pmt_symbol_to_string(which_port) << std::endl;
+ throw std::runtime_error("attempted to insert_tail on invalid queue!");
+ }
+
msg_queue[which_port].push_back(msg);
+ msg_queue_ready[which_port]->notify_one();
// wake up thread if BLKD_IN or BLKD_OUT
- //input_cond.notify_one();
- //output_cond.notify_one();
- // TODO: reconsider the need for notification of input and output conditions!
+ global_block_registry.notify_blk(alias());
}
pmt_t
@@ -162,4 +167,18 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
return m;
}
+pmt_t
+gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
+{
+ gruel::scoped_lock guard(mutex);
+
+ while (empty_p(which_port)){
+ msg_queue_ready[which_port]->wait(guard);
+ }
+
+ pmt_t m(msg_queue[which_port].front());
+ msg_queue[which_port].pop_front();
+ return m;
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 2ee8161c1..e0fd5d2af 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -35,6 +35,7 @@
#include <gr_io_signature.h>
#include <gruel/thread.h>
#include <boost/foreach.hpp>
+#include <boost/thread/condition_variable.hpp>
/*!
* \brief The abstract base class for all signal processing blocks.
@@ -72,6 +73,9 @@ private:
typedef std::deque<pmt::pmt_t> msg_queue_t;
typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
msg_queue_map_t msg_queue;
+// boost::condition_variable msg_queue_ready;
+ std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+
gruel::mutex mutex; //< protects all vars
@@ -163,6 +167,11 @@ public:
*/
pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+
msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
return msg_queue[which_port].begin();
}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index dc77128a3..43aebf0bf 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -251,3 +251,12 @@ operator << (std::ostream& os, const gr_block *m)
return os;
}
+int
+gr_block::general_work(int noutput_items,
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+{
+ throw std::runtime_error("gr_block::general_work() not implemented");
+ return 0;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 98339080d..57e3fda90 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -124,7 +124,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
virtual int general_work (int noutput_items,
gr_vector_int &ninput_items,
gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items) = 0;
+ gr_vector_void_star &output_items);
/*!
* \brief Called to enable drivers, etc for i/o devices.
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 78e1bc99a..69c304a3d 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -180,6 +180,11 @@ gr_flowgraph::calc_used_blocks()
{
gr_basic_block_vector_t tmp;
+ // make sure free standing message blocks are included
+ for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
+ tmp.push_back(*it);
+ }
+
// Collect all blocks in the edge list
for (gr_edge_viter_t p = d_edges.begin(); p != d_edges.end(); p++) {
tmp.push_back(p->src().block());
@@ -472,3 +477,7 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
output.push_back(block);
}
+void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
+ d_msgblocks.push_back(blk);
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index a2c1580eb..860cb0ff1 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -110,6 +110,8 @@ public:
void disconnect(gr_basic_block_sptr src_block, int src_port,
gr_basic_block_sptr dst_block, int dst_port);
+ void add_msg_block(gr_basic_block_sptr blk);
+
// Validate connectivity, raise exception if invalid
void validate();
@@ -128,6 +130,7 @@ public:
// Return vector of vectors of disjointly connected blocks, topologically
// sorted.
std::vector<gr_basic_block_vector_t> partition();
+ gr_basic_block_vector_t d_msgblocks;
protected:
gr_basic_block_vector_t d_blocks;
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 099b2f8e8..ff2a5db8c 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -152,6 +152,14 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
// register the subscription
src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+ // add block uniquely to list to internal blocks
+ if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+ d_blocks.push_back(dst);
+ }
+
+ // make sure we instantiate a thread for this block
+ d_fg->add_msg_block(dst);
}
void
@@ -449,6 +457,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
}
}
}
+ sfg->d_msgblocks = d_fg->d_msgblocks;
// Construct unique list of blocks used either in edges, inputs,
// outputs, or by themselves. I still hate STL.
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index ff2afca10..9f17a48a8 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -32,7 +32,7 @@ using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items)
: d_exec(block, max_noutput_items)
{
- // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+ //std::cerr << "gr_tpb_thread_body: " << block << std::endl;
gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
@@ -53,7 +53,12 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
}
d->d_tpb.clear_changed();
- s = d_exec.run_one_iteration();
+ // run one iteration if we are a connected stream block
+ if(d->noutputs() >0 || d->ninputs()>0){
+ s = d_exec.run_one_iteration();
+ } else {
+ s = gr_block_executor::BLKD_IN;
+ }
switch(s){
case gr_block_executor::READY: // Tell neighbors we made progress.
diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
index dc8f0f8a9..c84a219bd 100644
--- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
+++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
@@ -70,11 +70,6 @@ void qa_set_msg_handler::t0()
send(nop, port, mp(mp("example-msg"), mp(i)));
}
- // And send a message to null_source to confirm that the default
- // message handling action (which should be a nop) doesn't dump
- // core.
- send(src, port, mp(mp("example-msg"), mp(0)));
-
// Give the messages a chance to be processed
boost::this_thread::sleep(boost::posix_time::milliseconds(100));