summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h25
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc107
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h115
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.i14
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc120
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h49
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.cc38
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.cc12
-rw-r--r--gnuradio-core/src/lib/runtime/gr_buffer.h9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tags.h5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc7
-rw-r--r--gnuradio-core/src/lib/runtime/gr_types.h1
-rw-r--r--gnuradio-core/src/lib/runtime/qa_gr_top_block.cc21
-rw-r--r--gnuradio-core/src/lib/runtime/qa_gr_top_block.h7
14 files changed, 497 insertions, 33 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 9cc2ad775..b4935d8ac 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -54,18 +54,6 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
private:
- /*
- * This function is called by the runtime system to dispatch messages.
- *
- * The thread-safety guarantees mentioned in set_msg_handler are implemented
- * by the callers of this method.
- */
- void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
- {
- // AA Update this
- if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
- d_msg_handlers[which_port](msg); // Yes, invoke it.
- };
//msg_handler_t d_msg_handler;
typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
@@ -117,6 +105,19 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
*/
void set_color(vcolor color) { d_color = color; }
vcolor color() const { return d_color; }
+
+ /*
+ * This function is called by the runtime system to dispatch messages.
+ *
+ * The thread-safety guarantees mentioned in set_msg_handler are implemented
+ * by the callers of this method.
+ */
+ virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+ {
+ // AA Update this
+ if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
+ };
// Message passing interface
pmt::pmt_t message_subscribers;
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 43aebf0bf..f52f7a6ba 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -31,8 +31,8 @@
#include <gr_block_registry.h>
gr_block::gr_block (const std::string &name,
- gr_io_signature_sptr input_signature,
- gr_io_signature_sptr output_signature)
+ gr_io_signature_sptr input_signature,
+ gr_io_signature_sptr output_signature)
: gr_basic_block(name, input_signature, output_signature),
d_output_multiple (1),
d_output_multiple_set(false),
@@ -43,6 +43,7 @@ gr_block::gr_block (const std::string &name,
d_fixed_rate(false),
d_max_noutput_items_set(false),
d_max_noutput_items(0),
+ d_min_noutput_items(0),
d_tag_propagation_policy(TPP_ALL_TO_ALL),
d_max_output_buffer(std::max(output_signature->max_streams(),1), -1),
d_min_output_buffer(std::max(output_signature->max_streams(),1), -1)
@@ -187,6 +188,13 @@ gr_block::add_item_tag(unsigned int which_output,
}
void
+gr_block::remove_item_tag(unsigned int which_input,
+ const gr_tag_t &tag)
+{
+ d_detail->remove_item_tag(which_input, tag);
+}
+
+void
gr_block::get_tags_in_range(std::vector<gr_tag_t> &v,
unsigned int which_output,
uint64_t start, uint64_t end)
@@ -244,6 +252,101 @@ gr_block::is_set_max_noutput_items()
return d_max_noutput_items_set;
}
+void
+gr_block::set_processor_affinity(const std::vector<unsigned int> &mask)
+{
+ d_affinity = mask;
+ if(d_detail) {
+ d_detail->set_processor_affinity(d_affinity);
+ }
+}
+
+void
+gr_block::unset_processor_affinity()
+{
+ d_affinity.clear();
+ if(d_detail) {
+ d_detail->unset_processor_affinity();
+ }
+}
+
+float
+gr_block::pc_noutput_items()
+{
+ if(d_detail) {
+ return d_detail->pc_noutput_items();
+ }
+ else {
+ return 0;
+ }
+}
+
+float
+gr_block::pc_nproduced()
+{
+ if(d_detail) {
+ return d_detail->pc_nproduced();
+ }
+ else {
+ return 0;
+ }
+}
+
+float
+gr_block::pc_input_buffers_full(int which)
+{
+ if(d_detail) {
+ return d_detail->pc_input_buffers_full(static_cast<size_t>(which));
+ }
+ else {
+ return 0;
+ }
+}
+
+std::vector<float>
+gr_block::pc_input_buffers_full()
+{
+ if(d_detail) {
+ return d_detail->pc_input_buffers_full();
+ }
+ else {
+ return std::vector<float>(1,0);
+ }
+}
+
+float
+gr_block::pc_output_buffers_full(int which)
+{
+ if(d_detail) {
+ return d_detail->pc_output_buffers_full(static_cast<size_t>(which));
+ }
+ else {
+ return 0;
+ }
+}
+
+std::vector<float>
+gr_block::pc_output_buffers_full()
+{
+ if(d_detail) {
+ return d_detail->pc_output_buffers_full();
+ }
+ else {
+ return std::vector<float>(1,0);
+ }
+}
+
+float
+gr_block::pc_work_time()
+{
+ if(d_detail) {
+ return d_detail->pc_work_time();
+ }
+ else {
+ return 0;
+ }
+}
+
std::ostream&
operator << (std::ostream& os, const gr_block *m)
{
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 57e3fda90..bd9ff42df 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -252,13 +252,30 @@ class GR_CORE_API gr_block : public gr_basic_block {
void set_tag_propagation_policy(tag_propagation_policy_t p);
/*!
+ * \brief Return the minimum number of output items this block can
+ * produce during a call to work.
+ *
+ * Should be 0 for most blocks. Useful if we're dealing with packets and
+ * the block produces one packet per call to work.
+ */
+ int min_noutput_items() const { return d_min_noutput_items; }
+
+ /*!
+ * \brief Set the minimum number of output items this block can
+ * produce during a call to work.
+ *
+ * \param m the minimum noutput_items this block can produce.
+ */
+ void set_min_noutput_items(int m) { d_min_noutput_items = m; }
+
+ /*!
* \brief Return the maximum number of output items this block will
* handle during a call to work.
*/
int max_noutput_items();
/*!
- * \brief Set the maximum number of ouput items htis block will
+ * \brief Set the maximum number of output items this block will
* handle during a call to work.
*
* \param m the maximum noutput_items this block will handle.
@@ -358,6 +375,64 @@ class GR_CORE_API gr_block : public gr_basic_block {
d_min_output_buffer[port] = min_output_buffer;
}
+ // --------------- Performance counter functions -------------
+
+ /*!
+ * \brief Gets average noutput_items performance counter.
+ */
+ float pc_noutput_items();
+
+ /*!
+ * \brief Gets average num items produced performance counter.
+ */
+ float pc_nproduced();
+
+ /*!
+ * \brief Gets average average fullness of \p which input buffer.
+ */
+ float pc_input_buffers_full(int which);
+
+ /*!
+ * \brief Gets average fullness of all input buffers.
+ */
+ std::vector<float> pc_input_buffers_full();
+
+ /*!
+ * \brief Gets average fullness of \p which input buffer.
+ */
+ float pc_output_buffers_full(int which);
+
+ /*!
+ * \brief Gets average fullness of all output buffers.
+ */
+ std::vector<float> pc_output_buffers_full();
+
+ /*!
+ * \brief Gets average clock cycles spent in work.
+ */
+ float pc_work_time();
+
+
+ // ----------------------------------------------------------------------------
+ // Functions to handle thread affinity
+
+ /*!
+ * \brief Set the thread's affinity to processor core \p n.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+
+ /*!
+ * \brief Remove processor affinity to a specific core.
+ */
+ void unset_processor_affinity();
+
+ /*!
+ * \brief Get the current processor affinity.
+ */
+ std::vector<unsigned int> processor_affinity() { return d_affinity; }
+
// ----------------------------------------------------------------------------
private:
@@ -370,9 +445,11 @@ class GR_CORE_API gr_block : public gr_basic_block {
gr_block_detail_sptr d_detail; // implementation details
unsigned d_history;
bool d_fixed_rate;
+ int d_min_noutput_items;
bool d_max_noutput_items_set; // if d_max_noutput_items is valid
int d_max_noutput_items; // value of max_noutput_items for this block
tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
+ std::vector<unsigned int> d_affinity; // thread affinity proc. mask
protected:
gr_block (void){} //allows pure virtual interface sub-classes
@@ -416,6 +493,42 @@ class GR_CORE_API gr_block : public gr_basic_block {
void add_item_tag(unsigned int which_output, const gr_tag_t &tag);
/*!
+ * \brief Removes a tag from the given input buffer.
+ *
+ * \param which_input an integer of which input stream to remove the tag from
+ * \param abs_offset a uint64 number of the absolute item number
+ * assicated with the tag. Can get from nitems_written.
+ * \param key the tag key as a PMT symbol
+ * \param value any PMT holding any value for the given key
+ * \param srcid optional source ID specifier; defaults to PMT_F
+ *
+ * If no such tag is found, does nothing.
+ */
+ inline void remove_item_tag(unsigned int which_input,
+ uint64_t abs_offset,
+ const pmt::pmt_t &key,
+ const pmt::pmt_t &value,
+ const pmt::pmt_t &srcid=pmt::PMT_F)
+ {
+ gr_tag_t tag;
+ tag.offset = abs_offset;
+ tag.key = key;
+ tag.value = value;
+ tag.srcid = srcid;
+ this->remove_item_tag(which_input, tag);
+ }
+
+ /*!
+ * \brief Removes a tag from the given input buffer.
+ *
+ * If no such tag is found, does nothing.
+ *
+ * \param which_input an integer of which input stream to remove the tag from
+ * \param tag the tag object to remove
+ */
+ void remove_item_tag(unsigned int which_input, const gr_tag_t &tag);
+
+ /*!
* \brief Given a [start,end), returns a vector of all tags in the range.
*
* Range of counts is from start to end-1.
diff --git a/gnuradio-core/src/lib/runtime/gr_block.i b/gnuradio-core/src/lib/runtime/gr_block.i
index db6c1d04a..c016f2c28 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_block.i
@@ -66,6 +66,20 @@ class gr_block : public gr_basic_block {
void set_min_output_buffer(long min_output_buffer);
void set_min_output_buffer(int port, long min_output_buffer);
+ // Methods to access performance counters
+ float pc_noutput_items();
+ float pc_nproduced();
+ float pc_input_buffers_full(int which);
+ std::vector<float> pc_input_buffers_full();
+ float pc_output_buffers_full(int which);
+ std::vector<float> pc_output_buffers_full();
+ float pc_work_time();
+
+ // Methods to manage processor affinity.
+ void set_processor_affinity(const gr_vector_uint &mask);
+ void unset_processor_affinity();
+ gr_vector_uint processor_affinity();
+
// internal use
gr_block_detail_sptr detail () const { return d_detail; }
void set_detail (gr_block_detail_sptr detail) { d_detail = detail; }
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index 337c9518e..ff20e0e85 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -26,6 +26,7 @@
#include <gr_block_detail.h>
#include <gr_buffer.h>
+#include <iostream>
using namespace pmt;
@@ -41,7 +42,11 @@ gr_block_detail::gr_block_detail (unsigned int ninputs, unsigned int noutputs)
: d_produce_or(0),
d_ninputs (ninputs), d_noutputs (noutputs),
d_input (ninputs), d_output (noutputs),
- d_done (false)
+ d_done (false),
+ d_avg_noutput_items(0), d_avg_nproduced(0),
+ d_avg_input_buffers_full(ninputs, 0),
+ d_avg_output_buffers_full(noutputs, 0),
+ d_avg_work_time(0)
{
s_ncurrently_allocated++;
}
@@ -156,6 +161,18 @@ gr_block_detail::add_item_tag(unsigned int which_output, const gr_tag_t &tag)
}
void
+gr_block_detail::remove_item_tag(unsigned int which_input, const gr_tag_t &tag)
+{
+ if(!pmt_is_symbol(tag.key)) {
+ throw pmt_wrong_type("gr_block_detail::add_item_tag key", tag.key);
+ }
+ else {
+ // Add tag to gr_buffer's deque tags
+ d_input[which_input]->buffer()->remove_item_tag(tag);
+ }
+}
+
+void
gr_block_detail::get_tags_in_range(std::vector<gr_tag_t> &v,
unsigned int which_input,
uint64_t abs_start,
@@ -189,3 +206,104 @@ gr_block_detail::get_tags_in_range(std::vector<gr_tag_t> &v,
}
}
}
+
+void
+gr_block_detail::set_processor_affinity(const std::vector<unsigned int> &mask)
+{
+ if(threaded) {
+ try {
+ gruel::thread_bind_to_processor(thread, mask);
+ }
+ catch (std::runtime_error e) {
+ std::cerr << "set_processor_affinity: invalid mask." << std::endl;;
+ }
+ }
+}
+
+void
+gr_block_detail::unset_processor_affinity()
+{
+ if(threaded) {
+ gruel::thread_unbind(thread);
+ }
+}
+
+void
+gr_block_detail::start_perf_counters()
+{
+ d_start_of_work = gruel::high_res_timer_now();
+}
+
+void
+gr_block_detail::stop_perf_counters(int noutput_items, int nproduced)
+{
+ float alpha = 0.05;
+ float beta = 1.0-alpha;
+
+ d_end_of_work = gruel::high_res_timer_now();
+ gruel::high_res_timer_type diff = d_end_of_work - d_start_of_work;
+ d_avg_work_time = beta*d_avg_work_time + alpha*diff;
+
+ d_avg_nproduced = beta*d_avg_nproduced + alpha*nproduced;
+ d_avg_noutput_items = beta*d_avg_noutput_items + alpha*noutput_items;
+
+ for(size_t i=0; i < d_input.size(); i++) {
+ float pfull = static_cast<float>(d_input[i]->items_available()) /
+ static_cast<float>(d_input[i]->max_possible_items_available());
+ d_avg_input_buffers_full[i] = beta*d_avg_input_buffers_full[i] + alpha*pfull;
+ }
+
+ for(size_t i=0; i < d_output.size(); i++) {
+ float pfull = 1.0f - static_cast<float>(d_output[i]->space_available()) /
+ static_cast<float>(d_output[i]->bufsize());
+ d_avg_output_buffers_full[i] = beta*d_avg_output_buffers_full[i] + alpha*pfull;
+ }
+}
+
+float
+gr_block_detail::pc_noutput_items()
+{
+ return d_avg_noutput_items;
+}
+
+float
+gr_block_detail::pc_nproduced()
+{
+ return d_avg_nproduced;
+}
+
+float
+gr_block_detail::pc_input_buffers_full(size_t which)
+{
+ if(which < d_avg_input_buffers_full.size())
+ return d_avg_input_buffers_full[which];
+ else
+ return 0;
+}
+
+std::vector<float>
+gr_block_detail::pc_input_buffers_full()
+{
+ return d_avg_input_buffers_full;
+}
+
+float
+gr_block_detail::pc_output_buffers_full(size_t which)
+{
+ if(which < d_avg_output_buffers_full.size())
+ return d_avg_output_buffers_full[which];
+ else
+ return 0;
+}
+
+std::vector<float>
+gr_block_detail::pc_output_buffers_full()
+{
+ return d_avg_output_buffers_full;
+}
+
+float
+gr_block_detail::pc_work_time()
+{
+ return d_avg_work_time;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index 16d9f4d42..a8ed8da90 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -27,6 +27,7 @@
#include <gr_runtime_types.h>
#include <gr_tpb_detail.h>
#include <gr_tags.h>
+#include <gruel/high_res_timer.h>
#include <stdexcept>
/*!
@@ -95,8 +96,7 @@ class GR_CORE_API gr_block_detail {
/*!
* \brief Adds a new tag to the given output stream.
*
- * This takes the input parameters and builds a PMT tuple
- * from it. It then calls gr_buffer::add_item_tag(pmt::pmt_t t),
+ * Calls gr_buffer::add_item_tag(),
* which appends the tag onto its deque.
*
* \param which_output an integer of which output stream to attach the tag
@@ -105,6 +105,16 @@ class GR_CORE_API gr_block_detail {
void add_item_tag(unsigned int which_output, const gr_tag_t &tag);
/*!
+ * \brief Removes a tag from the given input stream.
+ *
+ * Calls gr_buffer::remove_item_tag(), which removes the tag from its deque.
+ *
+ * \param which_input an integer of which input stream to remove the tag from
+ * \param tag the tag object to add
+ */
+ void remove_item_tag(unsigned int which_input, const gr_tag_t &tag);
+
+ /*!
* \brief Given a [start,end), returns a vector of all tags in the range.
*
* Pass-through function to gr_buffer_reader to get a vector of tags
@@ -146,6 +156,33 @@ class GR_CORE_API gr_block_detail {
uint64_t abs_end,
const pmt::pmt_t &key);
+ /*!
+ * \brief Set core affinity of block to the cores in the vector mask.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+
+ /*!
+ * \brief Unset core affinity.
+ */
+ void unset_processor_affinity();
+
+ bool threaded; // set if thread is currently running.
+ gruel::gr_thread_t thread; // portable thread handle
+
+ void start_perf_counters();
+ void stop_perf_counters(int noutput_items, int nproduced);
+
+ // Calls to get performance counter items
+ float pc_noutput_items();
+ float pc_nproduced();
+ float pc_input_buffers_full(size_t which);
+ std::vector<float> pc_input_buffers_full();
+ float pc_output_buffers_full(size_t which);
+ std::vector<float> pc_output_buffers_full();
+ float pc_work_time();
+
gr_tpb_detail d_tpb; // used by thread-per-block scheduler
int d_produce_or;
@@ -158,6 +195,14 @@ class GR_CORE_API gr_block_detail {
std::vector<gr_buffer_sptr> d_output;
bool d_done;
+ // Performance counters
+ float d_avg_noutput_items;
+ float d_avg_nproduced;
+ std::vector<float> d_avg_input_buffers_full;
+ std::vector<float> d_avg_output_buffers_full;
+ gruel::high_res_timer_type d_start_of_work, d_end_of_work;
+ float d_avg_work_time;
+
gr_block_detail (unsigned int ninputs, unsigned int noutputs);
friend struct gr_tpb_detail;
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
index 375b58f56..27f591452 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -64,22 +64,21 @@ round_down (unsigned int n, unsigned int multiple)
// on is done.
//
static int
-min_available_space (gr_block_detail *d, int output_multiple)
+min_available_space (gr_block_detail *d, int output_multiple, int min_noutput_items)
{
- int min_space = std::numeric_limits<int>::max();
-
+ int min_space = std::numeric_limits<int>::max();
+ if (min_noutput_items == 0)
+ min_noutput_items = 1;
for (int i = 0; i < d->noutputs (); i++){
gruel::scoped_lock guard(*d->output(i)->mutex());
-#if 0
- int n = round_down(d->output(i)->space_available(), output_multiple);
-#else
- int n = round_down(std::min(d->output(i)->space_available(),
- d->output(i)->bufsize()/2),
- output_multiple);
-#endif
- if (n == 0){ // We're blocked on output.
- if (d->output(i)->done()){ // Downstream is done, therefore we're done.
- return -1;
+ int avail_n = round_down(d->output(i)->space_available(), output_multiple);
+ int best_n = round_down(d->output(i)->bufsize()/2, output_multiple);
+ if (best_n < min_noutput_items)
+ throw std::runtime_error("Buffer too small for min_noutput_items");
+ int n = std::min(avail_n, best_n);
+ if (n < min_noutput_items){ // We're blocked on output.
+ if (d->output(i)->done()){ // Downstream is done, therefore we're done.
+ return -1;
}
return 0;
}
@@ -205,7 +204,7 @@ gr_block_executor::run_one_iteration()
d_start_nitems_read.resize(0);
// determine the minimum available output space
- noutput_items = min_available_space (d, m->output_multiple ());
+ noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ());
noutput_items = std::min(noutput_items, max_noutput_items);
LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
if (noutput_items == -1) // we're done
@@ -286,7 +285,7 @@ gr_block_executor::run_one_iteration()
}
// determine the minimum available output space
- noutput_items = min_available_space (d, m->output_multiple ());
+ noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ());
if (ENABLE_LOGGING){
*d_log << " regular ";
if (m->relative_rate() >= 1.0)
@@ -420,9 +419,18 @@ gr_block_executor::run_one_iteration()
for (int i = 0; i < d->ninputs(); i++)
d_start_nitems_read[i] = d->nitems_read(i);
+#ifdef GR_PERFORMANCE_COUNTERS
+ d->start_perf_counters();
+#endif /* GR_PERFORMANCE_COUNTERS */
+
// Do the actual work of the block
int n = m->general_work (noutput_items, d_ninput_items,
d_input_items, d_output_items);
+
+#ifdef GR_PERFORMANCE_COUNTERS
+ d->stop_perf_counters(noutput_items, n);
+#endif /* GR_PERFORMANCE_COUNTERS */
+
LOG(*d_log << " general_work: noutput_items = " << noutput_items
<< " result = " << n << std::endl);
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index b923ca57a..369959d65 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -234,6 +234,18 @@ gr_buffer::add_item_tag(const gr_tag_t &tag)
}
void
+gr_buffer::remove_item_tag(const gr_tag_t &tag)
+{
+ gruel::scoped_lock guard(*mutex());
+ for (std::deque<gr_tag_t>::iterator it = d_item_tags.begin(); it != d_item_tags.end(); ++it) {
+ if (*it == tag) {
+ d_item_tags.erase(it);
+ break;
+ }
+ }
+}
+
+void
gr_buffer::prune_tags(uint64_t max_time)
{
/* NOTE: this function _should_ lock the mutex before editing
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h b/gnuradio-core/src/lib/runtime/gr_buffer.h
index 67d48fb2d..28ea97726 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -103,6 +103,15 @@ class GR_CORE_API gr_buffer {
void add_item_tag(const gr_tag_t &tag);
/*!
+ * \brief Removes an existing tag from the buffer.
+ *
+ * If no such tag is found, does nothing.
+ *
+ * \param tag the tag that needs to be removed
+ */
+ void remove_item_tag(const gr_tag_t &tag);
+
+ /*!
* \brief Removes all tags before \p max_time from buffer
*
* \param max_time the time (item number) to trim up until.
diff --git a/gnuradio-core/src/lib/runtime/gr_tags.h b/gnuradio-core/src/lib/runtime/gr_tags.h
index 8bffcd0fe..a9ca90235 100644
--- a/gnuradio-core/src/lib/runtime/gr_tags.h
+++ b/gnuradio-core/src/lib/runtime/gr_tags.h
@@ -45,6 +45,11 @@ struct GR_CORE_API gr_tag_t{
){
return x.offset < y.offset;
}
+
+ inline bool operator == (const gr_tag_t &t) const
+ {
+ return (t.key == key) && (t.value == value) && (t.srcid == srcid) && (t.offset == offset);
+ }
};
#endif /*INCLUDED_GR_TAGS_H*/
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 9f17a48a8..cea374fac 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -38,6 +38,13 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
gr_block_executor::state s;
pmt_t msg;
+ d->threaded = true;
+ d->thread = gruel::get_current_thread_id();
+
+ // Set thread affinity if it was set before fg was started.
+ if(block->processor_affinity().size() > 0) {
+ gruel::thread_bind_to_processor(d->thread, block->processor_affinity());
+ }
while (1){
boost::this_thread::interruption_point();
diff --git a/gnuradio-core/src/lib/runtime/gr_types.h b/gnuradio-core/src/lib/runtime/gr_types.h
index ad6cee768..db13e456a 100644
--- a/gnuradio-core/src/lib/runtime/gr_types.h
+++ b/gnuradio-core/src/lib/runtime/gr_types.h
@@ -31,6 +31,7 @@
#include <gr_complex.h>
typedef std::vector<int> gr_vector_int;
+typedef std::vector<unsigned int> gr_vector_uint;
typedef std::vector<float> gr_vector_float;
typedef std::vector<double> gr_vector_double;
typedef std::vector<void *> gr_vector_void_star;
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc b/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc
index a0b4755a8..1d3dafadf 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc
+++ b/gnuradio-core/src/lib/runtime/qa_gr_top_block.cc
@@ -262,3 +262,24 @@ void qa_gr_top_block::t10_reconfig_max_output_buffer()
// Wait for flowgraph to end on its own
tb->wait();
}
+
+void qa_gr_top_block::t11_set_block_affinity()
+{
+ gr_top_block_sptr tb = gr_make_top_block("top");
+ gr_block_sptr src (gr_make_null_source(sizeof(float)));
+ gr_block_sptr snk (gr_make_null_sink(sizeof(float)));
+
+ std::vector<unsigned int> set(1, 0), ret;
+ src->set_processor_affinity(set);
+
+ tb->connect(src, 0, snk, 0);
+ tb->start();
+ tb->stop();
+ tb->wait();
+
+ ret = src->processor_affinity();
+
+ // We only set the core affinity to 0 because we always know at
+ // least one thread core exists to use.
+ CPPUNIT_ASSERT_EQUAL(set[0], ret[0]);
+}
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_top_block.h b/gnuradio-core/src/lib/runtime/qa_gr_top_block.h
index bb891abca..634eeab1f 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_top_block.h
+++ b/gnuradio-core/src/lib/runtime/qa_gr_top_block.h
@@ -38,6 +38,11 @@ class qa_gr_top_block : public CppUnit::TestCase
CPPUNIT_TEST(t4_reconfigure); // triggers 'join never returns' bug
CPPUNIT_TEST(t5_max_noutputs);
CPPUNIT_TEST(t6_reconfig_max_noutputs);
+ CPPUNIT_TEST(t7_max_noutputs_per_block);
+ CPPUNIT_TEST(t8_reconfig_max_noutputs_per_block);
+ CPPUNIT_TEST(t9_max_output_buffer);
+ CPPUNIT_TEST(t10_reconfig_max_output_buffer);
+ CPPUNIT_TEST(t11_set_block_affinity);
CPPUNIT_TEST_SUITE_END();
@@ -54,6 +59,8 @@ private:
void t8_reconfig_max_noutputs_per_block();
void t9_max_output_buffer();
void t10_reconfig_max_output_buffer();
+ void t11_set_block_affinity();
+
};
#endif /* INCLUDED_QA_GR_TOP_BLOCK_H */