diff options
Diffstat (limited to 'lib/task_done.cpp')
-rw-r--r-- | lib/task_done.cpp | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/lib/task_done.cpp b/lib/task_done.cpp index d116202..b6e61a5 100644 --- a/lib/task_done.cpp +++ b/lib/task_done.cpp @@ -12,40 +12,40 @@ void Block::mark_done(void) void BlockActor::mark_done(void) { - if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first - this->stats.stop_time = time_now(); - this->block_ptr->notify_inactive(); + data->stats.stop_time = time_now(); + block_ptr->notify_inactive(); //flush partial output buffers to the downstream for (size_t i = 0; i < this->get_num_outputs(); i++) { - if (not this->output_queues.ready(i)) continue; - SBuffer &buff = this->output_queues.front(i); + if (not data->output_queues.ready(i)) continue; + SBuffer &buff = data->output_queues.front(i); if (buff.length == 0) continue; InputBufferMessage buff_msg; buff_msg.buffer = buff; this->post_downstream(i, buff_msg); - this->output_queues.pop(i); + data->output_queues.pop(i); } - this->interruptible_thread.reset(); + data->interruptible_thread.reset(); //mark down the new state - this->block_state = BLOCK_STATE_DONE; + data->block_state = BLOCK_STATE_DONE; //release upstream, downstream, and executor tokens - this->token_pool.clear(); + data->token_pool.clear(); //release all buffers in queues - this->input_queues.flush_all(); - this->output_queues.flush_all(); + data->input_queues.flush_all(); + data->output_queues.flush_all(); //release all tags and msgs for (size_t i = 0; i < this->get_num_inputs(); i++) { - this->input_msgs[i].clear(); - this->input_tags[i].clear(); + data->input_msgs[i].clear(); + data->input_tags[i].clear(); } //tell the upstream and downstram to re-check their tokens |