summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block_produce.cpp16
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp1
2 files changed, 7 insertions, 10 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 8ffd9bf..4dde43e 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -40,17 +40,13 @@ void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
(*this)->block_data->output_queues.front(which_output).length = num_bytes;
}
-void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
+void Block::post_output_buffer(const size_t i, const SBuffer &buffer)
{
- boost::shared_ptr<BlockData> &data = (*this)->block_data;
- data->output_queues.consume(which_output);
- ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0);
- const size_t items = buffer.length/data->output_configs[which_output].item_size;
- data->stats.items_produced[which_output] += items;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buffer;
- (*this)->worker->post_downstream(which_output, buff_msg);
- data->num_output_items_read[which_output] += items;
+ (*this)->block_data->output_queues.set_inline(i, buffer);
+ const size_t item_size = (*this)->block_data->output_configs[i].item_size;
+ ASSERT((buffer.length % item_size) == 0);
+ const size_t items = buffer.length/item_size;
+ this->produce(i, items);
}
GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 540a5c2..a5895b0 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -142,6 +142,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE void set_inline(const size_t i, const SBuffer &inline_buffer)
{
+ ASSERT(not _inline_buffer[i]);
_inline_buffer[i] = inline_buffer;
_inline_buffer[i].length = 0;
_bitset.set(i);