diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 33 |
1 files changed, 21 insertions, 12 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 6f70bc2..192ea40 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -92,13 +92,8 @@ void BlockActor::output_fail(const size_t i) throw std::runtime_error("output_fail called on maximum_items buffer"); } - if (buff.length != 0) - { - InputBufferMessage buff_msg; - buff_msg.buffer = buff; - this->post_downstream(i, buff_msg); - this->output_queues.pop(i); - } + //force pop so next work() gets a new buffer + this->flush_output(i, true); } void BlockActor::handle_task(void) @@ -168,8 +163,8 @@ void BlockActor::handle_task(void) { ASSERT(this->output_queues.ready(i)); SBuffer &buff = this->output_queues.front(i); - void *mem = buff.get(buff.length); - const size_t bytes = buff.get_actual_length() - buff.length - buff.offset; + void *mem = buff.get(); + const size_t bytes = buff.get_actual_length() - buff.offset; size_t items = bytes/this->output_items_sizes[i]; this->output_items[i].get() = mem; @@ -241,11 +236,25 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) this->post_downstream(i, buff_msg); } -GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) +GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop) { if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return; + SBuffer &buff = this->output_queues.front(i); InputBufferMessage buff_msg; - buff_msg.buffer = this->output_queues.front(i); + buff_msg.buffer = buff; this->post_downstream(i, buff_msg); - this->output_queues.pop(i); + + //increment buffer for next use + buff.offset += buff.length; + buff.length = 0; + + //when to pop the buffer and give next work a new one + const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i]; + if ( + force_pop or (buff.offset*2 > buff.get_actual_length()) or + (buff.get_actual_length() - buff.offset) < reserve_bytes + ) + { + this->output_queues.pop(i); + } } |