// // Copyright 2012 Josh Blum // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with this program. If not, see . #ifndef INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP #define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP #include #include #include #include #include #include #include #include //memcpy/memset #include namespace gras { struct InputBufferQueues { enum {MAX_QUEUE_SIZE = 128}; enum {MAX_AUX_BUFF_BYTES=(1<<16)}; ~InputBufferQueues(void) { this->resize(0); } void update_history_bytes(const size_t i, const size_t hist_bytes); //! Call to get an input buffer for work GRAS_FORCE_INLINE SBuffer &front(const size_t i) { ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); return _queues[i].front(); } //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); void resize(const size_t size); void accumulate(const size_t i, const size_t item_size); /*! * Can we consider this queue's buffers to be accumulated? * Either the first buffer holds all of the enqueued bytes * or the first buffer is larger than we can accumulate. */ GRAS_FORCE_INLINE bool is_accumulated(const size_t i) const { ASSERT(not _queues[i].empty()); return (_queues[i].front().length == _enqueued_bytes[i]) or (_queues[i].front().length >= MAX_AUX_BUFF_BYTES); } GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer) { ASSERT(not _queues[i].full()); _queues[i].push_back(buffer); _enqueued_bytes[i] += _queues[i].back().length; __update(i); } GRAS_FORCE_INLINE void flush(const size_t i) { _queues[i].clear(); _bitset.reset(i); } size_t size(void) const { return _queues.size(); } GRAS_FORCE_INLINE void flush_all(void) { const size_t old_size = this->size(); this->resize(0); this->resize(old_size); } GRAS_FORCE_INLINE bool ready(const size_t i) const { return _bitset[i]; } GRAS_FORCE_INLINE bool empty(const size_t i) const { return not _bitset[i]; } GRAS_FORCE_INLINE bool all_ready(void) const { return _bitset.all(); } GRAS_FORCE_INLINE void __update(const size_t i) { _bitset.set(i, _enqueued_bytes[i] != 0); } BitSet _bitset; std::vector _enqueued_bytes; std::vector > _queues; std::vector _history_bytes; std::vector > _aux_queues; }; GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) { _bitset.resize(size); _enqueued_bytes.resize(size, 0); _queues.resize(size, boost::circular_buffer(MAX_QUEUE_SIZE)); _history_bytes.resize(size, 0); _aux_queues.resize(size); for (size_t i = 0; i < this->size(); i++) { if (_aux_queues[i]) continue; _aux_queues[i] = boost::shared_ptr(new BufferQueue()); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); } } inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes) { //there is history, so enqueue some initial history if (hist_bytes > _history_bytes[i]) { SBuffer buff = _aux_queues[i]->front(); _aux_queues[i]->pop(); const size_t delta = hist_bytes - _history_bytes[i]; std::memset(buff.get_actual_memory(), 0, delta); buff.offset = 0; buff.length = delta; this->push(i, buff); } if (hist_bytes < _history_bytes[i]) { size_t delta = _history_bytes[i] - hist_bytes; delta = std::min(delta, _enqueued_bytes[i]); //FIXME //TODO consume extra delta on push...? so we dont need std::min this->consume(i, delta); } _history_bytes[i] = hist_bytes; } GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size) { ASSERT(not _aux_queues[i]->empty()); SBuffer accum_buff = _aux_queues[i]->front(); _aux_queues[i]->pop(); accum_buff.offset = 0; accum_buff.length = 0; size_t free_bytes = accum_buff.get_actual_length(); free_bytes /= item_size; free_bytes *= item_size; while (not _queues[i].empty() and free_bytes != 0) { SBuffer &front = _queues[i].front(); const size_t bytes = std::min(front.length, free_bytes); std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes); accum_buff.length += bytes; free_bytes -= bytes; front.length -= bytes; front.offset += bytes; if (front.length == 0) _queues[i].pop_front(); } _queues[i].push_front(accum_buff); ASSERT(this->is_accumulated(i)); } GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); //update bounds on the current buffer _queues[i].front().offset += bytes_consumed; _queues[i].front().length -= bytes_consumed; ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length()); //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; __update(i); } } //namespace gras #endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/