diff options
-rw-r--r-- | docs/doxygen/other/thread_affinity.dox | 65 | ||||
-rw-r--r-- | gnuradio-core/src/examples/mp-sched/CMakeLists.txt | 1 | ||||
-rwxr-xr-x | gnuradio-core/src/examples/mp-sched/affinity_set.py | 73 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.cc | 18 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.h | 21 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.i | 5 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block_detail.cc | 21 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block_detail.h | 14 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc | 7 | ||||
-rw-r--r-- | gruel/src/lib/thread.cc | 5 |
10 files changed, 229 insertions, 1 deletions
diff --git a/docs/doxygen/other/thread_affinity.dox b/docs/doxygen/other/thread_affinity.dox new file mode 100644 index 000000000..fbbc449a4 --- /dev/null +++ b/docs/doxygen/other/thread_affinity.dox @@ -0,0 +1,65 @@ +/*! \page page_affinity Block Thread Affinity + +\section intro Introduction + +In the thread-per-block scheduler, you can set the block's core +affinity. Each block can be pinned to a group cores or be set back +to use the standard kernel scheduler. + +The implementation is done by adding new functions to the GRUEL +library: + +\code + gr_thread_t get_current_thread_id(); + void thread_bind_to_processor(unsigned int n); + void thread_bind_to_processor(const std::vector<unsigned int> &mask); + void thread_bind_to_processor(gr_thread_t thread, unsigned int n); + void thread_bind_to_processor(gr_thread_t thread, const std::vector<unsigned int> &mask); + void thread_unbind(); + void thread_unbind(gr_thread_t thread); +\endcode + +The ability to set a thread's affinity to a core or groups of cores is +not implemented in the Boost thread library, and so we have made our +own portability library. In particular, the gruel::gr_thread_t type is +defined as the thread type for the given system. The other functions +are designed to be portable as well by calling the specific +implementation for the thread affinity for a particular platform. + +There are functions to set a thread to a group of cores. If the thread +is not given, the current thread is used. If a single number is +passed, only that core is set (this is equivalent to a core mask with +just a single value). + +Similarly, there are functions to unset the affinity. This practically +implements the setting of the thread's affinity to all possible +cores. Again, the function that does not take a thread argument unsets +the affinity for the current thread. + + +\section api GNU Radio Block API + +Each block has two new data members: + +- threaded: a boolean value that is true if the block is attached to a + thread. +- thread: a gruel::gr_thread_t handle to the block's thread. + +A block can set and unset it's affinity at any time using the +following member functions: + +- gr_block::set_processor_affinity(const std::vector<unsigned int> &mask) +- gr_block::unset_processor_affinity() + +Where \p mask is a vector of core numbers to set the thread's affinity +to. + +The current core affinity can be retrieved using the member function: + +- gr_block::processor_affinity() + +When set before the flowgraph is started, the scheduler will set the +thread's affinity when it is started. When already running, the +block's affinity will be immediately set. + +*/ diff --git a/gnuradio-core/src/examples/mp-sched/CMakeLists.txt b/gnuradio-core/src/examples/mp-sched/CMakeLists.txt index dc47d17f9..d2d910ecf 100644 --- a/gnuradio-core/src/examples/mp-sched/CMakeLists.txt +++ b/gnuradio-core/src/examples/mp-sched/CMakeLists.txt @@ -20,6 +20,7 @@ include(GrPython) GR_PYTHON_INSTALL(PROGRAMS + affinity_set.py plot_flops.py run_synthetic.py synthetic.py diff --git a/gnuradio-core/src/examples/mp-sched/affinity_set.py b/gnuradio-core/src/examples/mp-sched/affinity_set.py new file mode 100755 index 000000000..b34477d14 --- /dev/null +++ b/gnuradio-core/src/examples/mp-sched/affinity_set.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +################################################## +# Gnuradio Python Flow Graph +# Title: Affinity Set Test +################################################## + +from gnuradio import eng_notation +from gnuradio import gr +from gnuradio.eng_option import eng_option +from gnuradio.gr import firdes +from optparse import OptionParser +import sys + +class affinity_set(gr.top_block): + + def __init__(self): + gr.top_block.__init__(self, "Affinity Set Test") + + ################################################## + # Variables + ################################################## + self.samp_rate = samp_rate = 32000 + + ################################################## + # Blocks + ################################################## + vec_len = 1 + self.gr_throttle_0 = gr.throttle(gr.sizeof_gr_complex*vec_len, samp_rate) + self.gr_null_source_0 = gr.null_source(gr.sizeof_gr_complex*vec_len) + self.gr_null_sink_0 = gr.null_sink(gr.sizeof_gr_complex*vec_len) + self.gr_filt_0 = gr.fir_filter_ccc(1, 40000*[0.2+0.3j,]) + self.gr_filt_1 = gr.fir_filter_ccc(1, 40000*[0.2+0.3j,]) + + self.gr_filt_0.set_processor_affinity([0,]) + self.gr_filt_1.set_processor_affinity([1,2]) + + ################################################## + # Connections + ################################################## + self.connect((self.gr_null_source_0, 0), (self.gr_throttle_0, 0)) + self.connect((self.gr_throttle_0, 0), (self.gr_filt_0, 0)) + self.connect((self.gr_filt_0, 0), (self.gr_filt_1, 0)) + self.connect((self.gr_filt_1, 0), (self.gr_null_sink_0, 0)) + + + # QT sink close method reimplementation + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + +if __name__ == '__main__': + parser = OptionParser(option_class=eng_option, usage="%prog: [options]") + (options, args) = parser.parse_args() + tb = affinity_set() + tb.start() + + while(1): + ret = raw_input('Press Enter to quit: ') + if(len(ret) == 0): + tb.stop() + sys.exit(0) + elif(ret.lower() == "none"): + tb.gr_filt_0.unset_processor_affinity() + else: + try: + n = int(ret) + except ValueError: + print "Invalid number" + else: + tb.gr_filt_0.set_processor_affinity([n,]) diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index 5ba30955f..dca1fcf83 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -251,6 +251,24 @@ gr_block::is_set_max_noutput_items() return d_max_noutput_items_set; } +void +gr_block::set_processor_affinity(const std::vector<unsigned int> &mask) +{ + d_affinity = mask; + if(d_detail) { + d_detail->set_processor_affinity(d_affinity); + } +} + +void +gr_block::unset_processor_affinity() +{ + d_affinity.clear(); + if(d_detail) { + d_detail->unset_processor_affinity(); + } +} + std::ostream& operator << (std::ostream& os, const gr_block *m) { diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h index 7a70bdaf0..9906d3632 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.h +++ b/gnuradio-core/src/lib/runtime/gr_block.h @@ -359,6 +359,26 @@ class GR_CORE_API gr_block : public gr_basic_block { } // ---------------------------------------------------------------------------- + // Functions to handle thread affinity + + /*! + * \brief Set the thread's affinity to processor core \p n. + * + * \param mask a vector of unsigned ints of the core numbers available to this block. + */ + void set_processor_affinity(const std::vector<unsigned int> &mask); + + /*! + * \brief Remove processor affinity to a specific core. + */ + void unset_processor_affinity(); + + /*! + * \brief Get the current processor affinity. + */ + std::vector<unsigned int> processor_affinity() { return d_affinity; } + + // ---------------------------------------------------------------------------- private: @@ -373,6 +393,7 @@ class GR_CORE_API gr_block : public gr_basic_block { bool d_max_noutput_items_set; // if d_max_noutput_items is valid int d_max_noutput_items; // value of max_noutput_items for this block tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream + std::vector<unsigned int> d_affinity; // thread affinity proc. mask protected: gr_block (void){} //allows pure virtual interface sub-classes diff --git a/gnuradio-core/src/lib/runtime/gr_block.i b/gnuradio-core/src/lib/runtime/gr_block.i index db6c1d04a..89685d41f 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.i +++ b/gnuradio-core/src/lib/runtime/gr_block.i @@ -66,6 +66,11 @@ class gr_block : public gr_basic_block { void set_min_output_buffer(long min_output_buffer); void set_min_output_buffer(int port, long min_output_buffer); + // Methods to manage processor affinity. + void set_processor_affinity(const std::vector<unsigned int> &mask); + void unset_processor_affinity(); + std::vector<unsigned int> processor_affinity(); + // internal use gr_block_detail_sptr detail () const { return d_detail; } void set_detail (gr_block_detail_sptr detail) { d_detail = detail; } diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc index c65493473..e6199be37 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc @@ -201,3 +201,24 @@ gr_block_detail::get_tags_in_range(std::vector<gr_tag_t> &v, } } } + +void +gr_block_detail::set_processor_affinity(const std::vector<unsigned int> &mask) +{ + if(threaded) { + try { + gruel::thread_bind_to_processor(thread, mask); + } + catch (std::runtime_error e) { + std::cerr << "set_processor_affinity: invalid mask." << std::endl;; + } + } +} + +void +gr_block_detail::unset_processor_affinity() +{ + if(threaded) { + gruel::thread_unbind(thread); + } +} diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h index af00ea7c7..bcc42c7a0 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h @@ -155,6 +155,20 @@ class GR_CORE_API gr_block_detail { uint64_t abs_end, const pmt::pmt_t &key); + /*! + * \brief Set core affinity of block to the cores in the vector mask. + * + * \param mask a vector of unsigned ints of the core numbers available to this block. + */ + void set_processor_affinity(const std::vector<unsigned int> &mask); + + /*! + * \brief Unset core affinity. + */ + void unset_processor_affinity(); + + bool threaded; // set if thread is currently running. + gruel::gr_thread_t thread; // portable thread handle gr_tpb_detail d_tpb; // used by thread-per-block scheduler int d_produce_or; diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index 9f17a48a8..cea374fac 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -38,6 +38,13 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item gr_block_executor::state s; pmt_t msg; + d->threaded = true; + d->thread = gruel::get_current_thread_id(); + + // Set thread affinity if it was set before fg was started. + if(block->processor_affinity().size() > 0) { + gruel::thread_bind_to_processor(d->thread, block->processor_affinity()); + } while (1){ boost::this_thread::interruption_point(); diff --git a/gruel/src/lib/thread.cc b/gruel/src/lib/thread.cc index 46ba1745a..ab022c0bd 100644 --- a/gruel/src/lib/thread.cc +++ b/gruel/src/lib/thread.cc @@ -118,13 +118,16 @@ namespace gruel { } void - thread_bind_to_processor(gr_thread_t thread, const std::vector<unsigned int> &mask) + thread_bind_to_processor(const std::vector<unsigned int> &mask) { // Not implemented on OSX } void thread_bind_to_processor(gr_thread_t thread, const std::vector<unsigned int> &mask) + { + // Not implemented on OSX + } void thread_unbind() |