diff options
m--------- | PMC | 0 | ||||
m--------- | grextras | 0 | ||||
-rw-r--r-- | lib/block_produce.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 21 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 36 | ||||
-rw-r--r-- | lib/task_main.cpp | 21 | ||||
-rw-r--r-- | python/gras/GRAS_Tags.i | 27 |
7 files changed, 55 insertions, 52 deletions
diff --git a/PMC b/PMC -Subproject 963f85f9a8e050a6999b9bf4ed600393c5e4201 +Subproject 09f9ae45cf42f32a2a00e4cc39c9856aa3128bc diff --git a/grextras b/grextras -Subproject 515ca8d4160ec19daa4668e233cef0da597aaca +Subproject baad45ff699786f1416f09c6beaad4eb99f0745 diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp index 9fbc468..584d3a3 100644 --- a/lib/block_produce.cpp +++ b/lib/block_produce.cpp @@ -60,7 +60,7 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { - this->flush_output(i); + this->output_queues.consume(i); ASSERT((buffer.length % output_configs[i].item_size) == 0); const size_t items = buffer.length/output_configs[i].item_size; this->stats.items_produced[i] += items; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 2799313..5444bb0 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -104,7 +104,6 @@ struct BlockActor : Apology::Worker void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); - void flush_output(const size_t index); void task_kicker(void); void update_input_avail(const size_t index); bool is_input_done(const size_t index); @@ -168,26 +167,6 @@ struct BlockActor : Apology::Worker //-------------- common functions from this BlockActor class ---------// -GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) -{ - if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return; - SBuffer &buff = this->output_queues.front(i); - if GRAS_LIKELY(this->produce_outputs[i]) - { - this->produce_outputs[i] = false; - InputBufferMessage buff_msg; - buff_msg.buffer = buff; - this->post_downstream(i, buff_msg); - } - - //increment buffer for next use - buff.offset += buff.length; - buff.length = 0; - - //release whatever has been used of the output buffer - this->output_queues.pop(i); -} - GRAS_FORCE_INLINE void BlockActor::task_kicker(void) { if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress()); diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index deea0e1..f0859a3 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -31,6 +31,7 @@ struct OutputBufferQueues _bitset.resize(size); _queues.resize(size); _reserve_bytes.resize(size, 1); + _inline_buffer.resize(size); } GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff) @@ -49,12 +50,38 @@ struct OutputBufferQueues GRAS_FORCE_INLINE SBuffer &front(const size_t i) { + if GRAS_UNLIKELY(_inline_buffer[i]) return _inline_buffer[i]; ASSERT(not this->empty(i)); return _queues[i]->front(); } + GRAS_FORCE_INLINE void consume(const size_t i) + { + if GRAS_UNLIKELY(_inline_buffer[i]) + { + _inline_buffer[i].reset(); + return; + } + + ASSERT(not this->empty(i)); + SBuffer &buff = this->front(i); + if GRAS_UNLIKELY(buff.length == 0) return; + + //increment buffer for next use + buff.offset += buff.length; + buff.length = 0; + + this->pop(i); + } + GRAS_FORCE_INLINE void pop(const size_t i) { + if GRAS_UNLIKELY(_inline_buffer[i]) + { + _inline_buffer[i].reset(); + return; + } + ASSERT(_queues[i]); ASSERT(not _queues[i]->empty()); _queues[i]->pop(); @@ -73,6 +100,7 @@ struct OutputBufferQueues GRAS_FORCE_INLINE bool empty(const size_t i) const { + if GRAS_UNLIKELY(_inline_buffer[i]) return false; return (not _queues[i] or _queues[i]->empty()); } @@ -98,9 +126,17 @@ struct OutputBufferQueues _bitset.set(i, avail >= _reserve_bytes[i]); } + GRAS_FORCE_INLINE void set_inline(const size_t i, const SBuffer &inline_buffer) + { + _inline_buffer[i] = inline_buffer; + _inline_buffer[i].length = 0; + _bitset.set(i); + } + BitSet _bitset; std::vector<BufferQueueSptr> _queues; std::vector<size_t> _reserve_bytes; + std::vector<SBuffer> _inline_buffer; }; diff --git a/lib/task_main.cpp b/lib/task_main.cpp index 9ab786a..01e38ce 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -49,12 +49,7 @@ void BlockActor::task_main(void) output_inline_index < num_outputs and buff.get_affinity() == this->buffer_affinity ){ - //copy buffer reference but push with zero length, same offset - SBuffer new_obuff = buff; - new_obuff.length = 0; - this->flush_output(output_inline_index); - this->output_queues.push(output_inline_index, new_obuff); //you got inlined! - output_inline_index++; //done do this output port again + this->output_queues.set_inline(output_inline_index++, buff); } //*/ } @@ -103,7 +98,19 @@ void BlockActor::task_main(void) //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { - this->flush_output(i); + //buffer may be popped by one of the special buffer api hooks + if GRAS_UNLIKELY(this->output_queues.empty(i)) continue; + + //grab a copy of the front buffer then consume from the queue + InputBufferMessage buff_msg; + buff_msg.buffer = this->output_queues.front(i); + this->output_queues.consume(i); + + //Post a buffer message downstream only if the produce flag was marked. + //So this explicitly after consuming the output queues so pop is called. + //This is because pop may have special hooks in it to prepare the buffer. + if GRAS_LIKELY(this->produce_outputs[i]) this->post_downstream(i, buff_msg); + this->produce_outputs[i] = false; } //------------------------------------------------------------------ diff --git a/python/gras/GRAS_Tags.i b/python/gras/GRAS_Tags.i index c2c640e..84cbbe3 100644 --- a/python/gras/GRAS_Tags.i +++ b/python/gras/GRAS_Tags.i @@ -10,27 +10,8 @@ DECL_PMC_SWIG_TYPE(gras::StreamTag, swig_stream_tag) DECL_PMC_SWIG_TYPE(gras::PacketMsg, swig_packet_msg) %pythoncode %{ - -from GRAS_SBuffer import SBuffer - -RegisterPy2PMC( - is_py = lambda x: isinstance(x, StreamTag), - py2pmc = swig_stream_tag_to_pmc, -) - -RegisterPMC2Py( - is_pmc = pmc_is_swig_stream_tag, - pmc2py = pmc_to_swig_stream_tag, -) - -RegisterPy2PMC( - is_py = lambda x: isinstance(x, PacketMsg), - py2pmc = swig_packet_msg_to_pmc, -) - -RegisterPMC2Py( - is_pmc = pmc_is_swig_packet_msg, - pmc2py = pmc_to_swig_packet_msg, -) - +from GRAS_Tags import StreamTag, PacketMsg %} + +REG_PMC_SWIG_TYPE(swig_stream_tag, StreamTag) +REG_PMC_SWIG_TYPE(swig_packet_msg, PacketMsg) |