summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------grextras0
-rw-r--r--include/gras/block_config.hpp16
-rw-r--r--lib/block_config.cpp1
-rw-r--r--lib/block_handlers.cpp2
-rw-r--r--lib/gras_impl/block_actor.hpp8
-rw-r--r--lib/gras_impl/block_data.hpp1
-rw-r--r--lib/input_handlers.cpp17
-rw-r--r--lib/task_main.cpp9
-rw-r--r--lib/top_block.cpp37
9 files changed, 43 insertions, 48 deletions
diff --git a/grextras b/grextras
-Subproject 1f150b3b57671f1f14066967c65819317493004
+Subproject 81cd59739e2c8181ccd7f2670948e9714ef4049
diff --git a/include/gras/block_config.hpp b/include/gras/block_config.hpp
index c026cba..5e5c0cb 100644
--- a/include/gras/block_config.hpp
+++ b/include/gras/block_config.hpp
@@ -109,22 +109,6 @@ struct GRAS_API InputPortConfig
* Default = 0.
*/
size_t preload_items;
-
- /*!
- * Force this block done when input port is done.
- * When the upstream feeding this port declares done,
- * this block will mark done once upstream notifies.
- * The primary usage is to modify the done logic
- * for the purposes of unit test confiruability.
- *
- * If the force done option is false, the block will
- * not mark done when this port's upstream is done.
- * However, this block will mark done when all
- * input ports are done, reguardless of this setting.
- *
- * Default = true.
- */
- bool force_done;
};
//! Configuration parameters for an output port
diff --git a/lib/block_config.cpp b/lib/block_config.cpp
index 04d37f1..165ab47 100644
--- a/lib/block_config.cpp
+++ b/lib/block_config.cpp
@@ -18,7 +18,6 @@ InputPortConfig::InputPortConfig(void)
maximum_items = 0;
inline_buffer = false;
preload_items = 0;
- force_done = true;
}
OutputPortConfig::OutputPortConfig(void)
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 4cb03ab..a2df81c 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -33,7 +33,7 @@ void BlockActor::handle_top_inert(
this->mark_done();
- this->Send(0, from); //ACK
+ if (from != Theron::Address::Null()) this->Send(0, from); //ACK
}
void BlockActor::handle_top_token(
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 8c9fdf4..840e8c2 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -95,7 +95,6 @@ struct BlockActor : Theron::Actor
void consume(const size_t index, const size_t items);
void task_kicker(void);
void update_input_avail(const size_t index);
- bool is_input_done(const size_t index);
bool is_work_allowed(void);
//work helpers
@@ -123,13 +122,6 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
data->input_queues.update_has_msg(i, has_input_msgs);
}
-GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
-{
- const bool force_done = data->input_configs[i].force_done;
- if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i];
- return data->inputs_done.all() and data->inputs_available.none();
-}
-
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
{
return (
diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp
index 8697d48..cbc657e 100644
--- a/lib/gras_impl/block_data.hpp
+++ b/lib/gras_impl/block_data.hpp
@@ -51,6 +51,7 @@ struct BlockData
BitSet inputs_done;
BitSet outputs_done;
std::set<Token> token_pool;
+ boost::system_time first_input_done_time;
//buffer queues and ready conditions
InputBufferQueues input_queues;
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 89e1d34..3027257 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -63,6 +63,12 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
MESSAGE_TRACER();
const size_t index = message.index;
+ //record time of the first input declared done
+ if (not data->inputs_done.any())
+ {
+ data->first_input_done_time = boost::get_system_time();
+ }
+
//an upstream block declared itself done, recheck the token
data->inputs_done.set(index, data->input_tokens[index].unique());
@@ -70,8 +76,15 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
ta.done();
this->task_main();
- //now recheck the status, mark block done if the input is done
- if (this->is_input_done(index)) this->mark_done();
+ //Now check the status, mark block done if the input is done:
+ //When reserve_items is non zero, this ports is a sync input;
+ //mark the block done if the sync input will never be ready again.
+ //Otherwise, only mark done if all inputs are done with no data.
+ const size_t reserve_items = data->input_configs[index].reserve_items;
+ if (
+ (reserve_items != 0 and data->inputs_done[index] and not data->inputs_available[index])
+ or (reserve_items == 0 and data->inputs_done.all() and data->inputs_available.none())
+ ) this->mark_done();
}
void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index 2e496fc..5547b08 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -193,15 +193,6 @@ void BlockActor::task_main(void)
data->total_items_produced[i] += data->num_output_items_read[i];
}
- //now recheck the status, mark block done if an input is done
- if GRAS_UNLIKELY(data->inputs_done.any())
- {
- for (size_t i = 0; i < num_inputs; i++)
- {
- if (this->is_input_done(i)) this->mark_done();
- }
- }
-
//still have IO ready? kick off another task
this->task_kicker();
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 515d898..74d9523 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -90,6 +90,9 @@ GRAS_FORCE_INLINE void wait_thread_yield(void)
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
+static const boost::posix_time::time_duration CHECK_DONE_INTERVAL = boost::posix_time::milliseconds(100);
+static const boost::posix_time::time_duration INPUT_DONE_GRACE_PERIOD = boost::posix_time::milliseconds(100);
+
void TopBlock::wait(void)
{
//We do not need to join "special" threads;
@@ -99,7 +102,7 @@ void TopBlock::wait(void)
//(*this)->thread_group->join_all();
//QA lockup detection setup
- const bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL;
+ bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL;
boost::system_time check_done_time = boost::get_system_time();
bool has_a_done = false;
@@ -107,19 +110,31 @@ void TopBlock::wait(void)
while (not (*this)->token.unique())
{
wait_thread_yield();
- if (lockup_debug and boost::get_system_time() > check_done_time)
+
+ //determine if we should check on the done status
+ if (boost::get_system_time() < check_done_time) continue;
+ check_done_time += CHECK_DONE_INTERVAL;
+
+ //optional dot print to understand lockup condition
+ if (has_a_done and lockup_debug)
{
- if (has_a_done)
- {
- std::cerr << this->query("{\"path\":\"/topology.dot\"}") << std::endl;
- check_done_time += boost::posix_time::seconds(3);
- }
- BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers())
+ std::cerr << TopBlock::query("{\"path\":\"/topology.dot\"}") << std::endl;
+ lockup_debug = false;
+ }
+
+ //loop through blocks looking for non-done blocks with done inputs
+ BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers())
+ {
+ BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
+ if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true;
+ if (actor->data->inputs_done.size() and actor->data->inputs_done.any())
{
- BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
- if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true;
+ const boost::system_time grace_over = actor->data->first_input_done_time + INPUT_DONE_GRACE_PERIOD;
+ if ((not lockup_debug) and (boost::get_system_time() > grace_over))
+ {
+ actor->GetFramework().Send(TopInertMessage(), Theron::Address::Null(), actor->GetAddress());
+ }
}
- check_done_time += boost::posix_time::seconds(2);
}
}
}