summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-11-21 21:45:38 -0800
committerJosh Blum2012-11-21 21:45:38 -0800
commit41d589fc1f18569ec2f835073cf1482c02a66972 (patch)
treed3f87443620e9cc7264b3d7e21bbdadb7be39f7c
parent8cf5baeac93a2c8e908126055ba0c96f2bfd2510 (diff)
downloadsandhi-41d589fc1f18569ec2f835073cf1482c02a66972.tar.gz
sandhi-41d589fc1f18569ec2f835073cf1482c02a66972.tar.bz2
sandhi-41d589fc1f18569ec2f835073cf1482c02a66972.zip
addressed some bugs from the stitching commit
-rw-r--r--lib/block_task.cpp33
-rw-r--r--lib/gras_impl/block_actor.hpp2
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp2
3 files changed, 23 insertions, 14 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 6f70bc2..192ea40 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -92,13 +92,8 @@ void BlockActor::output_fail(const size_t i)
throw std::runtime_error("output_fail called on maximum_items buffer");
}
- if (buff.length != 0)
- {
- InputBufferMessage buff_msg;
- buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
- }
+ //force pop so next work() gets a new buffer
+ this->flush_output(i, true);
}
void BlockActor::handle_task(void)
@@ -168,8 +163,8 @@ void BlockActor::handle_task(void)
{
ASSERT(this->output_queues.ready(i));
SBuffer &buff = this->output_queues.front(i);
- void *mem = buff.get(buff.length);
- const size_t bytes = buff.get_actual_length() - buff.length - buff.offset;
+ void *mem = buff.get();
+ const size_t bytes = buff.get_actual_length() - buff.offset;
size_t items = bytes/this->output_items_sizes[i];
this->output_items[i].get() = mem;
@@ -241,11 +236,25 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
this->post_downstream(i, buff_msg);
}
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
+GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop)
{
if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return;
+ SBuffer &buff = this->output_queues.front(i);
InputBufferMessage buff_msg;
- buff_msg.buffer = this->output_queues.front(i);
+ buff_msg.buffer = buff;
this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
+
+ //increment buffer for next use
+ buff.offset += buff.length;
+ buff.length = 0;
+
+ //when to pop the buffer and give next work a new one
+ const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i];
+ if (
+ force_pop or (buff.offset*2 > buff.get_actual_length()) or
+ (buff.get_actual_length() - buff.offset) < reserve_bytes
+ )
+ {
+ this->output_queues.pop(i);
+ }
}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 27f8400..932b6b5 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -93,7 +93,7 @@ struct BlockActor : Apology::Worker
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
- void flush_output(const size_t index);
+ void flush_output(const size_t index, const bool force_pop = false);
GRAS_FORCE_INLINE bool is_input_done(const size_t i)
{
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 1bff51f..ff88240 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -97,7 +97,7 @@ struct InputBufferQueues
_queues[i].push_back(buffer);
}
- _enqueued_bytes[i] += _queues[i].back().length;
+ _enqueued_bytes[i] += buffer.length;
__update(i);
}