diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 59be851..e436bc1 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -120,8 +120,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) ASSERT(this->output_queues.ready(i)); const tsbe::Buffer &buff = this->output_queues.front(i); - char *mem = ((char *)buff.get_memory()); - const size_t bytes = buff.get_length(); + char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i]; + const size_t bytes = buff.get_length() - this->output_bytes_offset[i]; const size_t items = bytes/this->output_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); @@ -162,6 +162,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ + bool input_fully_consumed = true; for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE); @@ -170,7 +171,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; - this->input_queues.pop(i, bytes); + input_fully_consumed = input_fully_consumed and this->input_queues.pop(i, bytes); } //------------------------------------------------------------------ @@ -183,10 +184,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; - tsbe::Buffer &buff = this->output_queues.front(i); - buff.get_length() = bytes; - task_iface.post_downstream(i, buff); - this->output_queues.pop(i); + this->output_bytes_offset[i] += bytes; + + //only pass output buffer downstream when the input is fully consumed... + //Reasoning: For the sake of dealling with history, we can process the mini history input buffer, + //and then call work again on the real input buffer, but still yield one output buffer per input buffer. + if (input_fully_consumed) + { + tsbe::Buffer &buff = this->output_queues.front(i); + buff.get_length() = this->output_bytes_offset[i]; + task_iface.post_downstream(i, buff); + this->output_queues.pop(i); + this->output_bytes_offset[i] = 0; + } } //------------------------------------------------------------------ |