//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifndef INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP
#define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP
#include
#include
#include
#include
#include
#include
#include
#include //memcpy/memset
#include
namespace gras
{
struct InputBufferQueues
{
enum {MAX_QUEUE_SIZE = 128};
enum {MAX_AUX_BUFF_BYTES=(1<<16)};
~InputBufferQueues(void)
{
this->resize(0);
}
void update_history_bytes(const size_t i, const size_t hist_bytes);
//! Call to get an input buffer for work
GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
ASSERT(not _queues[i].empty());
ASSERT(this->ready(i));
return _queues[i].front();
}
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
void resize(const size_t size);
void accumulate(const size_t i, const size_t item_size);
/*!
* Can we consider this queue's buffers to be accumulated?
* Either the first buffer holds all of the enqueued bytes
* or the first buffer is larger than we can accumulate.
*/
GRAS_FORCE_INLINE bool is_accumulated(const size_t i) const
{
ASSERT(not _queues[i].empty());
return
(_queues[i].front().length == _enqueued_bytes[i]) or
(_queues[i].front().length >= MAX_AUX_BUFF_BYTES);
}
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer)
{
ASSERT(not _queues[i].full());
_queues[i].push_back(buffer);
_enqueued_bytes[i] += _queues[i].back().length;
__update(i);
}
GRAS_FORCE_INLINE void flush(const size_t i)
{
_queues[i].clear();
_bitset.reset(i);
}
size_t size(void) const
{
return _queues.size();
}
GRAS_FORCE_INLINE void flush_all(void)
{
const size_t old_size = this->size();
this->resize(0);
this->resize(old_size);
}
GRAS_FORCE_INLINE bool ready(const size_t i) const
{
return _bitset[i];
}
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
return not _bitset[i];
}
GRAS_FORCE_INLINE bool all_ready(void) const
{
return _bitset.all();
}
GRAS_FORCE_INLINE void __update(const size_t i)
{
_bitset.set(i, _enqueued_bytes[i] != 0);
}
BitSet _bitset;
std::vector _enqueued_bytes;
std::vector > _queues;
std::vector _history_bytes;
std::vector > _aux_queues;
};
/*!
 * Set the number of queues.
 * Every per-index container is resized to the new size.
 * Any index that has no auxiliary buffer pool yet gets a new pool
 * with a fixed number of buffers of MAX_AUX_BUFF_BYTES each.
 * \param size the new number of queues
 */
GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
{
    //how many auxiliary buffers each new pool is stocked with
    const size_t num_aux_buffs = 4;

    _bitset.resize(size);
    _enqueued_bytes.resize(size, 0);
    _queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE));
    _history_bytes.resize(size, 0);
    _aux_queues.resize(size);

    for (size_t i = 0; i < this->size(); i++)
    {
        //existing pools are kept as they are
        if (_aux_queues[i]) continue;

        _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
        for (size_t j = 0; j < num_aux_buffs; j++)
        {
            _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
        }
    }
}
inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes)
{
//there is history, so enqueue some initial history
if (hist_bytes > _history_bytes[i])
{
SBuffer buff = _aux_queues[i]->front();
_aux_queues[i]->pop();
const size_t delta = hist_bytes - _history_bytes[i];
std::memset(buff.get_actual_memory(), 0, delta);
buff.offset = 0;
buff.length = delta;
this->push(i, buff);
}
if (hist_bytes < _history_bytes[i])
{
size_t delta = _history_bytes[i] - hist_bytes;
delta = std::min(delta, _enqueued_bytes[i]); //FIXME
//TODO consume extra delta on push...? so we dont need std::min
this->consume(i, delta);
}
_history_bytes[i] = hist_bytes;
}
/*!
 * Gather the leading buffers of queue i into one auxiliary buffer.
 *
 * Bytes are copied from the front of the queue into an auxiliary buffer
 * until the queue runs out or the auxiliary buffer cannot take any more.
 * The usable room in the auxiliary buffer is rounded down to a multiple
 * of the item size. Source buffers that were copied completely are popped,
 * and the auxiliary buffer becomes the new front of the queue.
 *
 * \param i the index of the queue
 * \param item_size the size of one item in bytes
 */
GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
{
    ASSERT(not _aux_queues[i]->empty());

    //take an auxiliary buffer out of the pool and start it out empty
    SBuffer accum_buff = _aux_queues[i]->front();
    _aux_queues[i]->pop();
    accum_buff.length = 0;
    accum_buff.offset = 0;

    //only a whole number of items may be stored
    size_t room = accum_buff.get_actual_length();
    room -= room % item_size;

    while (room != 0 and not _queues[i].empty())
    {
        SBuffer &src = _queues[i].front();
        const size_t chunk = std::min(src.length, room);

        //append to the end of what was accumulated so far
        std::memcpy(accum_buff.get(accum_buff.length), src.get(), chunk);
        accum_buff.length += chunk;
        room -= chunk;

        //the copied bytes no longer belong to the source buffer
        src.offset += chunk;
        src.length -= chunk;
        if (src.length == 0)
        {
            _queues[i].pop_front();
        }
    }

    _queues[i].push_front(accum_buff);
    ASSERT(this->is_accumulated(i));
}
/*!
 * Mark bytes at the start of the front buffer of queue i as consumed.
 * The front buffer's offset moves forward and its length shrinks,
 * the enqueued byte count drops by the same amount,
 * and the ready bit is refreshed from the new count.
 *
 * \param i the index of the queue
 * \param bytes_consumed the number of bytes to remove from the front buffer
 */
GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
    SBuffer &head = _queues[i].front();

    //never consume more than the front buffer holds
    ASSERT(head.length >= bytes_consumed);

    //slide the window of the front buffer forward
    head.offset += bytes_consumed;
    head.length -= bytes_consumed;
    ASSERT(head.offset <= head.get_actual_length());

    //keep the byte count for this queue in step
    ASSERT(_enqueued_bytes[i] >= bytes_consumed);
    _enqueued_bytes[i] -= bytes_consumed;
    this->__update(i);
}
} //namespace gras
#endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/