summaryrefslogtreecommitdiff
path: root/lib/block_handlers.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-09-29 14:45:29 -0700
committerJosh Blum2012-09-29 14:45:29 -0700
commitec1677346389ab3b434d81c6bde15321f3dbe209 (patch)
treea4fd8498e64dd90f2fc169a9de747e49e2173830 /lib/block_handlers.cpp
parentb194049a9fb5ab60f15bfcca1a53e39a42339244 (diff)
downloadsandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.gz
sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.bz2
sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.zip
create IO subscriber bitset for tracking done
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r--lib/block_handlers.cpp29
1 files changed, 14 insertions, 15 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index f386804..6925a60 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -27,6 +27,12 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
+ if (this->block_state == BLOCK_STATE_DONE)
+ {
+ this->Send(0, from); //ACK
+ return;
+ }
+
if (this->block_state != BLOCK_STATE_LIVE)
{
this->block_ptr->start();
@@ -61,10 +67,17 @@ void BlockActor::handle_top_token(
){
MESSAGE_TRACER();
+ if (this->block_state == BLOCK_STATE_DONE)
+ {
+ this->Send(0, from); //ACK
+ return;
+ }
+
//create input tokens and send allocation hints
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
this->input_tokens[i] = Token::make();
+ this->inputs_done.reset(i);
OutputTokenMessage token_msg;
token_msg.token = this->input_tokens[i];
this->post_upstream(i, token_msg);
@@ -83,6 +96,7 @@ void BlockActor::handle_top_token(
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
this->output_tokens[i] = Token::make();
+ this->outputs_done.reset(i);
InputTokenMessage token_msg;
token_msg.token = this->output_tokens[i];
this->post_downstream(i, token_msg);
@@ -133,18 +147,3 @@ void BlockActor::handle_self_kick(
MESSAGE_TRACER();
this->handle_task();
}
-
-void BlockActor::handle_check_tokens(
- const CheckTokensMessage &,
- const Theron::Address
-){
- MESSAGE_TRACER();
- if (this->input_queues.all_ready() and not this->forecast_fail)
- {
- this->handle_task();
- }
- else
- {
- this->mark_done();
- }
-}