summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-07-15 23:51:59 -0700
committerJosh Blum2013-07-15 23:51:59 -0700
commit6d07c50dafbe38049dbede7241e9e66eea0a699e (patch)
tree444d46efa7526a6ca0bebb4be95f878fc4f13d50 /lib
parent6f9ab7732da5731ee0db59d9623111b7af9b2e94 (diff)
downloadsandhi-6d07c50dafbe38049dbede7241e9e66eea0a699e.tar.gz
sandhi-6d07c50dafbe38049dbede7241e9e66eea0a699e.tar.bz2
sandhi-6d07c50dafbe38049dbede7241e9e66eea0a699e.zip
gras: make all start/commit messages high-prio
This makes doing live topology changes w/ commit instant, even if much buffering is backed up.
Diffstat (limited to 'lib')
-rw-r--r--lib/block_handlers.cpp17
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/gras_impl/messages.hpp26
-rw-r--r--lib/register_messages.cpp4
-rw-r--r--lib/top_block.cpp14
5 files changed, 42 insertions, 23 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index a2df81c..ca1ad97 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -22,7 +22,9 @@ void BlockActor::handle_top_active(
this->Send(0, from); //ACK
- this->task_kicker();
+ //work could have been skipped by a high prio msg
+ //forcefully kick the task to recheck in a new call
+ this->Send(SelfKickMessage(), this->GetAddress());
}
void BlockActor::handle_top_inert(
@@ -77,37 +79,38 @@ void BlockActor::handle_top_token(
}
void BlockActor::handle_top_config(
- const GlobalBlockConfig &message,
+ const TopConfigMessage &message,
const Theron::Address from
){
MESSAGE_TRACER();
+ const GlobalBlockConfig &config = message.config;
//overwrite with global config only if maxium_items is not set (zero)
for (size_t i = 0; i < data->output_configs.size(); i++)
{
if (data->output_configs[i].maximum_items == 0)
{
- data->output_configs[i].maximum_items = message.maximum_output_items;
+ data->output_configs[i].maximum_items = config.maximum_output_items;
}
}
//overwrite with global node affinity setting for buffers if not set
if (data->global_config.buffer_affinity == -1)
{
- data->global_config.buffer_affinity = message.buffer_affinity;
+ data->global_config.buffer_affinity = config.buffer_affinity;
}
//overwrite with global interruptable setting for work if not set
if (data->global_config.interruptible_work == false)
{
- data->global_config.interruptible_work = message.interruptible_work;
+ data->global_config.interruptible_work = config.interruptible_work;
}
this->Send(0, from); //ACK
}
void BlockActor::handle_top_thread_group(
- const SharedThreadGroup &message,
+ const TopThreadMessage &message,
const Theron::Address from
){
MESSAGE_TRACER();
@@ -115,7 +118,7 @@ void BlockActor::handle_top_thread_group(
//store the topology's thread group
//erase any potentially old lingering threads
//spawn a new thread if this block is a source
- data->thread_group = message;
+ data->thread_group = message.thread_group;
data->interruptible_thread.reset(); //erase old one
if (data->global_config.interruptible_work)
{
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index cfbc2c0..a64dc55 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -64,8 +64,8 @@ struct BlockActor : Theron::Actor
void handle_top_active(const TopActiveMessage &, const Theron::Address);
void handle_top_inert(const TopInertMessage &, const Theron::Address);
void handle_top_token(const TopTokenMessage &, const Theron::Address);
- void handle_top_config(const GlobalBlockConfig &, const Theron::Address);
- void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address);
+ void handle_top_config(const TopConfigMessage &, const Theron::Address);
+ void handle_top_thread_group(const TopThreadMessage &, const Theron::Address);
void handle_input_tag(const InputTagMessage &, const Theron::Address);
void handle_input_msg(const InputMsgMessage &, const Theron::Address);
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 4294ab8..263fd5d 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -9,6 +9,8 @@
#include <gras/sbuffer.hpp>
#include <gras_impl/token.hpp>
#include <gras_impl/stats.hpp>
+#include <gras/block_config.hpp>
+#include <gras_impl/interruptible_thread.hpp>
namespace gras
{
@@ -20,12 +22,12 @@ namespace gras
struct TopAllocMessage
{
- //empty
+ Token prio_token;
};
struct TopActiveMessage
{
- //empty
+ Token prio_token;
};
struct TopInertMessage
@@ -36,6 +38,19 @@ struct TopInertMessage
struct TopTokenMessage
{
Token token;
+ Token prio_token;
+};
+
+struct TopConfigMessage
+{
+ GlobalBlockConfig config;
+ Token prio_token;
+};
+
+struct TopThreadMessage
+{
+ SharedThreadGroup thread_group;
+ Token prio_token;
};
//----------------------------------------------------------------------
@@ -156,16 +171,13 @@ struct GetStatsMessage
} //namespace gras
#include <Theron/Register.h>
-#include <gras/top_block.hpp>
-#include <gras_impl/messages.hpp>
-#include <gras_impl/interruptible_thread.hpp>
THERON_DECLARE_REGISTERED_MESSAGE(gras::TopAllocMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::TopActiveMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::TopInertMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::TopTokenMessage);
-THERON_DECLARE_REGISTERED_MESSAGE(gras::GlobalBlockConfig);
-THERON_DECLARE_REGISTERED_MESSAGE(gras::SharedThreadGroup);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::TopConfigMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::TopThreadMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTagMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputMsgMessage);
diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp
index eda73ce..e18a0a4 100644
--- a/lib/register_messages.cpp
+++ b/lib/register_messages.cpp
@@ -8,8 +8,8 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::TopAllocMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::TopActiveMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::TopInertMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::TopTokenMessage);
-THERON_DEFINE_REGISTERED_MESSAGE(gras::GlobalBlockConfig);
-THERON_DEFINE_REGISTERED_MESSAGE(gras::SharedThreadGroup);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::TopConfigMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::TopThreadMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTagMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputMsgMessage);
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 74d9523..0c8572d 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -50,22 +50,26 @@ void TopBlock::start(void)
{
(*this)->executor->commit();
{
- (*this)->executor->post_all((*this)->thread_group);
+ TopThreadMessage message;
+ message.thread_group = (*this)->thread_group;
+ (*this)->bcast_prio_msg(message);
}
{
TopTokenMessage message;
message.token = (*this)->token;
- (*this)->executor->post_all(message);
+ (*this)->bcast_prio_msg(message);
}
{
//send the global block config before alloc
- (*this)->executor->post_all((*this)->top_config);
+ TopConfigMessage message;
+ message.config = (*this)->top_config;
+ (*this)->bcast_prio_msg(message);
}
{
- (*this)->executor->post_all(TopAllocMessage());
+ (*this)->bcast_prio_msg(TopAllocMessage());
}
{
- (*this)->executor->post_all(TopActiveMessage());
+ (*this)->bcast_prio_msg(TopActiveMessage());
}
}