diff options
author | Josh Blum | 2013-03-16 04:18:37 -0700 |
---|---|---|
committer | Josh Blum | 2013-03-16 04:18:37 -0700 |
commit | cebd974738fea792f70bca8ab0bb1d73e2116d81 (patch) | |
tree | fe53f1ce5795385ac257430912ae2d9bc4129120 | |
parent | 7cc848c9e69dd3c1f70e2872f1a4fb2043c94e66 (diff) | |
download | sandhi-cebd974738fea792f70bca8ab0bb1d73e2116d81.tar.gz sandhi-cebd974738fea792f70bca8ab0bb1d73e2116d81.tar.bz2 sandhi-cebd974738fea792f70bca8ab0bb1d73e2116d81.zip |
gras: added num_read count for pop msg to fix #53
-rw-r--r-- | lib/block.cpp | 6 | ||||
-rw-r--r-- | lib/block_task.cpp | 18 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 2 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 10 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 1 |
6 files changed, 29 insertions, 9 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 9bf3b9c..d2f0a0d 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -181,9 +181,9 @@ void Block::post_output_msg(const size_t which_output, const PMCC &msg) PMCC Block::pop_input_msg(const size_t which_input) { std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input]; - if (input_msgs.empty()) return PMCC(); - PMCC p = input_msgs.front(); - input_msgs.erase(input_msgs.begin()); + size_t &num_read = (*this)->block->num_input_msgs_read[which_input]; + if (num_read >= input_msgs.size()) return PMCC(); + PMCC p = input_msgs[num_read++]; (*this)->block->stats.msgs_consumed[which_input]++; return p; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c460d01..87d0c8a 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -120,6 +120,7 @@ void BlockActor::handle_task(void) for (size_t i = 0; i < num_inputs; i++) { this->sort_tags(i); + this->num_input_msgs_read[i] = 0; ASSERT(this->input_queues.ready(i)); const SBuffer &buff = this->input_queues.front(i); @@ -188,7 +189,7 @@ void BlockActor::handle_task(void) TimerAccumulate ta_post(this->stats.total_time_post); //------------------------------------------------------------------ - //-- Flush output buffers downstream + //-- Post-work output tasks //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { @@ -196,14 +197,19 @@ void BlockActor::handle_task(void) } //------------------------------------------------------------------ - //-- Message self based on post-work conditions + //-- Post-work input tasks //------------------------------------------------------------------ - //missing at least one upstream provider? - //since nothing else is coming in, its safe to mark done for (size_t i = 0; i < num_inputs; i++) { - const bool nothing = this->input_queues.empty(i) and this->input_msgs[i].empty(); - this->inputs_available.set(i, not nothing); + this->trim_msgs(i); + + //update the inputs available bit field + const bool has_input_bufs = not this->input_queues.empty(i); + const bool has_input_msgs = not this->input_msgs[i].empty(); + this->inputs_available.set(i, has_input_bufs or has_input_msgs); + + //missing at least one upstream provider? + //since nothing else is coming in, its safe to mark done if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done(); } diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 5e08687..935c87d 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker void output_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); + void trim_msgs(const size_t index); void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); @@ -154,6 +155,7 @@ struct BlockActor : Apology::Worker //tag tracking std::vector<bool> input_tags_changed; std::vector<std::vector<Tag> > input_tags; + std::vector<size_t> num_input_msgs_read; std::vector<std::vector<PMCC> > input_msgs; //interruptible thread stuff diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index e55f72e..c15885d 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -40,6 +40,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th if (this->block_state == BLOCK_STATE_DONE) return; this->input_queues.push(index, message.buffer); this->inputs_available.set(index); + ta.done(); this->handle_task(); } diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index 73c9fb7..425748c 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -42,6 +42,16 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) this->stats.tags_consumed[i] += last; } +GRAS_FORCE_INLINE void BlockActor::trim_msgs(const size_t i) +{ + const size_t num_read = this->num_input_msgs_read[i]; + if GRAS_UNLIKELY(num_read > 0) + { + std::vector<PMCC> &input_msgs = this->input_msgs[i]; + input_msgs.erase(input_msgs.begin(), input_msgs.begin()+num_read); + } +} + } //namespace gras #endif /*INCLUDED_LIBGRAS_TAG_HANDLERS_HPP*/ diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index ac4d859..aec8f68 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -64,6 +64,7 @@ void BlockActor::handle_topology( //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); this->input_tags.resize(num_inputs); + this->num_input_msgs_read.resize(num_inputs); this->input_msgs.resize(num_inputs); //a block looses all connections, allow it to free |