// // Copyright 2012 Josh Blum // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with io_sig program. If not, see . #ifndef INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP #define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP #include #include #include #include #include #include #include #include //memcpy/memset namespace gnuradio { struct InputBufferQueues { ~InputBufferQueues(void) { this->resize(0); } void init( const std::vector &input_history_items, const std::vector &input_multiple_items, const std::vector &input_item_sizes ); //! Call to get an input buffer for work SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE); //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); void resize(const size_t size); GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer) { _queues[i].push_back(buffer); _enqueued_bytes[i] += _queues[i].back().length; __update(i); } GRAS_FORCE_INLINE void flush(const size_t i) { _queues[i] = std::deque(); _bitset.reset(i); } size_t size(void) const { return _queues.size(); } GRAS_FORCE_INLINE void flush_all(void) { const size_t old_size = this->size(); this->resize(0); this->resize(old_size); } GRAS_FORCE_INLINE bool ready(const size_t i) const { return _bitset[i]; } GRAS_FORCE_INLINE bool empty(const size_t i) const { return not _bitset[i]; } GRAS_FORCE_INLINE bool all_ready(void) const { return (~_bitset).none(); } void __prepare(const size_t i); GRAS_FORCE_INLINE void __update(const size_t i) { _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]); } boost::dynamic_bitset<> _bitset; std::vector _enqueued_bytes; std::vector > _queues; std::vector _history_bytes; std::vector _reserve_bytes; std::vector _multiple_bytes; std::vector _post_bytes; std::vector > _aux_queues; }; GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) { _bitset.resize(size); _enqueued_bytes.resize(size, 0); _queues.resize(size); _history_bytes.resize(size, 0); _reserve_bytes.resize(size, 0); _multiple_bytes.resize(size, 0); _post_bytes.resize(size, 0); _aux_queues.resize(size); } static size_t round_up_to_multiple(const size_t at_least, const size_t multiple) { size_t result = (multiple*at_least)/multiple; while (result < at_least) result += multiple; ASSERT((multiple*result)/multiple == result); return result; } GRAS_FORCE_INLINE void InputBufferQueues::init( const std::vector &input_history_items, const std::vector &input_multiple_items, const std::vector &input_item_sizes ){ if (this->size() == 0) return; const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); for (size_t i = 0; i < this->size(); i++) { ASSERT(input_multiple_items[i] > 0); _aux_queues[i] = boost::shared_ptr(new BufferQueue()); //determine byte sizes for buffers and dealing with history const size_t old_history = _history_bytes[i]; _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; //calculate the input multiple aka reserve size _multiple_bytes[i] = input_item_sizes[i]*input_multiple_items[i]; _multiple_bytes[i] = std::max(size_t(1), _multiple_bytes[i]); //calculate the input multiple aka reserve size _reserve_bytes[i] = round_up_to_multiple( _history_bytes[i] + _multiple_bytes[i], _multiple_bytes[i] ); //post bytes are the desired buffer size to escape the edge case _post_bytes[i] = round_up_to_multiple( input_item_sizes[i]*max_history_items + _reserve_bytes[i], _multiple_bytes[i] ); //allocate mini buffers for history edge conditions size_t num_bytes = _post_bytes[i]; _aux_queues[i]->allocate_one(num_bytes); _aux_queues[i]->allocate_one(num_bytes); //there is history, so enqueue some initial history if (_history_bytes[i] > old_history) { SBuffer buff = _aux_queues[i]->front(); _aux_queues[i]->pop(); const size_t delta = _history_bytes[i] - old_history; std::memset(buff.get_actual_memory(), 0, delta); buff.offset = 0; buff.length = delta; this->push(i, buff); } if (_history_bytes[i] < old_history) { size_t delta = old_history - _history_bytes[i]; delta = std::min(delta, _enqueued_bytes[i]); //FIXME //TODO consume extra delta on push...? so we dont need std::min this->consume(i, delta); } } } GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) { //if (_queues[i].empty()) return BuffInfo(); ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); __prepare(i); ASSERT(_queues[i].front().length >= _history_bytes[i]); SBuffer &front = _queues[i].front(); const bool unique = front.unique(); //same buffer, different offset and length SBuffer buff = front; buff.length -= _history_bytes[i]; buff.length /= _multiple_bytes[i]; buff.length *= _multiple_bytes[i]; //set the flag that this buffer *might* be inlined as an output buffer potential_inline = unique and (buff.length == front.length); return buff; } GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) { //HERE(); //assumes that we are always pushing proper history buffs on front ASSERT(_queues[i].front().length >= _history_bytes[i]); while (_queues[i].front().length < _reserve_bytes[i]) { SBuffer &front = _queues[i].front(); SBuffer dst; //do we need a new buffer: //- is the buffer unique (queue has only reference)? //- can its remaining space meet reserve requirements? const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset; if (enough_space and front.unique()) { dst = _queues[i].front(); _queues[i].pop_front(); } else { dst = _aux_queues[i]->front(); _aux_queues[i]->pop(); dst.offset = 0; dst.length = 0; } SBuffer src = _queues[i].front(); _queues[i].pop_front(); const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length); const size_t bytes = std::min(dst_tail, src.length); //const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]); std::memcpy(dst.get(dst.length), src.get(), bytes); //update buffer additions, consumptions dst.length += bytes; src.offset += bytes; src.length -= bytes; //keep the source buffer if not fully consumed if (src.length) _queues[i].push_front(src); //destination buffer is the new front of the queue _queues[i].push_front(dst); } } GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { //if (bytes_consumed == 0) return true; //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); //update bounds on the current buffer _queues[i].front().offset += bytes_consumed; _queues[i].front().length -= bytes_consumed; //safe to pop here when the buffer is consumed and no history if (_queues[i].front().length == 0 and _history_bytes[i] == 0) { _queues[i].pop_front(); } //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; __update(i); } } //namespace gnuradio #endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/