summaryrefslogtreecommitdiff
path: root/gnuradio-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src')
-rw-r--r--gnuradio-core/src/lib/runtime/Makefile.am2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc7
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.cc59
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.h42
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.cc45
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.h26
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc42
10 files changed, 222 insertions, 13 deletions
diff --git a/gnuradio-core/src/lib/runtime/Makefile.am b/gnuradio-core/src/lib/runtime/Makefile.am
index 14ab464ad..b0e804277 100644
--- a/gnuradio-core/src/lib/runtime/Makefile.am
+++ b/gnuradio-core/src/lib/runtime/Makefile.am
@@ -44,6 +44,7 @@ libruntime_la_SOURCES = \
gr_io_signature.cc \
gr_local_sighandler.cc \
gr_message.cc \
+ gr_msg_accepter.cc \
gr_msg_handler.cc \
gr_msg_queue.cc \
gr_pagesize.cc \
@@ -96,6 +97,7 @@ grinclude_HEADERS = \
gr_io_signature.h \
gr_local_sighandler.h \
gr_message.h \
+ gr_msg_accepter.h \
gr_msg_handler.h \
gr_msg_queue.h \
gr_pagesize.h \
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 71ccc0245..2fa1066cb 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -41,8 +41,7 @@ gr_basic_block_ncurrently_allocated()
gr_basic_block::gr_basic_block(const std::string &name,
gr_io_signature_sptr input_signature,
gr_io_signature_sptr output_signature)
- : gruel::msg_accepter_msgq(gruel::make_msg_queue(0)),
- d_name(name),
+ : d_name(name),
d_input_signature(input_signature),
d_output_signature(output_signature),
d_unique_id(s_next_id++),
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 27ec0fd89..b8797fdc6 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -26,7 +26,7 @@
#include <gr_runtime_types.h>
#include <gr_sptr_magic.h>
#include <boost/enable_shared_from_this.hpp>
-#include <gruel/msg_accepter_msgq.h>
+#include <gr_msg_accepter.h>
#include <string>
/*!
@@ -40,7 +40,7 @@
* signal processing functions.
*/
-class gr_basic_block : gruel::msg_accepter_msgq, public boost::enable_shared_from_this<gr_basic_block>
+class gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
{
protected:
friend class gr_flowgraph;
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index f36a6c215..38d4a13ca 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -118,3 +118,10 @@ gr_block_detail::produce_each (int how_many_items)
d_produce_or |= how_many_items;
}
}
+
+
+void
+gr_block_detail::_post(pmt::pmt_t msg)
+{
+ d_tpb.insert_tail(msg);
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index f32a875ec..c5787a5ad 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -83,7 +83,10 @@ class gr_block_detail {
*/
void produce_each (int how_many_items);
-
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t msg);
gr_tpb_detail d_tpb; // used by thread-per-block scheduler
int d_produce_or;
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
new file mode 100644
index 000000000..89876ae29
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
@@ -0,0 +1,59 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2009 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#if HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gr_msg_accepter.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_hier_block2.h>
+#include <stdexcept>
+
+using namespace pmt;
+
+gr_msg_accepter::gr_msg_accepter()
+{
+}
+
+gr_msg_accepter::~gr_msg_accepter()
+{
+ // NOP, required as virtual destructor
+}
+
+void
+gr_msg_accepter::post(pmt_t msg)
+{
+ // Notify derived class, handled case by case
+ gr_block *p = dynamic_cast<gr_block *>(this);
+ if (p) {
+ p->detail()->_post(msg);
+ return;
+ }
+ gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this);
+ if (p2){
+ // FIXME do the right thing
+ return;
+ }
+
+ throw std::runtime_error("unknown derived class");
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
new file mode 100644
index 000000000..79a631f3a
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
@@ -0,0 +1,42 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2009 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_GR_MSG_ACCEPTER_H
+#define INCLUDED_GR_MSG_ACCEPTER_H
+
+#include <gruel/msg_accepter.h>
+#include <gruel/pmt.h>
+
+/*!
+ * \brief Accepts messages and inserts them into a message queue, then notifies
+ * subclass gr_basic_block there is a message pending.
+ */
+class gr_msg_accepter : public gruel::msg_accepter
+{
+public:
+ gr_msg_accepter();
+ ~gr_msg_accepter();
+
+ void post(pmt::pmt_t msg);
+
+};
+
+#endif /* INCLUDED_GR_MSG_ACCEPTER_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index 02e8deed8..c6311ccaa 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2008 Free Software Foundation, Inc.
+ * Copyright 2008,2009 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -27,6 +27,8 @@
#include <gr_block_detail.h>
#include <gr_buffer.h>
+using namespace pmt;
+
/*
* We assume that no worker threads are ever running when the
* graph structure is being manipulated, thus it's safe for us to poke
@@ -65,3 +67,44 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d)
notify_downstream(d);
notify_upstream(d);
}
+
+void
+gr_tpb_detail::insert_tail(pmt::pmt_t msg)
+{
+ gruel::scoped_lock guard(mutex);
+
+ msg_queue.push_back(msg);
+
+ // wake up thread if BLKD_IN or BLKD_OUT
+ input_cond.notify_one();
+ output_cond.notify_one();
+}
+
+pmt_t
+gr_tpb_detail::delete_head_nowait()
+{
+ gruel::scoped_lock guard(mutex);
+
+ if (empty_p())
+ return pmt_t();
+
+ pmt_t m(msg_queue.front());
+ msg_queue.pop_front();
+
+ return m;
+}
+
+/*
+ * Caller must already be holding the mutex
+ */
+pmt_t
+gr_tpb_detail::delete_head_nowait_already_holding_mutex()
+{
+ if (empty_p())
+ return pmt_t();
+
+ pmt_t m(msg_queue.front());
+ msg_queue.pop_front();
+
+ return m;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
index ab955240b..acfa264c7 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -22,6 +22,8 @@
#define INCLUDED_GR_TPB_DETAIL_H
#include <gruel/thread.h>
+#include <deque>
+#include <gruel/pmt.h>
class gr_block_detail;
@@ -36,9 +38,12 @@ struct gr_tpb_detail {
bool output_changed;
gruel::condition_variable output_cond;
- gr_tpb_detail()
- : input_changed(false), output_changed(false) {}
+private:
+ std::deque<pmt::pmt_t> msg_queue;
+public:
+ gr_tpb_detail()
+ : input_changed(false), output_changed(false) { }
//! Called by us to tell all our upstream blocks that their output may have changed.
void notify_upstream(gr_block_detail *d);
@@ -56,6 +61,23 @@ struct gr_tpb_detail {
input_changed = false;
output_changed = false;
}
+
+ //! is the queue empty?
+ bool empty_p() const { return msg_queue.empty(); }
+
+ //| Acquires and release the mutex
+ void insert_tail(pmt::pmt_t msg);
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait();
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ * Caller must already be holding the mutex
+ */
+ pmt::pmt_t delete_head_nowait_already_holding_mutex();
private:
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 458b16d64..03eef17d9 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -24,17 +24,26 @@
#include <gr_tpb_thread_body.h>
#include <iostream>
#include <boost/thread.hpp>
+#include <gruel/pmt.h>
+
+using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
: d_exec(block)
{
// std::cerr << "gr_tpb_thread_body: " << block << std::endl;
- gr_block_detail *d = block->detail().get();
+ gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
+ pmt_t msg;
+
while (1){
boost::this_thread::interruption_point();
+
+ // handle any queued up messages
+ while ((msg = d->d_tpb.delete_head_nowait()))
+ block->handle_msg(msg);
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();
@@ -55,16 +64,39 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
case gr_block_executor::BLKD_IN: // Wait for input.
{
gruel::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.input_changed)
- d->d_tpb.input_cond.wait(guard);
+ while (!d->d_tpb.input_changed){
+
+ // wait for input or message
+ while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+ d->d_tpb.input_cond.wait(guard);
+
+ // handle all pending messages
+ while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+ guard.unlock(); // release lock while processing msg
+ block->handle_msg(msg);
+ guard.lock();
+ }
+ }
}
break;
+
case gr_block_executor::BLKD_OUT: // Wait for output buffer space.
{
gruel::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.output_changed)
- d->d_tpb.output_cond.wait(guard);
+ while (!d->d_tpb.output_changed){
+
+ // wait for output room or message
+ while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+ d->d_tpb.output_cond.wait(guard);
+
+ // handle all pending messages
+ while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+ guard.unlock(); // release lock while processing msg
+ block->handle_msg(msg);
+ guard.lock();
+ }
+ }
}
break;