diff options
author | Josh Blum | 2012-12-13 21:44:44 -0800 |
---|---|---|
committer | Josh Blum | 2012-12-13 21:44:44 -0800 |
commit | 6a03c661ede88203ff90eb01bf1e678b87cb6056 (patch) | |
tree | 1895a1956fde8cc9e6656784fcd5e4f34cb6982c /lib/gras_impl | |
parent | 83ad787f994d6efbde505ce9915f44bef2b5408d (diff) | |
download | sandhi-6a03c661ede88203ff90eb01bf1e678b87cb6056.tar.gz sandhi-6a03c661ede88203ff90eb01bf1e678b87cb6056.tar.bz2 sandhi-6a03c661ede88203ff90eb01bf1e678b87cb6056.zip |
endless buffer acking implementation
Diffstat (limited to 'lib/gras_impl')
-rw-r--r-- | lib/gras_impl/endless_buffer_queue.hpp | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/lib/gras_impl/endless_buffer_queue.hpp b/lib/gras_impl/endless_buffer_queue.hpp index 68797a2..4e592af 100644 --- a/lib/gras_impl/endless_buffer_queue.hpp +++ b/lib/gras_impl/endless_buffer_queue.hpp @@ -7,6 +7,7 @@ #include <gras/sbuffer.hpp> #include <boost/bind.hpp> #include <boost/circular_buffer.hpp> +#include <vector> namespace gras { @@ -25,18 +26,22 @@ struct EndlessBufferQueue void push(const SBuffer &buff); + GRAS_FORCE_INLINE bool empty(void) const + { + return _bytes_avail == 0 or _available_buffers.empty(); + } + SBufferToken _token; SBuffer _circ_buff; char *_write_ptr; size_t _bytes_avail; - size_t _cur_index, _ack_index; + size_t _ack_index; boost::circular_buffer<SBuffer> _available_buffers; - boost::circular_buffer<SBuffer> _returned_buffers; + std::vector<SBuffer> _returned_buffers; }; EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes) { - _cur_index = 0; _ack_index = 0; //allocate a large buffer @@ -56,14 +61,14 @@ EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes) config.length = _circ_buff.get_actual_length(); for (size_t i = 0; i < _available_buffers.size(); i++) { + config.user_index = i; _available_buffers.push_back(SBuffer(config)); } } GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void) { - ASSERT(_bytes_avail); - ASSERT(not _available_buffers.empty()); + ASSERT(not this->empty()); SBuffer &front = _available_buffers.front(); front->config.memory = _write_ptr; front->config.length = _bytes_avail; @@ -77,20 +82,24 @@ GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes) ASSERT(_bytes_avail >= num_bytes); SBuffer &front = _available_buffers.front(); front->config.length = num_bytes; - front->config.user_index = _cur_index++; _write_ptr += num_bytes; - if (_write_ptr > (char *)_circ_buff.get(circ_buff.get_actual_length())) + if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length())) { - _write_ptr -= circ_buff.get_actual_length(); + _write_ptr -= _circ_buff.get_actual_length(); } _bytes_avail -= num_bytes; } void EndlessBufferQueue::push(const SBuffer &buff) { - _returned_buffers.push_back(buff); - //BOOST_FOREACH( - //TODO update available + _returned_buffers[buff.get_user_index()] = buff; + while (_returned_buffers[_ack_index]) + { + _available_buffers.push_back(_returned_buffers[_ack_index]); + _returned_buffers[_ack_index].reset(); + _ack_index++; + if (_ack_index == _returned_buffers.size()) _ack_index = 0; + } } } //namespace gras |