summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/block_handlers.cpp21
-rw-r--r--lib/gras_impl/buffer_queue.hpp5
-rw-r--r--lib/gras_impl/debug.hpp9
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp83
4 files changed, 80 insertions, 38 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 8980791..b62df4e 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -139,25 +139,20 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->input_tags.resize(num_inputs);
this->output_tags.resize(num_outputs);
- //init the history comprehension on input queues
- this->input_queues.init(this->input_history_items, this->input_items_sizes);
-
//impose input reserve requirements based on relative rate and output multiple
+ std::vector<size_t> input_multiple_items(num_inputs, 1);
for (size_t i = 0; i < num_inputs; i++)
{
- if (num_outputs == 0 or not this->enable_fixed_rate)
- {
- this->input_queues.set_reserve(i, 0);
- continue;
- }
+ if (num_outputs == 0 or not this->enable_fixed_rate) continue;
//TODO, this is a little cheap, we only look at output multiple [0]
- size_t multiple = this->output_multiple_items.front();
- //if (multiple == 1) multiple = 0; //1 is meaningless, so we use 0 to disable the reserve
- const size_t reserve_items = myulround(multiple/this->relative_rate);
- const size_t reserve_bytes = reserve_items * this->input_items_sizes[i];
- this->input_queues.set_reserve(i, reserve_bytes);
+ const size_t multiple = this->output_multiple_items.front();
+ input_multiple_items[i] = myulround(multiple/this->relative_rate);
+ if (input_multiple_items[i] == 0) input_multiple_items[i] = 1;
}
+ //init the history comprehension on input queues
+ this->input_queues.init(this->input_history_items, input_multiple_items, this->input_items_sizes);
+
//TODO: think more about this:
if (num_inputs == 0 and num_outputs == 0)
{
diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/buffer_queue.hpp
index e9ac220..5fd1661 100644
--- a/lib/gras_impl/buffer_queue.hpp
+++ b/lib/gras_impl/buffer_queue.hpp
@@ -35,11 +35,6 @@ struct BufferQueue : std::queue<tsbe::Buffer>
~BufferQueue(void)
{
_token.reset();
- this->flush();
- }
-
- void flush(void)
- {
while (not this->empty())
{
this->pop();
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index bdf1388..9f03efa 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -18,13 +18,18 @@
#define INCLUDED_LIBGRAS_IMPL_DEBUG_HPP
#include <iostream>
+#include <stdexcept>
#define GENESIS 0
#define ARMAGEDDON 0
-#define MESSAGE 1
+#define MESSAGE 0
#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
-#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;}
+#define ASSERT(x) {if(not (x)) \
+{ \
+ std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush; \
+ throw std::runtime_error(std::string("ASSERT FAIL ") + #x); \
+}}
#endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 4814fdd..e724ccc 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -40,6 +40,11 @@ struct BufferWOffset
return ((char *)buffer.get_memory()) + offset;
}
+ inline size_t tail_free(void) const
+ {
+ return buffer.get_length() - offset - length;
+ }
+
size_t offset;
size_t length;
tsbe::Buffer buffer;
@@ -143,11 +148,11 @@ struct InputBufferQueues
std::vector<std::deque<BufferWOffset> > _queues;
std::vector<size_t> _history_bytes;
std::vector<size_t> _reserve_bytes;
- std::vector<BufferQueue> _aux_queues;
+ std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
};
-void InputBufferQueues::resize(const size_t size)
+inline void InputBufferQueues::resize(const size_t size)
{
_bitset.resize(size);
_enqueued_bytes.resize(size, 0);
@@ -158,7 +163,7 @@ void InputBufferQueues::resize(const size_t size)
}
-void InputBufferQueues::init(
+inline void InputBufferQueues::init(
const std::vector<size_t> &input_history_items,
const std::vector<size_t> &input_multiple_items,
const std::vector<size_t> &input_item_sizes
@@ -169,6 +174,10 @@ void InputBufferQueues::init(
for (size_t i = 0; i < this->size(); i++)
{
+ ASSERT(input_multiple_items[i] > 0);
+
+ _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+
//determine byte sizes for buffers and dealing with history
_history_bytes[i] = input_item_sizes[i]*input_history_items[i];
const size_t post_bytes = input_item_sizes[i]*max_history_items;
@@ -181,22 +190,25 @@ void InputBufferQueues::init(
//allocate mini buffers for history edge conditions
const size_t num_bytes = _history_bytes[i] + _reserve_bytes[i];
- _aux_queues[i].allocate_one(num_bytes);
- _aux_queues[i].allocate_one(num_bytes);
+ _aux_queues[i]->allocate_one(num_bytes);
+ _aux_queues[i]->allocate_one(num_bytes);
//there is history, so enqueue some initial history
if (_history_bytes[i] != 0)
{
- tsbe::Buffer buff = _aux_queues[i].front();
- _aux_queues[i].pop();
+ tsbe::Buffer buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
- std::memset(buff.get_memory(), 0, _history_bytes[i]);
+ const size_t hist_bytes = _history_bytes[i];
+ std::memset(buff.get_memory(), 0, hist_bytes);
_queues[i].push_front(buff);
- _queues[i].front().length = _history_bytes[i];
+ _queues[i].front().offset = hist_bytes;
+ _queues[i].front().length = 0;
}
}
}
+
inline BuffInfo InputBufferQueues::front(const size_t i)
{
ASSERT(this->ready(i));
@@ -213,15 +225,49 @@ inline BuffInfo InputBufferQueues::front(const size_t i)
inline void InputBufferQueues::__prepare(const size_t i)
{
- //this conditional statement is the requirement we must meet
- while (
- _queues[i].front().length < _reserve_bytes[i] or
- _queues[i].front().offset < _history_bytes[i]
- ){
-
+ //assumes that we are always pushing proper history buffs on front
+ ASSERT(_queues[i].front().offset >= _history_bytes[i]);
+
+ while (_queues[i].front().length < _reserve_bytes[i])
+ {
+ BufferWOffset &front = _queues[i].front();
+ BufferWOffset dst;
+
+ //do we need a new buffer:
+ //- is the buffer unique (queue has only reference)?
+ //- can its remaining space meet reserve requirements?
+ const bool enough_space = front.buffer.get_length() >= _reserve_bytes[i] + front.offset;
+ if (enough_space and front.buffer.unique())
+ {
+ dst = _queues[i].front();
+ _queues[i].pop_front();
+ }
+ else
+ {
+ dst = BufferWOffset(_aux_queues[i]->front());
+ dst.length = 0;
+ _aux_queues[i]->pop();
+ }
+
+ BufferWOffset src = _queues[i].front();
+ _queues[i].pop_front();
+ const size_t bytes = std::min(dst.tail_free(), src.length);
+ std::memcpy(dst.mem_offset()+dst.length, src.mem_offset(), bytes);
+
+ //update buffer additions, consumptions
+ dst.length += bytes;
+ src.offset += bytes;
+ src.length -= bytes;
+
+ //keep the source buffer if not fully consumed
+ if (src.length) _queues[i].push_front(src);
+
+ //destination buffer is the new front of the queue
+ _queues[i].push_front(dst);
}
}
+
inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
//update bounds on the current buffer
@@ -240,13 +286,14 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum
//push history into the front of the queue
if (_history_bytes[i] != 0)
{
- tsbe::Buffer buff = _aux_queues[i].front();
- _aux_queues[i].pop();
+ tsbe::Buffer buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
const size_t hist_bytes = _history_bytes[i];
std::memcpy(buff.get_memory(), old_buff.mem_offset() - hist_bytes, hist_bytes);
_queues[i].push_front(buff);
- _queues[i].front().length = hist_bytes;
+ _queues[i].front().offset = hist_bytes;
+ _queues[i].front().length = 0;
}
}