diff options
-rw-r--r-- | include/gras/buffer_queue.hpp | 38 | ||||
-rw-r--r-- | lib/block_task.cpp | 5 | ||||
-rw-r--r-- | lib/buffer_queue_circ.cpp | 120 | ||||
-rw-r--r-- | lib/buffer_queue_pool.cpp | 30 | ||||
-rw-r--r-- | lib/circular_buffer.cpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/endless_buffer_queue.hpp | 107 |
6 files changed, 172 insertions, 132 deletions
diff --git a/include/gras/buffer_queue.hpp b/include/gras/buffer_queue.hpp index 5125555..da8de41 100644 --- a/include/gras/buffer_queue.hpp +++ b/include/gras/buffer_queue.hpp @@ -18,11 +18,41 @@ typedef boost::shared_ptr<BufferQueue> BufferQueueSptr; struct BufferQueue { - //! Create a buffer queue using the pool allocator - GRAS_API static BufferQueueSptr make_pool(const SBufferConfig &config, const size_t num_buffs); + /*! + * Create a buffer queue object using the pool allocator. + * A pool of buffers contains individual buffer allocations. + * + * The config param is used to pass information + * about per-buffer size, affinity, and token. + * + * \param config used to alloc one buffer + * \param num_buffs alloc this many buffs + * \return a new buffer queue sptr + */ + GRAS_API static BufferQueueSptr make_pool( + const SBufferConfig &config, + const size_t num_buffs + ); - //! Create a buffer queue using the circular buffer allocator - GRAS_API static BufferQueueSptr make_circ(const SBufferConfig &config); + /*! + * Create a buffer queue object using the circular allocator. + * The circular allocator contains one large double-mapped buffer. + * This buffer is virtually mapped so that buff[n+len] = buff[n]. + * + * Pieces of this buffer can be passed around in smaller chunks. + * The number of chunks is dictated by num_buffs, + * the size of the chunks is dictated by config.length. + * The size of the circular buffer will be num_buffs*config.length + * + * \param config used to alloc one buffer + * \param num_buffs this many smaller chunks + * + * \return a new buffer queue sptr + */ + GRAS_API static BufferQueueSptr make_circ( + const SBufferConfig &config, + const size_t num_buffs + ); //! Get a reference to the buffer at the front of the queue virtual SBuffer &front(void) = 0; diff --git a/lib/block_task.cpp b/lib/block_task.cpp index efdc574..c84f702 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -94,6 +94,9 @@ void BlockActor::output_fail(const size_t i) //force pop so next work() gets a new buffer this->flush_output(i, true); + + //mark fail: not ready until a new buffer appears + this->output_queues.fail(i); } GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) @@ -264,7 +267,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop) { - if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return; + if (this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return; SBuffer &buff = this->output_queues.front(i); InputBufferMessage buff_msg; buff_msg.buffer = buff; diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp index 41a31c7..5e69fe0 100644 --- a/lib/buffer_queue_circ.cpp +++ b/lib/buffer_queue_circ.cpp @@ -1,11 +1,127 @@ // Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. #include <gras/buffer_queue.hpp> +#include <gras_impl/debug.hpp> +#include <boost/circular_buffer.hpp> +#include <vector> using namespace gras; +SBuffer make_circular_buffer(const size_t num_bytes); //circular_buffer.cpp + +struct BufferQueueCirc : BufferQueue +{ + BufferQueueCirc(const SBufferConfig &config, const size_t num); + + ~BufferQueueCirc(void) + { + _token.reset(); + _available_buffers.clear(); + _returned_buffers.clear(); + } + + SBuffer &front(void); + + void pop(void); + + void push(const SBuffer &buff); + + bool empty(void) const + { + return _bytes_avail == 0 or _available_buffers.empty(); + } + + SBufferToken _token; + SBuffer _circ_buff; + char *_write_ptr; + size_t _bytes_avail; + size_t _ack_index; + boost::circular_buffer<SBuffer> _available_buffers; + std::vector<SBuffer> _returned_buffers; + std::vector<size_t> _outgone_bytes; + +}; + +BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs): + _token(config.token), + _ack_index(0) +{ + //allocate a large buffer + const size_t num_bytes = config.length * num_buffs; + _circ_buff = make_circular_buffer(num_bytes); + _write_ptr = (char *)_circ_buff.get_actual_memory(); + _bytes_avail = _circ_buff.get_actual_length(); + + //allocate pool of sbuffers + _available_buffers.resize(num_buffs); + _returned_buffers.resize(num_buffs); + _outgone_bytes.resize(num_buffs, 0); + SBufferConfig sconfig = config; + sconfig.memory = _circ_buff.get_actual_memory(); + for (size_t i = 0; i < _available_buffers.size(); i++) + { + sconfig.user_index = i; + SBuffer(sconfig); + //buffer derefs and returns to this queue thru token callback + } +} + +SBuffer &BufferQueueCirc::front(void) +{ + ASSERT(not this->empty()); + SBuffer &front = _available_buffers.front(); + front->config.memory = _write_ptr; + return front; +} + +void BufferQueueCirc::pop(void) +{ + ASSERT(not this->empty()); + SBuffer &front = _available_buffers.front(); + const size_t num_bytes = front.offset; + + //store number of bytes for buffer return + _outgone_bytes[front.get_user_index()] = num_bytes; + + //pop the buffer from internal reference + _available_buffers.pop_front(); + + //adjust the write pointer + _write_ptr += num_bytes; + + //handle circular wrap + if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length())) + { + _write_ptr -= _circ_buff.get_actual_length(); + } + + //subtract out of available bytes + ASSERT(_bytes_avail >= num_bytes); + _bytes_avail -= num_bytes; +} + +void BufferQueueCirc::push(const SBuffer &buff) +{ + _returned_buffers[buff.get_user_index()] = buff; + + //ack starting at the expected index and up + while (_returned_buffers[_ack_index]) + { + //return the held bytes to the available + _bytes_avail += _outgone_bytes[_ack_index]; + + //remove the buffer container into the queue + _available_buffers.push_back(_returned_buffers[_ack_index]); + _returned_buffers[_ack_index].reset(); + + //increment the ack index for the next run + if (++_ack_index == _returned_buffers.size()) _ack_index = 0; + } +} + BufferQueueSptr BufferQueue::make_circ( - const SBufferConfig &config + const SBufferConfig &config, + const size_t num_buffs ){ - + return BufferQueueSptr(new BufferQueueCirc(config, num_buffs)); } diff --git a/lib/buffer_queue_pool.cpp b/lib/buffer_queue_pool.cpp index 7a531a4..db9b68a 100644 --- a/lib/buffer_queue_pool.cpp +++ b/lib/buffer_queue_pool.cpp @@ -9,44 +9,44 @@ using namespace gras; struct BufferQueuePool : BufferQueue { BufferQueuePool(const SBufferConfig &config, const size_t num): - token(config.token), //save config, its holds token - queue(boost::circular_buffer<SBuffer>(num)) + _token(config.token), //save config, its holds token + _queue(boost::circular_buffer<SBuffer>(num)) { //NOP } ~BufferQueuePool(void) { - token.reset(); - queue.clear(); + _token.reset(); + _queue.clear(); } SBuffer &front(void) { - ASSERT(not queue.empty()); - ASSERT(queue.front()); - return queue.front(); + ASSERT(not _queue.empty()); + ASSERT(_queue.front()); + return _queue.front(); } void pop(void) { - ASSERT(not queue.empty()); - queue.front().reset(); //dont hold ref - queue.pop_front(); + ASSERT(not _queue.empty()); + _queue.front().reset(); //dont hold ref + _queue.pop_front(); } void push(const SBuffer &buff) { - queue.push_back(buff); + _queue.push_back(buff); } bool empty(void) const { - return queue.empty(); + return _queue.empty(); } - SBufferToken token; - boost::circular_buffer<SBuffer> queue; + SBufferToken _token; + boost::circular_buffer<SBuffer> _queue; }; @@ -60,7 +60,7 @@ BufferQueueSptr BufferQueue::make_pool( { SBuffer buff(config); std::memset(buff.get_actual_memory(), 0, buff.get_actual_length()); - //bq->push(buff); + //buffer derefs and returns to this queue thru token callback } return queue; diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp index 1aa745b..7f482a7 100644 --- a/lib/circular_buffer.cpp +++ b/lib/circular_buffer.cpp @@ -137,8 +137,7 @@ static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff) delete circ_buff; } -/* -SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes) +SBuffer make_circular_buffer(const size_t num_bytes) { CircularBuffer *circ_buff = new CircularBuffer(num_bytes); SBufferDeleter deleter = boost::bind(&circular_buffer_delete, _1, circ_buff); @@ -148,4 +147,3 @@ SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes) config.deleter = deleter; return SBuffer(config); } -*/ diff --git a/lib/gras_impl/endless_buffer_queue.hpp b/lib/gras_impl/endless_buffer_queue.hpp deleted file mode 100644 index 4e592af..0000000 --- a/lib/gras_impl/endless_buffer_queue.hpp +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. - -#ifndef INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP -#define INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP - -#include <gras_impl/debug.hpp> -#include <gras/sbuffer.hpp> -#include <boost/bind.hpp> -#include <boost/circular_buffer.hpp> -#include <vector> - -namespace gras -{ - -struct EndlessBufferQueue -{ - enum {MAX_QUEUE_SIZE = 128}; - - static SBuffer make_circular_buffer(const size_t num_bytes); - - EndlessBufferQueue(const size_t num_bytes); - - SBuffer &front(void); - - void pop(const size_t num_bytes); - - void push(const SBuffer &buff); - - GRAS_FORCE_INLINE bool empty(void) const - { - return _bytes_avail == 0 or _available_buffers.empty(); - } - - SBufferToken _token; - SBuffer _circ_buff; - char *_write_ptr; - size_t _bytes_avail; - size_t _ack_index; - boost::circular_buffer<SBuffer> _available_buffers; - std::vector<SBuffer> _returned_buffers; -}; - -EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes) -{ - _ack_index = 0; - - //allocate a large buffer - _circ_buff = EndlessBufferQueue::make_circular_buffer(num_bytes); - _write_ptr = (char *)_circ_buff.get_actual_memory(); - _bytes_avail = _circ_buff.get_actual_length(); - - //create token as buffer returner - SBufferDeleter deleter = boost::bind(&EndlessBufferQueue::push, this, _1); - _token = SBufferToken(new SBufferDeleter(deleter)); - - //allocate pool of sbuffers - _available_buffers.resize(MAX_QUEUE_SIZE); - _returned_buffers.resize(MAX_QUEUE_SIZE); - SBufferConfig config; - config.memory = _circ_buff.get_actual_memory(); - config.length = _circ_buff.get_actual_length(); - for (size_t i = 0; i < _available_buffers.size(); i++) - { - config.user_index = i; - _available_buffers.push_back(SBuffer(config)); - } -} - -GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void) -{ - ASSERT(not this->empty()); - SBuffer &front = _available_buffers.front(); - front->config.memory = _write_ptr; - front->config.length = _bytes_avail; - front.offset = 0; - front.length = 0; - return front; -} - -GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes) -{ - ASSERT(_bytes_avail >= num_bytes); - SBuffer &front = _available_buffers.front(); - front->config.length = num_bytes; - _write_ptr += num_bytes; - if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length())) - { - _write_ptr -= _circ_buff.get_actual_length(); - } - _bytes_avail -= num_bytes; -} - -void EndlessBufferQueue::push(const SBuffer &buff) -{ - _returned_buffers[buff.get_user_index()] = buff; - while (_returned_buffers[_ack_index]) - { - _available_buffers.push_back(_returned_buffers[_ack_index]); - _returned_buffers[_ack_index].reset(); - _ack_index++; - if (_ack_index == _returned_buffers.size()) _ack_index = 0; - } -} - -} //namespace gras - -#endif /*INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP*/ |