summaryrefslogtreecommitdiff
path: root/lib/buffer_queue_circ.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/buffer_queue_circ.cpp')
-rw-r--r--lib/buffer_queue_circ.cpp137
1 files changed, 137 insertions, 0 deletions
diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp
new file mode 100644
index 0000000..60a2b51
--- /dev/null
+++ b/lib/buffer_queue_circ.cpp
@@ -0,0 +1,137 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/circular_buffer.hpp>
+#include <vector>
+
+using namespace gras;
+
+SBuffer make_circular_buffer(const size_t num_bytes); //circular_buffer.cpp
+
+static void buffer_dummy_delete(SBuffer &, const SBuffer /*ref holder*/)
+{
+ //NOP
+}
+
+struct BufferQueueCirc : BufferQueue
+{
+ BufferQueueCirc(const SBufferConfig &config, const size_t num);
+
+ ~BufferQueueCirc(void)
+ {
+ _token.reset();
+ _available_buffers.clear();
+ _returned_buffers.clear();
+ }
+
+ SBuffer &front(void);
+
+ void pop(void);
+
+ void push(const SBuffer &buff);
+
+ bool empty(void) const
+ {
+ return _bytes_avail == 0 or _available_buffers.empty();
+ }
+
+ SBufferToken _token;
+ SBuffer _circ_buff;
+ char *_write_ptr;
+ size_t _bytes_avail;
+ size_t _ack_index;
+ boost::circular_buffer<SBuffer> _available_buffers;
+ std::vector<SBuffer> _returned_buffers;
+ std::vector<size_t> _outgone_bytes;
+
+};
+
+BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs):
+ _token(config.token),
+ _ack_index(0)
+{
+ //allocate a large buffer
+ const size_t num_bytes = config.length * num_buffs;
+ _circ_buff = make_circular_buffer(num_bytes);
+ _write_ptr = (char *)_circ_buff.get_actual_memory();
+ _bytes_avail = _circ_buff.get_actual_length();
+
+ //create dummy deleter to hold ref to circ buff
+ SBufferDeleter deleter = boost::bind(&buffer_dummy_delete, _1, _circ_buff);
+
+ //allocate pool of sbuffers
+ _available_buffers.set_capacity(num_buffs);
+ _returned_buffers.resize(num_buffs);
+ _outgone_bytes.resize(num_buffs, 0);
+ SBufferConfig sconfig = config;
+ sconfig.deleter = deleter;
+ sconfig.memory = _circ_buff.get_actual_memory();
+ for (size_t i = 0; i < num_buffs; i++)
+ {
+ sconfig.user_index = i;
+ SBuffer buff(sconfig);
+ //buffer derefs and returns to this queue thru token callback
+ }
+}
+
+SBuffer &BufferQueueCirc::front(void)
+{
+ ASSERT(not this->empty());
+ SBuffer &front = _available_buffers.front();
+ ASSERT(front);
+ front->config.memory = _write_ptr;
+ return front;
+}
+
+void BufferQueueCirc::pop(void)
+{
+ ASSERT(not this->empty());
+ SBuffer &front = _available_buffers.front();
+ const size_t num_bytes = front.offset;
+
+ //store number of bytes for buffer return
+ _outgone_bytes[front.get_user_index()] = num_bytes;
+
+ //pop the buffer from internal reference
+ _available_buffers.pop_front();
+
+ //adjust the write pointer
+ _write_ptr += num_bytes;
+
+ //handle circular wrap
+ if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length()))
+ {
+ _write_ptr -= _circ_buff.get_actual_length();
+ }
+
+ //subtract out of available bytes
+ ASSERT(_bytes_avail >= num_bytes);
+ _bytes_avail -= num_bytes;
+}
+
+void BufferQueueCirc::push(const SBuffer &buff)
+{
+ _returned_buffers[buff.get_user_index()] = buff;
+
+ //ack starting at the expected index and up
+ while (_returned_buffers[_ack_index])
+ {
+ //return the held bytes to the available
+ _bytes_avail += _outgone_bytes[_ack_index];
+
+ //remove the buffer container into the queue
+ _available_buffers.push_back(_returned_buffers[_ack_index]);
+ _returned_buffers[_ack_index].reset();
+
+ //increment the ack index for the next run
+ if (++_ack_index == _returned_buffers.size()) _ack_index = 0;
+ }
+}
+
+BufferQueueSptr BufferQueue::make_circ(
+ const SBufferConfig &config,
+ const size_t num_buffs
+){
+ return BufferQueueSptr(new BufferQueueCirc(config, num_buffs));
+}