summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/gras/buffer_queue.hpp38
-rw-r--r--lib/block_task.cpp5
-rw-r--r--lib/buffer_queue_circ.cpp120
-rw-r--r--lib/buffer_queue_pool.cpp30
-rw-r--r--lib/circular_buffer.cpp4
-rw-r--r--lib/gras_impl/endless_buffer_queue.hpp107
6 files changed, 172 insertions, 132 deletions
diff --git a/include/gras/buffer_queue.hpp b/include/gras/buffer_queue.hpp
index 5125555..da8de41 100644
--- a/include/gras/buffer_queue.hpp
+++ b/include/gras/buffer_queue.hpp
@@ -18,11 +18,41 @@ typedef boost::shared_ptr<BufferQueue> BufferQueueSptr;
struct BufferQueue
{
- //! Create a buffer queue using the pool allocator
- GRAS_API static BufferQueueSptr make_pool(const SBufferConfig &config, const size_t num_buffs);
+ /*!
+ * Create a buffer queue object using the pool allocator.
+ * A pool of buffers contains individual buffer allocations.
+ *
+ * The config param is used to pass information
+ * about per-buffer size, affinity, and token.
+ *
+ * \param config used to alloc one buffer
+ * \param num_buffs alloc this many buffs
+ * \return a new buffer queue sptr
+ */
+ GRAS_API static BufferQueueSptr make_pool(
+ const SBufferConfig &config,
+ const size_t num_buffs
+ );
- //! Create a buffer queue using the circular buffer allocator
- GRAS_API static BufferQueueSptr make_circ(const SBufferConfig &config);
+ /*!
+ * Create a buffer queue object using the circular allocator.
+ * The circular allocator contains one large double-mapped buffer.
+ * This buffer is virtually mapped so that buff[n+len] = buff[n].
+ *
+ * Pieces of this buffer can be passed around in smaller chunks.
+ * The number of chunks is dictated by num_buffs,
+ * the size of the chunks is dictated by config.length.
+ * The size of the circular buffer will be num_buffs*config.length
+ *
+ * \param config used to alloc one buffer
+ * \param num_buffs this many smaller chunks
+ *
+ * \return a new buffer queue sptr
+ */
+ GRAS_API static BufferQueueSptr make_circ(
+ const SBufferConfig &config,
+ const size_t num_buffs
+ );
//! Get a reference to the buffer at the front of the queue
virtual SBuffer &front(void) = 0;
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index efdc574..c84f702 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -94,6 +94,9 @@ void BlockActor::output_fail(const size_t i)
//force pop so next work() gets a new buffer
this->flush_output(i, true);
+
+ //mark fail: not ready until a new buffer appears
+ this->output_queues.fail(i);
}
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
@@ -264,7 +267,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop)
{
- if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return;
+ if (this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
SBuffer &buff = this->output_queues.front(i);
InputBufferMessage buff_msg;
buff_msg.buffer = buff;
diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp
index 41a31c7..5e69fe0 100644
--- a/lib/buffer_queue_circ.cpp
+++ b/lib/buffer_queue_circ.cpp
@@ -1,11 +1,127 @@
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/circular_buffer.hpp>
+#include <vector>
using namespace gras;
+SBuffer make_circular_buffer(const size_t num_bytes); //circular_buffer.cpp
+
+struct BufferQueueCirc : BufferQueue
+{
+ BufferQueueCirc(const SBufferConfig &config, const size_t num);
+
+ ~BufferQueueCirc(void)
+ {
+ _token.reset();
+ _available_buffers.clear();
+ _returned_buffers.clear();
+ }
+
+ SBuffer &front(void);
+
+ void pop(void);
+
+ void push(const SBuffer &buff);
+
+ bool empty(void) const
+ {
+ return _bytes_avail == 0 or _available_buffers.empty();
+ }
+
+ SBufferToken _token;
+ SBuffer _circ_buff;
+ char *_write_ptr;
+ size_t _bytes_avail;
+ size_t _ack_index;
+ boost::circular_buffer<SBuffer> _available_buffers;
+ std::vector<SBuffer> _returned_buffers;
+ std::vector<size_t> _outgone_bytes;
+
+};
+
+BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs):
+ _token(config.token),
+ _ack_index(0)
+{
+ //allocate a large buffer
+ const size_t num_bytes = config.length * num_buffs;
+ _circ_buff = make_circular_buffer(num_bytes);
+ _write_ptr = (char *)_circ_buff.get_actual_memory();
+ _bytes_avail = _circ_buff.get_actual_length();
+
+ //allocate pool of sbuffers
+ _available_buffers.resize(num_buffs);
+ _returned_buffers.resize(num_buffs);
+ _outgone_bytes.resize(num_buffs, 0);
+ SBufferConfig sconfig = config;
+ sconfig.memory = _circ_buff.get_actual_memory();
+ for (size_t i = 0; i < _available_buffers.size(); i++)
+ {
+ sconfig.user_index = i;
+ SBuffer(sconfig);
+ //buffer derefs and returns to this queue thru token callback
+ }
+}
+
+SBuffer &BufferQueueCirc::front(void)
+{
+ ASSERT(not this->empty());
+ SBuffer &front = _available_buffers.front();
+ front->config.memory = _write_ptr;
+ return front;
+}
+
+void BufferQueueCirc::pop(void)
+{
+ ASSERT(not this->empty());
+ SBuffer &front = _available_buffers.front();
+ const size_t num_bytes = front.offset;
+
+ //store number of bytes for buffer return
+ _outgone_bytes[front.get_user_index()] = num_bytes;
+
+ //pop the buffer from internal reference
+ _available_buffers.pop_front();
+
+ //adjust the write pointer
+ _write_ptr += num_bytes;
+
+ //handle circular wrap
+ if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length()))
+ {
+ _write_ptr -= _circ_buff.get_actual_length();
+ }
+
+ //subtract out of available bytes
+ ASSERT(_bytes_avail >= num_bytes);
+ _bytes_avail -= num_bytes;
+}
+
+void BufferQueueCirc::push(const SBuffer &buff)
+{
+ _returned_buffers[buff.get_user_index()] = buff;
+
+ //ack starting at the expected index and up
+ while (_returned_buffers[_ack_index])
+ {
+ //return the held bytes to the available
+ _bytes_avail += _outgone_bytes[_ack_index];
+
+ //remove the buffer container into the queue
+ _available_buffers.push_back(_returned_buffers[_ack_index]);
+ _returned_buffers[_ack_index].reset();
+
+ //increment the ack index for the next run
+ if (++_ack_index == _returned_buffers.size()) _ack_index = 0;
+ }
+}
+
BufferQueueSptr BufferQueue::make_circ(
- const SBufferConfig &config
+ const SBufferConfig &config,
+ const size_t num_buffs
){
-
+ return BufferQueueSptr(new BufferQueueCirc(config, num_buffs));
}
diff --git a/lib/buffer_queue_pool.cpp b/lib/buffer_queue_pool.cpp
index 7a531a4..db9b68a 100644
--- a/lib/buffer_queue_pool.cpp
+++ b/lib/buffer_queue_pool.cpp
@@ -9,44 +9,44 @@ using namespace gras;
struct BufferQueuePool : BufferQueue
{
BufferQueuePool(const SBufferConfig &config, const size_t num):
- token(config.token), //save config, its holds token
- queue(boost::circular_buffer<SBuffer>(num))
+ _token(config.token), //save config, its holds token
+ _queue(boost::circular_buffer<SBuffer>(num))
{
//NOP
}
~BufferQueuePool(void)
{
- token.reset();
- queue.clear();
+ _token.reset();
+ _queue.clear();
}
SBuffer &front(void)
{
- ASSERT(not queue.empty());
- ASSERT(queue.front());
- return queue.front();
+ ASSERT(not _queue.empty());
+ ASSERT(_queue.front());
+ return _queue.front();
}
void pop(void)
{
- ASSERT(not queue.empty());
- queue.front().reset(); //dont hold ref
- queue.pop_front();
+ ASSERT(not _queue.empty());
+ _queue.front().reset(); //dont hold ref
+ _queue.pop_front();
}
void push(const SBuffer &buff)
{
- queue.push_back(buff);
+ _queue.push_back(buff);
}
bool empty(void) const
{
- return queue.empty();
+ return _queue.empty();
}
- SBufferToken token;
- boost::circular_buffer<SBuffer> queue;
+ SBufferToken _token;
+ boost::circular_buffer<SBuffer> _queue;
};
@@ -60,7 +60,7 @@ BufferQueueSptr BufferQueue::make_pool(
{
SBuffer buff(config);
std::memset(buff.get_actual_memory(), 0, buff.get_actual_length());
- //bq->push(buff);
+ //buffer derefs and returns to this queue thru token callback
}
return queue;
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp
index 1aa745b..7f482a7 100644
--- a/lib/circular_buffer.cpp
+++ b/lib/circular_buffer.cpp
@@ -137,8 +137,7 @@ static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff)
delete circ_buff;
}
-/*
-SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes)
+SBuffer make_circular_buffer(const size_t num_bytes)
{
CircularBuffer *circ_buff = new CircularBuffer(num_bytes);
SBufferDeleter deleter = boost::bind(&circular_buffer_delete, _1, circ_buff);
@@ -148,4 +147,3 @@ SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes)
config.deleter = deleter;
return SBuffer(config);
}
-*/
diff --git a/lib/gras_impl/endless_buffer_queue.hpp b/lib/gras_impl/endless_buffer_queue.hpp
deleted file mode 100644
index 4e592af..0000000
--- a/lib/gras_impl/endless_buffer_queue.hpp
+++ /dev/null
@@ -1,107 +0,0 @@
-// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-
-#ifndef INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP
-#define INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP
-
-#include <gras_impl/debug.hpp>
-#include <gras/sbuffer.hpp>
-#include <boost/bind.hpp>
-#include <boost/circular_buffer.hpp>
-#include <vector>
-
-namespace gras
-{
-
-struct EndlessBufferQueue
-{
- enum {MAX_QUEUE_SIZE = 128};
-
- static SBuffer make_circular_buffer(const size_t num_bytes);
-
- EndlessBufferQueue(const size_t num_bytes);
-
- SBuffer &front(void);
-
- void pop(const size_t num_bytes);
-
- void push(const SBuffer &buff);
-
- GRAS_FORCE_INLINE bool empty(void) const
- {
- return _bytes_avail == 0 or _available_buffers.empty();
- }
-
- SBufferToken _token;
- SBuffer _circ_buff;
- char *_write_ptr;
- size_t _bytes_avail;
- size_t _ack_index;
- boost::circular_buffer<SBuffer> _available_buffers;
- std::vector<SBuffer> _returned_buffers;
-};
-
-EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes)
-{
- _ack_index = 0;
-
- //allocate a large buffer
- _circ_buff = EndlessBufferQueue::make_circular_buffer(num_bytes);
- _write_ptr = (char *)_circ_buff.get_actual_memory();
- _bytes_avail = _circ_buff.get_actual_length();
-
- //create token as buffer returner
- SBufferDeleter deleter = boost::bind(&EndlessBufferQueue::push, this, _1);
- _token = SBufferToken(new SBufferDeleter(deleter));
-
- //allocate pool of sbuffers
- _available_buffers.resize(MAX_QUEUE_SIZE);
- _returned_buffers.resize(MAX_QUEUE_SIZE);
- SBufferConfig config;
- config.memory = _circ_buff.get_actual_memory();
- config.length = _circ_buff.get_actual_length();
- for (size_t i = 0; i < _available_buffers.size(); i++)
- {
- config.user_index = i;
- _available_buffers.push_back(SBuffer(config));
- }
-}
-
-GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void)
-{
- ASSERT(not this->empty());
- SBuffer &front = _available_buffers.front();
- front->config.memory = _write_ptr;
- front->config.length = _bytes_avail;
- front.offset = 0;
- front.length = 0;
- return front;
-}
-
-GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes)
-{
- ASSERT(_bytes_avail >= num_bytes);
- SBuffer &front = _available_buffers.front();
- front->config.length = num_bytes;
- _write_ptr += num_bytes;
- if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length()))
- {
- _write_ptr -= _circ_buff.get_actual_length();
- }
- _bytes_avail -= num_bytes;
-}
-
-void EndlessBufferQueue::push(const SBuffer &buff)
-{
- _returned_buffers[buff.get_user_index()] = buff;
- while (_returned_buffers[_ack_index])
- {
- _available_buffers.push_back(_returned_buffers[_ack_index]);
- _returned_buffers[_ack_index].reset();
- _ack_index++;
- if (_ack_index == _returned_buffers.size()) _ack_index = 0;
- }
-}
-
-} //namespace gras
-
-#endif /*INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP*/