summaryrefslogtreecommitdiff
path: root/gnuradio-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc11
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h10
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc28
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h19
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.cc43
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc2
6 files changed, 77 insertions, 36 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 73a86e38b..1fb4633e5 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -166,12 +166,17 @@ gr_block::get_tags_in_range(unsigned int which_output,
return d_detail->get_tags_in_range(which_output, start, end, key);
}
-void
-gr_block::handle_tags()
+int
+gr_block::tag_handling_method()
{
- d_detail->handle_tags();
+ return d_detail->tag_handling_method();
}
+void
+gr_block::set_tag_handling_method(int m)
+{
+ set_tag_handling_method(m);
+}
std::ostream&
operator << (std::ostream& os, const gr_block *m)
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 4d1d6f875..4b246884e 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -208,14 +208,8 @@ class gr_block : public gr_basic_block {
*/
uint64_t nitems_written(unsigned int which_output);
-
- /*!
- * \brief Function to move tags downstream
- *
- * The default behavior proxies to gr_block_detail, which just moves all tags
- * from input to output and flows them all downstream.
- */
- virtual void handle_tags();
+ int tag_handling_method();
+ void set_tag_handling_method(int m);
// ----------------------------------------------------------------------------
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index b3d1a7194..b7dc52a60 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -42,7 +42,7 @@ gr_block_detail::gr_block_detail (unsigned int ninputs, unsigned int noutputs)
d_ninputs (ninputs), d_noutputs (noutputs),
d_input (ninputs), d_output (noutputs),
d_done (false),
- d_last_tag(0)
+ d_tag_handling_method(gr_block_detail::TAGS_ALL_TO_ALL)
{
s_ncurrently_allocated++;
}
@@ -203,18 +203,18 @@ gr_block_detail::get_tags_in_range(unsigned int which_input,
return found_items_by_key;
}
-void
-gr_block_detail::handle_tags()
-{
- for(unsigned int i = 0; i < d_ninputs; i++) {
- pmt_t tuple;
- while(d_input[i]->get_tag(d_last_tag, tuple)) {
- d_last_tag++;
- if(!sink_p()) {
- for(unsigned int o = 0; o < d_noutputs; o++) {
- d_output[o]->add_item_tag(tuple);
- }
- }
- }
+int
+gr_block_detail::tag_handling_method()
+{
+ return d_tag_handling_method;
+}
+
+void
+gr_block_detail::set_tag_handling_method(int m)
+{
+ if((m == TAGS_ONE_TO_ONE) && (ninputs() != noutputs())) {
+ throw std::invalid_argument ("gr_block_detail::set_handling method to ONE-TO-ONE requires ninputs == noutputs");
}
+
+ d_tag_handling_method = m;
}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index 3a2b82190..711f59cf0 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -37,6 +37,13 @@
*/
class gr_block_detail {
public:
+
+ enum {
+ TAGS_NONE = 0,
+ TAGS_ALL_TO_ALL = 1,
+ TAGS_ONE_TO_ONE = 2
+ };
+
~gr_block_detail ();
int ninputs () const { return d_ninputs; }
@@ -153,13 +160,8 @@ class gr_block_detail {
uint64_t abs_end,
const pmt::pmt_t &key);
- /*!
- * \brief Default tag handler; moves all tags downstream
- *
- * Move all tags from input to output and flows them all downstream. Each input
- * stream's tags get appended to each output streams tags.
- */
- void handle_tags();
+ int tag_handling_method();
+ void set_tag_handling_method(int m);
gr_tpb_detail d_tpb; // used by thread-per-block scheduler
int d_produce_or;
@@ -172,8 +174,7 @@ class gr_block_detail {
std::vector<gr_buffer_reader_sptr> d_input;
std::vector<gr_buffer_sptr> d_output;
bool d_done;
-
- size_t d_last_tag; // keep track of which tags we've already received from upstream
+ int d_tag_handling_method;
gr_block_detail (unsigned int ninputs, unsigned int noutputs);
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
index 2c21a0b0f..f201c3937 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -294,12 +294,55 @@ gr_block_executor::run_one_iteration()
for (int i = 0; i < d->noutputs (); i++)
d_output_items[i] = d->output(i)->write_pointer();
+ // store number of items consumed so far on in stream
+ std::vector<uint64_t> start_count;
+ for (int i = 0; i < d->ninputs(); i++)
+ start_count.push_back(d->nitems_read(i));
+
// Do the actual work of the block
int n = m->general_work (noutput_items, d_ninput_items,
d_input_items, d_output_items);
LOG(*d_log << " general_work: noutput_items = " << noutput_items
<< " result = " << n << std::endl);
+ // store number of items consumed after work
+ std::vector<uint64_t> end_count;
+ for (int i = 0; i < d->ninputs (); i++)
+ end_count.push_back(d->nitems_read(i));
+
+ // Move tags downstream
+ // if a sink, we don't need to move downstream;
+ // and do not bother if block uses TAGS_NONE attribute
+ if(!d->sink_p() && (d->tag_handling_method() != gr_block_detail::TAGS_NONE)) {
+
+ // every tag on every input propogates to everyone downstream
+ if(d->tag_handling_method() == gr_block_detail::TAGS_ALL_TO_ALL) {
+ for(int i = 0; i < d->ninputs(); i++) {
+ std::vector<pmt::pmt_t> tuple = d->get_tags_in_range(i, start_count[i], end_count[i]);
+ std::vector<pmt::pmt_t>::iterator t;
+ for(t = tuple.begin(); t != tuple.end(); t++ ) {
+ for(int o = 0; o < d->noutputs(); o++)
+ d->output(o)->add_item_tag(*t);
+ }
+ }
+ }
+
+ // tags from input i only go to output i
+ // this requires d->ninputs() == d->noutputs; this is checked when this
+ // type of tag-handling system is selected in gr_block_detail
+ else if(d->tag_handling_method() == gr_block_detail::TAGS_ONE_TO_ONE) {
+ for(int i = 0; i < d->ninputs(); i++) {
+ std::vector<pmt::pmt_t> tuple = d->get_tags_in_range(i, start_count[i], end_count[i]);
+ std::vector<pmt::pmt_t>::iterator t;
+ for(t = tuple.begin(); t != tuple.end(); t++ ) {
+ d->output(i)->add_item_tag(*t);
+ }
+ }
+ }
+
+ // else ; do nothing
+ }
+
if (n == gr_block::WORK_DONE)
goto were_done;
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 6a84f4be8..03eef17d9 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -45,8 +45,6 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
while ((msg = d->d_tpb.delete_head_nowait()))
block->handle_msg(msg);
- block->handle_tags();
-
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();