summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-06 23:20:15 -0700
committerJosh Blum2012-09-06 23:20:15 -0700
commit77485f255b040846890c3c64d764e00f5de5cfe2 (patch)
tree7d2c97c07cc334dd61054249d6309ad56c90effd /lib
parentf84970693f4e1c9919e139c1d99daa74691b5a46 (diff)
downloadsandhi-77485f255b040846890c3c64d764e00f5de5cfe2.tar.gz
sandhi-77485f255b040846890c3c64d764e00f5de5cfe2.tar.bz2
sandhi-77485f255b040846890c3c64d764e00f5de5cfe2.zip
work on simplifying buffer queue logic
Diffstat (limited to 'lib')
-rw-r--r--lib/gras_impl/buffer_queue.hpp64
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp358
2 files changed, 212 insertions, 210 deletions
diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/buffer_queue.hpp
new file mode 100644
index 0000000..e9ac220
--- /dev/null
+++ b/lib/gras_impl/buffer_queue.hpp
@@ -0,0 +1,64 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
+#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
+
+#include <tsbe/buffer.hpp>
+#include <boost/bind.hpp>
+#include <queue>
+
+namespace gnuradio
+{
+
+struct BufferQueue : std::queue<tsbe::Buffer>
+{
+ BufferQueue(void)
+ {
+ tsbe::BufferDeleter deleter = boost::bind(&BufferQueue::push, this, _1);
+ _token = tsbe::BufferToken(new tsbe::BufferDeleter(deleter));
+ }
+
+ ~BufferQueue(void)
+ {
+ _token.reset();
+ this->flush();
+ }
+
+ void flush(void)
+ {
+ while (not this->empty())
+ {
+ this->pop();
+ }
+ }
+
+ void allocate_one(const size_t num_bytes)
+ {
+ tsbe::BufferConfig config;
+ config.memory = NULL;
+ config.length = num_bytes;
+ config.token = _token;
+ tsbe::Buffer buff(config);
+ //buffer derefs here and the token messages it back to the queue
+ }
+
+ tsbe::BufferToken _token;
+};
+
+} //namespace gnuradio
+
+#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 8c2c41a..4814fdd 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -18,38 +18,30 @@
#define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP
#include <gras_impl/debug.hpp>
+#include <gras_impl/buffer_queue.hpp>
#include <tsbe/buffer.hpp>
#include <boost/dynamic_bitset.hpp>
#include <vector>
+#include <queue>
#include <deque>
#include <cstring> //memcpy/memset
namespace gnuradio
{
-inline void my_buff_alloc(tsbe::Buffer &buff, const size_t num_bytes)
+struct BufferWOffset
{
- if (num_bytes == 0)
- {
- buff = tsbe::Buffer(); //empty
- return;
- }
- if (not buff or buff.get_length() != num_bytes)
+ BufferWOffset(void): offset(0), length(0){}
+ BufferWOffset(const tsbe::Buffer &buffer):
+ offset(0), length(buffer.get_length()), buffer(buffer){}
+
+ inline char *mem_offset(void) const
{
- tsbe::BufferConfig config;
- config.memory = NULL;
- config.length = num_bytes;
- buff = tsbe::Buffer(config);
- std::memset(buff.get_memory(), 0, buff.get_length());
+ return ((char *)buffer.get_memory()) + offset;
}
-}
-struct BufferWOffset
-{
- BufferWOffset(void): offset(0){}
- BufferWOffset(const tsbe::Buffer &buffer):
- offset(0), buffer(buffer){}
size_t offset;
+ size_t length;
tsbe::Buffer buffer;
};
@@ -61,37 +53,17 @@ struct BuffInfo
struct InputBufferQueues
{
- boost::dynamic_bitset<> _bitset;
- std::vector<std::deque<BufferWOffset> > _queues;
- std::vector<size_t> _history_bytes;
- std::vector<size_t> _post_bytes;
- std::vector<tsbe::Buffer> _history_buffs;
-
- std::vector<size_t> _reserve_bytes;
- std::vector<size_t> _enqueued_bytes;
- std::vector<tsbe::Buffer> _reserve_buffs;
-
- template <typename V>
- inline void init(
- const V &input_history_items,
- const V &input_item_sizes
- ){
- if (this->size() == 0) return;
-
- const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
-
- for (size_t i = 0; i < this->size(); i++)
- {
- //determine byte sizes for buffers and dealing with history
- _history_bytes[i] = input_item_sizes[i]*input_history_items[i];
- _post_bytes[i] = input_item_sizes[i]*max_history_items;
-
- //allocate mini buffer for history edge conditions
- const size_t num_bytes = _history_bytes[i] + _post_bytes[i];
- my_buff_alloc(_history_buffs[i], num_bytes);
- }
+ ~InputBufferQueues(void)
+ {
+ this->resize(0);
}
+ void init(
+ const std::vector<size_t> &input_history_items,
+ const std::vector<size_t> &input_multiple_items,
+ const std::vector<size_t> &input_item_sizes
+ );
+
/*!
* Rules for front:
*
@@ -103,44 +75,7 @@ struct InputBufferQueues
* Otherwise, resolve pointers to the input buffer,
* moving the memory and length by num history bytes.
*/
- inline BuffInfo front(const size_t i)
- {
- __prepare_front(i);
-
- BuffInfo info;
- const BufferWOffset &bo = _queues[i].front();
- const tsbe::Buffer &buff = bo.buffer;
-
- if (bo.offset < _history_bytes[i])
- {
- const tsbe::Buffer &hist_buff = _history_buffs[i];
- ASSERT(buff.get_length() >= _post_bytes[i]);
-
- if (bo.offset == 0)
- {
- char *src_mem = ((char *)buff.get_memory());
- char *dst_mem = ((char *)hist_buff.get_memory()) + _history_bytes[i];
- std::memcpy(dst_mem, src_mem, _post_bytes[i]);
- }
-
- info.mem = (char *)hist_buff.get_memory() + bo.offset;
- info.len = hist_buff.get_length() - bo.offset - _history_bytes[i];
- }
-
- else
- {
- info.mem = ((char *)buff.get_memory()) + bo.offset - _history_bytes[i];
- info.len = buff.get_length() - bo.offset;
- }
-
- if (_reserve_bytes[i] != 0)
- {
- info.len /= _reserve_bytes[i];
- info.len *= _reserve_bytes[i];
- }
-
- return info;
- }
+ BuffInfo front(const size_t i);
/*!
* Rules for consume:
@@ -152,59 +87,9 @@ struct InputBufferQueues
*
* \return true if the input allows output flushing
*/
- inline bool consume(const size_t i, const size_t bytes_consumed)
- {
- BufferWOffset &bo = _queues[i].front();
- const tsbe::Buffer &buff = bo.buffer;
+ bool consume(const size_t i, const size_t bytes_consumed);
- __consume(i, bytes_consumed);
- bo.offset += bytes_consumed;
-
- //if totally consumed, memcpy history and pop
- if (bo.offset >= buff.get_length())
- {
- ASSERT(bo.offset == buff.get_length()); //bad to consume more than buffer allows
- /*if (bo.offset != buff.get_length())
- {
- VAR(bo.offset);
- VAR(buff.get_length());
- }*/
- if (_history_bytes[i] != 0)
- {
- char *src_mem = ((char *)buff.get_memory()) + bo.offset - _history_bytes[i];
- std::memcpy(_history_buffs[i].get_memory(), src_mem, _history_bytes[i]);
- }
- __pop(i);
- return true;
- }
- return _history_bytes[i] == 0;
- }
-
- inline void resize(const size_t size)
- {
- _bitset.resize(size);
- _queues.resize(size);
- _post_bytes.resize(size, 0);
- _history_bytes.resize(size, 0);
- _history_buffs.resize(size);
-
- _reserve_buffs.resize(size);
- _reserve_bytes.resize(size, 0);
- _enqueued_bytes.resize(size, 0);
- }
-
- inline void set_reserve(const size_t i, const size_t num_bytes)
- {
- //TODO, we may call this dynamically, so 1) call __update
- //and 2) safely resize the buffer and preserve its data
- /*if (num_bytes)
- {
- std::cout << "set_reserve " << i << " " << num_bytes << " bytes\n";
- }//*/
- _reserve_bytes[i] = num_bytes;
- my_buff_alloc(_reserve_buffs[i], _reserve_bytes[i]);
- if (_reserve_buffs[i]) _reserve_buffs[i].get_length() = 0;
- }
+ void resize(const size_t size);
inline void push(const size_t i, const tsbe::Buffer &buffer)
{
@@ -213,79 +98,6 @@ struct InputBufferQueues
__update(i);
}
- inline void __prepare_front(const size_t i)
- {
- VAR(_reserve_bytes[i]);
- VAR(_history_bytes[i]);
- ASSERT(_history_bytes[i] == 0 or _reserve_bytes[i] == 0); //FIXME dont mix history and reserve for now
- tsbe::Buffer &reserve_buff = _reserve_buffs[i];
-
- {
- BufferWOffset &bo = _queues[i].front();
- const tsbe::Buffer &buff = bo.buffer;
-
- //the buffer has enough space to meet reserve reqs as-is
- if (buff.get_length() >= _reserve_bytes[i] + bo.offset)
- {
- return;
- }
-
- //if its already the reserve buff, memmove that shit
- if (buff.get() == reserve_buff.get())
- {
- const size_t bytes_left = buff.get_length() - bo.offset;
- std::memmove(
- reserve_buff.get_memory(),
- ((char *)buff.get_memory()) + bo.offset,
- bytes_left
- );
- reserve_buff.get_length() = bytes_left;
- __pop(i);
- }
- }
-
- //now we have a state where the front buff is not reserve
- while (reserve_buff.get_length() < _reserve_bytes[i])
- {
- BufferWOffset &bo = _queues[i].front();
- const size_t bytes_to_copy = std::min(
- bo.buffer.get_length()-bo.offset,
- _reserve_bytes[i]-reserve_buff.get_length()
- );
- std::memcpy(
- ((char *)reserve_buff.get_memory())+reserve_buff.get_length(),
- ((char *)bo.buffer.get_memory())+bo.offset,
- bytes_to_copy
- );
- reserve_buff.get_length() += bytes_to_copy;
- bo.offset += bytes_to_copy;
- if (bo.offset >= bo.buffer.get_length()) __pop(i);
- }
- _queues[i].push_front(reserve_buff);
- }
-
- inline void __consume(const size_t i, const size_t num_bytes)
- {
- ASSERT(_enqueued_bytes[i] >= num_bytes);
- _enqueued_bytes[i] -= num_bytes;
- __update(i);
- }
-
- inline void __pop(const size_t i)
- {
- //TODO FIXME quick hack
- if (_queues[i].front().buffer.get() == _reserve_buffs[i].get())
- {
- _reserve_buffs[i].get_length() = 0;
- }
- _queues[i].pop_front();
- }
-
- inline void __update(const size_t i)
- {
- _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i] and _enqueued_bytes[i] > 0);
- }
-
inline void flush(const size_t i)
{
_queues[i] = std::deque<BufferWOffset>();
@@ -318,8 +130,134 @@ struct InputBufferQueues
{
return (~_bitset).none();
}
+
+ void __prepare(const size_t i);
+
+ inline void __update(const size_t i)
+ {
+ _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]);
+ }
+
+ boost::dynamic_bitset<> _bitset;
+ std::vector<size_t> _enqueued_bytes;
+ std::vector<std::deque<BufferWOffset> > _queues;
+ std::vector<size_t> _history_bytes;
+ std::vector<size_t> _reserve_bytes;
+ std::vector<BufferQueue> _aux_queues;
};
+
+void InputBufferQueues::resize(const size_t size)
+{
+ _bitset.resize(size);
+ _enqueued_bytes.resize(size, 0);
+ _queues.resize(size);
+ _history_bytes.resize(size, 0);
+ _reserve_bytes.resize(size, 0);
+ _aux_queues.resize(size);
+}
+
+
+void InputBufferQueues::init(
+ const std::vector<size_t> &input_history_items,
+ const std::vector<size_t> &input_multiple_items,
+ const std::vector<size_t> &input_item_sizes
+){
+ if (this->size() == 0) return;
+
+ const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
+
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ //determine byte sizes for buffers and dealing with history
+ _history_bytes[i] = input_item_sizes[i]*input_history_items[i];
+ const size_t post_bytes = input_item_sizes[i]*max_history_items;
+ const size_t byte_multiple = input_item_sizes[i]*input_multiple_items[i];
+ _reserve_bytes[i] = byte_multiple;
+ while (_reserve_bytes[i] < post_bytes)
+ {
+ _reserve_bytes[i] += byte_multiple;
+ }
+
+ //allocate mini buffers for history edge conditions
+ const size_t num_bytes = _history_bytes[i] + _reserve_bytes[i];
+ _aux_queues[i].allocate_one(num_bytes);
+ _aux_queues[i].allocate_one(num_bytes);
+
+ //there is history, so enqueue some initial history
+ if (_history_bytes[i] != 0)
+ {
+ tsbe::Buffer buff = _aux_queues[i].front();
+ _aux_queues[i].pop();
+
+ std::memset(buff.get_memory(), 0, _history_bytes[i]);
+ _queues[i].push_front(buff);
+ _queues[i].front().length = _history_bytes[i];
+ }
+ }
+}
+
+inline BuffInfo InputBufferQueues::front(const size_t i)
+{
+ ASSERT(this->ready(i));
+ __prepare(i);
+
+ BufferWOffset &front = _queues[i].front();
+ BuffInfo info;
+ info.mem = front.mem_offset() - _history_bytes[i];
+ info.len = front.length;
+ info.len /= _reserve_bytes[i];
+ info.len *= _reserve_bytes[i];
+ return info;
+}
+
+inline void InputBufferQueues::__prepare(const size_t i)
+{
+ //this conditional statement is the requirement we must meet
+ while (
+ _queues[i].front().length < _reserve_bytes[i] or
+ _queues[i].front().offset < _history_bytes[i]
+ ){
+
+ }
+}
+
+inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
+{
+ //update bounds on the current buffer
+ _queues[i].front().offset += bytes_consumed;
+ _queues[i].front().length -= bytes_consumed;
+
+ //assert that we dont consum past the bounds of the buffer
+ ASSERT(_queues[i].front().buffer.get_length() >= _queues[i].front().offset);
+
+ //this input was completed, pop it free
+ if (_queues[i].front().length == 0)
+ {
+ const BufferWOffset old_buff = _queues[i].front();
+ _queues[i].pop_front();
+
+ //push history into the front of the queue
+ if (_history_bytes[i] != 0)
+ {
+ tsbe::Buffer buff = _aux_queues[i].front();
+ _aux_queues[i].pop();
+
+ const size_t hist_bytes = _history_bytes[i];
+ std::memcpy(buff.get_memory(), old_buff.mem_offset() - hist_bytes, hist_bytes);
+ _queues[i].push_front(buff);
+ _queues[i].front().length = hist_bytes;
+ }
+ }
+
+ //update the number of bytes in this queue
+ ASSERT(_enqueued_bytes[i] >= bytes_consumed);
+ _enqueued_bytes[i] -= bytes_consumed;
+ __update(i);
+
+ return true; //TODO not true on minibuff
+}
+
} //namespace gnuradio
#endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/