diff options
m--------- | Apology | 0 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 5 | ||||
-rw-r--r-- | lib/block_task.cpp | 10 | ||||
-rw-r--r-- | lib/element_impl.hpp | 10 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 8 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 2 | ||||
-rw-r--r-- | lib/top_block.cpp | 8 | ||||
-rw-r--r-- | lib/top_block_stats.cpp | 12 |
8 files changed, 34 insertions, 21 deletions
diff --git a/Apology b/Apology -Subproject 4e863eedc2b673b576597a38ac749b57416bb22 +Subproject 9515615c9f47b58d1cceada514bc2107d51ca0b diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index a6d49bb..7a5b0d7 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -19,7 +19,6 @@ void BlockActor::handle_top_active( this->stats.start_time = time_now(); } this->block_state = BLOCK_STATE_LIVE; - this->active_token = message.token; this->Send(0, from); //ACK @@ -32,6 +31,8 @@ void BlockActor::handle_top_inert( ){ MESSAGE_TRACER(); + ASSERT(this->prio_count.Load() != 0); + this->prio_count.Decrement(); this->mark_done(); this->Send(0, from); //ACK @@ -134,6 +135,8 @@ void BlockActor::handle_get_stats( const GetStatsMessage &, const Theron::Address from ){ + MESSAGE_TRACER(); + this->prio_count.Decrement(); GetStatsMessage message; message.block_id = this->block_ptr->to_string(); message.stats = this->stats; diff --git a/lib/block_task.cpp b/lib/block_task.cpp index a3342a6..35c0041 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -108,16 +108,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ if GRAS_UNLIKELY(not this->is_work_allowed()) return; - //------------------------------------------------------------------ - //-- Asynchronous notification through atomic variable - //-- that the executor has instructed workers to stop. - //------------------------------------------------------------------ - if GRAS_UNLIKELY(active_token.expired()) - { - this->mark_done(); - return; - } - const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index c707268..e127e01 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -10,6 +10,7 @@ #include <gras/block.hpp> #include <gras_impl/token.hpp> #include <gras_impl/interruptible_thread.hpp> +#include <boost/foreach.hpp> namespace gras { @@ -31,7 +32,6 @@ struct ElementImpl //top block stuff SharedThreadGroup thread_group; Token token; - Token active_token; GlobalBlockConfig top_config; //things may be in this element @@ -45,6 +45,14 @@ struct ElementImpl return topology.get(); } + //call this before sending a high prio message to all workers + void pre_post_all_set_prio(void) + { + BOOST_FOREACH(Apology::Worker *worker, this->executor->get_workers()) + { + dynamic_cast<BlockActor *>(worker)->prio_count.Increment(); + } + } }; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index ff285e0..5e08687 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -16,6 +16,7 @@ #include <gras_impl/output_buffer_queues.hpp> #include <gras_impl/input_buffer_queues.hpp> #include <gras_impl/interruptible_thread.hpp> +#include <Theron/Detail/Threading/Atomic.h> #include <vector> #include <set> @@ -30,6 +31,9 @@ struct BlockActor : Apology::Worker std::string name; //for debug ThreadPool thread_pool; + //! Priority count to hack in high-prio worker queue + Theron::Detail::Atomic::UInt32 prio_count; + //do it here so we can match w/ the handler declarations void register_handlers(void) { @@ -115,7 +119,10 @@ struct BlockActor : Apology::Worker GRAS_FORCE_INLINE bool is_work_allowed(void) { + //high prio when the count is positive and there are enqueued messages (avoids race) + const bool has_high_prio_msg = this->prio_count.Load() and this->GetNumQueuedMessages(); return ( + not has_high_prio_msg and this->block_state == BLOCK_STATE_LIVE and this->input_queues.all_ready() and this->inputs_available.any() and @@ -167,7 +174,6 @@ struct BlockActor : Apology::Worker BLOCK_STATE_LIVE, BLOCK_STATE_DONE, } block_state; - WeakToken active_token; long buffer_affinity; std::vector<std::vector<OutputHintMessage> > output_allocation_hints; diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 86b6a93..6cc685b 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -25,7 +25,7 @@ struct TopAllocMessage struct TopActiveMessage { - Token token; + //empty }; struct TopInertMessage diff --git a/lib/top_block.cpp b/lib/top_block.cpp index fe06286..10d5ef2 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -32,6 +32,7 @@ TopBlock::TopBlock(const std::string &name): void ElementImpl::top_block_cleanup(void) { + this->pre_post_all_set_prio(); this->executor->post_all(TopInertMessage()); this->topology->clear_all(); this->executor->commit(); @@ -77,22 +78,17 @@ void TopBlock::start(void) } { TopActiveMessage message; - message.token = Token::make(); - (*this)->active_token = message.token; (*this)->executor->post_all(message); } } void TopBlock::stop(void) { - //reset only reference to active token - //workers about to call work see expired - (*this)->active_token.reset(); - //interrupt these "special" threads (*this)->thread_group->interrupt_all(); //message all blocks to mark done + (*this)->pre_post_all_set_prio(); (*this)->executor->post_all(TopInertMessage()); } diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp index 76e87ad..8cbdb0c 100644 --- a/lib/top_block_stats.cpp +++ b/lib/top_block_stats.cpp @@ -24,8 +24,18 @@ struct GetStatsReceiver : Theron::Receiver std::string TopBlock::get_stats(const std::string &) { + //get stats with custom receiver and set high prio GetStatsReceiver receiver; - (*this)->executor->post_all(GetStatsMessage(), receiver); + size_t outstandingCount(0); + BOOST_FOREACH(Apology::Worker *worker, (*this)->executor->get_workers()) + { + dynamic_cast<BlockActor *>(worker)->prio_count.Increment(); + worker->Push(GetStatsMessage(), receiver.GetAddress()); + outstandingCount++; + } + while (outstandingCount) outstandingCount -= receiver.Wait(outstandingCount); + + //now format the xml result std::string xml; xml += str(boost::format(" <now>%llu</now>\n") % time_now()); xml += str(boost::format(" <tps>%llu</tps>\n") % time_tps()); |