summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-09-29 14:45:29 -0700
committerJosh Blum2012-09-29 14:45:29 -0700
commitec1677346389ab3b434d81c6bde15321f3dbe209 (patch)
treea4fd8498e64dd90f2fc169a9de747e49e2173830 /lib/block_task.cpp
parentb194049a9fb5ab60f15bfcca1a53e39a42339244 (diff)
downloadsandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.gz
sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.bz2
sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.zip
create IO subscriber bitset for tracking done
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp33
1 files changed, 10 insertions, 23 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 761e923..3f012da 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -90,22 +90,17 @@ void BlockActor::handle_task(void)
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
- //const bool is_source = (num_inputs == 0);
- //const bool is_sink = (num_outputs == 0);
this->work_io_ptr_mask = 0; //reset
//------------------------------------------------------------------
//-- initialize input buffers before work
//------------------------------------------------------------------
size_t num_input_items = REALLY_BIG; //so big that it must std::min
- bool inputs_done = false;
size_t output_inline_index = 0;
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
- inputs_done = inputs_done or this->input_tokens[i].unique();
-
ASSERT(this->input_queues.ready(i));
bool potential_inline;
const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline);
@@ -141,11 +136,8 @@ void BlockActor::handle_task(void)
//-- initialize output buffers before work
//------------------------------------------------------------------
size_t num_output_items = REALLY_BIG; //so big that it must std::min
- bool outputs_done = false;
for (size_t i = 0; i < num_outputs; i++)
{
- outputs_done = outputs_done or this->output_tokens[i].unique();
-
ASSERT(this->output_queues.ready(i));
const SBuffer &buff = this->output_queues.front(i);
void *mem = buff.get(buff.length);
@@ -160,13 +152,6 @@ void BlockActor::handle_task(void)
this->produce_items[i] = 0;
}
- //if we have outputs and at least one port has no downstream subscibers, mark done
- if (outputs_done)
- {
- this->mark_done();
- return;
- }
-
//------------------------------------------------------------------
//-- forecast
//------------------------------------------------------------------
@@ -187,7 +172,7 @@ void BlockActor::handle_task(void)
if (num_output_items) goto forecast_again_you_jerk;
this->forecast_fail = true;
- this->conclusion(inputs_done);
+ this->conclusion();
return;
}
}
@@ -258,17 +243,19 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- Message self based on post-work conditions
//------------------------------------------------------------------
- this->conclusion(inputs_done);
+ this->conclusion();
}
-GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done)
+GRAS_FORCE_INLINE void BlockActor::conclusion(void)
{
- //if there are inputs, and not all are provided for,
- //tell the block to check input queues and handle done
- if (inputs_done)
+
+ //since nothing else is coming in, its safe to mark done
+ if ((~this->inputs_done).none()) //no upstream providers
{
- this->Push(CheckTokensMessage(), Theron::Address());
- return;
+ if (not this->input_queues.all_ready() or this->forecast_fail)
+ {
+ this->mark_done();
+ }
}
//still have IO ready? kick off another task