summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt4
-rw-r--r--lib/block.cpp2
-rw-r--r--lib/block_actor.cpp7
-rw-r--r--lib/block_allocator.cpp26
-rw-r--r--lib/block_handlers.cpp222
-rw-r--r--lib/gras_impl/block_actor.hpp60
-rw-r--r--lib/gras_impl/debug.hpp9
-rw-r--r--lib/gras_impl/messages.hpp102
-rw-r--r--lib/port_handlers.cpp12
-rw-r--r--lib/top_block.cpp30
10 files changed, 308 insertions, 166 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index bc5115e..ffc43f1 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -50,8 +50,8 @@ list(APPEND gnuradio_core_sources
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
#${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
- #${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
- #${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
#${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index 6bfaa9b..2ca4e8b 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -157,7 +157,7 @@ void Block::add_item_tag(
const size_t which_output,
const Tag &tag
){
- BlockTagMessage message;
+ InputTagMessage message;
message.tag = tag;
(*this)->block->post_downstream(which_output, message);
}
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 3ba6c79..af59009 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -24,5 +24,10 @@ static Theron::Framework global_framework(8); //TODO needs API config
BlockActor::BlockActor(void):
Apology::Worker(global_framework)
{
-
+ this->register_handlers();
+}
+
+BlockActor::~BlockActor(void)
+{
+ //NOP
}
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 4430d52..826aed9 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -28,33 +28,33 @@ const double EDGE_CASE_MITIGATION = 8.0; //edge case mitigation constant
//TODO will need more complicated later
-void ElementImpl::buffer_returner(const size_t index, SBuffer &buffer)
+void BlockActor::buffer_returner(const size_t index, SBuffer &buffer)
{
//reset offset and length
buffer.offset = 0;
buffer.length = 0;
- BufferReturnMessage message;
+ OutputBufferMessage message;
message.index = index;
message.buffer = buffer;
- this->block.post_msg(message);
+ this->Push(message, Theron::Address());
}
static size_t recommend_length(
- const std::vector<BufferHintMessage> &hints,
+ const std::vector<OutputHintMessage> &hints,
const size_t output_multiple_bytes,
const size_t at_least_bytes
){
//step 1) find the LCM of all reserves to create a super-reserve
size_t lcm_bytes = output_multiple_bytes;
- BOOST_FOREACH(const BufferHintMessage &hint, hints)
+ BOOST_FOREACH(const OutputHintMessage &hint, hints)
{
lcm_bytes = boost::math::lcm(lcm_bytes, hint.reserve_bytes);
}
//step 2) N x super reserve to minimize history edge case
size_t Nlcm_bytes = lcm_bytes;
- BOOST_FOREACH(const BufferHintMessage &hint, hints)
+ BOOST_FOREACH(const OutputHintMessage &hint, hints)
{
while (hint.history_bytes*EDGE_CASE_MITIGATION > Nlcm_bytes)
{
@@ -69,10 +69,12 @@ static size_t recommend_length(
return std::min(Nlcm_bytes, AHH_TOO_MANY_BYTES);
}
-void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
+void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address from)
{
+ MESSAGE_TRACER();
+
//allocate output buffers which will also wake up the task
- const size_t num_outputs = task_iface.get_num_outputs();
+ const size_t num_outputs = this->get_num_outputs();
this->output_buffer_tokens.resize(num_outputs);
for (size_t i = 0; i < num_outputs; i++)
{
@@ -85,7 +87,7 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
at_least_items*this->output_items_sizes[i]
);
- SBufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1);
+ SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1);
SBufferToken token = SBufferToken(new SBufferDeleter(deleter));
this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes);
@@ -93,8 +95,10 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
InputAllocatorMessage message;
message.token = SBufferToken(new SBufferDeleter(deleter));
message.recommend_length = bytes;
- task_iface.post_downstream(i, message);
+ this->post_downstream(i, message);
}
+
+ this->Send(0, from); //ACK
}
SBufferToken Block::output_buffer_allocator(
@@ -107,7 +111,7 @@ SBufferToken Block::output_buffer_allocator(
SBufferConfig config;
config.memory = NULL;
config.length = recommend_length;
- config.affinity = (*this)->buffer_affinity;
+ config.affinity = (*this)->block->buffer_affinity;
config.token = token;
SBuffer buff(config);
std::memset(buff.get_actual_memory(), 0, buff.get_actual_length());
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 0fd7022..c12820d 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -21,144 +21,150 @@
using namespace gnuradio;
-void ElementImpl::handle_block_msg(
- const tsbe::TaskInterface &task_iface,
- const tsbe::Wax &msg
+
+void BlockActor::handle_top_active(
+ const TopActiveMessage &,
+ const Theron::Address from
){
- if (MESSAGE) std::cerr << "handle_block_msg (" << msg.type().name() << ") " << name << std::endl;
+ MESSAGE_TRACER();
- //a buffer has returned from the downstream
- //(all interested consumers have finished with it)
- if (msg.type() == typeid(BufferReturnMessage))
+ if (this->block_state != BLOCK_STATE_LIVE)
{
- const BufferReturnMessage &message = msg.cast<BufferReturnMessage>();
- const size_t index = message.index;
- if (this->block_state == BLOCK_STATE_DONE) return;
- this->output_queues.push(index, message.buffer);
- this->handle_task(task_iface);
- return;
+ this->block_ptr->start();
}
-
- //self kick, call the handle task method
- if (msg.type() == typeid(SelfKickMessage))
+ this->block_state = BLOCK_STATE_LIVE;
+ if (this->input_queues.all_ready() and this->output_queues.all_ready())
{
- this->handle_task(task_iface);
- return;
+ this->Push(SelfKickMessage(), Theron::Address());
}
- //clearly, this block is near death, hang on sparky
- if (msg.type() == typeid(CheckTokensMessage))
+ this->Send(0, from); //ACK
+}
+
+void BlockActor::handle_top_inert(
+ const TopInertMessage &,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
+
+ if (this->block_state != BLOCK_STATE_DONE)
{
- if (this->input_queues.all_ready() and not this->forecast_fail)
- {
- this->handle_task(task_iface);
- }
- else
- {
- this->mark_done(task_iface);
- }
- return;
+ this->block_ptr->stop();
}
+ this->mark_done();
- //store the topology's thread group
- //erase any potentially old lingering threads
- //spawn a new thread if this block is a source
- if (msg.type() == typeid(SharedThreadGroup))
+ this->Send(0, from); //ACK
+}
+
+void BlockActor::handle_top_token(
+ const TopTokenMessage &message,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
+
+ //create input tokens and send allocation hints
+ for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- this->thread_group = msg.cast<SharedThreadGroup>();
- this->interruptible_thread.reset(); //erase old one
- if (task_iface.get_num_inputs() == 0) //its a source
- {
- this->interruptible_thread = boost::make_shared<InterruptibleThread>(
- this->thread_group, boost::bind(&ElementImpl::task_work, this)
- );
- }
- return;
+ this->input_tokens[i] = Token::make();
+ InputTokenMessage token_msg;
+ token_msg.token = this->input_tokens[i];
+ this->post_upstream(i, token_msg);
+
+ //TODO, schedule this message as a pre-allocation message
+ //tell the upstream about the input requirements
+ OutputHintMessage output_hints;
+ output_hints.history_bytes = this->input_history_items[i]*this->input_items_sizes[i];
+ output_hints.reserve_bytes = this->input_multiple_items[i];
+ output_hints.token = this->input_tokens[i];
+ this->post_upstream(i, output_hints);
+
}
- //user changed some input settings like history or reserve reqs
- if (msg.type() == typeid(UpdateInputsMessage))
+ //create output token
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- this->input_update(task_iface);
- return;
+ this->output_tokens[i] = Token::make();
+ OutputTokenMessage token_msg;
+ token_msg.token = this->output_tokens[i];
+ this->post_downstream(i, token_msg);
}
- ASSERT(msg.type() == typeid(TopBlockMessage));
+ //store a token to the top level topology
+ this->token_pool.insert(message.token);
- //FIXME leave the marked done blocks done...
- //this helps QA tests to pass that re-use top block without diconnecting the old design
- if (this->block_state == BLOCK_STATE_DONE) return;
+ this->Send(0, from); //ACK
+}
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+void BlockActor::handle_top_hint(
+ const TopHintMessage &message,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
- //allocate output tokens and send them downstream
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::TOKEN_TIME)
- {
- for (size_t i = 0; i < num_inputs; i++)
- {
- this->input_tokens[i] = Token::make();
- task_iface.post_upstream(i, this->input_tokens[i]);
+ this->hint = message.hint;
- //TODO, schedule this message as a pre-allocation message
- //tell the upstream about the input requirements
- BufferHintMessage message;
- message.history_bytes = this->input_history_items[i]*this->input_items_sizes[i];
- message.reserve_bytes = this->input_multiple_items[i];
- message.token = this->input_tokens[i];
- task_iface.post_upstream(i, message);
+ this->Send(0, from); //ACK
+}
- }
- for (size_t i = 0; i < num_outputs; i++)
- {
- this->output_tokens[i] = Token::make();
- task_iface.post_downstream(i, this->output_tokens[i]);
- }
- this->token_pool.insert(msg.cast<TopBlockMessage>().token);
- }
+void BlockActor::handle_top_thread_group(
+ const SharedThreadGroup &message,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ALLOCATE)
+ //store the topology's thread group
+ //erase any potentially old lingering threads
+ //spawn a new thread if this block is a source
+ this->thread_group = message;
+ this->interruptible_thread.reset(); //erase old one
+ if (this->get_num_inputs() == 0) //its a source
{
- //causes initial processing kick-off for source blocks
- this->handle_allocation(task_iface);
+ this->interruptible_thread = boost::make_shared<InterruptibleThread>(
+ this->thread_group, boost::bind(&BlockActor::task_work, this)
+ );
}
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
+ this->Send(0, from); //ACK
+}
+
+void BlockActor::handle_self_kick(
+ const SelfKickMessage &,
+ const Theron::Address
+){
+ MESSAGE_TRACER();
+ this->handle_task();
+}
+
+void BlockActor::handle_check_tokens(
+ const CheckTokensMessage &,
+ const Theron::Address
+){
+ MESSAGE_TRACER();
+ if (this->input_queues.all_ready() and not this->forecast_fail)
{
- if (this->block_state != BLOCK_STATE_LIVE)
- {
- this->block_ptr->start();
- }
- this->block_state = BLOCK_STATE_LIVE;
- if (this->input_queues.all_ready() and this->output_queues.all_ready())
- {
- this->block.post_msg(SelfKickMessage());
- }
+ this->handle_task();
}
-
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
+ else
{
- if (this->block_state != BLOCK_STATE_DONE)
- {
- this->block_ptr->stop();
- }
- this->mark_done(task_iface);
+ this->mark_done();
}
}
-void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
-{
- //std::cout << "topology_update in " << name << std::endl;
+void BlockActor::handle_topology(
+ const Apology::WorkerTopologyMessage &,
+ const Theron::Address
+){
+ MESSAGE_TRACER();
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
//call check_topology on block before committing settings
this->block_ptr->check_topology(num_inputs, num_outputs);
//fill the item sizes from the IO signatures
- fill_item_sizes_from_sig(this->input_items_sizes, this->input_signature, num_inputs);
- fill_item_sizes_from_sig(this->output_items_sizes, this->output_signature, num_outputs);
+ fill_item_sizes_from_sig(this->input_items_sizes, (*block_ptr)->input_signature, num_inputs);
+ fill_item_sizes_from_sig(this->output_items_sizes, (*block_ptr)->output_signature, num_outputs);
//resize and fill port properties
resize_fill_back(this->input_history_items, num_inputs);
@@ -194,17 +200,19 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
//a block looses all connections, allow it to free
if (num_inputs == 0 and num_outputs == 0)
{
- this->mark_done(task_iface);
+ this->mark_done();
}
this->topology_init = true;
- this->input_update(task_iface);
+ this->handle_update_inputs(UpdateInputsMessage(), Theron::Address());
}
-void ElementImpl::input_update(const tsbe::TaskInterface &task_iface)
-{
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+void BlockActor::handle_update_inputs(
+ const UpdateInputsMessage &,
+ const Theron::Address
+){
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
//impose input reserve requirements based on relative rate and output multiple
resize_fill_grow(this->input_multiple_items, num_inputs, 1);
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 98634b7..63d5ff5 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -47,9 +47,64 @@ struct BlockActor : Apology::Worker
{
BlockActor(void);
~BlockActor(void);
-
Block *block_ptr;
+ //do it here so we can match w/ the handler declarations
+ void register_handlers(void)
+ {
+ this->RegisterHandler(this, &BlockActor::handle_topology);
+
+ this->RegisterHandler(this, &BlockActor::handle_top_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_top_active);
+ this->RegisterHandler(this, &BlockActor::handle_top_inert);
+ this->RegisterHandler(this, &BlockActor::handle_top_token);
+ this->RegisterHandler(this, &BlockActor::handle_top_hint);
+ this->RegisterHandler(this, &BlockActor::handle_top_thread_group);
+
+ this->RegisterHandler(this, &BlockActor::handle_input_tag);
+ this->RegisterHandler(this, &BlockActor::handle_input_buffer);
+ this->RegisterHandler(this, &BlockActor::handle_input_token);
+ this->RegisterHandler(this, &BlockActor::handle_input_check);
+
+ this->RegisterHandler(this, &BlockActor::handle_output_buffer);
+ this->RegisterHandler(this, &BlockActor::handle_output_token);
+ this->RegisterHandler(this, &BlockActor::handle_output_check);
+ this->RegisterHandler(this, &BlockActor::handle_output_hint);
+
+ this->RegisterHandler(this, &BlockActor::handle_self_kick);
+ this->RegisterHandler(this, &BlockActor::handle_check_tokens);
+ this->RegisterHandler(this, &BlockActor::handle_update_inputs);
+ }
+
+ //handlers
+ void handle_topology(const Apology::WorkerTopologyMessage &, const Theron::Address);
+
+ void handle_top_alloc(const TopAllocMessage &, const Theron::Address);
+ void handle_top_active(const TopActiveMessage &, const Theron::Address);
+ void handle_top_inert(const TopInertMessage &, const Theron::Address);
+ void handle_top_token(const TopTokenMessage &, const Theron::Address);
+ void handle_top_hint(const TopHintMessage &, const Theron::Address);
+ void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address);
+
+ void handle_input_tag(const InputTagMessage &, const Theron::Address);
+ void handle_input_buffer(const InputBufferMessage &, const Theron::Address);
+ void handle_input_token(const InputTokenMessage &, const Theron::Address);
+ void handle_input_check(const InputCheckMessage &, const Theron::Address);
+
+ void handle_output_buffer(const OutputBufferMessage &, const Theron::Address);
+ void handle_output_token(const OutputTokenMessage &, const Theron::Address);
+ void handle_output_check(const OutputCheckMessage &, const Theron::Address);
+ void handle_output_hint(const OutputHintMessage &, const Theron::Address);
+
+ void handle_self_kick(const SelfKickMessage &, const Theron::Address);
+ void handle_check_tokens(const CheckTokensMessage &, const Theron::Address);
+ void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address);
+
+ //helpers
+ void buffer_returner(const size_t index, SBuffer &buffer);
+ void mark_done(void);
+ void handle_task(void);
+
//per port properties
std::vector<size_t> input_items_sizes;
std::vector<size_t> output_items_sizes;
@@ -96,6 +151,7 @@ struct BlockActor : Apology::Worker
Block::tag_propagation_policy_t tag_prop_policy;
//interruptible thread stuff
+ SharedThreadGroup thread_group;
boost::shared_ptr<InterruptibleThread> interruptible_thread;
//handlers
@@ -131,7 +187,7 @@ struct BlockActor : Apology::Worker
size_t hint; //some kind of allocation hint
Affinity buffer_affinity;
- std::vector<std::vector<BufferHintMessage> > output_allocation_hints;
+ std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
//rate settings
bool enable_fixed_rate;
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index 5aa8e8e..a2f1f5f 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -19,19 +19,20 @@
#include <iostream>
#include <stdexcept>
+#include <boost/current_function.hpp>
//----------------------------------------------------------------------
//-- set to 1 to enable these debugs:
//----------------------------------------------------------------------
#define GENESIS 0
#define ARMAGEDDON 0
-#define MESSAGE 0
//----------------------------------------------------------------------
//-- define to enable these debugs:
//----------------------------------------------------------------------
//#define WORK_DEBUG
#define ASSERTING
+#define MESSAGE_TRACING
//----------------------------------------------------------------------
//-- various debug prints
@@ -39,6 +40,12 @@
#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
+#ifdef MESSAGE_TRACING
+#define MESSAGE_TRACER() std::cerr << "Handle message in: " << BOOST_CURRENT_FUNCTION << std::endl << std::flush;
+#else
+#define MESSAGE_TRACER()
+#endif
+
//----------------------------------------------------------------------
//-- implementation for assert debug
//----------------------------------------------------------------------
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 0b1bed0..3b57fef 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -19,62 +19,122 @@
#include <gnuradio/sbuffer.hpp>
#include <gnuradio/tags.hpp>
+#include <gnuradio/sbuffer.hpp>
+#include <gras_impl/token.hpp>
namespace gnuradio
{
-struct BlockTagMessage
+//----------------------------------------------------------------------
+//-- message from the top block/executor
+//-- these messages must be ack'd
+//----------------------------------------------------------------------
+
+struct TopAllocMessage
+{
+ //empty
+};
+
+struct TopActiveMessage
+{
+ //empty
+};
+
+struct TopInertMessage
+{
+ //empty
+};
+
+struct TopTokenMessage
+{
+ Token token;
+};
+
+struct TopHintMessage
+{
+ size_t hint;
+};
+
+//----------------------------------------------------------------------
+//-- message to an input port
+//----------------------------------------------------------------------
+
+struct InputTagMessage
{
size_t index;
Tag tag;
};
-struct TopBlockMessage
+struct InputBufferMessage
{
- enum
- {
- ALLOCATE,
- ACTIVE,
- INERT,
- HINT,
- TOKEN_TIME,
- } what;
- size_t hint;
+ size_t index;
+ SBuffer buffer;
+};
+
+struct InputTokenMessage
+{
+ size_t index;
Token token;
};
-struct CheckTokensMessage
+struct InputAllocatorMessage
{
- //empty
+ size_t index;
+ SBufferToken token;
+ size_t recommend_length;
};
-struct SelfKickMessage
+struct InputCheckMessage
{
- //empty
+ size_t index;
};
-struct BufferReturnMessage
+//----------------------------------------------------------------------
+//-- message to an output port
+//----------------------------------------------------------------------
+
+struct OutputBufferMessage
{
size_t index;
SBuffer buffer;
};
-struct BufferHintMessage
+struct OutputTokenMessage
{
+ size_t index;
+ Token token;
+};
+
+struct OutputCheckMessage
+{
+ size_t index;
+};
+
+struct OutputHintMessage
+{
+ size_t index;
size_t history_bytes;
size_t reserve_bytes;
WeakToken token;
};
-struct UpdateInputsMessage
+//----------------------------------------------------------------------
+//-- message to just the block
+//----------------------------------------------------------------------
+
+struct SelfKickMessage
{
//empty
};
-struct InputAllocatorMessage
+struct CheckTokensMessage
{
- SBufferToken token;
- size_t recommend_length;
+ //empty
+};
+
+struct UpdateInputsMessage
+{
+ //empty
};
} //namespace gnuradio
diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp
index 848d9ef..a134ff4 100644
--- a/lib/port_handlers.cpp
+++ b/lib/port_handlers.cpp
@@ -19,6 +19,18 @@
using namespace gnuradio;
+//a buffer has returned from the downstream
+ //(all interested consumers have finished with it)
+ if (msg.type() == typeid(BufferReturnMessage))
+ {
+ const BufferReturnMessage &message = msg.cast<BufferReturnMessage>();
+ const size_t index = message.index;
+ if (this->block_state == BLOCK_STATE_DONE) return;
+ this->output_queues.push(index, message.buffer);
+ this->handle_task(task_iface);
+ return;
+ }
+
void ElementImpl::handle_input_msg(
const tsbe::TaskInterface &handle,
const size_t index,
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index a8f01de..b2c2e8d 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -40,9 +40,7 @@ TopBlock::TopBlock(const std::string &name):
void ElementImpl::top_block_cleanup(void)
{
- TopBlockMessage event;
- event.what = TopBlockMessage::INERT;
- this->executor->post_all(event);
+ this->executor->post_all(TopInertMessage());
if (ARMAGEDDON) std::cerr
<< "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
<< "xx Top Block Destroyed: " << name << "\n"
@@ -57,10 +55,9 @@ void TopBlock::update(void)
void TopBlock::set_buffer_hint(const size_t hint)
{
- TopBlockMessage event;
- event.what = TopBlockMessage::HINT;
- event.hint = hint;
- (*this)->executor->post_all(event);
+ TopHintMessage message;
+ message.hint = hint;
+ (*this)->executor->post_all(message);
}
void TopBlock::start(void)
@@ -70,20 +67,15 @@ void TopBlock::start(void)
(*this)->executor->post_all((*this)->thread_group);
}
{
- TopBlockMessage event;
- event.what = TopBlockMessage::TOKEN_TIME;
- event.token = (*this)->token;
- (*this)->executor->post_all(event);
+ TopTokenMessage message;
+ message.token = (*this)->token;
+ (*this)->executor->post_all(message);
}
{
- TopBlockMessage event;
- event.what = TopBlockMessage::ALLOCATE;
- (*this)->executor->post_all(event);
+ (*this)->executor->post_all(TopAllocMessage());
}
{
- TopBlockMessage event;
- event.what = TopBlockMessage::ACTIVE;
- (*this)->executor->post_all(event);
+ (*this)->executor->post_all(TopActiveMessage());
}
}
@@ -93,9 +85,7 @@ void TopBlock::stop(void)
(*this)->thread_group->interrupt_all();
//message all blocks to mark done
- TopBlockMessage event;
- event.what = TopBlockMessage::INERT;
- (*this)->executor->post_all(event);
+ (*this)->executor->post_all(TopInertMessage());
}
void TopBlock::run(void)