diff options
Diffstat (limited to 'lib/task_done.cpp')
-rw-r--r-- | lib/task_done.cpp | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/lib/task_done.cpp b/lib/task_done.cpp new file mode 100644 index 0000000..d116202 --- /dev/null +++ b/lib/task_done.cpp @@ -0,0 +1,68 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> + +using namespace gras; + +void Block::mark_done(void) +{ + (*this)->block->mark_done(); +} + +void BlockActor::mark_done(void) +{ + if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + + this->stats.stop_time = time_now(); + this->block_ptr->notify_inactive(); + + //flush partial output buffers to the downstream + for (size_t i = 0; i < this->get_num_outputs(); i++) + { + if (not this->output_queues.ready(i)) continue; + SBuffer &buff = this->output_queues.front(i); + if (buff.length == 0) continue; + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + this->output_queues.pop(i); + } + + this->interruptible_thread.reset(); + + //mark down the new state + this->block_state = BLOCK_STATE_DONE; + + //release upstream, downstream, and executor tokens + this->token_pool.clear(); + + //release all buffers in queues + this->input_queues.flush_all(); + this->output_queues.flush_all(); + + //release all tags and msgs + for (size_t i = 0; i < this->get_num_inputs(); i++) + { + this->input_msgs[i].clear(); + this->input_tags[i].clear(); + } + + //tell the upstream and downstram to re-check their tokens + //this is how the other blocks know who is interested, + //and can decide based on interest to set done or not + for (size_t i = 0; i < this->get_num_inputs(); i++) + { + this->post_upstream(i, OutputCheckMessage()); + } + for (size_t i = 0; i < this->get_num_outputs(); i++) + { + this->post_downstream(i, InputCheckMessage()); + } + + if (ARMAGEDDON) std::cerr + << "==================================================\n" + << "== The " << block_ptr->to_string() << " is done...\n" + << "==================================================\n" + << std::flush; +} |