diff options
Diffstat (limited to 'include/gras')
-rw-r--r-- | include/gras/block.hpp | 320 | ||||
-rw-r--r-- | include/gras/element.hpp | 76 | ||||
-rw-r--r-- | include/gras/gras.hpp | 69 | ||||
-rw-r--r-- | include/gras/hier_block.hpp | 75 | ||||
-rw-r--r-- | include/gras/io_signature.hpp | 113 | ||||
-rw-r--r-- | include/gras/sbuffer.hpp | 142 | ||||
-rw-r--r-- | include/gras/sbuffer.ipp | 98 | ||||
-rw-r--r-- | include/gras/tags.hpp | 53 | ||||
-rw-r--r-- | include/gras/thread_pool.hpp | 92 | ||||
-rw-r--r-- | include/gras/top_block.hpp | 98 |
10 files changed, 1136 insertions, 0 deletions
diff --git a/include/gras/block.hpp b/include/gras/block.hpp new file mode 100644 index 0000000..983d54a --- /dev/null +++ b/include/gras/block.hpp @@ -0,0 +1,320 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_BLOCK_HPP +#define INCLUDED_GRAS_BLOCK_HPP + +#include <gras/element.hpp> +#include <gras/sbuffer.hpp> +#include <gras/tags.hpp> +#include <vector> +#include <string> +#include <boost/range.hpp> //iterator range + +namespace gras +{ + +//! Configuration parameters for an input port +struct GRAS_API InputPortConfig +{ + InputPortConfig(void); + + /*! + * Set buffer inlining for this port config. + * Inlining means that the input buffer can be used as an output buffer. + * The goal is to make better use of cache and memory bandwidth. + * + * By default, inlining is disabled on all input ports. + * The user should enable inlining on an input port + * when it is understood that the work function will read + * before writting to a particular section of the buffer. + * + * The scheduler will inline a buffer when + * * inlining is enabled on the particular input port + * * block holds the only buffer reference aka unique + * * the input buffer has the same affinity as the block + * * the input port has a buffer look-ahead of 0 + * + * Default = false. + */ + bool inline_buffer; + + /*! + * Set the number of input buffer look-ahead items. + * When num look-ahead items are not consumed, + * they will be available for the next work call. + * This is used to implement sample memory for + * things like sliding dot products/FIR filters. + * + * Default = 0. + */ + size_t lookahead_items; +}; + +//! Configuration parameters for an output port +struct GRAS_API OutputPortConfig +{ + OutputPortConfig(void); + + /*! + * Set an output reserve requirement such that work is called + * with an output buffer at least reserve items in size. + * + * Default = 1. + */ + size_t reserve_items; + + /*! + * Constrain the maximum number of items that + * work can be called with for this port. + * + * Default = 0 aka disabled. + */ + size_t maximum_items; +}; + +template <typename PtrType> struct WorkBuffer +{ + //! get a native pointer type to this buffer + inline PtrType get(void) const + { + return _mem; + } + + //! get a pointer of the desired type to this buffer + template <typename T> inline T cast(void) const + { + return reinterpret_cast<T>(this->get()); + } + + //! get the number of items in this buffer + inline size_t size(void) const + { + return _len; + } + + //! Get the memory pointer reference + inline PtrType &get(void) + { + return _mem; + } + + //! Get the buffer length reference + inline size_t &size(void) + { + return _len; + } + + PtrType _mem; + size_t _len; +}; + +struct GRAS_API Block : Element +{ + + //! Contruct an empty/null block + Block(void); + + //! Create a new block given the name + Block(const std::string &name); + + /******************************************************************* + * Deal with input and output port configuration + ******************************************************************/ + + //! Get the configuration rules of an input port + InputPortConfig input_config(const size_t which_input = 0) const; + + //! Set the configuration rules for an input port + void set_input_config(const InputPortConfig &config, const size_t which_input = 0); + + //! Get the configuration rules of an output port + OutputPortConfig output_config(const size_t which_output = 0) const; + + //! Set the configuration rules for an output port + void set_output_config(const OutputPortConfig &config, const size_t which_output = 0); + + /*! + * Enable fixed rate logic. + * When enabled, relative rate is assumed to be set, + * and forecast is automatically called. + * Also, consume will be called automatically. + */ + void set_fixed_rate(const bool fixed_rate); + + //! Get the fixed rate setting + bool fixed_rate(void) const; + + /*! + * The relative rate can be thought of as interpolation/decimation. + * In other words, relative rate is the ratio of output items to input items. + */ + void set_relative_rate(const double relative_rate); + + //! Get the relative rate setting + double relative_rate(void) const; + + /*! + * The output multiple setting controls work output buffer sizes. + * Buffers will be number of items modulo rounted to the multiple. + */ + void set_output_multiple(const size_t multiple); + + //! Get the output multiple setting + size_t output_multiple(void) const; + + /******************************************************************* + * Deal with data production and consumption + ******************************************************************/ + + //! Return options for the work call + enum + { + WORK_CALLED_PRODUCE = -2, + WORK_DONE = -1 + }; + + //! Call during work to consume items + void consume(const size_t which_input, const size_t how_many_items); + + //! Call during work to consume items + void consume_each(const size_t how_many_items); + + //! Call during work to produce items, must return WORK_CALLED_PRODUCE + void produce(const size_t which_output, const size_t how_many_items); + + /******************************************************************* + * Deal with tag handling and tag configuration + ******************************************************************/ + + enum tag_propagation_policy_t + { + TPP_DONT = 0, + TPP_ALL_TO_ALL = 1, + TPP_ONE_TO_ONE = 2 + }; + + uint64_t nitems_read(const size_t which_input); + + uint64_t nitems_written(const size_t which_output); + + tag_propagation_policy_t tag_propagation_policy(void); + + void set_tag_propagation_policy(tag_propagation_policy_t p); + + //! Send a tag to the downstream on the given output port + void post_output_tag(const size_t which_output, const Tag &tag); + + //! Iterator return type get_input_tags - stl and boost compliant + typedef boost::iterator_range<std::vector<Tag>::const_iterator> TagIter; + + //! Get an iterator of item tags for the given input + TagIter get_input_tags(const size_t which_input = 0); + + /******************************************************************* + * Work related routines from basic block + ******************************************************************/ + + //! Called when the flow graph is started, can overload + virtual bool start(void); + + //! Called when the flow graph is stopped, can overload + virtual bool stop(void); + + typedef std::vector<WorkBuffer<const void *> > InputItems; + typedef std::vector<WorkBuffer<void *> > OutputItems; + + //! The official call into the work routine (overload please) + virtual int work( + const InputItems &input_items, + const OutputItems &output_items + ) = 0; + + //! forcast requirements, can be overloaded + virtual void forecast( + int noutput_items, + std::vector<int> &ninput_items_required + ); + + //! scheduler calls when the topology is updated, can be overloaded + virtual bool check_topology(int ninputs, int noutputs); + + /*! + * Set if the work call should be interruptible by stop(). + * Some work implementations block with the expectation of + * getting a boost thread interrupt in a blocking call. + * Set set_interruptible_work(true) if this is the case. + * By default, work implementations are not interruptible. + */ + void set_interruptible_work(const bool enb); + + /******************************************************************* + * routines related to affinity and allocation + ******************************************************************/ + + /*! + * Set the node affinity of this block. + * This call affects how output buffers are allocated. + * By default memory is allocated by malloc. + * When the affinity is set, virtual memory + * will be locked to a physical CPU/memory node. + * \param affinity a memory node on the system + */ + void set_buffer_affinity(const long affinity); + + /*! + * The output buffer allocator method. + * This method is called by the scheduler to allocate output buffers. + * The user may overload this method to create a custom allocator. + * + * Example use case: + * //TODO code example + * + * \param which_output the output port index number + * \param token the token for the buffer's returner + * \param recommend_length the schedulers recommended length in bytes + * \return the token used for the buffer allocation (may be the same) + */ + virtual SBufferToken output_buffer_allocator( + const size_t which_output, + const SBufferToken &token, + const size_t recommend_length + ); + + /*! + * The input buffer allocator method. + * This method is special and very different from allocate output buffers. + * Typically, blocks do not have control of their input buffers. + * When overloaded, an upstream block will ask this block + * to allocate its output buffers. This way, this block will get + * input buffers which were actually allocated by this method. + * + * \param which_input the input port index number + * \param token the token for the buffer's returner + * \param recommend_length the schedulers recommended length in bytes + * \return the token used for the buffer allocation (may be the same) + */ + virtual SBufferToken input_buffer_allocator( + const size_t which_input, + const SBufferToken &token, + const size_t recommend_length + ); + +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_BLOCK_HPP*/ diff --git a/include/gras/element.hpp b/include/gras/element.hpp new file mode 100644 index 0000000..3142510 --- /dev/null +++ b/include/gras/element.hpp @@ -0,0 +1,76 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_ELEMENT_HPP +#define INCLUDED_GRAS_ELEMENT_HPP + +#include <gras/gras.hpp> +#include <gras/io_signature.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> + +namespace gras +{ + +class ElementImpl; + +struct GRAS_API Element : boost::shared_ptr<ElementImpl>, boost::enable_shared_from_this<Element> +{ + + //! Create an empty element + Element(void); + + //! Creates a new element given the name + Element(const std::string &name); + + /*! + * Create an element from a shared pointer to an element. + * Good for that factory function/shared ptr paradigm. + */ + template <typename T> + Element(const boost::shared_ptr<T> &elem) + { + *this = elem->shared_to_element(); + } + + //! Convert a shared ptr of a derived class to an Element + Element &shared_to_element(void); + + //! for internal use only + boost::weak_ptr<Element> weak_self; + + //! An integer ID that is unique across the process + long unique_id(void) const; + + //! Get the name of this element + std::string name(void) const; + + //! get a canonical name for this element + std::string to_string(void) const; + + void set_output_signature(const gras::IOSignature &sig); + + void set_input_signature(const gras::IOSignature &sig); + + const gras::IOSignature &input_signature(void) const; + + const gras::IOSignature &output_signature(void) const; + +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_ELEMENT_HPP*/ diff --git a/include/gras/gras.hpp b/include/gras/gras.hpp new file mode 100644 index 0000000..bbeffba --- /dev/null +++ b/include/gras/gras.hpp @@ -0,0 +1,69 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_GRAS_HPP +#define INCLUDED_GRAS_GRAS_HPP + +#include <ciso646> + +// http://gcc.gnu.org/wiki/Visibility +// Generic helper definitions for shared library support +#if defined _WIN32 || defined __CYGWIN__ + #define GRAS_HELPER_DLL_IMPORT __declspec(dllimport) + #define GRAS_HELPER_DLL_EXPORT __declspec(dllexport) + #define GRAS_HELPER_DLL_LOCAL +#else + #if __GNUC__ >= 4 + #define GRAS_HELPER_DLL_IMPORT __attribute__ ((visibility ("default"))) + #define GRAS_HELPER_DLL_EXPORT __attribute__ ((visibility ("default"))) + #define GRAS_HELPER_DLL_LOCAL __attribute__ ((visibility ("hidden"))) + #else + #define GRAS_HELPER_DLL_IMPORT + #define GRAS_HELPER_DLL_EXPORT + #define GRAS_HELPER_DLL_LOCAL + #endif +#endif + +// Now we use the generic helper definitions above to define GRAS_API and GRAS_LOCAL. +// GRAS_API is used for the public API symbols. It either DLL imports or DLL exports (or does nothing for static build) +// GRAS_LOCAL is used for non-api symbols. + +#ifdef GRAS_DLL // defined if GRAS is compiled as a DLL + #ifdef GRAS_DLL_EXPORTS // defined if we are building the GRAS DLL (instead of using it) + #define GRAS_API GRAS_HELPER_DLL_EXPORT + #else + #define GRAS_API GRAS_HELPER_DLL_IMPORT + #endif // GRAS_DLL_EXPORTS + #define GRAS_LOCAL GRAS_HELPER_DLL_LOCAL +#else // GRAS_DLL is not defined: this means GRAS is a static lib. + #define GRAS_API + #define GRAS_LOCAL +#endif // GRAS_DLL + +#define GRAS_MAX_ALIGNMENT 32 + +//define cross platform attribute macros +#if defined(GRAS_DEBUG) + #define GRAS_FORCE_INLINE inline +#elif defined(BOOST_MSVC) + #define GRAS_FORCE_INLINE __forceinline +#elif defined(__GNUG__) && __GNUG__ >= 4 + #define GRAS_FORCE_INLINE inline __attribute__((always_inline)) +#else + #define GRAS_FORCE_INLINE inline +#endif + +#endif /*INCLUDED_GRAS_GRAS_HPP*/ diff --git a/include/gras/hier_block.hpp b/include/gras/hier_block.hpp new file mode 100644 index 0000000..0574f14 --- /dev/null +++ b/include/gras/hier_block.hpp @@ -0,0 +1,75 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_HIER_BLOCK_HPP +#define INCLUDED_GRAS_HIER_BLOCK_HPP + +#include <gras/element.hpp> + +namespace gras +{ + +struct GRAS_API HierBlock : Element +{ + HierBlock(void); + + HierBlock(const std::string &name); + + void connect(const Element &elem); + + void disconnect(const Element &elem); + + void connect( + const Element &src, + const size_t src_index, + const Element &sink, + const size_t sink_index + ); + + void disconnect( + const Element &src, + const size_t src_index, + const Element &sink, + const size_t sink_index + ); + + void disconnect_all(void); + + /*! + * Commit changes to the overall flow graph. + * Call this after modifying connections. + */ + virtual void commit(void); + + /*! + * The lock() call is deprecated. + * Topology can be changed duing design execution. + * The underlying implementation is literally a NOP. + */ + inline void lock(void){} + + /*! + * The unlock() call is deprecated. + * Topology can be changed duing design execution. + * The underlying implementation is this->commit(). + */ + inline void unlock(void){this->commit();} + +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_HIER_BLOCK_HPP*/ diff --git a/include/gras/io_signature.hpp b/include/gras/io_signature.hpp new file mode 100644 index 0000000..bb0df5f --- /dev/null +++ b/include/gras/io_signature.hpp @@ -0,0 +1,113 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_IO_SIGNATURE_HPP +#define INCLUDED_GRAS_IO_SIGNATURE_HPP + +#include <vector> +#include <stdexcept> + +namespace gras +{ + +/*! + * An IO signature describes the input or output specifications + * for the streaming input or output ports of a block. + * Properties are a maximum and minimum number of ports, + * and an item size in bytes for each port. + */ +struct IOSignature : std::vector<int> +{ + static const int IO_INFINITE = -1; + + //! Create an empty signature with infinite IO + IOSignature(void) + { + this->set_min_streams(IO_INFINITE); + this->set_max_streams(IO_INFINITE); + } + + //! Create a signature with the specified min and max streams + IOSignature(const int min_streams, const int max_streams) + { + if (min_streams > max_streams and max_streams != IO_INFINITE) + { + throw std::invalid_argument("io signature fail: min_streams > max_streams"); + } + this->set_min_streams(min_streams); + this->set_max_streams(max_streams); + } + + //! Construct from pointer for backwards compatible shared_ptr usage. + IOSignature(const IOSignature *sig) + { + *this = *sig; + } + + //! Overloaded arrow operator for backwards compatible shared_ptr usage. + IOSignature* operator->(void) + { + return this; + }; + + //! Overloaded arrow operator for backwards compatible shared_ptr usage. + const IOSignature* operator->(void) const + { + return this; + }; + + void set_min_streams(const int val) + { + _min_streams = val; + } + + void set_max_streams(const int val) + { + _max_streams = val; + } + + int min_streams(void) const + { + return _min_streams; + } + + int max_streams(void) const + { + return _max_streams; + } + + int sizeof_stream_item(const int index) const + { + if (this->empty()) return 0; + if (this->size() > unsigned(index)) + { + return this->at(index); + } + return this->back(); + } + + std::vector<int> sizeof_stream_items(void) const + { + return *this; + } + + int _min_streams; + int _max_streams; +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_IO_SIGNATURE_HPP*/ diff --git a/include/gras/sbuffer.hpp b/include/gras/sbuffer.hpp new file mode 100644 index 0000000..7997e09 --- /dev/null +++ b/include/gras/sbuffer.hpp @@ -0,0 +1,142 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_SBUFFER_HPP +#define INCLUDED_GRAS_SBUFFER_HPP + +#include <gras/gras.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/function.hpp> + +namespace gras +{ + +struct SBufferImpl; +struct SBuffer; + +//! The callback function type when buffers dereference +typedef boost::function<void(SBuffer &)> SBufferDeleter; + +//! The token type held by the caller +typedef boost::shared_ptr<SBufferDeleter> SBufferToken; + +//! The token type weak ptr help by buffer +typedef boost::weak_ptr<SBufferDeleter> SBufferTokenWeak; + +struct GRAS_API SBufferConfig +{ + //! Default constructor zeros out buffer config + SBufferConfig(void); + + //! pointer to the memory start + void *memory; + + //! length of the memory in bytes + size_t length; + + //! memory affinity - meta information + long affinity; + + //! index number for custom allocation purposes + size_t user_index; + + //! deleter callback, may be used to free + SBufferDeleter deleter; + + //! token object, called if set under deref condition + SBufferTokenWeak token; +}; + +/*! + * SBuffer is a smart/shared reference counted handler of memory. + * Thank you boost smart/shared pointers for the disambiguation! + * + * Default allocator: + * To use the default system allocator, set the memory to NULL, + * and set length and affinity to the desired values. + * The deleter will be automaically configured by the allocator. + * + * Custom allocator: + * Set all config struct members. Its all you! + * + * Token usage: + * When a token is set, the buffer will not cleanup and call the deleter. + * Rather, the bound function in the token will be called with the buffer. + * A parent object should hold the shared pointer to the token. + * When the parent object deconstructs, the weak pointer will die, + * and the normal buffer cleanup/freeing/deconstruction will happen. + * + * Length and offset usage: + * Length and offset are intentionally object members + * and not part of the ref-counted intrusive_ptr guts. + * These members should be modified carefully + * to pass a subset of the memory downstream. + * + * Length and offset recommendation: + * These values should probably be reset by the + * bound token or when a fresh buffer is popped. + */ +struct GRAS_API SBuffer : boost::intrusive_ptr<SBufferImpl> +{ + //! Default constructor, zeros things + SBuffer(void); + + /*! + * Create a new buffer. + * The config object represents a chunk of memory, + * or instructions for the default allocator. + */ + SBuffer(const SBufferConfig &config); + + /*! + * Get a pointer to the start of the underlying memory + */ + void *get_actual_memory(void) const; + + /*! + * Get the length of the underlying memory in bytes + */ + size_t get_actual_length(void) const; + + //! Get a pointer into valid memory + void *get(const ptrdiff_t delta_bytes = 0) const; + + //! The offset into valid memory in bytes + size_t offset; + + //! The number of valid bytes past offset + size_t length; + + //! Get the affinity of the memory + long get_affinity(void) const; + + //! Get the user index number + size_t get_user_index(void) const; + + //! Unique if caller holds the only reference count + bool unique(void) const; + + //! Get the number of reference holders + size_t use_count(void) const; +}; + +} //namespace gras + +#include <gras/sbuffer.ipp> + +#endif /*INCLUDED_GRAS_SBUFFER_HPP*/ diff --git a/include/gras/sbuffer.ipp b/include/gras/sbuffer.ipp new file mode 100644 index 0000000..c090123 --- /dev/null +++ b/include/gras/sbuffer.ipp @@ -0,0 +1,98 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_SBUFFER_IPP +#define INCLUDED_GRAS_SBUFFER_IPP + +#include <boost/detail/atomic_count.hpp> + +namespace gras +{ + +struct SBufferImpl +{ + SBufferImpl(const SBufferConfig &config): + count(0), + config(config) + { + //NOP + } + + boost::detail::atomic_count count; + SBufferConfig config; +}; + + +extern GRAS_API void sbuffer_handle_deref(SBufferImpl *impl); + +GRAS_FORCE_INLINE void intrusive_ptr_add_ref(SBufferImpl *impl) +{ + ++impl->count; +} + +GRAS_FORCE_INLINE void intrusive_ptr_release(SBufferImpl *impl) +{ + if (--impl->count == 0) + { + sbuffer_handle_deref(impl); + } +} + +GRAS_FORCE_INLINE SBuffer::SBuffer(void): + offset(0), + length(0) +{ + //NOP +} + +GRAS_FORCE_INLINE void *SBuffer::get_actual_memory(void) const +{ + return (*this)->config.memory; +} + +GRAS_FORCE_INLINE size_t SBuffer::get_actual_length(void) const +{ + return (*this)->config.length; +} + +GRAS_FORCE_INLINE void *SBuffer::get(const ptrdiff_t delta_bytes) const +{ + return ((char *)(*this)->config.memory) + this->offset + delta_bytes; +} + +GRAS_FORCE_INLINE long SBuffer::get_affinity(void) const +{ + return (*this)->config.affinity; +} + +GRAS_FORCE_INLINE size_t SBuffer::get_user_index(void) const +{ + return (*this)->config.user_index; +} + +GRAS_FORCE_INLINE bool SBuffer::unique(void) const +{ + return (*this)->count == 1; +} + +GRAS_FORCE_INLINE size_t SBuffer::use_count(void) const +{ + return (*this)->count; +} + +} //namespace gras + +#endif /*INCLUDED_GRAS_SBUFFER_IPP*/ diff --git a/include/gras/tags.hpp b/include/gras/tags.hpp new file mode 100644 index 0000000..9faa667 --- /dev/null +++ b/include/gras/tags.hpp @@ -0,0 +1,53 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_TAGS_HPP +#define INCLUDED_GRAS_TAGS_HPP + +#include <gras/gras.hpp> +#include <boost/operators.hpp> +#include <PMC/PMC.hpp> +#include <boost/cstdint.hpp> + +namespace gras +{ + +struct GRAS_API Tag : boost::less_than_comparable<Tag> +{ + //! Make an empty tag with null members + Tag(void); + + //! Make a tag from parameters to initialize the members + Tag(const uint64_t &offset, const PMCC &key, const PMCC &value, const PMCC &srcid = PMCC()); + + //! the absolute item count associated with this tag + boost::uint64_t offset; + + //! A symbolic name identifying the type of tag + PMCC key; + + //! The value of this tag -> the sample metadata + PMCC value; + + //! The optional source ID -> something unique + PMCC srcid; +}; + +GRAS_API bool operator<(const Tag &lhs, const Tag &rhs); + +} //namespace gras + +#endif /*INCLUDED_GRAS_TAGS_HPP*/ diff --git a/include/gras/thread_pool.hpp b/include/gras/thread_pool.hpp new file mode 100644 index 0000000..bfda931 --- /dev/null +++ b/include/gras/thread_pool.hpp @@ -0,0 +1,92 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_THREAD_POOL_HPP +#define INCLUDED_GRAS_THREAD_POOL_HPP + +#include <gras/gras.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> +#include <string> + +//! ThreadPool is an unexposed Theron Framework +//! Forward declare the Framwork for c++ users +namespace Theron +{ + class Framework; +} + +namespace gras +{ + +struct GRAS_API ThreadPoolConfig +{ + ThreadPoolConfig(void); + + /*! + * The initial number of worker threads to create within the framework. + * Default is the number of CPUs on the system. + */ + size_t thread_count; + + /*! + * Specifies the NUMA processor nodes upon which the framework may execute. + * Default is all NUMA nodes on the system. + */ + size_t node_mask; + + /*! + * Specifies the subset of the processors in each NUMA processor node upon which the framework may execute. + * Default is all CPUs per NUMA node. + */ + size_t processor_mask; + + /*! + * Yield strategy employed by the worker threads in the framework. + * POLITE, ///< Threads go to sleep when not in use. + * STRONG, ///< Threads yield to other threads but don't go to sleep. + * AGGRESSIVE ///< Threads never yield to other threads. + * Default is STRONG. + */ + std::string yield_strategy; +}; + +/*! + * Thread Pool is is a this wrapper of Theron Framework, see link for more details: + * http://docs.theron-library.com/5.00/structTheron_1_1Framework_1_1Parameters.html + */ +struct GRAS_API ThreadPool : boost::shared_ptr<Theron::Framework> +{ + //! Create an empty thread pool + ThreadPool(void); + + //! Create a thread pool from a weak pointer to a framework + ThreadPool(boost::weak_ptr<Theron::Framework> p); + + //! Create a new thread pool with parameters + ThreadPool(const ThreadPoolConfig &config); + + /*! + * When a block is created, it will execute in the active pool. + * Use this call before creating a block to control which + * thread pool that the block's work routine will run in. + */ + void set_active(void); +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_THREAD_POOL_HPP*/ diff --git a/include/gras/top_block.hpp b/include/gras/top_block.hpp new file mode 100644 index 0000000..8bf366f --- /dev/null +++ b/include/gras/top_block.hpp @@ -0,0 +1,98 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_GRAS_TOP_BLOCK_HPP +#define INCLUDED_GRAS_TOP_BLOCK_HPP + +#include <gras/hier_block.hpp> + +namespace gras +{ + +struct GRAS_API GlobalBlockConfig +{ + GlobalBlockConfig(void); + + /*! + * Constrain the maximum number of items that + * work can be called with for all output ports. + * + * Default = 0 aka disabled. + */ + size_t maximum_output_items; + + /*! + * Set the global memory node affinity. + * Blocks that have not been explicitly set, + * will take on this new buffer_affinity. + * + * Default = -1 aka no affinity. + */ + long buffer_affinity; +}; + +struct GRAS_API TopBlock : HierBlock +{ + TopBlock(void); + + TopBlock(const std::string &name); + + //! Get the global block config settings + GlobalBlockConfig global_config(void) const; + + //! Set the global block config settings + void set_global_config(const GlobalBlockConfig &config); + + /*! + * Commit changes to the overall flow graph. + * Call this after modifying connections. + * Commit is called automatically by start/stop/run. + */ + void commit(void); + + /*! + * Run is for finite flow graph executions. + * Mostly for testing purposes only. + */ + void run(void); + + //! Start a flow graph execution (does not block) + void start(void); + + //! Stop a flow graph execution (does not block) + void stop(void); + + /*! + * Wait for threads to exit after stop() or run(). + * This is a blocking call and will not return until + * all blocks in the topology have been marked done. + */ + virtual void wait(void); + + /*! + * Wait for threads to exit after stop() or run(). + * This is call will block until timeout or done. + * + * \param timeout the timeout in seconds + * \return true of execution completed + */ + virtual bool wait(const double timeout); + +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_TOP_BLOCK_HPP*/ |