summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------gnuradio0
-rw-r--r--include/gras/block.hpp28
-rw-r--r--lib/block.cpp10
-rw-r--r--lib/block_task.cpp9
-rw-r--r--lib/gras_impl/block_actor.hpp2
5 files changed, 49 insertions, 0 deletions
diff --git a/gnuradio b/gnuradio
-Subproject e065aad8918d972377807c8296f4dd060f102f6
+Subproject f70a803adc1cbea01838fa45517ea7cde2e5c24
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index bde70b1..65649b2 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -226,6 +226,34 @@ struct GRAS_API Block : Element
void mark_done(void);
/*!
+ * Get access to the underlying reference counted buffer.
+ * This is the same buffer pointed to by input_items[which].
+ * This function must be called during the call to work().
+ * Use this function to implement passive work-flows.
+ *
+ * \param which_input the input port index
+ * \return a const reference to the buffer
+ */
+ const SBuffer &get_input_buffer(const size_t which_input);
+
+ /*!
+ * Post the given output buffer to the downstream.
+ * This function must be called during the call to work().
+ * Use this function to implement passive work-flows.
+ *
+ * Take the following rules into account:
+ * - The buffer will be immediately sent to the downstream.
+ * - The value for get_produced will automatically increase.
+ * - buffer.length should be in number of bytes (not items).
+ * - Do not call produce() for items in this buffer.
+ * - Call post_output_tag() before post_output_buffer().
+ *
+ * \param which_output the output port index
+ * \param buffer the buffer to send downstream
+ */
+ void post_output_buffer(const size_t which_output, const SBuffer &buffer);
+
+ /*!
* Overload notify_topology to get called on topological changes.
* Use notify_topology to perform one-time resizing operations
* to avoid a conditional resizing operation inside the work().
diff --git a/lib/block.cpp b/lib/block.cpp
index ee7142c..039daf3 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -167,3 +167,13 @@ void Block::mark_done(void)
{
(*this)->block->mark_done();
}
+
+const SBuffer &Block::get_input_buffer(const size_t which_input)
+{
+ return (*this)->block->input_queues.front(which_input);
+}
+
+void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
+{
+ (*this)->block->produce_buffer(which_output, buffer);
+}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 5e1db89..d3e12f6 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -233,3 +233,12 @@ void BlockActor::produce(const size_t i, const size_t items)
const size_t bytes = items*this->output_items_sizes[i];
buff.length += bytes;
}
+
+void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
+{
+ const size_t items = buffer.length/output_items_sizes[i];
+ this->items_produced[i] += items;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buffer;
+ this->post_downstream(i, buff_msg);
+}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 42a05a7..6e2903b 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -92,6 +92,8 @@ struct BlockActor : Apology::Worker
void trim_tags(const size_t index);
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
+ void produce_buffer(const size_t index, const SBuffer &buffer);
+
GRAS_FORCE_INLINE bool any_inputs_done(void)
{
if (this->inputs_done.none() or this->input_queues.all_ready()) return false;