summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-11-23 11:29:06 -0800
committerJosh Blum2012-11-23 11:29:06 -0800
commit45dd52310ad88b6ea249b310178599a892427648 (patch)
tree19777569e5349a100cd60d3b837ab5949650117c
parent55c68e6f7a2703bfd6566b0b26151e196d7ea62d (diff)
downloadsandhi-45dd52310ad88b6ea249b310178599a892427648.tar.gz
sandhi-45dd52310ad88b6ea249b310178599a892427648.tar.bz2
sandhi-45dd52310ad88b6ea249b310178599a892427648.zip
async atomic notification of stop condition
-rw-r--r--lib/block_handlers.cpp3
-rw-r--r--lib/block_task.cpp10
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/block_actor.hpp1
-rw-r--r--lib/gras_impl/messages.hpp2
-rw-r--r--lib/top_block.cpp9
6 files changed, 23 insertions, 3 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 7e07362..3d66ec9 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -8,7 +8,7 @@ using namespace gras;
void BlockActor::handle_top_active(
- const TopActiveMessage &,
+ const TopActiveMessage &message,
const Theron::Address from
){
MESSAGE_TRACER();
@@ -18,6 +18,7 @@ void BlockActor::handle_top_active(
this->block_ptr->start();
}
this->block_state = BLOCK_STATE_LIVE;
+ this->active_token = message.token;
this->Push(SelfKickMessage(), Theron::Address());
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 7f40f47..0af7cb1 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -113,6 +113,16 @@ void BlockActor::handle_task(void)
this->output_queues.all_ready()
)) return;
+ //------------------------------------------------------------------
+ //-- Asynchronous notification through atomic variable
+ //-- that the executor has instructed workers to stop.
+ //------------------------------------------------------------------
+ if (active_token.expired())
+ {
+ this->mark_done();
+ return;
+ }
+
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 3f4b092..6376b89 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -31,6 +31,7 @@ struct ElementImpl
//top block stuff
SharedThreadGroup thread_group;
Token token;
+ Token active_token;
GlobalBlockConfig top_config;
//things may be in this element
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 932b6b5..e44fbe6 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -149,6 +149,7 @@ struct BlockActor : Apology::Worker
BLOCK_STATE_LIVE,
BLOCK_STATE_DONE,
} block_state;
+ WeakToken active_token;
long buffer_affinity;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 7f3c337..bc6a2b8 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -23,7 +23,7 @@ struct TopAllocMessage
struct TopActiveMessage
{
- //empty
+ Token token;
};
struct TopInertMessage
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 345c46b..4f4ca67 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -76,12 +76,19 @@ void TopBlock::start(void)
(*this)->executor->post_all(TopAllocMessage());
}
{
- (*this)->executor->post_all(TopActiveMessage());
+ TopActiveMessage message;
+ message.token = Token::make();
+ (*this)->active_token = message.token;
+ (*this)->executor->post_all(message);
}
}
void TopBlock::stop(void)
{
+ //reset only reference to active token
+ //workers about to call work see expired
+ (*this)->active_token.reset();
+
//interrupt these "special" threads
(*this)->thread_group->interrupt_all();