diff options
-rw-r--r-- | include/gnuradio/block.hpp | 11 | ||||
-rw-r--r-- | include/gnuradio/gr_block.h | 7 | ||||
-rw-r--r-- | lib/block.cpp | 12 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/block_task.cpp | 8 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 46 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 28 | ||||
-rw-r--r-- | swig/runtime.i | 2 |
9 files changed, 98 insertions, 18 deletions
diff --git a/include/gnuradio/block.hpp b/include/gnuradio/block.hpp index cfaa00b..459e25b 100644 --- a/include/gnuradio/block.hpp +++ b/include/gnuradio/block.hpp @@ -74,9 +74,16 @@ struct GRAS_API Block : Element * Basic routines from basic block ******************************************************************/ - size_t history(const size_t which_input = 0) const; + //! Get the number of history items (default 0) + size_t input_history(const size_t which_input = 0) const; - void set_history(const size_t history, const size_t which_input = 0); + /*! + * Set the number of items that will be saved from the previous run. + * Input buffers will begin with an overlap of the previous's buffer's + * num history items. This is used to implement sample memory for + * things like sliding dot products/FIR filters. + */ + void set_input_history(const size_t history, const size_t which_input = 0); void set_output_multiple(const size_t multiple, const size_t which_output = 0); diff --git a/include/gnuradio/gr_block.h b/include/gnuradio/gr_block.h index 2bbbf3a..1abeab6 100644 --- a/include/gnuradio/gr_block.h +++ b/include/gnuradio/gr_block.h @@ -70,12 +70,15 @@ struct GRAS_API gr_block : gnuradio::Block unsigned history(void) const { - return gnuradio::Block::history(); + //implement off-by-one history compat + return this->input_history()+1; } void set_history(unsigned history) { - gnuradio::Block::set_history(history); + //implement off-by-one history compat + if (history == 0) history++; + this->set_input_history(history-1); } void set_alignment(const size_t alignment); diff --git a/lib/block.cpp b/lib/block.cpp index 39939cb..406e16b 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -28,7 +28,7 @@ Block::Block(void) Block::Block(const std::string &name): Element(name) { - this->set_history(0); + this->set_input_history(0); this->set_output_multiple(1); this->set_fixed_rate(true); this->set_relative_rate(1.0); @@ -67,15 +67,13 @@ typename V::value_type vector_get(const V &v, const size_t index) return v[index]; } -size_t Block::history(const size_t which_input) const +size_t Block::input_history(const size_t which_input) const { return vector_get((*this)->input_history_items, which_input); } -void Block::set_history(const size_t history_, const size_t which_input) +void Block::set_input_history(const size_t history, const size_t which_input) { - //FIXME why is history - 1 (gnuradio loves this) - const size_t history = (history_ == 0)? 0 : history_-1; vector_set((*this)->input_history_items, history, which_input); } @@ -92,6 +90,7 @@ void Block::set_output_multiple(const size_t multiple, const size_t which_output void Block::consume(const size_t which_input, const size_t how_many_items) { (*this)->consume_items[which_input] = how_many_items; + (*this)->consume_called[which_input] = true; } void Block::consume_each(const size_t how_many_items) @@ -99,6 +98,7 @@ void Block::consume_each(const size_t how_many_items) for (size_t i = 0; i < (*this)->consume_items.size(); i++) { (*this)->consume_items[i] = how_many_items; + (*this)->consume_called[i] = true; } } @@ -114,7 +114,7 @@ void Block::set_input_inline(const size_t which_input, const bool enb) bool Block::input_inline(const size_t which_input) const { - return (*this)->input_inline_enables[which_input]; + return vector_get((*this)->input_inline_enables, which_input); } void Block::set_fixed_rate(const bool fixed_rate) diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 2cb167f..243d158 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -166,6 +166,7 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); this->consume_items.resize(num_inputs, 0); + this->consume_called.resize(num_inputs, false); this->produce_items.resize(num_outputs, 0); this->input_queues.resize(num_inputs); this->output_queues.resize(num_outputs); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 114c073..a9b88bd 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -73,6 +73,10 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { + #ifdef WORK_DEBUG + WorkDebugPrinter(this->name); + #endif + //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: //-- Handle task may get called for incoming buffers, @@ -83,7 +87,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->input_queues.all_ready() and this->output_queues.all_ready() )) return; - if (WORK) std::cout << "=== calling work on " << name << " ===" << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -124,6 +127,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->work_input_items[i] = mem; this->work_ninput_items[i] = items; num_input_items = std::min(num_input_items, items); + this->consume_called[i] = false; //inline dealings, how and when input buffers can be inlined into output buffers //TODO, check that the buff.get_affinity() matches this block or we dont inline @@ -223,7 +227,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); - const size_t items = (enable_fixed_rate)? (myulround((noutput_items/this->relative_rate))) : this->consume_items[i]; + const size_t items = (this->consume_called[i])? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); this->consume_items[i] = 0; this->items_consumed[i] += items; diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 556fbd1..0ec4650 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -87,6 +87,7 @@ struct ElementImpl //track work's calls to produce and consume std::vector<size_t> produce_items; std::vector<size_t> consume_items; + std::vector<bool> consume_called; //track the subscriber counts std::vector<Token> input_tokens; diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index c235c1f..69f7b4a 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -20,14 +20,28 @@ #include <iostream> #include <stdexcept> +//---------------------------------------------------------------------- +//-- set to 1 to enable these debugs: +//---------------------------------------------------------------------- #define GENESIS 0 #define ARMAGEDDON 0 #define MESSAGE 0 -#define WORK 0 -#define ASSERTING 0 +//---------------------------------------------------------------------- +//-- define to enable these debugs: +//---------------------------------------------------------------------- +//#define WORK_DEBUG +#define ASSERTING + +//---------------------------------------------------------------------- +//-- various debug prints +//---------------------------------------------------------------------- #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; + +//---------------------------------------------------------------------- +//-- implementation for assert debug +//---------------------------------------------------------------------- #ifdef ASSERTING #define ASSERT(x) {if(not (x)) \ { \ @@ -38,4 +52,32 @@ #define ASSERT(x) #endif +//---------------------------------------------------------------------- +//-- implementation for work debug +//---------------------------------------------------------------------- +#ifdef WORK_DEBUG + +#include <boost/thread/mutex.hpp> + +static boost::mutex work_debug_mutex; + +struct WorkDebugPrinter +{ + WorkDebugPrinter(const std::string &name): + lock(work_debug_mutex), name(name) + { + std::cout << "-----> begin work on " << name << std::endl; + } + + ~WorkDebugPrinter(void) + { + std::cout << "<----- end work on " << name << std::endl; + } + + boost::mutex::scoped_lock lock; + std::string name; +}; + +#endif + #endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/ diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 2188509..9e7bf63 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -124,6 +124,7 @@ struct InputBufferQueues std::vector<size_t> _multiple_bytes; std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; + std::vector<bool> _in_hist_buff; }; @@ -137,6 +138,7 @@ inline void InputBufferQueues::resize(const size_t size) _multiple_bytes.resize(size, 0); _post_bytes.resize(size, 0); _aux_queues.resize(size); + _in_hist_buff.resize(size, false); } @@ -157,10 +159,15 @@ inline void InputBufferQueues::init( //determine byte sizes for buffers and dealing with history _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; + + //calculate the input multiple aka reserve size _reserve_bytes[i] = input_item_sizes[i]*input_multiple_items[i]; _multiple_bytes[i] = std::max(size_t(1), _reserve_bytes[i]); + + //post bytes are the desired buffer size to escape the edge case _post_bytes[i] = input_item_sizes[i]*max_history_items; _post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]); + _post_bytes[i] += input_item_sizes[i]; //pad for round down issues //allocate mini buffers for history edge conditions size_t num_bytes = _history_bytes[i] + _post_bytes[i]; @@ -179,6 +186,7 @@ inline void InputBufferQueues::init( buff.length = 0; _queues[i].push_front(buff); + _in_hist_buff[i] = true; } } } @@ -191,6 +199,7 @@ inline SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); __prepare(i); + ASSERT(_queues[i].front().offset >= _history_bytes[i]); SBuffer &front = _queues[i].front(); const bool unique = front.unique(); @@ -233,6 +242,7 @@ inline void InputBufferQueues::__prepare(const size_t i) hist_bytes = _history_bytes[i]; dst.offset = hist_bytes; dst.length = 0; + _in_hist_buff[i] = true; } SBuffer src = _queues[i].front(); @@ -263,9 +273,6 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); - //this is an optimization - const bool minibuff = (_history_bytes[i] != 0) and (_queues[i].front().offset == _history_bytes[i]) and (bytes_consumed == _post_bytes[i]); - //update bounds on the current buffer _queues[i].front().offset += bytes_consumed; _queues[i].front().length -= bytes_consumed; @@ -276,12 +283,25 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum _queues[i].pop_front(); } + //otherwise, see if this is a mini history buff we can pop + else if (_in_hist_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i]) + { + const size_t residual = _queues[i].front().length; + _queues[i].pop_front(); + _in_hist_buff[i] = false; + ASSERT(not _queues[i].empty()); + ASSERT(_queues[i].front().offset > residual); + _queues[i].front().offset -= residual; + _queues[i].front().length += residual; + ASSERT(_queues[i].front().offset >= _history_bytes[i]); + } + //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; __update(i); - return not minibuff; //not true on minibuff + return not _in_hist_buff[i]; } } //namespace gnuradio diff --git a/swig/runtime.i b/swig/runtime.i index e41f711..8f56b9e 100644 --- a/swig/runtime.i +++ b/swig/runtime.i @@ -57,7 +57,9 @@ namespace gnuradio { struct Block : Element{}; + struct HierBlock : Element{}; } +struct gr_hier_block2 : gnuradio::HierBlock{}; struct gr_block : gnuradio::Block{}; struct gr_sync_block : gr_block{}; struct gr_sync_interpolator : gr_sync_block{}; |