summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc61
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.cc35
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.h36
4 files changed, 93 insertions, 44 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index 2ab762a30..85b663ac6 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -158,65 +158,48 @@ gr_block_detail::add_item_tag(unsigned int which_output,
throw pmt::pmt_wrong_type("gr_block_detail::set_item_tag key", key);
}
else {
+ // build tag tuple
pmt::pmt_t nitem = pmt::pmt_from_uint64(abs_offset);
pmt::pmt_t tuple = pmt::pmt_make_tuple(nitem, srcid, key, value);
- d_item_tags.push_back(tuple);
+
+ // Add tag to gr_buffer's deque tags
+ d_output[which_output]->add_item_tag(tuple);
}
}
std::deque<pmt::pmt_t>
-gr_block_detail::get_tags_in_range(unsigned int which_output,
+gr_block_detail::get_tags_in_range(unsigned int which_input,
gr_uint64 abs_start,
gr_uint64 abs_end)
{
- std::deque<pmt::pmt_t> found_items;
- std::deque<pmt::pmt_t>::iterator itr = d_item_tags.begin();
-
- gr_uint64 item_time;
- while(itr != d_item_tags.end()) {
- item_time = pmt::pmt_to_uint64(pmt::pmt_tuple_ref(*itr, 0));
-
- // items are pushed onto list in sequential order; stop if we're past end
- if(item_time > abs_end) {
- break;
- }
-
- if((item_time >= abs_start) && (item_time <= abs_end)) {
- found_items.push_back(*itr);
- }
-
- itr++;
- }
-
- return found_items;
+ // get from gr_buffer_reader's deque of tags
+ return d_input[which_input]->get_tags_in_range(which_input,
+ abs_start,
+ abs_end);
}
std::deque<pmt::pmt_t>
-gr_block_detail::get_tags_in_range(unsigned int which_output,
+gr_block_detail::get_tags_in_range(unsigned int which_input,
gr_uint64 abs_start,
gr_uint64 abs_end,
const pmt::pmt_t &key)
{
- std::deque<pmt::pmt_t> found_items;
- std::deque<pmt::pmt_t>::iterator itr = d_item_tags.begin();
-
- gr_uint64 item_time;
- pmt::pmt_t itemkey;
- while(itr != d_item_tags.end()) {
- item_time = pmt::pmt_to_uint64(pmt::pmt_tuple_ref(*itr, 0));
+ std::deque<pmt::pmt_t> found_items, found_items_by_key;
- // items are pushed onto list in sequential order; stop if we're past end
- if(item_time > abs_end) {
- break;
- }
+ // get from gr_buffer_reader's deque of tags
+ found_items = d_input[which_input]->get_tags_in_range(which_input,
+ abs_start,
+ abs_end);
+ // Filter further by key name
+ pmt::pmt_t itemkey;
+ std::deque<pmt::pmt_t>::iterator itr;
+ for(itr = found_items.begin(); itr != found_items.end(); itr++) {
itemkey = pmt::pmt_tuple_ref(*itr, 2);
- if((item_time >= abs_start) && (item_time <= abs_end) && pmt::pmt_eqv(key, itemkey)) {
- found_items.push_back(*itr);
+ if(pmt::pmt_eqv(key, itemkey)) {
+ found_items_by_key.push_back(*itr);
}
-
- itr++;
}
- return found_items;
+ return found_items_by_key;
}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index 547d7c22f..ada807f68 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -119,8 +119,8 @@ class gr_block_detail {
/*!
* \brief Given a [start,end), returns a deque copy of all tags in the range.
*
- * Pass-through function to gr_buffer to get a deque of tags in given range.
- * Range of counts is from start to end-1.
+ * Pass-through function to gr_buffer_reader to get a deque of tags
+ * in given range. Range of counts is from start to end-1.
*
* Tags are tuples of:
* (item count, source id, key, value)
@@ -165,7 +165,6 @@ class gr_block_detail {
unsigned int d_noutputs;
std::vector<gr_buffer_reader_sptr> d_input;
std::vector<gr_buffer_sptr> d_output;
- std::deque<pmt::pmt_t> d_item_tags;
bool d_done;
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index 89db99b69..da7866f48 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2004,2009 Free Software Foundation, Inc.
+ * Copyright 2004,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -216,6 +216,12 @@ gr_buffer::drop_reader (gr_buffer_reader *reader)
d_readers.erase (result);
}
+void
+gr_buffer::add_item_tag(const pmt::pmt_t &tag)
+{
+ d_item_tags.push_back(tag);
+}
+
long
gr_buffer_ncurrently_allocated ()
{
@@ -257,6 +263,33 @@ gr_buffer_reader::update_read_pointer (int nitems)
d_abs_read_offset += nitems;
}
+std::deque<pmt::pmt_t>
+gr_buffer_reader::get_tags_in_range(unsigned int which_output,
+ gr_uint64 abs_start,
+ gr_uint64 abs_end)
+{
+ std::deque<pmt::pmt_t> found_items;
+ std::deque<pmt::pmt_t>::iterator itr = d_item_tags.begin();
+
+ gr_uint64 item_time;
+ while(itr != d_item_tags.end()) {
+ item_time = pmt::pmt_to_uint64(pmt::pmt_tuple_ref(*itr, 0));
+
+ // items are pushed onto list in sequential order; stop if we're past end
+ if(item_time > abs_end) {
+ break;
+ }
+
+ if((item_time >= abs_start) && (item_time <= abs_end)) {
+ found_items.push_back(*itr);
+ }
+
+ itr++;
+ }
+
+ return found_items;
+}
+
long
gr_buffer_reader_ncurrently_allocated ()
{
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h b/gnuradio-core/src/lib/runtime/gr_buffer.h
index 8af72741a..e50f638b5 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2004,2009 Free Software Foundation, Inc.
+ * Copyright 2004,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -26,6 +26,7 @@
#include <gr_runtime_types.h>
#include <boost/weak_ptr.hpp>
#include <gruel/thread.h>
+#include <gruel/pmt.h>
class gr_vmcircbuf;
@@ -90,6 +91,19 @@ class gr_buffer {
gr_uint64 nitems_written() { return d_abs_write_offset; }
+
+ /*!
+ * \brief Adds a new tag to the deque of tags on a given buffer.
+ *
+ * Adds a new tag to deque of tags on a given buffer. This takes the input
+ * parameters and builds a PMT tuple from it. It then calls
+ * gr_buffer::add_item_tag(pmt::pmt_t t), which appends the
+ * tag onto its deque of tags.
+ *
+ * \param tag a PMT tuple containing the new tag
+ */
+ void add_item_tag(const pmt::pmt_t &tag);
+
// -------------------------------------------------------------------------
private:
@@ -107,6 +121,8 @@ class gr_buffer {
std::vector<gr_buffer_reader *> d_readers;
boost::weak_ptr<gr_block> d_link; // block that writes to this buffer
+ std::deque<pmt::pmt_t> d_item_tags;
+
//
// The mutex protects d_write_index, d_abs_write_offset, d_done and the d_read_index's
// and d_abs_read_offset's in the buffer readers.
@@ -232,6 +248,23 @@ class gr_buffer_reader {
*/
gr_block_sptr link() { return gr_block_sptr(d_link); }
+
+ /*!
+ * \brief Given a [start,end), returns a deque copy of all tags in the range.
+ *
+ * Get a deque of tags in given range. Range of counts is from start to end-1.
+ *
+ * Tags are tuples of:
+ * (item count, source id, key, value)
+ *
+ * \param which_input an integer of which input stream to pull from
+ * \param abs_start a uint64 count of the start of the range of interest
+ * \param abs_end a uint64 count of the end of the range of interest
+ */
+ std::deque<pmt::pmt_t> get_tags_in_range(unsigned int which_input,
+ gr_uint64 abs_start,
+ gr_uint64 abs_end);
+
// -------------------------------------------------------------------------
private:
@@ -245,6 +278,7 @@ class gr_buffer_reader {
unsigned int d_read_index; // in items [0,d->buffer.d_bufsize)
gr_uint64 d_abs_read_offset; // num items seen since the start
boost::weak_ptr<gr_block> d_link; // block that reads via this buffer reader
+ std::deque<pmt::pmt_t> d_item_tags;
//! constructor is private. Use gr_buffer::add_reader to create instances
gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);