diff options
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r-- | lib/block_handlers.cpp | 222 |
1 files changed, 115 insertions, 107 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 0fd7022..c12820d 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -21,144 +21,150 @@ using namespace gnuradio; -void ElementImpl::handle_block_msg( - const tsbe::TaskInterface &task_iface, - const tsbe::Wax &msg + +void BlockActor::handle_top_active( + const TopActiveMessage &, + const Theron::Address from ){ - if (MESSAGE) std::cerr << "handle_block_msg (" << msg.type().name() << ") " << name << std::endl; + MESSAGE_TRACER(); - //a buffer has returned from the downstream - //(all interested consumers have finished with it) - if (msg.type() == typeid(BufferReturnMessage)) + if (this->block_state != BLOCK_STATE_LIVE) { - const BufferReturnMessage &message = msg.cast<BufferReturnMessage>(); - const size_t index = message.index; - if (this->block_state == BLOCK_STATE_DONE) return; - this->output_queues.push(index, message.buffer); - this->handle_task(task_iface); - return; + this->block_ptr->start(); } - - //self kick, call the handle task method - if (msg.type() == typeid(SelfKickMessage)) + this->block_state = BLOCK_STATE_LIVE; + if (this->input_queues.all_ready() and this->output_queues.all_ready()) { - this->handle_task(task_iface); - return; + this->Push(SelfKickMessage(), Theron::Address()); } - //clearly, this block is near death, hang on sparky - if (msg.type() == typeid(CheckTokensMessage)) + this->Send(0, from); //ACK +} + +void BlockActor::handle_top_inert( + const TopInertMessage &, + const Theron::Address from +){ + MESSAGE_TRACER(); + + if (this->block_state != BLOCK_STATE_DONE) { - if (this->input_queues.all_ready() and not this->forecast_fail) - { - this->handle_task(task_iface); - } - else - { - this->mark_done(task_iface); - } - return; + this->block_ptr->stop(); } + this->mark_done(); - //store the topology's thread group - //erase any potentially old lingering threads - //spawn a new thread if this block is a source - if (msg.type() == typeid(SharedThreadGroup)) + this->Send(0, from); //ACK +} + +void BlockActor::handle_top_token( + const TopTokenMessage &message, + const Theron::Address from +){ + MESSAGE_TRACER(); + + //create input tokens and send allocation hints + for (size_t i = 0; i < this->get_num_inputs(); i++) { - this->thread_group = msg.cast<SharedThreadGroup>(); - this->interruptible_thread.reset(); //erase old one - if (task_iface.get_num_inputs() == 0) //its a source - { - this->interruptible_thread = boost::make_shared<InterruptibleThread>( - this->thread_group, boost::bind(&ElementImpl::task_work, this) - ); - } - return; + this->input_tokens[i] = Token::make(); + InputTokenMessage token_msg; + token_msg.token = this->input_tokens[i]; + this->post_upstream(i, token_msg); + + //TODO, schedule this message as a pre-allocation message + //tell the upstream about the input requirements + OutputHintMessage output_hints; + output_hints.history_bytes = this->input_history_items[i]*this->input_items_sizes[i]; + output_hints.reserve_bytes = this->input_multiple_items[i]; + output_hints.token = this->input_tokens[i]; + this->post_upstream(i, output_hints); + } - //user changed some input settings like history or reserve reqs - if (msg.type() == typeid(UpdateInputsMessage)) + //create output token + for (size_t i = 0; i < this->get_num_outputs(); i++) { - this->input_update(task_iface); - return; + this->output_tokens[i] = Token::make(); + OutputTokenMessage token_msg; + token_msg.token = this->output_tokens[i]; + this->post_downstream(i, token_msg); } - ASSERT(msg.type() == typeid(TopBlockMessage)); + //store a token to the top level topology + this->token_pool.insert(message.token); - //FIXME leave the marked done blocks done... - //this helps QA tests to pass that re-use top block without diconnecting the old design - if (this->block_state == BLOCK_STATE_DONE) return; + this->Send(0, from); //ACK +} - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); +void BlockActor::handle_top_hint( + const TopHintMessage &message, + const Theron::Address from +){ + MESSAGE_TRACER(); - //allocate output tokens and send them downstream - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::TOKEN_TIME) - { - for (size_t i = 0; i < num_inputs; i++) - { - this->input_tokens[i] = Token::make(); - task_iface.post_upstream(i, this->input_tokens[i]); + this->hint = message.hint; - //TODO, schedule this message as a pre-allocation message - //tell the upstream about the input requirements - BufferHintMessage message; - message.history_bytes = this->input_history_items[i]*this->input_items_sizes[i]; - message.reserve_bytes = this->input_multiple_items[i]; - message.token = this->input_tokens[i]; - task_iface.post_upstream(i, message); + this->Send(0, from); //ACK +} - } - for (size_t i = 0; i < num_outputs; i++) - { - this->output_tokens[i] = Token::make(); - task_iface.post_downstream(i, this->output_tokens[i]); - } - this->token_pool.insert(msg.cast<TopBlockMessage>().token); - } +void BlockActor::handle_top_thread_group( + const SharedThreadGroup &message, + const Theron::Address from +){ + MESSAGE_TRACER(); - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ALLOCATE) + //store the topology's thread group + //erase any potentially old lingering threads + //spawn a new thread if this block is a source + this->thread_group = message; + this->interruptible_thread.reset(); //erase old one + if (this->get_num_inputs() == 0) //its a source { - //causes initial processing kick-off for source blocks - this->handle_allocation(task_iface); + this->interruptible_thread = boost::make_shared<InterruptibleThread>( + this->thread_group, boost::bind(&BlockActor::task_work, this) + ); } - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + this->Send(0, from); //ACK +} + +void BlockActor::handle_self_kick( + const SelfKickMessage &, + const Theron::Address +){ + MESSAGE_TRACER(); + this->handle_task(); +} + +void BlockActor::handle_check_tokens( + const CheckTokensMessage &, + const Theron::Address +){ + MESSAGE_TRACER(); + if (this->input_queues.all_ready() and not this->forecast_fail) { - if (this->block_state != BLOCK_STATE_LIVE) - { - this->block_ptr->start(); - } - this->block_state = BLOCK_STATE_LIVE; - if (this->input_queues.all_ready() and this->output_queues.all_ready()) - { - this->block.post_msg(SelfKickMessage()); - } + this->handle_task(); } - - if (msg.cast<TopBlockMessage>().what == TopBlockMessage::INERT) + else { - if (this->block_state != BLOCK_STATE_DONE) - { - this->block_ptr->stop(); - } - this->mark_done(task_iface); + this->mark_done(); } } -void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) -{ - //std::cout << "topology_update in " << name << std::endl; +void BlockActor::handle_topology( + const Apology::WorkerTopologyMessage &, + const Theron::Address +){ + MESSAGE_TRACER(); - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); //call check_topology on block before committing settings this->block_ptr->check_topology(num_inputs, num_outputs); //fill the item sizes from the IO signatures - fill_item_sizes_from_sig(this->input_items_sizes, this->input_signature, num_inputs); - fill_item_sizes_from_sig(this->output_items_sizes, this->output_signature, num_outputs); + fill_item_sizes_from_sig(this->input_items_sizes, (*block_ptr)->input_signature, num_inputs); + fill_item_sizes_from_sig(this->output_items_sizes, (*block_ptr)->output_signature, num_outputs); //resize and fill port properties resize_fill_back(this->input_history_items, num_inputs); @@ -194,17 +200,19 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) //a block looses all connections, allow it to free if (num_inputs == 0 and num_outputs == 0) { - this->mark_done(task_iface); + this->mark_done(); } this->topology_init = true; - this->input_update(task_iface); + this->handle_update_inputs(UpdateInputsMessage(), Theron::Address()); } -void ElementImpl::input_update(const tsbe::TaskInterface &task_iface) -{ - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); +void BlockActor::handle_update_inputs( + const UpdateInputsMessage &, + const Theron::Address +){ + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); //impose input reserve requirements based on relative rate and output multiple resize_fill_grow(this->input_multiple_items, num_inputs, 1); |