summaryrefslogtreecommitdiff
path: root/lib/block_produce.cpp
diff options
context:
space:
mode:
authorJosh Blum2013-06-22 15:11:22 -0700
committerJosh Blum2013-06-22 15:11:22 -0700
commit407a198a07dc1a333bda8b5c2e731ce0ee793ebf (patch)
treef7b40f1d683a9411f86dd7267388103601980a66 /lib/block_produce.cpp
parent78139a05aa2f1516688f29359538fbc09b8c3e2e (diff)
downloadsandhi-407a198a07dc1a333bda8b5c2e731ce0ee793ebf.tar.gz
sandhi-407a198a07dc1a333bda8b5c2e731ce0ee793ebf.tar.bz2
sandhi-407a198a07dc1a333bda8b5c2e731ce0ee793ebf.zip
gras: use inline queue for post output buffer
* simplifies logic * fixes bug #103 An assertion fires if multiple post output buffers are used per work(). This could be fixed in the future if its truely needed.
Diffstat (limited to 'lib/block_produce.cpp')
-rw-r--r--lib/block_produce.cpp16
1 files changed, 6 insertions, 10 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 8ffd9bf..4dde43e 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -40,17 +40,13 @@ void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
(*this)->block_data->output_queues.front(which_output).length = num_bytes;
}
-void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
+void Block::post_output_buffer(const size_t i, const SBuffer &buffer)
{
- boost::shared_ptr<BlockData> &data = (*this)->block_data;
- data->output_queues.consume(which_output);
- ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0);
- const size_t items = buffer.length/data->output_configs[which_output].item_size;
- data->stats.items_produced[which_output] += items;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buffer;
- (*this)->worker->post_downstream(which_output, buff_msg);
- data->num_output_items_read[which_output] += items;
+ (*this)->block_data->output_queues.set_inline(i, buffer);
+ const size_t item_size = (*this)->block_data->output_configs[i].item_size;
+ ASSERT((buffer.length % item_size) == 0);
+ const size_t items = buffer.length/item_size;
+ this->produce(i, items);
}
GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)