summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------Apology0
-rw-r--r--lib/block_handlers.cpp5
-rw-r--r--lib/block_task.cpp10
-rw-r--r--lib/element_impl.hpp10
-rw-r--r--lib/gras_impl/block_actor.hpp8
-rw-r--r--lib/gras_impl/messages.hpp2
-rw-r--r--lib/top_block.cpp8
-rw-r--r--lib/top_block_stats.cpp12
8 files changed, 34 insertions, 21 deletions
diff --git a/Apology b/Apology
-Subproject 4e863eedc2b673b576597a38ac749b57416bb22
+Subproject 9515615c9f47b58d1cceada514bc2107d51ca0b
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index a6d49bb..7a5b0d7 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -19,7 +19,6 @@ void BlockActor::handle_top_active(
this->stats.start_time = time_now();
}
this->block_state = BLOCK_STATE_LIVE;
- this->active_token = message.token;
this->Send(0, from); //ACK
@@ -32,6 +31,8 @@ void BlockActor::handle_top_inert(
){
MESSAGE_TRACER();
+ ASSERT(this->prio_count.Load() != 0);
+ this->prio_count.Decrement();
this->mark_done();
this->Send(0, from); //ACK
@@ -134,6 +135,8 @@ void BlockActor::handle_get_stats(
const GetStatsMessage &,
const Theron::Address from
){
+ MESSAGE_TRACER();
+ this->prio_count.Decrement();
GetStatsMessage message;
message.block_id = this->block_ptr->to_string();
message.stats = this->stats;
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index a3342a6..35c0041 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -108,16 +108,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
if GRAS_UNLIKELY(not this->is_work_allowed()) return;
- //------------------------------------------------------------------
- //-- Asynchronous notification through atomic variable
- //-- that the executor has instructed workers to stop.
- //------------------------------------------------------------------
- if GRAS_UNLIKELY(active_token.expired())
- {
- this->mark_done();
- return;
- }
-
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index c707268..e127e01 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -10,6 +10,7 @@
#include <gras/block.hpp>
#include <gras_impl/token.hpp>
#include <gras_impl/interruptible_thread.hpp>
+#include <boost/foreach.hpp>
namespace gras
{
@@ -31,7 +32,6 @@ struct ElementImpl
//top block stuff
SharedThreadGroup thread_group;
Token token;
- Token active_token;
GlobalBlockConfig top_config;
//things may be in this element
@@ -45,6 +45,14 @@ struct ElementImpl
return topology.get();
}
+ //call this before sending a high prio message to all workers
+ void pre_post_all_set_prio(void)
+ {
+ BOOST_FOREACH(Apology::Worker *worker, this->executor->get_workers())
+ {
+ dynamic_cast<BlockActor *>(worker)->prio_count.Increment();
+ }
+ }
};
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index ff285e0..5e08687 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -16,6 +16,7 @@
#include <gras_impl/output_buffer_queues.hpp>
#include <gras_impl/input_buffer_queues.hpp>
#include <gras_impl/interruptible_thread.hpp>
+#include <Theron/Detail/Threading/Atomic.h>
#include <vector>
#include <set>
@@ -30,6 +31,9 @@ struct BlockActor : Apology::Worker
std::string name; //for debug
ThreadPool thread_pool;
+ //! Priority count to hack in high-prio worker queue
+ Theron::Detail::Atomic::UInt32 prio_count;
+
//do it here so we can match w/ the handler declarations
void register_handlers(void)
{
@@ -115,7 +119,10 @@ struct BlockActor : Apology::Worker
GRAS_FORCE_INLINE bool is_work_allowed(void)
{
+ //high prio when the count is positive and there are enqueued messages (avoids race)
+ const bool has_high_prio_msg = this->prio_count.Load() and this->GetNumQueuedMessages();
return (
+ not has_high_prio_msg and
this->block_state == BLOCK_STATE_LIVE and
this->input_queues.all_ready() and
this->inputs_available.any() and
@@ -167,7 +174,6 @@ struct BlockActor : Apology::Worker
BLOCK_STATE_LIVE,
BLOCK_STATE_DONE,
} block_state;
- WeakToken active_token;
long buffer_affinity;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 86b6a93..6cc685b 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -25,7 +25,7 @@ struct TopAllocMessage
struct TopActiveMessage
{
- Token token;
+ //empty
};
struct TopInertMessage
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index fe06286..10d5ef2 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -32,6 +32,7 @@ TopBlock::TopBlock(const std::string &name):
void ElementImpl::top_block_cleanup(void)
{
+ this->pre_post_all_set_prio();
this->executor->post_all(TopInertMessage());
this->topology->clear_all();
this->executor->commit();
@@ -77,22 +78,17 @@ void TopBlock::start(void)
}
{
TopActiveMessage message;
- message.token = Token::make();
- (*this)->active_token = message.token;
(*this)->executor->post_all(message);
}
}
void TopBlock::stop(void)
{
- //reset only reference to active token
- //workers about to call work see expired
- (*this)->active_token.reset();
-
//interrupt these "special" threads
(*this)->thread_group->interrupt_all();
//message all blocks to mark done
+ (*this)->pre_post_all_set_prio();
(*this)->executor->post_all(TopInertMessage());
}
diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp
index 76e87ad..8cbdb0c 100644
--- a/lib/top_block_stats.cpp
+++ b/lib/top_block_stats.cpp
@@ -24,8 +24,18 @@ struct GetStatsReceiver : Theron::Receiver
std::string TopBlock::get_stats(const std::string &)
{
+ //get stats with custom receiver and set high prio
GetStatsReceiver receiver;
- (*this)->executor->post_all(GetStatsMessage(), receiver);
+ size_t outstandingCount(0);
+ BOOST_FOREACH(Apology::Worker *worker, (*this)->executor->get_workers())
+ {
+ dynamic_cast<BlockActor *>(worker)->prio_count.Increment();
+ worker->Push(GetStatsMessage(), receiver.GetAddress());
+ outstandingCount++;
+ }
+ while (outstandingCount) outstandingCount -= receiver.Wait(outstandingCount);
+
+ //now format the xml result
std::string xml;
xml += str(boost::format(" <now>%llu</now>\n") % time_now());
xml += str(boost::format(" <tps>%llu</tps>\n") % time_tps());