diff options
m--------- | grextras | 0 | ||||
-rw-r--r-- | include/gras/block_config.hpp | 16 | ||||
-rw-r--r-- | lib/block_config.cpp | 1 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 8 | ||||
-rw-r--r-- | lib/gras_impl/block_data.hpp | 1 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 17 | ||||
-rw-r--r-- | lib/task_main.cpp | 9 | ||||
-rw-r--r-- | lib/top_block.cpp | 37 |
9 files changed, 43 insertions, 48 deletions
diff --git a/grextras b/grextras -Subproject 1f150b3b57671f1f14066967c65819317493004 +Subproject 81cd59739e2c8181ccd7f2670948e9714ef4049 diff --git a/include/gras/block_config.hpp b/include/gras/block_config.hpp index c026cba..5e5c0cb 100644 --- a/include/gras/block_config.hpp +++ b/include/gras/block_config.hpp @@ -109,22 +109,6 @@ struct GRAS_API InputPortConfig * Default = 0. */ size_t preload_items; - - /*! - * Force this block done when input port is done. - * When the upstream feeding this port declares done, - * this block will mark done once upstream notifies. - * The primary usage is to modify the done logic - * for the purposes of unit test confiruability. - * - * If the force done option is false, the block will - * not mark done when this port's upstream is done. - * However, this block will mark done when all - * input ports are done, reguardless of this setting. - * - * Default = true. - */ - bool force_done; }; //! Configuration parameters for an output port diff --git a/lib/block_config.cpp b/lib/block_config.cpp index 04d37f1..165ab47 100644 --- a/lib/block_config.cpp +++ b/lib/block_config.cpp @@ -18,7 +18,6 @@ InputPortConfig::InputPortConfig(void) maximum_items = 0; inline_buffer = false; preload_items = 0; - force_done = true; } OutputPortConfig::OutputPortConfig(void) diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 4cb03ab..a2df81c 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -33,7 +33,7 @@ void BlockActor::handle_top_inert( this->mark_done(); - this->Send(0, from); //ACK + if (from != Theron::Address::Null()) this->Send(0, from); //ACK } void BlockActor::handle_top_token( diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 8c9fdf4..840e8c2 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -95,7 +95,6 @@ struct BlockActor : Theron::Actor void consume(const size_t index, const size_t items); void task_kicker(void); void update_input_avail(const size_t index); - bool is_input_done(const size_t index); bool is_work_allowed(void); //work helpers @@ -123,13 +122,6 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) data->input_queues.update_has_msg(i, has_input_msgs); } -GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) -{ - const bool force_done = data->input_configs[i].force_done; - if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i]; - return data->inputs_done.all() and data->inputs_available.none(); -} - GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) { return ( diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp index 8697d48..cbc657e 100644 --- a/lib/gras_impl/block_data.hpp +++ b/lib/gras_impl/block_data.hpp @@ -51,6 +51,7 @@ struct BlockData BitSet inputs_done; BitSet outputs_done; std::set<Token> token_pool; + boost::system_time first_input_done_time; //buffer queues and ready conditions InputBufferQueues input_queues; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 89e1d34..3027257 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -63,6 +63,12 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther MESSAGE_TRACER(); const size_t index = message.index; + //record time of the first input declared done + if (not data->inputs_done.any()) + { + data->first_input_done_time = boost::get_system_time(); + } + //an upstream block declared itself done, recheck the token data->inputs_done.set(index, data->input_tokens[index].unique()); @@ -70,8 +76,15 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther ta.done(); this->task_main(); - //now recheck the status, mark block done if the input is done - if (this->is_input_done(index)) this->mark_done(); + //Now check the status, mark block done if the input is done: + //When reserve_items is non zero, this ports is a sync input; + //mark the block done if the sync input will never be ready again. + //Otherwise, only mark done if all inputs are done with no data. + const size_t reserve_items = data->input_configs[index].reserve_items; + if ( + (reserve_items != 0 and data->inputs_done[index] and not data->inputs_available[index]) + or (reserve_items == 0 and data->inputs_done.all() and data->inputs_available.none()) + ) this->mark_done(); } void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) diff --git a/lib/task_main.cpp b/lib/task_main.cpp index 2e496fc..5547b08 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -193,15 +193,6 @@ void BlockActor::task_main(void) data->total_items_produced[i] += data->num_output_items_read[i]; } - //now recheck the status, mark block done if an input is done - if GRAS_UNLIKELY(data->inputs_done.any()) - { - for (size_t i = 0; i < num_inputs; i++) - { - if (this->is_input_done(i)) this->mark_done(); - } - } - //still have IO ready? kick off another task this->task_kicker(); } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 515d898..74d9523 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -90,6 +90,9 @@ GRAS_FORCE_INLINE void wait_thread_yield(void) boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } +static const boost::posix_time::time_duration CHECK_DONE_INTERVAL = boost::posix_time::milliseconds(100); +static const boost::posix_time::time_duration INPUT_DONE_GRACE_PERIOD = boost::posix_time::milliseconds(100); + void TopBlock::wait(void) { //We do not need to join "special" threads; @@ -99,7 +102,7 @@ void TopBlock::wait(void) //(*this)->thread_group->join_all(); //QA lockup detection setup - const bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL; + bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL; boost::system_time check_done_time = boost::get_system_time(); bool has_a_done = false; @@ -107,19 +110,31 @@ void TopBlock::wait(void) while (not (*this)->token.unique()) { wait_thread_yield(); - if (lockup_debug and boost::get_system_time() > check_done_time) + + //determine if we should check on the done status + if (boost::get_system_time() < check_done_time) continue; + check_done_time += CHECK_DONE_INTERVAL; + + //optional dot print to understand lockup condition + if (has_a_done and lockup_debug) { - if (has_a_done) - { - std::cerr << this->query("{\"path\":\"/topology.dot\"}") << std::endl; - check_done_time += boost::posix_time::seconds(3); - } - BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers()) + std::cerr << TopBlock::query("{\"path\":\"/topology.dot\"}") << std::endl; + lockup_debug = false; + } + + //loop through blocks looking for non-done blocks with done inputs + BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers()) + { + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); + if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true; + if (actor->data->inputs_done.size() and actor->data->inputs_done.any()) { - BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); - if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true; + const boost::system_time grace_over = actor->data->first_input_done_time + INPUT_DONE_GRACE_PERIOD; + if ((not lockup_debug) and (boost::get_system_time() > grace_over)) + { + actor->GetFramework().Send(TopInertMessage(), Theron::Address::Null(), actor->GetAddress()); + } } - check_done_time += boost::posix_time::seconds(2); } } } |