summaryrefslogtreecommitdiff
path: root/lib/block_handlers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r--lib/block_handlers.cpp222
1 files changed, 115 insertions, 107 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 0fd7022..c12820d 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -21,144 +21,150 @@
using namespace gnuradio;
-void ElementImpl::handle_block_msg(
- const tsbe::TaskInterface &task_iface,
- const tsbe::Wax &msg
+
+void BlockActor::handle_top_active(
+ const TopActiveMessage &,
+ const Theron::Address from
){
- if (MESSAGE) std::cerr << "handle_block_msg (" << msg.type().name() << ") " << name << std::endl;
+ MESSAGE_TRACER();
- //a buffer has returned from the downstream
- //(all interested consumers have finished with it)
- if (msg.type() == typeid(BufferReturnMessage))
+ if (this->block_state != BLOCK_STATE_LIVE)
{
- const BufferReturnMessage &message = msg.cast<BufferReturnMessage>();
- const size_t index = message.index;
- if (this->block_state == BLOCK_STATE_DONE) return;
- this->output_queues.push(index, message.buffer);
- this->handle_task(task_iface);
- return;
+ this->block_ptr->start();
}
-
- //self kick, call the handle task method
- if (msg.type() == typeid(SelfKickMessage))
+ this->block_state = BLOCK_STATE_LIVE;
+ if (this->input_queues.all_ready() and this->output_queues.all_ready())
{
- this->handle_task(task_iface);
- return;
+ this->Push(SelfKickMessage(), Theron::Address());
}
- //clearly, this block is near death, hang on sparky
- if (msg.type() == typeid(CheckTokensMessage))
+ this->Send(0, from); //ACK
+}
+
+void BlockActor::handle_top_inert(
+ const TopInertMessage &,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
+
+ if (this->block_state != BLOCK_STATE_DONE)
{
- if (this->input_queues.all_ready() and not this->forecast_fail)
- {
- this->handle_task(task_iface);
- }
- else
- {
- this->mark_done(task_iface);
- }
- return;
+ this->block_ptr->stop();
}
+ this->mark_done();
- //store the topology's thread group
- //erase any potentially old lingering threads
- //spawn a new thread if this block is a source
- if (msg.type() == typeid(SharedThreadGroup))
+ this->Send(0, from); //ACK
+}
+
+void BlockActor::handle_top_token(
+ const TopTokenMessage &message,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
+
+ //create input tokens and send allocation hints
+ for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- this->thread_group = msg.cast<SharedThreadGroup>();
- this->interruptible_thread.reset(); //erase old one
- if (task_iface.get_num_inputs() == 0) //its a source
- {
- this->interruptible_thread = boost::make_shared<InterruptibleThread>(
- this->thread_group, boost::bind(&ElementImpl::task_work, this)
- );
- }
- return;
+ this->input_tokens[i] = Token::make();
+ InputTokenMessage token_msg;
+ token_msg.token = this->input_tokens[i];
+ this->post_upstream(i, token_msg);
+
+ //TODO, schedule this message as a pre-allocation message
+ //tell the upstream about the input requirements
+ OutputHintMessage output_hints;
+ output_hints.history_bytes = this->input_history_items[i]*this->input_items_sizes[i];
+ output_hints.reserve_bytes = this->input_multiple_items[i];
+ output_hints.token = this->input_tokens[i];
+ this->post_upstream(i, output_hints);
+
}
- //user changed some input settings like history or reserve reqs
- if (msg.type() == typeid(UpdateInputsMessage))
+ //create output token
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- this->input_update(task_iface);
- return;
+ this->output_tokens[i] = Token::make();
+ OutputTokenMessage token_msg;
+ token_msg.token = this->output_tokens[i];
+ this->post_downstream(i, token_msg);
}
- ASSERT(msg.type() == typeid(TopBlockMessage));
+ //store a token to the top level topology
+ this->token_pool.insert(message.token);
- //FIXME leave the marked done blocks done...
- //this helps QA tests to pass that re-use top block without diconnecting the old design
- if (this->block_state == BLOCK_STATE_DONE) return;
+ this->Send(0, from); //ACK
+}
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+void BlockActor::handle_top_hint(
+ const TopHintMessage &message,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
- //allocate output tokens and send them downstream
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::TOKEN_TIME)
- {
- for (size_t i = 0; i < num_inputs; i++)
- {
- this->input_tokens[i] = Token::make();
- task_iface.post_upstream(i, this->input_tokens[i]);
+ this->hint = message.hint;
- //TODO, schedule this message as a pre-allocation message
- //tell the upstream about the input requirements
- BufferHintMessage message;
- message.history_bytes = this->input_history_items[i]*this->input_items_sizes[i];
- message.reserve_bytes = this->input_multiple_items[i];
- message.token = this->input_tokens[i];
- task_iface.post_upstream(i, message);
+ this->Send(0, from); //ACK
+}
- }
- for (size_t i = 0; i < num_outputs; i++)
- {
- this->output_tokens[i] = Token::make();
- task_iface.post_downstream(i, this->output_tokens[i]);
- }
- this->token_pool.insert(msg.cast<TopBlockMessage>().token);
- }
+void BlockActor::handle_top_thread_group(
+ const SharedThreadGroup &message,
+ const Theron::Address from
+){
+ MESSAGE_TRACER();
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ALLOCATE)
+ //store the topology's thread group
+ //erase any potentially old lingering threads
+ //spawn a new thread if this block is a source
+ this->thread_group = message;
+ this->interruptible_thread.reset(); //erase old one
+ if (this->get_num_inputs() == 0) //its a source
{
- //causes initial processing kick-off for source blocks
- this->handle_allocation(task_iface);
+ this->interruptible_thread = boost::make_shared<InterruptibleThread>(
+ this->thread_group, boost::bind(&BlockActor::task_work, this)
+ );
}
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
+ this->Send(0, from); //ACK
+}
+
+void BlockActor::handle_self_kick(
+ const SelfKickMessage &,
+ const Theron::Address
+){
+ MESSAGE_TRACER();
+ this->handle_task();
+}
+
+void BlockActor::handle_check_tokens(
+ const CheckTokensMessage &,
+ const Theron::Address
+){
+ MESSAGE_TRACER();
+ if (this->input_queues.all_ready() and not this->forecast_fail)
{
- if (this->block_state != BLOCK_STATE_LIVE)
- {
- this->block_ptr->start();
- }
- this->block_state = BLOCK_STATE_LIVE;
- if (this->input_queues.all_ready() and this->output_queues.all_ready())
- {
- this->block.post_msg(SelfKickMessage());
- }
+ this->handle_task();
}
-
- if (msg.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
+ else
{
- if (this->block_state != BLOCK_STATE_DONE)
- {
- this->block_ptr->stop();
- }
- this->mark_done(task_iface);
+ this->mark_done();
}
}
-void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
-{
- //std::cout << "topology_update in " << name << std::endl;
+void BlockActor::handle_topology(
+ const Apology::WorkerTopologyMessage &,
+ const Theron::Address
+){
+ MESSAGE_TRACER();
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
//call check_topology on block before committing settings
this->block_ptr->check_topology(num_inputs, num_outputs);
//fill the item sizes from the IO signatures
- fill_item_sizes_from_sig(this->input_items_sizes, this->input_signature, num_inputs);
- fill_item_sizes_from_sig(this->output_items_sizes, this->output_signature, num_outputs);
+ fill_item_sizes_from_sig(this->input_items_sizes, (*block_ptr)->input_signature, num_inputs);
+ fill_item_sizes_from_sig(this->output_items_sizes, (*block_ptr)->output_signature, num_outputs);
//resize and fill port properties
resize_fill_back(this->input_history_items, num_inputs);
@@ -194,17 +200,19 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
//a block looses all connections, allow it to free
if (num_inputs == 0 and num_outputs == 0)
{
- this->mark_done(task_iface);
+ this->mark_done();
}
this->topology_init = true;
- this->input_update(task_iface);
+ this->handle_update_inputs(UpdateInputsMessage(), Theron::Address());
}
-void ElementImpl::input_update(const tsbe::TaskInterface &task_iface)
-{
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+void BlockActor::handle_update_inputs(
+ const UpdateInputsMessage &,
+ const Theron::Address
+){
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
//impose input reserve requirements based on relative rate and output multiple
resize_fill_grow(this->input_multiple_items, num_inputs, 1);