diff options
Diffstat (limited to 'lib/task_main.cpp')
-rw-r--r-- | lib/task_main.cpp | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/lib/task_main.cpp b/lib/task_main.cpp index a16ec7e..ca05979 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -53,7 +53,7 @@ static GRAS_FORCE_INLINE void trim_msgs(boost::shared_ptr<BlockData> &data, cons static GRAS_FORCE_INLINE void trim_buffs(boost::shared_ptr<BlockData> &data, const size_t i) { - const size_t num_read = data->num_input_bytes_read[i]; + const size_t num_read = data->num_input_items_read[i]; if GRAS_LIKELY(num_read > 0) { data->input_queues.consume(i, num_read); @@ -86,7 +86,7 @@ void BlockActor::task_main(void) for (size_t i = 0; i < num_inputs; i++) { sort_tags(data, i); - data->num_input_bytes_read[i] = 0; + data->num_input_items_read[i] = 0; data->num_input_msgs_read[i] = 0; ASSERT(data->input_queues.ready(i)); @@ -120,6 +120,8 @@ void BlockActor::task_main(void) data->output_items.max() = 0; for (size_t i = 0; i < num_outputs; i++) { + data->num_output_items_read[i] = 0; + ASSERT(data->output_queues.ready(i)); SBuffer &buff = data->output_queues.front(i); ASSERT(buff.length == 0); //assumes it was flushed last call @@ -169,6 +171,7 @@ void BlockActor::task_main(void) //missing at least one upstream provider? //since nothing else is coming in, its safe to mark done mark_done = mark_done or this->is_input_done(i); + data->total_items_consumed[i] += data->num_input_items_read[i]; } //------------------------------------------------------------------ @@ -187,8 +190,8 @@ void BlockActor::task_main(void) //Post a buffer message downstream only if the produce flag was marked. //So this explicitly after consuming the output queues so pop is called. //This is because pop may have special hooks in it to prepare the buffer. - if GRAS_LIKELY(data->produce_outputs[i]) worker->post_downstream(i, buff_msg); - data->produce_outputs[i] = false; + if GRAS_LIKELY(data->num_output_items_read[i]) worker->post_downstream(i, buff_msg); + data->total_items_produced[i] += data->num_output_items_read[i]; } //marked done by post work logic |