summaryrefslogtreecommitdiff
path: root/lib/task_done.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/task_done.cpp')
-rw-r--r--lib/task_done.cpp26
1 files changed, 13 insertions, 13 deletions
diff --git a/lib/task_done.cpp b/lib/task_done.cpp
index d116202..b6e61a5 100644
--- a/lib/task_done.cpp
+++ b/lib/task_done.cpp
@@ -12,40 +12,40 @@ void Block::mark_done(void)
void BlockActor::mark_done(void)
{
- if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
- this->stats.stop_time = time_now();
- this->block_ptr->notify_inactive();
+ data->stats.stop_time = time_now();
+ block_ptr->notify_inactive();
//flush partial output buffers to the downstream
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- if (not this->output_queues.ready(i)) continue;
- SBuffer &buff = this->output_queues.front(i);
+ if (not data->output_queues.ready(i)) continue;
+ SBuffer &buff = data->output_queues.front(i);
if (buff.length == 0) continue;
InputBufferMessage buff_msg;
buff_msg.buffer = buff;
this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
+ data->output_queues.pop(i);
}
- this->interruptible_thread.reset();
+ data->interruptible_thread.reset();
//mark down the new state
- this->block_state = BLOCK_STATE_DONE;
+ data->block_state = BLOCK_STATE_DONE;
//release upstream, downstream, and executor tokens
- this->token_pool.clear();
+ data->token_pool.clear();
//release all buffers in queues
- this->input_queues.flush_all();
- this->output_queues.flush_all();
+ data->input_queues.flush_all();
+ data->output_queues.flush_all();
//release all tags and msgs
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- this->input_msgs[i].clear();
- this->input_tags[i].clear();
+ data->input_msgs[i].clear();
+ data->input_tags[i].clear();
}
//tell the upstream and downstram to re-check their tokens