summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp33
1 files changed, 21 insertions, 12 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 6f70bc2..192ea40 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -92,13 +92,8 @@ void BlockActor::output_fail(const size_t i)
throw std::runtime_error("output_fail called on maximum_items buffer");
}
- if (buff.length != 0)
- {
- InputBufferMessage buff_msg;
- buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
- }
+ //force pop so next work() gets a new buffer
+ this->flush_output(i, true);
}
void BlockActor::handle_task(void)
@@ -168,8 +163,8 @@ void BlockActor::handle_task(void)
{
ASSERT(this->output_queues.ready(i));
SBuffer &buff = this->output_queues.front(i);
- void *mem = buff.get(buff.length);
- const size_t bytes = buff.get_actual_length() - buff.length - buff.offset;
+ void *mem = buff.get();
+ const size_t bytes = buff.get_actual_length() - buff.offset;
size_t items = bytes/this->output_items_sizes[i];
this->output_items[i].get() = mem;
@@ -241,11 +236,25 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
this->post_downstream(i, buff_msg);
}
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
+GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop)
{
if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return;
+ SBuffer &buff = this->output_queues.front(i);
InputBufferMessage buff_msg;
- buff_msg.buffer = this->output_queues.front(i);
+ buff_msg.buffer = buff;
this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
+
+ //increment buffer for next use
+ buff.offset += buff.length;
+ buff.length = 0;
+
+ //when to pop the buffer and give next work a new one
+ const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i];
+ if (
+ force_pop or (buff.offset*2 > buff.get_actual_length()) or
+ (buff.get_actual_length() - buff.offset) < reserve_bytes
+ )
+ {
+ this->output_queues.pop(i);
+ }
}