summaryrefslogtreecommitdiff
path: root/lib/task_done.cpp
blob: b6e61a5c69cd858462817ab4e3b36c668abf47fb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.

#include "element_impl.hpp"
#include <gras_impl/block_actor.hpp>

using namespace gras;

void Block::mark_done(void)
{
    (*this)->block->mark_done();
}

void BlockActor::mark_done(void)
{
    if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first

    data->stats.stop_time = time_now();
    block_ptr->notify_inactive();

    //flush partial output buffers to the downstream
    for (size_t i = 0; i < this->get_num_outputs(); i++)
    {
        if (not data->output_queues.ready(i)) continue;
        SBuffer &buff = data->output_queues.front(i);
        if (buff.length == 0) continue;
        InputBufferMessage buff_msg;
        buff_msg.buffer = buff;
        this->post_downstream(i, buff_msg);
        data->output_queues.pop(i);
    }

    data->interruptible_thread.reset();

    //mark down the new state
    data->block_state = BLOCK_STATE_DONE;

    //release upstream, downstream, and executor tokens
    data->token_pool.clear();

    //release all buffers in queues
    data->input_queues.flush_all();
    data->output_queues.flush_all();

    //release all tags and msgs
    for (size_t i = 0; i < this->get_num_inputs(); i++)
    {
        data->input_msgs[i].clear();
        data->input_tags[i].clear();
    }

    //tell the upstream and downstram to re-check their tokens
    //this is how the other blocks know who is interested,
    //and can decide based on interest to set done or not
    for (size_t i = 0; i < this->get_num_inputs(); i++)
    {
        this->post_upstream(i, OutputCheckMessage());
    }
    for (size_t i = 0; i < this->get_num_outputs(); i++)
    {
        this->post_downstream(i, InputCheckMessage());
    }

    if (ARMAGEDDON) std::cerr
        << "==================================================\n"
        << "== The " << block_ptr->to_string() << " is done...\n"
        << "==================================================\n"
        << std::flush;
}