diff options
author | Josh Blum | 2012-11-17 16:59:46 -0800 |
---|---|---|
committer | Josh Blum | 2012-11-17 16:59:46 -0800 |
commit | eecfc6800e08bf3eec1c8fd93e8b07baf06230b3 (patch) | |
tree | 0f34714429bcf884138919d9330940a32d3e8751 | |
parent | 692c629daa1d265ed6e2c845013c8cf6bec09f78 (diff) | |
download | sandhi-eecfc6800e08bf3eec1c8fd93e8b07baf06230b3.tar.gz sandhi-eecfc6800e08bf3eec1c8fd93e8b07baf06230b3.tar.bz2 sandhi-eecfc6800e08bf3eec1c8fd93e8b07baf06230b3.zip |
added common flush code, flush before produce_buffer
-rw-r--r-- | lib/block_task.cpp | 16 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 1 |
2 files changed, 13 insertions, 4 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index d3e12f6..2495f01 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -197,10 +197,7 @@ void BlockActor::handle_task(void) (buff.get_actual_length() - buff.length) < reserve_bytes ) { - InputBufferMessage buff_msg; - buff_msg.buffer = buff; - this->post_downstream(i, buff_msg); - this->output_queues.pop(i); + this->flush_output(i); } } @@ -236,9 +233,20 @@ void BlockActor::produce(const size_t i, const size_t items) void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { + this->flush_output(i); const size_t items = buffer.length/output_items_sizes[i]; this->items_produced[i] += items; InputBufferMessage buff_msg; buff_msg.buffer = buffer; this->post_downstream(i, buff_msg); } + +GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) +{ + if (not this->output_queues.ready(i)) return; + SBuffer &buff = this->output_queues.front(i); + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + this->output_queues.pop(i); +} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 6e2903b..7c53a08 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -93,6 +93,7 @@ struct BlockActor : Apology::Worker void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); + void flush_output(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) { |