summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp18
1 files changed, 12 insertions, 6 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c460d01..87d0c8a 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -120,6 +120,7 @@ void BlockActor::handle_task(void)
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
+ this->num_input_msgs_read[i] = 0;
ASSERT(this->input_queues.ready(i));
const SBuffer &buff = this->input_queues.front(i);
@@ -188,7 +189,7 @@ void BlockActor::handle_task(void)
TimerAccumulate ta_post(this->stats.total_time_post);
//------------------------------------------------------------------
- //-- Flush output buffers downstream
+ //-- Post-work output tasks
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
@@ -196,14 +197,19 @@ void BlockActor::handle_task(void)
}
//------------------------------------------------------------------
- //-- Message self based on post-work conditions
+ //-- Post-work input tasks
//------------------------------------------------------------------
- //missing at least one upstream provider?
- //since nothing else is coming in, its safe to mark done
for (size_t i = 0; i < num_inputs; i++)
{
- const bool nothing = this->input_queues.empty(i) and this->input_msgs[i].empty();
- this->inputs_available.set(i, not nothing);
+ this->trim_msgs(i);
+
+ //update the inputs available bit field
+ const bool has_input_bufs = not this->input_queues.empty(i);
+ const bool has_input_msgs = not this->input_msgs[i].empty();
+ this->inputs_available.set(i, has_input_bufs or has_input_msgs);
+
+ //missing at least one upstream provider?
+ //since nothing else is coming in, its safe to mark done
if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
}