blob: 9ab786a5afbd00c7a8e5278f5f1fadc8def3d76b (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#include <gras_impl/block_actor.hpp>
#include "tag_handlers.hpp"
using namespace gras;
void BlockActor::task_main(void)
{
TimerAccumulate ta_prep(this->stats.total_time_prep);
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
if GRAS_UNLIKELY(not this->is_work_allowed()) return;
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
//------------------------------------------------------------------
//-- initialize input buffers before work
//------------------------------------------------------------------
size_t output_inline_index = 0;
this->input_items.min() = ~0;
this->input_items.max() = 0;
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
this->num_input_msgs_read[i] = 0;
ASSERT(this->input_queues.ready(i));
const SBuffer &buff = this->input_queues.front(i);
const void *mem = buff.get();
size_t items = buff.length/this->input_configs[i].item_size;
this->input_items.vec()[i] = mem;
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
this->input_items.min() = std::min(this->input_items.min(), items);
this->input_items.max() = std::max(this->input_items.max(), items);
//inline dealings, how and when input buffers can be inlined into output buffers
//*
if GRAS_UNLIKELY(
buff.unique() and
input_configs[i].inline_buffer and
output_inline_index < num_outputs and
buff.get_affinity() == this->buffer_affinity
){
//copy buffer reference but push with zero length, same offset
SBuffer new_obuff = buff;
new_obuff.length = 0;
this->flush_output(output_inline_index);
this->output_queues.push(output_inline_index, new_obuff); //you got inlined!
output_inline_index++; //done do this output port again
}
//*/
}
//------------------------------------------------------------------
//-- initialize output buffers before work
//------------------------------------------------------------------
this->output_items.min() = ~0;
this->output_items.max() = 0;
for (size_t i = 0; i < num_outputs; i++)
{
ASSERT(this->output_queues.ready(i));
SBuffer &buff = this->output_queues.front(i);
ASSERT(buff.length == 0); //assumes it was flushed last call
void *mem = buff.get();
const size_t bytes = buff.get_actual_length() - buff.offset;
size_t items = bytes/this->output_configs[i].item_size;
this->output_items.vec()[i] = mem;
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
this->output_items.min() = std::min(this->output_items.min(), items);
this->output_items.max() = std::max(this->output_items.max(), items);
}
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
ta_prep.done();
this->stats.work_count++;
if GRAS_UNLIKELY(this->interruptible_thread)
{
TimerAccumulate ta_work(this->stats.total_time_work);
this->interruptible_thread->call();
}
else
{
TimerAccumulate ta_work(this->stats.total_time_work);
this->task_work();
}
this->stats.time_last_work = time_now();
TimerAccumulate ta_post(this->stats.total_time_post);
//------------------------------------------------------------------
//-- Post-work output tasks
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
this->flush_output(i);
}
//------------------------------------------------------------------
//-- Post-work input tasks
//------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
this->trim_msgs(i);
//update the inputs available bit field
this->update_input_avail(i);
//missing at least one upstream provider?
//since nothing else is coming in, its safe to mark done
if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
}
//still have IO ready? kick off another task
this->task_kicker();
}
|