summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime/gr_block.h
blob: c3898a5c020a5a4926b96e7087c43c23d75e6200 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

#ifndef INCLUDED_GNURADIO_GR_BLOCK_H
#define INCLUDED_GNURADIO_GR_BLOCK_H

#include <gr_core_api.h>
#include <gras/block.hpp>
#include <gr_io_signature.h>
#include <gr_types.h>
#include <gr_tags.h>
#include <string>
#include <deque>
#include <map>
#include <boost/foreach.hpp>
#include <gruel/thread.h>

namespace gnuradio
{
//! dummy entry, just here for legacy purposes
template <typename T>
boost::shared_ptr<T> get_initial_sptr(T *p)
{
    return boost::shared_ptr<T>(p);
}
}

struct GR_CORE_API gr_block : gras::Block
{

    gr_block(void);

    gr_block(
        const std::string &name,
        gr_io_signature_sptr input_signature,
        gr_io_signature_sptr output_signature
    );

    virtual ~gr_block(void);

    gr_io_signature_sptr input_signature(void) const;
    gr_io_signature_sptr output_signature(void) const;

    void set_input_signature(gr_io_signature_sptr sig);
    void set_output_signature(gr_io_signature_sptr sig);

    virtual bool check_topology(int ninputs, int noutputs);

    //! Overload me! I am the forecast
    virtual void forecast(int, std::vector<int> &);

    //! Return options for the work call
    enum
    {
        WORK_CALLED_PRODUCE = -2,
        WORK_DONE = -1
    };

    /*!
    * \brief compute output items from input items
    *
    * \param noutput_items	number of output items to write on each output stream
    * \param ninput_items	number of input items available on each input stream
    * \param input_items		vector of pointers to the input items, one entry per input stream
    * \param output_items	vector of pointers to the output items, one entry per output stream
    *
    * \returns number of items actually written to each output stream, or -1 on EOF.
    * It is OK to return a value less than noutput_items.  -1 <= return value <= noutput_items
    *
    * general_work must call consume or consume_each to indicate how many items
    * were consumed on each input stream.
    */
    virtual int general_work(
        int noutput_items,
        gr_vector_int &ninput_items,
        gr_vector_const_void_star &input_items,
        gr_vector_void_star &output_items
    );

    virtual bool start(void);
    virtual bool stop(void);

    //! Call during work to consume items
    void consume_each(const int how_many_items);

    void consume(const size_t i, const int how_many_items);

    void produce(const size_t o, const int how_many_items);

    //! Get absolute count of all items consumed on the given input port
    uint64_t nitems_read(const size_t which_input = 0);

    //! Get absolute count of all items produced on the given output port
    uint64_t nitems_written(const size_t which_output = 0);

    void add_item_tag(
        const size_t which_output, const gr_tag_t &tag
    );

    void add_item_tag(
        const size_t which_output,
        uint64_t abs_offset,
        const pmt::pmt_t &key,
        const pmt::pmt_t &value,
        const pmt::pmt_t &srcid=pmt::PMT_F
    );

    void get_tags_in_range(
        std::vector<gr_tag_t> &tags,
        const size_t which_input,
        uint64_t abs_start,
        uint64_t abs_end,
        const pmt::pmt_t &key = pmt::pmt_t()
    );

    void set_alignment(const size_t alignment);

    bool is_unaligned(void);

    size_t fixed_rate_noutput_to_ninput(const size_t noutput_items);

    size_t interpolation(void) const;

    void set_interpolation(const size_t);

    size_t decimation(void) const;

    void set_decimation(const size_t);

    int max_noutput_items(void) const;

    void set_max_noutput_items(int);

    void unset_max_noutput_items(void);

    bool is_set_max_noutput_items(void) const;

    /*******************************************************************
     * Deal with input and output port configuration
     ******************************************************************/

    unsigned history(void) const;

    void set_history(unsigned history);

    /*!
     * Enable fixed rate logic.
     * When enabled, relative rate is assumed to be set,
     * and forecast is automatically called.
     * Also, consume will be called automatically.
     */
    void set_fixed_rate(const bool fixed_rate);

    //! Get the fixed rate setting
    bool fixed_rate(void) const;

    /*!
     * The relative rate can be thought of as interpolation/decimation.
     * In other words, relative rate is the ratio of output items to input items.
     */
    void set_relative_rate(const double relative_rate);

    //! Get the relative rate setting
    double relative_rate(void) const;

    /*!
     * The output multiple setting controls work output buffer sizes.
     * Buffers will be number of items modulo rounted to the multiple.
     */
    void set_output_multiple(const size_t multiple);

    //! Get the output multiple setting
    size_t output_multiple(void) const;

    /*******************************************************************
     * Deal with tag handling and tag configuration
     ******************************************************************/

    enum tag_propagation_policy_t
    {
        TPP_DONT = 0,
        TPP_ALL_TO_ALL = 1,
        TPP_ONE_TO_ONE = 2
    };

    tag_propagation_policy_t tag_propagation_policy(void);

    void set_tag_propagation_policy(tag_propagation_policy_t p);

    ///////////// TODO //////////////////////
    void set_max_output_buffer(long){}
    void set_max_output_buffer(int, long){}
    long max_output_buffer(size_t){return 0;}
    void set_min_output_buffer(long){}
    void set_min_output_buffer(int, long){}
    long min_output_buffer(size_t){return 0;}

    ///////////// ALIAS stuff - is it used? //////////////////////
    std::string          d_symbol_alias;
    std::string          d_symbol_name;
    std::string symbol_name() const { return d_symbol_name; }
    bool alias_set() { return !d_symbol_alias.empty(); }
    std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
    pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
    void set_block_alias(std::string name){d_symbol_alias = name;}

    ///////////// MSG stuff not implemented //////////////////////
    typedef std::deque<pmt::pmt_t>    msg_queue_t;
    typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>    msg_queue_map_t;
    typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
    msg_queue_map_t msg_queue;
    pmt::pmt_t message_subscribers;

  typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
  typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
  d_msg_handlers_t d_msg_handlers;

    template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){}

    void message_port_register_in(pmt::pmt_t /*port_id*/){}
    void message_port_register_out(pmt::pmt_t /*port_id*/){}
    void message_port_pub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*msg*/){}
    void message_port_sub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
    void message_port_unsub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}

    virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier\n";*/ return false; }
    virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_in\n";*/ return false; }
    virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_out\n";*/ return false; }

    /*!
    * \brief Get input message port names.
    *
    * Returns the available input message ports for a block. The
    * return object is a PMT vector that is filled with PMT symbols.
    */
    pmt::pmt_t message_ports_in(){return pmt::PMT_NIL;}

    /*!
    * \brief Get output message port names.
    *
    * Returns the available output message ports for a block. The
    * return object is a PMT vector that is filled with PMT symbols.
    */
    pmt::pmt_t message_ports_out(){return pmt::PMT_NIL;}

    //! is the queue empty?
    bool empty_p(pmt::pmt_t which_port) { 
        if(msg_queue.find(which_port) == msg_queue.end())
          throw std::runtime_error("port does not exist!");
        return msg_queue[which_port].empty();
    }
    bool empty_p() { 
        bool rv = true;
        BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
        return rv;
    }

    //| Acquires and release the mutex
    void insert_tail( pmt::pmt_t /*which_port*/, pmt::pmt_t /*msg*/){}
    /*!
    * \returns returns pmt at head of queue or pmt_t() if empty.
    */
    pmt::pmt_t delete_head_nowait( pmt::pmt_t /*which_port*/){return pmt::PMT_NIL;}

    /*!
    * \returns returns pmt at head of queue or pmt_t() if empty.
    */
    pmt::pmt_t delete_head_blocking( pmt::pmt_t /*which_port*/){return pmt::PMT_NIL;}

    msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
        return msg_queue[which_port].begin();
    }

    void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
        msg_queue[which_port].erase(it);
    }

    virtual bool has_msg_port(pmt::pmt_t which_port){
        if(msg_queue.find(which_port) != msg_queue.end()){
            return true;
        }
        if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
            return true;
        }
        return false;
    }

  /*!
   * \brief Tests if there is a handler attached to port \p which_port
   */
   bool has_msg_handler(pmt::pmt_t which_port) {
     return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
   }

  /*
   * This function is called by the runtime system to dispatch messages.
   *
   * The thread-safety guarantees mentioned in set_msg_handler are implemented
   * by the callers of this method.
   */
  virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
  {
    // AA Update this
    if(has_msg_handler(which_port)) {  // Is there a handler?
      d_msg_handlers[which_port](msg); // Yes, invoke it.
    }
  }

  /*! Used by block's setters and work functions to make
   * setting/resetting of parameters thread-safe.
   *
   * Used by calling gruel::scoped_lock l(d_setlock);
   */ 
  gruel::mutex d_setlock;

  // ----------------------------------------------------------------------------
  // Functions to handle thread affinity
  std::vector<int> d_affinity;              // thread affinity proc. mask

  /*!
   * \brief Set the thread's affinity to processor core \p n.
   *
   * \param mask a vector of unsigned ints of the core numbers available to this block.
   */
  void set_processor_affinity(const std::vector<int> &mask){d_affinity=mask;}

  /*!
   * \brief Remove processor affinity to a specific core.
   */
  void unset_processor_affinity(){}

  /*!
   * \brief Get the current processor affinity.
   */
  std::vector<int> processor_affinity() { return d_affinity; }

    ///////////////// private vars //////////////////////

    gr_vector_int _work_ninput_items;
    gr_vector_int _fcast_ninput_items;
    gr_vector_const_void_star _work_input_items;
    gr_vector_void_star _work_output_items;
    ptrdiff_t _work_io_ptr_mask;
    size_t _output_multiple_items;
    double _relative_rate;
    bool _enable_fixed_rate;
    size_t _input_history_items;
    tag_propagation_policy_t _tag_prop_policy;
    size_t _interp, _decim;
    gr_io_signature_sptr _in_sig, _out_sig;

    ///////////////// the Block overloads //////////////////////

    //! implements work -> calls general work
    void work(const InputItems &, const OutputItems &);

    //! notifications of new topological commits
    void notify_topology(const size_t, const size_t);

    //! start notification
    void notify_active(void);

    //! stop notification
    void notify_inactive(void);

    //! implements tag_propagation_policy()
    virtual void propagate_tags(const size_t, const gras::TagIter &);

    void _update_input_reserve(void);

    gras::BufferQueueSptr input_buffer_allocator(const size_t, const gras::SBufferConfig &);
    gras::BufferQueueSptr output_buffer_allocator(const size_t, const gras::SBufferConfig &);

};

typedef boost::shared_ptr<gr_block> gr_block_sptr;

#endif /*INCLUDED_GNURADIO_GR_BLOCK_H*/