diff options
-rw-r--r-- | lib/block_task.cpp | 13 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 146 |
2 files changed, 35 insertions, 124 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 935f148..69e99a3 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -104,8 +104,8 @@ void BlockActor::handle_task(void) this->sort_tags(i); ASSERT(this->input_queues.ready(i)); - bool potential_inline; - const SBuffer buff = this->input_queues.front(i, potential_inline); + this->input_queues.accumulate(i, this->input_items_sizes[i]); + const SBuffer &buff = this->input_queues.front(i); void *mem = buff.get(); size_t items = buff.length/this->input_items_sizes[i]; @@ -132,7 +132,7 @@ void BlockActor::handle_task(void) //inline dealings, how and when input buffers can be inlined into output buffers //continue; if ( - potential_inline and + buff.unique() and input_configs[i].inline_buffer and output_inline_index < num_outputs and buff.get_affinity() == this->buffer_affinity @@ -188,7 +188,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ - VAR(work_noutput_items); + //VAR(work_noutput_items); if (this->forecast_enable) { forecast_again_you_jerk: @@ -217,8 +217,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - VAR(work_noutput_items); - if (num_inputs) VAR(work_ninput_items[0]); + //VAR(work_noutput_items); this->work_ret = -1; if (this->interruptible_thread) { @@ -229,7 +228,7 @@ void BlockActor::handle_task(void) this->task_work(); } const size_t noutput_items = size_t(work_ret); - VAR(work_ret); + //VAR(work_ret); if (work_ret == Block::WORK_DONE) { diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 2789f92..1b663c3 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -43,13 +43,21 @@ struct InputBufferQueues void update_history_bytes(const size_t i, const size_t hist_bytes); //! Call to get an input buffer for work - SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE); + GRAS_FORCE_INLINE SBuffer &front(const size_t i) + { + ASSERT(not _queues[i].empty()); + ASSERT(this->ready(i)); + + return _queues[i].front(); + } //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); void resize(const size_t size); + void accumulate(const size_t i, const size_t item_size); + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer) { ASSERT(not _queues[i].full()); @@ -91,8 +99,6 @@ struct InputBufferQueues return _bitset.all(); } - void __prepare(const size_t i); - GRAS_FORCE_INLINE void __update(const size_t i) { _bitset.set(i, _enqueued_bytes[i] != 0); @@ -120,6 +126,8 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); } } @@ -150,105 +158,35 @@ inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t _history_bytes[i] = hist_bytes; } -GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) +GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size) { - //if (_queues[i].empty()) return BuffInfo(); - - ASSERT(not _queues[i].empty()); - ASSERT(this->ready(i)); - __prepare(i); - SBuffer &front = _queues[i].front(); - const bool unique = front.unique(); - - //same buffer, different offset and length - SBuffer buff = front; + ASSERT(not _aux_queues[i]->empty()); + SBuffer accum_buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); + accum_buff.offset = 0; + accum_buff.length = 0; - //set the flag that this buffer *might* be inlined as an output buffer - potential_inline = unique and (buff.length == front.length); + size_t free_bytes = accum_buff.get_actual_length(); + free_bytes /= item_size; free_bytes *= item_size; - return buff; -} - -GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) -{ - SBufferConfig config; - config.memory = NULL; - config.length = _enqueued_bytes[i]; - SBuffer newbuff(config); - newbuff.offset = 0; - newbuff.length = 0; - - while (not _queues[i].empty()) + while (not _queues[i].empty() and free_bytes != 0) { - std::memcpy(newbuff.get(newbuff.length), _queues[i].front().get(), _queues[i].front().length); - newbuff.length += _queues[i].front().length; - _queues[i].pop_front(); + SBuffer &front = _queues[i].front(); + const size_t bytes = std::min(front.length, free_bytes); + std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes); + accum_buff.length += bytes; + free_bytes -= bytes; + front.length -= bytes; + front.offset += bytes; + if (front.length == 0) _queues[i].pop_front(); } - _queues[i].push_back(newbuff); + _queues[i].push_front(accum_buff); return; - - - - - /* - - - - - //HERE(); - //assumes that we are always pushing proper history buffs on front - //ASSERT(_queues[i].front().length >= _history_bytes[i]); - - while (_queues[i].front().length < _reserve_bytes[i]) - { - SBuffer &front = _queues[i].front(); - SBuffer dst; - - //do we need a new buffer: - //- is the buffer unique (queue has only reference)? - //- can its remaining space meet reserve requirements? - const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset; - if (enough_space and front.unique()) - { - dst = _queues[i].front(); - _queues[i].pop_front(); - } - else - { - dst = _aux_queues[i]->front(); - _aux_queues[i]->pop(); - dst.offset = 0; - dst.length = 0; - _in_aux_buff[i] = true; - } - - SBuffer src = _queues[i].front(); - _queues[i].pop_front(); - const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length); - const size_t bytes = std::min(dst_tail, src.length); - //const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]); - std::memcpy(dst.get(dst.length), src.get(), bytes); - - //update buffer additions, consumptions - dst.length += bytes; - src.offset += bytes; - src.length -= bytes; - - //keep the source buffer if not fully consumed - if (src.length) _queues[i].push_front(src); - - //destination buffer is the new front of the queue - _queues[i].push_front(dst); - } - */ } - GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { - //if (bytes_consumed == 0) return true; - //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); @@ -258,37 +196,11 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length()); - //safe to pop here when the buffer is consumed and no history - //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/) - { - // _queues[i].pop_front(); - } -/* - else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i]) - { - const SBuffer buff = _queues[i].front(); - _queues[i].pop_front(); - - if (_queues[i].empty()) - { - _queues[i].push_front(buff); - } - else - { - _in_aux_buff[i] = false; - const size_t residual = buff.length; - _queues[i].front().length += residual; - _queues[i].front().offset -= residual; - } - } - */ - //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; __update(i); - } } //namespace gnuradio |