summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/block_produce.cpp2
-rw-r--r--lib/gras_impl/block_actor.hpp21
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp13
-rw-r--r--lib/task_main.cpp16
4 files changed, 28 insertions, 24 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 9fbc468..584d3a3 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -60,7 +60,7 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
- this->flush_output(i);
+ this->output_queues.consume(i);
ASSERT((buffer.length % output_configs[i].item_size) == 0);
const size_t items = buffer.length/output_configs[i].item_size;
this->stats.items_produced[i] += items;
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 2799313..5444bb0 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -104,7 +104,6 @@ struct BlockActor : Apology::Worker
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
- void flush_output(const size_t index);
void task_kicker(void);
void update_input_avail(const size_t index);
bool is_input_done(const size_t index);
@@ -168,26 +167,6 @@ struct BlockActor : Apology::Worker
//-------------- common functions from this BlockActor class ---------//
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
-{
- if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
- SBuffer &buff = this->output_queues.front(i);
- if GRAS_LIKELY(this->produce_outputs[i])
- {
- this->produce_outputs[i] = false;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- }
-
- //increment buffer for next use
- buff.offset += buff.length;
- buff.length = 0;
-
- //release whatever has been used of the output buffer
- this->output_queues.pop(i);
-}
-
GRAS_FORCE_INLINE void BlockActor::task_kicker(void)
{
if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index deea0e1..251de96 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -53,6 +53,19 @@ struct OutputBufferQueues
return _queues[i]->front();
}
+ GRAS_FORCE_INLINE void consume(const size_t i)
+ {
+ ASSERT(not this->empty(i));
+ SBuffer &buff = this->front(i);
+ if GRAS_UNLIKELY(buff.length == 0) return;
+
+ //increment buffer for next use
+ buff.offset += buff.length;
+ buff.length = 0;
+
+ this->pop(i);
+ }
+
GRAS_FORCE_INLINE void pop(const size_t i)
{
ASSERT(_queues[i]);
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index 9ab786a..6c3c23d 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -52,7 +52,7 @@ void BlockActor::task_main(void)
//copy buffer reference but push with zero length, same offset
SBuffer new_obuff = buff;
new_obuff.length = 0;
- this->flush_output(output_inline_index);
+ this->output_queues.consume(output_inline_index);
this->output_queues.push(output_inline_index, new_obuff); //you got inlined!
output_inline_index++; //done do this output port again
}
@@ -103,7 +103,19 @@ void BlockActor::task_main(void)
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
- this->flush_output(i);
+ //buffer may be popped by one of the special buffer api hooks
+ if GRAS_UNLIKELY(this->output_queues.empty(i)) continue;
+
+ //grab a copy of the front buffer then consume from the queue
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = this->output_queues.front(i);
+ this->output_queues.consume(i);
+
+ //Post a buffer message downstream only if the produce flag was marked.
+ //So this explicitly after consuming the output queues so pop is called.
+ //This is because pop may have special hooks in it to prepare the buffer.
+ if GRAS_LIKELY(this->produce_outputs[i]) this->post_downstream(i, buff_msg);
+ this->produce_outputs[i] = false;
}
//------------------------------------------------------------------