summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/block.cpp42
-rw-r--r--lib/block_actor.cpp1
-rw-r--r--lib/element_impl.hpp1
3 files changed, 28 insertions, 16 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 23b6f18..a7b31f7 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -32,16 +32,13 @@ Block::Block(void)
Block::Block(const std::string &name):
Element(name)
{
- (*this)->block_actor.reset(new BlockActor());
+ //create non-actor containers
+ (*this)->block_data.reset(new BlockData());
(*this)->worker.reset(new Apology::Worker());
- (*this)->worker->set_actor((*this)->block_actor.get());
- (*this)->block_actor->worker = (*this)->worker.get();
- (*this)->block_actor->prio_token = Token::make();
- (*this)->thread_pool = (*this)->block_actor->thread_pool; //ref copy of pool
- (*this)->block_actor->name = name; //for debug purposes
- (*this)->block_actor->block_ptr = this;
- (*this)->block_actor->data.reset(new BlockData());
- (*this)->block_data = (*this)->block_actor->data;
+
+ //create actor and init members
+ (*this)->block_actor.reset(new BlockActor());
+ (*this)->setup_actor(this);
//setup some state variables
(*this)->block_data->block_state = BLOCK_STATE_INIT;
@@ -56,6 +53,16 @@ Block::~Block(void)
//NOP
}
+void ElementImpl::setup_actor(Block *block_ptr)
+{
+ this->block_actor->worker = this->worker.get();
+ this->block_actor->name = name; //for debug purposes
+ this->block_actor->block_ptr = block_ptr;
+ this->block_actor->data = this->block_data;
+ this->worker->set_actor(this->block_actor.get());
+ this->thread_pool = this->block_actor->thread_pool; //ref copy of pool
+}
+
enum block_cleanup_state_type
{
BLOCK_CLEANUP_WAIT,
@@ -64,11 +71,11 @@ enum block_cleanup_state_type
BLOCK_CLEANUP_DOTS,
};
-static void wait_block_cleanup(ElementImpl &self)
+static void wait_actor_idle(const std::string &repr, Theron::Actor &actor)
{
const boost::system_time start = boost::get_system_time();
block_cleanup_state_type state = BLOCK_CLEANUP_WAIT;
- while (self.block_actor->GetNumQueuedMessages())
+ while (actor.GetNumQueuedMessages())
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
switch (state)
@@ -76,7 +83,7 @@ static void wait_block_cleanup(ElementImpl &self)
case BLOCK_CLEANUP_WAIT:
if (boost::get_system_time() > start + boost::posix_time::seconds(1))
{
- std::cerr << self.repr << ", waiting for you to finish." << std::endl;
+ std::cerr << repr << ", waiting for you to finish." << std::endl;
state = BLOCK_CLEANUP_WARN;
}
break;
@@ -84,7 +91,7 @@ static void wait_block_cleanup(ElementImpl &self)
case BLOCK_CLEANUP_WARN:
if (boost::get_system_time() > start + boost::posix_time::seconds(2))
{
- std::cerr << self.repr << ", give up the thread context!" << std::endl;
+ std::cerr << repr << ", give up the thread context!" << std::endl;
state = BLOCK_CLEANUP_DAMN;
}
break;
@@ -92,7 +99,7 @@ static void wait_block_cleanup(ElementImpl &self)
case BLOCK_CLEANUP_DAMN:
if (boost::get_system_time() > start + boost::posix_time::seconds(3))
{
- std::cerr << self.repr << " FAIL; application will now hang..." << std::endl;
+ std::cerr << repr << " FAIL; application will now hang..." << std::endl;
state = BLOCK_CLEANUP_DOTS;
}
break;
@@ -105,7 +112,7 @@ static void wait_block_cleanup(ElementImpl &self)
void ElementImpl::block_cleanup(void)
{
//wait for actor to chew through enqueued messages
- wait_block_cleanup(*this);
+ wait_actor_idle(this->repr, *this->block_actor);
//delete the actor
this->block_actor.reset();
@@ -190,7 +197,10 @@ void Block::notify_topology(const size_t, const size_t)
void Block::set_thread_pool(const ThreadPool &thread_pool)
{
- //TODO
+ boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor;
+ (*this)->block_actor.reset(new BlockActor(thread_pool));
+ (*this)->setup_actor(this);
+ wait_actor_idle((*this)->repr, *old_actor);
}
void Block::set_buffer_affinity(const long affinity)
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 6e09d41..9d417fa 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -50,6 +50,7 @@ BlockActor::BlockActor(const ThreadPool &tp):
active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only
}
this->register_handlers();
+ this->prio_token = Token::make();
}
BlockActor::~BlockActor(void)
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 6b1bee0..6d9e551 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -48,6 +48,7 @@ struct ElementImpl
boost::shared_ptr<Apology::Executor> executor;
boost::shared_ptr<BlockActor> block_actor;
boost::shared_ptr<BlockData> block_data;
+ void setup_actor(Block *block_ptr);
ThreadPool thread_pool;
Apology::Base *get_elem(void) const
{