diff options
Diffstat (limited to 'lib/gras_impl/input_buffer_queues.hpp')
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 146 |
1 files changed, 29 insertions, 117 deletions
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 2789f92..1b663c3 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -43,13 +43,21 @@ struct InputBufferQueues void update_history_bytes(const size_t i, const size_t hist_bytes); //! Call to get an input buffer for work - SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE); + GRAS_FORCE_INLINE SBuffer &front(const size_t i) + { + ASSERT(not _queues[i].empty()); + ASSERT(this->ready(i)); + + return _queues[i].front(); + } //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); void resize(const size_t size); + void accumulate(const size_t i, const size_t item_size); + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer) { ASSERT(not _queues[i].full()); @@ -91,8 +99,6 @@ struct InputBufferQueues return _bitset.all(); } - void __prepare(const size_t i); - GRAS_FORCE_INLINE void __update(const size_t i) { _bitset.set(i, _enqueued_bytes[i] != 0); @@ -120,6 +126,8 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); } } @@ -150,105 +158,35 @@ inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t _history_bytes[i] = hist_bytes; } -GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) +GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size) { - //if (_queues[i].empty()) return BuffInfo(); - - ASSERT(not _queues[i].empty()); - ASSERT(this->ready(i)); - __prepare(i); - SBuffer &front = _queues[i].front(); - const bool unique = front.unique(); - - //same buffer, different offset and length - SBuffer buff = front; + ASSERT(not _aux_queues[i]->empty()); + SBuffer accum_buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); + accum_buff.offset = 0; + accum_buff.length = 0; - //set the flag that this buffer *might* be inlined as an output buffer - potential_inline = unique and (buff.length == front.length); + size_t free_bytes = accum_buff.get_actual_length(); + free_bytes /= item_size; free_bytes *= item_size; - return buff; -} - -GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) -{ - SBufferConfig config; - config.memory = NULL; - config.length = _enqueued_bytes[i]; - SBuffer newbuff(config); - newbuff.offset = 0; - newbuff.length = 0; - - while (not _queues[i].empty()) + while (not _queues[i].empty() and free_bytes != 0) { - std::memcpy(newbuff.get(newbuff.length), _queues[i].front().get(), _queues[i].front().length); - newbuff.length += _queues[i].front().length; - _queues[i].pop_front(); + SBuffer &front = _queues[i].front(); + const size_t bytes = std::min(front.length, free_bytes); + std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes); + accum_buff.length += bytes; + free_bytes -= bytes; + front.length -= bytes; + front.offset += bytes; + if (front.length == 0) _queues[i].pop_front(); } - _queues[i].push_back(newbuff); + _queues[i].push_front(accum_buff); return; - - - - - /* - - - - - //HERE(); - //assumes that we are always pushing proper history buffs on front - //ASSERT(_queues[i].front().length >= _history_bytes[i]); - - while (_queues[i].front().length < _reserve_bytes[i]) - { - SBuffer &front = _queues[i].front(); - SBuffer dst; - - //do we need a new buffer: - //- is the buffer unique (queue has only reference)? - //- can its remaining space meet reserve requirements? - const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset; - if (enough_space and front.unique()) - { - dst = _queues[i].front(); - _queues[i].pop_front(); - } - else - { - dst = _aux_queues[i]->front(); - _aux_queues[i]->pop(); - dst.offset = 0; - dst.length = 0; - _in_aux_buff[i] = true; - } - - SBuffer src = _queues[i].front(); - _queues[i].pop_front(); - const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length); - const size_t bytes = std::min(dst_tail, src.length); - //const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]); - std::memcpy(dst.get(dst.length), src.get(), bytes); - - //update buffer additions, consumptions - dst.length += bytes; - src.offset += bytes; - src.length -= bytes; - - //keep the source buffer if not fully consumed - if (src.length) _queues[i].push_front(src); - - //destination buffer is the new front of the queue - _queues[i].push_front(dst); - } - */ } - GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { - //if (bytes_consumed == 0) return true; - //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); @@ -258,37 +196,11 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length()); - //safe to pop here when the buffer is consumed and no history - //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/) - { - // _queues[i].pop_front(); - } -/* - else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i]) - { - const SBuffer buff = _queues[i].front(); - _queues[i].pop_front(); - - if (_queues[i].empty()) - { - _queues[i].push_front(buff); - } - else - { - _in_aux_buff[i] = false; - const size_t residual = buff.length; - _queues[i].front().length += residual; - _queues[i].front().offset -= residual; - } - } - */ - //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; __update(i); - } } //namespace gnuradio |