summaryrefslogtreecommitdiff
path: root/lib/task_main.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/task_main.cpp')
-rw-r--r--lib/task_main.cpp11
1 files changed, 7 insertions, 4 deletions
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index a16ec7e..ca05979 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -53,7 +53,7 @@ static GRAS_FORCE_INLINE void trim_msgs(boost::shared_ptr<BlockData> &data, cons
static GRAS_FORCE_INLINE void trim_buffs(boost::shared_ptr<BlockData> &data, const size_t i)
{
- const size_t num_read = data->num_input_bytes_read[i];
+ const size_t num_read = data->num_input_items_read[i];
if GRAS_LIKELY(num_read > 0)
{
data->input_queues.consume(i, num_read);
@@ -86,7 +86,7 @@ void BlockActor::task_main(void)
for (size_t i = 0; i < num_inputs; i++)
{
sort_tags(data, i);
- data->num_input_bytes_read[i] = 0;
+ data->num_input_items_read[i] = 0;
data->num_input_msgs_read[i] = 0;
ASSERT(data->input_queues.ready(i));
@@ -120,6 +120,8 @@ void BlockActor::task_main(void)
data->output_items.max() = 0;
for (size_t i = 0; i < num_outputs; i++)
{
+ data->num_output_items_read[i] = 0;
+
ASSERT(data->output_queues.ready(i));
SBuffer &buff = data->output_queues.front(i);
ASSERT(buff.length == 0); //assumes it was flushed last call
@@ -169,6 +171,7 @@ void BlockActor::task_main(void)
//missing at least one upstream provider?
//since nothing else is coming in, its safe to mark done
mark_done = mark_done or this->is_input_done(i);
+ data->total_items_consumed[i] += data->num_input_items_read[i];
}
//------------------------------------------------------------------
@@ -187,8 +190,8 @@ void BlockActor::task_main(void)
//Post a buffer message downstream only if the produce flag was marked.
//So this explicitly after consuming the output queues so pop is called.
//This is because pop may have special hooks in it to prepare the buffer.
- if GRAS_LIKELY(data->produce_outputs[i]) worker->post_downstream(i, buff_msg);
- data->produce_outputs[i] = false;
+ if GRAS_LIKELY(data->num_output_items_read[i]) worker->post_downstream(i, buff_msg);
+ data->total_items_produced[i] += data->num_output_items_read[i];
}
//marked done by post work logic