diff options
author | Josh Blum | 2012-10-05 01:22:56 -0700 |
---|---|---|
committer | Josh Blum | 2012-10-05 01:22:56 -0700 |
commit | 9b06c037fcb618199e99518ee120ef3b2f1e96a7 (patch) | |
tree | 1e096af554f692991c28c46605bf5651ba0832b6 /lib | |
parent | 075dfb6aae5c309b34789761aee3acd63f9d03fc (diff) | |
download | sandhi-9b06c037fcb618199e99518ee120ef3b2f1e96a7.tar.gz sandhi-9b06c037fcb618199e99518ee120ef3b2f1e96a7.tar.bz2 sandhi-9b06c037fcb618199e99518ee120ef3b2f1e96a7.zip |
created configurable thread pool
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 4 | ||||
-rw-r--r-- | lib/block_actor.cpp | 63 | ||||
-rw-r--r-- | lib/element.cpp | 2 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 4 | ||||
-rw-r--r-- | lib/sbuffer.cpp | 3 |
6 files changed, 66 insertions, 11 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 84ae09e..85500f3 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -40,6 +40,7 @@ Block::Block(const std::string &name): Element(name) { (*this)->block = boost::shared_ptr<BlockActor>(new BlockActor()); + (*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool (*this)->block->name = name; //for debug purposes //setup some state variables @@ -55,6 +56,7 @@ Block::Block(const std::string &name): this->set_relative_rate(1.0); this->set_tag_propagation_policy(TPP_ALL_TO_ALL); this->set_interruptible_work(false); + this->set_buffer_affinity(-1); } template <typename V, typename T> @@ -236,7 +238,7 @@ bool Block::check_topology(int, int) return true; } -void Block::set_buffer_affinity(const Affinity &affinity) +void Block::set_buffer_affinity(const long affinity) { (*this)->block->buffer_affinity = affinity; } diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index c0a294a..0b0199b 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -14,27 +14,74 @@ // You should have received a copy of the GNU Lesser General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. +#include <gnuradio/thread_pool.hpp> #include <gras_impl/block_actor.hpp> -#include <boost/thread/thread.hpp> #include <Theron/Framework.h> using namespace gnuradio; -static size_t hardware_concurrency(void) +/*********************************************************************** + * Thread pool implementation + **********************************************************************/ +ThreadPool::ThreadPool(void) { - const size_t n = boost::thread::hardware_concurrency(); - return std::max(size_t(2), n); + //NOP } -static Theron::Framework &get_global_framework(void) +ThreadPool::ThreadPool(boost::weak_ptr<Theron::Framework> p): + boost::shared_ptr<Theron::Framework>(p.lock()) { - static Theron::Framework framework(hardware_concurrency()); - return framework; + //NOP } +ThreadPool::ThreadPool(const unsigned long threadCount) +{ + if (threadCount == 0) this->reset(new Theron::Framework(Theron::Framework::Parameters())); + else this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount))); +} + +ThreadPool::ThreadPool(const unsigned long threadCount, const unsigned long nodeMask) +{ + this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, nodeMask))); +} + +ThreadPool::ThreadPool(const unsigned long threadCount, const unsigned long nodeMask, const unsigned long processorMask) +{ + this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, nodeMask, processorMask))); +} + +/*********************************************************************** + * Active framework implementation + **********************************************************************/ +static boost::weak_ptr<Theron::Framework> weak_framework; + +void ThreadPool::set_active(void) +{ + weak_framework = *this; +} + +static ThreadPool active_thread_pool; + +static ThreadPool get_active_thread_pool(void) +{ + if (not weak_framework.lock()) + { + active_thread_pool = ThreadPool(0); + active_thread_pool.set_active(); + std::cout << "Created default thread pool with " << active_thread_pool->GetNumThreads() << " threads." << std::endl; + } + return weak_framework; +} + +/*********************************************************************** + * Block actor construction - gets active framework + **********************************************************************/ + BlockActor::BlockActor(void): - Apology::Worker(get_global_framework()) + Apology::Worker(*get_active_thread_pool()) { + thread_pool = get_active_thread_pool(); + active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only this->register_handlers(); } diff --git a/lib/element.cpp b/lib/element.cpp index 9987bd4..39cbca3 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -46,6 +46,8 @@ ElementImpl::~ElementImpl(void) { if (this->executor) this->top_block_cleanup(); if (this->topology) this->hier_block_cleanup(); + this->block.reset(); + this->thread_pool.reset(); //must be deleted after actor } Element &Element::shared_to_element(void) diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 95f4f3a..d058e63 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -51,6 +51,7 @@ struct ElementImpl boost::shared_ptr<Apology::Topology> topology; boost::shared_ptr<Apology::Executor> executor; boost::shared_ptr<BlockActor> block; + ThreadPool thread_pool; Apology::Base *get_elem(void) const { if (block) return block.get(); diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 4fe6359..d85e366 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -22,6 +22,7 @@ #include <gnuradio/gras.hpp> #include <gnuradio/block.hpp> #include <gnuradio/top_block.hpp> +#include <gnuradio/thread_pool.hpp> #include <Apology/Worker.hpp> #include <gras_impl/token.hpp> #include <gras_impl/messages.hpp> @@ -50,6 +51,7 @@ struct BlockActor : Apology::Worker ~BlockActor(void); Block *block_ptr; std::string name; //for debug + ThreadPool thread_pool; //do it here so we can match w/ the handler declarations void register_handlers(void) @@ -188,7 +190,7 @@ struct BlockActor : Apology::Worker BLOCK_STATE_LIVE, BLOCK_STATE_DONE, } block_state; - Affinity buffer_affinity; + long buffer_affinity; std::vector<std::vector<OutputHintMessage> > output_allocation_hints; diff --git a/lib/sbuffer.cpp b/lib/sbuffer.cpp index 3133caa..2fe7533 100644 --- a/lib/sbuffer.cpp +++ b/lib/sbuffer.cpp @@ -47,6 +47,7 @@ SBufferConfig::SBufferConfig(void) { memory = NULL; length = 0; + affinity = -1; } static void numanuma_mem_deleter(SBuffer &, numanuma::mem *m) @@ -61,7 +62,7 @@ static void default_allocator_deleter(SBuffer &, char *m) static void default_allocator(SBufferConfig &config) { - if (config.affinity == Affinity()) + if (config.affinity == -1) { char *m = new char[config.length + GRAS_MAX_ALIGNMENT - 1]; size_t x = size_t(m) + GRAS_MAX_ALIGNMENT - 1; |