diff options
author | Josh Blum | 2013-07-15 23:51:59 -0700 |
---|---|---|
committer | Josh Blum | 2013-07-15 23:51:59 -0700 |
commit | 6d07c50dafbe38049dbede7241e9e66eea0a699e (patch) | |
tree | 444d46efa7526a6ca0bebb4be95f878fc4f13d50 /lib | |
parent | 6f9ab7732da5731ee0db59d9623111b7af9b2e94 (diff) | |
download | sandhi-6d07c50dafbe38049dbede7241e9e66eea0a699e.tar.gz sandhi-6d07c50dafbe38049dbede7241e9e66eea0a699e.tar.bz2 sandhi-6d07c50dafbe38049dbede7241e9e66eea0a699e.zip |
gras: make all start/commit messages high-prio
This makes doing live topology changes w/ commit
instant, even if much buffering is backed up.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_handlers.cpp | 17 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 26 | ||||
-rw-r--r-- | lib/register_messages.cpp | 4 | ||||
-rw-r--r-- | lib/top_block.cpp | 14 |
5 files changed, 42 insertions, 23 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index a2df81c..ca1ad97 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -22,7 +22,9 @@ void BlockActor::handle_top_active( this->Send(0, from); //ACK - this->task_kicker(); + //work could have been skipped by a high prio msg + //forcefully kick the task to recheck in a new call + this->Send(SelfKickMessage(), this->GetAddress()); } void BlockActor::handle_top_inert( @@ -77,37 +79,38 @@ void BlockActor::handle_top_token( } void BlockActor::handle_top_config( - const GlobalBlockConfig &message, + const TopConfigMessage &message, const Theron::Address from ){ MESSAGE_TRACER(); + const GlobalBlockConfig &config = message.config; //overwrite with global config only if maxium_items is not set (zero) for (size_t i = 0; i < data->output_configs.size(); i++) { if (data->output_configs[i].maximum_items == 0) { - data->output_configs[i].maximum_items = message.maximum_output_items; + data->output_configs[i].maximum_items = config.maximum_output_items; } } //overwrite with global node affinity setting for buffers if not set if (data->global_config.buffer_affinity == -1) { - data->global_config.buffer_affinity = message.buffer_affinity; + data->global_config.buffer_affinity = config.buffer_affinity; } //overwrite with global interruptable setting for work if not set if (data->global_config.interruptible_work == false) { - data->global_config.interruptible_work = message.interruptible_work; + data->global_config.interruptible_work = config.interruptible_work; } this->Send(0, from); //ACK } void BlockActor::handle_top_thread_group( - const SharedThreadGroup &message, + const TopThreadMessage &message, const Theron::Address from ){ MESSAGE_TRACER(); @@ -115,7 +118,7 @@ void BlockActor::handle_top_thread_group( //store the topology's thread group //erase any potentially old lingering threads //spawn a new thread if this block is a source - data->thread_group = message; + data->thread_group = message.thread_group; data->interruptible_thread.reset(); //erase old one if (data->global_config.interruptible_work) { diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index cfbc2c0..a64dc55 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -64,8 +64,8 @@ struct BlockActor : Theron::Actor void handle_top_active(const TopActiveMessage &, const Theron::Address); void handle_top_inert(const TopInertMessage &, const Theron::Address); void handle_top_token(const TopTokenMessage &, const Theron::Address); - void handle_top_config(const GlobalBlockConfig &, const Theron::Address); - void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address); + void handle_top_config(const TopConfigMessage &, const Theron::Address); + void handle_top_thread_group(const TopThreadMessage &, const Theron::Address); void handle_input_tag(const InputTagMessage &, const Theron::Address); void handle_input_msg(const InputMsgMessage &, const Theron::Address); diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 4294ab8..263fd5d 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -9,6 +9,8 @@ #include <gras/sbuffer.hpp> #include <gras_impl/token.hpp> #include <gras_impl/stats.hpp> +#include <gras/block_config.hpp> +#include <gras_impl/interruptible_thread.hpp> namespace gras { @@ -20,12 +22,12 @@ namespace gras struct TopAllocMessage { - //empty + Token prio_token; }; struct TopActiveMessage { - //empty + Token prio_token; }; struct TopInertMessage @@ -36,6 +38,19 @@ struct TopInertMessage struct TopTokenMessage { Token token; + Token prio_token; +}; + +struct TopConfigMessage +{ + GlobalBlockConfig config; + Token prio_token; +}; + +struct TopThreadMessage +{ + SharedThreadGroup thread_group; + Token prio_token; }; //---------------------------------------------------------------------- @@ -156,16 +171,13 @@ struct GetStatsMessage } //namespace gras #include <Theron/Register.h> -#include <gras/top_block.hpp> -#include <gras_impl/messages.hpp> -#include <gras_impl/interruptible_thread.hpp> THERON_DECLARE_REGISTERED_MESSAGE(gras::TopAllocMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::TopActiveMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::TopInertMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::TopTokenMessage); -THERON_DECLARE_REGISTERED_MESSAGE(gras::GlobalBlockConfig); -THERON_DECLARE_REGISTERED_MESSAGE(gras::SharedThreadGroup); +THERON_DECLARE_REGISTERED_MESSAGE(gras::TopConfigMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::TopThreadMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTagMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputMsgMessage); diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp index eda73ce..e18a0a4 100644 --- a/lib/register_messages.cpp +++ b/lib/register_messages.cpp @@ -8,8 +8,8 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::TopAllocMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::TopActiveMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::TopInertMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::TopTokenMessage); -THERON_DEFINE_REGISTERED_MESSAGE(gras::GlobalBlockConfig); -THERON_DEFINE_REGISTERED_MESSAGE(gras::SharedThreadGroup); +THERON_DEFINE_REGISTERED_MESSAGE(gras::TopConfigMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::TopThreadMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTagMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputMsgMessage); diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 74d9523..0c8572d 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -50,22 +50,26 @@ void TopBlock::start(void) { (*this)->executor->commit(); { - (*this)->executor->post_all((*this)->thread_group); + TopThreadMessage message; + message.thread_group = (*this)->thread_group; + (*this)->bcast_prio_msg(message); } { TopTokenMessage message; message.token = (*this)->token; - (*this)->executor->post_all(message); + (*this)->bcast_prio_msg(message); } { //send the global block config before alloc - (*this)->executor->post_all((*this)->top_config); + TopConfigMessage message; + message.config = (*this)->top_config; + (*this)->bcast_prio_msg(message); } { - (*this)->executor->post_all(TopAllocMessage()); + (*this)->bcast_prio_msg(TopAllocMessage()); } { - (*this)->executor->post_all(TopActiveMessage()); + (*this)->bcast_prio_msg(TopActiveMessage()); } } |