diff options
author | Josh Blum | 2013-03-31 20:41:04 -0700 |
---|---|---|
committer | Josh Blum | 2013-03-31 20:41:04 -0700 |
commit | e93c4eee812900480b5b2a8c7deab77417cdf448 (patch) | |
tree | c99af1370859d93fa506baeaaf35c9001a179bf7 /lib | |
parent | 36b49de271c778a1645d27d16cc2e8f4007bed73 (diff) | |
download | sandhi-e93c4eee812900480b5b2a8c7deab77417cdf448.tar.gz sandhi-e93c4eee812900480b5b2a8c7deab77417cdf448.tar.bz2 sandhi-e93c4eee812900480b5b2a8c7deab77417cdf448.zip |
gras: fix #60 by checking done logic after handler
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_task.cpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 7 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 21 |
3 files changed, 17 insertions, 15 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 21986fc..cc707a8 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -211,9 +211,7 @@ void BlockActor::handle_task(void) this->trim_msgs(i); //update the inputs available bit field - const bool has_input_bufs = not this->input_queues.empty(i); - const bool has_input_msgs = not this->input_msgs[i].empty(); - this->inputs_available.set(i, has_input_bufs or has_input_msgs); + this->update_input_avail(i); //missing at least one upstream provider? //since nothing else is coming in, its safe to mark done diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 9006d6e..e620d77 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -130,6 +130,13 @@ struct BlockActor : Apology::Worker if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress()); } + GRAS_FORCE_INLINE void update_input_avail(const size_t i) + { + const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); + const bool has_input_msgs = not this->input_msgs[i].empty(); + this->inputs_available.set(i, has_input_bufs or has_input_msgs); + } + GRAS_FORCE_INLINE bool is_input_done(const size_t i) { return this->inputs_done[i] and not this->inputs_available[i]; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 27704fe..4702329 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -37,7 +37,7 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: //handle incoming async message, push into the msg storage if (this->block_state == BLOCK_STATE_DONE) return; this->input_msgs[index].push_back(message.msg); - this->inputs_available.set(index); + this->update_input_avail(index); ta.done(); this->handle_task(); @@ -52,7 +52,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th //handle incoming stream buffer, push into the queue if (this->block_state == BLOCK_STATE_DONE) return; this->input_queues.push(index, message.buffer); - this->inputs_available.set(index); + this->update_input_avail(index); ta.done(); this->handle_task(); @@ -76,16 +76,13 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther //an upstream block declared itself done, recheck the token this->inputs_done.set(index, this->input_tokens[index].unique()); - if (this->is_input_done(index)) //missing an upstream provider - { - this->mark_done(); - } - //or re-enter handle task so fail logic can mark done - else - { - ta.done(); - this->handle_task(); - } + + //upstream done, give it one more attempt at task handling + ta.done(); + this->handle_task(); + + //now recheck the status, mark block done if the input is done + if (this->is_input_done(index)) this->mark_done(); } void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) |