summaryrefslogtreecommitdiff
path: root/include/gras/block.hpp
blob: 983d54a355aee478ecf2b36362a2edcea4f4914a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

#ifndef INCLUDED_GRAS_BLOCK_HPP
#define INCLUDED_GRAS_BLOCK_HPP

#include <gras/element.hpp>
#include <gras/sbuffer.hpp>
#include <gras/tags.hpp>
#include <vector>
#include <string>
#include <boost/range.hpp> //iterator range

namespace gras
{

//! Configuration parameters for an input port
struct GRAS_API InputPortConfig
{
    //! Construct a config; defined out of line (presumably applies the defaults documented below - confirm in the implementation)
    InputPortConfig(void);

    /*!
     * Set buffer inlining for this port config.
     * Inlining means that the input buffer can be used as an output buffer.
     * The goal is to make better use of cache and memory bandwidth.
     *
     * By default, inlining is disabled on all input ports.
     * The user should enable inlining on an input port
     * when it is understood that the work function will read
     * before writing to a particular section of the buffer.
     *
     * The scheduler will inline a buffer when all of the following hold:
     *  - inlining is enabled on the particular input port
     *  - the block holds the only reference to the buffer (it is unique)
     *  - the input buffer has the same affinity as the block
     *  - the input port has a buffer look-ahead of 0
     *
     * Default = false.
     */
    bool inline_buffer;

    /*!
     * Set the number of input buffer look-ahead items.
     * When the look-ahead items are not consumed,
     * they will be available for the next work call.
     * This is used to implement sample memory for
     * things like sliding dot products/FIR filters.
     *
     * Default = 0.
     */
    size_t lookahead_items;
};

//! Configuration parameters for an output port
struct GRAS_API OutputPortConfig
{
    //! Construct a config; defined out of line (presumably applies the defaults documented below - confirm in the implementation)
    OutputPortConfig(void);

    /*!
     * Set an output reserve requirement such that work is called
     * with an output buffer that is at least reserve_items in size.
     *
     * Default = 1.
     */
    size_t reserve_items;

    /*!
     * Constrain the maximum number of items that
     * work can be called with for this port.
     *
     * Default = 0, which means the constraint is disabled.
     */
    size_t maximum_items;
};

/*!
 * A work buffer descriptor: a native pointer paired with an item count.
 * PtrType is the native pointer type stored for the buffer.
 * Both fields are plain public members; the accessors below expose
 * them by value (const access) or by reference (mutable access).
 */
template <typename PtrType>
struct WorkBuffer
{
    //! Mutable access: a reference to the stored memory pointer
    PtrType &get()
    {
        return this->_mem;
    }

    //! Read-only access: the native pointer to this buffer, by value
    PtrType get() const
    {
        return this->_mem;
    }

    //! Mutable access: a reference to the stored buffer length
    size_t &size()
    {
        return this->_len;
    }

    //! Read-only access: the number of items in this buffer
    size_t size() const
    {
        return this->_len;
    }

    //! The native pointer reinterpreted as the requested type T
    template <typename T>
    T cast() const
    {
        return reinterpret_cast<T>(get());
    }

    PtrType _mem; //!< the memory pointer
    size_t _len;  //!< the buffer length
};

/*!
 * The block interface, derived from Element.
 * It declares port configuration, rate settings, item production and
 * consumption, tag handling, the virtual work-related hooks,
 * and the buffer affinity/allocation hooks.
 * Only declarations are visible here; the implementations live elsewhere.
 */
struct GRAS_API Block : Element
{

    //! Construct an empty/null block
    Block(void);

    //! Create a new block given the name
    Block(const std::string &name);

    /*******************************************************************
     * Deal with input and output port configuration
     ******************************************************************/

    //! Get the configuration rules of an input port (port 0 when no index is given)
    InputPortConfig input_config(const size_t which_input = 0) const;

    //! Set the configuration rules for an input port (port 0 when no index is given)
    void set_input_config(const InputPortConfig &config, const size_t which_input = 0);

    //! Get the configuration rules of an output port (port 0 when no index is given)
    OutputPortConfig output_config(const size_t which_output = 0) const;

    //! Set the configuration rules for an output port (port 0 when no index is given)
    void set_output_config(const OutputPortConfig &config, const size_t which_output = 0);

    /*!
     * Enable fixed rate logic.
     * When enabled, relative rate is assumed to be set,
     * and forecast is automatically called.
     * Also, consume will be called automatically.
     */
    void set_fixed_rate(const bool fixed_rate);

    //! Get the fixed rate setting
    bool fixed_rate(void) const;

    /*!
     * The relative rate can be thought of as interpolation/decimation.
     * In other words, relative rate is the ratio of output items to input items.
     */
    void set_relative_rate(const double relative_rate);

    //! Get the relative rate setting
    double relative_rate(void) const;

    /*!
     * The output multiple setting controls work output buffer sizes.
     * Buffer sizes, in number of items, will be rounded to the multiple.
     */
    void set_output_multiple(const size_t multiple);

    //! Get the output multiple setting
    size_t output_multiple(void) const;

    /*******************************************************************
     * Deal with data production and consumption
     ******************************************************************/

    //! Return options for the work call
    enum
    {
        WORK_CALLED_PRODUCE = -2,
        WORK_DONE = -1
    };

    //! Call during work to consume items on the given input port
    void consume(const size_t which_input, const size_t how_many_items);

    //! Call during work to consume items (presumably the same amount on every input port - confirm in the implementation)
    void consume_each(const size_t how_many_items);

    //! Call during work to produce items; the work call must then return WORK_CALLED_PRODUCE
    void produce(const size_t which_output, const size_t how_many_items);

    /*******************************************************************
     * Deal with tag handling and tag configuration
     ******************************************************************/

    //! Tag propagation policies (exact semantics are not visible in this header; names suggest none, all-to-all, and one-to-one - TODO confirm)
    enum tag_propagation_policy_t
    {
        TPP_DONT = 0,
        TPP_ALL_TO_ALL = 1,
        TPP_ONE_TO_ONE = 2
    };

    //! Item counter for the given input port (presumably the total number of items read so far - TODO confirm)
    uint64_t nitems_read(const size_t which_input);

    //! Item counter for the given output port (presumably the total number of items written so far - TODO confirm)
    uint64_t nitems_written(const size_t which_output);

    //! Get the tag propagation policy setting
    tag_propagation_policy_t tag_propagation_policy(void);

    //! Set the tag propagation policy
    void set_tag_propagation_policy(tag_propagation_policy_t p);

    //! Send a tag to the downstream on the given output port
    void post_output_tag(const size_t which_output, const Tag &tag);

    //! Iterator return type of get_input_tags - stl and boost compliant
    typedef boost::iterator_range<std::vector<Tag>::const_iterator> TagIter;

    //! Get an iterator range of item tags for the given input (port 0 when no index is given)
    TagIter get_input_tags(const size_t which_input = 0);

    /*******************************************************************
     * Work related routines from basic block
     ******************************************************************/

    //! Called when the flow graph is started, can be overridden
    virtual bool start(void);

    //! Called when the flow graph is stopped, can be overridden
    virtual bool stop(void);

    //! Input buffers handed to work: one read-only WorkBuffer per entry
    typedef std::vector<WorkBuffer<const void *> > InputItems;

    //! Output buffers handed to work: one writable WorkBuffer per entry
    typedef std::vector<WorkBuffer<void *> > OutputItems;

    //! The official call into the work routine (pure virtual, must be overridden)
    virtual int work(
        const InputItems &input_items,
        const OutputItems &output_items
    ) = 0;

    //! forecast requirements, can be overridden
    virtual void forecast(
        int noutput_items,
        std::vector<int> &ninput_items_required
    );

    //! scheduler calls when the topology is updated, can be overridden
    virtual bool check_topology(int ninputs, int noutputs);

    /*!
     * Set if the work call should be interruptible by stop().
     * Some work implementations block with the expectation of
     * getting a boost thread interrupt in a blocking call.
     * Call set_interruptible_work(true) if this is the case.
     * By default, work implementations are not interruptible.
     */
    void set_interruptible_work(const bool enb);

    /*******************************************************************
     * routines related to affinity and allocation
     ******************************************************************/

    /*!
     * Set the node affinity of this block.
     * This call affects how output buffers are allocated.
     * By default memory is allocated by malloc.
     * When the affinity is set, virtual memory
     * will be locked to a physical CPU/memory node.
     * \param affinity a memory node on the system
     */
    void set_buffer_affinity(const long affinity);

    /*!
     * The output buffer allocator method.
     * This method is called by the scheduler to allocate output buffers.
     * The user may override this method to create a custom allocator.
     *
     * Example use case:
     * //TODO code example
     *
     * \param which_output the output port index number
     * \param token the token for the buffer's returner
     * \param recommend_length the scheduler's recommended length in bytes
     * \return the token used for the buffer allocation (may be the same)
     */
    virtual SBufferToken output_buffer_allocator(
        const size_t which_output,
        const SBufferToken &token,
        const size_t recommend_length
    );

    /*!
     * The input buffer allocator method.
     * This method is special and very different from the output buffer allocator.
     * Typically, blocks do not have control of their input buffers.
     * When overridden, an upstream block will ask this block
     * to allocate its output buffers. This way, this block will get
     * input buffers which were actually allocated by this method.
     *
     * \param which_input the input port index number
     * \param token the token for the buffer's returner
     * \param recommend_length the scheduler's recommended length in bytes
     * \return the token used for the buffer allocation (may be the same)
     */
    virtual SBufferToken input_buffer_allocator(
        const size_t which_input,
        const SBufferToken &token,
        const size_t recommend_length
    );

};

} //namespace gras

#endif /*INCLUDED_GRAS_BLOCK_HPP*/