summaryrefslogtreecommitdiff
path: root/lib/gras_impl/input_buffer_queues.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gras_impl/input_buffer_queues.hpp')
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp25
1 files changed, 21 insertions, 4 deletions
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index adc4edd..aa097ff 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -42,7 +42,7 @@ struct InputBufferQueues
this->resize(0);
}
- void update_config(const size_t i, const size_t, const size_t, const size_t);
+ void update_config(const size_t i, const size_t, const size_t, const size_t, const size_t);
//! Call to get an input buffer for work
GRAS_FORCE_INLINE const SBuffer &front(const size_t i)
@@ -52,6 +52,13 @@ struct InputBufferQueues
//special case when the null buffer is possible
if (_queues[i].empty()) return get_null_buff();
+ //there are enough enqueued bytes, but not in the front buffer
+ if (_queues[i].front().length < _reserve_bytes[i])
+ {
+ this->accumulate(i);
+ }
+ ASSERT(_queues[i].front().length >= _reserve_bytes[i]);
+
return _queues[i].front();
}
@@ -60,7 +67,7 @@ struct InputBufferQueues
void resize(const size_t size);
- void accumulate(const size_t i, const size_t item_size);
+ void accumulate(const size_t i);
/*!
* Can we consider this queue's buffers to be accumulated?
@@ -140,6 +147,7 @@ struct InputBufferQueues
}
BitSet _bitset;
+ std::vector<size_t> _items_sizes;
std::vector<size_t> _enqueued_bytes;
std::vector<size_t> _reserve_bytes;
std::vector<size_t> _maximum_bytes;
@@ -152,6 +160,7 @@ struct InputBufferQueues
GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
{
_bitset.resize(size);
+ _items_sizes.resize(size, 0);
_enqueued_bytes.resize(size, 0);
_reserve_bytes.resize(size, 1);
_maximum_bytes.resize(size, MAX_AUX_BUFF_BYTES);
@@ -163,11 +172,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
inline void InputBufferQueues::update_config(
const size_t i,
+ const size_t item_size,
const size_t hist_bytes,
const size_t reserve_bytes,
const size_t maximum_bytes
)
{
+ _items_sizes[i] = item_size;
+
//first allocate the aux buffer
if (maximum_bytes != 0) _maximum_bytes[i] = maximum_bytes;
_maximum_bytes[i] = std::max(_maximum_bytes[i], reserve_bytes);
@@ -208,16 +220,21 @@ inline void InputBufferQueues::update_config(
this->__update(i);
}
-GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
+GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i)
{
+ if (_aux_queues[i]->empty())
+ {
+ _aux_queues[i]->allocate_one(_maximum_bytes[i]);
+ }
ASSERT(not _aux_queues[i]->empty());
+
SBuffer accum_buff = _aux_queues[i]->front();
_aux_queues[i]->pop();
accum_buff.offset = 0;
accum_buff.length = 0;
size_t free_bytes = accum_buff.get_actual_length();
- free_bytes /= item_size; free_bytes *= item_size;
+ free_bytes /= _items_sizes[i]; free_bytes *= _items_sizes[i];
while (not _queues[i].empty() and free_bytes != 0)
{