summaryrefslogtreecommitdiff
path: root/lib/gras_impl/input_buffer_queues.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gras_impl/input_buffer_queues.hpp')
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp126
1 files changed, 40 insertions, 86 deletions
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 3666330..2789f92 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -33,20 +33,17 @@ namespace gnuradio
struct InputBufferQueues
{
enum {MAX_QUEUE_SIZE = 128};
+ enum {MAX_AUX_BUFF_BYTES=(1<<16)};
~InputBufferQueues(void)
{
this->resize(0);
}
- void init(
- const std::vector<size_t> &input_history_items,
- const std::vector<size_t> &input_reserve_items,
- const std::vector<size_t> &input_item_sizes
- );
+ void update_history_bytes(const size_t i, const size_t hist_bytes);
//! Call to get an input buffer for work
- SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE);
+ SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE);
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
@@ -98,17 +95,14 @@ struct InputBufferQueues
GRAS_FORCE_INLINE void __update(const size_t i)
{
- _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]);
+ _bitset.set(i, _enqueued_bytes[i] != 0);
}
BitSet _bitset;
std::vector<size_t> _enqueued_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _history_bytes;
- std::vector<size_t> _reserve_bytes;
- std::vector<size_t> _post_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
- std::vector<bool> _in_aux_buff;
};
@@ -118,93 +112,56 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
_enqueued_bytes.resize(size, 0);
_queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE));
_history_bytes.resize(size, 0);
- _reserve_bytes.resize(size, 0);
- _post_bytes.resize(size, 0);
_aux_queues.resize(size);
- _in_aux_buff.resize(size, false);
-}
-
-GRAS_FORCE_INLINE void InputBufferQueues::init(
- const std::vector<size_t> &input_history_items,
- const std::vector<size_t> &input_reserve_items,
- const std::vector<size_t> &input_item_sizes
-){
- if (this->size() == 0) return;
-
- const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
for (size_t i = 0; i < this->size(); i++)
{
- ASSERT(input_reserve_items[i] > 0);
-
+ if (_aux_queues[i]) continue;
_aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ }
- //determine byte sizes for buffers and dealing with history
- const size_t old_history = _history_bytes[i];
- _history_bytes[i] = input_item_sizes[i]*input_history_items[i];
-
- //calculate the input reserve aka reserve size
- _reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i];
- _reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]);
-
- //calculate the input reserve aka reserve size
- _reserve_bytes[i] = std::max(
- _history_bytes[i] + _reserve_bytes[i],
- _reserve_bytes[i]
- );
- _reserve_bytes[i] = 1;
-
- //post bytes are the desired buffer size to escape the edge case
- _post_bytes[i] = std::max(
- input_item_sizes[i]*max_history_items + _reserve_bytes[i],
- _reserve_bytes[i]
- );
-
- //allocate mini buffers for history edge conditions
- size_t num_bytes = _post_bytes[i];
- _aux_queues[i]->allocate_one(num_bytes);
- _aux_queues[i]->allocate_one(num_bytes);
-
- //there is history, so enqueue some initial history
- if (_history_bytes[i] > old_history)
- {
- SBuffer buff = _aux_queues[i]->front();
- _aux_queues[i]->pop();
+}
- const size_t delta = _history_bytes[i] - old_history;
- std::memset(buff.get_actual_memory(), 0, delta);
- buff.offset = 0;
- buff.length = delta;
+inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes)
+{
+ //there is history, so enqueue some initial history
+ if (hist_bytes > _history_bytes[i])
+ {
+ SBuffer buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
- this->push(i, buff);
- _in_aux_buff[i] = true;
- }
- if (_history_bytes[i] < old_history)
- {
- size_t delta = old_history - _history_bytes[i];
- delta = std::min(delta, _enqueued_bytes[i]); //FIXME
- //TODO consume extra delta on push...? so we dont need std::min
- this->consume(i, delta);
- }
+ const size_t delta = hist_bytes - _history_bytes[i];
+ std::memset(buff.get_actual_memory(), 0, delta);
+ buff.offset = 0;
+ buff.length = delta;
+
+ this->push(i, buff);
+ }
+ if (hist_bytes < _history_bytes[i])
+ {
+ size_t delta = _history_bytes[i] - hist_bytes;
+ delta = std::min(delta, _enqueued_bytes[i]); //FIXME
+ //TODO consume extra delta on push...? so we dont need std::min
+ this->consume(i, delta);
}
-}
+ _history_bytes[i] = hist_bytes;
+}
-GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline)
+GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline)
{
//if (_queues[i].empty()) return BuffInfo();
ASSERT(not _queues[i].empty());
ASSERT(this->ready(i));
__prepare(i);
- ASSERT(_queues[i].front().length >= _history_bytes[i]);
SBuffer &front = _queues[i].front();
const bool unique = front.unique();
//same buffer, different offset and length
SBuffer buff = front;
- //if (conserve_history) ASSERT(buff.length >= _history_bytes[i]);
- //if (conserve_history) buff.length -= _history_bytes[i];
//set the flag that this buffer *might* be inlined as an output buffer
potential_inline = unique and (buff.length == front.length);
@@ -234,7 +191,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
-
+ /*
@@ -284,6 +241,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
//destination buffer is the new front of the queue
_queues[i].push_front(dst);
}
+ */
}
@@ -298,12 +256,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
_queues[i].front().offset += bytes_consumed;
_queues[i].front().length -= bytes_consumed;
+ ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length());
+
//safe to pop here when the buffer is consumed and no history
- if (_queues[i].front().length == 0 and _history_bytes[i] == 0)
+ //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/)
{
- _queues[i].pop_front();
+ // _queues[i].pop_front();
}
-
+/*
else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i])
{
const SBuffer buff = _queues[i].front();
@@ -321,18 +281,12 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
_queues[i].front().offset -= residual;
}
}
+ */
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
- //we have consumed the history, change reqs
- if (_enqueued_bytes[i] < _history_bytes[i])
- {
- _history_bytes[i] = 0;
- _reserve_bytes[i] = 1; //cant be 0
- }
-
__update(i);
}