summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2013-03-16 04:18:37 -0700
committerJosh Blum2013-03-16 04:18:37 -0700
commitcebd974738fea792f70bca8ab0bb1d73e2116d81 (patch)
treefe53f1ce5795385ac257430912ae2d9bc4129120
parent7cc848c9e69dd3c1f70e2872f1a4fb2043c94e66 (diff)
downloadsandhi-cebd974738fea792f70bca8ab0bb1d73e2116d81.tar.gz
sandhi-cebd974738fea792f70bca8ab0bb1d73e2116d81.tar.bz2
sandhi-cebd974738fea792f70bca8ab0bb1d73e2116d81.zip
gras: added num_read count for pop msg to fix #53
-rw-r--r--lib/block.cpp6
-rw-r--r--lib/block_task.cpp18
-rw-r--r--lib/gras_impl/block_actor.hpp2
-rw-r--r--lib/input_handlers.cpp1
-rw-r--r--lib/tag_handlers.hpp10
-rw-r--r--lib/topology_handler.cpp1
6 files changed, 29 insertions, 9 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 9bf3b9c..d2f0a0d 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -181,9 +181,9 @@ void Block::post_output_msg(const size_t which_output, const PMCC &msg)
PMCC Block::pop_input_msg(const size_t which_input)
{
std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input];
- if (input_msgs.empty()) return PMCC();
- PMCC p = input_msgs.front();
- input_msgs.erase(input_msgs.begin());
+ size_t &num_read = (*this)->block->num_input_msgs_read[which_input];
+ if (num_read >= input_msgs.size()) return PMCC();
+ PMCC p = input_msgs[num_read++];
(*this)->block->stats.msgs_consumed[which_input]++;
return p;
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c460d01..87d0c8a 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -120,6 +120,7 @@ void BlockActor::handle_task(void)
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
+ this->num_input_msgs_read[i] = 0;
ASSERT(this->input_queues.ready(i));
const SBuffer &buff = this->input_queues.front(i);
@@ -188,7 +189,7 @@ void BlockActor::handle_task(void)
TimerAccumulate ta_post(this->stats.total_time_post);
//------------------------------------------------------------------
- //-- Flush output buffers downstream
+ //-- Post-work output tasks
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
@@ -196,14 +197,19 @@ void BlockActor::handle_task(void)
}
//------------------------------------------------------------------
- //-- Message self based on post-work conditions
+ //-- Post-work input tasks
//------------------------------------------------------------------
- //missing at least one upstream provider?
- //since nothing else is coming in, its safe to mark done
for (size_t i = 0; i < num_inputs; i++)
{
- const bool nothing = this->input_queues.empty(i) and this->input_msgs[i].empty();
- this->inputs_available.set(i, not nothing);
+ this->trim_msgs(i);
+
+ //update the inputs available bit field
+ const bool has_input_bufs = not this->input_queues.empty(i);
+ const bool has_input_msgs = not this->input_msgs[i].empty();
+ this->inputs_available.set(i, has_input_bufs or has_input_msgs);
+
+ //missing at least one upstream provider?
+ //since nothing else is coming in, its safe to mark done
if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 5e08687..935c87d 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker
void output_fail(const size_t index);
void sort_tags(const size_t index);
void trim_tags(const size_t index);
+ void trim_msgs(const size_t index);
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
@@ -154,6 +155,7 @@ struct BlockActor : Apology::Worker
//tag tracking
std::vector<bool> input_tags_changed;
std::vector<std::vector<Tag> > input_tags;
+ std::vector<size_t> num_input_msgs_read;
std::vector<std::vector<PMCC> > input_msgs;
//interruptible thread stuff
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index e55f72e..c15885d 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -40,6 +40,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
if (this->block_state == BLOCK_STATE_DONE) return;
this->input_queues.push(index, message.buffer);
this->inputs_available.set(index);
+
ta.done();
this->handle_task();
}
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index 73c9fb7..425748c 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -42,6 +42,16 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
this->stats.tags_consumed[i] += last;
}
+GRAS_FORCE_INLINE void BlockActor::trim_msgs(const size_t i)
+{
+ const size_t num_read = this->num_input_msgs_read[i];
+ if GRAS_UNLIKELY(num_read > 0)
+ {
+ std::vector<PMCC> &input_msgs = this->input_msgs[i];
+ input_msgs.erase(input_msgs.begin(), input_msgs.begin()+num_read);
+ }
+}
+
} //namespace gras
#endif /*INCLUDED_LIBGRAS_TAG_HANDLERS_HPP*/
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index ac4d859..aec8f68 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -64,6 +64,7 @@ void BlockActor::handle_topology(
//resize tags vector to match sizes
this->input_tags_changed.resize(num_inputs);
this->input_tags.resize(num_inputs);
+ this->num_input_msgs_read.resize(num_inputs);
this->input_msgs.resize(num_inputs);
//a block looses all connections, allow it to free