summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/doxygen/other/thread_affinity.dox65
-rw-r--r--gnuradio-core/src/examples/mp-sched/CMakeLists.txt1
-rwxr-xr-xgnuradio-core/src/examples/mp-sched/affinity_set.py73
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc18
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h21
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.i5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc21
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h14
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc7
-rw-r--r--gruel/src/lib/thread.cc5
10 files changed, 229 insertions, 1 deletions
diff --git a/docs/doxygen/other/thread_affinity.dox b/docs/doxygen/other/thread_affinity.dox
new file mode 100644
index 000000000..fbbc449a4
--- /dev/null
+++ b/docs/doxygen/other/thread_affinity.dox
@@ -0,0 +1,65 @@
+/*! \page page_affinity Block Thread Affinity
+
+\section intro Introduction
+
+In the thread-per-block scheduler, you can set the block's core
+affinity. Each block can be pinned to a group cores or be set back
+to use the standard kernel scheduler.
+
+The implementation is done by adding new functions to the GRUEL
+library:
+
+\code
+ gr_thread_t get_current_thread_id();
+ void thread_bind_to_processor(unsigned int n);
+ void thread_bind_to_processor(const std::vector<unsigned int> &mask);
+ void thread_bind_to_processor(gr_thread_t thread, unsigned int n);
+ void thread_bind_to_processor(gr_thread_t thread, const std::vector<unsigned int> &mask);
+ void thread_unbind();
+ void thread_unbind(gr_thread_t thread);
+\endcode
+
+The ability to set a thread's affinity to a core or groups of cores is
+not implemented in the Boost thread library, and so we have made our
+own portability library. In particular, the gruel::gr_thread_t type is
+defined as the thread type for the given system. The other functions
+are designed to be portable as well by calling the specific
+implementation for the thread affinity for a particular platform.
+
+There are functions to set a thread to a group of cores. If the thread
+is not given, the current thread is used. If a single number is
+passed, only that core is set (this is equivalent to a core mask with
+just a single value).
+
+Similarly, there are functions to unset the affinity. This practically
+implements the setting of the thread's affinity to all possible
+cores. Again, the function that does not take a thread argument unsets
+the affinity for the current thread.
+
+
+\section api GNU Radio Block API
+
+Each block has two new data members:
+
+- threaded: a boolean value that is true if the block is attached to a
+ thread.
+- thread: a gruel::gr_thread_t handle to the block's thread.
+
+A block can set and unset it's affinity at any time using the
+following member functions:
+
+- gr_block::set_processor_affinity(const std::vector<unsigned int> &mask)
+- gr_block::unset_processor_affinity()
+
+Where \p mask is a vector of core numbers to set the thread's affinity
+to.
+
+The current core affinity can be retrieved using the member function:
+
+- gr_block::processor_affinity()
+
+When set before the flowgraph is started, the scheduler will set the
+thread's affinity when it is started. When already running, the
+block's affinity will be immediately set.
+
+*/
diff --git a/gnuradio-core/src/examples/mp-sched/CMakeLists.txt b/gnuradio-core/src/examples/mp-sched/CMakeLists.txt
index dc47d17f9..d2d910ecf 100644
--- a/gnuradio-core/src/examples/mp-sched/CMakeLists.txt
+++ b/gnuradio-core/src/examples/mp-sched/CMakeLists.txt
@@ -20,6 +20,7 @@
include(GrPython)
GR_PYTHON_INSTALL(PROGRAMS
+ affinity_set.py
plot_flops.py
run_synthetic.py
synthetic.py
diff --git a/gnuradio-core/src/examples/mp-sched/affinity_set.py b/gnuradio-core/src/examples/mp-sched/affinity_set.py
new file mode 100755
index 000000000..b34477d14
--- /dev/null
+++ b/gnuradio-core/src/examples/mp-sched/affinity_set.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+##################################################
+# Gnuradio Python Flow Graph
+# Title: Affinity Set Test
+##################################################
+
+from gnuradio import eng_notation
+from gnuradio import gr
+from gnuradio.eng_option import eng_option
+from gnuradio.gr import firdes
+from optparse import OptionParser
+import sys
+
+class affinity_set(gr.top_block):
+
+ def __init__(self):
+ gr.top_block.__init__(self, "Affinity Set Test")
+
+ ##################################################
+ # Variables
+ ##################################################
+ self.samp_rate = samp_rate = 32000
+
+ ##################################################
+ # Blocks
+ ##################################################
+ vec_len = 1
+ self.gr_throttle_0 = gr.throttle(gr.sizeof_gr_complex*vec_len, samp_rate)
+ self.gr_null_source_0 = gr.null_source(gr.sizeof_gr_complex*vec_len)
+ self.gr_null_sink_0 = gr.null_sink(gr.sizeof_gr_complex*vec_len)
+ self.gr_filt_0 = gr.fir_filter_ccc(1, 40000*[0.2+0.3j,])
+ self.gr_filt_1 = gr.fir_filter_ccc(1, 40000*[0.2+0.3j,])
+
+ self.gr_filt_0.set_processor_affinity([0,])
+ self.gr_filt_1.set_processor_affinity([1,2])
+
+ ##################################################
+ # Connections
+ ##################################################
+ self.connect((self.gr_null_source_0, 0), (self.gr_throttle_0, 0))
+ self.connect((self.gr_throttle_0, 0), (self.gr_filt_0, 0))
+ self.connect((self.gr_filt_0, 0), (self.gr_filt_1, 0))
+ self.connect((self.gr_filt_1, 0), (self.gr_null_sink_0, 0))
+
+
+ # QT sink close method reimplementation
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+
+if __name__ == '__main__':
+ parser = OptionParser(option_class=eng_option, usage="%prog: [options]")
+ (options, args) = parser.parse_args()
+ tb = affinity_set()
+ tb.start()
+
+ while(1):
+ ret = raw_input('Press Enter to quit: ')
+ if(len(ret) == 0):
+ tb.stop()
+ sys.exit(0)
+ elif(ret.lower() == "none"):
+ tb.gr_filt_0.unset_processor_affinity()
+ else:
+ try:
+ n = int(ret)
+ except ValueError:
+ print "Invalid number"
+ else:
+ tb.gr_filt_0.set_processor_affinity([n,])
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 5ba30955f..dca1fcf83 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -251,6 +251,24 @@ gr_block::is_set_max_noutput_items()
return d_max_noutput_items_set;
}
+void
+gr_block::set_processor_affinity(const std::vector<unsigned int> &mask)
+{
+ d_affinity = mask;
+ if(d_detail) {
+ d_detail->set_processor_affinity(d_affinity);
+ }
+}
+
+void
+gr_block::unset_processor_affinity()
+{
+ d_affinity.clear();
+ if(d_detail) {
+ d_detail->unset_processor_affinity();
+ }
+}
+
std::ostream&
operator << (std::ostream& os, const gr_block *m)
{
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 7a70bdaf0..9906d3632 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -359,6 +359,26 @@ class GR_CORE_API gr_block : public gr_basic_block {
}
// ----------------------------------------------------------------------------
+ // Functions to handle thread affinity
+
+ /*!
+ * \brief Set the thread's affinity to processor core \p n.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+
+ /*!
+ * \brief Remove processor affinity to a specific core.
+ */
+ void unset_processor_affinity();
+
+ /*!
+ * \brief Get the current processor affinity.
+ */
+ std::vector<unsigned int> processor_affinity() { return d_affinity; }
+
+ // ----------------------------------------------------------------------------
private:
@@ -373,6 +393,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
bool d_max_noutput_items_set; // if d_max_noutput_items is valid
int d_max_noutput_items; // value of max_noutput_items for this block
tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
+ std::vector<unsigned int> d_affinity; // thread affinity proc. mask
protected:
gr_block (void){} //allows pure virtual interface sub-classes
diff --git a/gnuradio-core/src/lib/runtime/gr_block.i b/gnuradio-core/src/lib/runtime/gr_block.i
index db6c1d04a..89685d41f 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_block.i
@@ -66,6 +66,11 @@ class gr_block : public gr_basic_block {
void set_min_output_buffer(long min_output_buffer);
void set_min_output_buffer(int port, long min_output_buffer);
+ // Methods to manage processor affinity.
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+ void unset_processor_affinity();
+ std::vector<unsigned int> processor_affinity();
+
// internal use
gr_block_detail_sptr detail () const { return d_detail; }
void set_detail (gr_block_detail_sptr detail) { d_detail = detail; }
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index c65493473..e6199be37 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -201,3 +201,24 @@ gr_block_detail::get_tags_in_range(std::vector<gr_tag_t> &v,
}
}
}
+
+void
+gr_block_detail::set_processor_affinity(const std::vector<unsigned int> &mask)
+{
+ if(threaded) {
+ try {
+ gruel::thread_bind_to_processor(thread, mask);
+ }
+ catch (std::runtime_error e) {
+ std::cerr << "set_processor_affinity: invalid mask." << std::endl;;
+ }
+ }
+}
+
+void
+gr_block_detail::unset_processor_affinity()
+{
+ if(threaded) {
+ gruel::thread_unbind(thread);
+ }
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index af00ea7c7..bcc42c7a0 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -155,6 +155,20 @@ class GR_CORE_API gr_block_detail {
uint64_t abs_end,
const pmt::pmt_t &key);
+ /*!
+ * \brief Set core affinity of block to the cores in the vector mask.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<unsigned int> &mask);
+
+ /*!
+ * \brief Unset core affinity.
+ */
+ void unset_processor_affinity();
+
+ bool threaded; // set if thread is currently running.
+ gruel::gr_thread_t thread; // portable thread handle
gr_tpb_detail d_tpb; // used by thread-per-block scheduler
int d_produce_or;
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 9f17a48a8..cea374fac 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -38,6 +38,13 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
gr_block_executor::state s;
pmt_t msg;
+ d->threaded = true;
+ d->thread = gruel::get_current_thread_id();
+
+ // Set thread affinity if it was set before fg was started.
+ if(block->processor_affinity().size() > 0) {
+ gruel::thread_bind_to_processor(d->thread, block->processor_affinity());
+ }
while (1){
boost::this_thread::interruption_point();
diff --git a/gruel/src/lib/thread.cc b/gruel/src/lib/thread.cc
index 46ba1745a..ab022c0bd 100644
--- a/gruel/src/lib/thread.cc
+++ b/gruel/src/lib/thread.cc
@@ -118,13 +118,16 @@ namespace gruel {
}
void
- thread_bind_to_processor(gr_thread_t thread, const std::vector<unsigned int> &mask)
+ thread_bind_to_processor(const std::vector<unsigned int> &mask)
{
// Not implemented on OSX
}
void
thread_bind_to_processor(gr_thread_t thread, const std::vector<unsigned int> &mask)
+ {
+ // Not implemented on OSX
+ }
void
thread_unbind()