summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-03-31 20:41:04 -0700
committerJosh Blum2013-03-31 20:41:04 -0700
commite93c4eee812900480b5b2a8c7deab77417cdf448 (patch)
treec99af1370859d93fa506baeaaf35c9001a179bf7 /lib
parent36b49de271c778a1645d27d16cc2e8f4007bed73 (diff)
downloadsandhi-e93c4eee812900480b5b2a8c7deab77417cdf448.tar.gz
sandhi-e93c4eee812900480b5b2a8c7deab77417cdf448.tar.bz2
sandhi-e93c4eee812900480b5b2a8c7deab77417cdf448.zip
gras: fix #60 by checking done logic after handler
Diffstat (limited to 'lib')
-rw-r--r--lib/block_task.cpp4
-rw-r--r--lib/gras_impl/block_actor.hpp7
-rw-r--r--lib/input_handlers.cpp21
3 files changed, 17 insertions, 15 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 21986fc..cc707a8 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -211,9 +211,7 @@ void BlockActor::handle_task(void)
this->trim_msgs(i);
//update the inputs available bit field
- const bool has_input_bufs = not this->input_queues.empty(i);
- const bool has_input_msgs = not this->input_msgs[i].empty();
- this->inputs_available.set(i, has_input_bufs or has_input_msgs);
+ this->update_input_avail(i);
//missing at least one upstream provider?
//since nothing else is coming in, its safe to mark done
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 9006d6e..e620d77 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -130,6 +130,13 @@ struct BlockActor : Apology::Worker
if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
}
+ GRAS_FORCE_INLINE void update_input_avail(const size_t i)
+ {
+ const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
+ const bool has_input_msgs = not this->input_msgs[i].empty();
+ this->inputs_available.set(i, has_input_bufs or has_input_msgs);
+ }
+
GRAS_FORCE_INLINE bool is_input_done(const size_t i)
{
return this->inputs_done[i] and not this->inputs_available[i];
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 27704fe..4702329 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -37,7 +37,7 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::
//handle incoming async message, push into the msg storage
if (this->block_state == BLOCK_STATE_DONE) return;
this->input_msgs[index].push_back(message.msg);
- this->inputs_available.set(index);
+ this->update_input_avail(index);
ta.done();
this->handle_task();
@@ -52,7 +52,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
//handle incoming stream buffer, push into the queue
if (this->block_state == BLOCK_STATE_DONE) return;
this->input_queues.push(index, message.buffer);
- this->inputs_available.set(index);
+ this->update_input_avail(index);
ta.done();
this->handle_task();
@@ -76,16 +76,13 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
//an upstream block declared itself done, recheck the token
this->inputs_done.set(index, this->input_tokens[index].unique());
- if (this->is_input_done(index)) //missing an upstream provider
- {
- this->mark_done();
- }
- //or re-enter handle task so fail logic can mark done
- else
- {
- ta.done();
- this->handle_task();
- }
+
+ //upstream done, give it one more attempt at task handling
+ ta.done();
+ this->handle_task();
+
+ //now recheck the status, mark block done if the input is done
+ if (this->is_input_done(index)) this->mark_done();
}
void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)