diff options
Diffstat (limited to 'lib/input_handlers.cpp')
-rw-r--r-- | lib/input_handlers.cpp | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index faddda8..1210688 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -7,25 +7,25 @@ using namespace gras; void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming stream tag, push into the tag storage - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_tags[index].push_back(message.tag); - this->input_tags_changed[index] = true; + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_tags[index].push_back(message.tag); + data->input_tags_changed[index] = true; } void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming async message, push into the msg storage - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_msgs[index].push_back(message.msg); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_msgs[index].push_back(message.msg); this->update_input_avail(index); ta.done(); @@ -34,13 +34,13 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming stream buffer, push into the queue - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_queues.push(index, message.buffer); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_queues.push(index, message.buffer); this->update_input_avail(index); ta.done(); @@ -49,22 +49,22 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); ASSERT(message.index < this->get_num_inputs()); //store the token of the upstream producer - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); } void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //an upstream block declared itself done, recheck the token - this->inputs_done.set(index, this->input_tokens[index].unique()); + data->inputs_done.set(index, data->input_tokens[index].unique()); //upstream done, give it one more attempt at task handling ta.done(); @@ -76,7 +76,7 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; @@ -88,14 +88,14 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t i = message.index; //update buffer queue configuration - if (i >= this->input_queues.size()) return; - const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items; - const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items; - const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); + if (i >= data->input_queues.size()) return; + const size_t preload_bytes = data->input_configs[i].item_size*data->input_configs[i].preload_items; + const size_t reserve_bytes = data->input_configs[i].item_size*data->input_configs[i].reserve_items; + const size_t maximum_bytes = data->input_configs[i].item_size*data->input_configs[i].maximum_items; + data->input_queues.update_config(i, data->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); } |