diff options
author | Josh Blum | 2013-06-15 23:33:18 -0700 |
---|---|---|
committer | Josh Blum | 2013-06-15 23:33:18 -0700 |
commit | e6d6d285cfd7f4d8f63c45bc77cb53943a04a5eb (patch) | |
tree | 904c042399ee0cac75b3f66483269feb836dca27 /lib/task_main.cpp | |
parent | f6140c831e1585eed07d35c17a2792f214e94636 (diff) | |
download | sandhi-e6d6d285cfd7f4d8f63c45bc77cb53943a04a5eb.tar.gz sandhi-e6d6d285cfd7f4d8f63c45bc77cb53943a04a5eb.tar.bz2 sandhi-e6d6d285cfd7f4d8f63c45bc77cb53943a04a5eb.zip |
gras: dont update item counts until after work
This lets API calls like get_consumed/produced
keep their values until after work is called.
The propagate tags overload need this to work.
Diffstat (limited to 'lib/task_main.cpp')
-rw-r--r-- | lib/task_main.cpp | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/lib/task_main.cpp b/lib/task_main.cpp index a16ec7e..ca05979 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -53,7 +53,7 @@ static GRAS_FORCE_INLINE void trim_msgs(boost::shared_ptr<BlockData> &data, cons static GRAS_FORCE_INLINE void trim_buffs(boost::shared_ptr<BlockData> &data, const size_t i) { - const size_t num_read = data->num_input_bytes_read[i]; + const size_t num_read = data->num_input_items_read[i]; if GRAS_LIKELY(num_read > 0) { data->input_queues.consume(i, num_read); @@ -86,7 +86,7 @@ void BlockActor::task_main(void) for (size_t i = 0; i < num_inputs; i++) { sort_tags(data, i); - data->num_input_bytes_read[i] = 0; + data->num_input_items_read[i] = 0; data->num_input_msgs_read[i] = 0; ASSERT(data->input_queues.ready(i)); @@ -120,6 +120,8 @@ void BlockActor::task_main(void) data->output_items.max() = 0; for (size_t i = 0; i < num_outputs; i++) { + data->num_output_items_read[i] = 0; + ASSERT(data->output_queues.ready(i)); SBuffer &buff = data->output_queues.front(i); ASSERT(buff.length == 0); //assumes it was flushed last call @@ -169,6 +171,7 @@ void BlockActor::task_main(void) //missing at least one upstream provider? //since nothing else is coming in, its safe to mark done mark_done = mark_done or this->is_input_done(i); + data->total_items_consumed[i] += data->num_input_items_read[i]; } //------------------------------------------------------------------ @@ -187,8 +190,8 @@ void BlockActor::task_main(void) //Post a buffer message downstream only if the produce flag was marked. //So this explicitly after consuming the output queues so pop is called. //This is because pop may have special hooks in it to prepare the buffer. - if GRAS_LIKELY(data->produce_outputs[i]) worker->post_downstream(i, buff_msg); - data->produce_outputs[i] = false; + if GRAS_LIKELY(data->num_output_items_read[i]) worker->post_downstream(i, buff_msg); + data->total_items_produced[i] += data->num_output_items_read[i]; } //marked done by post work logic |