diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c460d01..87d0c8a 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -120,6 +120,7 @@ void BlockActor::handle_task(void) for (size_t i = 0; i < num_inputs; i++) { this->sort_tags(i); + this->num_input_msgs_read[i] = 0; ASSERT(this->input_queues.ready(i)); const SBuffer &buff = this->input_queues.front(i); @@ -188,7 +189,7 @@ void BlockActor::handle_task(void) TimerAccumulate ta_post(this->stats.total_time_post); //------------------------------------------------------------------ - //-- Flush output buffers downstream + //-- Post-work output tasks //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { @@ -196,14 +197,19 @@ void BlockActor::handle_task(void) } //------------------------------------------------------------------ - //-- Message self based on post-work conditions + //-- Post-work input tasks //------------------------------------------------------------------ - //missing at least one upstream provider? - //since nothing else is coming in, its safe to mark done for (size_t i = 0; i < num_inputs; i++) { - const bool nothing = this->input_queues.empty(i) and this->input_msgs[i].empty(); - this->inputs_available.set(i, not nothing); + this->trim_msgs(i); + + //update the inputs available bit field + const bool has_input_bufs = not this->input_queues.empty(i); + const bool has_input_msgs = not this->input_msgs[i].empty(); + this->inputs_available.set(i, has_input_bufs or has_input_msgs); + + //missing at least one upstream provider? + //since nothing else is coming in, its safe to mark done if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done(); } |