summaryrefslogtreecommitdiff
path: root/lib/gras_impl/input_buffer_queues.hpp
diff options
context:
space:
mode:
authorJosh Blum2012-10-14 20:00:49 -0700
committerJosh Blum2012-10-14 20:00:49 -0700
commitabc6715098cc5ca4d83d1227b2c9ca98e33b4a86 (patch)
tree304234322d66954c841908b0e51fba75143b9b2f /lib/gras_impl/input_buffer_queues.hpp
parent7962eb546821ddd98f57fa4fb60a8192bf4e34df (diff)
downloadsandhi-abc6715098cc5ca4d83d1227b2c9ca98e33b4a86.tar.gz
sandhi-abc6715098cc5ca4d83d1227b2c9ca98e33b4a86.tar.bz2
sandhi-abc6715098cc5ca4d83d1227b2c9ca98e33b4a86.zip
input_buffer_queues logic cleanup for accumulate implementation
Diffstat (limited to 'lib/gras_impl/input_buffer_queues.hpp')
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp146
1 files changed, 29 insertions, 117 deletions
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 2789f92..1b663c3 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -43,13 +43,21 @@ struct InputBufferQueues
void update_history_bytes(const size_t i, const size_t hist_bytes);
//! Call to get an input buffer for work
- SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE);
+ GRAS_FORCE_INLINE SBuffer &front(const size_t i)
+ {
+ ASSERT(not _queues[i].empty());
+ ASSERT(this->ready(i));
+
+ return _queues[i].front();
+ }
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
void resize(const size_t size);
+ void accumulate(const size_t i, const size_t item_size);
+
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer)
{
ASSERT(not _queues[i].full());
@@ -91,8 +99,6 @@ struct InputBufferQueues
return _bitset.all();
}
- void __prepare(const size_t i);
-
GRAS_FORCE_INLINE void __update(const size_t i)
{
_bitset.set(i, _enqueued_bytes[i] != 0);
@@ -120,6 +126,8 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
_aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
_aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
_aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
}
}
@@ -150,105 +158,35 @@ inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t
_history_bytes[i] = hist_bytes;
}
-GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline)
+GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
{
- //if (_queues[i].empty()) return BuffInfo();
-
- ASSERT(not _queues[i].empty());
- ASSERT(this->ready(i));
- __prepare(i);
- SBuffer &front = _queues[i].front();
- const bool unique = front.unique();
-
- //same buffer, different offset and length
- SBuffer buff = front;
+ ASSERT(not _aux_queues[i]->empty());
+ SBuffer accum_buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
+ accum_buff.offset = 0;
+ accum_buff.length = 0;
- //set the flag that this buffer *might* be inlined as an output buffer
- potential_inline = unique and (buff.length == front.length);
+ size_t free_bytes = accum_buff.get_actual_length();
+ free_bytes /= item_size; free_bytes *= item_size;
- return buff;
-}
-
-GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
-{
- SBufferConfig config;
- config.memory = NULL;
- config.length = _enqueued_bytes[i];
- SBuffer newbuff(config);
- newbuff.offset = 0;
- newbuff.length = 0;
-
- while (not _queues[i].empty())
+ while (not _queues[i].empty() and free_bytes != 0)
{
- std::memcpy(newbuff.get(newbuff.length), _queues[i].front().get(), _queues[i].front().length);
- newbuff.length += _queues[i].front().length;
- _queues[i].pop_front();
+ SBuffer &front = _queues[i].front();
+ const size_t bytes = std::min(front.length, free_bytes);
+ std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes);
+ accum_buff.length += bytes;
+ free_bytes -= bytes;
+ front.length -= bytes;
+ front.offset += bytes;
+ if (front.length == 0) _queues[i].pop_front();
}
- _queues[i].push_back(newbuff);
+ _queues[i].push_front(accum_buff);
return;
-
-
-
-
- /*
-
-
-
-
- //HERE();
- //assumes that we are always pushing proper history buffs on front
- //ASSERT(_queues[i].front().length >= _history_bytes[i]);
-
- while (_queues[i].front().length < _reserve_bytes[i])
- {
- SBuffer &front = _queues[i].front();
- SBuffer dst;
-
- //do we need a new buffer:
- //- is the buffer unique (queue has only reference)?
- //- can its remaining space meet reserve requirements?
- const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset;
- if (enough_space and front.unique())
- {
- dst = _queues[i].front();
- _queues[i].pop_front();
- }
- else
- {
- dst = _aux_queues[i]->front();
- _aux_queues[i]->pop();
- dst.offset = 0;
- dst.length = 0;
- _in_aux_buff[i] = true;
- }
-
- SBuffer src = _queues[i].front();
- _queues[i].pop_front();
- const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length);
- const size_t bytes = std::min(dst_tail, src.length);
- //const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]);
- std::memcpy(dst.get(dst.length), src.get(), bytes);
-
- //update buffer additions, consumptions
- dst.length += bytes;
- src.offset += bytes;
- src.length -= bytes;
-
- //keep the source buffer if not fully consumed
- if (src.length) _queues[i].push_front(src);
-
- //destination buffer is the new front of the queue
- _queues[i].push_front(dst);
- }
- */
}
-
GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
- //if (bytes_consumed == 0) return true;
-
//assert that we dont consume past the bounds of the buffer
ASSERT(_queues[i].front().length >= bytes_consumed);
@@ -258,37 +196,11 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length());
- //safe to pop here when the buffer is consumed and no history
- //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/)
- {
- // _queues[i].pop_front();
- }
-/*
- else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i])
- {
- const SBuffer buff = _queues[i].front();
- _queues[i].pop_front();
-
- if (_queues[i].empty())
- {
- _queues[i].push_front(buff);
- }
- else
- {
- _in_aux_buff[i] = false;
- const size_t residual = buff.length;
- _queues[i].front().length += residual;
- _queues[i].front().offset -= residual;
- }
- }
- */
-
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
__update(i);
-
}
} //namespace gnuradio