summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/block.cpp1
-rw-r--r--lib/block_handlers.cpp7
-rw-r--r--lib/block_props.cpp7
-rw-r--r--lib/element_impl.hpp14
-rw-r--r--lib/gras_impl/block_actor.hpp22
-rw-r--r--lib/gras_impl/messages.hpp4
-rw-r--r--lib/top_block.cpp6
-rw-r--r--lib/top_block_query.cpp5
8 files changed, 31 insertions, 35 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 5bb959b..6de4208 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -32,6 +32,7 @@ Block::Block(const std::string &name):
Element(name)
{
(*this)->block.reset(new BlockActor());
+ (*this)->block->prio_token = Token::make();
(*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool
(*this)->block->name = name; //for debug purposes
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 1f744f4..0be6b85 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -34,8 +34,6 @@ void BlockActor::handle_top_inert(
this->mark_done();
this->Send(0, from); //ACK
-
- this->highPrioAck();
}
void BlockActor::handle_top_token(
@@ -157,5 +155,8 @@ void BlockActor::handle_get_stats(
message.stats_time = time_now();
this->Send(message, from); //ACK
- this->highPrioAck();
+
+ //work could have been skipped by a high prio msg
+ //forcefully kick the task to recheck in a new call
+ this->Send(SelfKickMessage(), this->GetAddress());
}
diff --git a/lib/block_props.cpp b/lib/block_props.cpp
index 12ab339..8e68434 100644
--- a/lib/block_props.cpp
+++ b/lib/block_props.cpp
@@ -37,7 +37,10 @@ void BlockActor::handle_prop_access(
//send the reply
this->Send(reply, from); //ACK
- this->highPrioAck();
+
+ //work could have been skipped by a high prio msg
+ //forcefully kick the task to recheck in a new call
+ this->Send(SelfKickMessage(), this->GetAddress());
}
PMCC Block::_handle_prop_access(const std::string &key, const PMCC &value, const bool set)
@@ -81,11 +84,11 @@ static PMCC prop_access_dispatcher(ActorType &actor, const std::string &key, con
{
PropAccessReceiver receiver;
PropAccessMessage message;
+ message.prio_token = actor->prio_token;
message.set = set;
message.key = key;
message.value = value;
actor->Push(message, receiver.GetAddress());
- actor->highPrioPreNotify();
receiver.Wait();
if (not receiver.message.error.empty())
{
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 3148d93..727e756 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -51,12 +51,20 @@ struct ElementImpl
return topology.get();
}
- //call this before sending a high prio message to all workers
- void pre_post_all_set_prio(void)
+ template <typename MessageType>
+ void bcast_prio_msg(const MessageType &msg)
{
+ Theron::Receiver receiver;
BOOST_FOREACH(Apology::Worker *worker, this->executor->get_workers())
{
- dynamic_cast<BlockActor *>(worker)->highPrioPreNotify();
+ MessageType message = msg;
+ message.prio_token = dynamic_cast<BlockActor *>(worker)->prio_token;
+ worker->Push(message, receiver.GetAddress());
+ }
+ size_t outstandingCount(this->executor->get_workers().size());
+ while (outstandingCount != 0)
+ {
+ outstandingCount -= receiver.Wait(outstandingCount);
}
}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 250b4c6..00f48f4 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -16,7 +16,6 @@
#include <gras_impl/output_buffer_queues.hpp>
#include <gras_impl/input_buffer_queues.hpp>
#include <gras_impl/interruptible_thread.hpp>
-#include <Theron/Detail/Threading/Atomic.h>
#include <vector>
#include <set>
#include <map>
@@ -31,24 +30,7 @@ struct BlockActor : Apology::Worker
Block *block_ptr;
std::string name; //for debug
ThreadPool thread_pool;
-
- //! Priority count to hack in high-prio worker queue
- Theron::Detail::Atomic::UInt32 prioCount;
- GRAS_FORCE_INLINE void highPrioPreNotify(void)
- {
- this->prioCount.Increment();
- }
- GRAS_FORCE_INLINE void highPrioAck(void)
- {
- ASSERT(this->prioCount.Load() != 0);
- this->prioCount.Decrement();
- this->handle_task();
- }
- GRAS_FORCE_INLINE bool hasHighPrioMsg(void)
- {
- //high prio when the count is positive and there are enqueued messages (avoids race)
- return this->prioCount.Load() and this->GetNumQueuedMessages();
- }
+ Token prio_token;
//do it here so we can match w/ the handler declarations
void register_handlers(void)
@@ -146,7 +128,7 @@ struct BlockActor : Apology::Worker
GRAS_FORCE_INLINE bool is_work_allowed(void)
{
return (
- not this->hasHighPrioMsg() and
+ this->prio_token.unique() and
this->block_state == BLOCK_STATE_LIVE and
this->inputs_available.any() and
this->input_queues.all_ready() and
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 15a7c2e..a959bef 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -30,7 +30,7 @@ struct TopActiveMessage
struct TopInertMessage
{
- //empty
+ Token prio_token;
};
struct TopTokenMessage
@@ -133,6 +133,7 @@ struct OutputUpdateMessage
struct PropAccessMessage
{
+ Token prio_token;
bool set;
std::string key;
PMCC value;
@@ -146,6 +147,7 @@ struct SelfKickMessage
struct GetStatsMessage
{
+ Token prio_token;
std::string block_id;
BlockStats stats;
time_ticks_t stats_time;
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 5370920..9a402f4 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -37,8 +37,7 @@ TopBlock::~TopBlock(void)
void ElementImpl::top_block_cleanup(void)
{
- this->pre_post_all_set_prio();
- this->executor->post_all(TopInertMessage());
+ this->bcast_prio_msg(TopInertMessage());
this->topology->clear_all();
this->executor->commit();
if (ARMAGEDDON) std::cerr
@@ -93,8 +92,7 @@ void TopBlock::stop(void)
(*this)->thread_group->interrupt_all();
//message all blocks to mark done
- (*this)->pre_post_all_set_prio();
- (*this)->executor->post_all(TopInertMessage());
+ (*this)->bcast_prio_msg(TopInertMessage());
}
void TopBlock::run(void)
diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp
index 96e2847..09a3d10 100644
--- a/lib/top_block_query.cpp
+++ b/lib/top_block_query.cpp
@@ -75,8 +75,9 @@ static std::string query_stats(ElementImpl *self, const boost::property_tree::pt
if (std::find(block_ids.begin(), block_ids.end(), id) == block_ids.end()) continue;
//send a message to the block's actor to query stats
- dynamic_cast<BlockActor *>(worker)->highPrioPreNotify();
- worker->Push(GetStatsMessage(), receiver.GetAddress());
+ GetStatsMessage message;
+ message.prio_token = dynamic_cast<BlockActor *>(worker)->prio_token;
+ worker->Push(message, receiver.GetAddress());
outstandingCount++;
}
while (outstandingCount) outstandingCount -= receiver.Wait(outstandingCount);