diff options
author | Josh Blum | 2012-12-15 09:53:25 -0800 |
---|---|---|
committer | Josh Blum | 2012-12-15 09:53:25 -0800 |
commit | b7c0a59c0a86f289f55935b19efaf448e892eefb (patch) | |
tree | 1e641f9816237481124835ac2fbf2792a8b1d50f | |
parent | 6a03c661ede88203ff90eb01bf1e678b87cb6056 (diff) | |
download | sandhi-b7c0a59c0a86f289f55935b19efaf448e892eefb.tar.gz sandhi-b7c0a59c0a86f289f55935b19efaf448e892eefb.tar.bz2 sandhi-b7c0a59c0a86f289f55935b19efaf448e892eefb.zip |
work on the buffer queue api
-rw-r--r-- | include/gras/CMakeLists.txt | 1 | ||||
-rw-r--r-- | include/gras/block.hpp | 9 | ||||
-rw-r--r-- | include/gras/block.i | 1 | ||||
-rw-r--r-- | include/gras/buffer_queue.hpp | 43 | ||||
-rw-r--r-- | lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 28 | ||||
-rw-r--r-- | lib/block_task.cpp | 7 | ||||
-rw-r--r-- | lib/buffer_queue_circ.cpp | 3 | ||||
-rw-r--r-- | lib/buffer_queue_pool.cpp | 54 | ||||
-rw-r--r-- | lib/circular_buffer.cpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 3 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 3 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 72 | ||||
-rw-r--r-- | lib/gras_impl/simple_buffer_queue.hpp (renamed from lib/gras_impl/buffer_queue.hpp) | 14 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 4 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 6 |
17 files changed, 190 insertions, 70 deletions
diff --git a/include/gras/CMakeLists.txt b/include/gras/CMakeLists.txt index e8fb85e..f0b0d3a 100644 --- a/include/gras/CMakeLists.txt +++ b/include/gras/CMakeLists.txt @@ -21,6 +21,7 @@ install(FILES thread_pool.hpp top_block.hpp work_buffer.hpp + buffer_queue.hpp DESTINATION include/gras COMPONENT ${GRAS_COMP_DEVEL} diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 3899561..524ff00 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -8,6 +8,7 @@ #include <gras/tag_iter.hpp> #include <gras/tags.hpp> #include <gras/work_buffer.hpp> +#include <gras/buffer_queue.hpp> #include <vector> #include <string> @@ -337,9 +338,9 @@ struct GRAS_API Block : Element * \param which_output the output port index number * \param token the token for the buffer's returner * \param recommend_length the schedulers recommended length in bytes - * \return the token used for the buffer allocation (may be the same) + * \return a shared ptr to a new buffer queue object */ - virtual SBufferToken output_buffer_allocator( + virtual BufferQueueSptr output_buffer_allocator( const size_t which_output, const SBufferToken &token, const size_t recommend_length @@ -356,9 +357,9 @@ struct GRAS_API Block : Element * \param which_input the input port index number * \param token the token for the buffer's returner * \param recommend_length the schedulers recommended length in bytes - * \return the token used for the buffer allocation (may be the same) + * \return a shared ptr to a new buffer queue object */ - virtual SBufferToken input_buffer_allocator( + virtual BufferQueueSptr input_buffer_allocator( const size_t which_input, const SBufferToken &token, const size_t recommend_length diff --git a/include/gras/block.i b/include/gras/block.i index 80c3905..ef2738c 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -11,6 +11,7 @@ %import <gras/tags.i> %include <gras/tag_iter.i> %import <gras/sbuffer.i> +%include <gras/buffer_queue.hpp> %include <gras/block.hpp> #endif /*INCLUDED_GRAS_BLOCK_I*/ diff --git a/include/gras/buffer_queue.hpp b/include/gras/buffer_queue.hpp new file mode 100644 index 0000000..5125555 --- /dev/null +++ b/include/gras/buffer_queue.hpp @@ -0,0 +1,43 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#ifndef INCLUDED_GRAS_BUFFER_QUEUE_HPP +#define INCLUDED_GRAS_BUFFER_QUEUE_HPP + +#include <gras/gras.hpp> +#include <gras/sbuffer.hpp> +#include <boost/shared_ptr.hpp> + +namespace gras +{ + +struct BufferQueue; + +typedef boost::shared_ptr<BufferQueue> BufferQueueSptr; + +//! Buffer Queue is an interface enabling us to create custom buffer allocators. +struct BufferQueue +{ + + //! Create a buffer queue using the pool allocator + GRAS_API static BufferQueueSptr make_pool(const SBufferConfig &config, const size_t num_buffs); + + //! Create a buffer queue using the circular buffer allocator + GRAS_API static BufferQueueSptr make_circ(const SBufferConfig &config); + + //! Get a reference to the buffer at the front of the queue + virtual SBuffer &front(void) = 0; + + //! Pop off the used portion of the queue + virtual void pop(void) = 0; + + //! Push a used buffer back into the queue + virtual void push(const SBuffer &buff) = 0; + + //! Is the queue empty? + virtual bool empty(void) const = 0; + +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_BUFFER_QUEUE_HPP*/ diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 84c8f4e..bc38d06 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -41,6 +41,8 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/element.cpp ${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/circular_buffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_circ.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 64b3aed..54654ba 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -58,7 +58,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address //allocate output buffers which will also wake up the task const size_t num_outputs = this->get_num_outputs(); - this->output_buffer_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { const size_t bytes = recommend_length( @@ -71,7 +70,8 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1); SBufferToken token = SBufferToken(new SBufferDeleter(deleter)); - this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes); + BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, token, bytes); + this->output_queues.set_buffer_queue(i, queue); InputAllocMessage message; message.token = SBufferToken(new SBufferDeleter(deleter)); @@ -82,29 +82,23 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address this->Send(0, from); //ACK } -SBufferToken Block::output_buffer_allocator( +BufferQueueSptr Block::output_buffer_allocator( const size_t, const SBufferToken &token, const size_t recommend_length ){ - for (size_t j = 0; j < THIS_MANY_BUFFERS; j++) - { - SBufferConfig config; - config.memory = NULL; - config.length = recommend_length; - config.affinity = (*this)->block->buffer_affinity; - config.token = token; - SBuffer buff(config); - std::memset(buff.get_actual_memory(), 0, buff.get_actual_length()); - //buffer derefs here and the token messages it back to the block - } - return token; + SBufferConfig config; + config.memory = NULL; + config.length = recommend_length; + config.affinity = (*this)->block->buffer_affinity; + config.token = token; + return BufferQueue::make_pool(config, THIS_MANY_BUFFERS); } -SBufferToken Block::input_buffer_allocator( +BufferQueueSptr Block::input_buffer_allocator( const size_t, const SBufferToken &, const size_t ){ - return SBufferToken(); //null + return BufferQueueSptr(); //null } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 9e5917d..6b18798 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -31,9 +31,6 @@ void BlockActor::mark_done(void) //release upstream, downstream, and executor tokens this->token_pool.clear(); - //release allocator tokens, buffers can now call deleters - this->output_buffer_tokens.clear(); - //release all buffers in queues this->input_queues.flush_all(); this->output_queues.flush_all(); @@ -156,7 +153,8 @@ void BlockActor::handle_task(void) this->input_items.max() = std::max(this->input_items.max(), items); //inline dealings, how and when input buffers can be inlined into output buffers - //continue; + continue; //FIXME to implement needs change + /* if ( buff.unique() and input_configs[i].inline_buffer and @@ -170,6 +168,7 @@ void BlockActor::handle_task(void) this->output_queues.push_front(output_inline_index, new_obuff); //you got inlined! output_inline_index++; //done do this output port again } + */ } //------------------------------------------------------------------ diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp new file mode 100644 index 0000000..9d0dbcf --- /dev/null +++ b/lib/buffer_queue_circ.cpp @@ -0,0 +1,3 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/buffer_queue.hpp> diff --git a/lib/buffer_queue_pool.cpp b/lib/buffer_queue_pool.cpp new file mode 100644 index 0000000..a30f4a6 --- /dev/null +++ b/lib/buffer_queue_pool.cpp @@ -0,0 +1,54 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/buffer_queue.hpp> +#include <gras_impl/debug.hpp> +#include <boost/circular_buffer.hpp> + +using namespace gras; + +struct BufferQueuePool : BufferQueue +{ + BufferQueuePool(const size_t num) + { + queue.resize(num); + } + + SBuffer &front(void) + { + return queue.front(); + } + + void pop(void) + { + ASSERT(not queue.empty()); + queue.front() = SBuffer(); //dont hold ref + queue.pop_front(); + } + + void push(const SBuffer &buff) + { + queue.push_back(buff); + } + + bool empty(void) const + { + return queue.empty(); + } + + boost::circular_buffer<SBuffer> queue; + +}; + +BufferQueueSptr BufferQueue::make_pool( + const SBufferConfig &config, + const size_t num_buffs +){ + BufferQueueSptr bq(new BufferQueuePool(num_buffs)); + for (size_t i = 0; i < num_buffs; i++) + { + SBuffer buff(config); + std::memset(buff.get_actual_memory(), 0, buff.get_actual_length()); + bq->push(buff); + } + return bq; +} diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp index 820129d..1aa745b 100644 --- a/lib/circular_buffer.cpp +++ b/lib/circular_buffer.cpp @@ -1,6 +1,6 @@ // Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. -#include <gras_impl/endless_buffer_queue.hpp> +#include <gras/buffer_queue.hpp> #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/interprocess/shared_memory_object.hpp> @@ -137,6 +137,7 @@ static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff) delete circ_buff; } +/* SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes) { CircularBuffer *circ_buff = new CircularBuffer(num_bytes); @@ -147,3 +148,4 @@ SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes) config.deleter = deleter; return SBuffer(config); } +*/ diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index d204ec4..8717fa2 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -122,11 +122,10 @@ struct BlockActor : Apology::Worker BitSet inputs_done; BitSet outputs_done; std::set<Token> token_pool; - std::vector<SBufferToken> output_buffer_tokens; //buffer queues and ready conditions InputBufferQueues input_queues; - OutputBufferQueues<SBuffer> output_queues; + OutputBufferQueues output_queues; BitSet inputs_available; //tag tracking diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 83ed44b..7f70d39 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -5,7 +5,7 @@ #include <gras_impl/debug.hpp> #include <gras_impl/bitset.hpp> -#include <gras_impl/buffer_queue.hpp> +#include <gras_impl/simple_buffer_queue.hpp> #include <gras/sbuffer.hpp> #include <vector> #include <queue> @@ -172,7 +172,7 @@ struct InputBufferQueues std::vector<size_t> _maximum_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _preload_bytes; - std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; + std::vector<boost::shared_ptr<SimpleBufferQueue> > _aux_queues; }; @@ -207,7 +207,7 @@ inline void InputBufferQueues::update_config( _aux_queues[i]->empty() or _aux_queues[i]->front().get_actual_length() != _maximum_bytes[i] ){ - _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + _aux_queues[i].reset(new SimpleBufferQueue()); _aux_queues[i]->allocate_one(_maximum_bytes[i]); _aux_queues[i]->allocate_one(_maximum_bytes[i]); _aux_queues[i]->allocate_one(_maximum_bytes[i]); diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index eae3258..62d48e6 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -3,6 +3,7 @@ #ifndef INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP #define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP +#include <gras/buffer_queue.hpp> #include <gras/sbuffer.hpp> #include <gras/tags.hpp> #include <gras/sbuffer.hpp> @@ -104,7 +105,7 @@ struct OutputHintMessage struct OutputAllocMessage { size_t index; - SBufferToken token; + BufferQueueSptr queue; }; //---------------------------------------------------------------------- diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index 722a4c2..5674c3d 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -3,64 +3,75 @@ #ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP #define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP +#include <gras/buffer_queue.hpp> #include <gras_impl/bitset.hpp> #include <vector> -#include <boost/circular_buffer.hpp> namespace gras { -template <typename T> struct OutputBufferQueues { - enum {MAX_QUEUE_SIZE = 128}; - BitSet _bitset; - std::vector<boost::circular_buffer<T> > _queues; + void set_buffer_queue(const size_t i, BufferQueueSptr queue) + { + _queues[i] = queue; + _update(i); + } + + void set_reserve_bytes(const size_t i, const size_t num_bytes) + { + _reserve_bytes[i] = num_bytes; + _update(i); + } GRAS_FORCE_INLINE void resize(const size_t size) { _bitset.resize(size); - _queues.resize(size, boost::circular_buffer<T>(MAX_QUEUE_SIZE)); + _queues.resize(size); + _reserve_bytes.resize(size, 1); } - GRAS_FORCE_INLINE void push(const size_t i, const T &value) + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &value) { - _queues[i].push_back(value); - _bitset.set(i); + _queues[i]->push(value); + _update(i); + } + + GRAS_FORCE_INLINE void flush_all(void) + { + const size_t old_size = this->size(); + this->resize(0); + this->resize(old_size); } //! used for input buffer inlining + /* GRAS_FORCE_INLINE void push_front(const size_t i, const T &value) { ASSERT(not _queues[i].full()); _queues[i].push_front(value); _bitset.set(i); } + */ - GRAS_FORCE_INLINE const T &front(const size_t i) const + GRAS_FORCE_INLINE SBuffer &front(const size_t i) { - return _queues[i].front(); - } - - GRAS_FORCE_INLINE T &front(const size_t i) - { - ASSERT(not _queues[i].empty()); - return _queues[i].front(); + ASSERT(not _queues[i]->empty()); + return _queues[i]->front(); } GRAS_FORCE_INLINE void pop(const size_t i) { - _queues[i].front() = T(); - _queues[i].pop_front(); - _bitset.set(i, not _queues[i].empty()); + _queues[i]->pop(); + _update(i); } GRAS_FORCE_INLINE void fail(const size_t i) { _bitset.reset(i); } - +/* GRAS_FORCE_INLINE void flush(const size_t i) { _queues[i].clear(); @@ -71,15 +82,18 @@ struct OutputBufferQueues { for (size_t i = 0; i < this->size(); i++) this->flush(i); } - +*/ GRAS_FORCE_INLINE bool ready(const size_t i) const { - return not _queues[i].empty(); + if (_queues[i]->empty()) return false; + const SBuffer &front = _queues[i]->front(); + const size_t avail = front.get_actual_length() - front.offset - front.length; + return avail >= _reserve_bytes[i]; } GRAS_FORCE_INLINE bool empty(const size_t i) const { - return _queues[i].empty(); + return _queues[i]->empty(); } GRAS_FORCE_INLINE bool all_ready(void) const @@ -91,6 +105,16 @@ struct OutputBufferQueues { return _queues.size(); } + + GRAS_FORCE_INLINE void _update(const size_t i) + { + _bitset.set(i, this->ready(i)); + } + + BitSet _bitset; + std::vector<BufferQueueSptr> _queues; + std::vector<size_t> _reserve_bytes; + }; } //namespace gras diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/simple_buffer_queue.hpp index 0e6be9e..db6ecc0 100644 --- a/lib/gras_impl/buffer_queue.hpp +++ b/lib/gras_impl/simple_buffer_queue.hpp @@ -1,7 +1,7 @@ // Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. -#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP -#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP +#ifndef INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP +#define INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP #include <gras/sbuffer.hpp> #include <boost/bind.hpp> @@ -10,15 +10,15 @@ namespace gras { -struct BufferQueue : std::queue<SBuffer> +struct SimpleBufferQueue : std::queue<SBuffer> { - BufferQueue(void) + SimpleBufferQueue(void) { - SBufferDeleter deleter = boost::bind(&BufferQueue::push_back, this, _1); + SBufferDeleter deleter = boost::bind(&SimpleBufferQueue::push_back, this, _1); _token = SBufferToken(new SBufferDeleter(deleter)); } - ~BufferQueue(void) + ~SimpleBufferQueue(void) { _token.reset(); while (not this->empty()) @@ -47,4 +47,4 @@ struct BufferQueue : std::queue<SBuffer> } //namespace gras -#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/ +#endif /*INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP*/ diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 02f62f3..bef5234 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -66,8 +66,8 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther //handle the upstream block allocation request OutputAllocMessage new_msg; - new_msg.token = block_ptr->input_buffer_allocator( + new_msg.queue = block_ptr->input_buffer_allocator( index, message.token, message.recommend_length ); - if (new_msg.token) this->post_upstream(index, new_msg); + if (new_msg.queue) this->post_upstream(index, new_msg); } diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index 5876f89..a0bad75 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -70,9 +70,5 @@ void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Th const size_t index = message.index; //return of a positive downstream allocation - //reset the token, and clear old output buffers - //the new token from the downstream is installed - this->output_buffer_tokens[index].reset(); - this->output_queues.flush(index); - this->output_buffer_tokens[index] = message.token; + this->output_queues.set_buffer_queue(index, message.queue); } |