//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

#ifndef INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP
#define INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP

#include <gras_impl/debug.hpp>
#include <gras_impl/bitset.hpp>
#include <gras_impl/buffer_queue.hpp>
#include <gnuradio/sbuffer.hpp>
#include <vector>
#include <queue>
#include <deque>
#include <cstring> //memcpy/memset
#include <boost/circular_buffer.hpp>

namespace gnuradio
{

/*!
 * A set of input buffer queues, one per input port index.
 * For every index the struct tracks the queued buffers, the number of
 * bytes they hold, a ready bit, a history size in bytes, and a queue
 * of auxiliary buffers used for history and for accumulation.
 */
struct InputBufferQueues
{
    //! Capacity given to each per-port circular buffer when resizing
    enum {MAX_QUEUE_SIZE = 128};

    //! Byte size requested for each auxiliary buffer, and the cut-off used by is_accumulated()
    enum {MAX_AUX_BUFF_BYTES=(1<<16)};

    //! Release every per-port container by resizing down to zero ports
    ~InputBufferQueues(void)
    {
        this->resize(0);
    }

    //! Change the history size in bytes for port i (defined below)
    void update_history_bytes(const size_t i, const size_t hist_bytes);

    //! Call to get an input buffer for work
    GRAS_FORCE_INLINE SBuffer &front(const size_t i)
    {
        ASSERT(not _queues[i].empty());
        ASSERT(this->ready(i));

        return _queues[i].front();
    }

    //! Call when input bytes consumed by work
    void consume(const size_t i, const size_t bytes_consumed);

    //! Set the number of ports handled by this object (defined below)
    void resize(const size_t size);

    //! Merge the leading buffers of port i into one auxiliary buffer (defined below)
    void accumulate(const size_t i, const size_t item_size);

    /*!
     * Can we consider this queue's buffers to be accumulated?
     * Either the first buffer holds all of the enqueued bytes
     * or the first buffer is larger than we can accumulate.
     */
    GRAS_FORCE_INLINE bool is_accumulated(const size_t i) const
    {
        ASSERT(not _queues[i].empty());
        return
            (_queues[i].front().length == _enqueued_bytes[i]) or
            (_queues[i].front().length >= MAX_AUX_BUFF_BYTES);
    }

    //! Append a buffer to the back of queue i and add its length to the byte count
    GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer)
    {
        ASSERT(not _queues[i].full());
        _queues[i].push_back(buffer);
        _enqueued_bytes[i] += _queues[i].back().length;
        __update(i);
    }

    /*!
     * Drop every buffer queued on port i and mark the port as not ready.
     * The byte count is zeroed as well so that it keeps describing the
     * (now empty) queue; otherwise the next push() would add to a stale total.
     * The history size recorded for the port is left untouched.
     */
    GRAS_FORCE_INLINE void flush(const size_t i)
    {
        _queues[i].clear();
        _enqueued_bytes[i] = 0;
        _bitset.reset(i);
    }

    //! The number of ports, taken from the number of queues
    size_t size(void) const
    {
        return _queues.size();
    }

    //! Discard the state of all ports by resizing to zero and back to the old size
    GRAS_FORCE_INLINE void flush_all(void)
    {
        const size_t old_size = this->size();
        this->resize(0);
        this->resize(old_size);
    }

    //! True when the ready bit for port i is set
    GRAS_FORCE_INLINE bool ready(const size_t i) const
    {
        return _bitset[i];
    }

    //! True when the ready bit for port i is clear
    GRAS_FORCE_INLINE bool empty(const size_t i) const
    {
        return not _bitset[i];
    }

    //! True when the ready bit of every port is set
    GRAS_FORCE_INLINE bool all_ready(void) const
    {
        return _bitset.all();
    }

    //! Recompute the ready bit for port i: set when the byte count is non-zero
    GRAS_FORCE_INLINE void __update(const size_t i)
    {
        _bitset.set(i, _enqueued_bytes[i] != 0);
    }

    BitSet _bitset; //!< one ready bit per port
    std::vector<size_t> _enqueued_bytes; //!< bytes counted per port
    std::vector<boost::circular_buffer<SBuffer> > _queues; //!< queued buffers per port
    std::vector<size_t> _history_bytes; //!< history size in bytes per port
    std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; //!< auxiliary buffers per port
};


/*!
 * Set the number of ports handled by this object.
 * All per-port containers are resized together, and every port that
 * does not yet have an auxiliary buffer queue gets a new one that is
 * asked to allocate four buffers of MAX_AUX_BUFF_BYTES each.
 * \param size the new number of ports
 */
GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
{
    //bring every per-port container to the requested length
    _bitset.resize(size);
    _enqueued_bytes.resize(size, 0);
    _queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE));
    _history_bytes.resize(size, 0);
    _aux_queues.resize(size);

    //number of auxiliary buffers requested for each new port
    const size_t num_aux_buffs = 4;

    for (size_t port = 0; port < this->size(); port++)
    {
        boost::shared_ptr<BufferQueue> &aux = _aux_queues[port];

        //only ports without an auxiliary queue need one created
        if (not aux)
        {
            aux = boost::shared_ptr<BufferQueue>(new BufferQueue());
            for (size_t n = 0; n < num_aux_buffs; n++)
            {
                aux->allocate_one(MAX_AUX_BUFF_BYTES);
            }
        }
    }

}

inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes)
{
    //there is history, so enqueue some initial history
    if (hist_bytes > _history_bytes[i])
    {
        SBuffer buff = _aux_queues[i]->front();
        _aux_queues[i]->pop();

        const size_t delta = hist_bytes - _history_bytes[i];
        std::memset(buff.get_actual_memory(), 0, delta);
        buff.offset = 0;
        buff.length = delta;

        this->push(i, buff);
    }
    if (hist_bytes < _history_bytes[i])
    {
        size_t delta = _history_bytes[i] - hist_bytes;
        delta = std::min(delta, _enqueued_bytes[i]); //FIXME
        //TODO consume extra delta on push...? so we dont need std::min
        this->consume(i, delta);
    }

    _history_bytes[i] = hist_bytes;
}

/*!
 * Merge the leading buffers of port i into a single auxiliary buffer.
 *
 * An auxiliary buffer is taken from the port's auxiliary queue and filled
 * by copying bytes from the front of the queue until either the queue
 * runs dry or the usable space of the auxiliary buffer is exhausted.
 * The filled buffer is then placed at the front of the queue.
 *
 * \param i the port index
 * \param item_size the usable space is rounded down to a multiple of this;
 * a value of 0 or 1 means that no rounding is applied
 */
GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
{
    ASSERT(not _aux_queues[i]->empty());
    SBuffer accum_buff = _aux_queues[i]->front();
    _aux_queues[i]->pop();
    accum_buff.offset = 0;
    accum_buff.length = 0;

    //usable space: a whole number of items (skipped for 0 to avoid dividing by zero)
    size_t free_bytes = accum_buff.get_actual_length();
    if (item_size > 1) free_bytes -= free_bytes % item_size;

    while (not _queues[i].empty() and free_bytes != 0)
    {
        SBuffer &front = _queues[i].front();
        const size_t bytes = std::min(front.length, free_bytes);
        std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes);
        accum_buff.length += bytes;
        free_bytes -= bytes;

        //the copied bytes no longer belong to the source buffer
        front.length -= bytes;
        front.offset += bytes;
        if (front.length == 0) _queues[i].pop_front();
    }

    //pushing onto a full circular buffer would silently drop its last element
    ASSERT(not _queues[i].full());
    _queues[i].push_front(accum_buff);

    ASSERT(this->is_accumulated(i));
}

/*!
 * Mark bytes at the start of the front buffer of port i as consumed.
 * The front buffer's offset moves forward and its length shrinks,
 * the port's byte count drops by the same amount,
 * and the ready bit is refreshed.
 * \param i the port index
 * \param bytes_consumed how many bytes to remove from the front buffer
 */
GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
    //the front buffer must hold at least as much as is being consumed
    ASSERT(_queues[i].front().length >= bytes_consumed);

    //advance the window of the front buffer past the consumed bytes
    SBuffer &head = _queues[i].front();
    head.offset += bytes_consumed;
    head.length -= bytes_consumed;

    ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length());

    //keep the per-port byte count in step with the buffer
    ASSERT(_enqueued_bytes[i] >= bytes_consumed);
    _enqueued_bytes[i] -= bytes_consumed;

    //refresh the ready bit from the new byte count
    __update(i);
}

} //namespace gnuradio

#endif /*INCLUDED_LIBGRAS_IMPL_INPUT_BUFFERS_HPP*/