diff options
author | Josh Blum | 2013-09-12 23:06:12 -0700 |
---|---|---|
committer | Josh Blum | 2013-09-12 23:06:12 -0700 |
commit | 1494e66b4b448132030c233ef75dd9210b90e9ef (patch) | |
tree | 7420af1cff1b0d31ef24fb0d366d8e6f198c4c0d | |
parent | f9c0d4c2e39aa28cc501ceb6479afc32f7849b11 (diff) | |
download | sandhi-1494e66b4b448132030c233ef75dd9210b90e9ef.tar.gz sandhi-1494e66b4b448132030c233ef75dd9210b90e9ef.tar.bz2 sandhi-1494e66b4b448132030c233ef75dd9210b90e9ef.zip |
gras: added thread pool to global config
-rw-r--r-- | include/gras/block.i | 1 | ||||
-rw-r--r-- | include/gras/block_config.hpp | 9 | ||||
-rw-r--r-- | include/gras/element.hpp | 7 | ||||
-rw-r--r-- | include/gras/element.i | 1 | ||||
-rw-r--r-- | include/gras/hier_block.hpp | 1 | ||||
-rw-r--r-- | include/gras/top_block.hpp | 7 | ||||
-rw-r--r-- | lib/block.cpp | 13 | ||||
-rw-r--r-- | lib/block_config.cpp | 12 | ||||
-rw-r--r-- | lib/element.cpp | 5 | ||||
-rw-r--r-- | lib/hier_block.cpp | 1 | ||||
-rw-r--r-- | lib/top_block.cpp | 6 | ||||
-rw-r--r-- | tests/thread_pool_test.py | 3 |
12 files changed, 60 insertions, 6 deletions
diff --git a/include/gras/block.i b/include/gras/block.i index 8daa55a..1804c34 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -14,7 +14,6 @@ %include <gras/tag_iter.i> %import <gras/sbuffer.i> %include <gras/buffer_queue.hpp> -%include <gras/thread_pool.hpp> %include <gras/block_config.hpp> %include <gras/block.hpp> diff --git a/include/gras/block_config.hpp b/include/gras/block_config.hpp index c90731b..17ff182 100644 --- a/include/gras/block_config.hpp +++ b/include/gras/block_config.hpp @@ -4,6 +4,7 @@ #define INCLUDED_GRAS_BLOCK_CONFIG_HPP #include <gras/gras.hpp> +#include <gras/thread_pool.hpp> #include <cstddef> namespace gras @@ -53,6 +54,14 @@ struct GRAS_API GlobalBlockConfig * Default = false. */ bool interruptible_work; + + /*! + * This member sets the thread pool for the block. + * The block's actor will migrate to the new pool. + * + * Default = null thread pool. + */ + ThreadPool thread_pool; }; //! Configuration parameters for an input port diff --git a/include/gras/element.hpp b/include/gras/element.hpp index ce5d658..84f7062 100644 --- a/include/gras/element.hpp +++ b/include/gras/element.hpp @@ -76,6 +76,13 @@ struct GRAS_API Element : Callable, boost::shared_ptr<ElementImpl> //! Get the global block config settings GlobalBlockConfig &global_config(void); + /*! + * Commit changes to the global configuration. + * Call this after modifying the global config. + * Must be call to apply changes to the global config. + */ + virtual void commit_config(void); + /******************************************************************* * identification interface ******************************************************************/ diff --git a/include/gras/element.i b/include/gras/element.i index 1380a97..e577e9d 100644 --- a/include/gras/element.i +++ b/include/gras/element.i @@ -27,6 +27,7 @@ namespace gras %include <std_string.i> %import <PMC/PMC.i> %include <gras/gras.hpp> +%include <gras/thread_pool.hpp> %include <gras/block_config.hpp> %include <gras/callable.hpp> %include <gras/element.hpp> diff --git a/include/gras/hier_block.hpp b/include/gras/hier_block.hpp index 4a020be..b2da9be 100644 --- a/include/gras/hier_block.hpp +++ b/include/gras/hier_block.hpp @@ -18,6 +18,7 @@ struct GRAS_API HierBlock : Element /*! * Commit changes to the global configuration. + * Call this after modifying the global config. * Must be call to apply changes to the global config. */ void commit_config(void); diff --git a/include/gras/top_block.hpp b/include/gras/top_block.hpp index 3dcf23f..3a1b9e5 100644 --- a/include/gras/top_block.hpp +++ b/include/gras/top_block.hpp @@ -17,6 +17,13 @@ struct GRAS_API TopBlock : HierBlock virtual ~TopBlock(void); /*! + * Commit changes to the global configuration. + * Call this after modifying the global config. + * Commit config is called automatically by start/stop/run. + */ + void commit_config(void); + + /*! * Commit changes to the overall flow graph. * Call this after modifying connections. * Commit is called automatically by start/stop/run. diff --git a/lib/block.cpp b/lib/block.cpp index 406e33c..51b661f 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -144,6 +144,18 @@ const OutputPortConfig &Block::output_config(const size_t which_output) const void Block::commit_config(void) { Theron::Actor &actor = *((*this)->block_actor); + + //handle thread pool migration + const ThreadPool &thread_pool = this->global_config().thread_pool; + if (thread_pool and thread_pool != (*this)->block_actor->thread_pool) + { + boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor; + (*this)->block_actor.reset(BlockActor::make(thread_pool)); + (*this)->setup_actor(); + wait_actor_idle((*this)->repr, *old_actor); + } + + //update messages for in and out ports for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++) { InputUpdateMessage message; @@ -156,7 +168,6 @@ void Block::commit_config(void) message.index = i; actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } - } void Block::notify_active(void) diff --git a/lib/block_config.cpp b/lib/block_config.cpp index e34aa0c..227ae08 100644 --- a/lib/block_config.cpp +++ b/lib/block_config.cpp @@ -13,23 +13,29 @@ GlobalBlockConfig::GlobalBlockConfig(void) void GlobalBlockConfig::merge(const GlobalBlockConfig &config) { - //overwrite with global config only if maxium_items is not set (zero) + //overwrite with config's max items only if maxium_items is not set (zero) if (this->maximum_output_items == 0) { this->maximum_output_items = config.maximum_output_items; } - //overwrite with global node affinity setting for buffers if not set + //overwrite with config's node affinity setting for buffers if not set if (this->buffer_affinity == -1) { this->buffer_affinity = config.buffer_affinity; } - //overwrite with global interruptable setting for work if not set + //overwrite with config's interruptable setting for work if not set if (this->interruptible_work == false) { this->interruptible_work = config.interruptible_work; } + + //overwrite with config's thread pool for actor if not set + if (not this->thread_pool) + { + this->thread_pool = config.thread_pool; + } } InputPortConfig::InputPortConfig(void) diff --git a/lib/element.cpp b/lib/element.cpp index 5574c02..efd8dfd 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -94,6 +94,11 @@ GlobalBlockConfig &Element::global_config(void) return (*this)->global_config; } +void Element::commit_config(void) +{ + //NOP -- this call gets overridden +} + void Element::adopt_element(const std::string &name, const Element &child) { if (child->parent) throw std::invalid_argument(str(boost::format( diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index 5d497de..5ca74a9 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -30,6 +30,7 @@ void HierBlock::commit_config(void) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); actor->data->block->global_config().merge((*this)->global_config); + actor->data->block->commit_config(); } } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index dd2fd5d..4892e77 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -31,8 +31,14 @@ void ElementImpl::top_block_cleanup(void) this->executor->commit(); } +void TopBlock::commit_config(void) +{ + HierBlock::commit_config(); +} + void TopBlock::commit(void) { + this->commit_config(); this->start(); //ok to re-start, means update } diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py index 58a00fe..56e0640 100644 --- a/tests/thread_pool_test.py +++ b/tests/thread_pool_test.py @@ -33,7 +33,8 @@ class ThreadPoolTest(unittest.TestCase): c = gras.ThreadPoolConfig() tp = gras.ThreadPool(c) - vec_source.set_thread_pool(tp) + vec_source.global_config().thread_pool = tp + vec_source.commit_config() tb.connect(vec_source, vec_sink) tb.run() |