summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-09-10 16:12:43 -0700
committerJosh Blum2012-09-10 16:12:43 -0700
commit053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a (patch)
treeadf2f5347509d8523f20e62ffc33f69dbe602946 /lib/block_task.cpp
parentf26a477d4526d2abf9310b0b620c66376759d4db (diff)
downloadsandhi-053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a.tar.gz
sandhi-053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a.tar.bz2
sandhi-053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a.zip
remove output bytes offset, can use sbuffer's length
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp16
1 files changed, 6 insertions, 10 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 4acc1b9..3843b28 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -29,13 +29,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
//flush partial output buffers to the downstream
for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
{
- if (this->output_bytes_offset[i] == 0) continue;
- ASSERT(this->output_queues.ready(i));
+ if (not this->output_queues.ready(i)) continue;
SBuffer &buff = this->output_queues.front(i);
- buff.length = this->output_bytes_offset[i];
+ if (buff.length == 0) continue;
task_iface.post_downstream(i, buff);
this->output_queues.pop(i);
- this->output_bytes_offset[i] = 0;
}
//mark down the new state
@@ -134,8 +132,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
ASSERT(this->output_queues.ready(i));
const SBuffer &buff = this->output_queues.front(i);
- void *mem = buff.get(this->output_bytes_offset[i]);
- const size_t bytes = buff.length - this->output_bytes_offset[i];
+ void *mem = buff.get(buff.length);
+ const size_t bytes = buff.get_actual_length() - buff.length - buff.offset;
const size_t items = bytes/this->output_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
@@ -216,20 +214,18 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->produce_items[i] = 0;
if (items == 0) continue;
+ SBuffer &buff = this->output_queues.front(i);
this->items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
- this->output_bytes_offset[i] += bytes;
+ buff.length += bytes;
//only pass output buffer downstream when the input is fully consumed...
//Reasoning: For the sake of dealling with history, we can process the mini history input buffer,
//and then call work again on the real input buffer, but still yield one output buffer per input buffer.
if (input_allows_flush)
{
- SBuffer &buff = this->output_queues.front(i);
- buff.length = this->output_bytes_offset[i];
task_iface.post_downstream(i, buff);
this->output_queues.pop(i);
- this->output_bytes_offset[i] = 0;
}
}