From 094a5945da96ce202eacb665cf401ab96c6c1d95 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Wed, 5 Dec 2012 02:15:06 -0800 Subject: tag is just an offset and object, created stream tag and pkt message --- include/gras/block.hpp | 49 ++++++++++++++++++-------------- include/gras/block.i | 2 +- include/gras/tags.hpp | 76 +++++++++++++++++++++++++++++++++++++++++++++----- include/gras/tags.i | 9 +----- 4 files changed, 99 insertions(+), 37 deletions(-) (limited to 'include') diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 71da047..acdcda1 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -154,27 +154,6 @@ struct GRAS_API Block : Element //! Get an iterator of item tags for the given input TagIter get_input_tags(const size_t which_input); - /*! - * Erase all tags on the given input port. - * This method may be called from the work() context - * to erase all of the queued up tags on the input. - * Once erased, messages cannot be propagated downstream. - * This method allows a user to treat an input port - * as an async message source without a data stream. - * In this case, after processing messages from get_input_tags(), - * the user should call erase_input_tags() before retuning from work(). - */ - void erase_input_tags(const size_t which_input); - - /*! - * Pop input message convenience routine. - * This routine reads the first input tag, - * and erases this tag from the given port. - * The intention is to simplify the use case - * for using this for port messages only. - */ - Tag pop_input_msg(const size_t which_input); - /*! * Overload me to implement custom tag propagation logic: * @@ -191,6 +170,34 @@ struct GRAS_API Block : Element */ virtual void propagate_tags(const size_t which_input, const TagIter &iter); + + /******************************************************************* + * Deal with message passing + ******************************************************************/ + + /*! + * Post output message convenience routine. + * Send a message to the downstream on the given output port. + * The underlying implementation is a tag with an offset of 0. + * + * \param which_output the index of the output port + * \param msg the message object to pass downstream + */ + void post_output_msg(const size_t which_output, const PMCC &msg); + + /*! + * Pop input message convenience routine. + * This routine reads the first input message, + * and erases this message from the given port. + * The intention is to simplify the use case + * for using this for port messages only. + * If no message, the return value is null. + * + * \param which_input the index of the input port + * \return the message on the front of the queue + */ + PMCC pop_input_msg(const size_t which_input); + /******************************************************************* * Work related routines and fail states ******************************************************************/ diff --git a/include/gras/block.i b/include/gras/block.i index 94ea772..53458eb 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -8,7 +8,7 @@ %} %include -%include +%import %import %include diff --git a/include/gras/tags.hpp b/include/gras/tags.hpp index 41a3e88..58eb058 100644 --- a/include/gras/tags.hpp +++ b/include/gras/tags.hpp @@ -4,37 +4,93 @@ #define INCLUDED_GRAS_TAGS_HPP #include +#include #include #include namespace gras { +/*! + * A Tag is a combination of absolute item count and associated object. + * + * When used with an item stream, a tag is used to decorate a stream item. + * The offset associates the object with the absolute position of an item. + * + * When used for message passing, only the object is used. + * The offset should always be set to zero. + */ struct GRAS_API Tag : boost::less_than_comparable { //! Make a tag from parameters to initialize the members Tag ( const item_index_t &offset = 0, - const PMCC &key = PMCC(), - const PMCC &value = PMCC(), - const PMCC &srcid = PMCC() + const PMCC &value = PMCC() ); //! the absolute item count associated with this tag item_index_t offset; + /*! + * The object contained in this tag, which could be anything. + * Common types for this value are StreamTag and PacketMsg + */ + PMCC object; +}; + +GRAS_API bool operator<(const Tag &lhs, const Tag &rhs); + +/*! + * A stream tag is a commonly used structure + * used to decorate a stream item with metadata. + */ +struct GRAS_API StreamTag +{ + //! Initialization constructor for stream tag + StreamTag + ( + const PMCC &key = PMCC(), + const PMCC &val = PMCC(), + const PMCC &src = PMCC() + ); + //! A symbolic name identifying the type of tag PMCC key; //! The value of this tag -> the sample metadata - PMCC value; + PMCC val; //! The optional source ID -> something unique - PMCC srcid; + PMCC src; }; -GRAS_API bool operator<(const Tag &lhs, const Tag &rhs); +GRAS_API bool operator==(const StreamTag &lhs, const StreamTag &rhs); + +/*! + * A packet message is a commonly used structure + * used to pass packet-based data between blocks. + */ +struct GRAS_API PacketMsg +{ + //! Initialization constructor for packet message + PacketMsg + ( + const PMCC &info = PMCC(), + const SBuffer &buff = SBuffer() + ); + + //! Convenience contructor to initialize from buffer + PacketMsg(const SBuffer &buff); + + //! Metadata info associated with this packet + PMCC info; + + //! The raw bytes of the packet - ref counted buffer + SBuffer buff; +}; + +GRAS_API bool operator==(const PacketMsg &lhs, const PacketMsg &rhs); } //namespace gras @@ -44,7 +100,13 @@ GRAS_API bool operator<(const Tag &lhs, const Tag &rhs); namespace gras { //! Iterator return type stl and boost compliant - typedef boost::iterator_range::const_iterator> TagIter; + //typedef boost::iterator_range::const_iterator> TagIter; + struct TagIter : boost::iterator_range::const_iterator> + { + TagIter(void){} + template + TagIter(const T &t0, const T &t1): boost::iterator_range::const_iterator>(t0, t1){} + }; } //namespace gras diff --git a/include/gras/tags.i b/include/gras/tags.i index 79a750b..18b3932 100644 --- a/include/gras/tags.i +++ b/include/gras/tags.i @@ -9,19 +9,12 @@ %include %include +%import %import //////////////////////////////////////////////////////////////////////// // Turn TagIter into a python iterable object //////////////////////////////////////////////////////////////////////// -namespace gras -{ - struct TagIter - { - //declared empty - }; -} - %extend gras::TagIter { %insert("python") -- cgit