From 50f8123fc082ff9b1666277175e6f79f666404e5 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Wed, 10 Apr 2013 02:57:11 -0500 Subject: gras: switched to using token for prio messages See #70 for more details --- lib/block.cpp | 1 + lib/block_handlers.cpp | 7 ++++--- lib/block_props.cpp | 7 +++++-- lib/element_impl.hpp | 14 +++++++++++--- lib/gras_impl/block_actor.hpp | 22 ++-------------------- lib/gras_impl/messages.hpp | 4 +++- lib/top_block.cpp | 6 ++---- lib/top_block_query.cpp | 5 +++-- 8 files changed, 31 insertions(+), 35 deletions(-) diff --git a/lib/block.cpp b/lib/block.cpp index 5bb959b..6de4208 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -32,6 +32,7 @@ Block::Block(const std::string &name): Element(name) { (*this)->block.reset(new BlockActor()); + (*this)->block->prio_token = Token::make(); (*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool (*this)->block->name = name; //for debug purposes diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 1f744f4..0be6b85 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -34,8 +34,6 @@ void BlockActor::handle_top_inert( this->mark_done(); this->Send(0, from); //ACK - - this->highPrioAck(); } void BlockActor::handle_top_token( @@ -157,5 +155,8 @@ void BlockActor::handle_get_stats( message.stats_time = time_now(); this->Send(message, from); //ACK - this->highPrioAck(); + + //work could have been skipped by a high prio msg + //forcefully kick the task to recheck in a new call + this->Send(SelfKickMessage(), this->GetAddress()); } diff --git a/lib/block_props.cpp b/lib/block_props.cpp index 12ab339..8e68434 100644 --- a/lib/block_props.cpp +++ b/lib/block_props.cpp @@ -37,7 +37,10 @@ void BlockActor::handle_prop_access( //send the reply this->Send(reply, from); //ACK - this->highPrioAck(); + + //work could have been skipped by a high prio msg + //forcefully kick the task to recheck in a new call + this->Send(SelfKickMessage(), this->GetAddress()); } PMCC Block::_handle_prop_access(const std::string &key, const PMCC &value, const bool set) @@ -81,11 +84,11 @@ static PMCC prop_access_dispatcher(ActorType &actor, const std::string &key, con { PropAccessReceiver receiver; PropAccessMessage message; + message.prio_token = actor->prio_token; message.set = set; message.key = key; message.value = value; actor->Push(message, receiver.GetAddress()); - actor->highPrioPreNotify(); receiver.Wait(); if (not receiver.message.error.empty()) { diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 3148d93..727e756 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -51,12 +51,20 @@ struct ElementImpl return topology.get(); } - //call this before sending a high prio message to all workers - void pre_post_all_set_prio(void) + template + void bcast_prio_msg(const MessageType &msg) { + Theron::Receiver receiver; BOOST_FOREACH(Apology::Worker *worker, this->executor->get_workers()) { - dynamic_cast(worker)->highPrioPreNotify(); + MessageType message = msg; + message.prio_token = dynamic_cast(worker)->prio_token; + worker->Push(message, receiver.GetAddress()); + } + size_t outstandingCount(this->executor->get_workers().size()); + while (outstandingCount != 0) + { + outstandingCount -= receiver.Wait(outstandingCount); } } diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 250b4c6..00f48f4 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -31,24 +30,7 @@ struct BlockActor : Apology::Worker Block *block_ptr; std::string name; //for debug ThreadPool thread_pool; - - //! Priority count to hack in high-prio worker queue - Theron::Detail::Atomic::UInt32 prioCount; - GRAS_FORCE_INLINE void highPrioPreNotify(void) - { - this->prioCount.Increment(); - } - GRAS_FORCE_INLINE void highPrioAck(void) - { - ASSERT(this->prioCount.Load() != 0); - this->prioCount.Decrement(); - this->handle_task(); - } - GRAS_FORCE_INLINE bool hasHighPrioMsg(void) - { - //high prio when the count is positive and there are enqueued messages (avoids race) - return this->prioCount.Load() and this->GetNumQueuedMessages(); - } + Token prio_token; //do it here so we can match w/ the handler declarations void register_handlers(void) @@ -146,7 +128,7 @@ struct BlockActor : Apology::Worker GRAS_FORCE_INLINE bool is_work_allowed(void) { return ( - not this->hasHighPrioMsg() and + this->prio_token.unique() and this->block_state == BLOCK_STATE_LIVE and this->inputs_available.any() and this->input_queues.all_ready() and diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 15a7c2e..a959bef 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -30,7 +30,7 @@ struct TopActiveMessage struct TopInertMessage { - //empty + Token prio_token; }; struct TopTokenMessage @@ -133,6 +133,7 @@ struct OutputUpdateMessage struct PropAccessMessage { + Token prio_token; bool set; std::string key; PMCC value; @@ -146,6 +147,7 @@ struct SelfKickMessage struct GetStatsMessage { + Token prio_token; std::string block_id; BlockStats stats; time_ticks_t stats_time; diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 5370920..9a402f4 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -37,8 +37,7 @@ TopBlock::~TopBlock(void) void ElementImpl::top_block_cleanup(void) { - this->pre_post_all_set_prio(); - this->executor->post_all(TopInertMessage()); + this->bcast_prio_msg(TopInertMessage()); this->topology->clear_all(); this->executor->commit(); if (ARMAGEDDON) std::cerr @@ -93,8 +92,7 @@ void TopBlock::stop(void) (*this)->thread_group->interrupt_all(); //message all blocks to mark done - (*this)->pre_post_all_set_prio(); - (*this)->executor->post_all(TopInertMessage()); + (*this)->bcast_prio_msg(TopInertMessage()); } void TopBlock::run(void) diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp index 96e2847..09a3d10 100644 --- a/lib/top_block_query.cpp +++ b/lib/top_block_query.cpp @@ -75,8 +75,9 @@ static std::string query_stats(ElementImpl *self, const boost::property_tree::pt if (std::find(block_ids.begin(), block_ids.end(), id) == block_ids.end()) continue; //send a message to the block's actor to query stats - dynamic_cast(worker)->highPrioPreNotify(); - worker->Push(GetStatsMessage(), receiver.GetAddress()); + GetStatsMessage message; + message.prio_token = dynamic_cast(worker)->prio_token; + worker->Push(message, receiver.GetAddress()); outstandingCount++; } while (outstandingCount) outstandingCount -= receiver.Wait(outstandingCount); -- cgit