summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-06-22 15:11:22 -0700
committerJosh Blum2013-06-22 15:11:22 -0700
commit407a198a07dc1a333bda8b5c2e731ce0ee793ebf (patch)
treef7b40f1d683a9411f86dd7267388103601980a66 /lib
parent78139a05aa2f1516688f29359538fbc09b8c3e2e (diff)
downloadsandhi-407a198a07dc1a333bda8b5c2e731ce0ee793ebf.tar.gz
sandhi-407a198a07dc1a333bda8b5c2e731ce0ee793ebf.tar.bz2
sandhi-407a198a07dc1a333bda8b5c2e731ce0ee793ebf.zip
gras: use inline queue for post output buffer
* simplifies logic * fixes bug #103 An assertion fires if multiple post output buffers are used per work(). This could be fixed in the future if its truely needed.
Diffstat (limited to 'lib')
-rw-r--r--lib/block_produce.cpp16
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp1
2 files changed, 7 insertions, 10 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 8ffd9bf..4dde43e 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -40,17 +40,13 @@ void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
(*this)->block_data->output_queues.front(which_output).length = num_bytes;
}
-void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
+void Block::post_output_buffer(const size_t i, const SBuffer &buffer)
{
- boost::shared_ptr<BlockData> &data = (*this)->block_data;
- data->output_queues.consume(which_output);
- ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0);
- const size_t items = buffer.length/data->output_configs[which_output].item_size;
- data->stats.items_produced[which_output] += items;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buffer;
- (*this)->worker->post_downstream(which_output, buff_msg);
- data->num_output_items_read[which_output] += items;
+ (*this)->block_data->output_queues.set_inline(i, buffer);
+ const size_t item_size = (*this)->block_data->output_configs[i].item_size;
+ ASSERT((buffer.length % item_size) == 0);
+ const size_t items = buffer.length/item_size;
+ this->produce(i, items);
}
GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 540a5c2..a5895b0 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -142,6 +142,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE void set_inline(const size_t i, const SBuffer &inline_buffer)
{
+ ASSERT(not _inline_buffer[i]);
_inline_buffer[i] = inline_buffer;
_inline_buffer[i].length = 0;
_bitset.set(i);