summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-06-20 19:48:29 -0700
committerJosh Blum2013-06-20 19:48:29 -0700
commit37bd56f3301795e89294c048883324b0353237ef (patch)
tree77d1a9066c0f416c6758eab154dcf2706e197595 /lib
parent630a272e6725a547327366e543106e63c23fd816 (diff)
downloadsandhi-37bd56f3301795e89294c048883324b0353237ef.tar.gz
sandhi-37bd56f3301795e89294c048883324b0353237ef.tar.bz2
sandhi-37bd56f3301795e89294c048883324b0353237ef.zip
gras: work on goddamn done logic
How blocks mark themselves done has been one of the most annoying things in this development. This done logic is only valuable for QA tests, it doesnt even exist in the practical use case. How it works now: * blocks mark done when sync inputs are done or all inputs are done * removed the force_done input config, its no longer needed * the wait() implementation gives blocks a grace period between an input becoming done and the block itself becoming done. After the grace period, the block is forced done.
Diffstat (limited to 'lib')
-rw-r--r--lib/block_config.cpp1
-rw-r--r--lib/block_handlers.cpp2
-rw-r--r--lib/gras_impl/block_actor.hpp8
-rw-r--r--lib/gras_impl/block_data.hpp1
-rw-r--r--lib/input_handlers.cpp17
-rw-r--r--lib/task_main.cpp9
-rw-r--r--lib/top_block.cpp37
7 files changed, 43 insertions, 32 deletions
diff --git a/lib/block_config.cpp b/lib/block_config.cpp
index 04d37f1..165ab47 100644
--- a/lib/block_config.cpp
+++ b/lib/block_config.cpp
@@ -18,7 +18,6 @@ InputPortConfig::InputPortConfig(void)
maximum_items = 0;
inline_buffer = false;
preload_items = 0;
- force_done = true;
}
OutputPortConfig::OutputPortConfig(void)
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 4cb03ab..a2df81c 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -33,7 +33,7 @@ void BlockActor::handle_top_inert(
this->mark_done();
- this->Send(0, from); //ACK
+ if (from != Theron::Address::Null()) this->Send(0, from); //ACK
}
void BlockActor::handle_top_token(
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 8c9fdf4..840e8c2 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -95,7 +95,6 @@ struct BlockActor : Theron::Actor
void consume(const size_t index, const size_t items);
void task_kicker(void);
void update_input_avail(const size_t index);
- bool is_input_done(const size_t index);
bool is_work_allowed(void);
//work helpers
@@ -123,13 +122,6 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
data->input_queues.update_has_msg(i, has_input_msgs);
}
-GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
-{
- const bool force_done = data->input_configs[i].force_done;
- if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i];
- return data->inputs_done.all() and data->inputs_available.none();
-}
-
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
{
return (
diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp
index 8697d48..cbc657e 100644
--- a/lib/gras_impl/block_data.hpp
+++ b/lib/gras_impl/block_data.hpp
@@ -51,6 +51,7 @@ struct BlockData
BitSet inputs_done;
BitSet outputs_done;
std::set<Token> token_pool;
+ boost::system_time first_input_done_time;
//buffer queues and ready conditions
InputBufferQueues input_queues;
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 89e1d34..3027257 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -63,6 +63,12 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
MESSAGE_TRACER();
const size_t index = message.index;
+ //record time of the first input declared done
+ if (not data->inputs_done.any())
+ {
+ data->first_input_done_time = boost::get_system_time();
+ }
+
//an upstream block declared itself done, recheck the token
data->inputs_done.set(index, data->input_tokens[index].unique());
@@ -70,8 +76,15 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
ta.done();
this->task_main();
- //now recheck the status, mark block done if the input is done
- if (this->is_input_done(index)) this->mark_done();
+ //Now check the status, mark block done if the input is done:
+ //When reserve_items is non zero, this ports is a sync input;
+ //mark the block done if the sync input will never be ready again.
+ //Otherwise, only mark done if all inputs are done with no data.
+ const size_t reserve_items = data->input_configs[index].reserve_items;
+ if (
+ (reserve_items != 0 and data->inputs_done[index] and not data->inputs_available[index])
+ or (reserve_items == 0 and data->inputs_done.all() and data->inputs_available.none())
+ ) this->mark_done();
}
void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index 2e496fc..5547b08 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -193,15 +193,6 @@ void BlockActor::task_main(void)
data->total_items_produced[i] += data->num_output_items_read[i];
}
- //now recheck the status, mark block done if an input is done
- if GRAS_UNLIKELY(data->inputs_done.any())
- {
- for (size_t i = 0; i < num_inputs; i++)
- {
- if (this->is_input_done(i)) this->mark_done();
- }
- }
-
//still have IO ready? kick off another task
this->task_kicker();
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 515d898..74d9523 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -90,6 +90,9 @@ GRAS_FORCE_INLINE void wait_thread_yield(void)
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
+static const boost::posix_time::time_duration CHECK_DONE_INTERVAL = boost::posix_time::milliseconds(100);
+static const boost::posix_time::time_duration INPUT_DONE_GRACE_PERIOD = boost::posix_time::milliseconds(100);
+
void TopBlock::wait(void)
{
//We do not need to join "special" threads;
@@ -99,7 +102,7 @@ void TopBlock::wait(void)
//(*this)->thread_group->join_all();
//QA lockup detection setup
- const bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL;
+ bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL;
boost::system_time check_done_time = boost::get_system_time();
bool has_a_done = false;
@@ -107,19 +110,31 @@ void TopBlock::wait(void)
while (not (*this)->token.unique())
{
wait_thread_yield();
- if (lockup_debug and boost::get_system_time() > check_done_time)
+
+ //determine if we should check on the done status
+ if (boost::get_system_time() < check_done_time) continue;
+ check_done_time += CHECK_DONE_INTERVAL;
+
+ //optional dot print to understand lockup condition
+ if (has_a_done and lockup_debug)
{
- if (has_a_done)
- {
- std::cerr << this->query("{\"path\":\"/topology.dot\"}") << std::endl;
- check_done_time += boost::posix_time::seconds(3);
- }
- BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers())
+ std::cerr << TopBlock::query("{\"path\":\"/topology.dot\"}") << std::endl;
+ lockup_debug = false;
+ }
+
+ //loop through blocks looking for non-done blocks with done inputs
+ BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers())
+ {
+ BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
+ if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true;
+ if (actor->data->inputs_done.size() and actor->data->inputs_done.any())
{
- BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
- if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true;
+ const boost::system_time grace_over = actor->data->first_input_done_time + INPUT_DONE_GRACE_PERIOD;
+ if ((not lockup_debug) and (boost::get_system_time() > grace_over))
+ {
+ actor->GetFramework().Send(TopInertMessage(), Theron::Address::Null(), actor->GetAddress());
+ }
}
- check_done_time += boost::posix_time::seconds(2);
}
}
}