summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp13
-rw-r--r--lib/block_config.cpp12
-rw-r--r--lib/element.cpp5
-rw-r--r--lib/hier_block.cpp1
-rw-r--r--lib/top_block.cpp6
5 files changed, 33 insertions, 4 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 406e33c..51b661f 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -144,6 +144,18 @@ const OutputPortConfig &Block::output_config(const size_t which_output) const
void Block::commit_config(void)
{
Theron::Actor &actor = *((*this)->block_actor);
+
+ //handle thread pool migration
+ const ThreadPool &thread_pool = this->global_config().thread_pool;
+ if (thread_pool and thread_pool != (*this)->block_actor->thread_pool)
+ {
+ boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor;
+ (*this)->block_actor.reset(BlockActor::make(thread_pool));
+ (*this)->setup_actor();
+ wait_actor_idle((*this)->repr, *old_actor);
+ }
+
+ //update messages for in and out ports
for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++)
{
InputUpdateMessage message;
@@ -156,7 +168,6 @@ void Block::commit_config(void)
message.index = i;
actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
}
-
}
void Block::notify_active(void)
diff --git a/lib/block_config.cpp b/lib/block_config.cpp
index e34aa0c..227ae08 100644
--- a/lib/block_config.cpp
+++ b/lib/block_config.cpp
@@ -13,23 +13,29 @@ GlobalBlockConfig::GlobalBlockConfig(void)
void GlobalBlockConfig::merge(const GlobalBlockConfig &config)
{
- //overwrite with global config only if maxium_items is not set (zero)
+ //overwrite with config's max items only if maxium_items is not set (zero)
if (this->maximum_output_items == 0)
{
this->maximum_output_items = config.maximum_output_items;
}
- //overwrite with global node affinity setting for buffers if not set
+ //overwrite with config's node affinity setting for buffers if not set
if (this->buffer_affinity == -1)
{
this->buffer_affinity = config.buffer_affinity;
}
- //overwrite with global interruptable setting for work if not set
+ //overwrite with config's interruptable setting for work if not set
if (this->interruptible_work == false)
{
this->interruptible_work = config.interruptible_work;
}
+
+ //overwrite with config's thread pool for actor if not set
+ if (not this->thread_pool)
+ {
+ this->thread_pool = config.thread_pool;
+ }
}
InputPortConfig::InputPortConfig(void)
diff --git a/lib/element.cpp b/lib/element.cpp
index 5574c02..efd8dfd 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -94,6 +94,11 @@ GlobalBlockConfig &Element::global_config(void)
return (*this)->global_config;
}
+void Element::commit_config(void)
+{
+ //NOP -- this call gets overridden
+}
+
void Element::adopt_element(const std::string &name, const Element &child)
{
if (child->parent) throw std::invalid_argument(str(boost::format(
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index 5d497de..5ca74a9 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -30,6 +30,7 @@ void HierBlock::commit_config(void)
{
BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
actor->data->block->global_config().merge((*this)->global_config);
+ actor->data->block->commit_config();
}
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index dd2fd5d..4892e77 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -31,8 +31,14 @@ void ElementImpl::top_block_cleanup(void)
this->executor->commit();
}
+void TopBlock::commit_config(void)
+{
+ HierBlock::commit_config();
+}
+
void TopBlock::commit(void)
{
+ this->commit_config();
this->start(); //ok to re-start, means update
}