path: root/lib
diff options
authorJosh Blum2012-09-19 18:14:23 -0700
committerJosh Blum2012-09-19 18:14:23 -0700
commitd3d7cd4a0a5ce7704e5bdab5871ce516870f509d (patch)
tree8f5aea4b53955d084483729b2859e79dc0888543 /lib
parente6020edf49b81dca6fcf3bd1eafbb2ea0415e2a2 (diff)
cleanup input queues, and do /smart/ output flush logic
Diffstat (limited to 'lib')
2 files changed, 25 insertions, 73 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 8f70daa..5214e9c 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -228,7 +228,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- process input consumption
- bool input_allows_flush = true;
for (size_t i = 0; i < num_inputs; i++)
ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
@@ -237,7 +236,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
- input_allows_flush = input_allows_flush and this->input_queues.consume(i, bytes);
+ this->input_queues.consume(i, bytes);
@@ -254,10 +253,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t bytes = items*this->output_items_sizes[i];
buff.length += bytes;
- //only pass output buffer downstream when the input is fully consumed...
- //Reasoning: For the sake of dealling with history, we can process the mini history input buffer,
- //and then call work again on the real input buffer, but still yield one output buffer per input buffer.
- if (input_allows_flush)
+ //dont always pass output buffers downstream for the sake of efficiency
+ if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
task_iface.post_downstream(i, buff);
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index f205edf..df670c4 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -42,30 +42,11 @@ struct InputBufferQueues
const std::vector<size_t> &input_item_sizes
- /*!
- * Rules for front:
- *
- * If we are within the mini history buffer,
- * memcpy post bytes from the head of the input buffer.
- * The caller must chew through the mini history buffer
- * until offset bytes passes the history requirement.
- *
- * Otherwise, resolve pointers to the input buffer,
- * moving the memory and length by num history bytes.
- */
+ //! Call to get an input buffer for work
SBuffer front(const size_t i, bool &potential_inline);
- /*!
- * Rules for consume:
- *
- * If we were operating in a mini history buffer, do nothing.
- * Otherwise, check if the input buffer was entirely consumed.
- * If so, pop the input buffer, copy the tail end of the buffer
- * into the mini history buffer, and reset the offset condition.
- *
- * \return true if the input allows output flushing
- */
- bool consume(const size_t i, const size_t bytes_consumed);
+ //! Call when input bytes consumed by work
+ void consume(const size_t i, const size_t bytes_consumed);
void resize(const size_t size);
@@ -124,7 +105,6 @@ struct InputBufferQueues
std::vector<size_t> _multiple_bytes;
std::vector<size_t> _post_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
- std::vector<bool> _in_hist_buff;
@@ -138,7 +118,14 @@ inline void InputBufferQueues::resize(const size_t size)
_multiple_bytes.resize(size, 0);
_post_bytes.resize(size, 0);
- _in_hist_buff.resize(size, false);
+static size_t round_up_to_multiple(const size_t at_least, const size_t multiple)
+ size_t result = (multiple*at_least)/multiple;
+ while (result < at_least) result += multiple;
+ ASSERT((multiple*result)/multiple == result);
+ return result;
@@ -149,7 +136,7 @@ inline void InputBufferQueues::init(
if (this->size() == 0) return;
- //const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
+ const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
for (size_t i = 0; i < this->size(); i++)
@@ -166,19 +153,19 @@ inline void InputBufferQueues::init(
_multiple_bytes[i] = std::max(size_t(1), _multiple_bytes[i]);
//calculate the input multiple aka reserve size
- _reserve_bytes[i] = _multiple_bytes[i];
- while (_reserve_bytes[i] < (_history_bytes[i] + _multiple_bytes[i]))
- {
- _reserve_bytes[i] += _multiple_bytes[i];
- }
+ _reserve_bytes[i] = round_up_to_multiple(
+ _history_bytes[i] + _multiple_bytes[i],
+ _multiple_bytes[i]
+ );
//post bytes are the desired buffer size to escape the edge case
- //_post_bytes[i] = input_item_sizes[i]*max_history_items;
- //_post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]);
- //_post_bytes[i] += _reserve_bytes[i]; //pad for round down issues
+ _post_bytes[i] = round_up_to_multiple(
+ input_item_sizes[i]*max_history_items + _reserve_bytes[i],
+ _multiple_bytes[i]
+ );
//allocate mini buffers for history edge conditions
- size_t num_bytes = (1 << 17);//_post_bytes[i];
+ size_t num_bytes = _post_bytes[i];
@@ -194,8 +181,6 @@ inline void InputBufferQueues::init(
buff.length = delta;
this->push(i, buff);
- //_queues[i].push_front(buff);
- //_in_hist_buff[i] = true;
if (_history_bytes[i] < old_history)
@@ -221,15 +206,9 @@ inline SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline)
//same buffer, different offset and length
SBuffer buff = front;
- /*VAR(buff.length);
- VAR(buff.offset);*/
buff.length -= _history_bytes[i];
buff.length /= _multiple_bytes[i];
buff.length *= _multiple_bytes[i];
- /*VAR(_reserve_bytes[i]);
- VAR(_history_bytes[i]);
- VAR(_multiple_bytes[i]);
- VAR(buff.length);*/
//set the flag that this buffer *might* be inlined as an output buffer
potential_inline = unique and (buff.length == front.length);
@@ -245,11 +224,6 @@ inline void InputBufferQueues::__prepare(const size_t i)
while (_queues[i].front().length < _reserve_bytes[i])
- /*HERE();
- VAR(_queues[i].front().length);
- VAR(_reserve_bytes[i]);
- VAR(_history_bytes[i]);
- */
SBuffer &front = _queues[i].front();
SBuffer dst;
@@ -268,7 +242,6 @@ inline void InputBufferQueues::__prepare(const size_t i)
dst.offset = 0;
dst.length = 0;
- //_in_hist_buff[i] = true;
SBuffer src = _queues[i].front();
@@ -292,11 +265,9 @@ inline void InputBufferQueues::__prepare(const size_t i)
-inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
+inline void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
//if (bytes_consumed == 0) return true;
- //HERE();
- //VAR(bytes_consumed);
//assert that we dont consume past the bounds of the buffer
ASSERT(_queues[i].front().length >= bytes_consumed);
@@ -310,27 +281,11 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum
- //otherwise, see if this is a mini history buff we can pop
- else if (_in_hist_buff[i] and _queues[i].front().length >= 2*_history_bytes[i])
- {
- const size_t residual = _queues[i].front().length;
- _queues[i].pop_front();
- _in_hist_buff[i] = false;
- ASSERT(not _queues[i].empty());
- ASSERT(_queues[i].front().offset > residual);
- _queues[i].front().offset -= residual;
- _queues[i].front().length += residual;
- ASSERT(_queues[i].front().offset >= _history_bytes[i]);
- }
- */
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
- return not _in_hist_buff[i];
} //namespace gnuradio