1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#include <gras_impl/block_actor.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
using namespace gras;
//Handler for TopActiveMessage.
//Runs the block's start() hook unless the actor is already in the live
//state, records the live state and the token carried by the message,
//pushes a SelfKickMessage to this actor, and acknowledges the sender.
void BlockActor::handle_top_active(
    const TopActiveMessage &message,
    const Theron::Address from
){
    MESSAGE_TRACER();

    //start() is only invoked on a transition into the live state,
    //a repeated active message will not call it again
    const bool needs_start = (this->block_state != BLOCK_STATE_LIVE);
    if (needs_start)
    {
        this->block_ptr->start();
    }

    //remember the token that came with this message and mark the block live
    this->active_token = message.token;
    this->block_state = BLOCK_STATE_LIVE;

    //queue a self kick (handled by handle_self_kick) with a default address
    this->Push(SelfKickMessage(), Theron::Address());

    //reply to the sender
    this->Send(0, from); //ACK
}
//Handler for TopInertMessage.
//The message payload is not inspected: the actor is marked done via
//mark_done() and the sender is acknowledged.
void BlockActor::handle_top_inert(
    const TopInertMessage &,
    const Theron::Address from
){
    MESSAGE_TRACER();

    //transition this actor to its done state
    this->mark_done();

    //reply to the sender
    this->Send(0, from); //ACK
}
//Handler for TopTokenMessage.
//Makes a fresh token for every input and output port, clears the
//corresponding done flag, and posts the new token to the neighbor on
//that port. Inputs additionally post an allocation hint upstream.
//Finally the token carried by the message is stored in the token pool
//and the sender is acknowledged.
void BlockActor::handle_top_token(
    const TopTokenMessage &message,
    const Theron::Address from
){
    MESSAGE_TRACER();

    //input ports: new token + allocation hint for the upstream
    for (size_t in_port = 0; in_port < this->get_num_inputs(); ++in_port)
    {
        this->input_tokens[in_port] = Token::make();
        this->inputs_done.reset(in_port);

        //message that hands the new input token to the upstream
        OutputTokenMessage upstream_token;
        upstream_token.token = this->input_tokens[in_port];

        //TODO, schedule this message as a pre-allocation message
        //message that tells the upstream about the input requirements:
        //reserved item count scaled by the item size for this port
        OutputHintMessage upstream_hint;
        upstream_hint.token = this->input_tokens[in_port];
        upstream_hint.reserve_bytes = this->input_configs[in_port].reserve_items*this->input_items_sizes[in_port];

        //the token message is posted first, then the hint
        this->post_upstream(in_port, upstream_token);
        this->post_upstream(in_port, upstream_hint);
    }

    //output ports: new token for the downstream
    for (size_t out_port = 0; out_port < this->get_num_outputs(); ++out_port)
    {
        this->output_tokens[out_port] = Token::make();
        this->outputs_done.reset(out_port);

        InputTokenMessage downstream_token;
        downstream_token.token = this->output_tokens[out_port];
        this->post_downstream(out_port, downstream_token);
    }

    //hold on to the token that came from the top level topology
    this->token_pool.insert(message.token);

    //reply to the sender
    this->Send(0, from); //ACK
}
//Handler for GlobalBlockConfig.
//Applies the global settings only where this block has no setting of
//its own: a buffer affinity of -1 and a maximum_items of zero are
//treated as "not set". The sender is acknowledged afterwards.
void BlockActor::handle_top_config(
    const GlobalBlockConfig &message,
    const Theron::Address from
){
    MESSAGE_TRACER();

    //take the global node affinity for buffers when none was chosen (-1)
    if (this->buffer_affinity == -1)
    {
        this->buffer_affinity = message.buffer_affinity;
    }

    //take the global output limit on every port whose maximum_items is zero
    for (size_t port = 0; port < this->output_configs.size(); ++port)
    {
        //a non-zero value was set explicitly for this port, keep it
        if (this->output_configs[port].maximum_items != 0) continue;

        this->output_configs[port].maximum_items = message.maximum_output_items;
    }

    //reply to the sender
    this->Send(0, from); //ACK
}
//Handler for SharedThreadGroup.
//Stores the thread group from the message, drops any previously held
//interruptible thread, and creates a new one running task_work when
//interruptible_work is set. The sender is acknowledged afterwards.
void BlockActor::handle_top_thread_group(
    const SharedThreadGroup &message,
    const Theron::Address from
){
    MESSAGE_TRACER();

    //keep the topology's thread group
    this->thread_group = message;

    //release the handle to any old lingering thread
    this->interruptible_thread.reset();

    //only blocks flagged with interruptible_work get a thread
    //(per the original note: blocks that are sources)
    if (this->interruptible_work)
    {
        this->interruptible_thread = boost::make_shared<InterruptibleThread>(
            this->thread_group,
            boost::bind(&BlockActor::task_work, this)
        );
    }

    //reply to the sender
    this->Send(0, from); //ACK
}
//Handler for SelfKickMessage.
//Neither the message nor the sender address is used, the handler
//simply invokes handle_task(). No acknowledgement is sent.
void BlockActor::handle_self_kick(
    const SelfKickMessage &,
    const Theron::Address
){
    MESSAGE_TRACER();

    this->handle_task();
}
|