diff options
author | Josh Blum | 2012-09-29 14:45:29 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-29 14:45:29 -0700 |
commit | ec1677346389ab3b434d81c6bde15321f3dbe209 (patch) | |
tree | a4fd8498e64dd90f2fc169a9de747e49e2173830 /lib/block_task.cpp | |
parent | b194049a9fb5ab60f15bfcca1a53e39a42339244 (diff) | |
download | sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.gz sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.bz2 sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.zip |
create IO subscriber bitset for tracking done
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 33 |
1 files changed, 10 insertions, 23 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 761e923..3f012da 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -90,22 +90,17 @@ void BlockActor::handle_task(void) const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); - //const bool is_source = (num_inputs == 0); - //const bool is_sink = (num_outputs == 0); this->work_io_ptr_mask = 0; //reset //------------------------------------------------------------------ //-- initialize input buffers before work //------------------------------------------------------------------ size_t num_input_items = REALLY_BIG; //so big that it must std::min - bool inputs_done = false; size_t output_inline_index = 0; for (size_t i = 0; i < num_inputs; i++) { this->sort_tags(i); - inputs_done = inputs_done or this->input_tokens[i].unique(); - ASSERT(this->input_queues.ready(i)); bool potential_inline; const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline); @@ -141,11 +136,8 @@ void BlockActor::handle_task(void) //-- initialize output buffers before work //------------------------------------------------------------------ size_t num_output_items = REALLY_BIG; //so big that it must std::min - bool outputs_done = false; for (size_t i = 0; i < num_outputs; i++) { - outputs_done = outputs_done or this->output_tokens[i].unique(); - ASSERT(this->output_queues.ready(i)); const SBuffer &buff = this->output_queues.front(i); void *mem = buff.get(buff.length); @@ -160,13 +152,6 @@ void BlockActor::handle_task(void) this->produce_items[i] = 0; } - //if we have outputs and at least one port has no downstream subscibers, mark done - if (outputs_done) - { - this->mark_done(); - return; - } - //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ @@ -187,7 +172,7 @@ void BlockActor::handle_task(void) if (num_output_items) goto forecast_again_you_jerk; this->forecast_fail = true; - this->conclusion(inputs_done); + this->conclusion(); return; } } @@ -258,17 +243,19 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- Message self based on post-work conditions //------------------------------------------------------------------ - this->conclusion(inputs_done); + this->conclusion(); } -GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done) +GRAS_FORCE_INLINE void BlockActor::conclusion(void) { - //if there are inputs, and not all are provided for, - //tell the block to check input queues and handle done - if (inputs_done) + + //since nothing else is coming in, its safe to mark done + if ((~this->inputs_done).none()) //no upstream providers { - this->Push(CheckTokensMessage(), Theron::Address()); - return; + if (not this->input_queues.all_ready() or this->forecast_fail) + { + this->mark_done(); + } } //still have IO ready? kick off another task |