summaryrefslogtreecommitdiff
path: root/lib/block_handlers.cpp
diff options
context:
space:
mode:
authorJosh Blum2013-06-05 22:26:26 -0700
committerJosh Blum2013-06-05 22:26:26 -0700
commit43c7ef3e7807ad3035a2882d237b0441e6102817 (patch)
treee7d559f27cf3329d3c1b7e008b2cb422bd72eda9 /lib/block_handlers.cpp
parent58a54c2cc19113d7644b80ec032b89fa1c38a54a (diff)
downloadsandhi-43c7ef3e7807ad3035a2882d237b0441e6102817.tar.gz
sandhi-43c7ef3e7807ad3035a2882d237b0441e6102817.tar.bz2
sandhi-43c7ef3e7807ad3035a2882d237b0441e6102817.zip
gras: compiles now with separate data struct
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r--lib/block_handlers.cpp70
1 files changed, 35 insertions, 35 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index e0ee67a..0cf4b1f 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -13,12 +13,12 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
- if (this->block_state != BLOCK_STATE_LIVE)
+ if (data->block_state != BLOCK_STATE_LIVE)
{
- this->block_ptr->notify_active();
- this->stats.start_time = time_now();
+ block_ptr->notify_active();
+ data->stats.start_time = time_now();
}
- this->block_state = BLOCK_STATE_LIVE;
+ data->block_state = BLOCK_STATE_LIVE;
this->Send(0, from); //ACK
@@ -45,17 +45,17 @@ void BlockActor::handle_top_token(
//create input tokens and send allocation hints
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- this->input_tokens[i] = Token::make();
- this->inputs_done.reset(i);
+ data->input_tokens[i] = Token::make();
+ data->inputs_done.reset(i);
OutputTokenMessage token_msg;
- token_msg.token = this->input_tokens[i];
+ token_msg.token = data->input_tokens[i];
this->post_upstream(i, token_msg);
//TODO, schedule this message as a pre-allocation message
//tell the upstream about the input requirements
OutputHintMessage output_hints;
- output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size;
- output_hints.token = this->input_tokens[i];
+ output_hints.reserve_bytes = data->input_configs[i].reserve_items*data->input_configs[i].item_size;
+ output_hints.token = data->input_tokens[i];
this->post_upstream(i, output_hints);
}
@@ -63,15 +63,15 @@ void BlockActor::handle_top_token(
//create output token
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- this->output_tokens[i] = Token::make();
- this->outputs_done.reset(i);
+ data->output_tokens[i] = Token::make();
+ data->outputs_done.reset(i);
InputTokenMessage token_msg;
- token_msg.token = this->output_tokens[i];
+ token_msg.token = data->output_tokens[i];
this->post_downstream(i, token_msg);
}
//store a token to the top level topology
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
this->Send(0, from); //ACK
}
@@ -83,18 +83,18 @@ void BlockActor::handle_top_config(
MESSAGE_TRACER();
//overwrite with global config only if maxium_items is not set (zero)
- for (size_t i = 0; i < this->output_configs.size(); i++)
+ for (size_t i = 0; i < data->output_configs.size(); i++)
{
- if (this->output_configs[i].maximum_items == 0)
+ if (data->output_configs[i].maximum_items == 0)
{
- this->output_configs[i].maximum_items = message.maximum_output_items;
+ data->output_configs[i].maximum_items = message.maximum_output_items;
}
}
//overwrite with global node affinity setting for buffers if not set
- if (this->buffer_affinity == -1)
+ if (data->buffer_affinity == -1)
{
- this->buffer_affinity = message.buffer_affinity;
+ data->buffer_affinity = message.buffer_affinity;
}
this->Send(0, from); //ACK
@@ -109,12 +109,12 @@ void BlockActor::handle_top_thread_group(
//store the topology's thread group
//erase any potentially old lingering threads
//spawn a new thread if this block is a source
- this->thread_group = message;
- this->interruptible_thread.reset(); //erase old one
- if (this->interruptible_work)
+ data->thread_group = message;
+ data->interruptible_thread.reset(); //erase old one
+ if (data->interruptible_work)
{
- this->interruptible_thread = boost::make_shared<InterruptibleThread>(
- this->thread_group, boost::bind(&BlockActor::task_work, this)
+ data->interruptible_thread = boost::make_shared<InterruptibleThread>(
+ data->thread_group, boost::bind(&BlockActor::task_work, this)
);
}
@@ -138,24 +138,24 @@ void BlockActor::handle_get_stats(
//instantaneous states we update here,
//and not interleaved with the rest of the code
const size_t num_inputs = this->get_num_inputs();
- this->stats.items_enqueued.resize(num_inputs);
- this->stats.tags_enqueued.resize(num_inputs);
- this->stats.msgs_enqueued.resize(num_inputs);
+ data->stats.items_enqueued.resize(num_inputs);
+ data->stats.tags_enqueued.resize(num_inputs);
+ data->stats.msgs_enqueued.resize(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
- this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i);
- this->stats.tags_enqueued[i] = this->input_tags[i].size();
- this->stats.msgs_enqueued[i] = this->input_msgs[i].size();
+ data->stats.items_enqueued[i] = data->input_queues.get_items_enqueued(i);
+ data->stats.tags_enqueued[i] = data->input_tags[i].size();
+ data->stats.msgs_enqueued[i] = data->input_msgs[i].size();
}
- this->stats.actor_queue_depth = this->GetNumQueuedMessages();
- this->stats.bytes_copied = this->input_queues.bytes_copied;
- this->stats.inputs_idle = this->input_queues.total_idle_times;
- this->stats.outputs_idle = this->output_queues.total_idle_times;
+ data->stats.actor_queue_depth = this->GetNumQueuedMessages();
+ data->stats.bytes_copied = data->input_queues.bytes_copied;
+ data->stats.inputs_idle = data->input_queues.total_idle_times;
+ data->stats.outputs_idle = data->output_queues.total_idle_times;
//create the message reply object
GetStatsMessage message;
- message.block_id = this->block_ptr->get_uid();
- message.stats = this->stats;
+ message.block_id = block_ptr->get_uid();
+ message.stats = data->stats;
message.stats_time = time_now();
this->Send(message, from); //ACK