blob: f0859a367f2f757e2a34b319ebadd4c0e12df8dd (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
#define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
#include <gras/buffer_queue.hpp>
#include <gras_impl/bitset.hpp>
#include <vector>
namespace gras
{
struct OutputBufferQueues
{
std::string name; //for debug
void set_buffer_queue(const size_t i, BufferQueueSptr queue)
{
_queues[i] = queue;
_update(i);
}
void set_reserve_bytes(const size_t i, const size_t num_bytes)
{
_reserve_bytes[i] = num_bytes;
if (_queues[i]) _update(i);
}
GRAS_FORCE_INLINE void resize(const size_t size)
{
_bitset.resize(size);
_queues.resize(size);
_reserve_bytes.resize(size, 1);
_inline_buffer.resize(size);
}
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff)
{
if (not _queues[i]) return; //block is likely done, throw out buffer
_queues[i]->push(buff);
_update(i);
}
GRAS_FORCE_INLINE void flush_all(void)
{
const size_t old_size = this->size();
this->resize(0);
this->resize(old_size);
}
GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
if GRAS_UNLIKELY(_inline_buffer[i]) return _inline_buffer[i];
ASSERT(not this->empty(i));
return _queues[i]->front();
}
GRAS_FORCE_INLINE void consume(const size_t i)
{
if GRAS_UNLIKELY(_inline_buffer[i])
{
_inline_buffer[i].reset();
return;
}
ASSERT(not this->empty(i));
SBuffer &buff = this->front(i);
if GRAS_UNLIKELY(buff.length == 0) return;
//increment buffer for next use
buff.offset += buff.length;
buff.length = 0;
this->pop(i);
}
GRAS_FORCE_INLINE void pop(const size_t i)
{
if GRAS_UNLIKELY(_inline_buffer[i])
{
_inline_buffer[i].reset();
return;
}
ASSERT(_queues[i]);
ASSERT(not _queues[i]->empty());
_queues[i]->pop();
_update(i);
}
GRAS_FORCE_INLINE void fail(const size_t i)
{
_bitset.reset(i);
}
GRAS_FORCE_INLINE bool ready(const size_t i) const
{
return _bitset[i];
}
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
if GRAS_UNLIKELY(_inline_buffer[i]) return false;
return (not _queues[i] or _queues[i]->empty());
}
GRAS_FORCE_INLINE bool all_ready(void) const
{
return _bitset.all();
}
GRAS_FORCE_INLINE size_t size(void) const
{
return _queues.size();
}
GRAS_FORCE_INLINE void _update(const size_t i)
{
if (not _queues[i] or _queues[i]->empty())
{
_bitset.reset(i);
return;
}
const SBuffer &front = _queues[i]->front();
const size_t avail = front.get_actual_length() - front.offset - front.length;
_bitset.set(i, avail >= _reserve_bytes[i]);
}
GRAS_FORCE_INLINE void set_inline(const size_t i, const SBuffer &inline_buffer)
{
_inline_buffer[i] = inline_buffer;
_inline_buffer[i].length = 0;
_bitset.set(i);
}
BitSet _bitset;
std::vector<BufferQueueSptr> _queues;
std::vector<size_t> _reserve_bytes;
std::vector<SBuffer> _inline_buffer;
};
} //namespace gras
#endif /*INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP*/
|