// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. #include <gras_impl/block_actor.hpp> #include <boost/make_shared.hpp> #include <boost/bind.hpp> using namespace gras; void BlockActor::handle_top_active( const TopActiveMessage &message, const Theron::Address from ){ MESSAGE_TRACER(); if (this->block_state != BLOCK_STATE_LIVE) { this->block_ptr->notify_active(); this->stats.start_time = time_now(); } this->block_state = BLOCK_STATE_LIVE; this->Send(0, from); //ACK this->task_kicker(); } void BlockActor::handle_top_inert( const TopInertMessage &, const Theron::Address from ){ MESSAGE_TRACER(); this->mark_done(); this->Send(0, from); //ACK } void BlockActor::handle_top_token( const TopTokenMessage &message, const Theron::Address from ){ MESSAGE_TRACER(); //create input tokens and send allocation hints for (size_t i = 0; i < this->get_num_inputs(); i++) { this->input_tokens[i] = Token::make(); this->inputs_done.reset(i); OutputTokenMessage token_msg; token_msg.token = this->input_tokens[i]; this->post_upstream(i, token_msg); //TODO, schedule this message as a pre-allocation message //tell the upstream about the input requirements OutputHintMessage output_hints; output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size; output_hints.token = this->input_tokens[i]; this->post_upstream(i, output_hints); } //create output token for (size_t i = 0; i < this->get_num_outputs(); i++) { this->output_tokens[i] = Token::make(); this->outputs_done.reset(i); InputTokenMessage token_msg; token_msg.token = this->output_tokens[i]; this->post_downstream(i, token_msg); } //store a token to the top level topology this->token_pool.insert(message.token); this->Send(0, from); //ACK } void BlockActor::handle_top_config( const GlobalBlockConfig &message, const Theron::Address from ){ MESSAGE_TRACER(); //overwrite with global config only if maxium_items is not set (zero) for (size_t i = 0; i < this->output_configs.size(); i++) { if (this->output_configs[i].maximum_items == 0) { this->output_configs[i].maximum_items = message.maximum_output_items; } } //overwrite with global node affinity setting for buffers if not set if (this->buffer_affinity == -1) { this->buffer_affinity = message.buffer_affinity; } this->Send(0, from); //ACK } void BlockActor::handle_top_thread_group( const SharedThreadGroup &message, const Theron::Address from ){ MESSAGE_TRACER(); //store the topology's thread group //erase any potentially old lingering threads //spawn a new thread if this block is a source this->thread_group = message; this->interruptible_thread.reset(); //erase old one if (this->interruptible_work) { this->interruptible_thread = boost::make_shared<InterruptibleThread>( this->thread_group, boost::bind(&BlockActor::task_work, this) ); } this->Send(0, from); //ACK } void BlockActor::handle_self_kick( const SelfKickMessage &, const Theron::Address ){ MESSAGE_TRACER(); this->task_main(); } void BlockActor::handle_get_stats( const GetStatsMessage &, const Theron::Address from ){ MESSAGE_TRACER(); //instantaneous states we update here, //and not interleaved with the rest of the code const size_t num_inputs = this->get_num_inputs(); this->stats.items_enqueued.resize(num_inputs); this->stats.tags_enqueued.resize(num_inputs); this->stats.msgs_enqueued.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i); this->stats.tags_enqueued[i] = this->input_tags[i].size(); this->stats.msgs_enqueued[i] = this->input_msgs[i].size(); } //create the message reply object GetStatsMessage message; message.block_id = this->block_ptr->to_string(); message.stats = this->stats; message.stats_time = time_now(); this->Send(message, from); //ACK //work could have been skipped by a high prio msg //forcefully kick the task to recheck in a new call this->Send(SelfKickMessage(), this->GetAddress()); }