diff options
author | Josh Blum | 2013-06-06 13:45:50 -0700 |
---|---|---|
committer | Josh Blum | 2013-06-06 13:45:50 -0700 |
commit | b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6 (patch) | |
tree | 6ce12ebd668d120823c652f8b09d055a149d70dc | |
parent | 7889847eed1e8bc003b88b0d6ad4f7904873d2ac (diff) | |
parent | 7350e18b8d5090349390f54b76a0e251b66ce619 (diff) | |
download | sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.gz sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.bz2 sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.zip |
Merge branch 'actor_migration'
m--------- | Apology | 0 | ||||
-rw-r--r-- | include/gras/block.hpp | 8 | ||||
-rw-r--r-- | include/gras/block.i | 1 | ||||
-rw-r--r-- | lib/block.cpp | 64 | ||||
-rw-r--r-- | lib/block_actor.cpp | 5 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 18 | ||||
-rw-r--r-- | lib/block_consume.cpp | 16 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 82 | ||||
-rw-r--r-- | lib/block_message.cpp | 24 | ||||
-rw-r--r-- | lib/block_produce.cpp | 42 | ||||
-rw-r--r-- | lib/block_props.cpp | 12 | ||||
-rw-r--r-- | lib/element.cpp | 4 | ||||
-rw-r--r-- | lib/element_impl.hpp | 19 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 98 | ||||
-rw-r--r-- | lib/gras_impl/block_data.hpp | 88 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/hier_block.cpp | 2 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 48 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 36 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 18 | ||||
-rw-r--r-- | lib/task_done.cpp | 44 | ||||
-rw-r--r-- | lib/task_fail.cpp | 22 | ||||
-rw-r--r-- | lib/task_main.cpp | 78 | ||||
-rw-r--r-- | lib/top_block.cpp | 2 | ||||
-rw-r--r-- | lib/top_block_query.cpp | 30 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 58 | ||||
-rw-r--r-- | tests/thread_pool_test.py | 15 |
27 files changed, 455 insertions, 381 deletions
diff --git a/Apology b/Apology -Subproject 2da9fad22c9f55dced0c8467787e1a570d7ed52 +Subproject 2216c9c098ce50fc3813ad4fae53e60d94e9649 diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 378358e..5a6e5e2 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -5,6 +5,7 @@ #include <gras/element.hpp> #include <gras/sbuffer.hpp> +#include <gras/thread_pool.hpp> #include <gras/tag_iter.hpp> #include <gras/tags.hpp> #include <gras/work_buffer.hpp> @@ -488,6 +489,13 @@ struct GRAS_API Block : Element ******************************************************************/ /*! + * Set the thread pool of this block. + * Every block is created in the default active thread pool. + * This call will migrate the block to a new specified pool. + */ + void set_thread_pool(const ThreadPool &thread_pool); + + /*! * Set if the work call should be interruptible by stop(). * Some work implementations block with the expectation of * getting a boost thread interrupt in a blocking call. diff --git a/include/gras/block.i b/include/gras/block.i index cce0d15..5c62f7e 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -12,6 +12,7 @@ %include <gras/tag_iter.i> %import <gras/sbuffer.i> %include <gras/buffer_queue.hpp> +%include <gras/thread_pool.hpp> %include <gras/block.hpp> //////////////////////////////////////////////////////////////////////// diff --git a/lib/block.cpp b/lib/block.cpp index 0059569..00238ba 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -32,14 +32,17 @@ Block::Block(void) Block::Block(const std::string &name): Element(name) { - (*this)->block.reset(new BlockActor()); - (*this)->block->prio_token = Token::make(); - (*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool - (*this)->block->name = name; //for debug purposes + //create non-actor containers + (*this)->block_data.reset(new BlockData()); + (*this)->block_data->block = this; + (*this)->worker.reset(new Apology::Worker()); + + //create actor and init members + (*this)->block_actor.reset(new BlockActor()); + (*this)->setup_actor(); //setup some state variables - (*this)->block->block_ptr = this; - (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT; + (*this)->block_data->block_state = BLOCK_STATE_INIT; //call block methods to init stuff this->set_interruptible_work(false); @@ -51,6 +54,15 @@ Block::~Block(void) //NOP } +void ElementImpl::setup_actor(void) +{ + this->block_actor->worker = this->worker.get(); + this->block_actor->name = name; //for debug purposes + this->block_actor->data = this->block_data; + this->worker->set_actor(this->block_actor.get()); + this->thread_pool = this->block_actor->thread_pool; //ref copy of pool +} + enum block_cleanup_state_type { BLOCK_CLEANUP_WAIT, @@ -59,11 +71,11 @@ enum block_cleanup_state_type BLOCK_CLEANUP_DOTS, }; -static void wait_block_cleanup(ElementImpl &self) +static void wait_actor_idle(const std::string &repr, Theron::Actor &actor) { const boost::system_time start = boost::get_system_time(); block_cleanup_state_type state = BLOCK_CLEANUP_WAIT; - while (self.block->GetNumQueuedMessages()) + while (actor.GetNumQueuedMessages()) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); switch (state) @@ -71,7 +83,7 @@ static void wait_block_cleanup(ElementImpl &self) case BLOCK_CLEANUP_WAIT: if (boost::get_system_time() > start + boost::posix_time::seconds(1)) { - std::cerr << self.repr << ", waiting for you to finish." << std::endl; + std::cerr << repr << ", waiting for you to finish." << std::endl; state = BLOCK_CLEANUP_WARN; } break; @@ -79,7 +91,7 @@ static void wait_block_cleanup(ElementImpl &self) case BLOCK_CLEANUP_WARN: if (boost::get_system_time() > start + boost::posix_time::seconds(2)) { - std::cerr << self.repr << ", give up the thread context!" << std::endl; + std::cerr << repr << ", give up the thread context!" << std::endl; state = BLOCK_CLEANUP_DAMN; } break; @@ -87,7 +99,7 @@ static void wait_block_cleanup(ElementImpl &self) case BLOCK_CLEANUP_DAMN: if (boost::get_system_time() > start + boost::posix_time::seconds(3)) { - std::cerr << self.repr << " FAIL; application will now hang..." << std::endl; + std::cerr << repr << " FAIL; application will now hang..." << std::endl; state = BLOCK_CLEANUP_DOTS; } break; @@ -100,10 +112,10 @@ static void wait_block_cleanup(ElementImpl &self) void ElementImpl::block_cleanup(void) { //wait for actor to chew through enqueued messages - wait_block_cleanup(*this); + wait_actor_idle(this->repr, *this->block_actor); //delete the actor - this->block.reset(); + this->block_actor.reset(); //unref actor's framework this->thread_pool.reset(); //must be deleted after actor @@ -132,34 +144,34 @@ typename V::value_type &vector_get_resize(V &v, const size_t index) InputPortConfig &Block::input_config(const size_t which_input) { - return vector_get_resize((*this)->block->input_configs, which_input); + return vector_get_resize((*this)->block_data->input_configs, which_input); } const InputPortConfig &Block::input_config(const size_t which_input) const { - return vector_get_const((*this)->block->input_configs, which_input); + return vector_get_const((*this)->block_data->input_configs, which_input); } OutputPortConfig &Block::output_config(const size_t which_output) { - return vector_get_resize((*this)->block->output_configs, which_output); + return vector_get_resize((*this)->block_data->output_configs, which_output); } const OutputPortConfig &Block::output_config(const size_t which_output) const { - return vector_get_const((*this)->block->output_configs, which_output); + return vector_get_const((*this)->block_data->output_configs, which_output); } void Block::commit_config(void) { - Theron::Actor &actor = *((*this)->block); - for (size_t i = 0; i < (*this)->block->get_num_inputs(); i++) + Theron::Actor &actor = *((*this)->block_actor); + for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++) { InputUpdateMessage message; message.index = i; actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } - for (size_t i = 0; i < (*this)->block->get_num_outputs(); i++) + for (size_t i = 0; i < (*this)->worker->get_num_outputs(); i++) { OutputUpdateMessage message; message.index = i; @@ -183,12 +195,20 @@ void Block::notify_topology(const size_t, const size_t) return; } +void Block::set_thread_pool(const ThreadPool &thread_pool) +{ + boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor; + (*this)->block_actor.reset(new BlockActor(thread_pool)); + (*this)->setup_actor(); + wait_actor_idle((*this)->repr, *old_actor); +} + void Block::set_buffer_affinity(const long affinity) { - (*this)->block->buffer_affinity = affinity; + (*this)->block_data->buffer_affinity = affinity; } void Block::set_interruptible_work(const bool enb) { - (*this)->block->interruptible_work = enb; + (*this)->block_data->interruptible_work = enb; } diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index 464167f..9d417fa 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -34,8 +34,8 @@ ThreadPool get_active_thread_pool(void) * Block actor construction - gets active framework **********************************************************************/ -BlockActor::BlockActor(void): - Apology::Worker(*get_active_thread_pool()) +BlockActor::BlockActor(const ThreadPool &tp): + Theron::Actor((tp)? *tp : *get_active_thread_pool()) { const char * gras_tpp = getenv("GRAS_TPP"); if (gras_tpp != NULL) @@ -50,6 +50,7 @@ BlockActor::BlockActor(void): active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only } this->register_handlers(); + this->prio_token = Token::make(); } BlockActor::~BlockActor(void) diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 0dda99a..aa97405 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -58,14 +58,14 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address MESSAGE_TRACER(); //allocate output buffers which will also wake up the task - const size_t num_outputs = this->get_num_outputs(); + const size_t num_outputs = worker->get_num_outputs(); for (size_t i = 0; i < num_outputs; i++) { const size_t bytes = recommend_length( - this->output_allocation_hints[i], - my_round_up_mult(AT_LEAST_BYTES, this->output_configs[i].item_size), - this->output_configs[i].reserve_items*this->output_configs[i].item_size, - this->output_configs[i].maximum_items*this->output_configs[i].item_size + data->output_allocation_hints[i], + my_round_up_mult(AT_LEAST_BYTES, data->output_configs[i].item_size), + data->output_configs[i].reserve_items*data->output_configs[i].item_size, + data->output_configs[i].maximum_items*data->output_configs[i].item_size ); SBufferDeleter deleter = boost::bind(&buffer_returner, this->thread_pool, this->GetAddress(), i, _1); @@ -74,11 +74,11 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address SBufferConfig config; config.memory = NULL; config.length = bytes; - config.affinity = this->buffer_affinity; + config.affinity = data->buffer_affinity; config.token = token; - BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, config); - this->output_queues.set_buffer_queue(i, queue); + BufferQueueSptr queue = data->block->output_buffer_allocator(i, config); + data->output_queues.set_buffer_queue(i, queue); InputAllocMessage message; //new token for the downstream allocator @@ -87,7 +87,7 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address config.token = token; message.config = config; message.token = token; - this->post_downstream(i, message); + worker->post_downstream(i, message); } this->Send(0, from); //ACK diff --git a/lib/block_consume.cpp b/lib/block_consume.cpp index efdb07e..421b3bc 100644 --- a/lib/block_consume.cpp +++ b/lib/block_consume.cpp @@ -10,26 +10,26 @@ using namespace gras; void Block::consume(const size_t which_input, const size_t num_items) { ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative - (*this)->block->consume(which_input, num_items); + (*this)->block_actor->consume(which_input, num_items); } void Block::consume(const size_t num_items) { - const size_t num_inputs = (*this)->block->get_num_inputs(); + const size_t num_inputs = (*this)->worker->get_num_inputs(); for (size_t i = 0; i < num_inputs; i++) { - (*this)->block->consume(i, num_items); + (*this)->block_actor->consume(i, num_items); } } item_index_t Block::get_consumed(const size_t which_input) { - return (*this)->block->stats.items_consumed[which_input]; + return (*this)->block_data->stats.items_consumed[which_input]; } SBuffer Block::get_input_buffer(const size_t which_input) const { - return (*this)->block->input_queues.front(which_input); + return (*this)->block_data->input_queues.front(which_input); } GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items) @@ -37,8 +37,8 @@ GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " consume " << items << std::endl; #endif - this->stats.items_consumed[i] += items; - const size_t bytes = items*this->input_configs[i].item_size; - this->input_queues.consume(i, bytes); + data->stats.items_consumed[i] += items; + const size_t bytes = items*data->input_configs[i].item_size; + data->input_queues.consume(i, bytes); this->trim_tags(i); } diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index e0ee67a..c52d974 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -13,12 +13,12 @@ void BlockActor::handle_top_active( ){ MESSAGE_TRACER(); - if (this->block_state != BLOCK_STATE_LIVE) + if (data->block_state != BLOCK_STATE_LIVE) { - this->block_ptr->notify_active(); - this->stats.start_time = time_now(); + data->block->notify_active(); + data->stats.start_time = time_now(); } - this->block_state = BLOCK_STATE_LIVE; + data->block_state = BLOCK_STATE_LIVE; this->Send(0, from); //ACK @@ -43,35 +43,35 @@ void BlockActor::handle_top_token( MESSAGE_TRACER(); //create input tokens and send allocation hints - for (size_t i = 0; i < this->get_num_inputs(); i++) + for (size_t i = 0; i < worker->get_num_inputs(); i++) { - this->input_tokens[i] = Token::make(); - this->inputs_done.reset(i); + data->input_tokens[i] = Token::make(); + data->inputs_done.reset(i); OutputTokenMessage token_msg; - token_msg.token = this->input_tokens[i]; - this->post_upstream(i, token_msg); + token_msg.token = data->input_tokens[i]; + worker->post_upstream(i, token_msg); //TODO, schedule this message as a pre-allocation message //tell the upstream about the input requirements OutputHintMessage output_hints; - output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size; - output_hints.token = this->input_tokens[i]; - this->post_upstream(i, output_hints); + output_hints.reserve_bytes = data->input_configs[i].reserve_items*data->input_configs[i].item_size; + output_hints.token = data->input_tokens[i]; + worker->post_upstream(i, output_hints); } //create output token - for (size_t i = 0; i < this->get_num_outputs(); i++) + for (size_t i = 0; i < worker->get_num_outputs(); i++) { - this->output_tokens[i] = Token::make(); - this->outputs_done.reset(i); + data->output_tokens[i] = Token::make(); + data->outputs_done.reset(i); InputTokenMessage token_msg; - token_msg.token = this->output_tokens[i]; - this->post_downstream(i, token_msg); + token_msg.token = data->output_tokens[i]; + worker->post_downstream(i, token_msg); } //store a token to the top level topology - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); this->Send(0, from); //ACK } @@ -83,18 +83,18 @@ void BlockActor::handle_top_config( MESSAGE_TRACER(); //overwrite with global config only if maxium_items is not set (zero) - for (size_t i = 0; i < this->output_configs.size(); i++) + for (size_t i = 0; i < data->output_configs.size(); i++) { - if (this->output_configs[i].maximum_items == 0) + if (data->output_configs[i].maximum_items == 0) { - this->output_configs[i].maximum_items = message.maximum_output_items; + data->output_configs[i].maximum_items = message.maximum_output_items; } } //overwrite with global node affinity setting for buffers if not set - if (this->buffer_affinity == -1) + if (data->buffer_affinity == -1) { - this->buffer_affinity = message.buffer_affinity; + data->buffer_affinity = message.buffer_affinity; } this->Send(0, from); //ACK @@ -109,12 +109,12 @@ void BlockActor::handle_top_thread_group( //store the topology's thread group //erase any potentially old lingering threads //spawn a new thread if this block is a source - this->thread_group = message; - this->interruptible_thread.reset(); //erase old one - if (this->interruptible_work) + data->thread_group = message; + data->interruptible_thread.reset(); //erase old one + if (data->interruptible_work) { - this->interruptible_thread = boost::make_shared<InterruptibleThread>( - this->thread_group, boost::bind(&BlockActor::task_work, this) + data->interruptible_thread = boost::make_shared<InterruptibleThread>( + data->thread_group, boost::bind(&BlockActor::task_work, this) ); } @@ -137,25 +137,25 @@ void BlockActor::handle_get_stats( //instantaneous states we update here, //and not interleaved with the rest of the code - const size_t num_inputs = this->get_num_inputs(); - this->stats.items_enqueued.resize(num_inputs); - this->stats.tags_enqueued.resize(num_inputs); - this->stats.msgs_enqueued.resize(num_inputs); + const size_t num_inputs = worker->get_num_inputs(); + data->stats.items_enqueued.resize(num_inputs); + data->stats.tags_enqueued.resize(num_inputs); + data->stats.msgs_enqueued.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i); - this->stats.tags_enqueued[i] = this->input_tags[i].size(); - this->stats.msgs_enqueued[i] = this->input_msgs[i].size(); + data->stats.items_enqueued[i] = data->input_queues.get_items_enqueued(i); + data->stats.tags_enqueued[i] = data->input_tags[i].size(); + data->stats.msgs_enqueued[i] = data->input_msgs[i].size(); } - this->stats.actor_queue_depth = this->GetNumQueuedMessages(); - this->stats.bytes_copied = this->input_queues.bytes_copied; - this->stats.inputs_idle = this->input_queues.total_idle_times; - this->stats.outputs_idle = this->output_queues.total_idle_times; + data->stats.actor_queue_depth = this->GetNumQueuedMessages(); + data->stats.bytes_copied = data->input_queues.bytes_copied; + data->stats.inputs_idle = data->input_queues.total_idle_times; + data->stats.outputs_idle = data->output_queues.total_idle_times; //create the message reply object GetStatsMessage message; - message.block_id = this->block_ptr->get_uid(); - message.stats = this->stats; + message.block_id = data->block->get_uid(); + message.stats = data->stats; message.stats_time = time_now(); this->Send(message, from); //ACK diff --git a/lib/block_message.cpp b/lib/block_message.cpp index ef7dda6..08a3784 100644 --- a/lib/block_message.cpp +++ b/lib/block_message.cpp @@ -8,35 +8,35 @@ using namespace gras; void Block::post_output_tag(const size_t which_output, const Tag &tag) { - (*this)->block->stats.tags_produced[which_output]++; - (*this)->block->post_downstream(which_output, InputTagMessage(tag)); + (*this)->block_data->stats.tags_produced[which_output]++; + (*this)->worker->post_downstream(which_output, InputTagMessage(tag)); } void Block::_post_output_msg(const size_t which_output, const PMCC &msg) { - (*this)->block->stats.msgs_produced[which_output]++; - (*this)->block->post_downstream(which_output, InputMsgMessage(msg)); + (*this)->block_data->stats.msgs_produced[which_output]++; + (*this)->worker->post_downstream(which_output, InputMsgMessage(msg)); } TagIter Block::get_input_tags(const size_t which_input) { - const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input]; + const std::vector<Tag> &input_tags = (*this)->block_data->input_tags[which_input]; return TagIter(input_tags.begin(), input_tags.end()); } PMCC Block::pop_input_msg(const size_t which_input) { - std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input]; - size_t &num_read = (*this)->block->num_input_msgs_read[which_input]; + std::vector<PMCC> &input_msgs = (*this)->block_data->input_msgs[which_input]; + size_t &num_read = (*this)->block_data->num_input_msgs_read[which_input]; if (num_read >= input_msgs.size()) return PMCC(); PMCC p = input_msgs[num_read++]; - (*this)->block->stats.msgs_consumed[which_input]++; + (*this)->block_data->stats.msgs_consumed[which_input]++; return p; } void Block::propagate_tags(const size_t i, const TagIter &iter) { - const size_t num_outputs = (*this)->block->get_num_outputs(); + const size_t num_outputs = (*this)->worker->get_num_outputs(); for (size_t o = 0; o < num_outputs; o++) { BOOST_FOREACH(gras::Tag t, iter) @@ -52,7 +52,7 @@ void Block::post_input_tag(const size_t which_input, const Tag &tag) { InputTagMessage message(tag); message.index = which_input; - Theron::Actor &actor = *((*this)->block); + Theron::Actor &actor = *((*this)->block_actor); actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } @@ -60,7 +60,7 @@ void Block::_post_input_msg(const size_t which_input, const PMCC &msg) { InputMsgMessage message(msg); message.index = which_input; - Theron::Actor &actor = *((*this)->block); + Theron::Actor &actor = *((*this)->block_actor); actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } @@ -69,6 +69,6 @@ void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer) InputBufferMessage message; message.index = which_input; message.buffer = buffer; - Theron::Actor &actor = *((*this)->block); + Theron::Actor &actor = *((*this)->block_actor); actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp index 584d3a3..7133321 100644 --- a/lib/block_produce.cpp +++ b/lib/block_produce.cpp @@ -9,26 +9,26 @@ using namespace gras; void Block::produce(const size_t which_output, const size_t num_items) { ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative - (*this)->block->produce(which_output, num_items); + (*this)->block_actor->produce(which_output, num_items); } void Block::produce(const size_t num_items) { - const size_t num_outputs = (*this)->block->get_num_outputs(); + const size_t num_outputs = (*this)->worker->get_num_outputs(); for (size_t o = 0; o < num_outputs; o++) { - (*this)->block->produce(o, num_items); + (*this)->block_actor->produce(o, num_items); } } item_index_t Block::get_produced(const size_t which_output) { - return (*this)->block->stats.items_produced[which_output]; + return (*this)->block_data->stats.items_produced[which_output]; } SBuffer Block::get_output_buffer(const size_t which_output) const { - SBuffer &buff = (*this)->block->output_queues.front(which_output); + SBuffer &buff = (*this)->block_data->output_queues.front(which_output); //increment length to auto pop full buffer size, //when user doesnt call pop_output_buffer() buff.length = buff.get_actual_length(); @@ -37,12 +37,19 @@ SBuffer Block::get_output_buffer(const size_t which_output) const void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) { - (*this)->block->output_queues.front(which_output).length = num_bytes; + (*this)->block_data->output_queues.front(which_output).length = num_bytes; } void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) { - (*this)->block->produce_buffer(which_output, buffer); + boost::shared_ptr<BlockData> &data = (*this)->block_data; + data->output_queues.consume(which_output); + ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0); + const size_t items = buffer.length/data->output_configs[which_output].item_size; + data->stats.items_produced[which_output] += items; + InputBufferMessage buff_msg; + buff_msg.buffer = buffer; + (*this)->worker->post_downstream(which_output, buff_msg); } GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) @@ -50,21 +57,10 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " produce " << items << std::endl; #endif - SBuffer &buff = this->output_queues.front(i); - ASSERT((buff.length % output_configs[i].item_size) == 0); - this->stats.items_produced[i] += items; - const size_t bytes = items*this->output_configs[i].item_size; + SBuffer &buff = data->output_queues.front(i); + ASSERT((buff.length % data->output_configs[i].item_size) == 0); + data->stats.items_produced[i] += items; + const size_t bytes = items*data->output_configs[i].item_size; buff.length += bytes; - this->produce_outputs[i] = true; -} - -GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) -{ - this->output_queues.consume(i); - ASSERT((buffer.length % output_configs[i].item_size) == 0); - const size_t items = buffer.length/output_configs[i].item_size; - this->stats.items_produced[i] += items; - InputBufferMessage buff_msg; - buff_msg.buffer = buffer; - this->post_downstream(i, buff_msg); + data->produce_outputs[i] = true; } diff --git a/lib/block_props.cpp b/lib/block_props.cpp index b3456ac..a85d948 100644 --- a/lib/block_props.cpp +++ b/lib/block_props.cpp @@ -24,7 +24,7 @@ void BlockActor::handle_prop_access( //call into the handler overload to do the property access try { - reply.value = block_ptr->_handle_prop_access(message.key, message.value, message.set); + reply.value = data->block->_handle_prop_access(message.key, message.value, message.set); } catch (const std::exception &e) { @@ -45,7 +45,7 @@ void BlockActor::handle_prop_access( PMCC Block::_handle_prop_access(const std::string &key, const PMCC &value, const bool set) { - const PropertyRegistryPair &pair = (*this)->block->property_registry[key]; + const PropertyRegistryPair &pair = (*this)->block_data->property_registry[key]; PropertyRegistrySptr pr = (set)? pair.setter : pair.getter; if (not pr) throw std::invalid_argument("no property registered for key: " + key); if (set) @@ -96,20 +96,20 @@ PMCC BlockActor::prop_access_dispatcher(const std::string &key, const PMCC &valu void Block::_register_getter(const std::string &key, void *pr) { - (*this)->block->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr)); + (*this)->block_data->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr)); } void Block::_register_setter(const std::string &key, void *pr) { - (*this)->block->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr)); + (*this)->block_data->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr)); } void Block::_set_property(const std::string &key, const PMCC &value) { - (*this)->block->prop_access_dispatcher(key, value, true); + (*this)->block_actor->prop_access_dispatcher(key, value, true); } PMCC Block::_get_property(const std::string &key) { - return (*this)->block->prop_access_dispatcher(key, PMCC(), false); + return (*this)->block_actor->prop_access_dispatcher(key, PMCC(), false); } diff --git a/lib/element.cpp b/lib/element.cpp index c06d96d..4b73bd4 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -61,7 +61,7 @@ ElementImpl::~ElementImpl(void) { if (this->executor) this->top_block_cleanup(); if (this->topology) this->hier_block_cleanup(); - if (this->block) this->block_cleanup(); + if (this->worker) this->block_cleanup(); } void Element::set_container(WeakContainer *container) @@ -138,5 +138,5 @@ Block *Element::locate_block(const std::string &path) } //return block ptr as result - return elem->block->block_ptr; + return elem->block_data->block; } diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 477b478..23f7127 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -4,6 +4,7 @@ #define INCLUDED_LIBGRAS_ELEMENT_IMPL_HPP #include <gras_impl/block_actor.hpp> +#include <Apology/Worker.hpp> #include <Apology/Topology.hpp> #include <Apology/Executor.hpp> #include <gras/element.hpp> @@ -18,6 +19,8 @@ namespace gras struct ElementImpl { + //setup stuff + void setup_actor(void); //deconstructor stuff ~ElementImpl(void); @@ -42,25 +45,29 @@ struct ElementImpl std::map<std::string, Element> children; //things may be in this element + boost::shared_ptr<Apology::Worker> worker; boost::shared_ptr<Apology::Topology> topology; boost::shared_ptr<Apology::Executor> executor; - boost::shared_ptr<BlockActor> block; + boost::shared_ptr<BlockActor> block_actor; + boost::shared_ptr<BlockData> block_data; ThreadPool thread_pool; Apology::Base *get_elem(void) const { - if (block) return block.get(); - return topology.get(); + if (worker) return worker.get(); + if (topology) return topology.get(); + else throw std::runtime_error("ElementImpl::get_elem fail"); } template <typename MessageType> void bcast_prio_msg(const MessageType &msg) { Theron::Receiver receiver; - BOOST_FOREACH(Apology::Worker *worker, this->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, this->executor->get_workers()) { + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); MessageType message = msg; - message.prio_token = dynamic_cast<BlockActor *>(worker)->prio_token; - worker->GetFramework().Send(message, receiver.GetAddress(), worker->GetAddress()); + message.prio_token = actor->prio_token; + actor->GetFramework().Send(message, receiver.GetAddress(), actor->GetAddress()); } size_t outstandingCount(this->executor->get_workers().size()); while (outstandingCount != 0) diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index a7ed6c5..90e96bb 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -4,40 +4,25 @@ #define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP #include <gras_impl/debug.hpp> -#include <gras_impl/bitset.hpp> -#include <gras/gras.hpp> #include <gras/block.hpp> #include <gras/top_block.hpp> #include <gras/thread_pool.hpp> #include <Apology/Worker.hpp> -#include <gras_impl/token.hpp> -#include <gras_impl/stats.hpp> #include <gras_impl/messages.hpp> -#include <gras_impl/output_buffer_queues.hpp> -#include <gras_impl/input_buffer_queues.hpp> -#include <gras_impl/interruptible_thread.hpp> -#include <vector> -#include <set> -#include <map> +#include <gras_impl/block_data.hpp> namespace gras { -typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr; -struct PropertyRegistryPair +struct BlockActor : Theron::Actor { - PropertyRegistrySptr setter; - PropertyRegistrySptr getter; -}; - -struct BlockActor : Apology::Worker -{ - BlockActor(void); + BlockActor(const ThreadPool &tp = ThreadPool()); ~BlockActor(void); - Block *block_ptr; std::string name; //for debug ThreadPool thread_pool; Token prio_token; + boost::shared_ptr<BlockData> data; + Apology::Worker *worker; //do it here so we can match w/ the handler declarations void register_handlers(void) @@ -110,68 +95,19 @@ struct BlockActor : Apology::Worker void trim_msgs(const size_t index); void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); - void produce_buffer(const size_t index, const SBuffer &buffer); void task_kicker(void); void update_input_avail(const size_t index); bool is_input_done(const size_t index); bool is_work_allowed(void); - //per port properties - std::vector<InputPortConfig> input_configs; - std::vector<OutputPortConfig> output_configs; - - //work buffers for the new work interface - Block::InputItems input_items; - Block::OutputItems output_items; - - //track the subscriber counts - std::vector<Token> input_tokens; - std::vector<Token> output_tokens; - BitSet inputs_done; - BitSet outputs_done; - std::set<Token> token_pool; - - //buffer queues and ready conditions - InputBufferQueues input_queues; - OutputBufferQueues output_queues; - std::vector<bool> produce_outputs; - BitSet inputs_available; - std::vector<time_ticks_t> time_input_not_ready; - std::vector<time_ticks_t> time_output_not_ready; - - //tag and msg tracking - std::vector<bool> input_tags_changed; - std::vector<std::vector<Tag> > input_tags; - std::vector<size_t> num_input_msgs_read; - std::vector<std::vector<PMCC> > input_msgs; - - //interruptible thread stuff - bool interruptible_work; - SharedThreadGroup thread_group; - boost::shared_ptr<InterruptibleThread> interruptible_thread; - //work helpers inline void task_work(void) { - block_ptr->work(this->input_items, this->output_items); + data->block->work(data->input_items, data->output_items); } - //is the fg running? - enum - { - BLOCK_STATE_INIT, - BLOCK_STATE_LIVE, - BLOCK_STATE_DONE, - } block_state; - long buffer_affinity; - - std::vector<std::vector<OutputHintMessage> > output_allocation_hints; - //property stuff - std::map<std::string, PropertyRegistryPair> property_registry; PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set); - - BlockStats stats; }; //-------------- common functions from this BlockActor class ---------// @@ -183,27 +119,27 @@ GRAS_FORCE_INLINE void BlockActor::task_kicker(void) GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) { - const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); - const bool has_input_msgs = not this->input_msgs[i].empty(); - this->inputs_available.set(i, has_input_bufs or has_input_msgs); - this->input_queues.update_has_msg(i, has_input_msgs); + const bool has_input_bufs = not data->input_queues.empty(i) and data->input_queues.ready(i); + const bool has_input_msgs = not data->input_msgs[i].empty(); + data->inputs_available.set(i, has_input_bufs or has_input_msgs); + data->input_queues.update_has_msg(i, has_input_msgs); } GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) { - const bool force_done = this->input_configs[i].force_done; - if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i]; - return this->inputs_done.all() and this->inputs_available.none(); + const bool force_done = data->input_configs[i].force_done; + if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i]; + return data->inputs_done.all() and data->inputs_available.none(); } GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) { return ( this->prio_token.unique() and - this->block_state == BLOCK_STATE_LIVE and - this->inputs_available.any() and - this->input_queues.all_ready() and - this->output_queues.all_ready() + data->block_state == BLOCK_STATE_LIVE and + data->inputs_available.any() and + data->input_queues.all_ready() and + data->output_queues.all_ready() ); } diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp new file mode 100644 index 0000000..4b6e8de --- /dev/null +++ b/lib/gras_impl/block_data.hpp @@ -0,0 +1,88 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP +#define INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP + +#include <gras/block.hpp> +#include <gras_impl/debug.hpp> +#include <gras_impl/bitset.hpp> +#include <gras_impl/token.hpp> +#include <gras_impl/stats.hpp> +#include <gras_impl/output_buffer_queues.hpp> +#include <gras_impl/input_buffer_queues.hpp> +#include <gras_impl/interruptible_thread.hpp> +#include <vector> +#include <set> +#include <map> + +namespace gras +{ + +typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr; +struct PropertyRegistryPair +{ + PropertyRegistrySptr setter; + PropertyRegistrySptr getter; +}; + +enum BlockState +{ + BLOCK_STATE_INIT, + BLOCK_STATE_LIVE, + BLOCK_STATE_DONE, +}; + +struct BlockData +{ + //block pointer to call into parent + Block *block; + + //per port properties + std::vector<InputPortConfig> input_configs; + std::vector<OutputPortConfig> output_configs; + + //work buffers for the new work interface + Block::InputItems input_items; + Block::OutputItems output_items; + + //track the subscriber counts + std::vector<Token> input_tokens; + std::vector<Token> output_tokens; + BitSet inputs_done; + BitSet outputs_done; + std::set<Token> token_pool; + + //buffer queues and ready conditions + InputBufferQueues input_queues; + OutputBufferQueues output_queues; + std::vector<bool> produce_outputs; + BitSet inputs_available; + std::vector<time_ticks_t> time_input_not_ready; + std::vector<time_ticks_t> time_output_not_ready; + + //tag and msg tracking + std::vector<bool> input_tags_changed; + std::vector<std::vector<Tag> > input_tags; + std::vector<size_t> num_input_msgs_read; + std::vector<std::vector<PMCC> > input_msgs; + + //interruptible thread stuff + bool interruptible_work; + SharedThreadGroup thread_group; + boost::shared_ptr<InterruptibleThread> interruptible_thread; + + //is the fg running? + BlockState block_state; + long buffer_affinity; + + std::vector<std::vector<OutputHintMessage> > output_allocation_hints; + + //property stuff + std::map<std::string, PropertyRegistryPair> property_registry; + + BlockStats stats; +}; + +} //namespace gras + +#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP*/ diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 04480d8..eb09090 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -48,7 +48,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; #ifdef MESSAGE_TRACING -#define MESSAGE_TRACER() std::cerr << block_ptr->to_string() << " in " << BOOST_CURRENT_FUNCTION << std::endl << std::flush; +#define MESSAGE_TRACER() std::cerr << name << " in " << BOOST_CURRENT_FUNCTION << std::endl << std::flush; #else #define MESSAGE_TRACER() #endif diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index 758af0c..abbdec1 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -15,7 +15,7 @@ HierBlock::HierBlock(void) HierBlock::HierBlock(const std::string &name): Element(name) { - (*this)->topology = boost::shared_ptr<Apology::Topology>(new Apology::Topology()); + (*this)->topology.reset(new Apology::Topology()); } HierBlock::~HierBlock(void) diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index faddda8..89e1d34 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -7,25 +7,25 @@ using namespace gras; void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming stream tag, push into the tag storage - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_tags[index].push_back(message.tag); - this->input_tags_changed[index] = true; + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_tags[index].push_back(message.tag); + data->input_tags_changed[index] = true; } void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming async message, push into the msg storage - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_msgs[index].push_back(message.msg); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_msgs[index].push_back(message.msg); this->update_input_avail(index); ta.done(); @@ -34,13 +34,13 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming stream buffer, push into the queue - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_queues.push(index, message.buffer); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_queues.push(index, message.buffer); this->update_input_avail(index); ta.done(); @@ -49,22 +49,22 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); - ASSERT(message.index < this->get_num_inputs()); + ASSERT(message.index < worker->get_num_inputs()); //store the token of the upstream producer - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); } void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //an upstream block declared itself done, recheck the token - this->inputs_done.set(index, this->input_tokens[index].unique()); + data->inputs_done.set(index, data->input_tokens[index].unique()); //upstream done, give it one more attempt at task handling ta.done(); @@ -76,26 +76,26 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle the upstream block allocation request OutputAllocMessage new_msg; - new_msg.queue = block_ptr->input_buffer_allocator(index, message.config); - if (new_msg.queue) this->post_upstream(index, new_msg); + new_msg.queue = data->block->input_buffer_allocator(index, message.config); + if (new_msg.queue) worker->post_upstream(index, new_msg); } void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t i = message.index; //update buffer queue configuration - if (i >= this->input_queues.size()) return; - const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items; - const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items; - const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); + if (i >= data->input_queues.size()) return; + const size_t preload_bytes = data->input_configs[i].item_size*data->input_configs[i].preload_items; + const size_t reserve_bytes = data->input_configs[i].item_size*data->input_configs[i].reserve_items; + const size_t maximum_bytes = data->input_configs[i].item_size*data->input_configs[i].maximum_items; + data->input_queues.update_config(i, data->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); } diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index d937bbf..42dfe39 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -8,14 +8,14 @@ using namespace gras; void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //a buffer has returned from the downstream //(all interested consumers have finished with it) - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->output_queues.push(index, message.buffer); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->output_queues.push(index, message.buffer); ta.done(); this->task_main(); @@ -23,23 +23,23 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); - ASSERT(message.index < this->get_num_outputs()); + ASSERT(message.index < worker->get_num_outputs()); //store the token of the downstream consumer - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); } void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //a downstream block has declared itself done, recheck the token - this->outputs_done.set(index, this->output_tokens[index].unique()); - if (this->outputs_done.all()) //no downstream subscribers? + data->outputs_done.set(index, data->output_tokens[index].unique()); + if (data->outputs_done.all()) //no downstream subscribers? { this->mark_done(); } @@ -47,7 +47,7 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; @@ -57,7 +57,7 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther //remove any old hints with expired token //remove any older hints with matching token std::vector<OutputHintMessage> hints; - BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index]) + BOOST_FOREACH(const OutputHintMessage &hint, data->output_allocation_hints[index]) { if (hint.token.expired()) continue; if (hint.token.lock() == message.token.lock()) continue; @@ -67,27 +67,27 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther //store the new hint as well hints.push_back(message); - this->output_allocation_hints[index] = hints; + data->output_allocation_hints[index] = hints; } void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //return of a positive downstream allocation - this->output_queues.set_buffer_queue(index, message.queue); + data->output_queues.set_buffer_queue(index, message.queue); } void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t i = message.index; //update buffer queue configuration - if (i >= this->output_queues.size()) return; - const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items; - this->output_queues.set_reserve_bytes(i, reserve_bytes); + if (i >= data->output_queues.size()) return; + const size_t reserve_bytes = data->output_configs[i].item_size*data->output_configs[i].reserve_items; + data->output_queues.set_reserve_bytes(i, reserve_bytes); } diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index 8d504dd..a52b70e 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -11,10 +11,10 @@ namespace gras GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i) { - if GRAS_LIKELY(not this->input_tags_changed[i]) return; - std::vector<Tag> &tags_i = this->input_tags[i]; + if GRAS_LIKELY(not data->input_tags_changed[i]) return; + std::vector<Tag> &tags_i = data->input_tags[i]; std::sort(tags_i.begin(), tags_i.end()); - this->input_tags_changed[i] = false; + data->input_tags_changed[i] = false; } GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) @@ -24,8 +24,8 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //-- and post trimmed tags to the downstream based on policy //------------------------------------------------------------------ - std::vector<Tag> &tags_i = this->input_tags[i]; - const item_index_t items_consumed_i = this->stats.items_consumed[i]; + std::vector<Tag> &tags_i = data->input_tags[i]; + const item_index_t items_consumed_i = data->stats.items_consumed[i]; size_t last = 0; while (last < tags_i.size() and tags_i[last].offset < items_consumed_i) { @@ -35,19 +35,19 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) if GRAS_LIKELY(last == 0) return; //call the overloaded propagate_tags to do the dirty work - this->block_ptr->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last)); + data->block->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last)); //now its safe to perform the erasure tags_i.erase(tags_i.begin(), tags_i.begin()+last); - this->stats.tags_consumed[i] += last; + data->stats.tags_consumed[i] += last; } GRAS_FORCE_INLINE void BlockActor::trim_msgs(const size_t i) { - const size_t num_read = this->num_input_msgs_read[i]; + const size_t num_read = data->num_input_msgs_read[i]; if GRAS_UNLIKELY(num_read > 0) { - std::vector<PMCC> &input_msgs = this->input_msgs[i]; + std::vector<PMCC> &input_msgs = data->input_msgs[i]; input_msgs.erase(input_msgs.begin(), input_msgs.begin()+num_read); } } diff --git a/lib/task_done.cpp b/lib/task_done.cpp index d116202..999fd2c 100644 --- a/lib/task_done.cpp +++ b/lib/task_done.cpp @@ -7,62 +7,62 @@ using namespace gras; void Block::mark_done(void) { - (*this)->block->mark_done(); + (*this)->block_actor->mark_done(); } void BlockActor::mark_done(void) { - if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first - this->stats.stop_time = time_now(); - this->block_ptr->notify_inactive(); + data->stats.stop_time = time_now(); + data->block->notify_inactive(); //flush partial output buffers to the downstream - for (size_t i = 0; i < this->get_num_outputs(); i++) + for (size_t i = 0; i < worker->get_num_outputs(); i++) { - if (not this->output_queues.ready(i)) continue; - SBuffer &buff = this->output_queues.front(i); + if (not data->output_queues.ready(i)) continue; + SBuffer &buff = data->output_queues.front(i); if (buff.length == 0) continue; InputBufferMessage buff_msg; buff_msg.buffer = buff; - this->post_downstream(i, buff_msg); - this->output_queues.pop(i); + worker->post_downstream(i, buff_msg); + data->output_queues.pop(i); } - this->interruptible_thread.reset(); + data->interruptible_thread.reset(); //mark down the new state - this->block_state = BLOCK_STATE_DONE; + data->block_state = BLOCK_STATE_DONE; //release upstream, downstream, and executor tokens - this->token_pool.clear(); + data->token_pool.clear(); //release all buffers in queues - this->input_queues.flush_all(); - this->output_queues.flush_all(); + data->input_queues.flush_all(); + data->output_queues.flush_all(); //release all tags and msgs - for (size_t i = 0; i < this->get_num_inputs(); i++) + for (size_t i = 0; i < worker->get_num_inputs(); i++) { - this->input_msgs[i].clear(); - this->input_tags[i].clear(); + data->input_msgs[i].clear(); + data->input_tags[i].clear(); } //tell the upstream and downstram to re-check their tokens //this is how the other blocks know who is interested, //and can decide based on interest to set done or not - for (size_t i = 0; i < this->get_num_inputs(); i++) + for (size_t i = 0; i < worker->get_num_inputs(); i++) { - this->post_upstream(i, OutputCheckMessage()); + worker->post_upstream(i, OutputCheckMessage()); } - for (size_t i = 0; i < this->get_num_outputs(); i++) + for (size_t i = 0; i < worker->get_num_outputs(); i++) { - this->post_downstream(i, InputCheckMessage()); + worker->post_downstream(i, InputCheckMessage()); } if (ARMAGEDDON) std::cerr << "==================================================\n" - << "== The " << block_ptr->to_string() << " is done...\n" + << "== The " << name << " is done...\n" << "==================================================\n" << std::flush; } diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp index 720e2e1..415c13b 100644 --- a/lib/task_fail.cpp +++ b/lib/task_fail.cpp @@ -7,52 +7,52 @@ using namespace gras; void Block::mark_output_fail(const size_t which_output) { - (*this)->block->output_fail(which_output); + (*this)->block_actor->output_fail(which_output); } void Block::mark_input_fail(const size_t which_input) { - (*this)->block->input_fail(which_input); + (*this)->block_actor->input_fail(which_input); } void BlockActor::input_fail(const size_t i) { //input failed, accumulate and try again - if (not this->input_queues.is_accumulated(i)) + if (not data->input_queues.is_accumulated(i)) { - this->input_queues.accumulate(i); + data->input_queues.accumulate(i); this->task_kicker(); return; } //otherwise check for done, else wait for more - if (this->inputs_done[i]) + if (data->inputs_done[i]) { this->mark_done(); return; } //check that the input is not already maxed - if (this->input_queues.is_front_maximal(i)) + if (data->input_queues.is_front_maximal(i)) { throw std::runtime_error("input_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears - this->input_queues.fail(i); + data->input_queues.fail(i); } void BlockActor::output_fail(const size_t i) { - SBuffer &buff = this->output_queues.front(i); + SBuffer &buff = data->output_queues.front(i); //check that the input is not already maxed - const size_t front_items = buff.length/this->output_configs[i].item_size; - if (front_items >= this->output_configs[i].maximum_items) + const size_t front_items = buff.length/data->output_configs[i].item_size; + if (front_items >= data->output_configs[i].maximum_items) { throw std::runtime_error("output_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears - this->output_queues.fail(i); + data->output_queues.fail(i); } diff --git a/lib/task_main.cpp b/lib/task_main.cpp index 01e38ce..cb3e0ee 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -7,7 +7,7 @@ using namespace gras; void BlockActor::task_main(void) { - TimerAccumulate ta_prep(this->stats.total_time_prep); + TimerAccumulate ta_prep(data->stats.total_time_prep); //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: @@ -16,40 +16,40 @@ void BlockActor::task_main(void) //------------------------------------------------------------------ if GRAS_UNLIKELY(not this->is_work_allowed()) return; - const size_t num_inputs = this->get_num_inputs(); - const size_t num_outputs = this->get_num_outputs(); + const size_t num_inputs = worker->get_num_inputs(); + const size_t num_outputs = worker->get_num_outputs(); //------------------------------------------------------------------ //-- initialize input buffers before work //------------------------------------------------------------------ size_t output_inline_index = 0; - this->input_items.min() = ~0; - this->input_items.max() = 0; + data->input_items.min() = ~0; + data->input_items.max() = 0; for (size_t i = 0; i < num_inputs; i++) { this->sort_tags(i); - this->num_input_msgs_read[i] = 0; + data->num_input_msgs_read[i] = 0; - ASSERT(this->input_queues.ready(i)); - const SBuffer &buff = this->input_queues.front(i); + ASSERT(data->input_queues.ready(i)); + const SBuffer &buff = data->input_queues.front(i); const void *mem = buff.get(); - size_t items = buff.length/this->input_configs[i].item_size; + size_t items = buff.length/data->input_configs[i].item_size; - this->input_items.vec()[i] = mem; - this->input_items[i].get() = mem; - this->input_items[i].size() = items; - this->input_items.min() = std::min(this->input_items.min(), items); - this->input_items.max() = std::max(this->input_items.max(), items); + data->input_items.vec()[i] = mem; + data->input_items[i].get() = mem; + data->input_items[i].size() = items; + data->input_items.min() = std::min(data->input_items.min(), items); + data->input_items.max() = std::max(data->input_items.max(), items); //inline dealings, how and when input buffers can be inlined into output buffers //* if GRAS_UNLIKELY( buff.unique() and - input_configs[i].inline_buffer and + data->input_configs[i].inline_buffer and output_inline_index < num_outputs and - buff.get_affinity() == this->buffer_affinity + buff.get_affinity() == data->buffer_affinity ){ - this->output_queues.set_inline(output_inline_index++, buff); + data->output_queues.set_inline(output_inline_index++, buff); } //*/ } @@ -57,41 +57,41 @@ void BlockActor::task_main(void) //------------------------------------------------------------------ //-- initialize output buffers before work //------------------------------------------------------------------ - this->output_items.min() = ~0; - this->output_items.max() = 0; + data->output_items.min() = ~0; + data->output_items.max() = 0; for (size_t i = 0; i < num_outputs; i++) { - ASSERT(this->output_queues.ready(i)); - SBuffer &buff = this->output_queues.front(i); + ASSERT(data->output_queues.ready(i)); + SBuffer &buff = data->output_queues.front(i); ASSERT(buff.length == 0); //assumes it was flushed last call void *mem = buff.get(); const size_t bytes = buff.get_actual_length() - buff.offset; - size_t items = bytes/this->output_configs[i].item_size; + size_t items = bytes/data->output_configs[i].item_size; - this->output_items.vec()[i] = mem; - this->output_items[i].get() = mem; - this->output_items[i].size() = items; - this->output_items.min() = std::min(this->output_items.min(), items); - this->output_items.max() = std::max(this->output_items.max(), items); + data->output_items.vec()[i] = mem; + data->output_items[i].get() = mem; + data->output_items[i].size() = items; + data->output_items.min() = std::min(data->output_items.min(), items); + data->output_items.max() = std::max(data->output_items.max(), items); } //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ ta_prep.done(); - this->stats.work_count++; - if GRAS_UNLIKELY(this->interruptible_thread) + data->stats.work_count++; + if GRAS_UNLIKELY(data->interruptible_thread) { - TimerAccumulate ta_work(this->stats.total_time_work); - this->interruptible_thread->call(); + TimerAccumulate ta_work(data->stats.total_time_work); + data->interruptible_thread->call(); } else { - TimerAccumulate ta_work(this->stats.total_time_work); + TimerAccumulate ta_work(data->stats.total_time_work); this->task_work(); } - this->stats.time_last_work = time_now(); - TimerAccumulate ta_post(this->stats.total_time_post); + data->stats.time_last_work = time_now(); + TimerAccumulate ta_post(data->stats.total_time_post); //------------------------------------------------------------------ //-- Post-work output tasks @@ -99,18 +99,18 @@ void BlockActor::task_main(void) for (size_t i = 0; i < num_outputs; i++) { //buffer may be popped by one of the special buffer api hooks - if GRAS_UNLIKELY(this->output_queues.empty(i)) continue; + if GRAS_UNLIKELY(data->output_queues.empty(i)) continue; //grab a copy of the front buffer then consume from the queue InputBufferMessage buff_msg; - buff_msg.buffer = this->output_queues.front(i); - this->output_queues.consume(i); + buff_msg.buffer = data->output_queues.front(i); + data->output_queues.consume(i); //Post a buffer message downstream only if the produce flag was marked. //So this explicitly after consuming the output queues so pop is called. //This is because pop may have special hooks in it to prepare the buffer. - if GRAS_LIKELY(this->produce_outputs[i]) this->post_downstream(i, buff_msg); - this->produce_outputs[i] = false; + if GRAS_LIKELY(data->produce_outputs[i]) worker->post_downstream(i, buff_msg); + data->produce_outputs[i] = false; } //------------------------------------------------------------------ diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 2106a54..12055f8 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -20,7 +20,7 @@ TopBlock::TopBlock(void) TopBlock::TopBlock(const std::string &name): HierBlock(name) { - (*this)->executor = boost::shared_ptr<Apology::Executor>(new Apology::Executor((*this)->topology.get())); + (*this)->executor.reset(new Apology::Executor((*this)->topology.get())); (*this)->token = Token::make(); (*this)->thread_group = SharedThreadGroup(new boost::thread_group()); if (GENESIS) std::cerr diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp index c38fbd9..79dadfb 100644 --- a/lib/top_block_query.cpp +++ b/lib/top_block_query.cpp @@ -32,12 +32,12 @@ static ptree query_blocks(ElementImpl *self, const ptree &) { ptree root; ptree e; - BOOST_FOREACH(Apology::Worker *worker, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) { - BlockActor *block = dynamic_cast<BlockActor *>(worker); + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); ptree prop_e; typedef std::pair<std::string, PropertyRegistryPair> PropRegistryKVP; - BOOST_FOREACH(const PropRegistryKVP &p, block->property_registry) + BOOST_FOREACH(const PropRegistryKVP &p, actor->data->property_registry) { ptree prop_attrs; if (p.second.setter) @@ -56,7 +56,7 @@ static ptree query_blocks(ElementImpl *self, const ptree &) block_attrs.push_back(std::make_pair(p.first, prop_attrs)); prop_e.push_back(std::make_pair("props", block_attrs)); } - e.push_back(std::make_pair(block->block_ptr->get_uid(), prop_e)); + e.push_back(std::make_pair(actor->data->block->get_uid(), prop_e)); } root.push_back(std::make_pair("blocks", e)); return root; @@ -77,16 +77,18 @@ static ptree query_stats(ElementImpl *self, const ptree &query) //get stats with custom receiver and set high prio GetStatsReceiver receiver; size_t outstandingCount(0); - BOOST_FOREACH(Apology::Worker *worker, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) { + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); + //filter workers not needed in query - const std::string id = dynamic_cast<BlockActor *>(worker)->block_ptr->get_uid(); + const std::string id = actor->data->block->get_uid(); if (std::find(block_ids.begin(), block_ids.end(), id) == block_ids.end()) continue; //send a message to the block's actor to query stats GetStatsMessage message; - message.prio_token = dynamic_cast<BlockActor *>(worker)->prio_token; - worker->GetFramework().Send(message, receiver.GetAddress(), worker->GetAddress()); + message.prio_token = actor->prio_token; + actor->GetFramework().Send(message, receiver.GetAddress(), actor->GetAddress()); outstandingCount++; } while (outstandingCount) outstandingCount -= receiver.Wait(outstandingCount); @@ -169,19 +171,19 @@ static ptree query_props(ElementImpl *self, const ptree &query) const std::string block_id = query.get<std::string>("block"); const std::string prop_key = query.get<std::string>("key"); const bool set = query.count("value") != 0; - BOOST_FOREACH(Apology::Worker *worker, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) { - BlockActor *block = dynamic_cast<BlockActor *>(worker); - if (block->block_ptr->get_uid() != block_id) continue; + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); + if (actor->data->block->get_uid() != block_id) continue; if (set) { - const std::type_info &t = block->property_registry[prop_key].setter->type(); + const std::type_info &t = actor->data->property_registry[prop_key].setter->type(); const PMCC p = ptree_to_pmc(query.get_child("value"), t); - block->prop_access_dispatcher(prop_key, p, true); + actor->prop_access_dispatcher(prop_key, p, true); } else { - PMCC p = block->prop_access_dispatcher(prop_key, PMC(), false); + PMCC p = actor->prop_access_dispatcher(prop_key, PMC(), false); ptree v = pmc_to_ptree(p); root.push_back(std::make_pair("block", query.get_child("block"))); root.push_back(std::make_pair("key", query.get_child("key"))); diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 38ea138..1586f9d 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -24,49 +24,49 @@ void BlockActor::handle_topology( ){ MESSAGE_TRACER(); - const size_t num_inputs = this->get_num_inputs(); - const size_t num_outputs = this->get_num_outputs(); + const size_t num_inputs = worker->get_num_inputs(); + const size_t num_outputs = worker->get_num_outputs(); //call notify_topology on block before committing settings - this->block_ptr->notify_topology(num_inputs, num_outputs); + data->block->notify_topology(num_inputs, num_outputs); //resize and fill port properties - resize_fill_back(this->input_configs, num_inputs); - resize_fill_back(this->output_configs, num_outputs); + resize_fill_back(data->input_configs, num_inputs); + resize_fill_back(data->output_configs, num_outputs); //resize the bytes consumed/produced - resize_fill_grow(this->stats.items_consumed, num_inputs, 0); - resize_fill_grow(this->stats.tags_consumed, num_inputs, 0); - resize_fill_grow(this->stats.msgs_consumed, num_inputs, 0); - resize_fill_grow(this->stats.items_produced, num_outputs, 0); - resize_fill_grow(this->stats.tags_produced, num_outputs, 0); - resize_fill_grow(this->stats.msgs_produced, num_outputs, 0); + resize_fill_grow(data->stats.items_consumed, num_inputs, 0); + resize_fill_grow(data->stats.tags_consumed, num_inputs, 0); + resize_fill_grow(data->stats.msgs_consumed, num_inputs, 0); + resize_fill_grow(data->stats.items_produced, num_outputs, 0); + resize_fill_grow(data->stats.tags_produced, num_outputs, 0); + resize_fill_grow(data->stats.msgs_produced, num_outputs, 0); //resize all work buffers to match current connections - this->input_items.resize(num_inputs); - this->output_items.resize(num_outputs); - this->input_queues.resize(num_inputs); - this->output_queues.resize(num_outputs); - this->produce_outputs.resize(num_outputs, false); - this->inputs_available.resize(num_inputs); - if (num_inputs == 0) this->inputs_available.resize(1, true); //so its always "available" + data->input_items.resize(num_inputs); + data->output_items.resize(num_outputs); + data->input_queues.resize(num_inputs); + data->output_queues.resize(num_outputs); + data->produce_outputs.resize(num_outputs, false); + data->inputs_available.resize(num_inputs); + if (num_inputs == 0) data->inputs_available.resize(1, true); //so its always "available" //copy the name into the queues for debug purposes - this->input_queues.name = this->name; - this->output_queues.name = this->name; + data->input_queues.name = this->name; + data->output_queues.name = this->name; //resize the token trackers - this->input_tokens.resize(num_inputs); - this->output_tokens.resize(num_outputs); - this->inputs_done.resize(num_inputs); - this->outputs_done.resize(num_outputs); - this->output_allocation_hints.resize(num_outputs); + data->input_tokens.resize(num_inputs); + data->output_tokens.resize(num_outputs); + data->inputs_done.resize(num_inputs); + data->outputs_done.resize(num_outputs); + data->output_allocation_hints.resize(num_outputs); //resize tags vector to match sizes - this->input_tags_changed.resize(num_inputs); - this->input_tags.resize(num_inputs); - this->num_input_msgs_read.resize(num_inputs); - this->input_msgs.resize(num_inputs); + data->input_tags_changed.resize(num_inputs); + data->input_tags.resize(num_inputs); + data->num_input_msgs_read.resize(num_inputs); + data->input_msgs.resize(num_inputs); //a block looses all connections, allow it to free if (num_inputs == 0 and num_outputs == 0) diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py index 0f8b513..2765b07 100644 --- a/tests/thread_pool_test.py +++ b/tests/thread_pool_test.py @@ -2,6 +2,7 @@ import unittest import gras +from demo_blocks import * class ThreadPoolTest(unittest.TestCase): @@ -23,5 +24,19 @@ class ThreadPoolTest(unittest.TestCase): #here we assume prio 0.0 (default) can always be set self.assertTrue(gras.ThreadPool.test_thread_priority(0.0)) + def test_migrate_to_thread_pool(self): + tb = gras.TopBlock() + vec_source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6]) + vec_sink = VectorSink(numpy.uint32) + + c = gras.ThreadPoolConfig() + tp = gras.ThreadPool(c) + + vec_source.set_thread_pool(tp) + tb.connect(vec_source, vec_sink) + tb.run() + + self.assertEqual(vec_sink.get_vector(), (0, 9, 8, 7, 6)) + if __name__ == '__main__': unittest.main() |