diff options
author | Josh Blum | 2013-06-05 22:26:26 -0700 |
---|---|---|
committer | Josh Blum | 2013-06-05 22:26:26 -0700 |
commit | 43c7ef3e7807ad3035a2882d237b0441e6102817 (patch) | |
tree | e7d559f27cf3329d3c1b7e008b2cb422bd72eda9 /lib/block_handlers.cpp | |
parent | 58a54c2cc19113d7644b80ec032b89fa1c38a54a (diff) | |
download | sandhi-43c7ef3e7807ad3035a2882d237b0441e6102817.tar.gz sandhi-43c7ef3e7807ad3035a2882d237b0441e6102817.tar.bz2 sandhi-43c7ef3e7807ad3035a2882d237b0441e6102817.zip |
gras: compiles now with separate data struct
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r-- | lib/block_handlers.cpp | 70 |
1 files changed, 35 insertions, 35 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index e0ee67a..0cf4b1f 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -13,12 +13,12 @@ void BlockActor::handle_top_active( ){ MESSAGE_TRACER(); - if (this->block_state != BLOCK_STATE_LIVE) + if (data->block_state != BLOCK_STATE_LIVE) { - this->block_ptr->notify_active(); - this->stats.start_time = time_now(); + block_ptr->notify_active(); + data->stats.start_time = time_now(); } - this->block_state = BLOCK_STATE_LIVE; + data->block_state = BLOCK_STATE_LIVE; this->Send(0, from); //ACK @@ -45,17 +45,17 @@ void BlockActor::handle_top_token( //create input tokens and send allocation hints for (size_t i = 0; i < this->get_num_inputs(); i++) { - this->input_tokens[i] = Token::make(); - this->inputs_done.reset(i); + data->input_tokens[i] = Token::make(); + data->inputs_done.reset(i); OutputTokenMessage token_msg; - token_msg.token = this->input_tokens[i]; + token_msg.token = data->input_tokens[i]; this->post_upstream(i, token_msg); //TODO, schedule this message as a pre-allocation message //tell the upstream about the input requirements OutputHintMessage output_hints; - output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size; - output_hints.token = this->input_tokens[i]; + output_hints.reserve_bytes = data->input_configs[i].reserve_items*data->input_configs[i].item_size; + output_hints.token = data->input_tokens[i]; this->post_upstream(i, output_hints); } @@ -63,15 +63,15 @@ void BlockActor::handle_top_token( //create output token for (size_t i = 0; i < this->get_num_outputs(); i++) { - this->output_tokens[i] = Token::make(); - this->outputs_done.reset(i); + data->output_tokens[i] = Token::make(); + data->outputs_done.reset(i); InputTokenMessage token_msg; - token_msg.token = this->output_tokens[i]; + token_msg.token = data->output_tokens[i]; this->post_downstream(i, token_msg); } //store a token to the top level topology - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); this->Send(0, from); //ACK } @@ -83,18 +83,18 @@ void BlockActor::handle_top_config( MESSAGE_TRACER(); //overwrite with global config only if maxium_items is not set (zero) - for (size_t i = 0; i < this->output_configs.size(); i++) + for (size_t i = 0; i < data->output_configs.size(); i++) { - if (this->output_configs[i].maximum_items == 0) + if (data->output_configs[i].maximum_items == 0) { - this->output_configs[i].maximum_items = message.maximum_output_items; + data->output_configs[i].maximum_items = message.maximum_output_items; } } //overwrite with global node affinity setting for buffers if not set - if (this->buffer_affinity == -1) + if (data->buffer_affinity == -1) { - this->buffer_affinity = message.buffer_affinity; + data->buffer_affinity = message.buffer_affinity; } this->Send(0, from); //ACK @@ -109,12 +109,12 @@ void BlockActor::handle_top_thread_group( //store the topology's thread group //erase any potentially old lingering threads //spawn a new thread if this block is a source - this->thread_group = message; - this->interruptible_thread.reset(); //erase old one - if (this->interruptible_work) + data->thread_group = message; + data->interruptible_thread.reset(); //erase old one + if (data->interruptible_work) { - this->interruptible_thread = boost::make_shared<InterruptibleThread>( - this->thread_group, boost::bind(&BlockActor::task_work, this) + data->interruptible_thread = boost::make_shared<InterruptibleThread>( + data->thread_group, boost::bind(&BlockActor::task_work, this) ); } @@ -138,24 +138,24 @@ void BlockActor::handle_get_stats( //instantaneous states we update here, //and not interleaved with the rest of the code const size_t num_inputs = this->get_num_inputs(); - this->stats.items_enqueued.resize(num_inputs); - this->stats.tags_enqueued.resize(num_inputs); - this->stats.msgs_enqueued.resize(num_inputs); + data->stats.items_enqueued.resize(num_inputs); + data->stats.tags_enqueued.resize(num_inputs); + data->stats.msgs_enqueued.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i); - this->stats.tags_enqueued[i] = this->input_tags[i].size(); - this->stats.msgs_enqueued[i] = this->input_msgs[i].size(); + data->stats.items_enqueued[i] = data->input_queues.get_items_enqueued(i); + data->stats.tags_enqueued[i] = data->input_tags[i].size(); + data->stats.msgs_enqueued[i] = data->input_msgs[i].size(); } - this->stats.actor_queue_depth = this->GetNumQueuedMessages(); - this->stats.bytes_copied = this->input_queues.bytes_copied; - this->stats.inputs_idle = this->input_queues.total_idle_times; - this->stats.outputs_idle = this->output_queues.total_idle_times; + data->stats.actor_queue_depth = this->GetNumQueuedMessages(); + data->stats.bytes_copied = data->input_queues.bytes_copied; + data->stats.inputs_idle = data->input_queues.total_idle_times; + data->stats.outputs_idle = data->output_queues.total_idle_times; //create the message reply object GetStatsMessage message; - message.block_id = this->block_ptr->get_uid(); - message.stats = this->stats; + message.block_id = block_ptr->get_uid(); + message.stats = data->stats; message.stats_time = time_now(); this->Send(message, from); //ACK |