summaryrefslogtreecommitdiff
path: root/gnuradio-core/src
diff options
context:
space:
mode:
authorTom Rondeau2012-12-19 13:38:24 -0500
committerNicholas Corgan2013-02-06 12:35:06 -0800
commit01eab0c7e283db9c1cfff0a26296a49128062cca (patch)
tree018a9becab8641ebac71910a186d45324591e30c /gnuradio-core/src
parent7bc415bd2875132ce321c5913950d23e7c9ad8b1 (diff)
downloadgnuradio-01eab0c7e283db9c1cfff0a26296a49128062cca.tar.gz
gnuradio-01eab0c7e283db9c1cfff0a26296a49128062cca.tar.bz2
gnuradio-01eab0c7e283db9c1cfff0a26296a49128062cca.zip
core: working thread affinity concept into gr_blocks.
Example in gnuradio-core/src/examples/mp-sched/affinity_set.py Documentation describing API in docs/doxygen/other/thread_affinity.dox
Diffstat (limited to 'gnuradio-core/src')
-rw-r--r--gnuradio-core/src/examples/mp-sched/CMakeLists.txt1
-rwxr-xr-xgnuradio-core/src/examples/mp-sched/affinity_set.py73
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc18
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h21
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.i5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc21
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h14
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc7
8 files changed, 160 insertions, 0 deletions
diff --git a/gnuradio-core/src/examples/mp-sched/CMakeLists.txt b/gnuradio-core/src/examples/mp-sched/CMakeLists.txt
index dc47d17f9..d2d910ecf 100644
--- a/gnuradio-core/src/examples/mp-sched/CMakeLists.txt
+++ b/gnuradio-core/src/examples/mp-sched/CMakeLists.txt
@@ -20,6 +20,7 @@
include(GrPython)
GR_PYTHON_INSTALL(PROGRAMS
+ affinity_set.py
plot_flops.py
run_synthetic.py
synthetic.py
diff --git a/gnuradio-core/src/examples/mp-sched/affinity_set.py b/gnuradio-core/src/examples/mp-sched/affinity_set.py
new file mode 100755
index 000000000..b34477d14
--- /dev/null
+++ b/gnuradio-core/src/examples/mp-sched/affinity_set.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+##################################################
+# Gnuradio Python Flow Graph
+# Title: Affinity Set Test
+##################################################
+
+from gnuradio import eng_notation
+from gnuradio import gr
+from gnuradio.eng_option import eng_option
+from gnuradio.gr import firdes
+from optparse import OptionParser
+import sys
+
+class affinity_set(gr.top_block):
+
+ def __init__(self):
+ gr.top_block.__init__(self, "Affinity Set Test")
+
+ ##################################################
+ # Variables
+ ##################################################
+ self.samp_rate = samp_rate = 32000
+
+ ##################################################
+ # Blocks
+ ##################################################
+ vec_len = 1
+ self.gr_throttle_0 = gr.throttle(gr.sizeof_gr_complex*vec_len, samp_rate)
+ self.gr_null_source_0 = gr.null_source(gr.sizeof_gr_complex*vec_len)
+ self.gr_null_sink_0 = gr.null_sink(gr.sizeof_gr_complex*vec_len)
+ self.gr_filt_0 = gr.fir_filter_ccc(1, 40000*[0.2+0.3j,])
+ self.gr_filt_1 = gr.fir_filter_ccc(1, 40000*[0.2+0.3j,])
+
+ self.gr_filt_0.set_processor_affinity([0,])
+ self.gr_filt_1.set_processor_affinity([1,2])
+
+ ##################################################
+ # Connections
+ ##################################################
+ self.connect((self.gr_null_source_0, 0), (self.gr_throttle_0, 0))
+ self.connect((self.gr_throttle_0, 0), (self.gr_filt_0, 0))
+ self.connect((self.gr_filt_0, 0), (self.gr_filt_1, 0))
+ self.connect((self.gr_filt_1, 0), (self.gr_null_sink_0, 0))
+
+
+ # QT sink close method reimplementation
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+
+if __name__ == '__main__':
+ parser = OptionParser(option_class=eng_option, usage="%prog: [options]")
+ (options, args) = parser.parse_args()
+ tb = affinity_set()
+ tb.start()
+
+ while(1):
+ ret = raw_input('Press Enter to quit: ')
+ if(len(ret) == 0):
+ tb.stop()
+ sys.exit(0)
+ elif(ret.lower() == "none"):
+ tb.gr_filt_0.unset_processor_affinity()
+ else:
+ try:
+ n = int(ret)
+ except ValueError:
+ print "Invalid number"
+ else:
+ tb.gr_filt_0.set_processor_affinity([n,])
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 5ba30955f..dca1fcf83 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -251,6 +251,24 @@ gr_block::is_set_max_noutput_items()
return d_max_noutput_items_set;
}
+void
+gr_block::set_processor_affinity(const std::vector<unsigned int> &mask)
+{
+ d_affinity = mask;
+ if(d_detail) {
+ d_detail->set_processor_affinity(d_affinity);
+ }
+}
+
+void
+gr_block::unset_processor_affinity()
+{
+ d_affinity.clear();
+ if(d_detail) {
+ d_detail->unset_processor_affinity();
+ }
+}
+
std::ostream&
operator << (std::ostream& os, const gr_block *m)
{
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 7a70bdaf0..9906d3632 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -359,6 +359,26 @@ class GR_CORE_API gr_block : public gr_basic_block {
}
// ----------------------------------------------------------------------------
+ // Functions to handle thread affinity
+
+ /*!
+ * \brief Set the thread's affinity to processor core \p n.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+
+ /*!
+ * \brief Remove processor affinity to a specific core.
+ */
+ void unset_processor_affinity();
+
+ /*!
+ * \brief Get the current processor affinity.
+ */
+ std::vector<unsigned int> processor_affinity() { return d_affinity; }
+
+ // ----------------------------------------------------------------------------
private:
@@ -373,6 +393,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
bool d_max_noutput_items_set; // if d_max_noutput_items is valid
int d_max_noutput_items; // value of max_noutput_items for this block
tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
+ std::vector<unsigned int> d_affinity; // thread affinity proc. mask
protected:
gr_block (void){} //allows pure virtual interface sub-classes
diff --git a/gnuradio-core/src/lib/runtime/gr_block.i b/gnuradio-core/src/lib/runtime/gr_block.i
index db6c1d04a..89685d41f 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_block.i
@@ -66,6 +66,11 @@ class gr_block : public gr_basic_block {
void set_min_output_buffer(long min_output_buffer);
void set_min_output_buffer(int port, long min_output_buffer);
+ // Methods to manage processor affinity.
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+ void unset_processor_affinity();
+ std::vector<unsigned int> processor_affinity();
+
// internal use
gr_block_detail_sptr detail () const { return d_detail; }
void set_detail (gr_block_detail_sptr detail) { d_detail = detail; }
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index c65493473..e6199be37 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -201,3 +201,24 @@ gr_block_detail::get_tags_in_range(std::vector<gr_tag_t> &v,
}
}
}
+
+void
+gr_block_detail::set_processor_affinity(const std::vector<unsigned int> &mask)
+{
+ if(threaded) {
+ try {
+ gruel::thread_bind_to_processor(thread, mask);
+ }
+ catch (std::runtime_error e) {
+ std::cerr << "set_processor_affinity: invalid mask." << std::endl;;
+ }
+ }
+}
+
+void
+gr_block_detail::unset_processor_affinity()
+{
+ if(threaded) {
+ gruel::thread_unbind(thread);
+ }
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index af00ea7c7..bcc42c7a0 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -155,6 +155,20 @@ class GR_CORE_API gr_block_detail {
uint64_t abs_end,
const pmt::pmt_t &key);
+ /*!
+ * \brief Set core affinity of block to the cores in the vector mask.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+
+ /*!
+ * \brief Unset core affinity.
+ */
+ void unset_processor_affinity();
+
+ bool threaded; // set if thread is currently running.
+ gruel::gr_thread_t thread; // portable thread handle
gr_tpb_detail d_tpb; // used by thread-per-block scheduler
int d_produce_or;
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 9f17a48a8..cea374fac 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -38,6 +38,13 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
gr_block_executor::state s;
pmt_t msg;
+ d->threaded = true;
+ d->thread = gruel::get_current_thread_id();
+
+ // Set thread affinity if it was set before fg was started.
+ if(block->processor_affinity().size() > 0) {
+ gruel::thread_bind_to_processor(d->thread, block->processor_affinity());
+ }
while (1){
boost::this_thread::interruption_point();