diff options
Diffstat (limited to 'lib/gras_impl/input_buffer_queues.hpp')
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 126 |
1 files changed, 40 insertions, 86 deletions
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 3666330..2789f92 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -33,20 +33,17 @@ namespace gnuradio struct InputBufferQueues { enum {MAX_QUEUE_SIZE = 128}; + enum {MAX_AUX_BUFF_BYTES=(1<<16)}; ~InputBufferQueues(void) { this->resize(0); } - void init( - const std::vector<size_t> &input_history_items, - const std::vector<size_t> &input_reserve_items, - const std::vector<size_t> &input_item_sizes - ); + void update_history_bytes(const size_t i, const size_t hist_bytes); //! Call to get an input buffer for work - SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE); + SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE); //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); @@ -98,17 +95,14 @@ struct InputBufferQueues GRAS_FORCE_INLINE void __update(const size_t i) { - _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]); + _bitset.set(i, _enqueued_bytes[i] != 0); } BitSet _bitset; std::vector<size_t> _enqueued_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _history_bytes; - std::vector<size_t> _reserve_bytes; - std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; - std::vector<bool> _in_aux_buff; }; @@ -118,93 +112,56 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) _enqueued_bytes.resize(size, 0); _queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE)); _history_bytes.resize(size, 0); - _reserve_bytes.resize(size, 0); - _post_bytes.resize(size, 0); _aux_queues.resize(size); - _in_aux_buff.resize(size, false); -} - -GRAS_FORCE_INLINE void InputBufferQueues::init( - const std::vector<size_t> &input_history_items, - const std::vector<size_t> &input_reserve_items, - const std::vector<size_t> &input_item_sizes -){ - if (this->size() == 0) return; - - const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); for (size_t i = 0; i < this->size(); i++) { - ASSERT(input_reserve_items[i] > 0); - + if (_aux_queues[i]) continue; _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + } - //determine byte sizes for buffers and dealing with history - const size_t old_history = _history_bytes[i]; - _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; - - //calculate the input reserve aka reserve size - _reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i]; - _reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]); - - //calculate the input reserve aka reserve size - _reserve_bytes[i] = std::max( - _history_bytes[i] + _reserve_bytes[i], - _reserve_bytes[i] - ); - _reserve_bytes[i] = 1; - - //post bytes are the desired buffer size to escape the edge case - _post_bytes[i] = std::max( - input_item_sizes[i]*max_history_items + _reserve_bytes[i], - _reserve_bytes[i] - ); - - //allocate mini buffers for history edge conditions - size_t num_bytes = _post_bytes[i]; - _aux_queues[i]->allocate_one(num_bytes); - _aux_queues[i]->allocate_one(num_bytes); - - //there is history, so enqueue some initial history - if (_history_bytes[i] > old_history) - { - SBuffer buff = _aux_queues[i]->front(); - _aux_queues[i]->pop(); +} - const size_t delta = _history_bytes[i] - old_history; - std::memset(buff.get_actual_memory(), 0, delta); - buff.offset = 0; - buff.length = delta; +inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes) +{ + //there is history, so enqueue some initial history + if (hist_bytes > _history_bytes[i]) + { + SBuffer buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); - this->push(i, buff); - _in_aux_buff[i] = true; - } - if (_history_bytes[i] < old_history) - { - size_t delta = old_history - _history_bytes[i]; - delta = std::min(delta, _enqueued_bytes[i]); //FIXME - //TODO consume extra delta on push...? so we dont need std::min - this->consume(i, delta); - } + const size_t delta = hist_bytes - _history_bytes[i]; + std::memset(buff.get_actual_memory(), 0, delta); + buff.offset = 0; + buff.length = delta; + + this->push(i, buff); + } + if (hist_bytes < _history_bytes[i]) + { + size_t delta = _history_bytes[i] - hist_bytes; + delta = std::min(delta, _enqueued_bytes[i]); //FIXME + //TODO consume extra delta on push...? so we dont need std::min + this->consume(i, delta); } -} + _history_bytes[i] = hist_bytes; +} -GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline) +GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) { //if (_queues[i].empty()) return BuffInfo(); ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); __prepare(i); - ASSERT(_queues[i].front().length >= _history_bytes[i]); SBuffer &front = _queues[i].front(); const bool unique = front.unique(); //same buffer, different offset and length SBuffer buff = front; - //if (conserve_history) ASSERT(buff.length >= _history_bytes[i]); - //if (conserve_history) buff.length -= _history_bytes[i]; //set the flag that this buffer *might* be inlined as an output buffer potential_inline = unique and (buff.length == front.length); @@ -234,7 +191,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) - + /* @@ -284,6 +241,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) //destination buffer is the new front of the queue _queues[i].push_front(dst); } + */ } @@ -298,12 +256,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b _queues[i].front().offset += bytes_consumed; _queues[i].front().length -= bytes_consumed; + ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length()); + //safe to pop here when the buffer is consumed and no history - if (_queues[i].front().length == 0 and _history_bytes[i] == 0) + //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/) { - _queues[i].pop_front(); + // _queues[i].pop_front(); } - +/* else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i]) { const SBuffer buff = _queues[i].front(); @@ -321,18 +281,12 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b _queues[i].front().offset -= residual; } } + */ //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; - //we have consumed the history, change reqs - if (_enqueued_bytes[i] < _history_bytes[i]) - { - _history_bytes[i] = 0; - _reserve_bytes[i] = 1; //cant be 0 - } - __update(i); } |