diff options
m--------- | gnuradio | 0 | ||||
-rw-r--r-- | include/gras/block.hpp | 28 | ||||
-rw-r--r-- | lib/block.cpp | 10 | ||||
-rw-r--r-- | lib/block_task.cpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 2 |
5 files changed, 49 insertions, 0 deletions
diff --git a/gnuradio b/gnuradio -Subproject e065aad8918d972377807c8296f4dd060f102f6 +Subproject f70a803adc1cbea01838fa45517ea7cde2e5c24 diff --git a/include/gras/block.hpp b/include/gras/block.hpp index bde70b1..65649b2 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -226,6 +226,34 @@ struct GRAS_API Block : Element void mark_done(void); /*! + * Get access to the underlying reference counted buffer. + * This is the same buffer pointed to by input_items[which]. + * This function must be called during the call to work(). + * Use this function to implement passive work-flows. + * + * \param which_input the input port index + * \return a const reference to the buffer + */ + const SBuffer &get_input_buffer(const size_t which_input); + + /*! + * Post the given output buffer to the downstream. + * This function must be called during the call to work(). + * Use this function to implement passive work-flows. + * + * Take the following rules into account: + * - The buffer will be immediately sent to the downstream. + * - The value for get_produced will automatically increase. + * - buffer.length should be in number of bytes (not items). + * - Do not call produce() for items in this buffer. + * - Call post_output_tag() before post_output_buffer(). + * + * \param which_output the output port index + * \param buffer the buffer to send downstream + */ + void post_output_buffer(const size_t which_output, const SBuffer &buffer); + + /*! * Overload notify_topology to get called on topological changes. * Use notify_topology to perform one-time resizing operations * to avoid a conditional resizing operation inside the work(). diff --git a/lib/block.cpp b/lib/block.cpp index ee7142c..039daf3 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -167,3 +167,13 @@ void Block::mark_done(void) { (*this)->block->mark_done(); } + +const SBuffer &Block::get_input_buffer(const size_t which_input) +{ + return (*this)->block->input_queues.front(which_input); +} + +void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) +{ + (*this)->block->produce_buffer(which_output, buffer); +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 5e1db89..d3e12f6 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -233,3 +233,12 @@ void BlockActor::produce(const size_t i, const size_t items) const size_t bytes = items*this->output_items_sizes[i]; buff.length += bytes; } + +void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) +{ + const size_t items = buffer.length/output_items_sizes[i]; + this->items_produced[i] += items; + InputBufferMessage buff_msg; + buff_msg.buffer = buffer; + this->post_downstream(i, buff_msg); +} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 42a05a7..6e2903b 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -92,6 +92,8 @@ struct BlockActor : Apology::Worker void trim_tags(const size_t index); void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); + void produce_buffer(const size_t index, const SBuffer &buffer); + GRAS_FORCE_INLINE bool any_inputs_done(void) { if (this->inputs_done.none() or this->input_queues.all_ready()) return false; |