diff options
author | Josh Blum | 2012-09-19 18:14:23 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-19 18:14:23 -0700 |
commit | d3d7cd4a0a5ce7704e5bdab5871ce516870f509d (patch) | |
tree | 8f5aea4b53955d084483729b2859e79dc0888543 /lib | |
parent | e6020edf49b81dca6fcf3bd1eafbb2ea0415e2a2 (diff) | |
download | sandhi-d3d7cd4a0a5ce7704e5bdab5871ce516870f509d.tar.gz sandhi-d3d7cd4a0a5ce7704e5bdab5871ce516870f509d.tar.bz2 sandhi-d3d7cd4a0a5ce7704e5bdab5871ce516870f509d.zip |
cleanup input queues, and do /smart/ output flush logic
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_task.cpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 89 |
2 files changed, 25 insertions, 73 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 8f70daa..5214e9c 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -228,7 +228,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ - bool input_allows_flush = true; for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); @@ -237,7 +236,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; - input_allows_flush = input_allows_flush and this->input_queues.consume(i, bytes); + this->input_queues.consume(i, bytes); } //------------------------------------------------------------------ @@ -254,10 +253,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t bytes = items*this->output_items_sizes[i]; buff.length += bytes; - //only pass output buffer downstream when the input is fully consumed... - //Reasoning: For the sake of dealling with history, we can process the mini history input buffer, - //and then call work again on the real input buffer, but still yield one output buffer per input buffer. - if (input_allows_flush) + //dont always pass output buffers downstream for the sake of efficiency + if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) { task_iface.post_downstream(i, buff); this->output_queues.pop(i); diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index f205edf..df670c4 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -42,30 +42,11 @@ struct InputBufferQueues const std::vector<size_t> &input_item_sizes ); - /*! - * Rules for front: - * - * If we are within the mini history buffer, - * memcpy post bytes from the head of the input buffer. - * The caller must chew through the mini history buffer - * until offset bytes passes the history requirement. - * - * Otherwise, resolve pointers to the input buffer, - * moving the memory and length by num history bytes. - */ + //! Call to get an input buffer for work SBuffer front(const size_t i, bool &potential_inline); - /*! - * Rules for consume: - * - * If we were operating in a mini history buffer, do nothing. - * Otherwise, check if the input buffer was entirely consumed. - * If so, pop the input buffer, copy the tail end of the buffer - * into the mini history buffer, and reset the offset condition. - * - * \return true if the input allows output flushing - */ - bool consume(const size_t i, const size_t bytes_consumed); + //! Call when input bytes consumed by work + void consume(const size_t i, const size_t bytes_consumed); void resize(const size_t size); @@ -124,7 +105,6 @@ struct InputBufferQueues std::vector<size_t> _multiple_bytes; std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; - std::vector<bool> _in_hist_buff; }; @@ -138,7 +118,14 @@ inline void InputBufferQueues::resize(const size_t size) _multiple_bytes.resize(size, 0); _post_bytes.resize(size, 0); _aux_queues.resize(size); - _in_hist_buff.resize(size, false); +} + +static size_t round_up_to_multiple(const size_t at_least, const size_t multiple) +{ + size_t result = (multiple*at_least)/multiple; + while (result < at_least) result += multiple; + ASSERT((multiple*result)/multiple == result); + return result; } @@ -149,7 +136,7 @@ inline void InputBufferQueues::init( ){ if (this->size() == 0) return; - //const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); + const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); for (size_t i = 0; i < this->size(); i++) { @@ -166,19 +153,19 @@ inline void InputBufferQueues::init( _multiple_bytes[i] = std::max(size_t(1), _multiple_bytes[i]); //calculate the input multiple aka reserve size - _reserve_bytes[i] = _multiple_bytes[i]; - while (_reserve_bytes[i] < (_history_bytes[i] + _multiple_bytes[i])) - { - _reserve_bytes[i] += _multiple_bytes[i]; - } + _reserve_bytes[i] = round_up_to_multiple( + _history_bytes[i] + _multiple_bytes[i], + _multiple_bytes[i] + ); //post bytes are the desired buffer size to escape the edge case - //_post_bytes[i] = input_item_sizes[i]*max_history_items; - //_post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]); - //_post_bytes[i] += _reserve_bytes[i]; //pad for round down issues + _post_bytes[i] = round_up_to_multiple( + input_item_sizes[i]*max_history_items + _reserve_bytes[i], + _multiple_bytes[i] + ); //allocate mini buffers for history edge conditions - size_t num_bytes = (1 << 17);//_post_bytes[i]; + size_t num_bytes = _post_bytes[i]; _aux_queues[i]->allocate_one(num_bytes); _aux_queues[i]->allocate_one(num_bytes); @@ -194,8 +181,6 @@ inline void InputBufferQueues::init( buff.length = delta; this->push(i, buff); - //_queues[i].push_front(buff); - //_in_hist_buff[i] = true; } if (_history_bytes[i] < old_history) { @@ -221,15 +206,9 @@ inline SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) //same buffer, different offset and length SBuffer buff = front; - /*VAR(buff.length); - VAR(buff.offset);*/ buff.length -= _history_bytes[i]; buff.length /= _multiple_bytes[i]; buff.length *= _multiple_bytes[i]; - /*VAR(_reserve_bytes[i]); - VAR(_history_bytes[i]); - VAR(_multiple_bytes[i]); - VAR(buff.length);*/ //set the flag that this buffer *might* be inlined as an output buffer potential_inline = unique and (buff.length == front.length); @@ -245,11 +224,6 @@ inline void InputBufferQueues::__prepare(const size_t i) while (_queues[i].front().length < _reserve_bytes[i]) { - /*HERE(); - VAR(_queues[i].front().length); - VAR(_reserve_bytes[i]); - VAR(_history_bytes[i]); - */ SBuffer &front = _queues[i].front(); SBuffer dst; @@ -268,7 +242,6 @@ inline void InputBufferQueues::__prepare(const size_t i) _aux_queues[i]->pop(); dst.offset = 0; dst.length = 0; - //_in_hist_buff[i] = true; } SBuffer src = _queues[i].front(); @@ -292,11 +265,9 @@ inline void InputBufferQueues::__prepare(const size_t i) } -inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) +inline void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { //if (bytes_consumed == 0) return true; - //HERE(); - //VAR(bytes_consumed); //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); @@ -310,27 +281,11 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum { _queues[i].pop_front(); } -/* - //otherwise, see if this is a mini history buff we can pop - else if (_in_hist_buff[i] and _queues[i].front().length >= 2*_history_bytes[i]) - { - const size_t residual = _queues[i].front().length; - _queues[i].pop_front(); - _in_hist_buff[i] = false; - ASSERT(not _queues[i].empty()); - ASSERT(_queues[i].front().offset > residual); - _queues[i].front().offset -= residual; - _queues[i].front().length += residual; - ASSERT(_queues[i].front().offset >= _history_bytes[i]); - } - */ //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; __update(i); - - return not _in_hist_buff[i]; } } //namespace gnuradio |