blob: 4e592af6038975b3dbe4a8559a76630a9d976b51 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#ifndef INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP
#define INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP
#include <gras_impl/debug.hpp>
#include <gras/sbuffer.hpp>
#include <boost/bind.hpp>
#include <boost/circular_buffer.hpp>
#include <vector>
namespace gras
{
//Queue of SBuffer handles that all refer to one large circular buffer.
//front() exposes the current write position, pop() consumes bytes from it,
//and push() accepts handles back, acknowledging them in user-index order.
struct EndlessBufferQueue
{
    //number of SBuffer handles in the pool
    enum {MAX_QUEUE_SIZE = 128};

    //create the backing buffer of num_bytes (not defined in this header)
    static SBuffer make_circular_buffer(const size_t num_bytes);

    //set up the backing buffer, the return token, and the handle pool
    EndlessBufferQueue(const size_t num_bytes);

    //handle at the head of the queue, pointed at the current write position
    SBuffer &front(void);

    //consume num_bytes starting at the current write position
    void pop(const size_t num_bytes);

    //hand a buffer back to the queue
    void push(const SBuffer &buff);

    //true when there are no bytes left or no handle left to give out
    GRAS_FORCE_INLINE bool empty(void) const
    {
        if (_bytes_avail == 0) return true;
        return _available_buffers.empty();
    }

    SBufferToken _token; //holds the deleter that is bound to push()
    SBuffer _circ_buff; //backing buffer from make_circular_buffer()
    char *_write_ptr; //position handed out by the next front()
    size_t _bytes_avail; //bytes offered by front(), reduced by pop()
    size_t _ack_index; //slot of _returned_buffers that push() waits on next
    boost::circular_buffer<SBuffer> _available_buffers; //handles ready for front()
    std::vector<SBuffer> _returned_buffers; //returned handles, indexed by user index
};
//Construct the queue on top of one circular buffer of num_bytes.
//
//Sets the write pointer to the start of the backing memory, marks the whole
//backing length as available, creates the token whose deleter is bound to
//push(), and fills the pool with MAX_QUEUE_SIZE handles numbered by user index.
//
//The deleter stores `this`, so the queue object has to stay at the same
//address for as long as that token can still be invoked.
//
//Declared inline because this definition lives in a header; without it every
//translation unit including the header would emit its own definition.
inline EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes)
{
    _ack_index = 0;

    //allocate a large buffer
    _circ_buff = EndlessBufferQueue::make_circular_buffer(num_bytes);
    _write_ptr = (char *)_circ_buff.get_actual_memory();
    _bytes_avail = _circ_buff.get_actual_length();

    //create token as buffer returner
    SBufferDeleter deleter = boost::bind(&EndlessBufferQueue::push, this, _1);
    _token = SBufferToken(new SBufferDeleter(deleter));
    //NOTE(review): _token is never attached to the configs built below, so
    //nothing visible here routes a released buffer to push() - confirm
    //whether SBufferConfig is supposed to carry the token.

    //allocate pool of sbuffers
    //reserve the ring first and then fill it, rather than resizing it with
    //null handles and relying on push_back overwriting a full ring
    _available_buffers.set_capacity(MAX_QUEUE_SIZE);
    _returned_buffers.resize(MAX_QUEUE_SIZE);
    SBufferConfig config;
    config.memory = _circ_buff.get_actual_memory();
    config.length = _circ_buff.get_actual_length();
    for (size_t i = 0; i < _available_buffers.capacity(); i++)
    {
        config.user_index = i;
        _available_buffers.push_back(SBuffer(config));
    }
}
//Return the handle at the head of the available pool, re-aimed at the
//current write position and offering every byte that is still available.
//The handle's offset and length are cleared. The queue must not be empty.
GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void)
{
    ASSERT(not this->empty());

    SBuffer &head = _available_buffers.front();

    //start the handle out with nothing consumed and nothing filled
    head.offset = 0;
    head.length = 0;

    //window of memory the caller may use
    head->config.length = _bytes_avail;
    head->config.memory = _write_ptr;

    return head;
}
//Consume num_bytes at the current write position and retire the front handle.
//
//The consumed size is stored in the handle's config.length, the handle is
//removed from the available pool, the write pointer is advanced (wrapping
//around the backing buffer), and the available byte count is reduced.
//
//num_bytes must not exceed the bytes currently available. The reference
//obtained from front() refers to the element removed here, so callers must
//copy the SBuffer before calling pop().
GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes)
{
    ASSERT(_bytes_avail >= num_bytes);
    ASSERT(not _available_buffers.empty());

    //remember how many bytes this handle accounts for
    _available_buffers.front()->config.length = num_bytes;

    //retire the handle; previously it stayed at the front forever, so the
    //same handle was served again and the pool could never run empty
    _available_buffers.pop_front();

    //adjust the write pointer and handle the circular wrap
    _write_ptr += num_bytes;
    const size_t circ_length = _circ_buff.get_actual_length();
    //NOTE(review): a pointer landing exactly on the end is left unwrapped;
    //presumably fine if make_circular_buffer() maps the region twice - confirm
    if (_write_ptr > (char *)_circ_buff.get(circ_length))
    {
        _write_ptr -= circ_length;
    }

    _bytes_avail -= num_bytes;
}
void EndlessBufferQueue::push(const SBuffer &buff)
{
_returned_buffers[buff.get_user_index()] = buff;
while (_returned_buffers[_ack_index])
{
_available_buffers.push_back(_returned_buffers[_ack_index]);
_returned_buffers[_ack_index].reset();
_ack_index++;
if (_ack_index == _returned_buffers.size()) _ack_index = 0;
}
}
} //namespace gras
#endif /*INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP*/
|