summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/Makefile.am20
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc8
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h11
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.cc329
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.h69
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.cc31
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.h71
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.i8
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc48
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler.cc33
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler.h64
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc87
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_sts.h62
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc110
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_thread.h59
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc95
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h60
-rw-r--r--gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc12
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block.cc3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block_impl.cc135
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block_impl.h32
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc128
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h55
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.cc67
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.h81
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc76
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h45
-rw-r--r--gnuradio-core/src/lib/runtime/qa_gr_buffer.cc14
32 files changed, 1336 insertions, 498 deletions
diff --git a/gnuradio-core/src/lib/runtime/Makefile.am b/gnuradio-core/src/lib/runtime/Makefile.am
index 550031b94..b21b32412 100644
--- a/gnuradio-core/src/lib/runtime/Makefile.am
+++ b/gnuradio-core/src/lib/runtime/Makefile.am
@@ -21,7 +21,7 @@
include $(top_srcdir)/Makefile.common
-AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(WITH_INCLUDES)
+AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(GRUEL_INCLUDES) $(WITH_INCLUDES)
noinst_LTLIBRARIES = libruntime.la libruntime-qa.la
@@ -35,6 +35,7 @@ libruntime_la_SOURCES = \
gr_flat_flowgraph.cc \
gr_block.cc \
gr_block_detail.cc \
+ gr_block_executor.cc \
gr_hier_block2.cc \
gr_hier_block2_detail.cc \
gr_buffer.cc \
@@ -48,16 +49,19 @@ libruntime_la_SOURCES = \
gr_pagesize.cc \
gr_preferences.cc \
gr_realtime.cc \
- gr_scheduler_thread.cc \
+ gr_scheduler.cc \
+ gr_scheduler_sts.cc \
+ gr_scheduler_tpb.cc \
gr_single_threaded_scheduler.cc \
gr_sptr_magic.cc \
gr_sync_block.cc \
gr_sync_decimator.cc \
gr_sync_interpolator.cc \
+ gr_tmp_path.cc \
gr_top_block.cc \
gr_top_block_impl.cc \
- gr_top_block_impl_sts.cc \
- gr_tmp_path.cc \
+ gr_tpb_detail.cc \
+ gr_tpb_thread_body.cc \
gr_vmcircbuf.cc \
gr_vmcircbuf_mmap_shm_open.cc \
gr_vmcircbuf_mmap_tmpfile.cc \
@@ -82,6 +86,7 @@ grinclude_HEADERS = \
gr_flat_flowgraph.h \
gr_block.h \
gr_block_detail.h \
+ gr_block_executor.h \
gr_hier_block2.h \
gr_hier_block2_detail.h \
gr_buffer.h \
@@ -97,7 +102,9 @@ grinclude_HEADERS = \
gr_preferences.h \
gr_realtime.h \
gr_runtime_types.h \
- gr_scheduler_thread.h \
+ gr_scheduler.h \
+ gr_scheduler_sts.h \
+ gr_scheduler_tpb.h \
gr_select_handler.h \
gr_single_threaded_scheduler.h \
gr_sptr_magic.h \
@@ -106,7 +113,8 @@ grinclude_HEADERS = \
gr_sync_interpolator.h \
gr_top_block.h \
gr_top_block_impl.h \
- gr_top_block_impl_sts.h \
+ gr_tpb_detail.h \
+ gr_tpb_thread_body.h \
gr_timer.h \
gr_tmp_path.h \
gr_types.h \
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 0a8fb92c2..7c2e9901b 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -110,3 +110,11 @@ gr_block::fixed_rate_noutput_to_ninput(int noutput)
{
throw std::runtime_error("Unimplemented");
}
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m)
+{
+ os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
+ return os;
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 79237ee83..437b610b4 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -214,9 +214,13 @@ class gr_block : public gr_basic_block {
typedef std::vector<gr_block_sptr> gr_block_vector_t;
typedef std::vector<gr_block_sptr>::iterator gr_block_viter_t;
-inline gr_block_sptr make_gr_block_sptr(gr_basic_block_sptr p)
+inline gr_block_sptr cast_to_block_sptr(gr_basic_block_sptr p)
{
return boost::dynamic_pointer_cast<gr_block, gr_basic_block>(p);
}
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m);
+
#endif /* INCLUDED_GR_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index a3b7731c0..2856c402c 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -24,6 +24,7 @@
#define INCLUDED_GR_BLOCK_DETAIL_H
#include <gr_runtime_types.h>
+#include <gr_tpb_detail.h>
#include <stdexcept>
/*!
@@ -34,7 +35,6 @@
* of almost all users of GNU Radio. This decoupling also means that
* we can make changes to the guts without having to recompile everything.
*/
-
class gr_block_detail {
public:
~gr_block_detail ();
@@ -73,8 +73,14 @@ class gr_block_detail {
*/
void consume_each (int how_many_items);
+ /*!
+ * \brief Tell the scheduler \p how_many_items were produced on each output stream.
+ */
void produce_each (int how_many_items);
+
+ gr_tpb_detail d_tpb; // used by thread-per-block scheduler
+
// ----------------------------------------------------------------------------
private:
@@ -84,8 +90,11 @@ class gr_block_detail {
std::vector<gr_buffer_sptr> d_output;
bool d_done;
+
gr_block_detail (unsigned int ninputs, unsigned int noutputs);
+ friend class gr_tpb_detail;
+
friend gr_block_detail_sptr
gr_make_block_detail (unsigned int ninputs, unsigned int noutputs);
};
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
new file mode 100644
index 000000000..fd3a916d4
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -0,0 +1,329 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler = 0;
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+ return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+ return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+ int min_space = std::numeric_limits<int>::max();
+
+ for (int i = 0; i < d->noutputs (); i++){
+ gr_buffer::scoped_lock guard(*d->output(i)->mutex());
+#if 0
+ int n = round_down(d->output(i)->space_available(), output_multiple);
+#else
+ int n = round_down(std::min(d->output(i)->space_available(),
+ d->output(i)->bufsize()/2),
+ output_multiple);
+#endif
+ if (n == 0){ // We're blocked on output.
+ if (d->output(i)->done()){ // Downstream is done, therefore we're done.
+ return -1;
+ }
+ return 0;
+ }
+ min_space = std::min (min_space, n);
+ }
+ return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+ : d_block(block), d_log(0)
+{
+ if (ENABLE_LOGGING){
+ char name[100];
+ snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
+ d_log = new std::ofstream(name);
+ std::unitbuf(*d_log); // make it unbuffered...
+ *d_log << "gr_block_executor: "
+ << d_block << std::endl;
+ }
+
+ d_block->start(); // enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+ if (ENABLE_LOGGING)
+ delete d_log;
+
+ d_block->stop(); // stop any drivers, etc.
+}
+
+gr_block_executor::state
+gr_block_executor::run_one_iteration()
+{
+ int noutput_items;
+ int max_items_avail;
+
+ gr_block *m = d_block.get();
+ gr_block_detail *d = m->detail().get();
+
+ LOG(*d_log << std::endl << m);
+
+ if (d->done()){
+ assert(0);
+ return DONE;
+ }
+
+ if (d->source_p ()){
+ d_ninput_items_required.resize (0);
+ d_ninput_items.resize (0);
+ d_input_items.resize (0);
+ d_input_done.resize(0);
+ d_output_items.resize (d->noutputs ());
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0){ // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ return BLKD_OUT;
+ }
+
+ goto setup_call_to_work; // jump to common code
+ }
+
+ else if (d->sink_p ()){
+ d_ninput_items_required.resize (d->ninputs ());
+ d_ninput_items.resize (d->ninputs ());
+ d_input_items.resize (d->ninputs ());
+ d_input_done.resize(d->ninputs());
+ d_output_items.resize (0);
+ LOG(*d_log << " sink\n");
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs (); i++){
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+ d_ninput_items[i] = d->input(i)->items_available();
+ d_input_done[i] = d->input(i)->done();
+ }
+
+ LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
+ LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+
+ if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
+ goto were_done;
+
+ max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+ }
+
+ // take a swag at how much output we can sink
+ noutput_items = (int) (max_items_avail * m->relative_rate ());
+ noutput_items = round_down (noutput_items, m->output_multiple ());
+ LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
+ LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
+
+ if (noutput_items == 0){ // we're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ return BLKD_IN;
+ }
+
+ goto try_again; // Jump to code shared with regular case.
+ }
+
+ else {
+ // do the regular thing
+ d_ninput_items_required.resize (d->ninputs ());
+ d_ninput_items.resize (d->ninputs ());
+ d_input_items.resize (d->ninputs ());
+ d_input_done.resize(d->ninputs());
+ d_output_items.resize (d->noutputs ());
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs (); i++){
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+ d_ninput_items[i] = d->input(i)->items_available ();
+ d_input_done[i] = d->input(i)->done();
+ }
+ max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+ }
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ if (ENABLE_LOGGING){
+ *d_log << " regular ";
+ if (m->relative_rate() >= 1.0)
+ *d_log << "1:" << m->relative_rate() << std::endl;
+ else
+ *d_log << 1.0/m->relative_rate() << ":1\n";
+ *d_log << " max_items_avail = " << max_items_avail << std::endl;
+ *d_log << " noutput_items = " << noutput_items << std::endl;
+ }
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0){ // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ return BLKD_OUT;
+ }
+
+ try_again:
+ if (m->fixed_rate()){
+ // try to work it forward starting with max_items_avail.
+ // We want to try to consume all the input we've got.
+ int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+ reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+ if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+ noutput_items = reqd_noutput_items;
+ }
+
+ // ask the block how much input they need to produce noutput_items
+ m->forecast (noutput_items, d_ninput_items_required);
+
+ // See if we've got sufficient input available
+
+ int i;
+ for (i = 0; i < d->ninputs (); i++)
+ if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
+ break;
+
+ if (i < d->ninputs ()){ // not enough input on input[i]
+ // if we can, try reducing the size of our output request
+ if (noutput_items > m->output_multiple ()){
+ noutput_items /= 2;
+ noutput_items = round_up (noutput_items, m->output_multiple ());
+ goto try_again;
+ }
+
+ // We're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ if (d_input_done[i]) // If the upstream block is done, we're done
+ goto were_done;
+
+ // Is it possible to ever fulfill this request?
+ if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
+ // Nope, never going to happen...
+ std::cerr << "\nsched: <gr_block " << m->name()
+ << " (" << m->unique_id() << ")>"
+ << " is requesting more input data\n"
+ << " than we can provide.\n"
+ << " ninput_items_required = "
+ << d_ninput_items_required[i] << "\n"
+ << " max_possible_items_available = "
+ << d->input(i)->max_possible_items_available() << "\n"
+ << " If this is a filter, consider reducing the number of taps.\n";
+ goto were_done;
+ }
+
+ return BLKD_IN;
+ }
+
+ // We've got enough data on each input to produce noutput_items.
+ // Finish setting up the call to work.
+
+ for (int i = 0; i < d->ninputs (); i++)
+ d_input_items[i] = d->input(i)->read_pointer();
+
+ setup_call_to_work:
+
+ for (int i = 0; i < d->noutputs (); i++)
+ d_output_items[i] = d->output(i)->write_pointer();
+
+ // Do the actual work of the block
+ int n = m->general_work (noutput_items, d_ninput_items,
+ d_input_items, d_output_items);
+ LOG(*d_log << " general_work: noutput_items = " << noutput_items
+ << " result = " << n << std::endl);
+
+ if (n == -1) // block is done
+ goto were_done;
+
+ d->produce_each (n); // advance write pointers
+ if (n > 0)
+ return READY;
+
+ // We didn't produce any output even though we called general_work.
+ // We have (most likely) consumed some input.
+
+ // If this is a source, it's broken.
+ if (d->source_p()){
+ std::cerr << "gr_block_executor: source " << m
+ << " returned 0 from work. We're marking it DONE.\n";
+ // FIXME maybe we ought to raise an exception...
+ goto were_done;
+ }
+
+ // Have the caller try again...
+ return READY_NO_OUTPUT;
+ }
+ assert (0);
+
+ were_done:
+ LOG(*d_log << " were_done\n");
+ d->set_done (true);
+ return DONE;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.h b/gnuradio-core/src/lib/runtime/gr_block_executor.h
new file mode 100644
index 000000000..41b5ede7c
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.h
@@ -0,0 +1,69 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_BLOCK_EXECUTOR_H
+#define INCLUDED_GR_BLOCK_EXECUTOR_H
+
+#include <gr_runtime_types.h>
+#include <fstream>
+
+//class gr_block_executor;
+//typedef boost::shared_ptr<gr_block_executor> gr_block_executor_sptr;
+
+
+/*!
+ * \brief Manage the execution of a single block.
+ * \ingroup internal
+ */
+
+class gr_block_executor {
+protected:
+ gr_block_sptr d_block; // The block we're trying to run
+ std::ofstream *d_log;
+
+ // These are allocated here so we don't have to on each iteration
+
+ gr_vector_int d_ninput_items_required;
+ gr_vector_int d_ninput_items;
+ gr_vector_const_void_star d_input_items;
+ std::vector<bool> d_input_done;
+ gr_vector_void_star d_output_items;
+
+ public:
+ gr_block_executor(gr_block_sptr block);
+ ~gr_block_executor ();
+
+ enum state {
+ READY, // We made progress; everything's cool.
+ READY_NO_OUTPUT, // We consumed some input, but produced no output.
+ BLKD_IN, // no progress; we're blocked waiting for input data.
+ BLKD_OUT, // no progress; we're blocked waiting for output buffer space.
+ DONE, // we're done; don't call me again.
+ };
+
+ /*
+ * \brief Run one iteration.
+ */
+ state run_one_iteration();
+};
+
+#endif /* INCLUDED_GR_BLOCK_EXECUTOR_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index 77f0c7c43..31a471ea7 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -77,10 +77,10 @@ minimum_buffer_items (long type_size, long page_size)
}
-gr_buffer::gr_buffer (int nitems, size_t sizeof_item)
+gr_buffer::gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
: d_base (0), d_bufsize (0), d_vmcircbuf (0),
- d_sizeof_item (sizeof_item), d_write_index (0),
- d_done (false)
+ d_sizeof_item (sizeof_item), d_link(link),
+ d_write_index (0), d_done (false)
{
if (!allocate_buffer (nitems, sizeof_item))
throw std::bad_alloc ();
@@ -89,9 +89,9 @@ gr_buffer::gr_buffer (int nitems, size_t sizeof_item)
}
gr_buffer_sptr
-gr_make_buffer (int nitems, size_t sizeof_item)
+gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
{
- return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item));
+ return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item, link));
}
gr_buffer::~gr_buffer ()
@@ -146,7 +146,7 @@ gr_buffer::allocate_buffer (int nitems, size_t sizeof_item)
int
-gr_buffer::space_available () const
+gr_buffer::space_available ()
{
if (d_readers.empty ())
return d_bufsize - 1; // See comment below
@@ -175,18 +175,27 @@ gr_buffer::write_pointer ()
void
gr_buffer::update_write_pointer (int nitems)
{
+ scoped_lock guard(*mutex());
d_write_index = index_add (d_write_index, nitems);
}
+void
+gr_buffer::set_done (bool done)
+{
+ scoped_lock guard(*mutex());
+ d_done = done;
+}
+
gr_buffer_reader_sptr
-gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload)
+gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link)
{
if (nzero_preload < 0)
throw std::invalid_argument("gr_buffer_add_reader: nzero_preload must be >= 0");
gr_buffer_reader_sptr r (new gr_buffer_reader (buf,
buf->index_sub(buf->d_write_index,
- nzero_preload)));
+ nzero_preload),
+ link));
buf->d_readers.push_back (r.get ());
return r;
@@ -214,8 +223,9 @@ gr_buffer_ncurrently_allocated ()
// ----------------------------------------------------------------------------
-gr_buffer_reader::gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index)
- : d_buffer (buffer), d_read_index (read_index)
+gr_buffer_reader::gr_buffer_reader(gr_buffer_sptr buffer, unsigned int read_index,
+ gr_block_sptr link)
+ : d_buffer(buffer), d_read_index(read_index), d_link(link)
{
s_buffer_reader_count++;
}
@@ -241,6 +251,7 @@ gr_buffer_reader::read_pointer ()
void
gr_buffer_reader::update_read_pointer (int nitems)
{
+ scoped_lock guard(*mutex());
d_read_index = d_buffer->index_add (d_read_index, nitems);
}
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h b/gnuradio-core/src/lib/runtime/gr_buffer.h
index cf578c89d..75063cc6a 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -24,6 +24,8 @@
#define INCLUDED_GR_BUFFER_H
#include <gr_runtime_types.h>
+#include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
class gr_vmcircbuf;
@@ -33,8 +35,12 @@ class gr_vmcircbuf;
* The total size of the buffer will be rounded up to a system
* dependent boundary. This is typically the system page size, but
* under MS windows is 64KB.
+ *
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param link is the block that writes to this buffer.
*/
-gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
+gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link=gr_block_sptr());
/*!
@@ -43,12 +49,20 @@ gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
*/
class gr_buffer {
public:
+
+ typedef boost::unique_lock<boost::mutex> scoped_lock;
+
virtual ~gr_buffer ();
/*!
* \brief return number of items worth of space available for writing
*/
- int space_available () const;
+ int space_available ();
+
+ /*!
+ * \brief return size of this buffer in items
+ */
+ int bufsize() const { return d_bufsize; }
/*!
* \brief return pointer to write buffer.
@@ -63,17 +77,26 @@ class gr_buffer {
*/
void update_write_pointer (int nitems);
-
- void set_done (bool done) { d_done = done; }
+ void set_done (bool done);
bool done () const { return d_done; }
+ /*!
+ * \brief Return the block that writes to this buffer.
+ */
+ gr_block_sptr link() { return gr_block_sptr(d_link); }
+
+ size_t nreaders() const { return d_readers.size(); }
+ gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
+
+ boost::mutex *mutex() { return &d_mutex; }
+
// -------------------------------------------------------------------------
private:
friend class gr_buffer_reader;
- friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
- friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+ friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
+ friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
protected:
char *d_base; // base address of buffer
@@ -81,8 +104,14 @@ class gr_buffer {
private:
gr_vmcircbuf *d_vmcircbuf;
size_t d_sizeof_item; // in bytes
- unsigned int d_write_index; // in items [0,d_bufsize)
std::vector<gr_buffer_reader *> d_readers;
+ boost::weak_ptr<gr_block> d_link; // block that writes to this buffer
+
+ //
+ // The mutex protects d_write_index, d_done and the d_read_index's in the buffer readers.
+ //
+ boost::mutex d_mutex;
+ unsigned int d_write_index; // in items [0,d_bufsize)
bool d_done;
unsigned
@@ -116,11 +145,15 @@ class gr_buffer {
*
* Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
*
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param link is the block that writes to this buffer.
+ *
* The total size of the buffer will be rounded up to a system
* dependent boundary. This is typically the system page size, but
* under MS windows is 64KB.
*/
- gr_buffer (int nitems, size_t sizeof_item);
+ gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
/*!
* \brief disassociate \p reader from this buffer
@@ -132,8 +165,10 @@ class gr_buffer {
/*!
* \brief create a new gr_buffer_reader and attach it to buffer \p buf
* \param nzero_preload -- number of zero items to "preload" into buffer.
+ * \param link is the block that reads from the buffer using this gr_buffer_reader.
*/
-gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+gr_buffer_reader_sptr
+gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link=gr_block_sptr());
//! returns # of gr_buffers currently allocated
long gr_buffer_ncurrently_allocated ();
@@ -147,8 +182,10 @@ long gr_buffer_ncurrently_allocated ();
*/
class gr_buffer_reader {
-
public:
+
+ typedef gr_buffer::scoped_lock scoped_lock;
+
~gr_buffer_reader ();
/*!
@@ -183,19 +220,29 @@ class gr_buffer_reader {
void set_done (bool done) { d_buffer->set_done (done); }
bool done () const { return d_buffer->done (); }
+ boost::mutex *mutex() { return d_buffer->mutex(); }
+
+
+ /*!
+ * \brief Return the block that reads via this reader.
+ */
+ gr_block_sptr link() { return gr_block_sptr(d_link); }
+
// -------------------------------------------------------------------------
private:
friend class gr_buffer;
- friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+ friend gr_buffer_reader_sptr
+ gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
gr_buffer_sptr d_buffer;
unsigned int d_read_index; // in items [0,d->buffer.d_bufsize)
+ boost::weak_ptr<gr_block> d_link; // block that reads via this buffer reader
//! constructor is private. Use gr_buffer::add_reader to create instances
- gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index);
+ gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);
};
//! returns # of gr_buffer_readers currently allocated
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.i b/gnuradio-core/src/lib/runtime/gr_buffer.i
index 38e1d945d..4c1c5afae 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.i
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.i
@@ -26,14 +26,14 @@ typedef boost::shared_ptr<gr_buffer> gr_buffer_sptr;
%rename(buffer) gr_make_buffer;
%ignore gr_buffer;
-gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
+gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
class gr_buffer {
public:
~gr_buffer ();
private:
- gr_buffer (int nitems, size_t sizeof_item);
+ gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
};
@@ -43,7 +43,7 @@ typedef boost::shared_ptr<gr_buffer_reader> gr_buffer_reader_sptr;
%ignore gr_buffer_reader;
%rename(buffer_add_reader) gr_buffer_add_reader;
-gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
class gr_buffer_reader {
public:
@@ -51,7 +51,7 @@ class gr_buffer_reader {
private:
friend class gr_buffer;
- gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index);
+ gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);
};
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index aa1aa8353..031eb6dfd 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -33,6 +33,11 @@
#define GR_FLAT_FLOWGRAPH_DEBUG 0
+// 32Kbyte buffer size between blocks
+#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
+
+static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
+
gr_flat_flowgraph_sptr
gr_make_flat_flowgraph()
{
@@ -54,7 +59,7 @@ gr_flat_flowgraph::setup_connections()
// Assign block details to blocks
for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
- make_gr_block_sptr(*p)->set_detail(allocate_block_detail(*p));
+ cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p));
// Connect inputs to outputs for each block
for(gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
@@ -84,11 +89,15 @@ gr_flat_flowgraph::allocate_block_detail(gr_basic_block_sptr block)
gr_buffer_sptr
gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
{
- gr_block_sptr grblock = make_gr_block_sptr(block);
+ gr_block_sptr grblock = cast_to_block_sptr(block);
if (!grblock)
throw std::runtime_error("allocate_buffer found non-gr_block");
int item_size = block->output_signature()->sizeof_stream_item(port);
- int nitems = s_fixed_buffer_size/item_size;
+
+ // *2 because we're now only filling them 1/2 way in order to
+ // increase the available parallelism when using the TPB scheduler.
+ // (We're double buffering, where we used to single buffer)
+ int nitems = s_fixed_buffer_size * 2 / item_size;
// Make sure there are at least twice the output_multiple no. of items
if (nitems < 2*grblock->output_multiple()) // Note: this means output_multiple()
@@ -99,7 +108,7 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
gr_basic_block_vector_t blocks = calc_downstream_blocks(block, port);
for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
- gr_block_sptr dgrblock = make_gr_block_sptr(*p);
+ gr_block_sptr dgrblock = cast_to_block_sptr(*p);
if (!dgrblock)
throw std::runtime_error("allocate_buffer found non-gr_block");
@@ -109,13 +118,13 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
nitems = std::max(nitems, static_cast<int>(2*(decimation*multiple+history)));
}
- return gr_make_buffer(nitems, item_size);
+ return gr_make_buffer(nitems, item_size, grblock);
}
void
gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
{
- gr_block_sptr grblock = make_gr_block_sptr(block);
+ gr_block_sptr grblock = cast_to_block_sptr(block);
if (!grblock)
throw std::runtime_error("connect_block_inputs found non-gr_block");
@@ -130,7 +139,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
int dst_port = e->dst().port();
int src_port = e->src().port();
gr_basic_block_sptr src_block = e->src().block();
- gr_block_sptr src_grblock = make_gr_block_sptr(src_block);
+ gr_block_sptr src_grblock = cast_to_block_sptr(src_block);
if (!src_grblock)
throw std::runtime_error("connect_block_inputs found non-gr_block");
gr_buffer_sptr src_buffer = src_grblock->detail()->output(src_port);
@@ -138,7 +147,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
if (GR_FLAT_FLOWGRAPH_DEBUG)
std::cout << "Setting input " << dst_port << " from edge " << (*e) << std::endl;
- detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1));
+ detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1, grblock));
}
}
@@ -149,7 +158,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
// by flattening will need one; existing blocks still in the new flowgraph will
// already have one.
for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
- gr_block_sptr block = make_gr_block_sptr(*p);
+ gr_block_sptr block = cast_to_block_sptr(*p);
if (!block->detail()) {
if (GR_FLAT_FLOWGRAPH_DEBUG)
@@ -177,7 +186,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
if (GR_FLAT_FLOWGRAPH_DEBUG)
std::cout << "not in new edge list" << std::endl;
// zero the buffer reader on RHS of old edge
- gr_block_sptr block(make_gr_block_sptr(old_edge->dst().block()));
+ gr_block_sptr block(cast_to_block_sptr(old_edge->dst().block()));
int port = old_edge->dst().port();
block->detail()->set_input(port, gr_buffer_reader_sptr());
}
@@ -189,7 +198,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
// Now connect inputs to outputs, reusing old buffer readers if they exist
for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
- gr_block_sptr block = make_gr_block_sptr(*p);
+ gr_block_sptr block = cast_to_block_sptr(*p);
if (GR_FLAT_FLOWGRAPH_DEBUG)
std::cout << "merge: merging " << (*p) << "...";
@@ -208,7 +217,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
gr_edge edge = calc_upstream_edge(*p, i);
// Fish out old buffer reader and see if it matches correct buffer from edge list
- gr_block_sptr src_block = make_gr_block_sptr(edge.src().block());
+ gr_block_sptr src_block = cast_to_block_sptr(edge.src().block());
gr_block_detail_sptr src_detail = src_block->detail();
gr_buffer_sptr src_buffer = src_detail->output(edge.src().port());
gr_buffer_reader_sptr old_reader;
@@ -225,7 +234,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
std::cout << "needs a new reader" << std::endl;
// Create new buffer reader and assign
- detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1));
+ detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1, block));
}
}
}
@@ -248,7 +257,7 @@ void gr_flat_flowgraph::dump()
for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
std::cout << " block: " << (*p) << std::endl;
- gr_block_detail_sptr detail = make_gr_block_sptr(*p)->detail();
+ gr_block_detail_sptr detail = cast_to_block_sptr(*p)->detail();
std::cout << " detail @" << detail << ":" << std::endl;
int ni = detail->ninputs();
@@ -269,3 +278,14 @@ void gr_flat_flowgraph::dump()
}
}
+
+gr_block_vector_t
+gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks)
+{
+ gr_block_vector_t result;
+ for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
+ result.push_back(cast_to_block_sptr(*p));
+ }
+
+ return result;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 184ee4514..673c4df16 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -26,9 +26,6 @@
#include <gr_flowgraph.h>
#include <gr_block.h>
-// 32Kbyte buffer size between blocks
-#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
-
// Create a shared pointer to a heap allocated gr_flat_flowgraph
// (types defined in gr_runtime_types.h)
gr_flat_flowgraph_sptr gr_make_flat_flowgraph();
@@ -55,10 +52,14 @@ public:
void dump();
+ /*!
+ * Make a vector of gr_block from a vector of gr_basic_block
+ */
+ static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks);
+
private:
gr_flat_flowgraph();
- static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
gr_block_detail_sptr allocate_block_detail(gr_basic_block_sptr block);
gr_buffer_sptr allocate_buffer(gr_basic_block_sptr block, int port);
void connect_block_inputs(gr_basic_block_sptr block);
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index c97a50782..fc407e72b 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -122,6 +122,9 @@ public:
// Return vector of connected blocks
gr_basic_block_vector_t calc_used_blocks();
+ // Return toplogically sorted vector of blocks. All the sources come first.
+ gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks);
+
// Return vector of vectors of disjointly connected blocks, topologically
// sorted.
std::vector<gr_basic_block_vector_t> partition();
@@ -149,7 +152,6 @@ private:
gr_basic_block_vector_t calc_reachable_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
void reachable_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
gr_basic_block_vector_t calc_adjacent_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
- gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks);
gr_basic_block_vector_t sort_sources_first(gr_basic_block_vector_t &blocks);
bool source_p(gr_basic_block_sptr block);
void topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &output);
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 32cac2ea8..a026851d2 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -303,7 +303,7 @@ gr_hier_block2_detail::resolve_endpoint(const gr_endpoint &endp, bool is_input)
std::stringstream msg;
// Check if endpoint is a leaf node
- if (make_gr_block_sptr(endp.block()))
+ if (cast_to_block_sptr(endp.block()))
return endp;
// Check if endpoint is a hierarchical block
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
new file mode 100644
index 000000000..e4d8b3dd9
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
@@ -0,0 +1,33 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler.h>
+
+gr_scheduler::gr_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+}
+
+gr_scheduler::~gr_scheduler()
+{
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.h b/gnuradio-core/src/lib/runtime/gr_scheduler.h
new file mode 100644
index 000000000..13bc1ff14
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.h
@@ -0,0 +1,64 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_GR_SCHEDULER_H
+#define INCLUDED_GR_SCHEDULER_H
+
+#include <boost/utility.hpp>
+#include <gr_block.h>
+#include <gr_flat_flowgraph.h>
+
+
+class gr_scheduler;
+typedef boost::shared_ptr<gr_scheduler> gr_scheduler_sptr;
+
+
+/*!
+ * \brief Abstract scheduler that takes a flattened flow graph and runs it.
+ *
+ * Preconditions: details, buffers and buffer readers have been assigned.
+ */
+class gr_scheduler : boost::noncopyable
+{
+
+public:
+ /*!
+ * \brief Construct a scheduler and begin evaluating the graph.
+ *
+ * The scheduler will continue running until all blocks until they
+ * report that they are done or the stop method is called.
+ */
+ gr_scheduler(gr_flat_flowgraph_sptr ffg);
+
+ virtual ~gr_scheduler();
+
+ /*!
+ * \brief Tell the scheduler to stop executing.
+ */
+ virtual void stop() = 0;
+
+ /*!
+ * \brief Block until the graph is done.
+ */
+ virtual void wait() = 0;
+};
+
+#endif /* INCLUDED_GR_SCHEDULER_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
new file mode 100644
index 000000000..fefc0dc70
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
@@ -0,0 +1,87 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler_sts.h>
+#include <gr_single_threaded_scheduler.h>
+#include <gruel/thread_body_wrapper.h>
+
+class sts_container
+{
+ gr_block_vector_t d_blocks;
+
+public:
+
+ sts_container(gr_block_vector_t blocks)
+ : d_blocks(blocks) {}
+
+ void operator()()
+ {
+ gr_make_single_threaded_scheduler(d_blocks)->run();
+ }
+};
+
+
+gr_scheduler_sptr
+gr_scheduler_sts::make(gr_flat_flowgraph_sptr ffg)
+{
+ return gr_scheduler_sptr(new gr_scheduler_sts(ffg));
+}
+
+gr_scheduler_sts::gr_scheduler_sts(gr_flat_flowgraph_sptr ffg)
+ : gr_scheduler(ffg)
+{
+ // Split the flattened flow graph into discrete partitions, each
+ // of which is topologically sorted.
+
+ std::vector<gr_basic_block_vector_t> graphs = ffg->partition();
+
+ // For each partition, create a thread to evaluate it using
+ // an instance of the gr_single_threaded_scheduler
+
+ for (std::vector<gr_basic_block_vector_t>::iterator p = graphs.begin();
+ p != graphs.end(); p++) {
+
+ gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(*p);
+ d_threads.create_thread(
+ gruel::thread_body_wrapper<sts_container>(sts_container(blocks),
+ "single-threaded-scheduler"));
+ }
+}
+
+gr_scheduler_sts::~gr_scheduler_sts()
+{
+ stop();
+}
+
+void
+gr_scheduler_sts::stop()
+{
+ d_threads.interrupt_all();
+}
+
+void
+gr_scheduler_sts::wait()
+{
+ d_threads.join_all();
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
new file mode 100644
index 000000000..4cf835156
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
@@ -0,0 +1,62 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_SCHEDULER_STS_H
+#define INCLUDED_GR_SCHEDULER_STS_H
+
+#include <gr_scheduler.h>
+#include <gruel/thread_group.h>
+
+/*!
+ * \brief Concrete scheduler that uses the single_threaded_scheduler
+ */
+class gr_scheduler_sts : public gr_scheduler
+{
+ gruel::thread_group d_threads;
+
+protected:
+ /*!
+ * \brief Construct a scheduler and begin evaluating the graph.
+ *
+ * The scheduler will continue running until all blocks until they
+ * report that they are done or the stop method is called.
+ */
+ gr_scheduler_sts(gr_flat_flowgraph_sptr ffg);
+
+public:
+ static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+
+ ~gr_scheduler_sts();
+
+ /*!
+ * \brief Tell the scheduler to stop executing.
+ */
+ void stop();
+
+ /*!
+ * \brief Block until the graph is done.
+ */
+ void wait();
+};
+
+
+
+
+#endif /* INCLUDED_GR_SCHEDULER_STS_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
deleted file mode 100644
index 07bd60500..000000000
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
+++ /dev/null
@@ -1,110 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gr_scheduler_thread.h>
-#include <iostream>
-#include <stdio.h>
-
-#ifdef HAVE_SIGNAL_H
-#include <signal.h>
-#endif
-
-#define GR_SCHEDULER_THREAD_DEBUG 0
-
-gr_scheduler_thread::gr_scheduler_thread(gr_block_vector_t graph) :
- omni_thread(NULL, PRIORITY_NORMAL),
- d_sts(gr_make_single_threaded_scheduler(graph))
-{
-}
-
-gr_scheduler_thread::~gr_scheduler_thread()
-{
-}
-
-void gr_scheduler_thread::start()
-{
- if (GR_SCHEDULER_THREAD_DEBUG)
- std::cout << "gr_scheduler_thread::start() "
- << this << std::endl;
- start_undetached();
-}
-
-void *
-gr_scheduler_thread::run_undetached(void *arg)
-{
- // This is the first code to run in the new thread context.
-
- /*
- * In general, on a *nix system, any thread of a process can receive
- * any asynchronous signal.
- *
- * http://www.serpentine.com/blog/threads-faq/mixing-threads-and-signals-unix/
- * http://www.linuxjournal.com/article/2121
- *
- * We really don't want to be handling asynchronous signals such
- * as SIGINT and SIGHUP here. We mask them off in the signal
- * processing threads so that they'll get handled by the mainline
- * thread. We leave the synchronous signals SIGQUIT, SIGBUS,
- * SIGILL, SIGSEGV etc alone
- *
- * FIXME? It might be better to mask them all off in the parent
- * thread then dedicate a single thread to handling all signals
- * using sigwait.
- */
-#if defined(HAVE_PTHREAD_SIGMASK) || defined(HAVE_SIGPROCMASK)
- sigset_t old_set;
- sigset_t new_set;
- int r;
- sigemptyset(&new_set);
- sigaddset(&new_set, SIGINT);
- sigaddset(&new_set, SIGHUP);
- sigaddset(&new_set, SIGPIPE);
- sigaddset(&new_set, SIGALRM);
- sigaddset(&new_set, SIGCHLD);
-
-#ifdef HAVE_PTHREAD_SIGMASK
- r = pthread_sigmask(SIG_BLOCK, &new_set, &old_set);
- if (r != 0)
- perror("pthread_sigmask");
-#else
- r = sigprocmask(SIG_BLOCK, &new_set, &old_set);
- if (r != 0)
- perror("sigprocmask");
-#endif
-#endif
- // Run the single-threaded scheduler
- d_sts->run();
- return 0;
-}
-
-void
-gr_scheduler_thread::stop()
-{
- if (0 && GR_SCHEDULER_THREAD_DEBUG) // FIXME not safe to call from signal handler
- std::cout << "gr_scheduler_thread::stop() "
- << this << std::endl;
- d_sts->stop();
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
deleted file mode 100644
index 89daba403..000000000
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_SCHEDULER_THREAD_H
-#define INCLUDED_GR_SCHEDULER_THREAD_H
-
-#include <omnithread.h>
-#include <gr_single_threaded_scheduler.h>
-#include <gr_block.h>
-
-// omnithread calls delete on itself after thread exits, so can't use shared ptr
-class gr_scheduler_thread;
-typedef std::vector<gr_scheduler_thread *> gr_scheduler_thread_vector_t;
-typedef gr_scheduler_thread_vector_t::iterator gr_scheduler_thread_viter_t;
-
-/*!
- *\brief A single thread of execution for the scheduler
- *
- * \ingroup internal
- * This class implements a single thread that runs undetached, and
- * invokes the single-threaded block scheduler. The runtime makes
- * one of these for each distinct partition of a flowgraph and runs
- * them in parallel.
- *
- */
-class gr_scheduler_thread : public omni_thread
-{
-private:
- gr_single_threaded_scheduler_sptr d_sts;
-
-public:
- gr_scheduler_thread(gr_block_vector_t graph);
- ~gr_scheduler_thread();
-
- virtual void *run_undetached(void *arg);
- void start();
- void stop();
-};
-
-#endif /* INCLUDED_GR_SCHEDULER_THREAD_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
new file mode 100644
index 000000000..af0338570
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
@@ -0,0 +1,95 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler_tpb.h>
+#include <gr_tpb_thread_body.h>
+#include <gruel/thread_body_wrapper.h>
+#include <sstream>
+
+/*
+ * You know, a lambda expression would be sooo much easier...
+ */
+class tpb_container
+{
+ gr_block_sptr d_block;
+
+public:
+ tpb_container(gr_block_sptr block) : d_block(block) {}
+
+ void operator()()
+ {
+ gr_tpb_thread_body body(d_block);
+ }
+};
+
+
+gr_scheduler_sptr
+gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg)
+{
+ return gr_scheduler_sptr(new gr_scheduler_tpb(ffg));
+}
+
+gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg)
+ : gr_scheduler(ffg)
+{
+ // Get a topologically sorted vector of all the blocks in use.
+ // Being topologically sorted probably isn't going to matter, but
+ // there's a non-zero chance it might help...
+
+ gr_basic_block_vector_t used_blocks = ffg->calc_used_blocks();
+ used_blocks = ffg->topological_sort(used_blocks);
+ gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(used_blocks);
+
+ // Ensure that the done flag is clear on all blocks
+
+ for (size_t i = 0; i < blocks.size(); i++){
+ blocks[i]->detail()->set_done(false);
+ }
+
+ // Fire off a thead for each block
+
+ for (size_t i = 0; i < blocks.size(); i++){
+ std::stringstream name;
+ name << "thread-per-block[" << i << "]: " << blocks[i];
+ d_threads.create_thread(
+ gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]), name.str()));
+ }
+}
+
+gr_scheduler_tpb::~gr_scheduler_tpb()
+{
+ stop();
+}
+
+void
+gr_scheduler_tpb::stop()
+{
+ d_threads.interrupt_all();
+}
+
+void
+gr_scheduler_tpb::wait()
+{
+ d_threads.join_all();
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
new file mode 100644
index 000000000..16a0c0204
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
@@ -0,0 +1,60 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_SCHEDULER_TPB_H
+#define INCLUDED_GR_SCHEDULER_TPB_H
+
+#include <gr_scheduler.h>
+#include <gruel/thread_group.h>
+
+/*!
+ * \brief Concrete scheduler that uses a kernel thread-per-block
+ */
+class gr_scheduler_tpb : public gr_scheduler
+{
+ gruel::thread_group d_threads;
+
+protected:
+ /*!
+ * \brief Construct a scheduler and begin evaluating the graph.
+ *
+ * The scheduler will continue running until all blocks until they
+ * report that they are done or the stop method is called.
+ */
+ gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg);
+
+public:
+ static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+
+ ~gr_scheduler_tpb();
+
+ /*!
+ * \brief Tell the scheduler to stop executing.
+ */
+ void stop();
+
+ /*!
+ * \brief Block until the graph is done.
+ */
+ void wait();
+};
+
+
+#endif /* INCLUDED_GR_SCHEDULER_TPB_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
index b2fbdb73b..7f1b40641 100644
--- a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
+++ b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
@@ -28,6 +28,7 @@
#include <gr_block.h>
#include <gr_block_detail.h>
#include <gr_buffer.h>
+#include <boost/thread.hpp>
#include <iostream>
#include <limits>
#include <assert.h>
@@ -44,14 +45,6 @@
static int which_scheduler = 0;
-
-std::ostream&
-operator << (std::ostream& os, const gr_block *m)
-{
- os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
- return os;
-}
-
gr_single_threaded_scheduler_sptr
gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
{
@@ -162,6 +155,9 @@ gr_single_threaded_scheduler::main_loop ()
nalive = d_blocks.size ();
while (d_enabled && nalive > 0){
+ if (boost::this_thread::interruption_requested())
+ break;
+
gr_block *m = d_blocks[bi].get ();
gr_block_detail *d = m->detail().get ();
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.cc b/gnuradio-core/src/lib/runtime/gr_top_block.cc
index 3c8e28f70..09e46dfbb 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block.cc
@@ -27,7 +27,6 @@
#include <unistd.h>
#include <gr_top_block.h>
#include <gr_top_block_impl.h>
-#include <gr_top_block_impl_sts.h>
#include <gr_io_signature.h>
#include <iostream>
@@ -43,7 +42,7 @@ gr_top_block::gr_top_block(const std::string &name)
gr_make_io_signature(0,0,0))
{
- d_impl = new gr_top_block_impl_sts(this);
+ d_impl = new gr_top_block_impl(this);
}
gr_top_block::~gr_top_block()
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
index 591437938..50d480d00 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
@@ -27,21 +27,58 @@
#include <gr_top_block.h>
#include <gr_top_block_impl.h>
#include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
+#include <gr_scheduler_sts.h>
+#include <gr_scheduler_tpb.h>
#include <stdexcept>
#include <iostream>
#include <string.h>
#include <unistd.h>
+#include <stdlib.h>
#define GR_TOP_BLOCK_IMPL_DEBUG 0
+
+typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg);
+
+static struct scheduler_table {
+ const char *name;
+ scheduler_maker f;
+} scheduler_table[] = {
+ { "TPB", gr_scheduler_tpb::make }, // first entry is default
+ { "STS", gr_scheduler_sts::make }
+};
+
+static gr_scheduler_sptr
+make_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+ static scheduler_maker factory = 0;
+
+ if (factory == 0){
+ char *v = getenv("GR_SCHEDULER");
+ if (!v)
+ factory = scheduler_table[0].f; // use default
+ else {
+ for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
+ if (strcmp(v, scheduler_table[i].name) == 0){
+ factory = scheduler_table[i].f;
+ break;
+ }
+ }
+ if (factory == 0){
+ std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \""
+ << v << "\". Using \"" << scheduler_table[0].name << "\"\n";
+ factory = scheduler_table[0].f;
+ }
+ }
+ }
+ return factory(ffg);
+}
+
+
gr_top_block_impl::gr_top_block_impl(gr_top_block *owner)
- : d_owner(owner),
- d_running(false),
- d_ffg(),
- d_lock_count(0)
+ : d_owner(owner), d_ffg(),
+ d_state(IDLE), d_lock_count(0)
{
}
@@ -53,14 +90,13 @@ gr_top_block_impl::~gr_top_block_impl()
void
gr_top_block_impl::start()
{
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "start: entered " << this << std::endl;
+ gr_lock_guard l(d_mutex);
- if (d_running)
+ if (d_state != IDLE)
throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()");
if (d_lock_count > 0)
- throw std::runtime_error("top_block::start: can't call start with flow graph locked");
+ throw std::runtime_error("top_block::start: can't start with flow graph locked");
// Create new flat flow graph by flattening hierarchy
d_ffg = d_owner->flatten();
@@ -69,77 +105,71 @@ gr_top_block_impl::start()
d_ffg->validate();
d_ffg->setup_connections();
- // Execute scheduler threads
- start_threads();
- d_running = true;
+ d_scheduler = make_scheduler(d_ffg);
+ d_state = RUNNING;
}
+void
+gr_top_block_impl::stop()
+{
+ if (d_scheduler)
+ d_scheduler->stop();
+}
+
+
+void
+gr_top_block_impl::wait()
+{
+ if (d_scheduler)
+ d_scheduler->wait();
+
+ d_state = IDLE;
+}
// N.B. lock() and unlock() cannot be called from a flow graph thread or
// deadlock will occur when reconfiguration happens
void
gr_top_block_impl::lock()
{
- omni_mutex_lock lock(d_reconf);
+ gr_lock_guard lock(d_mutex);
d_lock_count++;
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "runtime: locked, count = " << d_lock_count << std::endl;
}
void
gr_top_block_impl::unlock()
{
- omni_mutex_lock lock(d_reconf);
+ gr_lock_guard lock(d_mutex);
+
if (d_lock_count <= 0){
d_lock_count = 0; // fix it, then complain
throw std::runtime_error("unpaired unlock() call");
}
d_lock_count--;
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl;
+ if (d_lock_count > 0 || d_state == IDLE) // nothing to do
+ return;
- if (d_lock_count == 0) {
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "unlock: restarting flowgraph" << std::endl;
- restart();
- }
+ restart();
}
+/*
+ * restart is called with d_mutex held
+ */
void
gr_top_block_impl::restart()
{
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "restart: entered" << std::endl;
-
- if (!d_running)
- return; // nothing to do
-
- // Stop scheduler threads and wait for completion
- stop();
+ stop(); // Stop scheduler and wait for completion
wait();
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "restart: threads stopped" << std::endl;
// Create new simple flow graph
gr_flat_flowgraph_sptr new_ffg = d_owner->flatten();
new_ffg->validate(); // check consistency, sanity, etc
-
- if (GR_TOP_BLOCK_IMPL_DEBUG) {
- std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl;
- d_ffg->dump();
- }
new_ffg->merge_connections(d_ffg); // reuse buffers, etc
-
- if (GR_TOP_BLOCK_IMPL_DEBUG) {
- std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl;
- new_ffg->dump();
- }
-
d_ffg = new_ffg;
- start_threads();
- d_running = true;
+ // Create a new scheduler to execute it
+ d_scheduler = make_scheduler(d_ffg);
+ d_state = RUNNING;
}
void
@@ -148,14 +178,3 @@ gr_top_block_impl::dump()
if (d_ffg)
d_ffg->dump();
}
-
-gr_block_vector_t
-gr_top_block_impl::make_gr_block_vector(gr_basic_block_vector_t blocks)
-{
- gr_block_vector_t result;
- for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
- result.push_back(make_gr_block_sptr(*p));
- }
-
- return result;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
index 869f788ef..35fb44ef9 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
@@ -23,7 +23,11 @@
#ifndef INCLUDED_GR_TOP_BLOCK_IMPL_H
#define INCLUDED_GR_TOP_BLOCK_IMPL_H
-#include <gr_scheduler_thread.h>
+#include <gr_scheduler.h>
+#include <boost/thread.hpp>
+
+typedef boost::mutex gr_mutex; // FIXME move these elsewhere
+typedef boost::lock_guard<boost::mutex> gr_lock_guard;
/*!
*\brief Abstract implementation details of gr_top_block
@@ -37,16 +41,16 @@ class gr_top_block_impl
{
public:
gr_top_block_impl(gr_top_block *owner);
- virtual ~gr_top_block_impl();
+ ~gr_top_block_impl();
// Create and start scheduler threads
- virtual void start();
+ void start();
// Signal scheduler threads to stop
- virtual void stop() = 0;
+ void stop();
// Wait for scheduler threads to exit
- virtual void wait() = 0;
+ void wait();
// Lock the top block to allow reconfiguration
void lock();
@@ -59,22 +63,16 @@ public:
protected:
+ enum tb_state { IDLE, RUNNING };
+
gr_top_block *d_owner;
- bool d_running;
gr_flat_flowgraph_sptr d_ffg;
+ gr_scheduler_sptr d_scheduler;
- omni_mutex d_reconf; // protects d_lock_count
+ gr_mutex d_mutex; // protects d_state and d_lock_count
+ tb_state d_state;
int d_lock_count;
-
- virtual void start_threads() = 0;
-
-/*!
- * Make a vector of gr_block from a vector of gr_basic_block
- *
- * Pass-by-value to avoid problem with possible asynchronous modification
- */
- static gr_block_vector_t make_gr_block_vector(gr_basic_block_vector_t blocks);
-
+
private:
void restart();
};
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
deleted file mode 100644
index b3e9da627..000000000
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
+++ /dev/null
@@ -1,128 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007,2008 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gr_top_block.h>
-#include <gr_top_block_impl_sts.h>
-#include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
-
-#include <stdexcept>
-#include <iostream>
-#include <string.h>
-#include <unistd.h>
-
-#define GR_TOP_BLOCK_IMPL_STS_DEBUG 0
-
-static gr_top_block_impl *s_impl = 0;
-
-
-// FIXME: This prevents using more than one gr_top_block instance
-
-static void
-runtime_sigint_handler(int signum)
-{
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG){
- char *msg = "SIGINT received, calling stop()\n";
- ::write(1, msg, strlen(msg)); // write is OK to call from signal handler
- }
-
- if (s_impl)
- s_impl->stop();
-}
-
-// ----------------------------------------------------------------
-
-gr_top_block_impl_sts::gr_top_block_impl_sts(gr_top_block *owner)
- : gr_top_block_impl(owner)
-{
- if (s_impl)
- throw std::logic_error("gr_top_block_impl_sts: multiple simultaneous gr_top_blocks not allowed");
-
- s_impl = this;
-}
-
-gr_top_block_impl_sts::~gr_top_block_impl_sts()
-{
- s_impl = 0; // don't call delete we don't own these
-}
-
-void
-gr_top_block_impl_sts::start_threads()
-{
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
- std::cout << "start_threads: entered" << std::endl;
-
- d_graphs = d_ffg->partition();
- for (std::vector<gr_basic_block_vector_t>::iterator p = d_graphs.begin();
- p != d_graphs.end(); p++) {
- gr_scheduler_thread *thread = new gr_scheduler_thread(make_gr_block_vector(*p));
- d_threads.push_back(thread);
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
- std::cout << "start_threads: starting " << thread << std::endl;
- thread->start();
- }
-}
-
-/*
- * N.B. as currently implemented, it is possible that this may be
- * invoked by the SIGINT handler which is fragile as hell...
- */
-void
-gr_top_block_impl_sts::stop()
-{
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG){
- char *msg = "stop: entered\n";
- ::write(1, msg, strlen(msg));
- }
-
- for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
- if (*p)
- (*p)->stop();
- }
-}
-
-void
-gr_top_block_impl_sts::wait()
-{
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
- std::cout << "wait: entered" << std::endl;
-
- void *dummy_status; // don't ever dereference this
- gr_local_sighandler sigint(SIGINT, runtime_sigint_handler);
-
- for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
- std::cout << "wait: joining thread " << (*p) << std::endl;
- (*p)->join(&dummy_status); // omnithreads will self-delete, so pointer is now dead
- (*p) = 0; // FIXME: switch to stl::list and actually remove from container
- if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
- std::cout << "wait: join returned" << std::endl;
- }
-
- d_threads.clear();
- d_running = false;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
deleted file mode 100644
index ec2e51cf2..000000000
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_TOP_BLOCK_IMPL_STS_H
-#define INCLUDED_GR_TOP_BLOCK_IMPL_STS_H
-
-#include <gr_top_block_impl.h>
-#include <gr_scheduler_thread.h>
-
-/*!
- *\brief Implementation details of gr_top_block
- * \ingroup internal
- *
- * Concrete implementation of gr_top_block using gr_single_threaded_scheduler.
- */
-class gr_top_block_impl_sts : public gr_top_block_impl
-{
-public:
- gr_top_block_impl_sts(gr_top_block *owner);
- ~gr_top_block_impl_sts();
-
- // Signal scheduler threads to stop
- void stop();
-
- // Wait for scheduler threads to exit
- void wait();
-
-private:
-
- gr_scheduler_thread_vector_t d_threads;
- std::vector<gr_basic_block_vector_t> d_graphs;
-
- void start_threads();
-};
-
-#endif /* INCLUDED_GR_TOP_BLOCK_IMPL_STS_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
new file mode 100644
index 000000000..02e8deed8
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -0,0 +1,67 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_tpb_detail.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+
+/*
+ * We assume that no worker threads are ever running when the
+ * graph structure is being manipulated, thus it's safe for us to poke
+ * around in our neighbors w/o holding any locks.
+ */
+
+void
+gr_tpb_detail::notify_upstream(gr_block_detail *d)
+{
+ // For each of our inputs, tell the guy upstream that we've consumed
+ // some input, and that he most likely has more output buffer space
+ // available.
+
+ for (size_t i = 0; i < d->d_input.size(); i++){
+ // Can you say, "pointer chasing?"
+ d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
+ }
+}
+
+void
+gr_tpb_detail::notify_downstream(gr_block_detail *d)
+{
+ // For each of our outputs, tell the guys downstream that they have
+ // new input available.
+
+ for (size_t i = 0; i < d->d_output.size(); i++){
+ gr_buffer_sptr buf = d->d_output[i];
+ for (size_t j = 0, k = buf->nreaders(); j < k; j++)
+ buf->reader(j)->link()->detail()->d_tpb.set_input_changed();
+ }
+}
+
+void
+gr_tpb_detail::notify_neighbors(gr_block_detail *d)
+{
+ notify_downstream(d);
+ notify_upstream(d);
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
new file mode 100644
index 000000000..9566312dc
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -0,0 +1,81 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_TPB_DETAIL_H
+#define INCLUDED_GR_TPB_DETAIL_H
+
+#include <boost/thread.hpp>
+
+class gr_block_detail;
+
+/*!
+ * \brief used by thread-per-block scheduler
+ */
+struct gr_tpb_detail {
+ typedef boost::unique_lock<boost::mutex> scoped_lock;
+
+ boost::mutex mutex; //< protects all vars
+ bool input_changed;
+ boost::condition_variable input_cond;
+ bool output_changed;
+ boost::condition_variable output_cond;
+
+ gr_tpb_detail()
+ : input_changed(false), output_changed(false) {}
+
+
+ //! Called by us to tell all our upstream blocks that their output may have changed.
+ void notify_upstream(gr_block_detail *d);
+
+ //! Called by us to tell all our downstream blocks that their input may have changed.
+ void notify_downstream(gr_block_detail *d);
+
+ //! Called by us to notify both upstream and downstream
+ void notify_neighbors(gr_block_detail *d);
+
+ //! Called by us
+ void clear_changed()
+ {
+ scoped_lock guard(mutex);
+ input_changed = false;
+ output_changed = false;
+ }
+
+private:
+
+ //! Used by notify_downstream
+ void set_input_changed()
+ {
+ scoped_lock guard(mutex);
+ input_changed = true;
+ input_cond.notify_one();
+ }
+
+ //! Used by notify_upstream
+ void set_output_changed()
+ {
+ scoped_lock guard(mutex);
+ output_changed = true;
+ output_cond.notify_one();
+ }
+
+};
+
+#endif /* INCLUDED_GR_TPB_DETAIL_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
new file mode 100644
index 000000000..f61e17243
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -0,0 +1,76 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_tpb_thread_body.h>
+#include <iostream>
+
+gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
+ : d_exec(block)
+{
+ // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+
+ gr_block_detail *d = block->detail().get();
+ gr_block_executor::state s;
+
+ while (1){
+ d->d_tpb.clear_changed();
+ s = d_exec.run_one_iteration();
+
+ switch(s){
+ case gr_block_executor::READY: // Tell neighbors we made progress.
+ d->d_tpb.notify_neighbors(d);
+ break;
+
+ case gr_block_executor::READY_NO_OUTPUT: // Notify upstream only
+ d->d_tpb.notify_upstream(d);
+ break;
+
+ case gr_block_executor::DONE: // Game over.
+ d->d_tpb.notify_neighbors(d);
+ return;
+
+ case gr_block_executor::BLKD_IN: // Wait for input.
+ {
+ gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
+ while(!d->d_tpb.input_changed)
+ d->d_tpb.input_cond.wait(guard);
+ }
+ break;
+
+ case gr_block_executor::BLKD_OUT: // Wait for output buffer space.
+ {
+ gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
+ while(!d->d_tpb.output_changed)
+ d->d_tpb.output_cond.wait(guard);
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+ }
+}
+
+gr_tpb_thread_body::~gr_tpb_thread_body()
+{
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
new file mode 100644
index 000000000..a630b1be9
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
@@ -0,0 +1,45 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_TPB_THREAD_BODY_H
+#define INCLUDED_GR_TPB_THREAD_BODY_H
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+
+/*!
+ * \brief The body of each thread-per-block thread.
+ *
+ * One of these is instantiated in its own thread for each block. The
+ * constructor turns into the main loop which returns when the block is
+ * done or is interrupted.
+ */
+
+class gr_tpb_thread_body {
+ gr_block_executor d_exec;
+
+public:
+ gr_tpb_thread_body(gr_block_sptr block);
+ ~gr_tpb_thread_body();
+};
+
+
+#endif /* INCLUDED_GR_TPB_THREAD_BODY_H */
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
index ad40f724d..7434cf657 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
@@ -52,7 +52,7 @@ t0_body ()
int nitems = 4000 / sizeof (int);
int counter = 0;
- gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
+ gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
int last_sa;
int sa;
@@ -87,8 +87,8 @@ t1_body ()
int write_counter = 0;
int read_counter = 0;
- gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
- gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0));
+ gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
+ gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr()));
int sa;
@@ -162,8 +162,8 @@ t2_body ()
int nitems = (64 * (1L << 10)) / sizeof (int); // 64K worth of ints
- gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
- gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0));
+ gr_buffer_sptr buf(gr_make_buffer (nitems, sizeof (int), gr_block_sptr()));
+ gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr()));
int read_counter = 0;
int write_counter = 0;
@@ -229,7 +229,7 @@ t3_body ()
int nitems = (64 * (1L << 10)) / sizeof (int);
static const int N = 5;
- gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
+ gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
gr_buffer_reader_sptr reader[N];
int read_counter[N];
int write_counter = 0;
@@ -237,7 +237,7 @@ t3_body ()
for (int i = 0; i < N; i++){
read_counter[i] = 0;
- reader[i] = gr_buffer_add_reader (buf, 0);
+ reader[i] = gr_buffer_add_reader (buf, 0, gr_block_sptr());
}
for (int lc = 0; lc < 1000; lc++){