diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 13 | ||||
-rw-r--r-- | lib/block_config.cpp | 12 | ||||
-rw-r--r-- | lib/element.cpp | 5 | ||||
-rw-r--r-- | lib/hier_block.cpp | 1 | ||||
-rw-r--r-- | lib/top_block.cpp | 6 |
5 files changed, 33 insertions, 4 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 406e33c..51b661f 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -144,6 +144,18 @@ const OutputPortConfig &Block::output_config(const size_t which_output) const void Block::commit_config(void) { Theron::Actor &actor = *((*this)->block_actor); + + //handle thread pool migration + const ThreadPool &thread_pool = this->global_config().thread_pool; + if (thread_pool and thread_pool != (*this)->block_actor->thread_pool) + { + boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor; + (*this)->block_actor.reset(BlockActor::make(thread_pool)); + (*this)->setup_actor(); + wait_actor_idle((*this)->repr, *old_actor); + } + + //update messages for in and out ports for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++) { InputUpdateMessage message; @@ -156,7 +168,6 @@ void Block::commit_config(void) message.index = i; actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } - } void Block::notify_active(void) diff --git a/lib/block_config.cpp b/lib/block_config.cpp index e34aa0c..227ae08 100644 --- a/lib/block_config.cpp +++ b/lib/block_config.cpp @@ -13,23 +13,29 @@ GlobalBlockConfig::GlobalBlockConfig(void) void GlobalBlockConfig::merge(const GlobalBlockConfig &config) { - //overwrite with global config only if maxium_items is not set (zero) + //overwrite with config's max items only if maxium_items is not set (zero) if (this->maximum_output_items == 0) { this->maximum_output_items = config.maximum_output_items; } - //overwrite with global node affinity setting for buffers if not set + //overwrite with config's node affinity setting for buffers if not set if (this->buffer_affinity == -1) { this->buffer_affinity = config.buffer_affinity; } - //overwrite with global interruptable setting for work if not set + //overwrite with config's interruptable setting for work if not set if (this->interruptible_work == false) { this->interruptible_work = config.interruptible_work; } + + //overwrite with config's thread pool for actor if not set + if (not this->thread_pool) + { + this->thread_pool = config.thread_pool; + } } InputPortConfig::InputPortConfig(void) diff --git a/lib/element.cpp b/lib/element.cpp index 5574c02..efd8dfd 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -94,6 +94,11 @@ GlobalBlockConfig &Element::global_config(void) return (*this)->global_config; } +void Element::commit_config(void) +{ + //NOP -- this call gets overridden +} + void Element::adopt_element(const std::string &name, const Element &child) { if (child->parent) throw std::invalid_argument(str(boost::format( diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index 5d497de..5ca74a9 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -30,6 +30,7 @@ void HierBlock::commit_config(void) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); actor->data->block->global_config().merge((*this)->global_config); + actor->data->block->commit_config(); } } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index dd2fd5d..4892e77 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -31,8 +31,14 @@ void ElementImpl::top_block_cleanup(void) this->executor->commit(); } +void TopBlock::commit_config(void) +{ + HierBlock::commit_config(); +} + void TopBlock::commit(void) { + this->commit_config(); this->start(); //ok to re-start, means update } |