summaryrefslogtreecommitdiff
path: root/lib/gras_impl
diff options
context:
space:
mode:
authorJosh Blum2012-12-13 21:44:44 -0800
committerJosh Blum2012-12-13 21:44:44 -0800
commit6a03c661ede88203ff90eb01bf1e678b87cb6056 (patch)
tree1895a1956fde8cc9e6656784fcd5e4f34cb6982c /lib/gras_impl
parent83ad787f994d6efbde505ce9915f44bef2b5408d (diff)
downloadsandhi-6a03c661ede88203ff90eb01bf1e678b87cb6056.tar.gz
sandhi-6a03c661ede88203ff90eb01bf1e678b87cb6056.tar.bz2
sandhi-6a03c661ede88203ff90eb01bf1e678b87cb6056.zip
endless buffer acking implementation
Diffstat (limited to 'lib/gras_impl')
-rw-r--r--lib/gras_impl/endless_buffer_queue.hpp31
1 files changed, 20 insertions, 11 deletions
diff --git a/lib/gras_impl/endless_buffer_queue.hpp b/lib/gras_impl/endless_buffer_queue.hpp
index 68797a2..4e592af 100644
--- a/lib/gras_impl/endless_buffer_queue.hpp
+++ b/lib/gras_impl/endless_buffer_queue.hpp
@@ -7,6 +7,7 @@
#include <gras/sbuffer.hpp>
#include <boost/bind.hpp>
#include <boost/circular_buffer.hpp>
+#include <vector>
namespace gras
{
@@ -25,18 +26,22 @@ struct EndlessBufferQueue
void push(const SBuffer &buff);
+ GRAS_FORCE_INLINE bool empty(void) const
+ {
+ return _bytes_avail == 0 or _available_buffers.empty();
+ }
+
SBufferToken _token;
SBuffer _circ_buff;
char *_write_ptr;
size_t _bytes_avail;
- size_t _cur_index, _ack_index;
+ size_t _ack_index;
boost::circular_buffer<SBuffer> _available_buffers;
- boost::circular_buffer<SBuffer> _returned_buffers;
+ std::vector<SBuffer> _returned_buffers;
};
EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes)
{
- _cur_index = 0;
_ack_index = 0;
//allocate a large buffer
@@ -56,14 +61,14 @@ EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes)
config.length = _circ_buff.get_actual_length();
for (size_t i = 0; i < _available_buffers.size(); i++)
{
+ config.user_index = i;
_available_buffers.push_back(SBuffer(config));
}
}
GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void)
{
- ASSERT(_bytes_avail);
- ASSERT(not _available_buffers.empty());
+ ASSERT(not this->empty());
SBuffer &front = _available_buffers.front();
front->config.memory = _write_ptr;
front->config.length = _bytes_avail;
@@ -77,20 +82,24 @@ GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes)
ASSERT(_bytes_avail >= num_bytes);
SBuffer &front = _available_buffers.front();
front->config.length = num_bytes;
- front->config.user_index = _cur_index++;
_write_ptr += num_bytes;
- if (_write_ptr > (char *)_circ_buff.get(circ_buff.get_actual_length()))
+ if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length()))
{
- _write_ptr -= circ_buff.get_actual_length();
+ _write_ptr -= _circ_buff.get_actual_length();
}
_bytes_avail -= num_bytes;
}
void EndlessBufferQueue::push(const SBuffer &buff)
{
- _returned_buffers.push_back(buff);
- //BOOST_FOREACH(
- //TODO update available
+ _returned_buffers[buff.get_user_index()] = buff;
+ while (_returned_buffers[_ack_index])
+ {
+ _available_buffers.push_back(_returned_buffers[_ack_index]);
+ _returned_buffers[_ack_index].reset();
+ _ack_index++;
+ if (_ack_index == _returned_buffers.size()) _ack_index = 0;
+ }
}
} //namespace gras