summaryrefslogtreecommitdiff
path: root/lib/output_handlers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/output_handlers.cpp')
-rw-r--r--lib/output_handlers.cpp34
1 files changed, 17 insertions, 17 deletions
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index d937bbf..22a86ef 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -8,14 +8,14 @@ using namespace gras;
void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//a buffer has returned from the downstream
//(all interested consumers have finished with it)
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->output_queues.push(index, message.buffer);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->output_queues.push(index, message.buffer);
ta.done();
this->task_main();
@@ -23,23 +23,23 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const
void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
ASSERT(message.index < this->get_num_outputs());
//store the token of the downstream consumer
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
}
void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//a downstream block has declared itself done, recheck the token
- this->outputs_done.set(index, this->output_tokens[index].unique());
- if (this->outputs_done.all()) //no downstream subscribers?
+ data->outputs_done.set(index, data->output_tokens[index].unique());
+ if (data->outputs_done.all()) //no downstream subscribers?
{
this->mark_done();
}
@@ -47,7 +47,7 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th
void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
@@ -57,7 +57,7 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther
//remove any old hints with expired token
//remove any older hints with matching token
std::vector<OutputHintMessage> hints;
- BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index])
+ BOOST_FOREACH(const OutputHintMessage &hint, data->output_allocation_hints[index])
{
if (hint.token.expired()) continue;
if (hint.token.lock() == message.token.lock()) continue;
@@ -67,27 +67,27 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther
//store the new hint as well
hints.push_back(message);
- this->output_allocation_hints[index] = hints;
+ data->output_allocation_hints[index] = hints;
}
void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//return of a positive downstream allocation
- this->output_queues.set_buffer_queue(index, message.queue);
+ data->output_queues.set_buffer_queue(index, message.queue);
}
void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t i = message.index;
//update buffer queue configuration
- if (i >= this->output_queues.size()) return;
- const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items;
- this->output_queues.set_reserve_bytes(i, reserve_bytes);
+ if (i >= data->output_queues.size()) return;
+ const size_t reserve_bytes = data->output_configs[i].item_size*data->output_configs[i].reserve_items;
+ data->output_queues.set_reserve_bytes(i, reserve_bytes);
}