diff options
author | Josh Blum | 2012-09-06 23:20:15 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-06 23:20:15 -0700 |
commit | 77485f255b040846890c3c64d764e00f5de5cfe2 (patch) | |
tree | 7d2c97c07cc334dd61054249d6309ad56c90effd /lib | |
parent | f84970693f4e1c9919e139c1d99daa74691b5a46 (diff) | |
download | sandhi-77485f255b040846890c3c64d764e00f5de5cfe2.tar.gz sandhi-77485f255b040846890c3c64d764e00f5de5cfe2.tar.bz2 sandhi-77485f255b040846890c3c64d764e00f5de5cfe2.zip |
work on simplifying buffer queue logic
Diffstat (limited to 'lib')
-rw-r--r-- | lib/gras_impl/buffer_queue.hpp | 64 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 358 |
2 files changed, 212 insertions, 210 deletions
diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/buffer_queue.hpp new file mode 100644 index 0000000..e9ac220 --- /dev/null +++ b/lib/gras_impl/buffer_queue.hpp @@ -0,0 +1,64 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP +#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP + +#include <tsbe/buffer.hpp> +#include <boost/bind.hpp> +#include <queue> + +namespace gnuradio +{ + +struct BufferQueue : std::queue<tsbe::Buffer> +{ + BufferQueue(void) + { + tsbe::BufferDeleter deleter = boost::bind(&BufferQueue::push, this, _1); + _token = tsbe::BufferToken(new tsbe::BufferDeleter(deleter)); + } + + ~BufferQueue(void) + { + _token.reset(); + this->flush(); + } + + void flush(void) + { + while (not this->empty()) + { + this->pop(); + } + } + + void allocate_one(const size_t num_bytes) + { + tsbe::BufferConfig config; + config.memory = NULL; + config.length = num_bytes; + config.token = _token; + tsbe::Buffer buff(config); + //buffer derefs here and the token messages it back to the queue + } + + tsbe::BufferToken _token; +}; + +} //namespace gnuradio + +#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/ diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 8c2c41a..4814fdd 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -18,38 +18,30 @@ #define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP #include <gras_impl/debug.hpp> +#include <gras_impl/buffer_queue.hpp> #include <tsbe/buffer.hpp> #include <boost/dynamic_bitset.hpp> #include <vector> +#include <queue> #include <deque> #include <cstring> //memcpy/memset namespace gnuradio { -inline void my_buff_alloc(tsbe::Buffer &buff, const size_t num_bytes) +struct BufferWOffset { - if (num_bytes == 0) - { - buff = tsbe::Buffer(); //empty - return; - } - if (not buff or buff.get_length() != num_bytes) + BufferWOffset(void): offset(0), length(0){} + BufferWOffset(const tsbe::Buffer &buffer): + offset(0), length(buffer.get_length()), buffer(buffer){} + + inline char *mem_offset(void) const { - tsbe::BufferConfig config; - config.memory = NULL; - config.length = num_bytes; - buff = tsbe::Buffer(config); - std::memset(buff.get_memory(), 0, buff.get_length()); + return ((char *)buffer.get_memory()) + offset; } -} -struct BufferWOffset -{ - BufferWOffset(void): offset(0){} - BufferWOffset(const tsbe::Buffer &buffer): - offset(0), buffer(buffer){} size_t offset; + size_t length; tsbe::Buffer buffer; }; @@ -61,37 +53,17 @@ struct BuffInfo struct InputBufferQueues { - boost::dynamic_bitset<> _bitset; - std::vector<std::deque<BufferWOffset> > _queues; - std::vector<size_t> _history_bytes; - std::vector<size_t> _post_bytes; - std::vector<tsbe::Buffer> _history_buffs; - - std::vector<size_t> _reserve_bytes; - std::vector<size_t> _enqueued_bytes; - std::vector<tsbe::Buffer> _reserve_buffs; - - template <typename V> - inline void init( - const V &input_history_items, - const V &input_item_sizes - ){ - if (this->size() == 0) return; - - const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); - - for (size_t i = 0; i < this->size(); i++) - { - //determine byte sizes for buffers and dealing with history - _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; - _post_bytes[i] = input_item_sizes[i]*max_history_items; - - //allocate mini buffer for history edge conditions - const size_t num_bytes = _history_bytes[i] + _post_bytes[i]; - my_buff_alloc(_history_buffs[i], num_bytes); - } + ~InputBufferQueues(void) + { + this->resize(0); } + void init( + const std::vector<size_t> &input_history_items, + const std::vector<size_t> &input_multiple_items, + const std::vector<size_t> &input_item_sizes + ); + /*! * Rules for front: * @@ -103,44 +75,7 @@ struct InputBufferQueues * Otherwise, resolve pointers to the input buffer, * moving the memory and length by num history bytes. */ - inline BuffInfo front(const size_t i) - { - __prepare_front(i); - - BuffInfo info; - const BufferWOffset &bo = _queues[i].front(); - const tsbe::Buffer &buff = bo.buffer; - - if (bo.offset < _history_bytes[i]) - { - const tsbe::Buffer &hist_buff = _history_buffs[i]; - ASSERT(buff.get_length() >= _post_bytes[i]); - - if (bo.offset == 0) - { - char *src_mem = ((char *)buff.get_memory()); - char *dst_mem = ((char *)hist_buff.get_memory()) + _history_bytes[i]; - std::memcpy(dst_mem, src_mem, _post_bytes[i]); - } - - info.mem = (char *)hist_buff.get_memory() + bo.offset; - info.len = hist_buff.get_length() - bo.offset - _history_bytes[i]; - } - - else - { - info.mem = ((char *)buff.get_memory()) + bo.offset - _history_bytes[i]; - info.len = buff.get_length() - bo.offset; - } - - if (_reserve_bytes[i] != 0) - { - info.len /= _reserve_bytes[i]; - info.len *= _reserve_bytes[i]; - } - - return info; - } + BuffInfo front(const size_t i); /*! * Rules for consume: @@ -152,59 +87,9 @@ struct InputBufferQueues * * \return true if the input allows output flushing */ - inline bool consume(const size_t i, const size_t bytes_consumed) - { - BufferWOffset &bo = _queues[i].front(); - const tsbe::Buffer &buff = bo.buffer; + bool consume(const size_t i, const size_t bytes_consumed); - __consume(i, bytes_consumed); - bo.offset += bytes_consumed; - - //if totally consumed, memcpy history and pop - if (bo.offset >= buff.get_length()) - { - ASSERT(bo.offset == buff.get_length()); //bad to consume more than buffer allows - /*if (bo.offset != buff.get_length()) - { - VAR(bo.offset); - VAR(buff.get_length()); - }*/ - if (_history_bytes[i] != 0) - { - char *src_mem = ((char *)buff.get_memory()) + bo.offset - _history_bytes[i]; - std::memcpy(_history_buffs[i].get_memory(), src_mem, _history_bytes[i]); - } - __pop(i); - return true; - } - return _history_bytes[i] == 0; - } - - inline void resize(const size_t size) - { - _bitset.resize(size); - _queues.resize(size); - _post_bytes.resize(size, 0); - _history_bytes.resize(size, 0); - _history_buffs.resize(size); - - _reserve_buffs.resize(size); - _reserve_bytes.resize(size, 0); - _enqueued_bytes.resize(size, 0); - } - - inline void set_reserve(const size_t i, const size_t num_bytes) - { - //TODO, we may call this dynamically, so 1) call __update - //and 2) safely resize the buffer and preserve its data - /*if (num_bytes) - { - std::cout << "set_reserve " << i << " " << num_bytes << " bytes\n"; - }//*/ - _reserve_bytes[i] = num_bytes; - my_buff_alloc(_reserve_buffs[i], _reserve_bytes[i]); - if (_reserve_buffs[i]) _reserve_buffs[i].get_length() = 0; - } + void resize(const size_t size); inline void push(const size_t i, const tsbe::Buffer &buffer) { @@ -213,79 +98,6 @@ struct InputBufferQueues __update(i); } - inline void __prepare_front(const size_t i) - { - VAR(_reserve_bytes[i]); - VAR(_history_bytes[i]); - ASSERT(_history_bytes[i] == 0 or _reserve_bytes[i] == 0); //FIXME dont mix history and reserve for now - tsbe::Buffer &reserve_buff = _reserve_buffs[i]; - - { - BufferWOffset &bo = _queues[i].front(); - const tsbe::Buffer &buff = bo.buffer; - - //the buffer has enough space to meet reserve reqs as-is - if (buff.get_length() >= _reserve_bytes[i] + bo.offset) - { - return; - } - - //if its already the reserve buff, memmove that shit - if (buff.get() == reserve_buff.get()) - { - const size_t bytes_left = buff.get_length() - bo.offset; - std::memmove( - reserve_buff.get_memory(), - ((char *)buff.get_memory()) + bo.offset, - bytes_left - ); - reserve_buff.get_length() = bytes_left; - __pop(i); - } - } - - //now we have a state where the front buff is not reserve - while (reserve_buff.get_length() < _reserve_bytes[i]) - { - BufferWOffset &bo = _queues[i].front(); - const size_t bytes_to_copy = std::min( - bo.buffer.get_length()-bo.offset, - _reserve_bytes[i]-reserve_buff.get_length() - ); - std::memcpy( - ((char *)reserve_buff.get_memory())+reserve_buff.get_length(), - ((char *)bo.buffer.get_memory())+bo.offset, - bytes_to_copy - ); - reserve_buff.get_length() += bytes_to_copy; - bo.offset += bytes_to_copy; - if (bo.offset >= bo.buffer.get_length()) __pop(i); - } - _queues[i].push_front(reserve_buff); - } - - inline void __consume(const size_t i, const size_t num_bytes) - { - ASSERT(_enqueued_bytes[i] >= num_bytes); - _enqueued_bytes[i] -= num_bytes; - __update(i); - } - - inline void __pop(const size_t i) - { - //TODO FIXME quick hack - if (_queues[i].front().buffer.get() == _reserve_buffs[i].get()) - { - _reserve_buffs[i].get_length() = 0; - } - _queues[i].pop_front(); - } - - inline void __update(const size_t i) - { - _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i] and _enqueued_bytes[i] > 0); - } - inline void flush(const size_t i) { _queues[i] = std::deque<BufferWOffset>(); @@ -318,8 +130,134 @@ struct InputBufferQueues { return (~_bitset).none(); } + + void __prepare(const size_t i); + + inline void __update(const size_t i) + { + _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]); + } + + boost::dynamic_bitset<> _bitset; + std::vector<size_t> _enqueued_bytes; + std::vector<std::deque<BufferWOffset> > _queues; + std::vector<size_t> _history_bytes; + std::vector<size_t> _reserve_bytes; + std::vector<BufferQueue> _aux_queues; }; + +void InputBufferQueues::resize(const size_t size) +{ + _bitset.resize(size); + _enqueued_bytes.resize(size, 0); + _queues.resize(size); + _history_bytes.resize(size, 0); + _reserve_bytes.resize(size, 0); + _aux_queues.resize(size); +} + + +void InputBufferQueues::init( + const std::vector<size_t> &input_history_items, + const std::vector<size_t> &input_multiple_items, + const std::vector<size_t> &input_item_sizes +){ + if (this->size() == 0) return; + + const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); + + for (size_t i = 0; i < this->size(); i++) + { + //determine byte sizes for buffers and dealing with history + _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; + const size_t post_bytes = input_item_sizes[i]*max_history_items; + const size_t byte_multiple = input_item_sizes[i]*input_multiple_items[i]; + _reserve_bytes[i] = byte_multiple; + while (_reserve_bytes[i] < post_bytes) + { + _reserve_bytes[i] += byte_multiple; + } + + //allocate mini buffers for history edge conditions + const size_t num_bytes = _history_bytes[i] + _reserve_bytes[i]; + _aux_queues[i].allocate_one(num_bytes); + _aux_queues[i].allocate_one(num_bytes); + + //there is history, so enqueue some initial history + if (_history_bytes[i] != 0) + { + tsbe::Buffer buff = _aux_queues[i].front(); + _aux_queues[i].pop(); + + std::memset(buff.get_memory(), 0, _history_bytes[i]); + _queues[i].push_front(buff); + _queues[i].front().length = _history_bytes[i]; + } + } +} + +inline BuffInfo InputBufferQueues::front(const size_t i) +{ + ASSERT(this->ready(i)); + __prepare(i); + + BufferWOffset &front = _queues[i].front(); + BuffInfo info; + info.mem = front.mem_offset() - _history_bytes[i]; + info.len = front.length; + info.len /= _reserve_bytes[i]; + info.len *= _reserve_bytes[i]; + return info; +} + +inline void InputBufferQueues::__prepare(const size_t i) +{ + //this conditional statement is the requirement we must meet + while ( + _queues[i].front().length < _reserve_bytes[i] or + _queues[i].front().offset < _history_bytes[i] + ){ + + } +} + +inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) +{ + //update bounds on the current buffer + _queues[i].front().offset += bytes_consumed; + _queues[i].front().length -= bytes_consumed; + + //assert that we dont consum past the bounds of the buffer + ASSERT(_queues[i].front().buffer.get_length() >= _queues[i].front().offset); + + //this input was completed, pop it free + if (_queues[i].front().length == 0) + { + const BufferWOffset old_buff = _queues[i].front(); + _queues[i].pop_front(); + + //push history into the front of the queue + if (_history_bytes[i] != 0) + { + tsbe::Buffer buff = _aux_queues[i].front(); + _aux_queues[i].pop(); + + const size_t hist_bytes = _history_bytes[i]; + std::memcpy(buff.get_memory(), old_buff.mem_offset() - hist_bytes, hist_bytes); + _queues[i].push_front(buff); + _queues[i].front().length = hist_bytes; + } + } + + //update the number of bytes in this queue + ASSERT(_enqueued_bytes[i] >= bytes_consumed); + _enqueued_bytes[i] -= bytes_consumed; + __update(i); + + return true; //TODO not true on minibuff +} + } //namespace gnuradio #endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/ |