diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/CMakeLists.txt | 4 | ||||
-rw-r--r-- | lib/block.cpp | 2 | ||||
-rw-r--r-- | lib/block_actor.cpp | 7 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 26 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 222 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 60 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 102 | ||||
-rw-r--r-- | lib/port_handlers.cpp | 12 | ||||
-rw-r--r-- | lib/top_block.cpp | 30 |
10 files changed, 308 insertions, 166 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index bc5115e..ffc43f1 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -50,8 +50,8 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp - #${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp - #${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp #${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp diff --git a/lib/block.cpp b/lib/block.cpp index 6bfaa9b..2ca4e8b 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -157,7 +157,7 @@ void Block::add_item_tag( const size_t which_output, const Tag &tag ){ - BlockTagMessage message; + InputTagMessage message; message.tag = tag; (*this)->block->post_downstream(which_output, message); } diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index 3ba6c79..af59009 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -24,5 +24,10 @@ static Theron::Framework global_framework(8); //TODO needs API config BlockActor::BlockActor(void): Apology::Worker(global_framework) { - + this->register_handlers(); +} + +BlockActor::~BlockActor(void) +{ + //NOP } diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 4430d52..826aed9 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -28,33 +28,33 @@ const double EDGE_CASE_MITIGATION = 8.0; //edge case mitigation constant //TODO will need more complicated later -void ElementImpl::buffer_returner(const size_t index, SBuffer &buffer) +void BlockActor::buffer_returner(const size_t index, SBuffer &buffer) { //reset offset and length buffer.offset = 0; buffer.length = 0; - BufferReturnMessage message; + OutputBufferMessage message; message.index = index; message.buffer = buffer; - this->block.post_msg(message); + this->Push(message, Theron::Address()); } static size_t recommend_length( - const std::vector<BufferHintMessage> &hints, + const std::vector<OutputHintMessage> &hints, const size_t output_multiple_bytes, const size_t at_least_bytes ){ //step 1) find the LCM of all reserves to create a super-reserve size_t lcm_bytes = output_multiple_bytes; - BOOST_FOREACH(const BufferHintMessage &hint, hints) + BOOST_FOREACH(const OutputHintMessage &hint, hints) { lcm_bytes = boost::math::lcm(lcm_bytes, hint.reserve_bytes); } //step 2) N x super reserve to minimize history edge case size_t Nlcm_bytes = lcm_bytes; - BOOST_FOREACH(const BufferHintMessage &hint, hints) + BOOST_FOREACH(const OutputHintMessage &hint, hints) { while (hint.history_bytes*EDGE_CASE_MITIGATION > Nlcm_bytes) { @@ -69,10 +69,12 @@ static size_t recommend_length( return std::min(Nlcm_bytes, AHH_TOO_MANY_BYTES); } -void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) +void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address from) { + MESSAGE_TRACER(); + //allocate output buffers which will also wake up the task - const size_t num_outputs = task_iface.get_num_outputs(); + const size_t num_outputs = this->get_num_outputs(); this->output_buffer_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { @@ -85,7 +87,7 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) at_least_items*this->output_items_sizes[i] ); - SBufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1); + SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1); SBufferToken token = SBufferToken(new SBufferDeleter(deleter)); this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes); @@ -93,8 +95,10 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) InputAllocatorMessage message; message.token = SBufferToken(new SBufferDeleter(deleter)); message.recommend_length = bytes; - task_iface.post_downstream(i, message); + this->post_downstream(i, message); } + + this->Send(0, from); //ACK } SBufferToken Block::output_buffer_allocator( @@ -107,7 +111,7 @@ SBufferToken Block::output_buffer_allocator( SBufferConfig config; config.memory = NULL; config.length = recommend_length; - config.affinity = (*this)->buffer_affinity; + config.affinity = (*this)->block->buffer_affinity; config.token = token; SBuffer buff(config); std::memset(buff.get_actual_memory(), 0, buff.get_actual_length()); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 0fd7022..c12820d 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -21,144 +21,150 @@ using namespace gnuradio; -void ElementImpl::handle_block_msg( - const tsbe::TaskInterface &task_iface, - const tsbe::Wax &msg + +void BlockActor::handle_top_active( + const TopActiveMessage &, + const Theron::Address from ){ - if (MESSAGE) std::cerr << "handle_block_msg (" << msg.type().name() << ") " << name << std::endl; + MESSAGE_TRACER(); - //a buffer has returned from the downstream - //(all interested consumers have finished with it) - if (msg.type() == typeid(BufferReturnMessage)) + if (this->block_state != BLOCK_STATE_LIVE) { - const BufferReturnMessage &message = msg.cast<BufferReturnMessage>(); - const size_t index = message.index; - if (this->block_state == BLOCK_STATE_DONE) return; - this->output_queues.push(index, message.buffer); - this->handle_task(task_iface); - return; + this->block_ptr->start(); } - - //self kick, call the handle task method - if (msg.type() == typeid(SelfKickMessage)) + this->block_state = BLOCK_STATE_LIVE; + if (this->input_queues.all_ready() and this->output_queues.all_ready()) { - this->handle_task(task_iface); - return; + this->Push(SelfKickMessage(), Theron::Address()); } - //clearly, this block is near death, hang on sparky - if (msg.type() == typeid(CheckTokensMessage)) + this->Send(0, from); //ACK +} + +void BlockActor::handle_top_inert( + const TopInertMessage &, + const Theron::Address from +){ + MESSAGE_TRACER(); + + if (this->block_state != BLOCK_STATE_DONE) { - if (this->input_queues.all_ready() and not this->forecast_fail) - { - this->handle_task(task_iface); - } - else - { - this->mark_done(task_iface); - } - return; + this->block_ptr->stop(); } + this->mark_done(); - //store the topology's thread group - //erase any potentially old lingering threads - //spawn a new thread if this block is a source - if (msg.type() == typeid(SharedThreadGroup)) + this->Send(0, from); //ACK +} + +void BlockActor::handle_top_token( + const TopTokenMessage &message, + const Theron::Address from +){ + MESSAGE_TRACER(); + + //create input tokens and send allocation hints + for (size_t i = 0; i < this->get_num_inputs(); i++) { - this->thread_group = msg.cast<SharedThreadGroup>(); - this->interruptible_thread.reset(); //erase old one - if (task_iface.get_num_inputs() == 0) //its a source - { - this->interruptible_thread = boost::make_shared<InterruptibleThread>( - this->thread_group, boost::bind(&ElementImpl::task_work, this) - ); - } - return; + this->input_tokens[i] = Token::make(); + InputTokenMessage token_msg; + token_msg.token = this->input_tokens[i]; + this->post_upstream(i, token_msg); + + //TODO, schedule this message as a pre-allocation message + //tell the upstream about the input requirements + OutputHintMessage output_hints; + output_hints.history_bytes = this->input_history_items[i]*this->input_items_sizes[i]; + output_hints.reserve_bytes = this->input_multiple_items[i]; + output_hints.token = this->input_tokens[i]; + this->post_upstream(i, output_hints); + } - //user changed some input settings like history or reserve reqs - if (msg.type() == typeid(UpdateInputsMessage)) + //create output token + for (size_t i = 0; i < this->get_num_outputs(); i++) { - this->input_update(task_iface); - return; + this->output_tokens[i] = Token::make(); + OutputTokenMessage token_msg; + token_msg.token = this->output_tokens[i]; + this->post_downstream(i, token_msg); } - ASSERT(msg.type() == typeid(TopBlockMessage)); + //store a token to the top level topology + this->token_pool.insert(message.token); - //FIXME leave the marked done blocks done... - //this helps QA tests to pass that re-use top block without diconnecting the old design - if (this->block_state == BLOCK_STATE_DONE) return; + this->Send(0, from); //ACK +} - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); +void BlockActor::handle_top_hint( + const TopHintMessage &message, + const Theron::Address from +){ + MESSAGE_TRACER(); - //allocate output tokens and send them downstream - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::TOKEN_TIME) - { - for (size_t i = 0; i < num_inputs; i++) - { - this->input_tokens[i] = Token::make(); - task_iface.post_upstream(i, this->input_tokens[i]); + this->hint = message.hint; - //TODO, schedule this message as a pre-allocation message - //tell the upstream about the input requirements - BufferHintMessage message; - message.history_bytes = this->input_history_items[i]*this->input_items_sizes[i]; - message.reserve_bytes = this->input_multiple_items[i]; - message.token = this->input_tokens[i]; - task_iface.post_upstream(i, message); + this->Send(0, from); //ACK +} - } - for (size_t i = 0; i < num_outputs; i++) - { - this->output_tokens[i] = Token::make(); - task_iface.post_downstream(i, this->output_tokens[i]); - } - this->token_pool.insert(msg.cast<TopBlockMessage>().token); - } +void BlockActor::handle_top_thread_group( + const SharedThreadGroup &message, + const Theron::Address from +){ + MESSAGE_TRACER(); - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ALLOCATE) + //store the topology's thread group + //erase any potentially old lingering threads + //spawn a new thread if this block is a source + this->thread_group = message; + this->interruptible_thread.reset(); //erase old one + if (this->get_num_inputs() == 0) //its a source { - //causes initial processing kick-off for source blocks - this->handle_allocation(task_iface); + this->interruptible_thread = boost::make_shared<InterruptibleThread>( + this->thread_group, boost::bind(&BlockActor::task_work, this) + ); } - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + this->Send(0, from); //ACK +} + +void BlockActor::handle_self_kick( + const SelfKickMessage &, + const Theron::Address +){ + MESSAGE_TRACER(); + this->handle_task(); +} + +void BlockActor::handle_check_tokens( + const CheckTokensMessage &, + const Theron::Address +){ + MESSAGE_TRACER(); + if (this->input_queues.all_ready() and not this->forecast_fail) { - if (this->block_state != BLOCK_STATE_LIVE) - { - this->block_ptr->start(); - } - this->block_state = BLOCK_STATE_LIVE; - if (this->input_queues.all_ready() and this->output_queues.all_ready()) - { - this->block.post_msg(SelfKickMessage()); - } + this->handle_task(); } - - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::INERT) + else { - if (this->block_state != BLOCK_STATE_DONE) - { - this->block_ptr->stop(); - } - this->mark_done(task_iface); + this->mark_done(); } } -void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) -{ - //std::cout << "topology_update in " << name << std::endl; +void BlockActor::handle_topology( + const Apology::WorkerTopologyMessage &, + const Theron::Address +){ + MESSAGE_TRACER(); - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); //call check_topology on block before committing settings this->block_ptr->check_topology(num_inputs, num_outputs); //fill the item sizes from the IO signatures - fill_item_sizes_from_sig(this->input_items_sizes, this->input_signature, num_inputs); - fill_item_sizes_from_sig(this->output_items_sizes, this->output_signature, num_outputs); + fill_item_sizes_from_sig(this->input_items_sizes, (*block_ptr)->input_signature, num_inputs); + fill_item_sizes_from_sig(this->output_items_sizes, (*block_ptr)->output_signature, num_outputs); //resize and fill port properties resize_fill_back(this->input_history_items, num_inputs); @@ -194,17 +200,19 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) //a block looses all connections, allow it to free if (num_inputs == 0 and num_outputs == 0) { - this->mark_done(task_iface); + this->mark_done(); } this->topology_init = true; - this->input_update(task_iface); + this->handle_update_inputs(UpdateInputsMessage(), Theron::Address()); } -void ElementImpl::input_update(const tsbe::TaskInterface &task_iface) -{ - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); +void BlockActor::handle_update_inputs( + const UpdateInputsMessage &, + const Theron::Address +){ + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); //impose input reserve requirements based on relative rate and output multiple resize_fill_grow(this->input_multiple_items, num_inputs, 1); diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 98634b7..63d5ff5 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -47,9 +47,64 @@ struct BlockActor : Apology::Worker { BlockActor(void); ~BlockActor(void); - Block *block_ptr; + //do it here so we can match w/ the handler declarations + void register_handlers(void) + { + this->RegisterHandler(this, &BlockActor::handle_topology); + + this->RegisterHandler(this, &BlockActor::handle_top_alloc); + this->RegisterHandler(this, &BlockActor::handle_top_active); + this->RegisterHandler(this, &BlockActor::handle_top_inert); + this->RegisterHandler(this, &BlockActor::handle_top_token); + this->RegisterHandler(this, &BlockActor::handle_top_hint); + this->RegisterHandler(this, &BlockActor::handle_top_thread_group); + + this->RegisterHandler(this, &BlockActor::handle_input_tag); + this->RegisterHandler(this, &BlockActor::handle_input_buffer); + this->RegisterHandler(this, &BlockActor::handle_input_token); + this->RegisterHandler(this, &BlockActor::handle_input_check); + + this->RegisterHandler(this, &BlockActor::handle_output_buffer); + this->RegisterHandler(this, &BlockActor::handle_output_token); + this->RegisterHandler(this, &BlockActor::handle_output_check); + this->RegisterHandler(this, &BlockActor::handle_output_hint); + + this->RegisterHandler(this, &BlockActor::handle_self_kick); + this->RegisterHandler(this, &BlockActor::handle_check_tokens); + this->RegisterHandler(this, &BlockActor::handle_update_inputs); + } + + //handlers + void handle_topology(const Apology::WorkerTopologyMessage &, const Theron::Address); + + void handle_top_alloc(const TopAllocMessage &, const Theron::Address); + void handle_top_active(const TopActiveMessage &, const Theron::Address); + void handle_top_inert(const TopInertMessage &, const Theron::Address); + void handle_top_token(const TopTokenMessage &, const Theron::Address); + void handle_top_hint(const TopHintMessage &, const Theron::Address); + void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address); + + void handle_input_tag(const InputTagMessage &, const Theron::Address); + void handle_input_buffer(const InputBufferMessage &, const Theron::Address); + void handle_input_token(const InputTokenMessage &, const Theron::Address); + void handle_input_check(const InputCheckMessage &, const Theron::Address); + + void handle_output_buffer(const OutputBufferMessage &, const Theron::Address); + void handle_output_token(const OutputTokenMessage &, const Theron::Address); + void handle_output_check(const OutputCheckMessage &, const Theron::Address); + void handle_output_hint(const OutputHintMessage &, const Theron::Address); + + void handle_self_kick(const SelfKickMessage &, const Theron::Address); + void handle_check_tokens(const CheckTokensMessage &, const Theron::Address); + void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address); + + //helpers + void buffer_returner(const size_t index, SBuffer &buffer); + void mark_done(void); + void handle_task(void); + //per port properties std::vector<size_t> input_items_sizes; std::vector<size_t> output_items_sizes; @@ -96,6 +151,7 @@ struct BlockActor : Apology::Worker Block::tag_propagation_policy_t tag_prop_policy; //interruptible thread stuff + SharedThreadGroup thread_group; boost::shared_ptr<InterruptibleThread> interruptible_thread; //handlers @@ -131,7 +187,7 @@ struct BlockActor : Apology::Worker size_t hint; //some kind of allocation hint Affinity buffer_affinity; - std::vector<std::vector<BufferHintMessage> > output_allocation_hints; + std::vector<std::vector<OutputHintMessage> > output_allocation_hints; //rate settings bool enable_fixed_rate; diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 5aa8e8e..a2f1f5f 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -19,19 +19,20 @@ #include <iostream> #include <stdexcept> +#include <boost/current_function.hpp> //---------------------------------------------------------------------- //-- set to 1 to enable these debugs: //---------------------------------------------------------------------- #define GENESIS 0 #define ARMAGEDDON 0 -#define MESSAGE 0 //---------------------------------------------------------------------- //-- define to enable these debugs: //---------------------------------------------------------------------- //#define WORK_DEBUG #define ASSERTING +#define MESSAGE_TRACING //---------------------------------------------------------------------- //-- various debug prints @@ -39,6 +40,12 @@ #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; +#ifdef MESSAGE_TRACING +#define MESSAGE_TRACER() std::cerr << "Handle message in: " << BOOST_CURRENT_FUNCTION << std::endl << std::flush; +#else +#define MESSAGE_TRACER() +#endif + //---------------------------------------------------------------------- //-- implementation for assert debug //---------------------------------------------------------------------- diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 0b1bed0..3b57fef 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -19,62 +19,122 @@ #include <gnuradio/sbuffer.hpp> #include <gnuradio/tags.hpp> +#include <gnuradio/sbuffer.hpp> +#include <gras_impl/token.hpp> namespace gnuradio { -struct BlockTagMessage +//---------------------------------------------------------------------- +//-- message from the top block/executor +//-- these messages must be ack'd +//---------------------------------------------------------------------- + +struct TopAllocMessage +{ + //empty +}; + +struct TopActiveMessage +{ + //empty +}; + +struct TopInertMessage +{ + //empty +}; + +struct TopTokenMessage +{ + Token token; +}; + +struct TopHintMessage +{ + size_t hint; +}; + +//---------------------------------------------------------------------- +//-- message to an input port +//---------------------------------------------------------------------- + +struct InputTagMessage { size_t index; Tag tag; }; -struct TopBlockMessage +struct InputBufferMessage { - enum - { - ALLOCATE, - ACTIVE, - INERT, - HINT, - TOKEN_TIME, - } what; - size_t hint; + size_t index; + SBuffer buffer; +}; + +struct InputTokenMessage +{ + size_t index; Token token; }; -struct CheckTokensMessage +struct InputAllocatorMessage { - //empty + size_t index; + SBufferToken token; + size_t recommend_length; }; -struct SelfKickMessage +struct InputCheckMessage { - //empty + size_t index; }; -struct BufferReturnMessage +//---------------------------------------------------------------------- +//-- message to an output port +//---------------------------------------------------------------------- + +struct OutputBufferMessage { size_t index; SBuffer buffer; }; -struct BufferHintMessage +struct OutputTokenMessage { + size_t index; + Token token; +}; + +struct OutputCheckMessage +{ + size_t index; +}; + +struct OutputHintMessage +{ + size_t index; size_t history_bytes; size_t reserve_bytes; WeakToken token; }; -struct UpdateInputsMessage +//---------------------------------------------------------------------- +//-- message to just the block +//---------------------------------------------------------------------- + +struct SelfKickMessage { //empty }; -struct InputAllocatorMessage +struct CheckTokensMessage { - SBufferToken token; - size_t recommend_length; + //empty +}; + +struct UpdateInputsMessage +{ + //empty }; } //namespace gnuradio diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp index 848d9ef..a134ff4 100644 --- a/lib/port_handlers.cpp +++ b/lib/port_handlers.cpp @@ -19,6 +19,18 @@ using namespace gnuradio; +//a buffer has returned from the downstream + //(all interested consumers have finished with it) + if (msg.type() == typeid(BufferReturnMessage)) + { + const BufferReturnMessage &message = msg.cast<BufferReturnMessage>(); + const size_t index = message.index; + if (this->block_state == BLOCK_STATE_DONE) return; + this->output_queues.push(index, message.buffer); + this->handle_task(task_iface); + return; + } + void ElementImpl::handle_input_msg( const tsbe::TaskInterface &handle, const size_t index, diff --git a/lib/top_block.cpp b/lib/top_block.cpp index a8f01de..b2c2e8d 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -40,9 +40,7 @@ TopBlock::TopBlock(const std::string &name): void ElementImpl::top_block_cleanup(void) { - TopBlockMessage event; - event.what = TopBlockMessage::INERT; - this->executor->post_all(event); + this->executor->post_all(TopInertMessage()); if (ARMAGEDDON) std::cerr << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n" << "xx Top Block Destroyed: " << name << "\n" @@ -57,10 +55,9 @@ void TopBlock::update(void) void TopBlock::set_buffer_hint(const size_t hint) { - TopBlockMessage event; - event.what = TopBlockMessage::HINT; - event.hint = hint; - (*this)->executor->post_all(event); + TopHintMessage message; + message.hint = hint; + (*this)->executor->post_all(message); } void TopBlock::start(void) @@ -70,20 +67,15 @@ void TopBlock::start(void) (*this)->executor->post_all((*this)->thread_group); } { - TopBlockMessage event; - event.what = TopBlockMessage::TOKEN_TIME; - event.token = (*this)->token; - (*this)->executor->post_all(event); + TopTokenMessage message; + message.token = (*this)->token; + (*this)->executor->post_all(message); } { - TopBlockMessage event; - event.what = TopBlockMessage::ALLOCATE; - (*this)->executor->post_all(event); + (*this)->executor->post_all(TopAllocMessage()); } { - TopBlockMessage event; - event.what = TopBlockMessage::ACTIVE; - (*this)->executor->post_all(event); + (*this)->executor->post_all(TopActiveMessage()); } } @@ -93,9 +85,7 @@ void TopBlock::stop(void) (*this)->thread_group->interrupt_all(); //message all blocks to mark done - TopBlockMessage event; - event.what = TopBlockMessage::INERT; - (*this)->executor->post_all(event); + (*this)->executor->post_all(TopInertMessage()); } void TopBlock::run(void) |