diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_produce.cpp | 16 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 1 |
2 files changed, 7 insertions, 10 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp index 8ffd9bf..4dde43e 100644 --- a/lib/block_produce.cpp +++ b/lib/block_produce.cpp @@ -40,17 +40,13 @@ void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) (*this)->block_data->output_queues.front(which_output).length = num_bytes; } -void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) +void Block::post_output_buffer(const size_t i, const SBuffer &buffer) { - boost::shared_ptr<BlockData> &data = (*this)->block_data; - data->output_queues.consume(which_output); - ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0); - const size_t items = buffer.length/data->output_configs[which_output].item_size; - data->stats.items_produced[which_output] += items; - InputBufferMessage buff_msg; - buff_msg.buffer = buffer; - (*this)->worker->post_downstream(which_output, buff_msg); - data->num_output_items_read[which_output] += items; + (*this)->block_data->output_queues.set_inline(i, buffer); + const size_t item_size = (*this)->block_data->output_configs[i].item_size; + ASSERT((buffer.length % item_size) == 0); + const size_t items = buffer.length/item_size; + this->produce(i, items); } GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index 540a5c2..a5895b0 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -142,6 +142,7 @@ struct OutputBufferQueues GRAS_FORCE_INLINE void set_inline(const size_t i, const SBuffer &inline_buffer) { + ASSERT(not _inline_buffer[i]); _inline_buffer[i] = inline_buffer; _inline_buffer[i].length = 0; _bitset.set(i); |