blob: 5e69fe0f2ca4237f2ec317985c4afeab4127da6a (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#include <gras/buffer_queue.hpp>
#include <gras_impl/debug.hpp>
#include <boost/circular_buffer.hpp>
#include <vector>
using namespace gras;
//Forward declaration of the backing-store factory; the definition lives in
//circular_buffer.cpp. This file only relies on the returned SBuffer exposing
//get_actual_memory() and get_actual_length() for the allocated region.
SBuffer make_circular_buffer(const size_t num_bytes); //circular_buffer.cpp
//Buffer queue that hands out SBuffers pointing into one shared circular
//allocation. Buffers are given out from a moving write pointer and their
//bytes are reclaimed in user-index order as they are pushed back.
struct BufferQueueCirc : BufferQueue
{
    BufferQueueCirc(const SBufferConfig &config, const size_t num);

    //drop the token first, then release every buffer reference we hold
    ~BufferQueueCirc()
    {
        _token.reset();
        _available_buffers.clear();
        _returned_buffers.clear();
    }

    //next buffer to hand out, positioned at the current write pointer
    SBuffer &front();

    //consume the front buffer and advance the write pointer
    void pop();

    //accept a returned buffer and reclaim bytes in index order
    void push(const SBuffer &buff);

    //nothing can be handed out without both a buffer object and free bytes
    bool empty() const
    {
        if (_available_buffers.empty()) return true;
        return _bytes_avail == 0;
    }

    SBufferToken _token;                                 //copy of config.token
    SBuffer _circ_buff;                                  //the large backing allocation
    char *_write_ptr;                                    //memory assigned to the next front()
    size_t _bytes_avail;                                 //bytes not currently outstanding
    size_t _ack_index;                                   //next user index expected back
    boost::circular_buffer<SBuffer> _available_buffers;  //buffers ready to hand out
    std::vector<SBuffer> _returned_buffers;              //returned buffers, slotted by user index
    std::vector<size_t> _outgone_bytes;                  //bytes consumed per user index at pop()
};
//Construct the queue: allocate one large region of config.length * num_buffs
//bytes and create num_buffs SBuffer objects that reference it.
//
//config    - template configuration for every pooled buffer (its token is
//            kept in _token; its memory field is replaced by the region base)
//num_buffs - number of SBuffer objects in the pool
//
//The pooled buffers are not inserted into _available_buffers here. Each one
//is created and immediately released, so that it comes back through the
//token callback (see the comment in the loop) and enters the queue via push().
BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs):
    _token(config.token),
    _ack_index(0)
{
    //allocate a large buffer
    const size_t num_bytes = config.length * num_buffs;
    _circ_buff = make_circular_buffer(num_bytes);
    _write_ptr = (char *)_circ_buff.get_actual_memory();
    _bytes_avail = _circ_buff.get_actual_length();

    //allocate pool of sbuffers
    //Only reserve capacity for the available queue: resize() would fill it
    //with default-constructed SBuffers, making empty() report false and
    //letting front() hand out a buffer that was never configured.
    _available_buffers.set_capacity(num_buffs);
    _returned_buffers.resize(num_buffs);
    _outgone_bytes.resize(num_buffs, 0);
    SBufferConfig sconfig = config;
    sconfig.memory = _circ_buff.get_actual_memory();
    for (size_t i = 0; i < num_buffs; i++)
    {
        sconfig.user_index = i;
        //Must be a named object: the statement "SBuffer(sconfig);" is parsed
        //as a declaration of a new default-constructed SBuffer called sconfig,
        //so no buffer would ever be built from the configuration.
        SBuffer buff(sconfig);
        //buffer derefs and returns to this queue thru token callback
    }
}
//Return the buffer at the head of the available queue.
//Its memory pointer is re-aimed at the current write position on every call,
//so the caller always receives the location where the next bytes belong.
//The queue must not be empty.
SBuffer &BufferQueueCirc::front(void)
{
    ASSERT(not this->empty());
    _available_buffers.front()->config.memory = _write_ptr;
    return _available_buffers.front();
}
//Remove the head buffer from the available queue and account for the bytes
//it took: the count (read from the buffer's offset field) is remembered per
//user index so push() can give it back later, the write pointer is advanced
//by the same amount, and the available byte total is reduced.
//The queue must not be empty.
void BufferQueueCirc::pop(void)
{
    ASSERT(not this->empty());
    SBuffer &front = _available_buffers.front();
    const size_t num_bytes = front.offset;

    //store number of bytes for buffer return
    //(must happen before pop_front, which ends the lifetime of this element)
    _outgone_bytes[front.get_user_index()] = num_bytes;

    //pop the buffer from internal reference
    _available_buffers.pop_front();

    //adjust the write pointer
    _write_ptr += num_bytes;

    //handle circular wrap
    //wrap as soon as the pointer reaches the end of the region (>=, not >),
    //so the write pointer always stays inside [start, end)
    if (_write_ptr >= (char *)_circ_buff.get(_circ_buff.get_actual_length()))
    {
        _write_ptr -= _circ_buff.get_actual_length();
    }

    //subtract out of available bytes
    ASSERT(_bytes_avail >= num_bytes);
    _bytes_avail -= num_bytes;
}
void BufferQueueCirc::push(const SBuffer &buff)
{
_returned_buffers[buff.get_user_index()] = buff;
//ack starting at the expected index and up
while (_returned_buffers[_ack_index])
{
//return the held bytes to the available
_bytes_avail += _outgone_bytes[_ack_index];
//remove the buffer container into the queue
_available_buffers.push_back(_returned_buffers[_ack_index]);
_returned_buffers[_ack_index].reset();
//increment the ack index for the next run
if (++_ack_index == _returned_buffers.size()) _ack_index = 0;
}
}
//Factory: build a circular buffer queue with num_buffs buffers described by
//config and return it wrapped in the queue smart pointer type.
BufferQueueSptr BufferQueue::make_circ(const SBufferConfig &config, const size_t num_buffs)
{
    BufferQueueSptr queue(new BufferQueueCirc(config, num_buffs));
    return queue;
}
|