1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
#include <gras_impl/block_actor.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
using namespace gras;
//Default size hint for an output buffer, counted in items.
//handle_top_alloc multiplies it by the port's item size and passes the
//product to recommend_length as hint_bytes.
const size_t AT_LEAST_DEFAULT_ITEMS = 1 << 13;
//Upper bound in bytes that recommend_length applies to its result
//when the caller passes at_most_bytes == 0 (no explicit maximum).
const size_t AHH_TOO_MANY_BYTES = 1 << 20; //TODO
//Number of buffers Block::output_buffer_allocator creates per call.
const size_t THIS_MANY_BUFFERS = 32;
//Factor applied to each hint's history_bytes in recommend_length:
//the recommended length is grown until it is at least this many times the history.
const double EDGE_CASE_MITIGATION = 8.0; //edge case mitigation constant
//TODO: this will need to be more complicated later
void BlockActor::buffer_returner(const size_t index, SBuffer &buffer)
{
//reset offset and length
buffer.offset = 0;
buffer.length = 0;
OutputBufferMessage message;
message.index = index;
message.buffer = buffer;
this->Push(message, Theron::Address());
}
//Recommends a buffer length in bytes for one output.
//
//The result starts at the "super-reserve" (the largest of at_least_bytes and
//every hint's reserve_bytes) and is grown in super-reserve sized steps until
//it covers hint_bytes and EDGE_CASE_MITIGATION times every hint's
//history_bytes. It is then clipped to an upper bound.
//
//  hints:          allocation hints registered for this output
//  hint_bytes:     default size the result should reach, in bytes
//  at_least_bytes: the caller's own reserve, in bytes
//  at_most_bytes:  upper bound in bytes, or 0 to use AHH_TOO_MANY_BYTES
//  returns:        the recommended length in bytes (may be 0 if every input is 0)
static size_t recommend_length(
    const std::vector<OutputHintMessage> &hints,
    const size_t hint_bytes,
    const size_t at_least_bytes,
    const size_t at_most_bytes
){
    //step 1) find the max of all reserves to create a super-reserve
    size_t min_bytes = at_least_bytes;
    BOOST_FOREACH(const OutputHintMessage &hint, hints)
    {
        min_bytes = std::max(min_bytes, hint.reserve_bytes);
    }

    //The loops below advance by the super-reserve. A super-reserve of zero
    //could never reach a non-zero target and would spin forever,
    //so fall back to single byte steps in that case.
    const size_t step_bytes = (min_bytes == 0)? 1 : min_bytes;

    //step 2) N x super reserve to minimize history edge case
    size_t Nmin_bytes = min_bytes;
    BOOST_FOREACH(const OutputHintMessage &hint, hints)
    {
        while (hint.history_bytes*EDGE_CASE_MITIGATION > Nmin_bytes)
        {
            Nmin_bytes += step_bytes;
        }
    }
    while (hint_bytes > Nmin_bytes)
    {
        Nmin_bytes += step_bytes;
    }

    //step 3) cap the maximum size by the upper bound (if set),
    //otherwise by the built-in default limit
    if (at_most_bytes) Nmin_bytes = std::min(Nmin_bytes, at_most_bytes);
    else Nmin_bytes = std::min(Nmin_bytes, AHH_TOO_MANY_BYTES);
    return Nmin_bytes;
}
//Handles a TopAllocMessage: for every output port it picks a buffer length,
//runs the block's output allocator, posts an InputAllocMessage downstream,
//and finally replies to the sender.
//  from: address that receives the acknowledgement
void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address from)
{
    MESSAGE_TRACER();

    //allocate output buffers which will also wake up the task
    const size_t output_count = this->get_num_outputs();
    this->output_buffer_tokens.resize(output_count);

    for (size_t port = 0; port < output_count; port++)
    {
        //convert the item based settings of this port into byte counts
        const size_t default_bytes = AT_LEAST_DEFAULT_ITEMS*this->output_items_sizes[port];
        const size_t reserve_bytes = this->output_configs[port].reserve_items*this->output_items_sizes[port];
        const size_t maximum_bytes = this->output_configs[port].maximum_items*this->output_items_sizes[port];
        const size_t length_bytes = recommend_length(
            this->output_allocation_hints[port],
            default_bytes, reserve_bytes, maximum_bytes
        );

        //deleter that forwards a buffer to buffer_returner for this port
        SBufferDeleter on_return = boost::bind(&BlockActor::buffer_returner, this, port, _1);

        //the block's own allocator gets a token built around that deleter
        SBufferToken output_token(new SBufferDeleter(on_return));
        this->output_buffer_tokens[port] = block_ptr->output_buffer_allocator(port, output_token, length_bytes);

        //downstream gets a separate token plus the same recommended length
        InputAllocMessage alloc_request;
        alloc_request.token = SBufferToken(new SBufferDeleter(on_return));
        alloc_request.recommend_length = length_bytes;
        this->post_downstream(port, alloc_request);
    }

    this->Send(0, from); //ACK
}
//Default output allocator: builds THIS_MANY_BUFFERS buffers of the recommended
//length, zero-fills each one, and lets it go out of scope again.
//  (unnamed):        output port index, not used here
//  token:            token stored in every buffer's config
//  recommend_length: requested length of each buffer in bytes
//  returns:          the token that was passed in
SBufferToken Block::output_buffer_allocator(
    const size_t,
    const SBufferToken &token,
    const size_t recommend_length
){
    for (size_t remaining = THIS_MANY_BUFFERS; remaining != 0; remaining--)
    {
        SBufferConfig settings;
        settings.memory = NULL; //no caller supplied memory
        settings.length = recommend_length;
        settings.affinity = (*this)->block->buffer_affinity;
        settings.token = token;

        SBuffer fresh(settings);

        //clear everything the buffer reports as its actual storage
        void *const storage = fresh.get_actual_memory();
        const size_t storage_bytes = fresh.get_actual_length();
        std::memset(storage, 0, storage_bytes);

        //buffer derefs here and the token messages it back to the block
    }
    return token;
}
//Default input allocator: ignores all of its arguments and
//returns a value-initialized (null) token.
SBufferToken Block::input_buffer_allocator(
    const size_t,
    const SBufferToken &,
    const size_t
){
    SBufferToken null_token = SBufferToken();
    return null_token;
}
|