summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp24
1 files changed, 17 insertions, 7 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 59be851..e436bc1 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -120,8 +120,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
ASSERT(this->output_queues.ready(i));
const tsbe::Buffer &buff = this->output_queues.front(i);
- char *mem = ((char *)buff.get_memory());
- const size_t bytes = buff.get_length();
+ char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i];
+ const size_t bytes = buff.get_length() - this->output_bytes_offset[i];
const size_t items = bytes/this->output_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
@@ -162,6 +162,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
//-- process input consumption
//------------------------------------------------------------------
+ bool input_fully_consumed = true;
for (size_t i = 0; i < num_inputs; i++)
{
ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE);
@@ -170,7 +171,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
- this->input_queues.pop(i, bytes);
+ input_fully_consumed = input_fully_consumed and this->input_queues.pop(i, bytes);
}
//------------------------------------------------------------------
@@ -183,10 +184,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
- tsbe::Buffer &buff = this->output_queues.front(i);
- buff.get_length() = bytes;
- task_iface.post_downstream(i, buff);
- this->output_queues.pop(i);
+ this->output_bytes_offset[i] += bytes;
+
+ //only pass output buffer downstream when the input is fully consumed...
+ //Reasoning: For the sake of dealling with history, we can process the mini history input buffer,
+ //and then call work again on the real input buffer, but still yield one output buffer per input buffer.
+ if (input_fully_consumed)
+ {
+ tsbe::Buffer &buff = this->output_queues.front(i);
+ buff.get_length() = this->output_bytes_offset[i];
+ task_iface.post_downstream(i, buff);
+ this->output_queues.pop(i);
+ this->output_bytes_offset[i] = 0;
+ }
}
//------------------------------------------------------------------