summaryrefslogtreecommitdiff
path: root/lib/task_main.cpp
blob: 9ab786a5afbd00c7a8e5278f5f1fadc8def3d76b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.

#include <gras_impl/block_actor.hpp>
#include "tag_handlers.hpp"

using namespace gras;

//----------------------------------------------------------------------
//-- Run one scheduling pass for this block:
//--   1) return early when is_work_allowed() reports false,
//--   2) point input_items/output_items at the front buffer of each port,
//--   3) invoke the work routine (directly or via interruptible_thread),
//--   4) flush every output, then trim/update every input and mark the
//--      block done when an input reports done,
//--   5) call task_kicker() so another pass can be started.
//-- Takes no arguments and returns nothing; it operates on actor members.
//----------------------------------------------------------------------
void BlockActor::task_main(void)
{
    //NOTE(review): TimerAccumulate looks like a scoped timer that adds the
    //elapsed time to the referenced stats field - confirm in its header.
    //It is explicitly stopped with done() right before the work call below.
    TimerAccumulate ta_prep(this->stats.total_time_prep);

    //------------------------------------------------------------------
    //-- Decide if it is possible to continue any processing:
    //-- Handle task may get called for incoming buffers,
    //-- however, not all ports may have available buffers.
    //------------------------------------------------------------------
    if GRAS_UNLIKELY(not this->is_work_allowed()) return;

    const size_t num_inputs = this->get_num_inputs();
    const size_t num_outputs = this->get_num_outputs();

    //------------------------------------------------------------------
    //-- initialize input buffers before work
    //------------------------------------------------------------------
    //next output port that may still receive an inlined input buffer
    size_t output_inline_index = 0;

    //seed the running min/max: ~0 becomes the largest unsigned value when
    //stored, so the first std::min below always replaces it.
    //With zero inputs, min() keeps that sentinel and max() stays 0.
    this->input_items.min() = ~0;
    this->input_items.max() = 0;
    for (size_t i = 0; i < num_inputs; i++)
    {
        this->sort_tags(i);
        this->num_input_msgs_read[i] = 0;

        //is_work_allowed() passed, so every input is expected to have a buffer
        ASSERT(this->input_queues.ready(i));
        const SBuffer &buff = this->input_queues.front(i);
        const void *mem = buff.get();
        //integer division: a trailing partial item is not counted
        //(presumably buff.length is a byte count - TODO confirm)
        size_t items = buff.length/this->input_configs[i].item_size;

        //publish pointer and item count for this port, fold into min/max
        this->input_items.vec()[i] = mem;
        this->input_items[i].get() = mem;
        this->input_items[i].size() = items;
        this->input_items.min() = std::min(this->input_items.min(), items);
        this->input_items.max() = std::max(this->input_items.max(), items);

        //inline dealings, how and when input buffers can be inlined into output buffers
        //An input buffer is inlined only when all of the following hold:
        // - buff.unique() is true,
        // - this input port is configured with inline_buffer,
        // - there is still an output port that has not been given one,
        // - the buffer's affinity equals this block's buffer_affinity.
        //The "//*" and "//*/" markers look like a comment toggle: removing
        //the first slash of the opening marker would disable this section.
        //*
        if GRAS_UNLIKELY(
            buff.unique() and
            input_configs[i].inline_buffer and
            output_inline_index < num_outputs and
            buff.get_affinity() == this->buffer_affinity
        ){
            //copy buffer reference but push with zero length, same offset
            SBuffer new_obuff = buff;
            new_obuff.length = 0;
            //flush this output first, then queue the inlined buffer on it
            this->flush_output(output_inline_index);
            this->output_queues.push(output_inline_index, new_obuff); //you got inlined!
            output_inline_index++; //each output port gets at most one inlined buffer per call
        }
        //*/
    }

    //------------------------------------------------------------------
    //-- initialize output buffers before work
    //------------------------------------------------------------------
    //same min/max seeding as for the inputs above
    this->output_items.min() = ~0;
    this->output_items.max() = 0;
    for (size_t i = 0; i < num_outputs; i++)
    {
        ASSERT(this->output_queues.ready(i));
        SBuffer &buff = this->output_queues.front(i);
        ASSERT(buff.length == 0); //assumes it was flushed last call
        void *mem = buff.get();
        //space offered to the work routine: actual length minus the offset
        const size_t bytes = buff.get_actual_length() - buff.offset;
        //integer division: only whole items are offered
        size_t items = bytes/this->output_configs[i].item_size;

        //publish pointer and item capacity for this port, fold into min/max
        this->output_items.vec()[i] = mem;
        this->output_items[i].get() = mem;
        this->output_items[i].size() = items;
        this->output_items.min() = std::min(this->output_items.min(), items);
        this->output_items.max() = std::max(this->output_items.max(), items);
    }

    //------------------------------------------------------------------
    //-- the work
    //------------------------------------------------------------------
    ta_prep.done(); //preparation ends here; work is timed separately below
    this->stats.work_count++;
    //both branches time the call into stats.total_time_work; they differ
    //only in how the work is invoked
    if GRAS_UNLIKELY(this->interruptible_thread)
    {
        //an interruptible_thread is set: hand the call to it
        TimerAccumulate ta_work(this->stats.total_time_work);
        this->interruptible_thread->call();
    }
    else
    {
        //no interruptible_thread: call the work routine directly
        TimerAccumulate ta_work(this->stats.total_time_work);
        this->task_work();
    }
    this->stats.time_last_work = time_now();
    //declared here and alive until the function returns, so it spans all
    //of the post-work steps below
    TimerAccumulate ta_post(this->stats.total_time_post);

    //------------------------------------------------------------------
    //-- Post-work output tasks
    //------------------------------------------------------------------
    for (size_t i = 0; i < num_outputs; i++)
    {
        this->flush_output(i);
    }

    //------------------------------------------------------------------
    //-- Post-work input tasks
    //------------------------------------------------------------------
    for (size_t i = 0; i < num_inputs; i++)
    {
        this->trim_msgs(i);

        //update the inputs available bit field
        this->update_input_avail(i);

        //missing at least one upstream provider?
        //since nothing else is coming in, it is safe to mark done
        //(the loop keeps going, so mark_done() may run once per done input)
        if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
    }

    //still have IO ready? kick off another task
    this->task_kicker();
}