summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/gras/block.hpp46
-rw-r--r--lib/block_message.cpp25
2 files changed, 65 insertions, 6 deletions
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 5f51acf..85a6ecc 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -202,15 +202,22 @@ struct GRAS_API Block : Element
*/
virtual void propagate_tags(const size_t which_input, const TagIter &iter);
+ /*!
+ * Send a tag to the given input port on this block.
+ * This is a thread-safe way for external scheduler
+ * entities to post tags into the input of a block.
+ * \param which_input an input port on this block
+ * \param tag the tag to post to the input port
+ */
+ void post_input_tag(const size_t which_input, const Tag &tag);
/*******************************************************************
* Deal with message passing
******************************************************************/
/*!
- * Post output message convenience routine.
* Send a message to the downstream on the given output port.
- * The underlying implementation is a tag with an offset of 0.
+ * Messages are naturally asynchronous to stream and tag data.
*
* \param which_output the index of the output port
* \param msg the message object to pass downstream
@@ -230,6 +237,15 @@ struct GRAS_API Block : Element
*/
PMCC pop_input_msg(const size_t which_input);
+ /*!
+ * Send a message to the given input port on this block.
+ * This is a thread-safe way for external scheduler
+ * entities to post messages into the input of a block.
+ * \param which_input an input port on this block
+ * \param msg the message to post to the input port
+ */
+ void post_input_msg(const size_t which_input, const PMCC &tag);
+
/*******************************************************************
* The property interface:
* Provides polymorphic, thread-safe access to block properties.
@@ -372,6 +388,10 @@ struct GRAS_API Block : Element
*/
void mark_done(void);
+ /*******************************************************************
+ * Direct buffer access API
+ ******************************************************************/
+
/*!
* Get access to the underlying reference counted input buffer.
* This is the same buffer pointed to by input_items[which].
@@ -423,6 +443,20 @@ struct GRAS_API Block : Element
void post_output_buffer(const size_t which_output, const SBuffer &buffer);
/*!
+ * Post a buffer to the given input port on this block.
+ * This is a thread-safe way for external scheduler
+ * entities to post buffers into the input of a block.
+ *
+ * \param which_input an input port on this block
+ * \param buffer the buffer to post to the input port
+ */
+ void post_input_buffer(const size_t which_input, const SBuffer &buffer);
+
+ /*******************************************************************
+ * Scheduler notification API
+ ******************************************************************/
+
+ /*!
* Overload notify_active to get called when block becomes active.
* This will be called when the TopBlock start/run API call executes.
* The default implementation of notify_active is a NOP.
@@ -443,6 +477,10 @@ struct GRAS_API Block : Element
*/
virtual void notify_topology(const size_t num_inputs, const size_t num_outputs);
+ /*******************************************************************
+ * routines related to affinity and allocation
+ ******************************************************************/
+
/*!
* Set if the work call should be interruptible by stop().
* Some work implementations block with the expectation of
@@ -452,10 +490,6 @@ struct GRAS_API Block : Element
*/
void set_interruptible_work(const bool enb);
- /*******************************************************************
- * routines related to affinity and allocation
- ******************************************************************/
-
/*!
* Set the node affinity of this block.
* This call affects how output buffers are allocated.
diff --git a/lib/block_message.cpp b/lib/block_message.cpp
index 43dc7b4..eae0c68 100644
--- a/lib/block_message.cpp
+++ b/lib/block_message.cpp
@@ -47,3 +47,28 @@ void Block::propagate_tags(const size_t i, const TagIter &iter)
}
}
}
+
+void Block::post_input_tag(const size_t which_input, const Tag &tag)
+{
+ InputTagMessage message(tag);
+ message.index = which_input;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
+
+void Block::post_input_msg(const size_t which_input, const PMCC &msg)
+{
+ InputMsgMessage message(msg);
+ message.index = which_input;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
+
+void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer)
+{
+ InputBufferMessage message;
+ message.index = which_input;
+ message.buffer = buffer;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}