summaryrefslogtreecommitdiff
path: root/lib/block_allocator.cpp
blob: 8246037b996a13ec095cecb8e6cc2f66decc2984 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.

#include "element_impl.hpp"
#include <gras_impl/block_actor.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>

using namespace gras;

const size_t AT_LEAST_BYTES = 32*(1024); //kiB per buffer
const size_t AHH_TOO_MANY_BYTES = 32*(1024*1024); //MiB enough for me
const size_t THIS_MANY_BUFFERS = 8; //pool size

static void buffer_returner(ThreadPool tp, Theron::Address addr, const size_t index, SBuffer &buffer)
{
    //reset offset and length
    buffer.offset = 0;
    buffer.length = 0;
    buffer.last = NULL;

    OutputBufferMessage message;
    message.index = index;
    message.buffer = buffer;
    tp->Send(message, Theron::Address::Null(), addr);
}

static size_t recommend_length(
    const std::vector<OutputHintMessage> &hints,
    const size_t hint_bytes,
    const size_t at_least_bytes,
    const size_t at_most_bytes
){
    //step 1) find the min of all reserves to create a super-reserve
    size_t min_bytes = at_least_bytes;
    BOOST_FOREACH(const OutputHintMessage &hint, hints)
    {
        min_bytes = std::max(min_bytes, hint.reserve_bytes);
    }

    //step 2) N x super reserve of hard-coded mimimum items
    size_t Nmin_bytes = min_bytes;
    while (hint_bytes > Nmin_bytes)
    {
        Nmin_bytes += min_bytes;
    }

    //step 3) cap the maximum size by the upper bound (if set)
    if (at_most_bytes) Nmin_bytes = std::min(Nmin_bytes, at_most_bytes);
    else Nmin_bytes = std::min(Nmin_bytes, AHH_TOO_MANY_BYTES);

    return Nmin_bytes;
}

#define my_round_up_mult(num, mult) (((num)*(mult))+(mult)-1)/(mult)

void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address from)
{
    MESSAGE_TRACER();

    //allocate output buffers which will also wake up the task
    const size_t num_outputs = worker->get_num_outputs();
    for (size_t i = 0; i < num_outputs; i++)
    {
        const size_t bytes = recommend_length(
            data->output_allocation_hints[i],
            my_round_up_mult(AT_LEAST_BYTES, data->output_configs[i].item_size),
            data->output_configs[i].reserve_items*data->output_configs[i].item_size,
            data->output_configs[i].maximum_items*data->output_configs[i].item_size
        );

        SBufferDeleter deleter = boost::bind(&buffer_returner, this->thread_pool, this->GetAddress(), i, _1);
        SBufferToken token = SBufferToken(new SBufferDeleter(deleter));

        SBufferConfig config;
        config.memory = NULL;
        config.length = bytes;
        config.affinity = data->global_config.buffer_affinity;
        config.token = token;

        BufferQueueSptr queue = data->block->output_buffer_allocator(i, config);
        data->output_queues.set_buffer_queue(i, queue);

        InputAllocMessage message;
        message.config = config;
        message.token = token;
        worker->post_downstream(i, message);
    }

    this->Send(0, from); //ACK
}

BufferQueueSptr Block::output_buffer_allocator(
    const size_t, const SBufferConfig &config
){
    return BufferQueue::make_pool(config, THIS_MANY_BUFFERS);
}

BufferQueueSptr Block::input_buffer_allocator(
    const size_t, const SBufferConfig &
){
    return BufferQueueSptr(); //null
}