summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-10-05 01:22:56 -0700
committerJosh Blum2012-10-05 01:22:56 -0700
commit9b06c037fcb618199e99518ee120ef3b2f1e96a7 (patch)
tree1e096af554f692991c28c46605bf5651ba0832b6 /lib
parent075dfb6aae5c309b34789761aee3acd63f9d03fc (diff)
downloadsandhi-9b06c037fcb618199e99518ee120ef3b2f1e96a7.tar.gz
sandhi-9b06c037fcb618199e99518ee120ef3b2f1e96a7.tar.bz2
sandhi-9b06c037fcb618199e99518ee120ef3b2f1e96a7.zip
created configurable thread pool
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp4
-rw-r--r--lib/block_actor.cpp63
-rw-r--r--lib/element.cpp2
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/sbuffer.cpp3
6 files changed, 66 insertions, 11 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 84ae09e..85500f3 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -40,6 +40,7 @@ Block::Block(const std::string &name):
Element(name)
{
(*this)->block = boost::shared_ptr<BlockActor>(new BlockActor());
+ (*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool
(*this)->block->name = name; //for debug purposes
//setup some state variables
@@ -55,6 +56,7 @@ Block::Block(const std::string &name):
this->set_relative_rate(1.0);
this->set_tag_propagation_policy(TPP_ALL_TO_ALL);
this->set_interruptible_work(false);
+ this->set_buffer_affinity(-1);
}
template <typename V, typename T>
@@ -236,7 +238,7 @@ bool Block::check_topology(int, int)
return true;
}
-void Block::set_buffer_affinity(const Affinity &affinity)
+void Block::set_buffer_affinity(const long affinity)
{
(*this)->block->buffer_affinity = affinity;
}
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index c0a294a..0b0199b 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -14,27 +14,74 @@
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
+#include <gnuradio/thread_pool.hpp>
#include <gras_impl/block_actor.hpp>
-#include <boost/thread/thread.hpp>
#include <Theron/Framework.h>
using namespace gnuradio;
-static size_t hardware_concurrency(void)
+/***********************************************************************
+ * Thread pool implementation
+ **********************************************************************/
+ThreadPool::ThreadPool(void)
{
- const size_t n = boost::thread::hardware_concurrency();
- return std::max(size_t(2), n);
+ //NOP
}
-static Theron::Framework &get_global_framework(void)
+ThreadPool::ThreadPool(boost::weak_ptr<Theron::Framework> p):
+ boost::shared_ptr<Theron::Framework>(p.lock())
{
- static Theron::Framework framework(hardware_concurrency());
- return framework;
+ //NOP
}
+ThreadPool::ThreadPool(const unsigned long threadCount)
+{
+ if (threadCount == 0) this->reset(new Theron::Framework(Theron::Framework::Parameters()));
+ else this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount)));
+}
+
+ThreadPool::ThreadPool(const unsigned long threadCount, const unsigned long nodeMask)
+{
+ this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, nodeMask)));
+}
+
+ThreadPool::ThreadPool(const unsigned long threadCount, const unsigned long nodeMask, const unsigned long processorMask)
+{
+ this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, nodeMask, processorMask)));
+}
+
+/***********************************************************************
+ * Active framework implementation
+ **********************************************************************/
+static boost::weak_ptr<Theron::Framework> weak_framework;
+
+void ThreadPool::set_active(void)
+{
+ weak_framework = *this;
+}
+
+static ThreadPool active_thread_pool;
+
+static ThreadPool get_active_thread_pool(void)
+{
+ if (not weak_framework.lock())
+ {
+ active_thread_pool = ThreadPool(0);
+ active_thread_pool.set_active();
+ std::cout << "Created default thread pool with " << active_thread_pool->GetNumThreads() << " threads." << std::endl;
+ }
+ return weak_framework;
+}
+
+/***********************************************************************
+ * Block actor construction - gets active framework
+ **********************************************************************/
+
BlockActor::BlockActor(void):
- Apology::Worker(get_global_framework())
+ Apology::Worker(*get_active_thread_pool())
{
+ thread_pool = get_active_thread_pool();
+ active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only
this->register_handlers();
}
diff --git a/lib/element.cpp b/lib/element.cpp
index 9987bd4..39cbca3 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -46,6 +46,8 @@ ElementImpl::~ElementImpl(void)
{
if (this->executor) this->top_block_cleanup();
if (this->topology) this->hier_block_cleanup();
+ this->block.reset();
+ this->thread_pool.reset(); //must be deleted after actor
}
Element &Element::shared_to_element(void)
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 95f4f3a..d058e63 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -51,6 +51,7 @@ struct ElementImpl
boost::shared_ptr<Apology::Topology> topology;
boost::shared_ptr<Apology::Executor> executor;
boost::shared_ptr<BlockActor> block;
+ ThreadPool thread_pool;
Apology::Base *get_elem(void) const
{
if (block) return block.get();
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 4fe6359..d85e366 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -22,6 +22,7 @@
#include <gnuradio/gras.hpp>
#include <gnuradio/block.hpp>
#include <gnuradio/top_block.hpp>
+#include <gnuradio/thread_pool.hpp>
#include <Apology/Worker.hpp>
#include <gras_impl/token.hpp>
#include <gras_impl/messages.hpp>
@@ -50,6 +51,7 @@ struct BlockActor : Apology::Worker
~BlockActor(void);
Block *block_ptr;
std::string name; //for debug
+ ThreadPool thread_pool;
//do it here so we can match w/ the handler declarations
void register_handlers(void)
@@ -188,7 +190,7 @@ struct BlockActor : Apology::Worker
BLOCK_STATE_LIVE,
BLOCK_STATE_DONE,
} block_state;
- Affinity buffer_affinity;
+ long buffer_affinity;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
diff --git a/lib/sbuffer.cpp b/lib/sbuffer.cpp
index 3133caa..2fe7533 100644
--- a/lib/sbuffer.cpp
+++ b/lib/sbuffer.cpp
@@ -47,6 +47,7 @@ SBufferConfig::SBufferConfig(void)
{
memory = NULL;
length = 0;
+ affinity = -1;
}
static void numanuma_mem_deleter(SBuffer &, numanuma::mem *m)
@@ -61,7 +62,7 @@ static void default_allocator_deleter(SBuffer &, char *m)
static void default_allocator(SBufferConfig &config)
{
- if (config.affinity == Affinity())
+ if (config.affinity == -1)
{
char *m = new char[config.length + GRAS_MAX_ALIGNMENT - 1];
size_t x = size_t(m) + GRAS_MAX_ALIGNMENT - 1;