diff options
Diffstat (limited to 'lib/output_handlers.cpp')
-rw-r--r-- | lib/output_handlers.cpp | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index d937bbf..22a86ef 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -8,14 +8,14 @@ using namespace gras; void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //a buffer has returned from the downstream //(all interested consumers have finished with it) - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->output_queues.push(index, message.buffer); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->output_queues.push(index, message.buffer); ta.done(); this->task_main(); @@ -23,23 +23,23 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); ASSERT(message.index < this->get_num_outputs()); //store the token of the downstream consumer - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); } void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //a downstream block has declared itself done, recheck the token - this->outputs_done.set(index, this->output_tokens[index].unique()); - if (this->outputs_done.all()) //no downstream subscribers? + data->outputs_done.set(index, data->output_tokens[index].unique()); + if (data->outputs_done.all()) //no downstream subscribers? { this->mark_done(); } @@ -47,7 +47,7 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; @@ -57,7 +57,7 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther //remove any old hints with expired token //remove any older hints with matching token std::vector<OutputHintMessage> hints; - BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index]) + BOOST_FOREACH(const OutputHintMessage &hint, data->output_allocation_hints[index]) { if (hint.token.expired()) continue; if (hint.token.lock() == message.token.lock()) continue; @@ -67,27 +67,27 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther //store the new hint as well hints.push_back(message); - this->output_allocation_hints[index] = hints; + data->output_allocation_hints[index] = hints; } void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //return of a positive downstream allocation - this->output_queues.set_buffer_queue(index, message.queue); + data->output_queues.set_buffer_queue(index, message.queue); } void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t i = message.index; //update buffer queue configuration - if (i >= this->output_queues.size()) return; - const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items; - this->output_queues.set_reserve_bytes(i, reserve_bytes); + if (i >= data->output_queues.size()) return; + const size_t reserve_bytes = data->output_configs[i].item_size*data->output_configs[i].reserve_items; + data->output_queues.set_reserve_bytes(i, reserve_bytes); } |