diff options
author | Josh Blum | 2012-09-29 14:45:29 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-29 14:45:29 -0700 |
commit | ec1677346389ab3b434d81c6bde15321f3dbe209 (patch) | |
tree | a4fd8498e64dd90f2fc169a9de747e49e2173830 | |
parent | b194049a9fb5ab60f15bfcca1a53e39a42339244 (diff) | |
download | sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.gz sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.bz2 sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.zip |
create IO subscriber bitset for tracking done
-rw-r--r-- | lib/block.cpp | 1 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 6 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 29 | ||||
-rw-r--r-- | lib/block_task.cpp | 33 | ||||
-rw-r--r-- | lib/element.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 5 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 9 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 3 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 2 |
10 files changed, 48 insertions, 51 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 21165f2..40b220b 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -28,6 +28,7 @@ Block::Block(const std::string &name): Element(name) { (*this)->block = boost::shared_ptr<BlockActor>(new BlockActor()); + (*this)->block->name = name; //for debug purposes //setup some state variables (*this)->block->topology_init = false; diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index b2da9ea..ad39159 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -74,6 +74,12 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address { MESSAGE_TRACER(); + if (this->block_state == BLOCK_STATE_DONE) + { + this->Send(0, from); //ACK + return; + } + //allocate output buffers which will also wake up the task const size_t num_outputs = this->get_num_outputs(); this->output_buffer_tokens.resize(num_outputs); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index f386804..6925a60 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -27,6 +27,12 @@ void BlockActor::handle_top_active( ){ MESSAGE_TRACER(); + if (this->block_state == BLOCK_STATE_DONE) + { + this->Send(0, from); //ACK + return; + } + if (this->block_state != BLOCK_STATE_LIVE) { this->block_ptr->start(); @@ -61,10 +67,17 @@ void BlockActor::handle_top_token( ){ MESSAGE_TRACER(); + if (this->block_state == BLOCK_STATE_DONE) + { + this->Send(0, from); //ACK + return; + } + //create input tokens and send allocation hints for (size_t i = 0; i < this->get_num_inputs(); i++) { this->input_tokens[i] = Token::make(); + this->inputs_done.reset(i); OutputTokenMessage token_msg; token_msg.token = this->input_tokens[i]; this->post_upstream(i, token_msg); @@ -83,6 +96,7 @@ void BlockActor::handle_top_token( for (size_t i = 0; i < this->get_num_outputs(); i++) { this->output_tokens[i] = Token::make(); + this->outputs_done.reset(i); InputTokenMessage token_msg; token_msg.token = this->output_tokens[i]; this->post_downstream(i, token_msg); @@ -133,18 +147,3 @@ void BlockActor::handle_self_kick( MESSAGE_TRACER(); this->handle_task(); } - -void BlockActor::handle_check_tokens( - const CheckTokensMessage &, - const Theron::Address -){ - MESSAGE_TRACER(); - if (this->input_queues.all_ready() and not this->forecast_fail) - { - this->handle_task(); - } - else - { - this->mark_done(); - } -} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 761e923..3f012da 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -90,22 +90,17 @@ void BlockActor::handle_task(void) const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); - //const bool is_source = (num_inputs == 0); - //const bool is_sink = (num_outputs == 0); this->work_io_ptr_mask = 0; //reset //------------------------------------------------------------------ //-- initialize input buffers before work //------------------------------------------------------------------ size_t num_input_items = REALLY_BIG; //so big that it must std::min - bool inputs_done = false; size_t output_inline_index = 0; for (size_t i = 0; i < num_inputs; i++) { this->sort_tags(i); - inputs_done = inputs_done or this->input_tokens[i].unique(); - ASSERT(this->input_queues.ready(i)); bool potential_inline; const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline); @@ -141,11 +136,8 @@ void BlockActor::handle_task(void) //-- initialize output buffers before work //------------------------------------------------------------------ size_t num_output_items = REALLY_BIG; //so big that it must std::min - bool outputs_done = false; for (size_t i = 0; i < num_outputs; i++) { - outputs_done = outputs_done or this->output_tokens[i].unique(); - ASSERT(this->output_queues.ready(i)); const SBuffer &buff = this->output_queues.front(i); void *mem = buff.get(buff.length); @@ -160,13 +152,6 @@ void BlockActor::handle_task(void) this->produce_items[i] = 0; } - //if we have outputs and at least one port has no downstream subscibers, mark done - if (outputs_done) - { - this->mark_done(); - return; - } - //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ @@ -187,7 +172,7 @@ void BlockActor::handle_task(void) if (num_output_items) goto forecast_again_you_jerk; this->forecast_fail = true; - this->conclusion(inputs_done); + this->conclusion(); return; } } @@ -258,17 +243,19 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- Message self based on post-work conditions //------------------------------------------------------------------ - this->conclusion(inputs_done); + this->conclusion(); } -GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done) +GRAS_FORCE_INLINE void BlockActor::conclusion(void) { - //if there are inputs, and not all are provided for, - //tell the block to check input queues and handle done - if (inputs_done) + + //since nothing else is coming in, its safe to mark done + if ((~this->inputs_done).none()) //no upstream providers { - this->Push(CheckTokensMessage(), Theron::Address()); - return; + if (not this->input_queues.all_ready() or this->forecast_fail) + { + this->mark_done(); + } } //still have IO ready? kick off another task diff --git a/lib/element.cpp b/lib/element.cpp index a733d31..c7afb8b 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -34,7 +34,7 @@ Element::Element(const std::string &name) (*this)->name = name; (*this)->unique_id = ++unique_id_pool; - if (GENESIS) std::cerr << "New element: " << name << " " << (*this)->unique_id << std::endl; + if (GENESIS) std::cerr << "New element: " << to_string() << std::endl; //default io signature to something IOSignature sig; sig.push_back(1); diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 8932ded..60c730f 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -27,6 +27,7 @@ #include <gras_impl/output_buffer_queues.hpp> #include <gras_impl/input_buffer_queues.hpp> #include <gras_impl/interruptible_thread.hpp> +#include <boost/dynamic_bitset.hpp> #include <vector> #include <set> @@ -48,6 +49,7 @@ struct BlockActor : Apology::Worker BlockActor(void); ~BlockActor(void); Block *block_ptr; + std::string name; //for debug //do it here so we can match w/ the handler declarations void register_handlers(void) @@ -74,7 +76,6 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_output_alloc); this->RegisterHandler(this, &BlockActor::handle_self_kick); - this->RegisterHandler(this, &BlockActor::handle_check_tokens); this->RegisterHandler(this, &BlockActor::handle_update_inputs); } @@ -101,7 +102,6 @@ struct BlockActor : Apology::Worker void handle_output_alloc(const OutputAllocMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); - void handle_check_tokens(const CheckTokensMessage &, const Theron::Address); void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address); //helpers @@ -110,7 +110,7 @@ struct BlockActor : Apology::Worker void handle_task(void); void sort_tags(const size_t index); void trim_tags(const size_t index); - void conclusion(const bool); + void conclusion(void); //per port properties std::vector<size_t> input_items_sizes; @@ -144,8 +144,9 @@ struct BlockActor : Apology::Worker //track the subscriber counts std::vector<Token> input_tokens; std::vector<Token> output_tokens; + boost::dynamic_bitset<> inputs_done; + boost::dynamic_bitset<> outputs_done; std::set<Token> token_pool; - std::vector<SBufferToken> output_buffer_tokens; //buffer queues and ready conditions diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 183befd..ed0ef81 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -137,11 +137,6 @@ struct SelfKickMessage //empty }; -struct CheckTokensMessage -{ - //empty -}; - struct UpdateInputsMessage { //empty diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index f01ad28..461600f 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -55,9 +55,14 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther const size_t index = message.index; //an upstream block declared itself done, recheck the token - if (this->input_queues.empty(index) and this->input_tokens[index].unique()) + this->inputs_done.set(index, this->input_tokens[index].unique()); + + if ((~this->inputs_done).none()) //no upstream providers { - this->mark_done(); + if (not this->input_queues.all_ready()) + { + this->mark_done(); + } } } diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index bf63470..fa01b51 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -47,7 +47,8 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th const size_t index = message.index; //a downstream block has declared itself done, recheck the token - if (this->output_tokens[index].unique()) + this->outputs_done.set(index, this->output_tokens[index].unique()); + if ((~this->outputs_done).none()) //no downstream subscribers? { this->mark_done(); } diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 53c4961..8208fc6 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -60,6 +60,8 @@ void BlockActor::handle_topology( this->input_tokens.resize(num_inputs); this->output_tokens.resize(num_outputs); + this->inputs_done.resize(num_inputs); + this->outputs_done.resize(num_outputs); this->output_allocation_hints.resize(num_outputs); //resize tags vector to match sizes |