diff options
-rw-r--r-- | lib/block.cpp | 42 | ||||
-rw-r--r-- | lib/block_actor.cpp | 1 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 |
3 files changed, 28 insertions, 16 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 23b6f18..a7b31f7 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -32,16 +32,13 @@ Block::Block(void) Block::Block(const std::string &name): Element(name) { - (*this)->block_actor.reset(new BlockActor()); + //create non-actor containers + (*this)->block_data.reset(new BlockData()); (*this)->worker.reset(new Apology::Worker()); - (*this)->worker->set_actor((*this)->block_actor.get()); - (*this)->block_actor->worker = (*this)->worker.get(); - (*this)->block_actor->prio_token = Token::make(); - (*this)->thread_pool = (*this)->block_actor->thread_pool; //ref copy of pool - (*this)->block_actor->name = name; //for debug purposes - (*this)->block_actor->block_ptr = this; - (*this)->block_actor->data.reset(new BlockData()); - (*this)->block_data = (*this)->block_actor->data; + + //create actor and init members + (*this)->block_actor.reset(new BlockActor()); + (*this)->setup_actor(this); //setup some state variables (*this)->block_data->block_state = BLOCK_STATE_INIT; @@ -56,6 +53,16 @@ Block::~Block(void) //NOP } +void ElementImpl::setup_actor(Block *block_ptr) +{ + this->block_actor->worker = this->worker.get(); + this->block_actor->name = name; //for debug purposes + this->block_actor->block_ptr = block_ptr; + this->block_actor->data = this->block_data; + this->worker->set_actor(this->block_actor.get()); + this->thread_pool = this->block_actor->thread_pool; //ref copy of pool +} + enum block_cleanup_state_type { BLOCK_CLEANUP_WAIT, @@ -64,11 +71,11 @@ enum block_cleanup_state_type BLOCK_CLEANUP_DOTS, }; -static void wait_block_cleanup(ElementImpl &self) +static void wait_actor_idle(const std::string &repr, Theron::Actor &actor) { const boost::system_time start = boost::get_system_time(); block_cleanup_state_type state = BLOCK_CLEANUP_WAIT; - while (self.block_actor->GetNumQueuedMessages()) + while (actor.GetNumQueuedMessages()) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); switch (state) @@ -76,7 +83,7 @@ static void wait_block_cleanup(ElementImpl &self) case BLOCK_CLEANUP_WAIT: if (boost::get_system_time() > start + boost::posix_time::seconds(1)) { - std::cerr << self.repr << ", waiting for you to finish." << std::endl; + std::cerr << repr << ", waiting for you to finish." << std::endl; state = BLOCK_CLEANUP_WARN; } break; @@ -84,7 +91,7 @@ static void wait_block_cleanup(ElementImpl &self) case BLOCK_CLEANUP_WARN: if (boost::get_system_time() > start + boost::posix_time::seconds(2)) { - std::cerr << self.repr << ", give up the thread context!" << std::endl; + std::cerr << repr << ", give up the thread context!" << std::endl; state = BLOCK_CLEANUP_DAMN; } break; @@ -92,7 +99,7 @@ static void wait_block_cleanup(ElementImpl &self) case BLOCK_CLEANUP_DAMN: if (boost::get_system_time() > start + boost::posix_time::seconds(3)) { - std::cerr << self.repr << " FAIL; application will now hang..." << std::endl; + std::cerr << repr << " FAIL; application will now hang..." << std::endl; state = BLOCK_CLEANUP_DOTS; } break; @@ -105,7 +112,7 @@ static void wait_block_cleanup(ElementImpl &self) void ElementImpl::block_cleanup(void) { //wait for actor to chew through enqueued messages - wait_block_cleanup(*this); + wait_actor_idle(this->repr, *this->block_actor); //delete the actor this->block_actor.reset(); @@ -190,7 +197,10 @@ void Block::notify_topology(const size_t, const size_t) void Block::set_thread_pool(const ThreadPool &thread_pool) { - //TODO + boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor; + (*this)->block_actor.reset(new BlockActor(thread_pool)); + (*this)->setup_actor(this); + wait_actor_idle((*this)->repr, *old_actor); } void Block::set_buffer_affinity(const long affinity) diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index 6e09d41..9d417fa 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -50,6 +50,7 @@ BlockActor::BlockActor(const ThreadPool &tp): active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only } this->register_handlers(); + this->prio_token = Token::make(); } BlockActor::~BlockActor(void) diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 6b1bee0..6d9e551 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -48,6 +48,7 @@ struct ElementImpl boost::shared_ptr<Apology::Executor> executor; boost::shared_ptr<BlockActor> block_actor; boost::shared_ptr<BlockData> block_data; + void setup_actor(Block *block_ptr); ThreadPool thread_pool; Apology::Base *get_elem(void) const { |