//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see .
#ifndef INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP
#define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP
#include
#include
#include
#include
#include
#include
#include
#include //memcpy/memset
#include
namespace gnuradio
{
struct InputBufferQueues
{
enum {MAX_QUEUE_SIZE = 128};
~InputBufferQueues(void)
{
this->resize(0);
}
void init(
const std::vector &input_history_items,
const std::vector &input_reserve_items,
const std::vector &input_item_sizes
);
//! Call to get an input buffer for work
SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE);
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
void resize(const size_t size);
GRAS_FORCE_INLINE const BitSet &ready_bitset(void) const
{
return _bitset;
}
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer)
{
ASSERT(not _queues[i].full());
_queues[i].push_back(buffer);
_enqueued_bytes[i] += _queues[i].back().length;
__update(i);
}
GRAS_FORCE_INLINE void flush(const size_t i)
{
_queues[i].clear();
_bitset.reset(i);
}
size_t size(void) const
{
return _queues.size();
}
GRAS_FORCE_INLINE void flush_all(void)
{
const size_t old_size = this->size();
this->resize(0);
this->resize(old_size);
}
GRAS_FORCE_INLINE bool ready(const size_t i) const
{
return _bitset[i];
}
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
return not _bitset[i];
}
GRAS_FORCE_INLINE bool all_ready(void) const
{
return _bitset.all();
}
void __prepare(const size_t i);
GRAS_FORCE_INLINE void __update(const size_t i)
{
_bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]);
}
BitSet _bitset;
std::vector _enqueued_bytes;
std::vector > _queues;
std::vector _history_bytes;
std::vector _reserve_bytes;
std::vector _post_bytes;
std::vector > _aux_queues;
std::vector _in_aux_buff;
};
GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
{
_bitset.resize(size);
_enqueued_bytes.resize(size, 0);
_queues.resize(size, boost::circular_buffer(MAX_QUEUE_SIZE));
_history_bytes.resize(size, 0);
_reserve_bytes.resize(size, 0);
_post_bytes.resize(size, 0);
_aux_queues.resize(size);
_in_aux_buff.resize(size, false);
}
GRAS_FORCE_INLINE void InputBufferQueues::init(
const std::vector &input_history_items,
const std::vector &input_reserve_items,
const std::vector &input_item_sizes
){
if (this->size() == 0) return;
const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
for (size_t i = 0; i < this->size(); i++)
{
ASSERT(input_reserve_items[i] > 0);
_aux_queues[i] = boost::shared_ptr(new BufferQueue());
//determine byte sizes for buffers and dealing with history
const size_t old_history = _history_bytes[i];
_history_bytes[i] = input_item_sizes[i]*input_history_items[i];
//calculate the input reserve aka reserve size
_reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i];
_reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]);
//calculate the input reserve aka reserve size
_reserve_bytes[i] = std::max(
_history_bytes[i] + _reserve_bytes[i],
_reserve_bytes[i]
);
//post bytes are the desired buffer size to escape the edge case
_post_bytes[i] = std::max(
input_item_sizes[i]*max_history_items + _reserve_bytes[i],
_reserve_bytes[i]
);
//allocate mini buffers for history edge conditions
size_t num_bytes = _post_bytes[i];
_aux_queues[i]->allocate_one(num_bytes);
_aux_queues[i]->allocate_one(num_bytes);
//there is history, so enqueue some initial history
if (_history_bytes[i] > old_history)
{
SBuffer buff = _aux_queues[i]->front();
_aux_queues[i]->pop();
const size_t delta = _history_bytes[i] - old_history;
std::memset(buff.get_actual_memory(), 0, delta);
buff.offset = 0;
buff.length = delta;
this->push(i, buff);
_in_aux_buff[i] = true;
}
if (_history_bytes[i] < old_history)
{
size_t delta = old_history - _history_bytes[i];
delta = std::min(delta, _enqueued_bytes[i]); //FIXME
//TODO consume extra delta on push...? so we dont need std::min
this->consume(i, delta);
}
}
}
GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline)
{
//if (_queues[i].empty()) return BuffInfo();
ASSERT(not _queues[i].empty());
ASSERT(this->ready(i));
__prepare(i);
ASSERT(_queues[i].front().length >= _history_bytes[i]);
SBuffer &front = _queues[i].front();
const bool unique = front.unique();
//same buffer, different offset and length
SBuffer buff = front;
if (conserve_history) buff.length -= _history_bytes[i];
//set the flag that this buffer *might* be inlined as an output buffer
potential_inline = unique and (buff.length == front.length);
return buff;
}
GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
{
//HERE();
//assumes that we are always pushing proper history buffs on front
//ASSERT(_queues[i].front().length >= _history_bytes[i]);
while (_queues[i].front().length < _reserve_bytes[i])
{
SBuffer &front = _queues[i].front();
SBuffer dst;
//do we need a new buffer:
//- is the buffer unique (queue has only reference)?
//- can its remaining space meet reserve requirements?
const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset;
if (enough_space and front.unique())
{
dst = _queues[i].front();
_queues[i].pop_front();
}
else
{
dst = _aux_queues[i]->front();
_aux_queues[i]->pop();
dst.offset = 0;
dst.length = 0;
_in_aux_buff[i] = true;
}
SBuffer src = _queues[i].front();
_queues[i].pop_front();
const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length);
const size_t bytes = std::min(dst_tail, src.length);
//const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]);
std::memcpy(dst.get(dst.length), src.get(), bytes);
//update buffer additions, consumptions
dst.length += bytes;
src.offset += bytes;
src.length -= bytes;
//keep the source buffer if not fully consumed
if (src.length) _queues[i].push_front(src);
//destination buffer is the new front of the queue
_queues[i].push_front(dst);
}
}
GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
//if (bytes_consumed == 0) return true;
//assert that we dont consume past the bounds of the buffer
ASSERT(_queues[i].front().length >= bytes_consumed);
//update bounds on the current buffer
_queues[i].front().offset += bytes_consumed;
_queues[i].front().length -= bytes_consumed;
//safe to pop here when the buffer is consumed and no history
if (_queues[i].front().length == 0 and _history_bytes[i] == 0)
{
_queues[i].pop_front();
}
else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i])
{
const SBuffer buff = _queues[i].front();
_queues[i].pop_front();
if (_queues[i].empty())
{
_queues[i].push_front(buff);
}
else
{
_in_aux_buff[i] = false;
const size_t residual = buff.length;
_queues[i].front().length += residual;
_queues[i].front().offset -= residual;
}
}
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
//we have consumed the history, change reqs
if (_enqueued_bytes[i] < _history_bytes[i])
{
_history_bytes[i] = 0;
_reserve_bytes[i] = 1; //cant be 0
}
__update(i);
}
} //namespace gnuradio
#endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/