diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 16 |
1 files changed, 6 insertions, 10 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 4acc1b9..3843b28 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -29,13 +29,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) //flush partial output buffers to the downstream for (size_t i = 0; i < task_iface.get_num_outputs(); i++) { - if (this->output_bytes_offset[i] == 0) continue; - ASSERT(this->output_queues.ready(i)); + if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); - buff.length = this->output_bytes_offset[i]; + if (buff.length == 0) continue; task_iface.post_downstream(i, buff); this->output_queues.pop(i); - this->output_bytes_offset[i] = 0; } //mark down the new state @@ -134,8 +132,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) ASSERT(this->output_queues.ready(i)); const SBuffer &buff = this->output_queues.front(i); - void *mem = buff.get(this->output_bytes_offset[i]); - const size_t bytes = buff.length - this->output_bytes_offset[i]; + void *mem = buff.get(buff.length); + const size_t bytes = buff.get_actual_length() - buff.length - buff.offset; const size_t items = bytes/this->output_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); @@ -216,20 +214,18 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->produce_items[i] = 0; if (items == 0) continue; + SBuffer &buff = this->output_queues.front(i); this->items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; - this->output_bytes_offset[i] += bytes; + buff.length += bytes; //only pass output buffer downstream when the input is fully consumed... //Reasoning: For the sake of dealling with history, we can process the mini history input buffer, //and then call work again on the real input buffer, but still yield one output buffer per input buffer. if (input_allows_flush) { - SBuffer &buff = this->output_queues.front(i); - buff.length = this->output_bytes_offset[i]; task_iface.post_downstream(i, buff); this->output_queues.pop(i); - this->output_bytes_offset[i] = 0; } } |