diff options
-rw-r--r-- | lib/block_handlers.cpp | 21 | ||||
-rw-r--r-- | lib/gras_impl/buffer_queue.hpp | 5 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 83 |
4 files changed, 80 insertions, 38 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 8980791..b62df4e 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -139,25 +139,20 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->input_tags.resize(num_inputs); this->output_tags.resize(num_outputs); - //init the history comprehension on input queues - this->input_queues.init(this->input_history_items, this->input_items_sizes); - //impose input reserve requirements based on relative rate and output multiple + std::vector<size_t> input_multiple_items(num_inputs, 1); for (size_t i = 0; i < num_inputs; i++) { - if (num_outputs == 0 or not this->enable_fixed_rate) - { - this->input_queues.set_reserve(i, 0); - continue; - } + if (num_outputs == 0 or not this->enable_fixed_rate) continue; //TODO, this is a little cheap, we only look at output multiple [0] - size_t multiple = this->output_multiple_items.front(); - //if (multiple == 1) multiple = 0; //1 is meaningless, so we use 0 to disable the reserve - const size_t reserve_items = myulround(multiple/this->relative_rate); - const size_t reserve_bytes = reserve_items * this->input_items_sizes[i]; - this->input_queues.set_reserve(i, reserve_bytes); + const size_t multiple = this->output_multiple_items.front(); + input_multiple_items[i] = myulround(multiple/this->relative_rate); + if (input_multiple_items[i] == 0) input_multiple_items[i] = 1; } + //init the history comprehension on input queues + this->input_queues.init(this->input_history_items, input_multiple_items, this->input_items_sizes); + //TODO: think more about this: if (num_inputs == 0 and num_outputs == 0) { diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/buffer_queue.hpp index e9ac220..5fd1661 100644 --- a/lib/gras_impl/buffer_queue.hpp +++ b/lib/gras_impl/buffer_queue.hpp @@ -35,11 +35,6 @@ struct BufferQueue : std::queue<tsbe::Buffer> ~BufferQueue(void) { _token.reset(); - this->flush(); - } - - void flush(void) - { while (not this->empty()) { this->pop(); diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index bdf1388..9f03efa 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -18,13 +18,18 @@ #define INCLUDED_LIBGRAS_IMPL_DEBUG_HPP #include <iostream> +#include <stdexcept> #define GENESIS 0 #define ARMAGEDDON 0 -#define MESSAGE 1 +#define MESSAGE 0 #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; -#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;} +#define ASSERT(x) {if(not (x)) \ +{ \ + std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush; \ + throw std::runtime_error(std::string("ASSERT FAIL ") + #x); \ +}} #endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/ diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 4814fdd..e724ccc 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -40,6 +40,11 @@ struct BufferWOffset return ((char *)buffer.get_memory()) + offset; } + inline size_t tail_free(void) const + { + return buffer.get_length() - offset - length; + } + size_t offset; size_t length; tsbe::Buffer buffer; @@ -143,11 +148,11 @@ struct InputBufferQueues std::vector<std::deque<BufferWOffset> > _queues; std::vector<size_t> _history_bytes; std::vector<size_t> _reserve_bytes; - std::vector<BufferQueue> _aux_queues; + std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; }; -void InputBufferQueues::resize(const size_t size) +inline void InputBufferQueues::resize(const size_t size) { _bitset.resize(size); _enqueued_bytes.resize(size, 0); @@ -158,7 +163,7 @@ void InputBufferQueues::resize(const size_t size) } -void InputBufferQueues::init( +inline void InputBufferQueues::init( const std::vector<size_t> &input_history_items, const std::vector<size_t> &input_multiple_items, const std::vector<size_t> &input_item_sizes @@ -169,6 +174,10 @@ void InputBufferQueues::init( for (size_t i = 0; i < this->size(); i++) { + ASSERT(input_multiple_items[i] > 0); + + _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + //determine byte sizes for buffers and dealing with history _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; const size_t post_bytes = input_item_sizes[i]*max_history_items; @@ -181,22 +190,25 @@ void InputBufferQueues::init( //allocate mini buffers for history edge conditions const size_t num_bytes = _history_bytes[i] + _reserve_bytes[i]; - _aux_queues[i].allocate_one(num_bytes); - _aux_queues[i].allocate_one(num_bytes); + _aux_queues[i]->allocate_one(num_bytes); + _aux_queues[i]->allocate_one(num_bytes); //there is history, so enqueue some initial history if (_history_bytes[i] != 0) { - tsbe::Buffer buff = _aux_queues[i].front(); - _aux_queues[i].pop(); + tsbe::Buffer buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); - std::memset(buff.get_memory(), 0, _history_bytes[i]); + const size_t hist_bytes = _history_bytes[i]; + std::memset(buff.get_memory(), 0, hist_bytes); _queues[i].push_front(buff); - _queues[i].front().length = _history_bytes[i]; + _queues[i].front().offset = hist_bytes; + _queues[i].front().length = 0; } } } + inline BuffInfo InputBufferQueues::front(const size_t i) { ASSERT(this->ready(i)); @@ -213,15 +225,49 @@ inline BuffInfo InputBufferQueues::front(const size_t i) inline void InputBufferQueues::__prepare(const size_t i) { - //this conditional statement is the requirement we must meet - while ( - _queues[i].front().length < _reserve_bytes[i] or - _queues[i].front().offset < _history_bytes[i] - ){ - + //assumes that we are always pushing proper history buffs on front + ASSERT(_queues[i].front().offset >= _history_bytes[i]); + + while (_queues[i].front().length < _reserve_bytes[i]) + { + BufferWOffset &front = _queues[i].front(); + BufferWOffset dst; + + //do we need a new buffer: + //- is the buffer unique (queue has only reference)? + //- can its remaining space meet reserve requirements? + const bool enough_space = front.buffer.get_length() >= _reserve_bytes[i] + front.offset; + if (enough_space and front.buffer.unique()) + { + dst = _queues[i].front(); + _queues[i].pop_front(); + } + else + { + dst = BufferWOffset(_aux_queues[i]->front()); + dst.length = 0; + _aux_queues[i]->pop(); + } + + BufferWOffset src = _queues[i].front(); + _queues[i].pop_front(); + const size_t bytes = std::min(dst.tail_free(), src.length); + std::memcpy(dst.mem_offset()+dst.length, src.mem_offset(), bytes); + + //update buffer additions, consumptions + dst.length += bytes; + src.offset += bytes; + src.length -= bytes; + + //keep the source buffer if not fully consumed + if (src.length) _queues[i].push_front(src); + + //destination buffer is the new front of the queue + _queues[i].push_front(dst); } } + inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { //update bounds on the current buffer @@ -240,13 +286,14 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum //push history into the front of the queue if (_history_bytes[i] != 0) { - tsbe::Buffer buff = _aux_queues[i].front(); - _aux_queues[i].pop(); + tsbe::Buffer buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); const size_t hist_bytes = _history_bytes[i]; std::memcpy(buff.get_memory(), old_buff.mem_offset() - hist_bytes, hist_bytes); _queues[i].push_front(buff); - _queues[i].front().length = hist_bytes; + _queues[i].front().offset = hist_bytes; + _queues[i].front().length = 0; } } |