diff options
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
32 files changed, 1336 insertions, 498 deletions
diff --git a/gnuradio-core/src/lib/runtime/Makefile.am b/gnuradio-core/src/lib/runtime/Makefile.am index 550031b94..b21b32412 100644 --- a/gnuradio-core/src/lib/runtime/Makefile.am +++ b/gnuradio-core/src/lib/runtime/Makefile.am @@ -21,7 +21,7 @@ include $(top_srcdir)/Makefile.common -AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(WITH_INCLUDES) +AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(GRUEL_INCLUDES) $(WITH_INCLUDES) noinst_LTLIBRARIES = libruntime.la libruntime-qa.la @@ -35,6 +35,7 @@ libruntime_la_SOURCES = \ gr_flat_flowgraph.cc \ gr_block.cc \ gr_block_detail.cc \ + gr_block_executor.cc \ gr_hier_block2.cc \ gr_hier_block2_detail.cc \ gr_buffer.cc \ @@ -48,16 +49,19 @@ libruntime_la_SOURCES = \ gr_pagesize.cc \ gr_preferences.cc \ gr_realtime.cc \ - gr_scheduler_thread.cc \ + gr_scheduler.cc \ + gr_scheduler_sts.cc \ + gr_scheduler_tpb.cc \ gr_single_threaded_scheduler.cc \ gr_sptr_magic.cc \ gr_sync_block.cc \ gr_sync_decimator.cc \ gr_sync_interpolator.cc \ + gr_tmp_path.cc \ gr_top_block.cc \ gr_top_block_impl.cc \ - gr_top_block_impl_sts.cc \ - gr_tmp_path.cc \ + gr_tpb_detail.cc \ + gr_tpb_thread_body.cc \ gr_vmcircbuf.cc \ gr_vmcircbuf_mmap_shm_open.cc \ gr_vmcircbuf_mmap_tmpfile.cc \ @@ -82,6 +86,7 @@ grinclude_HEADERS = \ gr_flat_flowgraph.h \ gr_block.h \ gr_block_detail.h \ + gr_block_executor.h \ gr_hier_block2.h \ gr_hier_block2_detail.h \ gr_buffer.h \ @@ -97,7 +102,9 @@ grinclude_HEADERS = \ gr_preferences.h \ gr_realtime.h \ gr_runtime_types.h \ - gr_scheduler_thread.h \ + gr_scheduler.h \ + gr_scheduler_sts.h \ + gr_scheduler_tpb.h \ gr_select_handler.h \ gr_single_threaded_scheduler.h \ gr_sptr_magic.h \ @@ -106,7 +113,8 @@ grinclude_HEADERS = \ gr_sync_interpolator.h \ gr_top_block.h \ gr_top_block_impl.h \ - gr_top_block_impl_sts.h \ + gr_tpb_detail.h \ + gr_tpb_thread_body.h \ gr_timer.h \ gr_tmp_path.h \ gr_types.h \ diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index 0a8fb92c2..7c2e9901b 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -110,3 +110,11 @@ gr_block::fixed_rate_noutput_to_ninput(int noutput) { throw std::runtime_error("Unimplemented"); } + +std::ostream& +operator << (std::ostream& os, const gr_block *m) +{ + os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>"; + return os; +} + diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h index 79237ee83..437b610b4 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.h +++ b/gnuradio-core/src/lib/runtime/gr_block.h @@ -214,9 +214,13 @@ class gr_block : public gr_basic_block { typedef std::vector<gr_block_sptr> gr_block_vector_t; typedef std::vector<gr_block_sptr>::iterator gr_block_viter_t; -inline gr_block_sptr make_gr_block_sptr(gr_basic_block_sptr p) +inline gr_block_sptr cast_to_block_sptr(gr_basic_block_sptr p) { return boost::dynamic_pointer_cast<gr_block, gr_basic_block>(p); } + +std::ostream& +operator << (std::ostream& os, const gr_block *m); + #endif /* INCLUDED_GR_BLOCK_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h index a3b7731c0..2856c402c 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h @@ -24,6 +24,7 @@ #define INCLUDED_GR_BLOCK_DETAIL_H #include <gr_runtime_types.h> +#include <gr_tpb_detail.h> #include <stdexcept> /*! @@ -34,7 +35,6 @@ * of almost all users of GNU Radio. This decoupling also means that * we can make changes to the guts without having to recompile everything. */ - class gr_block_detail { public: ~gr_block_detail (); @@ -73,8 +73,14 @@ class gr_block_detail { */ void consume_each (int how_many_items); + /*! + * \brief Tell the scheduler \p how_many_items were produced on each output stream. + */ void produce_each (int how_many_items); + + gr_tpb_detail d_tpb; // used by thread-per-block scheduler + // ---------------------------------------------------------------------------- private: @@ -84,8 +90,11 @@ class gr_block_detail { std::vector<gr_buffer_sptr> d_output; bool d_done; + gr_block_detail (unsigned int ninputs, unsigned int noutputs); + friend class gr_tpb_detail; + friend gr_block_detail_sptr gr_make_block_detail (unsigned int ninputs, unsigned int noutputs); }; diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc new file mode 100644 index 000000000..fd3a916d4 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc @@ -0,0 +1,329 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gr_block_executor.h> +#include <gr_block.h> +#include <gr_block_detail.h> +#include <gr_buffer.h> +#include <boost/thread.hpp> +#include <iostream> +#include <limits> +#include <assert.h> +#include <stdio.h> + +// must be defined to either 0 or 1 +#define ENABLE_LOGGING 0 + +#if (ENABLE_LOGGING) +#define LOG(x) do { x; } while(0) +#else +#define LOG(x) do {;} while(0) +#endif + +static int which_scheduler = 0; + +inline static unsigned int +round_up (unsigned int n, unsigned int multiple) +{ + return ((n + multiple - 1) / multiple) * multiple; +} + +inline static unsigned int +round_down (unsigned int n, unsigned int multiple) +{ + return (n / multiple) * multiple; +} + +// +// Return minimum available write space in all our downstream buffers +// or -1 if we're output blocked and the output we're blocked +// on is done. +// +static int +min_available_space (gr_block_detail *d, int output_multiple) +{ + int min_space = std::numeric_limits<int>::max(); + + for (int i = 0; i < d->noutputs (); i++){ + gr_buffer::scoped_lock guard(*d->output(i)->mutex()); +#if 0 + int n = round_down(d->output(i)->space_available(), output_multiple); +#else + int n = round_down(std::min(d->output(i)->space_available(), + d->output(i)->bufsize()/2), + output_multiple); +#endif + if (n == 0){ // We're blocked on output. + if (d->output(i)->done()){ // Downstream is done, therefore we're done. + return -1; + } + return 0; + } + min_space = std::min (min_space, n); + } + return min_space; +} + + + +gr_block_executor::gr_block_executor (gr_block_sptr block) + : d_block(block), d_log(0) +{ + if (ENABLE_LOGGING){ + char name[100]; + snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++); + d_log = new std::ofstream(name); + std::unitbuf(*d_log); // make it unbuffered... + *d_log << "gr_block_executor: " + << d_block << std::endl; + } + + d_block->start(); // enable any drivers, etc. +} + +gr_block_executor::~gr_block_executor () +{ + if (ENABLE_LOGGING) + delete d_log; + + d_block->stop(); // stop any drivers, etc. +} + +gr_block_executor::state +gr_block_executor::run_one_iteration() +{ + int noutput_items; + int max_items_avail; + + gr_block *m = d_block.get(); + gr_block_detail *d = m->detail().get(); + + LOG(*d_log << std::endl << m); + + if (d->done()){ + assert(0); + return DONE; + } + + if (d->source_p ()){ + d_ninput_items_required.resize (0); + d_ninput_items.resize (0); + d_input_items.resize (0); + d_input_done.resize(0); + d_output_items.resize (d->noutputs ()); + + // determine the minimum available output space + noutput_items = min_available_space (d, m->output_multiple ()); + LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); + if (noutput_items == -1) // we're done + goto were_done; + + if (noutput_items == 0){ // we're output blocked + LOG(*d_log << " BLKD_OUT\n"); + return BLKD_OUT; + } + + goto setup_call_to_work; // jump to common code + } + + else if (d->sink_p ()){ + d_ninput_items_required.resize (d->ninputs ()); + d_ninput_items.resize (d->ninputs ()); + d_input_items.resize (d->ninputs ()); + d_input_done.resize(d->ninputs()); + d_output_items.resize (0); + LOG(*d_log << " sink\n"); + + max_items_avail = 0; + for (int i = 0; i < d->ninputs (); i++){ + { + /* + * Acquire the mutex and grab local copies of items_available and done. + */ + gr_buffer::scoped_lock guard(*d->input(i)->mutex()); + d_ninput_items[i] = d->input(i)->items_available(); + d_input_done[i] = d->input(i)->done(); + } + + LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl); + LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i] << std::endl); + + if (d_ninput_items[i] < m->output_multiple() && d_input_done[i]) + goto were_done; + + max_items_avail = std::max (max_items_avail, d_ninput_items[i]); + } + + // take a swag at how much output we can sink + noutput_items = (int) (max_items_avail * m->relative_rate ()); + noutput_items = round_down (noutput_items, m->output_multiple ()); + LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl); + LOG(*d_log << " noutput_items = " << noutput_items << std::endl); + + if (noutput_items == 0){ // we're blocked on input + LOG(*d_log << " BLKD_IN\n"); + return BLKD_IN; + } + + goto try_again; // Jump to code shared with regular case. + } + + else { + // do the regular thing + d_ninput_items_required.resize (d->ninputs ()); + d_ninput_items.resize (d->ninputs ()); + d_input_items.resize (d->ninputs ()); + d_input_done.resize(d->ninputs()); + d_output_items.resize (d->noutputs ()); + + max_items_avail = 0; + for (int i = 0; i < d->ninputs (); i++){ + { + /* + * Acquire the mutex and grab local copies of items_available and done. + */ + gr_buffer::scoped_lock guard(*d->input(i)->mutex()); + d_ninput_items[i] = d->input(i)->items_available (); + d_input_done[i] = d->input(i)->done(); + } + max_items_avail = std::max (max_items_avail, d_ninput_items[i]); + } + + // determine the minimum available output space + noutput_items = min_available_space (d, m->output_multiple ()); + if (ENABLE_LOGGING){ + *d_log << " regular "; + if (m->relative_rate() >= 1.0) + *d_log << "1:" << m->relative_rate() << std::endl; + else + *d_log << 1.0/m->relative_rate() << ":1\n"; + *d_log << " max_items_avail = " << max_items_avail << std::endl; + *d_log << " noutput_items = " << noutput_items << std::endl; + } + if (noutput_items == -1) // we're done + goto were_done; + + if (noutput_items == 0){ // we're output blocked + LOG(*d_log << " BLKD_OUT\n"); + return BLKD_OUT; + } + + try_again: + if (m->fixed_rate()){ + // try to work it forward starting with max_items_avail. + // We want to try to consume all the input we've got. + int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); + reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple()); + if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items) + noutput_items = reqd_noutput_items; + } + + // ask the block how much input they need to produce noutput_items + m->forecast (noutput_items, d_ninput_items_required); + + // See if we've got sufficient input available + + int i; + for (i = 0; i < d->ninputs (); i++) + if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough + break; + + if (i < d->ninputs ()){ // not enough input on input[i] + // if we can, try reducing the size of our output request + if (noutput_items > m->output_multiple ()){ + noutput_items /= 2; + noutput_items = round_up (noutput_items, m->output_multiple ()); + goto try_again; + } + + // We're blocked on input + LOG(*d_log << " BLKD_IN\n"); + if (d_input_done[i]) // If the upstream block is done, we're done + goto were_done; + + // Is it possible to ever fulfill this request? + if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){ + // Nope, never going to happen... + std::cerr << "\nsched: <gr_block " << m->name() + << " (" << m->unique_id() << ")>" + << " is requesting more input data\n" + << " than we can provide.\n" + << " ninput_items_required = " + << d_ninput_items_required[i] << "\n" + << " max_possible_items_available = " + << d->input(i)->max_possible_items_available() << "\n" + << " If this is a filter, consider reducing the number of taps.\n"; + goto were_done; + } + + return BLKD_IN; + } + + // We've got enough data on each input to produce noutput_items. + // Finish setting up the call to work. + + for (int i = 0; i < d->ninputs (); i++) + d_input_items[i] = d->input(i)->read_pointer(); + + setup_call_to_work: + + for (int i = 0; i < d->noutputs (); i++) + d_output_items[i] = d->output(i)->write_pointer(); + + // Do the actual work of the block + int n = m->general_work (noutput_items, d_ninput_items, + d_input_items, d_output_items); + LOG(*d_log << " general_work: noutput_items = " << noutput_items + << " result = " << n << std::endl); + + if (n == -1) // block is done + goto were_done; + + d->produce_each (n); // advance write pointers + if (n > 0) + return READY; + + // We didn't produce any output even though we called general_work. + // We have (most likely) consumed some input. + + // If this is a source, it's broken. + if (d->source_p()){ + std::cerr << "gr_block_executor: source " << m + << " returned 0 from work. We're marking it DONE.\n"; + // FIXME maybe we ought to raise an exception... + goto were_done; + } + + // Have the caller try again... + return READY_NO_OUTPUT; + } + assert (0); + + were_done: + LOG(*d_log << " were_done\n"); + d->set_done (true); + return DONE; +} diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.h b/gnuradio-core/src/lib/runtime/gr_block_executor.h new file mode 100644 index 000000000..41b5ede7c --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_block_executor.h @@ -0,0 +1,69 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_BLOCK_EXECUTOR_H +#define INCLUDED_GR_BLOCK_EXECUTOR_H + +#include <gr_runtime_types.h> +#include <fstream> + +//class gr_block_executor; +//typedef boost::shared_ptr<gr_block_executor> gr_block_executor_sptr; + + +/*! + * \brief Manage the execution of a single block. + * \ingroup internal + */ + +class gr_block_executor { +protected: + gr_block_sptr d_block; // The block we're trying to run + std::ofstream *d_log; + + // These are allocated here so we don't have to on each iteration + + gr_vector_int d_ninput_items_required; + gr_vector_int d_ninput_items; + gr_vector_const_void_star d_input_items; + std::vector<bool> d_input_done; + gr_vector_void_star d_output_items; + + public: + gr_block_executor(gr_block_sptr block); + ~gr_block_executor (); + + enum state { + READY, // We made progress; everything's cool. + READY_NO_OUTPUT, // We consumed some input, but produced no output. + BLKD_IN, // no progress; we're blocked waiting for input data. + BLKD_OUT, // no progress; we're blocked waiting for output buffer space. + DONE, // we're done; don't call me again. + }; + + /* + * \brief Run one iteration. + */ + state run_one_iteration(); +}; + +#endif /* INCLUDED_GR_BLOCK_EXECUTOR_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc b/gnuradio-core/src/lib/runtime/gr_buffer.cc index 77f0c7c43..31a471ea7 100644 --- a/gnuradio-core/src/lib/runtime/gr_buffer.cc +++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc @@ -77,10 +77,10 @@ minimum_buffer_items (long type_size, long page_size) } -gr_buffer::gr_buffer (int nitems, size_t sizeof_item) +gr_buffer::gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link) : d_base (0), d_bufsize (0), d_vmcircbuf (0), - d_sizeof_item (sizeof_item), d_write_index (0), - d_done (false) + d_sizeof_item (sizeof_item), d_link(link), + d_write_index (0), d_done (false) { if (!allocate_buffer (nitems, sizeof_item)) throw std::bad_alloc (); @@ -89,9 +89,9 @@ gr_buffer::gr_buffer (int nitems, size_t sizeof_item) } gr_buffer_sptr -gr_make_buffer (int nitems, size_t sizeof_item) +gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link) { - return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item)); + return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item, link)); } gr_buffer::~gr_buffer () @@ -146,7 +146,7 @@ gr_buffer::allocate_buffer (int nitems, size_t sizeof_item) int -gr_buffer::space_available () const +gr_buffer::space_available () { if (d_readers.empty ()) return d_bufsize - 1; // See comment below @@ -175,18 +175,27 @@ gr_buffer::write_pointer () void gr_buffer::update_write_pointer (int nitems) { + scoped_lock guard(*mutex()); d_write_index = index_add (d_write_index, nitems); } +void +gr_buffer::set_done (bool done) +{ + scoped_lock guard(*mutex()); + d_done = done; +} + gr_buffer_reader_sptr -gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload) +gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link) { if (nzero_preload < 0) throw std::invalid_argument("gr_buffer_add_reader: nzero_preload must be >= 0"); gr_buffer_reader_sptr r (new gr_buffer_reader (buf, buf->index_sub(buf->d_write_index, - nzero_preload))); + nzero_preload), + link)); buf->d_readers.push_back (r.get ()); return r; @@ -214,8 +223,9 @@ gr_buffer_ncurrently_allocated () // ---------------------------------------------------------------------------- -gr_buffer_reader::gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index) - : d_buffer (buffer), d_read_index (read_index) +gr_buffer_reader::gr_buffer_reader(gr_buffer_sptr buffer, unsigned int read_index, + gr_block_sptr link) + : d_buffer(buffer), d_read_index(read_index), d_link(link) { s_buffer_reader_count++; } @@ -241,6 +251,7 @@ gr_buffer_reader::read_pointer () void gr_buffer_reader::update_read_pointer (int nitems) { + scoped_lock guard(*mutex()); d_read_index = d_buffer->index_add (d_read_index, nitems); } diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h b/gnuradio-core/src/lib/runtime/gr_buffer.h index cf578c89d..75063cc6a 100644 --- a/gnuradio-core/src/lib/runtime/gr_buffer.h +++ b/gnuradio-core/src/lib/runtime/gr_buffer.h @@ -24,6 +24,8 @@ #define INCLUDED_GR_BUFFER_H #include <gr_runtime_types.h> +#include <boost/weak_ptr.hpp> +#include <boost/thread.hpp> class gr_vmcircbuf; @@ -33,8 +35,12 @@ class gr_vmcircbuf; * The total size of the buffer will be rounded up to a system * dependent boundary. This is typically the system page size, but * under MS windows is 64KB. + * + * \param nitems is the minimum number of items the buffer will hold. + * \param sizeof_item is the size of an item in bytes. + * \param link is the block that writes to this buffer. */ -gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item); +gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link=gr_block_sptr()); /*! @@ -43,12 +49,20 @@ gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item); */ class gr_buffer { public: + + typedef boost::unique_lock<boost::mutex> scoped_lock; + virtual ~gr_buffer (); /*! * \brief return number of items worth of space available for writing */ - int space_available () const; + int space_available (); + + /*! + * \brief return size of this buffer in items + */ + int bufsize() const { return d_bufsize; } /*! * \brief return pointer to write buffer. @@ -63,17 +77,26 @@ class gr_buffer { */ void update_write_pointer (int nitems); - - void set_done (bool done) { d_done = done; } + void set_done (bool done); bool done () const { return d_done; } + /*! + * \brief Return the block that writes to this buffer. + */ + gr_block_sptr link() { return gr_block_sptr(d_link); } + + size_t nreaders() const { return d_readers.size(); } + gr_buffer_reader* reader(size_t index) { return d_readers[index]; } + + boost::mutex *mutex() { return &d_mutex; } + // ------------------------------------------------------------------------- private: friend class gr_buffer_reader; - friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item); - friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload); + friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link); + friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link); protected: char *d_base; // base address of buffer @@ -81,8 +104,14 @@ class gr_buffer { private: gr_vmcircbuf *d_vmcircbuf; size_t d_sizeof_item; // in bytes - unsigned int d_write_index; // in items [0,d_bufsize) std::vector<gr_buffer_reader *> d_readers; + boost::weak_ptr<gr_block> d_link; // block that writes to this buffer + + // + // The mutex protects d_write_index, d_done and the d_read_index's in the buffer readers. + // + boost::mutex d_mutex; + unsigned int d_write_index; // in items [0,d_bufsize) bool d_done; unsigned @@ -116,11 +145,15 @@ class gr_buffer { * * Allocate a buffer that holds at least \p nitems of size \p sizeof_item. * + * \param nitems is the minimum number of items the buffer will hold. + * \param sizeof_item is the size of an item in bytes. + * \param link is the block that writes to this buffer. + * * The total size of the buffer will be rounded up to a system * dependent boundary. This is typically the system page size, but * under MS windows is 64KB. */ - gr_buffer (int nitems, size_t sizeof_item); + gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link); /*! * \brief disassociate \p reader from this buffer @@ -132,8 +165,10 @@ class gr_buffer { /*! * \brief create a new gr_buffer_reader and attach it to buffer \p buf * \param nzero_preload -- number of zero items to "preload" into buffer. + * \param link is the block that reads from the buffer using this gr_buffer_reader. */ -gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload); +gr_buffer_reader_sptr +gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link=gr_block_sptr()); //! returns # of gr_buffers currently allocated long gr_buffer_ncurrently_allocated (); @@ -147,8 +182,10 @@ long gr_buffer_ncurrently_allocated (); */ class gr_buffer_reader { - public: + + typedef gr_buffer::scoped_lock scoped_lock; + ~gr_buffer_reader (); /*! @@ -183,19 +220,29 @@ class gr_buffer_reader { void set_done (bool done) { d_buffer->set_done (done); } bool done () const { return d_buffer->done (); } + boost::mutex *mutex() { return d_buffer->mutex(); } + + + /*! + * \brief Return the block that reads via this reader. + */ + gr_block_sptr link() { return gr_block_sptr(d_link); } + // ------------------------------------------------------------------------- private: friend class gr_buffer; - friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload); + friend gr_buffer_reader_sptr + gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link); gr_buffer_sptr d_buffer; unsigned int d_read_index; // in items [0,d->buffer.d_bufsize) + boost::weak_ptr<gr_block> d_link; // block that reads via this buffer reader //! constructor is private. Use gr_buffer::add_reader to create instances - gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index); + gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link); }; //! returns # of gr_buffer_readers currently allocated diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.i b/gnuradio-core/src/lib/runtime/gr_buffer.i index 38e1d945d..4c1c5afae 100644 --- a/gnuradio-core/src/lib/runtime/gr_buffer.i +++ b/gnuradio-core/src/lib/runtime/gr_buffer.i @@ -26,14 +26,14 @@ typedef boost::shared_ptr<gr_buffer> gr_buffer_sptr; %rename(buffer) gr_make_buffer; %ignore gr_buffer; -gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item); +gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link); class gr_buffer { public: ~gr_buffer (); private: - gr_buffer (int nitems, size_t sizeof_item); + gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link); }; @@ -43,7 +43,7 @@ typedef boost::shared_ptr<gr_buffer_reader> gr_buffer_reader_sptr; %ignore gr_buffer_reader; %rename(buffer_add_reader) gr_buffer_add_reader; -gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload); +gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link); class gr_buffer_reader { public: @@ -51,7 +51,7 @@ class gr_buffer_reader { private: friend class gr_buffer; - gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index); + gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link); }; diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc index aa1aa8353..031eb6dfd 100644 --- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc +++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc @@ -33,6 +33,11 @@ #define GR_FLAT_FLOWGRAPH_DEBUG 0 +// 32Kbyte buffer size between blocks +#define GR_FIXED_BUFFER_SIZE (32*(1L<<10)) + +static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE; + gr_flat_flowgraph_sptr gr_make_flat_flowgraph() { @@ -54,7 +59,7 @@ gr_flat_flowgraph::setup_connections() // Assign block details to blocks for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) - make_gr_block_sptr(*p)->set_detail(allocate_block_detail(*p)); + cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p)); // Connect inputs to outputs for each block for(gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) @@ -84,11 +89,15 @@ gr_flat_flowgraph::allocate_block_detail(gr_basic_block_sptr block) gr_buffer_sptr gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port) { - gr_block_sptr grblock = make_gr_block_sptr(block); + gr_block_sptr grblock = cast_to_block_sptr(block); if (!grblock) throw std::runtime_error("allocate_buffer found non-gr_block"); int item_size = block->output_signature()->sizeof_stream_item(port); - int nitems = s_fixed_buffer_size/item_size; + + // *2 because we're now only filling them 1/2 way in order to + // increase the available parallelism when using the TPB scheduler. + // (We're double buffering, where we used to single buffer) + int nitems = s_fixed_buffer_size * 2 / item_size; // Make sure there are at least twice the output_multiple no. of items if (nitems < 2*grblock->output_multiple()) // Note: this means output_multiple() @@ -99,7 +108,7 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port) gr_basic_block_vector_t blocks = calc_downstream_blocks(block, port); for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { - gr_block_sptr dgrblock = make_gr_block_sptr(*p); + gr_block_sptr dgrblock = cast_to_block_sptr(*p); if (!dgrblock) throw std::runtime_error("allocate_buffer found non-gr_block"); @@ -109,13 +118,13 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port) nitems = std::max(nitems, static_cast<int>(2*(decimation*multiple+history))); } - return gr_make_buffer(nitems, item_size); + return gr_make_buffer(nitems, item_size, grblock); } void gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block) { - gr_block_sptr grblock = make_gr_block_sptr(block); + gr_block_sptr grblock = cast_to_block_sptr(block); if (!grblock) throw std::runtime_error("connect_block_inputs found non-gr_block"); @@ -130,7 +139,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block) int dst_port = e->dst().port(); int src_port = e->src().port(); gr_basic_block_sptr src_block = e->src().block(); - gr_block_sptr src_grblock = make_gr_block_sptr(src_block); + gr_block_sptr src_grblock = cast_to_block_sptr(src_block); if (!src_grblock) throw std::runtime_error("connect_block_inputs found non-gr_block"); gr_buffer_sptr src_buffer = src_grblock->detail()->output(src_port); @@ -138,7 +147,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block) if (GR_FLAT_FLOWGRAPH_DEBUG) std::cout << "Setting input " << dst_port << " from edge " << (*e) << std::endl; - detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1)); + detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1, grblock)); } } @@ -149,7 +158,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg) // by flattening will need one; existing blocks still in the new flowgraph will // already have one. for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) { - gr_block_sptr block = make_gr_block_sptr(*p); + gr_block_sptr block = cast_to_block_sptr(*p); if (!block->detail()) { if (GR_FLAT_FLOWGRAPH_DEBUG) @@ -177,7 +186,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg) if (GR_FLAT_FLOWGRAPH_DEBUG) std::cout << "not in new edge list" << std::endl; // zero the buffer reader on RHS of old edge - gr_block_sptr block(make_gr_block_sptr(old_edge->dst().block())); + gr_block_sptr block(cast_to_block_sptr(old_edge->dst().block())); int port = old_edge->dst().port(); block->detail()->set_input(port, gr_buffer_reader_sptr()); } @@ -189,7 +198,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg) // Now connect inputs to outputs, reusing old buffer readers if they exist for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) { - gr_block_sptr block = make_gr_block_sptr(*p); + gr_block_sptr block = cast_to_block_sptr(*p); if (GR_FLAT_FLOWGRAPH_DEBUG) std::cout << "merge: merging " << (*p) << "..."; @@ -208,7 +217,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg) gr_edge edge = calc_upstream_edge(*p, i); // Fish out old buffer reader and see if it matches correct buffer from edge list - gr_block_sptr src_block = make_gr_block_sptr(edge.src().block()); + gr_block_sptr src_block = cast_to_block_sptr(edge.src().block()); gr_block_detail_sptr src_detail = src_block->detail(); gr_buffer_sptr src_buffer = src_detail->output(edge.src().port()); gr_buffer_reader_sptr old_reader; @@ -225,7 +234,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg) std::cout << "needs a new reader" << std::endl; // Create new buffer reader and assign - detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1)); + detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1, block)); } } } @@ -248,7 +257,7 @@ void gr_flat_flowgraph::dump() for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) { std::cout << " block: " << (*p) << std::endl; - gr_block_detail_sptr detail = make_gr_block_sptr(*p)->detail(); + gr_block_detail_sptr detail = cast_to_block_sptr(*p)->detail(); std::cout << " detail @" << detail << ":" << std::endl; int ni = detail->ninputs(); @@ -269,3 +278,14 @@ void gr_flat_flowgraph::dump() } } + +gr_block_vector_t +gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks) +{ + gr_block_vector_t result; + for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { + result.push_back(cast_to_block_sptr(*p)); + } + + return result; +} diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h index 184ee4514..673c4df16 100644 --- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h +++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h @@ -26,9 +26,6 @@ #include <gr_flowgraph.h> #include <gr_block.h> -// 32Kbyte buffer size between blocks -#define GR_FIXED_BUFFER_SIZE (32*(1L<<10)) - // Create a shared pointer to a heap allocated gr_flat_flowgraph // (types defined in gr_runtime_types.h) gr_flat_flowgraph_sptr gr_make_flat_flowgraph(); @@ -55,10 +52,14 @@ public: void dump(); + /*! + * Make a vector of gr_block from a vector of gr_basic_block + */ + static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks); + private: gr_flat_flowgraph(); - static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE; gr_block_detail_sptr allocate_block_detail(gr_basic_block_sptr block); gr_buffer_sptr allocate_buffer(gr_basic_block_sptr block, int port); void connect_block_inputs(gr_basic_block_sptr block); diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h index c97a50782..fc407e72b 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h @@ -122,6 +122,9 @@ public: // Return vector of connected blocks gr_basic_block_vector_t calc_used_blocks(); + // Return toplogically sorted vector of blocks. All the sources come first. + gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks); + // Return vector of vectors of disjointly connected blocks, topologically // sorted. std::vector<gr_basic_block_vector_t> partition(); @@ -149,7 +152,6 @@ private: gr_basic_block_vector_t calc_reachable_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks); void reachable_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks); gr_basic_block_vector_t calc_adjacent_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks); - gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks); gr_basic_block_vector_t sort_sources_first(gr_basic_block_vector_t &blocks); bool source_p(gr_basic_block_sptr block); void topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &output); diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc index 32cac2ea8..a026851d2 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc @@ -303,7 +303,7 @@ gr_hier_block2_detail::resolve_endpoint(const gr_endpoint &endp, bool is_input) std::stringstream msg; // Check if endpoint is a leaf node - if (make_gr_block_sptr(endp.block())) + if (cast_to_block_sptr(endp.block())) return endp; // Check if endpoint is a hierarchical block diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_scheduler.cc new file mode 100644 index 000000000..e4d8b3dd9 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_scheduler.cc @@ -0,0 +1,33 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <gr_scheduler.h> + +gr_scheduler::gr_scheduler(gr_flat_flowgraph_sptr ffg) +{ +} + +gr_scheduler::~gr_scheduler() +{ +} diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.h b/gnuradio-core/src/lib/runtime/gr_scheduler.h new file mode 100644 index 000000000..13bc1ff14 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_scheduler.h @@ -0,0 +1,64 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef INCLUDED_GR_SCHEDULER_H +#define INCLUDED_GR_SCHEDULER_H + +#include <boost/utility.hpp> +#include <gr_block.h> +#include <gr_flat_flowgraph.h> + + +class gr_scheduler; +typedef boost::shared_ptr<gr_scheduler> gr_scheduler_sptr; + + +/*! + * \brief Abstract scheduler that takes a flattened flow graph and runs it. + * + * Preconditions: details, buffers and buffer readers have been assigned. + */ +class gr_scheduler : boost::noncopyable +{ + +public: + /*! + * \brief Construct a scheduler and begin evaluating the graph. + * + * The scheduler will continue running until all blocks until they + * report that they are done or the stop method is called. + */ + gr_scheduler(gr_flat_flowgraph_sptr ffg); + + virtual ~gr_scheduler(); + + /*! + * \brief Tell the scheduler to stop executing. + */ + virtual void stop() = 0; + + /*! + * \brief Block until the graph is done. + */ + virtual void wait() = 0; +}; + +#endif /* INCLUDED_GR_SCHEDULER_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc new file mode 100644 index 000000000..fefc0dc70 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc @@ -0,0 +1,87 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <gr_scheduler_sts.h> +#include <gr_single_threaded_scheduler.h> +#include <gruel/thread_body_wrapper.h> + +class sts_container +{ + gr_block_vector_t d_blocks; + +public: + + sts_container(gr_block_vector_t blocks) + : d_blocks(blocks) {} + + void operator()() + { + gr_make_single_threaded_scheduler(d_blocks)->run(); + } +}; + + +gr_scheduler_sptr +gr_scheduler_sts::make(gr_flat_flowgraph_sptr ffg) +{ + return gr_scheduler_sptr(new gr_scheduler_sts(ffg)); +} + +gr_scheduler_sts::gr_scheduler_sts(gr_flat_flowgraph_sptr ffg) + : gr_scheduler(ffg) +{ + // Split the flattened flow graph into discrete partitions, each + // of which is topologically sorted. + + std::vector<gr_basic_block_vector_t> graphs = ffg->partition(); + + // For each partition, create a thread to evaluate it using + // an instance of the gr_single_threaded_scheduler + + for (std::vector<gr_basic_block_vector_t>::iterator p = graphs.begin(); + p != graphs.end(); p++) { + + gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(*p); + d_threads.create_thread( + gruel::thread_body_wrapper<sts_container>(sts_container(blocks), + "single-threaded-scheduler")); + } +} + +gr_scheduler_sts::~gr_scheduler_sts() +{ + stop(); +} + +void +gr_scheduler_sts::stop() +{ + d_threads.interrupt_all(); +} + +void +gr_scheduler_sts::wait() +{ + d_threads.join_all(); +} diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h new file mode 100644 index 000000000..4cf835156 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h @@ -0,0 +1,62 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_GR_SCHEDULER_STS_H +#define INCLUDED_GR_SCHEDULER_STS_H + +#include <gr_scheduler.h> +#include <gruel/thread_group.h> + +/*! + * \brief Concrete scheduler that uses the single_threaded_scheduler + */ +class gr_scheduler_sts : public gr_scheduler +{ + gruel::thread_group d_threads; + +protected: + /*! + * \brief Construct a scheduler and begin evaluating the graph. + * + * The scheduler will continue running until all blocks until they + * report that they are done or the stop method is called. + */ + gr_scheduler_sts(gr_flat_flowgraph_sptr ffg); + +public: + static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg); + + ~gr_scheduler_sts(); + + /*! + * \brief Tell the scheduler to stop executing. + */ + void stop(); + + /*! + * \brief Block until the graph is done. + */ + void wait(); +}; + + + + +#endif /* INCLUDED_GR_SCHEDULER_STS_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc deleted file mode 100644 index 07bd60500..000000000 --- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc +++ /dev/null @@ -1,110 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gr_scheduler_thread.h> -#include <iostream> -#include <stdio.h> - -#ifdef HAVE_SIGNAL_H -#include <signal.h> -#endif - -#define GR_SCHEDULER_THREAD_DEBUG 0 - -gr_scheduler_thread::gr_scheduler_thread(gr_block_vector_t graph) : - omni_thread(NULL, PRIORITY_NORMAL), - d_sts(gr_make_single_threaded_scheduler(graph)) -{ -} - -gr_scheduler_thread::~gr_scheduler_thread() -{ -} - -void gr_scheduler_thread::start() -{ - if (GR_SCHEDULER_THREAD_DEBUG) - std::cout << "gr_scheduler_thread::start() " - << this << std::endl; - start_undetached(); -} - -void * -gr_scheduler_thread::run_undetached(void *arg) -{ - // This is the first code to run in the new thread context. - - /* - * In general, on a *nix system, any thread of a process can receive - * any asynchronous signal. - * - * http://www.serpentine.com/blog/threads-faq/mixing-threads-and-signals-unix/ - * http://www.linuxjournal.com/article/2121 - * - * We really don't want to be handling asynchronous signals such - * as SIGINT and SIGHUP here. We mask them off in the signal - * processing threads so that they'll get handled by the mainline - * thread. We leave the synchronous signals SIGQUIT, SIGBUS, - * SIGILL, SIGSEGV etc alone - * - * FIXME? It might be better to mask them all off in the parent - * thread then dedicate a single thread to handling all signals - * using sigwait. - */ -#if defined(HAVE_PTHREAD_SIGMASK) || defined(HAVE_SIGPROCMASK) - sigset_t old_set; - sigset_t new_set; - int r; - sigemptyset(&new_set); - sigaddset(&new_set, SIGINT); - sigaddset(&new_set, SIGHUP); - sigaddset(&new_set, SIGPIPE); - sigaddset(&new_set, SIGALRM); - sigaddset(&new_set, SIGCHLD); - -#ifdef HAVE_PTHREAD_SIGMASK - r = pthread_sigmask(SIG_BLOCK, &new_set, &old_set); - if (r != 0) - perror("pthread_sigmask"); -#else - r = sigprocmask(SIG_BLOCK, &new_set, &old_set); - if (r != 0) - perror("sigprocmask"); -#endif -#endif - // Run the single-threaded scheduler - d_sts->run(); - return 0; -} - -void -gr_scheduler_thread::stop() -{ - if (0 && GR_SCHEDULER_THREAD_DEBUG) // FIXME not safe to call from signal handler - std::cout << "gr_scheduler_thread::stop() " - << this << std::endl; - d_sts->stop(); -} diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h deleted file mode 100644 index 89daba403..000000000 --- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h +++ /dev/null @@ -1,59 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifndef INCLUDED_GR_SCHEDULER_THREAD_H -#define INCLUDED_GR_SCHEDULER_THREAD_H - -#include <omnithread.h> -#include <gr_single_threaded_scheduler.h> -#include <gr_block.h> - -// omnithread calls delete on itself after thread exits, so can't use shared ptr -class gr_scheduler_thread; -typedef std::vector<gr_scheduler_thread *> gr_scheduler_thread_vector_t; -typedef gr_scheduler_thread_vector_t::iterator gr_scheduler_thread_viter_t; - -/*! - *\brief A single thread of execution for the scheduler - * - * \ingroup internal - * This class implements a single thread that runs undetached, and - * invokes the single-threaded block scheduler. The runtime makes - * one of these for each distinct partition of a flowgraph and runs - * them in parallel. - * - */ -class gr_scheduler_thread : public omni_thread -{ -private: - gr_single_threaded_scheduler_sptr d_sts; - -public: - gr_scheduler_thread(gr_block_vector_t graph); - ~gr_scheduler_thread(); - - virtual void *run_undetached(void *arg); - void start(); - void stop(); -}; - -#endif /* INCLUDED_GR_SCHEDULER_THREAD_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc new file mode 100644 index 000000000..af0338570 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc @@ -0,0 +1,95 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <gr_scheduler_tpb.h> +#include <gr_tpb_thread_body.h> +#include <gruel/thread_body_wrapper.h> +#include <sstream> + +/* + * You know, a lambda expression would be sooo much easier... + */ +class tpb_container +{ + gr_block_sptr d_block; + +public: + tpb_container(gr_block_sptr block) : d_block(block) {} + + void operator()() + { + gr_tpb_thread_body body(d_block); + } +}; + + +gr_scheduler_sptr +gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg) +{ + return gr_scheduler_sptr(new gr_scheduler_tpb(ffg)); +} + +gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg) + : gr_scheduler(ffg) +{ + // Get a topologically sorted vector of all the blocks in use. + // Being topologically sorted probably isn't going to matter, but + // there's a non-zero chance it might help... + + gr_basic_block_vector_t used_blocks = ffg->calc_used_blocks(); + used_blocks = ffg->topological_sort(used_blocks); + gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(used_blocks); + + // Ensure that the done flag is clear on all blocks + + for (size_t i = 0; i < blocks.size(); i++){ + blocks[i]->detail()->set_done(false); + } + + // Fire off a thead for each block + + for (size_t i = 0; i < blocks.size(); i++){ + std::stringstream name; + name << "thread-per-block[" << i << "]: " << blocks[i]; + d_threads.create_thread( + gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]), name.str())); + } +} + +gr_scheduler_tpb::~gr_scheduler_tpb() +{ + stop(); +} + +void +gr_scheduler_tpb::stop() +{ + d_threads.interrupt_all(); +} + +void +gr_scheduler_tpb::wait() +{ + d_threads.join_all(); +} diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h new file mode 100644 index 000000000..16a0c0204 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h @@ -0,0 +1,60 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_GR_SCHEDULER_TPB_H +#define INCLUDED_GR_SCHEDULER_TPB_H + +#include <gr_scheduler.h> +#include <gruel/thread_group.h> + +/*! + * \brief Concrete scheduler that uses a kernel thread-per-block + */ +class gr_scheduler_tpb : public gr_scheduler +{ + gruel::thread_group d_threads; + +protected: + /*! + * \brief Construct a scheduler and begin evaluating the graph. + * + * The scheduler will continue running until all blocks until they + * report that they are done or the stop method is called. + */ + gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg); + +public: + static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg); + + ~gr_scheduler_tpb(); + + /*! + * \brief Tell the scheduler to stop executing. + */ + void stop(); + + /*! + * \brief Block until the graph is done. + */ + void wait(); +}; + + +#endif /* INCLUDED_GR_SCHEDULER_TPB_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc index b2fbdb73b..7f1b40641 100644 --- a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc +++ b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc @@ -28,6 +28,7 @@ #include <gr_block.h> #include <gr_block_detail.h> #include <gr_buffer.h> +#include <boost/thread.hpp> #include <iostream> #include <limits> #include <assert.h> @@ -44,14 +45,6 @@ static int which_scheduler = 0; - -std::ostream& -operator << (std::ostream& os, const gr_block *m) -{ - os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>"; - return os; -} - gr_single_threaded_scheduler_sptr gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks) { @@ -162,6 +155,9 @@ gr_single_threaded_scheduler::main_loop () nalive = d_blocks.size (); while (d_enabled && nalive > 0){ + if (boost::this_thread::interruption_requested()) + break; + gr_block *m = d_blocks[bi].get (); gr_block_detail *d = m->detail().get (); diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.cc b/gnuradio-core/src/lib/runtime/gr_top_block.cc index 3c8e28f70..09e46dfbb 100644 --- a/gnuradio-core/src/lib/runtime/gr_top_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_top_block.cc @@ -27,7 +27,6 @@ #include <unistd.h> #include <gr_top_block.h> #include <gr_top_block_impl.h> -#include <gr_top_block_impl_sts.h> #include <gr_io_signature.h> #include <iostream> @@ -43,7 +42,7 @@ gr_top_block::gr_top_block(const std::string &name) gr_make_io_signature(0,0,0)) { - d_impl = new gr_top_block_impl_sts(this); + d_impl = new gr_top_block_impl(this); } gr_top_block::~gr_top_block() diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc index 591437938..50d480d00 100644 --- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc +++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc @@ -27,21 +27,58 @@ #include <gr_top_block.h> #include <gr_top_block_impl.h> #include <gr_flat_flowgraph.h> -#include <gr_scheduler_thread.h> -#include <gr_local_sighandler.h> +#include <gr_scheduler_sts.h> +#include <gr_scheduler_tpb.h> #include <stdexcept> #include <iostream> #include <string.h> #include <unistd.h> +#include <stdlib.h> #define GR_TOP_BLOCK_IMPL_DEBUG 0 + +typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg); + +static struct scheduler_table { + const char *name; + scheduler_maker f; +} scheduler_table[] = { + { "TPB", gr_scheduler_tpb::make }, // first entry is default + { "STS", gr_scheduler_sts::make } +}; + +static gr_scheduler_sptr +make_scheduler(gr_flat_flowgraph_sptr ffg) +{ + static scheduler_maker factory = 0; + + if (factory == 0){ + char *v = getenv("GR_SCHEDULER"); + if (!v) + factory = scheduler_table[0].f; // use default + else { + for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){ + if (strcmp(v, scheduler_table[i].name) == 0){ + factory = scheduler_table[i].f; + break; + } + } + if (factory == 0){ + std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \"" + << v << "\". Using \"" << scheduler_table[0].name << "\"\n"; + factory = scheduler_table[0].f; + } + } + } + return factory(ffg); +} + + gr_top_block_impl::gr_top_block_impl(gr_top_block *owner) - : d_owner(owner), - d_running(false), - d_ffg(), - d_lock_count(0) + : d_owner(owner), d_ffg(), + d_state(IDLE), d_lock_count(0) { } @@ -53,14 +90,13 @@ gr_top_block_impl::~gr_top_block_impl() void gr_top_block_impl::start() { - if (GR_TOP_BLOCK_IMPL_DEBUG) - std::cout << "start: entered " << this << std::endl; + gr_lock_guard l(d_mutex); - if (d_running) + if (d_state != IDLE) throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()"); if (d_lock_count > 0) - throw std::runtime_error("top_block::start: can't call start with flow graph locked"); + throw std::runtime_error("top_block::start: can't start with flow graph locked"); // Create new flat flow graph by flattening hierarchy d_ffg = d_owner->flatten(); @@ -69,77 +105,71 @@ gr_top_block_impl::start() d_ffg->validate(); d_ffg->setup_connections(); - // Execute scheduler threads - start_threads(); - d_running = true; + d_scheduler = make_scheduler(d_ffg); + d_state = RUNNING; } +void +gr_top_block_impl::stop() +{ + if (d_scheduler) + d_scheduler->stop(); +} + + +void +gr_top_block_impl::wait() +{ + if (d_scheduler) + d_scheduler->wait(); + + d_state = IDLE; +} // N.B. lock() and unlock() cannot be called from a flow graph thread or // deadlock will occur when reconfiguration happens void gr_top_block_impl::lock() { - omni_mutex_lock lock(d_reconf); + gr_lock_guard lock(d_mutex); d_lock_count++; - if (GR_TOP_BLOCK_IMPL_DEBUG) - std::cout << "runtime: locked, count = " << d_lock_count << std::endl; } void gr_top_block_impl::unlock() { - omni_mutex_lock lock(d_reconf); + gr_lock_guard lock(d_mutex); + if (d_lock_count <= 0){ d_lock_count = 0; // fix it, then complain throw std::runtime_error("unpaired unlock() call"); } d_lock_count--; - if (GR_TOP_BLOCK_IMPL_DEBUG) - std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl; + if (d_lock_count > 0 || d_state == IDLE) // nothing to do + return; - if (d_lock_count == 0) { - if (GR_TOP_BLOCK_IMPL_DEBUG) - std::cout << "unlock: restarting flowgraph" << std::endl; - restart(); - } + restart(); } +/* + * restart is called with d_mutex held + */ void gr_top_block_impl::restart() { - if (GR_TOP_BLOCK_IMPL_DEBUG) - std::cout << "restart: entered" << std::endl; - - if (!d_running) - return; // nothing to do - - // Stop scheduler threads and wait for completion - stop(); + stop(); // Stop scheduler and wait for completion wait(); - if (GR_TOP_BLOCK_IMPL_DEBUG) - std::cout << "restart: threads stopped" << std::endl; // Create new simple flow graph gr_flat_flowgraph_sptr new_ffg = d_owner->flatten(); new_ffg->validate(); // check consistency, sanity, etc - - if (GR_TOP_BLOCK_IMPL_DEBUG) { - std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl; - d_ffg->dump(); - } new_ffg->merge_connections(d_ffg); // reuse buffers, etc - - if (GR_TOP_BLOCK_IMPL_DEBUG) { - std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl; - new_ffg->dump(); - } - d_ffg = new_ffg; - start_threads(); - d_running = true; + // Create a new scheduler to execute it + d_scheduler = make_scheduler(d_ffg); + d_state = RUNNING; } void @@ -148,14 +178,3 @@ gr_top_block_impl::dump() if (d_ffg) d_ffg->dump(); } - -gr_block_vector_t -gr_top_block_impl::make_gr_block_vector(gr_basic_block_vector_t blocks) -{ - gr_block_vector_t result; - for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { - result.push_back(make_gr_block_sptr(*p)); - } - - return result; -} diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h index 869f788ef..35fb44ef9 100644 --- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h +++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h @@ -23,7 +23,11 @@ #ifndef INCLUDED_GR_TOP_BLOCK_IMPL_H #define INCLUDED_GR_TOP_BLOCK_IMPL_H -#include <gr_scheduler_thread.h> +#include <gr_scheduler.h> +#include <boost/thread.hpp> + +typedef boost::mutex gr_mutex; // FIXME move these elsewhere +typedef boost::lock_guard<boost::mutex> gr_lock_guard; /*! *\brief Abstract implementation details of gr_top_block @@ -37,16 +41,16 @@ class gr_top_block_impl { public: gr_top_block_impl(gr_top_block *owner); - virtual ~gr_top_block_impl(); + ~gr_top_block_impl(); // Create and start scheduler threads - virtual void start(); + void start(); // Signal scheduler threads to stop - virtual void stop() = 0; + void stop(); // Wait for scheduler threads to exit - virtual void wait() = 0; + void wait(); // Lock the top block to allow reconfiguration void lock(); @@ -59,22 +63,16 @@ public: protected: + enum tb_state { IDLE, RUNNING }; + gr_top_block *d_owner; - bool d_running; gr_flat_flowgraph_sptr d_ffg; + gr_scheduler_sptr d_scheduler; - omni_mutex d_reconf; // protects d_lock_count + gr_mutex d_mutex; // protects d_state and d_lock_count + tb_state d_state; int d_lock_count; - - virtual void start_threads() = 0; - -/*! - * Make a vector of gr_block from a vector of gr_basic_block - * - * Pass-by-value to avoid problem with possible asynchronous modification - */ - static gr_block_vector_t make_gr_block_vector(gr_basic_block_vector_t blocks); - + private: void restart(); }; diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc deleted file mode 100644 index b3e9da627..000000000 --- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc +++ /dev/null @@ -1,128 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007,2008 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gr_top_block.h> -#include <gr_top_block_impl_sts.h> -#include <gr_flat_flowgraph.h> -#include <gr_scheduler_thread.h> -#include <gr_local_sighandler.h> - -#include <stdexcept> -#include <iostream> -#include <string.h> -#include <unistd.h> - -#define GR_TOP_BLOCK_IMPL_STS_DEBUG 0 - -static gr_top_block_impl *s_impl = 0; - - -// FIXME: This prevents using more than one gr_top_block instance - -static void -runtime_sigint_handler(int signum) -{ - if (GR_TOP_BLOCK_IMPL_STS_DEBUG){ - char *msg = "SIGINT received, calling stop()\n"; - ::write(1, msg, strlen(msg)); // write is OK to call from signal handler - } - - if (s_impl) - s_impl->stop(); -} - -// ---------------------------------------------------------------- - -gr_top_block_impl_sts::gr_top_block_impl_sts(gr_top_block *owner) - : gr_top_block_impl(owner) -{ - if (s_impl) - throw std::logic_error("gr_top_block_impl_sts: multiple simultaneous gr_top_blocks not allowed"); - - s_impl = this; -} - -gr_top_block_impl_sts::~gr_top_block_impl_sts() -{ - s_impl = 0; // don't call delete we don't own these -} - -void -gr_top_block_impl_sts::start_threads() -{ - if (GR_TOP_BLOCK_IMPL_STS_DEBUG) - std::cout << "start_threads: entered" << std::endl; - - d_graphs = d_ffg->partition(); - for (std::vector<gr_basic_block_vector_t>::iterator p = d_graphs.begin(); - p != d_graphs.end(); p++) { - gr_scheduler_thread *thread = new gr_scheduler_thread(make_gr_block_vector(*p)); - d_threads.push_back(thread); - if (GR_TOP_BLOCK_IMPL_STS_DEBUG) - std::cout << "start_threads: starting " << thread << std::endl; - thread->start(); - } -} - -/* - * N.B. as currently implemented, it is possible that this may be - * invoked by the SIGINT handler which is fragile as hell... - */ -void -gr_top_block_impl_sts::stop() -{ - if (GR_TOP_BLOCK_IMPL_STS_DEBUG){ - char *msg = "stop: entered\n"; - ::write(1, msg, strlen(msg)); - } - - for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) { - if (*p) - (*p)->stop(); - } -} - -void -gr_top_block_impl_sts::wait() -{ - if (GR_TOP_BLOCK_IMPL_STS_DEBUG) - std::cout << "wait: entered" << std::endl; - - void *dummy_status; // don't ever dereference this - gr_local_sighandler sigint(SIGINT, runtime_sigint_handler); - - for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) { - if (GR_TOP_BLOCK_IMPL_STS_DEBUG) - std::cout << "wait: joining thread " << (*p) << std::endl; - (*p)->join(&dummy_status); // omnithreads will self-delete, so pointer is now dead - (*p) = 0; // FIXME: switch to stl::list and actually remove from container - if (GR_TOP_BLOCK_IMPL_STS_DEBUG) - std::cout << "wait: join returned" << std::endl; - } - - d_threads.clear(); - d_running = false; -} diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h deleted file mode 100644 index ec2e51cf2..000000000 --- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h +++ /dev/null @@ -1,55 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifndef INCLUDED_GR_TOP_BLOCK_IMPL_STS_H -#define INCLUDED_GR_TOP_BLOCK_IMPL_STS_H - -#include <gr_top_block_impl.h> -#include <gr_scheduler_thread.h> - -/*! - *\brief Implementation details of gr_top_block - * \ingroup internal - * - * Concrete implementation of gr_top_block using gr_single_threaded_scheduler. - */ -class gr_top_block_impl_sts : public gr_top_block_impl -{ -public: - gr_top_block_impl_sts(gr_top_block *owner); - ~gr_top_block_impl_sts(); - - // Signal scheduler threads to stop - void stop(); - - // Wait for scheduler threads to exit - void wait(); - -private: - - gr_scheduler_thread_vector_t d_threads; - std::vector<gr_basic_block_vector_t> d_graphs; - - void start_threads(); -}; - -#endif /* INCLUDED_GR_TOP_BLOCK_IMPL_STS_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc new file mode 100644 index 000000000..02e8deed8 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc @@ -0,0 +1,67 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <gr_tpb_detail.h> +#include <gr_block.h> +#include <gr_block_detail.h> +#include <gr_buffer.h> + +/* + * We assume that no worker threads are ever running when the + * graph structure is being manipulated, thus it's safe for us to poke + * around in our neighbors w/o holding any locks. + */ + +void +gr_tpb_detail::notify_upstream(gr_block_detail *d) +{ + // For each of our inputs, tell the guy upstream that we've consumed + // some input, and that he most likely has more output buffer space + // available. + + for (size_t i = 0; i < d->d_input.size(); i++){ + // Can you say, "pointer chasing?" + d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed(); + } +} + +void +gr_tpb_detail::notify_downstream(gr_block_detail *d) +{ + // For each of our outputs, tell the guys downstream that they have + // new input available. + + for (size_t i = 0; i < d->d_output.size(); i++){ + gr_buffer_sptr buf = d->d_output[i]; + for (size_t j = 0, k = buf->nreaders(); j < k; j++) + buf->reader(j)->link()->detail()->d_tpb.set_input_changed(); + } +} + +void +gr_tpb_detail::notify_neighbors(gr_block_detail *d) +{ + notify_downstream(d); + notify_upstream(d); +} diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h new file mode 100644 index 000000000..9566312dc --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h @@ -0,0 +1,81 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_GR_TPB_DETAIL_H +#define INCLUDED_GR_TPB_DETAIL_H + +#include <boost/thread.hpp> + +class gr_block_detail; + +/*! + * \brief used by thread-per-block scheduler + */ +struct gr_tpb_detail { + typedef boost::unique_lock<boost::mutex> scoped_lock; + + boost::mutex mutex; //< protects all vars + bool input_changed; + boost::condition_variable input_cond; + bool output_changed; + boost::condition_variable output_cond; + + gr_tpb_detail() + : input_changed(false), output_changed(false) {} + + + //! Called by us to tell all our upstream blocks that their output may have changed. + void notify_upstream(gr_block_detail *d); + + //! Called by us to tell all our downstream blocks that their input may have changed. + void notify_downstream(gr_block_detail *d); + + //! Called by us to notify both upstream and downstream + void notify_neighbors(gr_block_detail *d); + + //! Called by us + void clear_changed() + { + scoped_lock guard(mutex); + input_changed = false; + output_changed = false; + } + +private: + + //! Used by notify_downstream + void set_input_changed() + { + scoped_lock guard(mutex); + input_changed = true; + input_cond.notify_one(); + } + + //! Used by notify_upstream + void set_output_changed() + { + scoped_lock guard(mutex); + output_changed = true; + output_cond.notify_one(); + } + +}; + +#endif /* INCLUDED_GR_TPB_DETAIL_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc new file mode 100644 index 000000000..f61e17243 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -0,0 +1,76 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <gr_tpb_thread_body.h> +#include <iostream> + +gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block) + : d_exec(block) +{ + // std::cerr << "gr_tpb_thread_body: " << block << std::endl; + + gr_block_detail *d = block->detail().get(); + gr_block_executor::state s; + + while (1){ + d->d_tpb.clear_changed(); + s = d_exec.run_one_iteration(); + + switch(s){ + case gr_block_executor::READY: // Tell neighbors we made progress. + d->d_tpb.notify_neighbors(d); + break; + + case gr_block_executor::READY_NO_OUTPUT: // Notify upstream only + d->d_tpb.notify_upstream(d); + break; + + case gr_block_executor::DONE: // Game over. + d->d_tpb.notify_neighbors(d); + return; + + case gr_block_executor::BLKD_IN: // Wait for input. + { + gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex); + while(!d->d_tpb.input_changed) + d->d_tpb.input_cond.wait(guard); + } + break; + + case gr_block_executor::BLKD_OUT: // Wait for output buffer space. + { + gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex); + while(!d->d_tpb.output_changed) + d->d_tpb.output_cond.wait(guard); + } + break; + + default: + assert(0); + } + } +} + +gr_tpb_thread_body::~gr_tpb_thread_body() +{ +} diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h new file mode 100644 index 000000000..a630b1be9 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h @@ -0,0 +1,45 @@ +/* -*- c++ -*- */ +/* + * Copyright 2008 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_GR_TPB_THREAD_BODY_H +#define INCLUDED_GR_TPB_THREAD_BODY_H + +#include <gr_block_executor.h> +#include <gr_block.h> +#include <gr_block_detail.h> + +/*! + * \brief The body of each thread-per-block thread. + * + * One of these is instantiated in its own thread for each block. The + * constructor turns into the main loop which returns when the block is + * done or is interrupted. + */ + +class gr_tpb_thread_body { + gr_block_executor d_exec; + +public: + gr_tpb_thread_body(gr_block_sptr block); + ~gr_tpb_thread_body(); +}; + + +#endif /* INCLUDED_GR_TPB_THREAD_BODY_H */ diff --git a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc index ad40f724d..7434cf657 100644 --- a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc +++ b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc @@ -52,7 +52,7 @@ t0_body () int nitems = 4000 / sizeof (int); int counter = 0; - gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int))); + gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr())); int last_sa; int sa; @@ -87,8 +87,8 @@ t1_body () int write_counter = 0; int read_counter = 0; - gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int))); - gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0)); + gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr())); + gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr())); int sa; @@ -162,8 +162,8 @@ t2_body () int nitems = (64 * (1L << 10)) / sizeof (int); // 64K worth of ints - gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int))); - gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0)); + gr_buffer_sptr buf(gr_make_buffer (nitems, sizeof (int), gr_block_sptr())); + gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr())); int read_counter = 0; int write_counter = 0; @@ -229,7 +229,7 @@ t3_body () int nitems = (64 * (1L << 10)) / sizeof (int); static const int N = 5; - gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int))); + gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr())); gr_buffer_reader_sptr reader[N]; int read_counter[N]; int write_counter = 0; @@ -237,7 +237,7 @@ t3_body () for (int i = 0; i < N; i++){ read_counter[i] = 0; - reader[i] = gr_buffer_add_reader (buf, 0); + reader[i] = gr_buffer_add_reader (buf, 0, gr_block_sptr()); } for (int lc = 0; lc < 1000; lc++){ |