// path: root/gnuradio-core/src/lib/runtime/gr_block.h (blob 0783e86848f3f8f40809c7ad077f52f060797181)
/* -*- c++ -*- */
/*
 * Copyright 2004,2007,2009,2010 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * GNU Radio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 *
 * GNU Radio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Radio; see the file COPYING.  If not, write to
 * the Free Software Foundation, Inc., 51 Franklin Street,
 * Boston, MA 02110-1301, USA.
 */

#ifndef INCLUDED_GR_BLOCK_H
#define INCLUDED_GR_BLOCK_H

#include <gr_core_api.h>
#include <gr_basic_block.h>
#include <gr_tags.h>
#include <stdexcept>
#include <string>
#include <vector>

/*!
 * \brief The abstract base class for all 'terminal' processing blocks.
 * \ingroup base_blk
 *
 * A signal processing flow is constructed by creating a tree of
 * hierarchical blocks, which at any level may also contain terminal nodes
 * that actually implement signal processing functions. This is the base
 * class for all such leaf nodes.
 *
 * Blocks have a set of input streams and output streams.  The
 * input_signature and output_signature define the number of input
 * streams and output streams respectively, and the type of the data
 * items in each stream.
 *
 * Although blocks may consume data on each input stream at a
 * different rate, all output streams must produce data at the same
 * rate.  That rate may be different from any of the input rates.
 *
 * User derived blocks override two methods, forecast and general_work,
 * to implement their signal processing behavior. forecast is called
 * by the system scheduler to determine how many items are required on
 * each input stream in order to produce a given number of output
 * items.
 *
 * general_work is called to perform the signal processing in the block.
 * It reads the input items and writes the output items.
 */

class GR_CORE_API gr_block : public gr_basic_block {

 public:

  //! Magic return values from general_work
  enum {
    WORK_CALLED_PRODUCE = -2,
    WORK_DONE = -1
  };

  /*!
   * \brief Policies for moving stream tags downstream.
   *
   * NOTE(review): the enumerator names suggest "do not propagate",
   * "every input to every output" and "input i to output i"; this
   * header does not spell the semantics out -- confirm against the
   * scheduler implementation.
   */
  enum tag_propagation_policy_t {
    TPP_DONT = 0,
    TPP_ALL_TO_ALL = 1,
    TPP_ONE_TO_ONE = 2
  };

  virtual ~gr_block ();

  /*!
   * Assume block computes y_i = f(x_i, x_i-1, x_i-2, x_i-3...)
   * History is the number of x_i's that are examined to produce one y_i.
   * This comes in handy for FIR filters, where we use history to
   * ensure that our input contains the appropriate "history" for the
   * filter.   History should be equal to the number of filter taps.
   */
  unsigned history () const { return d_history; }

  /*!
   * \brief Set the history (see history()).
   *
   * \param history number of input items examined per output item
   */
  void  set_history (unsigned history) { d_history = history; }

  /*!
   * \brief Return true if this block has a fixed input to output rate.
   *
   * If true, then fixed_rate_ninput_to_noutput and
   * fixed_rate_noutput_to_ninput may be called.
   */
  bool fixed_rate() const { return d_fixed_rate; }

  // ----------------------------------------------------------------
  //		override these to define your behavior
  // ----------------------------------------------------------------

  /*!
   * \brief  Estimate input requirements given output request
   *
   * \param noutput_items           number of output items to produce
   * \param ninput_items_required   number of input items required on each input stream
   *
   * Given a request to produce \p noutput_items, estimate the number of
   * data items required on each input stream.  The estimate doesn't have
   * to be exact, but should be close.
   */
  virtual void forecast (int noutput_items,
                         gr_vector_int &ninput_items_required);

  /*!
   * \brief compute output items from input items
   *
   * \param noutput_items	number of output items to write on each output stream
   * \param ninput_items	number of input items available on each input stream
   * \param input_items		vector of pointers to the input items, one entry per input stream
   * \param output_items	vector of pointers to the output items, one entry per output stream
   *
   * \returns number of items actually written to each output stream, or -1
   * (WORK_DONE) on EOF.  It is OK to return a value less than noutput_items.
   * A block that reported its output through produce() returns
   * WORK_CALLED_PRODUCE instead.
   *
   * general_work must call consume or consume_each to indicate how many items
   * were consumed on each input stream.
   */
  virtual int general_work (int noutput_items,
                            gr_vector_int &ninput_items,
                            gr_vector_const_void_star &input_items,
                            gr_vector_void_star &output_items);

  /*!
   * \brief Called to enable drivers, etc for i/o devices.
   *
   * This allows a block to enable an associated driver to begin
   * transferring data just before we start to execute the scheduler.
   * The end result is that this reduces latency in the pipeline when
   * dealing with audio devices, usrps, etc.
   */
  virtual bool start();

  /*!
   * \brief Called to disable drivers, etc for i/o devices.
   */
  virtual bool stop();

  // ----------------------------------------------------------------

  /*!
   * \brief Constrain the noutput_items argument passed to forecast and general_work
   *
   * set_output_multiple causes the scheduler to ensure that the noutput_items
   * argument passed to forecast and general_work will be an integer multiple
   * of \p multiple.  The default value of output multiple is 1.
   *
   * \param multiple the required multiple for noutput_items
   */
  void set_output_multiple (int multiple);

  //! Return the current output multiple.
  int  output_multiple () const { return d_output_multiple; }

  //! Return the d_output_multiple_set flag.
  bool  output_multiple_set () const { return d_output_multiple_set; }

  /*!
   * \brief Constrains buffers to work on a set item alignment (for SIMD)
   *
   * set_alignment causes the scheduler to ensure that the noutput_items
   * argument passed to forecast and general_work will be an integer multiple
   * of \p multiple.  The default value is 1.
   *
   * This control is similar to the output_multiple setting, except
   * that if the number of items passed to the block is less than the
   * output_multiple, this value is ignored and the block can produce
   * like normal. The d_unaligned value is set to the number of items
   * the block is off by. In the next call to general_work, the
   * noutput_items is set to d_unaligned or less until
   * d_unaligned==0. The buffers are now aligned again and the aligned
   * calls can be performed again.
   *
   * \param multiple the required item alignment
   */
  void set_alignment (int multiple);

  /*!
   * \brief Return the alignment multiple.
   *
   * NOTE(review): this reads d_output_multiple; the class has no
   * separate alignment member -- confirm that set_alignment stores
   * its value there.
   */
  int  alignment () const { return d_output_multiple; }

  //! Set the number of items the block is currently off alignment by.
  void set_unaligned (int na);
  //! Return the number of items the block is currently off alignment by.
  int unaligned () const { return d_unaligned; }
  //! Set the d_is_unaligned flag.
  void set_is_unaligned (bool u);
  //! Return the d_is_unaligned flag.
  bool is_unaligned () const { return d_is_unaligned; }

  /*!
   * \brief Tell the scheduler \p how_many_items of input stream \p which_input were consumed.
   */
  void consume (int which_input, int how_many_items);

  /*!
   * \brief Tell the scheduler \p how_many_items were consumed on each input stream.
   */
  void consume_each (int how_many_items);

  /*!
   * \brief Tell the scheduler \p how_many_items were produced on output stream \p which_output.
   *
   * If the block's general_work method calls produce, \p general_work must return WORK_CALLED_PRODUCE.
   */
  void produce (int which_output, int how_many_items);

  /*!
   * \brief Set the approximate output rate / input rate
   *
   * Provide a hint to the buffer allocator and scheduler.
   * The default relative_rate is 1.0
   *
   * decimators have relative_rates < 1.0
   * interpolators have relative_rates > 1.0
   */
  void  set_relative_rate (double relative_rate);

  /*!
   * \brief return the approximate output rate / input rate
   */
  double relative_rate () const	{ return d_relative_rate; }

  /*
   * The following two methods provide special case info to the
   * scheduler in the event that a block has a fixed input to output
   * ratio.  gr_sync_block, gr_sync_decimator and gr_sync_interpolator
   * override these.  If you're fixed rate, subclass one of those.
   */
  /*!
   * \brief Given ninput samples, return number of output samples that will be produced.
   * N.B. this is only defined if fixed_rate returns true.
   * Generally speaking, you don't need to override this.
   */
  virtual int fixed_rate_ninput_to_noutput(int ninput);

  /*!
   * \brief Given noutput samples, return number of input samples required to produce noutput.
   * N.B. this is only defined if fixed_rate returns true.
   * Generally speaking, you don't need to override this.
   */
  virtual int fixed_rate_noutput_to_ninput(int noutput);

  /*!
   * \brief Return the number of items read on input stream which_input
   */
  uint64_t nitems_read(unsigned int which_input);

  /*!
   * \brief  Return the number of items written on output stream which_output
   */
  uint64_t nitems_written(unsigned int which_output);

  /*!
   * \brief Asks for the policy used by the scheduler to move tags downstream.
   */
  tag_propagation_policy_t tag_propagation_policy();

  /*!
   * \brief Set the policy used by the scheduler to determine how tags are moved downstream.
   */
  void set_tag_propagation_policy(tag_propagation_policy_t p);

  /*!
   * \brief Return the minimum number of output items this block can
   * produce during a call to work.
   *
   * Should be 0 for most blocks.  Useful if we're dealing with packets and
   * the block produces one packet per call to work.
   */
  int min_noutput_items() const { return d_min_noutput_items; }

  /*!
   * \brief Set the minimum number of output items this block can
   * produce during a call to work.
   *
   * \param m the minimum noutput_items this block can produce.
   */
  void set_min_noutput_items(int m) { d_min_noutput_items = m; }

  /*!
   * \brief Return the maximum number of output items this block will
   * handle during a call to work.
   */
  int max_noutput_items();

  /*!
   * \brief Set the maximum number of output items this block will
   * handle during a call to work.
   *
   * \param m the maximum noutput_items this block will handle.
   */
  void set_max_noutput_items(int m);

  /*!
   * \brief Clear the switch for using the max_noutput_items value of this block.
   *
   * When is_set_max_noutput_items() returns 'true', the scheduler
   * will use the value returned by max_noutput_items() to limit the
   * size of the number of items possible for this block's work
   * function. If is_set_max_noutput_items() returns 'false', then the
   * scheduler ignores the internal value and uses the value set
   * globally in the top_block.
   *
   * Use this method to clear the 'is_set' flag so the scheduler will
   * ignore this. Use the set_max_noutput_items(m) call to both set a
   * new value for max_noutput_items and to reenable its use in the
   * scheduler.
   */
  void unset_max_noutput_items();

  /*!
   * \brief Ask the block if the flag is or is not set to use the
   * internal value of max_noutput_items during a call to work.
   */
  bool is_set_max_noutput_items();

  /*!
   * \brief Used to expand the vectors that hold the min/max buffer sizes.
   *
   * Specifically, when -1 is used, the vectors are just initialized
   * with 1 value; this is used by the flat_flowgraph to expand when
   * required to add a new value for new ports on these blocks.
   *
   * Ports that already have an entry are left untouched.
   *
   * \param port output port index that must have an entry in both vectors
   */
  void expand_minmax_buffer(int port) {
    if(static_cast<size_t>(port) >= d_max_output_buffer.size())
      set_max_output_buffer(port, -1);
    if(static_cast<size_t>(port) >= d_min_output_buffer.size())
      set_min_output_buffer(port, -1);
  }

  /*!
   * \brief Returns max buffer size on output port \p i.
   *
   * \throws std::invalid_argument if \p i has no entry.
   */
  long max_output_buffer(size_t i) {
    if(i >= d_max_output_buffer.size())
      throw std::invalid_argument("gr_block::max_output_buffer: port out of range.");
    return d_max_output_buffer[i];
  }

  /*!
   * \brief Sets max buffer size on all output ports.
   *
   * Applies the value to ports 0 .. output_signature()->max_streams()-1.
   * NOTE(review): if max_streams() reports a non-positive value no
   * port is updated -- confirm this is intended.
   */
  void set_max_output_buffer(long max_output_buffer) {
    const int nstreams = output_signature()->max_streams();
    for(int i = 0; i < nstreams; i++) {
      set_max_output_buffer(i, max_output_buffer);
    }
  }

  /*!
   * \brief Sets max buffer size on output port \p port.
   *
   * If \p port is beyond the current end of the vector, the vector is
   * grown so that the value is stored at index \p port; any skipped
   * ports are filled with -1, the same filler expand_minmax_buffer uses.
   *
   * \throws std::invalid_argument if \p port is negative.
   */
  void set_max_output_buffer(int port, long max_output_buffer) {
    if(port < 0)
      throw std::invalid_argument("gr_block::set_max_output_buffer: port out of range.");
    const size_t index = static_cast<size_t>(port);
    // Grow to exactly port+1 entries so the value cannot land on the wrong port.
    if(index >= d_max_output_buffer.size())
      d_max_output_buffer.resize(index + 1, -1);
    d_max_output_buffer[index] = max_output_buffer;
  }

  /*!
   * \brief Returns min buffer size on output port \p i.
   *
   * \throws std::invalid_argument if \p i has no entry.
   */
  long min_output_buffer(size_t i) {
    if(i >= d_min_output_buffer.size())
      throw std::invalid_argument("gr_block::min_output_buffer: port out of range.");
    return d_min_output_buffer[i];
  }

  /*!
   * \brief Sets min buffer size on all output ports.
   *
   * Applies the value to ports 0 .. output_signature()->max_streams()-1.
   * NOTE(review): if max_streams() reports a non-positive value no
   * port is updated -- confirm this is intended.
   */
  void set_min_output_buffer(long min_output_buffer) {
    const int nstreams = output_signature()->max_streams();
    for(int i = 0; i < nstreams; i++) {
      set_min_output_buffer(i, min_output_buffer);
    }
  }

  /*!
   * \brief Sets min buffer size on output port \p port.
   *
   * If \p port is beyond the current end of the vector, the vector is
   * grown so that the value is stored at index \p port; any skipped
   * ports are filled with -1, the same filler expand_minmax_buffer uses.
   *
   * \throws std::invalid_argument if \p port is negative.
   */
  void set_min_output_buffer(int port, long min_output_buffer) {
    if(port < 0)
      throw std::invalid_argument("gr_block::set_min_output_buffer: port out of range.");
    const size_t index = static_cast<size_t>(port);
    // Grow to exactly port+1 entries so the value cannot land on the wrong port.
    if(index >= d_min_output_buffer.size())
      d_min_output_buffer.resize(index + 1, -1);
    d_min_output_buffer[index] = min_output_buffer;
  }

  // --------------- Performance counter functions -------------

  /*!
   * \brief Gets average noutput_items performance counter.
   */
  float pc_noutput_items();

  /*!
   * \brief Gets variance of noutput_items performance counter.
   */
  float pc_noutput_items_var();

  /*!
   * \brief Gets average num items produced performance counter.
   */
  float pc_nproduced();

  /*!
   * \brief Gets variance of num items produced performance counter.
   */
  float pc_nproduced_var();

  /*!
   * \brief Gets average fullness of \p which input buffer.
   */
  float pc_input_buffers_full(int which);

  /*!
   * \brief Gets variance of fullness of \p which input buffer.
   */
  float pc_input_buffers_full_var(int which);

  /*!
   * \brief Gets average fullness of all input buffers.
   */
  std::vector<float> pc_input_buffers_full();

  /*!
   * \brief Gets variance of fullness of all input buffers.
   */
  std::vector<float> pc_input_buffers_full_var();

  /*!
   * \brief Gets average fullness of \p which output buffer.
   */
  float pc_output_buffers_full(int which);

  /*!
   * \brief Gets variance of fullness of \p which output buffer.
   */
  float pc_output_buffers_full_var(int which);

  /*!
   * \brief Gets average fullness of all output buffers.
   */
  std::vector<float> pc_output_buffers_full();

  /*!
   * \brief Gets variance of fullness of all output buffers.
   */
  std::vector<float> pc_output_buffers_full_var();

  /*!
   * \brief Gets average clock cycles spent in work.
   */
  float pc_work_time();

  /*!
   * \brief Gets variance of clock cycles spent in work.
   */
  float pc_work_time_var();

  /*!
   * \brief Resets the performance counters
   */
  void reset_perf_counters();


  // ----------------------------------------------------------------------------
  // Functions to handle thread affinity

  /*!
   * \brief Set the thread's affinity to the processor cores listed in \p mask.
   *
   * \param mask a vector of unsigned ints of the core numbers available to this block.
   */
  void set_processor_affinity(const std::vector<unsigned int> &mask);

  /*!
   * \brief Remove processor affinity to a specific core.
   */
  void unset_processor_affinity();

  /*!
   * \brief Get the current processor affinity.
   *
   * Returns a copy of the stored core list.
   */
  std::vector<unsigned int> processor_affinity() { return d_affinity; }

  // ----------------------------------------------------------------------------

 private:

  int                   d_output_multiple;
  bool                  d_output_multiple_set;
  int                   d_unaligned;
  bool                  d_is_unaligned;
  double                d_relative_rate;	// approx output_rate / input_rate
  gr_block_detail_sptr	d_detail;		// implementation details
  unsigned              d_history;
  bool                  d_fixed_rate;
  bool                  d_max_noutput_items_set;     // if d_max_noutput_items is valid
  int                   d_max_noutput_items;         // value of max_noutput_items for this block
  int                   d_min_noutput_items;
  tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
  std::vector<unsigned int> d_affinity;              // thread affinity proc. mask

 protected:
  // NOTE(review): this constructor has no initializer list, so the
  // scalar members above are left uninitialized.  Presumably it only
  // exists so interface sub-classes compile and is never used to build
  // a complete object -- confirm.
  gr_block (void){} //allows pure virtual interface sub-classes
  gr_block (const std::string &name,
            gr_io_signature_sptr input_signature,
            gr_io_signature_sptr output_signature);

  //! Set the flag reported by fixed_rate().
  void set_fixed_rate(bool fixed_rate){ d_fixed_rate = fixed_rate; }


  /*!
   * \brief  Adds a new tag onto the given output buffer.
   *
   * Convenience overload: packs the arguments into a gr_tag_t and
   * forwards to add_item_tag(which_output, tag).
   *
   * \param which_output an integer of which output stream to attach the tag
   * \param abs_offset   a uint64 number of the absolute item number
   *                     associated with the tag. Can get from nitems_written.
   * \param key          the tag key as a PMT symbol
   * \param value        any PMT holding any value for the given key
   * \param srcid        optional source ID specifier; defaults to PMT_F
   */
  inline void add_item_tag(unsigned int which_output,
                           uint64_t abs_offset,
                           const pmt::pmt_t &key,
                           const pmt::pmt_t &value,
                           const pmt::pmt_t &srcid=pmt::PMT_F)
  {
    gr_tag_t tag;
    tag.offset = abs_offset;
    tag.key = key;
    tag.value = value;
    tag.srcid = srcid;
    this->add_item_tag(which_output, tag);
  }

  /*!
   * \brief  Adds a new tag onto the given output buffer.
   *
   * \param which_output an integer of which output stream to attach the tag
   * \param tag the tag object to add
   */
  void add_item_tag(unsigned int which_output, const gr_tag_t &tag);

  /*!
   * \brief  Removes a tag from the given input buffer.
   *
   * Convenience overload: packs the arguments into a gr_tag_t and
   * forwards to remove_item_tag(which_input, tag).
   *
   * \param which_input an integer of which input stream to remove the tag from
   * \param abs_offset   a uint64 number of the absolute item number
   *                     associated with the tag (item counts for an
   *                     input stream are reported by nitems_read).
   * \param key          the tag key as a PMT symbol
   * \param value        any PMT holding any value for the given key
   * \param srcid        optional source ID specifier; defaults to PMT_F
   *
   * If no such tag is found, does nothing.
   */
  inline void remove_item_tag(unsigned int which_input,
                              uint64_t abs_offset,
                              const pmt::pmt_t &key,
                              const pmt::pmt_t &value,
                              const pmt::pmt_t &srcid=pmt::PMT_F)
  {
    gr_tag_t tag;
    tag.offset = abs_offset;
    tag.key = key;
    tag.value = value;
    tag.srcid = srcid;
    this->remove_item_tag(which_input, tag);
  }

  /*!
   * \brief  Removes a tag from the given input buffer.
   *
   * If no such tag is found, does nothing.
   *
   * \param which_input an integer of which input stream to remove the tag from
   * \param tag the tag object to remove
   */
  void remove_item_tag(unsigned int which_input, const gr_tag_t &tag);

  /*!
   * \brief Given a [start,end), returns a vector of all tags in the range.
   *
   * Range of counts is from start to end-1.
   *
   * Tags are tuples of:
   *      (item count, source id, key, value)
   *
   * \param v            a vector reference to return tags into
   * \param which_input  an integer of which input stream to pull from
   * \param abs_start    a uint64 count of the start of the range of interest
   * \param abs_end      a uint64 count of the end of the range of interest
   */
  void get_tags_in_range(std::vector<gr_tag_t> &v,
                         unsigned int which_input,
                         uint64_t abs_start,
                         uint64_t abs_end);

  /*!
   * \brief Given a [start,end), returns a vector of all tags in the range
   * with a given key.
   *
   * Range of counts is from start to end-1.
   *
   * Tags are tuples of:
   *      (item count, source id, key, value)
   *
   * \param v            a vector reference to return tags into
   * \param which_input  an integer of which input stream to pull from
   * \param abs_start    a uint64 count of the start of the range of interest
   * \param abs_end      a uint64 count of the end of the range of interest
   * \param key          a PMT symbol key to filter only tags of this key
   */
  void get_tags_in_range(std::vector<gr_tag_t> &v,
                         unsigned int which_input,
                         uint64_t abs_start,
                         uint64_t abs_end,
                         const pmt::pmt_t &key);

  std::vector<long>    d_max_output_buffer;  // per-output-port max buffer size
  std::vector<long>    d_min_output_buffer;  // per-output-port min buffer size

  /*! Used by block's setters and work functions to make
   * setting/resetting of parameters thread-safe.
   *
   * Used by calling gruel::scoped_lock l(d_setlock);
   */
  gruel::mutex d_setlock;

  // These are really only for internal use, but leaving them public avoids
  // having to work up an ever-varying list of friend classes

 public:
  //! Return the block detail object (implementation details).
  gr_block_detail_sptr detail () const { return d_detail; }
  //! Replace the block detail object.
  void set_detail (gr_block_detail_sptr detail) { d_detail = detail; }
};

//! A vector of gr_block_sptr handles.
typedef std::vector<gr_block_sptr> gr_block_vector_t;
//! Mutable iterator over a vector of gr_block_sptr handles.
typedef std::vector<gr_block_sptr>::iterator gr_block_viter_t;

/*!
 * \brief Convert a gr_basic_block handle into a gr_block handle.
 *
 * The conversion is delegated to boost::dynamic_pointer_cast, so the
 * returned handle is whatever that cast yields for \p p.
 *
 * \param p handle to the basic block to convert
 */
inline gr_block_sptr cast_to_block_sptr(gr_basic_block_sptr p)
{
  gr_block_sptr as_block = boost::dynamic_pointer_cast<gr_block, gr_basic_block>(p);
  return as_block;
}


/*!
 * \brief Stream-insertion operator taking a pointer to a gr_block.
 *
 * Only declared here; the definition, and therefore the exact text
 * written to \p os, lives outside this header.
 *
 * \param os output stream to write to
 * \param m  block to describe
 */
std::ostream&
operator << (std::ostream& os, const gr_block *m);

#endif /* INCLUDED_GR_BLOCK_H */