diff options
Diffstat (limited to 'lib/block_produce.cpp')
-rw-r--r-- | lib/block_produce.cpp | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp new file mode 100644 index 0000000..9fbc468 --- /dev/null +++ b/lib/block_produce.cpp @@ -0,0 +1,70 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> +#include <gras/block.hpp> + +using namespace gras; + +void Block::produce(const size_t which_output, const size_t num_items) +{ + ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative + (*this)->block->produce(which_output, num_items); +} + +void Block::produce(const size_t num_items) +{ + const size_t num_outputs = (*this)->block->get_num_outputs(); + for (size_t o = 0; o < num_outputs; o++) + { + (*this)->block->produce(o, num_items); + } +} + +item_index_t Block::get_produced(const size_t which_output) +{ + return (*this)->block->stats.items_produced[which_output]; +} + +SBuffer Block::get_output_buffer(const size_t which_output) const +{ + SBuffer &buff = (*this)->block->output_queues.front(which_output); + //increment length to auto pop full buffer size, + //when user doesnt call pop_output_buffer() + buff.length = buff.get_actual_length(); + return buff; +} + +void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) +{ + (*this)->block->output_queues.front(which_output).length = num_bytes; +} + +void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) +{ + (*this)->block->produce_buffer(which_output, buffer); +} + +GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) +{ + #ifdef ITEM_CONSPROD + std::cerr << name << " produce " << items << std::endl; + #endif + SBuffer &buff = this->output_queues.front(i); + ASSERT((buff.length % output_configs[i].item_size) == 0); + this->stats.items_produced[i] += items; + const size_t bytes = items*this->output_configs[i].item_size; + buff.length += bytes; + this->produce_outputs[i] = true; +} + +GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) +{ + this->flush_output(i); + ASSERT((buffer.length % output_configs[i].item_size) == 0); + const size_t items = buffer.length/output_configs[i].item_size; + this->stats.items_produced[i] += items; + InputBufferMessage buff_msg; + buff_msg.buffer = buffer; + this->post_downstream(i, buff_msg); +} |