diff options
author | Josh Blum | 2013-06-20 19:48:29 -0700 |
---|---|---|
committer | Josh Blum | 2013-06-20 19:48:29 -0700 |
commit | 37bd56f3301795e89294c048883324b0353237ef (patch) | |
tree | 77d1a9066c0f416c6758eab154dcf2706e197595 /lib | |
parent | 630a272e6725a547327366e543106e63c23fd816 (diff) | |
download | sandhi-37bd56f3301795e89294c048883324b0353237ef.tar.gz sandhi-37bd56f3301795e89294c048883324b0353237ef.tar.bz2 sandhi-37bd56f3301795e89294c048883324b0353237ef.zip |
gras: work on goddamn done logic
How blocks mark themselves done has been one of the most annoying things in this development.
This done logic is only valuable for QA tests, it doesnt even exist in the practical use case.
How it works now:
* blocks mark done when sync inputs are done or all inputs are done
* removed the force_done input config, its no longer needed
* the wait() implementation gives blocks a grace period between
an input becoming done and the block itself becoming done.
After the grace period, the block is forced done.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_config.cpp | 1 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 8 | ||||
-rw-r--r-- | lib/gras_impl/block_data.hpp | 1 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 17 | ||||
-rw-r--r-- | lib/task_main.cpp | 9 | ||||
-rw-r--r-- | lib/top_block.cpp | 37 |
7 files changed, 43 insertions, 32 deletions
diff --git a/lib/block_config.cpp b/lib/block_config.cpp index 04d37f1..165ab47 100644 --- a/lib/block_config.cpp +++ b/lib/block_config.cpp @@ -18,7 +18,6 @@ InputPortConfig::InputPortConfig(void) maximum_items = 0; inline_buffer = false; preload_items = 0; - force_done = true; } OutputPortConfig::OutputPortConfig(void) diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 4cb03ab..a2df81c 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -33,7 +33,7 @@ void BlockActor::handle_top_inert( this->mark_done(); - this->Send(0, from); //ACK + if (from != Theron::Address::Null()) this->Send(0, from); //ACK } void BlockActor::handle_top_token( diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 8c9fdf4..840e8c2 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -95,7 +95,6 @@ struct BlockActor : Theron::Actor void consume(const size_t index, const size_t items); void task_kicker(void); void update_input_avail(const size_t index); - bool is_input_done(const size_t index); bool is_work_allowed(void); //work helpers @@ -123,13 +122,6 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) data->input_queues.update_has_msg(i, has_input_msgs); } -GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) -{ - const bool force_done = data->input_configs[i].force_done; - if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i]; - return data->inputs_done.all() and data->inputs_available.none(); -} - GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) { return ( diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp index 8697d48..cbc657e 100644 --- a/lib/gras_impl/block_data.hpp +++ b/lib/gras_impl/block_data.hpp @@ -51,6 +51,7 @@ struct BlockData BitSet inputs_done; BitSet outputs_done; std::set<Token> token_pool; + boost::system_time first_input_done_time; //buffer queues and ready conditions InputBufferQueues input_queues; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 89e1d34..3027257 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -63,6 +63,12 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther MESSAGE_TRACER(); const size_t index = message.index; + //record time of the first input declared done + if (not data->inputs_done.any()) + { + data->first_input_done_time = boost::get_system_time(); + } + //an upstream block declared itself done, recheck the token data->inputs_done.set(index, data->input_tokens[index].unique()); @@ -70,8 +76,15 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther ta.done(); this->task_main(); - //now recheck the status, mark block done if the input is done - if (this->is_input_done(index)) this->mark_done(); + //Now check the status, mark block done if the input is done: + //When reserve_items is non zero, this ports is a sync input; + //mark the block done if the sync input will never be ready again. + //Otherwise, only mark done if all inputs are done with no data. + const size_t reserve_items = data->input_configs[index].reserve_items; + if ( + (reserve_items != 0 and data->inputs_done[index] and not data->inputs_available[index]) + or (reserve_items == 0 and data->inputs_done.all() and data->inputs_available.none()) + ) this->mark_done(); } void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) diff --git a/lib/task_main.cpp b/lib/task_main.cpp index 2e496fc..5547b08 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -193,15 +193,6 @@ void BlockActor::task_main(void) data->total_items_produced[i] += data->num_output_items_read[i]; } - //now recheck the status, mark block done if an input is done - if GRAS_UNLIKELY(data->inputs_done.any()) - { - for (size_t i = 0; i < num_inputs; i++) - { - if (this->is_input_done(i)) this->mark_done(); - } - } - //still have IO ready? kick off another task this->task_kicker(); } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 515d898..74d9523 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -90,6 +90,9 @@ GRAS_FORCE_INLINE void wait_thread_yield(void) boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } +static const boost::posix_time::time_duration CHECK_DONE_INTERVAL = boost::posix_time::milliseconds(100); +static const boost::posix_time::time_duration INPUT_DONE_GRACE_PERIOD = boost::posix_time::milliseconds(100); + void TopBlock::wait(void) { //We do not need to join "special" threads; @@ -99,7 +102,7 @@ void TopBlock::wait(void) //(*this)->thread_group->join_all(); //QA lockup detection setup - const bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL; + bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL; boost::system_time check_done_time = boost::get_system_time(); bool has_a_done = false; @@ -107,19 +110,31 @@ void TopBlock::wait(void) while (not (*this)->token.unique()) { wait_thread_yield(); - if (lockup_debug and boost::get_system_time() > check_done_time) + + //determine if we should check on the done status + if (boost::get_system_time() < check_done_time) continue; + check_done_time += CHECK_DONE_INTERVAL; + + //optional dot print to understand lockup condition + if (has_a_done and lockup_debug) { - if (has_a_done) - { - std::cerr << this->query("{\"path\":\"/topology.dot\"}") << std::endl; - check_done_time += boost::posix_time::seconds(3); - } - BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers()) + std::cerr << TopBlock::query("{\"path\":\"/topology.dot\"}") << std::endl; + lockup_debug = false; + } + + //loop through blocks looking for non-done blocks with done inputs + BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers()) + { + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); + if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true; + if (actor->data->inputs_done.size() and actor->data->inputs_done.any()) { - BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); - if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true; + const boost::system_time grace_over = actor->data->first_input_done_time + INPUT_DONE_GRACE_PERIOD; + if ((not lockup_debug) and (boost::get_system_time() > grace_over)) + { + actor->GetFramework().Send(TopInertMessage(), Theron::Address::Null(), actor->GetAddress()); + } } - check_done_time += boost::posix_time::seconds(2); } } } |