summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2013-09-12 23:06:12 -0700
committerJosh Blum2013-09-12 23:06:12 -0700
commit1494e66b4b448132030c233ef75dd9210b90e9ef (patch)
tree7420af1cff1b0d31ef24fb0d366d8e6f198c4c0d
parentf9c0d4c2e39aa28cc501ceb6479afc32f7849b11 (diff)
downloadsandhi-1494e66b4b448132030c233ef75dd9210b90e9ef.tar.gz
sandhi-1494e66b4b448132030c233ef75dd9210b90e9ef.tar.bz2
sandhi-1494e66b4b448132030c233ef75dd9210b90e9ef.zip
gras: added thread pool to global config
-rw-r--r--include/gras/block.i1
-rw-r--r--include/gras/block_config.hpp9
-rw-r--r--include/gras/element.hpp7
-rw-r--r--include/gras/element.i1
-rw-r--r--include/gras/hier_block.hpp1
-rw-r--r--include/gras/top_block.hpp7
-rw-r--r--lib/block.cpp13
-rw-r--r--lib/block_config.cpp12
-rw-r--r--lib/element.cpp5
-rw-r--r--lib/hier_block.cpp1
-rw-r--r--lib/top_block.cpp6
-rw-r--r--tests/thread_pool_test.py3
12 files changed, 60 insertions, 6 deletions
diff --git a/include/gras/block.i b/include/gras/block.i
index 8daa55a..1804c34 100644
--- a/include/gras/block.i
+++ b/include/gras/block.i
@@ -14,7 +14,6 @@
%include <gras/tag_iter.i>
%import <gras/sbuffer.i>
%include <gras/buffer_queue.hpp>
-%include <gras/thread_pool.hpp>
%include <gras/block_config.hpp>
%include <gras/block.hpp>
diff --git a/include/gras/block_config.hpp b/include/gras/block_config.hpp
index c90731b..17ff182 100644
--- a/include/gras/block_config.hpp
+++ b/include/gras/block_config.hpp
@@ -4,6 +4,7 @@
#define INCLUDED_GRAS_BLOCK_CONFIG_HPP
#include <gras/gras.hpp>
+#include <gras/thread_pool.hpp>
#include <cstddef>
namespace gras
@@ -53,6 +54,14 @@ struct GRAS_API GlobalBlockConfig
* Default = false.
*/
bool interruptible_work;
+
+ /*!
+ * This member sets the thread pool for the block.
+ * The block's actor will migrate to the new pool.
+ *
+ * Default = null thread pool.
+ */
+ ThreadPool thread_pool;
};
//! Configuration parameters for an input port
diff --git a/include/gras/element.hpp b/include/gras/element.hpp
index ce5d658..84f7062 100644
--- a/include/gras/element.hpp
+++ b/include/gras/element.hpp
@@ -76,6 +76,13 @@ struct GRAS_API Element : Callable, boost::shared_ptr<ElementImpl>
//! Get the global block config settings
GlobalBlockConfig &global_config(void);
+ /*!
+ * Commit changes to the global configuration.
+ * Call this after modifying the global config.
+ * Must be call to apply changes to the global config.
+ */
+ virtual void commit_config(void);
+
/*******************************************************************
* identification interface
******************************************************************/
diff --git a/include/gras/element.i b/include/gras/element.i
index 1380a97..e577e9d 100644
--- a/include/gras/element.i
+++ b/include/gras/element.i
@@ -27,6 +27,7 @@ namespace gras
%include <std_string.i>
%import <PMC/PMC.i>
%include <gras/gras.hpp>
+%include <gras/thread_pool.hpp>
%include <gras/block_config.hpp>
%include <gras/callable.hpp>
%include <gras/element.hpp>
diff --git a/include/gras/hier_block.hpp b/include/gras/hier_block.hpp
index 4a020be..b2da9be 100644
--- a/include/gras/hier_block.hpp
+++ b/include/gras/hier_block.hpp
@@ -18,6 +18,7 @@ struct GRAS_API HierBlock : Element
/*!
* Commit changes to the global configuration.
+ * Call this after modifying the global config.
* Must be call to apply changes to the global config.
*/
void commit_config(void);
diff --git a/include/gras/top_block.hpp b/include/gras/top_block.hpp
index 3dcf23f..3a1b9e5 100644
--- a/include/gras/top_block.hpp
+++ b/include/gras/top_block.hpp
@@ -17,6 +17,13 @@ struct GRAS_API TopBlock : HierBlock
virtual ~TopBlock(void);
/*!
+ * Commit changes to the global configuration.
+ * Call this after modifying the global config.
+ * Commit config is called automatically by start/stop/run.
+ */
+ void commit_config(void);
+
+ /*!
* Commit changes to the overall flow graph.
* Call this after modifying connections.
* Commit is called automatically by start/stop/run.
diff --git a/lib/block.cpp b/lib/block.cpp
index 406e33c..51b661f 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -144,6 +144,18 @@ const OutputPortConfig &Block::output_config(const size_t which_output) const
void Block::commit_config(void)
{
Theron::Actor &actor = *((*this)->block_actor);
+
+ //handle thread pool migration
+ const ThreadPool &thread_pool = this->global_config().thread_pool;
+ if (thread_pool and thread_pool != (*this)->block_actor->thread_pool)
+ {
+ boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor;
+ (*this)->block_actor.reset(BlockActor::make(thread_pool));
+ (*this)->setup_actor();
+ wait_actor_idle((*this)->repr, *old_actor);
+ }
+
+ //update messages for in and out ports
for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++)
{
InputUpdateMessage message;
@@ -156,7 +168,6 @@ void Block::commit_config(void)
message.index = i;
actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
}
-
}
void Block::notify_active(void)
diff --git a/lib/block_config.cpp b/lib/block_config.cpp
index e34aa0c..227ae08 100644
--- a/lib/block_config.cpp
+++ b/lib/block_config.cpp
@@ -13,23 +13,29 @@ GlobalBlockConfig::GlobalBlockConfig(void)
void GlobalBlockConfig::merge(const GlobalBlockConfig &config)
{
- //overwrite with global config only if maxium_items is not set (zero)
+ //overwrite with config's max items only if maxium_items is not set (zero)
if (this->maximum_output_items == 0)
{
this->maximum_output_items = config.maximum_output_items;
}
- //overwrite with global node affinity setting for buffers if not set
+ //overwrite with config's node affinity setting for buffers if not set
if (this->buffer_affinity == -1)
{
this->buffer_affinity = config.buffer_affinity;
}
- //overwrite with global interruptable setting for work if not set
+ //overwrite with config's interruptable setting for work if not set
if (this->interruptible_work == false)
{
this->interruptible_work = config.interruptible_work;
}
+
+ //overwrite with config's thread pool for actor if not set
+ if (not this->thread_pool)
+ {
+ this->thread_pool = config.thread_pool;
+ }
}
InputPortConfig::InputPortConfig(void)
diff --git a/lib/element.cpp b/lib/element.cpp
index 5574c02..efd8dfd 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -94,6 +94,11 @@ GlobalBlockConfig &Element::global_config(void)
return (*this)->global_config;
}
+void Element::commit_config(void)
+{
+ //NOP -- this call gets overridden
+}
+
void Element::adopt_element(const std::string &name, const Element &child)
{
if (child->parent) throw std::invalid_argument(str(boost::format(
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index 5d497de..5ca74a9 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -30,6 +30,7 @@ void HierBlock::commit_config(void)
{
BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
actor->data->block->global_config().merge((*this)->global_config);
+ actor->data->block->commit_config();
}
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index dd2fd5d..4892e77 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -31,8 +31,14 @@ void ElementImpl::top_block_cleanup(void)
this->executor->commit();
}
+void TopBlock::commit_config(void)
+{
+ HierBlock::commit_config();
+}
+
void TopBlock::commit(void)
{
+ this->commit_config();
this->start(); //ok to re-start, means update
}
diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py
index 58a00fe..56e0640 100644
--- a/tests/thread_pool_test.py
+++ b/tests/thread_pool_test.py
@@ -33,7 +33,8 @@ class ThreadPoolTest(unittest.TestCase):
c = gras.ThreadPoolConfig()
tp = gras.ThreadPool(c)
- vec_source.set_thread_pool(tp)
+ vec_source.global_config().thread_pool = tp
+ vec_source.commit_config()
tb.connect(vec_source, vec_sink)
tb.run()