diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 18 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 6 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 18 | ||||
-rw-r--r-- | lib/block_task.cpp | 48 | ||||
-rw-r--r-- | lib/gr_block.cpp | 23 | ||||
-rw-r--r-- | lib/gr_top_block.cpp | 10 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 239 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 4 | ||||
-rw-r--r-- | lib/hier_block.cpp | 5 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 5 | ||||
-rw-r--r-- | lib/top_block.cpp | 3 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 12 |
13 files changed, 160 insertions, 235 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index f236ca4..6b27ab2 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -128,6 +128,24 @@ void Block::set_fixed_rate(const bool fixed_rate) (*this)->block->enable_fixed_rate = fixed_rate; } +bool Block::fixed_rate(void) const +{ + return (*this)->block->enable_fixed_rate; +} + +void Block::set_output_multiple(const size_t multiple) +{ + (*this)->block->output_multiple_items = multiple; + gnuradio::OutputPortConfig config = this->output_config(); + config.reserve_items = multiple; + this->set_output_config(config); +} + +size_t Block::output_multiple(void) const +{ + return (*this)->block->output_multiple_items; +} + void Block::set_relative_rate(double relative_rate) { (*this)->block->relative_rate = relative_rate; diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 9b70824..05b35ac 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -78,12 +78,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address { MESSAGE_TRACER(); - if (this->block_state == BLOCK_STATE_DONE) - { - this->Send(0, from); //ACK - return; - } - //allocate output buffers which will also wake up the task const size_t num_outputs = this->get_num_outputs(); this->output_buffer_tokens.resize(num_outputs); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 2c14643..ca42b45 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -27,12 +27,6 @@ void BlockActor::handle_top_active( ){ MESSAGE_TRACER(); - if (this->block_state == BLOCK_STATE_DONE) - { - this->Send(0, from); //ACK - return; - } - if (this->block_state != BLOCK_STATE_LIVE) { this->block_ptr->start(); @@ -50,10 +44,6 @@ void BlockActor::handle_top_inert( ){ MESSAGE_TRACER(); - if (this->block_state != BLOCK_STATE_DONE) - { - this->block_ptr->stop(); - } this->mark_done(); this->Send(0, from); //ACK @@ -65,12 +55,6 @@ void BlockActor::handle_top_token( ){ MESSAGE_TRACER(); - if (this->block_state == BLOCK_STATE_DONE) - { - this->Send(0, from); //ACK - return; - } - //create input tokens and send allocation hints for (size_t i = 0; i < this->get_num_inputs(); i++) { @@ -84,7 +68,7 @@ void BlockActor::handle_top_token( //tell the upstream about the input requirements OutputHintMessage output_hints; output_hints.history_bytes = this->input_configs[i].lookahead_items*this->input_items_sizes[i]; - output_hints.reserve_bytes = this->input_reserve_items[i]; + output_hints.reserve_bytes = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); output_hints.token = this->input_tokens[i]; this->post_upstream(i, output_hints); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c1046c3..a291c01 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -25,6 +25,8 @@ void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + this->block_ptr->stop(); + //flush partial output buffers to the downstream for (size_t i = 0; i < this->get_num_outputs(); i++) { @@ -71,6 +73,19 @@ void BlockActor::mark_done(void) << std::flush; } +GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) +{ + //input failed, accumulate and try again + if (not this->input_queues.is_accumulated(i)) + { + this->input_queues.accumulate(i, this->input_items_sizes[i]); + this->Push(SelfKickMessage(), Theron::Address()); + return; + } + //otherwise check for done, else wait for more + if (this->inputs_done[i]) this->mark_done(); +} + void BlockActor::handle_task(void) { #ifdef WORK_DEBUG @@ -102,16 +117,26 @@ void BlockActor::handle_task(void) this->sort_tags(i); ASSERT(this->input_queues.ready(i)); - bool potential_inline; - const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline); + //this->input_queues.accumulate(i, this->input_items_sizes[i]); + const SBuffer &buff = this->input_queues.front(i); void *mem = buff.get(); - const size_t items = buff.length/this->input_items_sizes[i]; + size_t items = buff.length/this->input_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i].get() = mem; this->input_items[i].size() = items; this->work_input_items[i] = mem; this->work_ninput_items[i] = items; + + if (this->enable_fixed_rate) + { + if (items <= this->input_configs[i].lookahead_items) + { + this->input_fail(i); return; + } + items -= this->input_configs[i].lookahead_items; + } + num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; this->consume_called[i] = false; @@ -119,7 +144,7 @@ void BlockActor::handle_task(void) //inline dealings, how and when input buffers can be inlined into output buffers //continue; if ( - potential_inline and + buff.unique() and input_configs[i].inline_buffer and output_inline_index < num_outputs and buff.get_affinity() == this->buffer_affinity @@ -175,6 +200,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ + //VAR(work_noutput_items); if (this->forecast_enable) { forecast_again_you_jerk: @@ -184,21 +210,24 @@ void BlockActor::handle_task(void) { if (fcast_ninput_items[i] <= work_ninput_items[i]) continue; + //handle the case of forecast failing + if (work_noutput_items <= this->output_multiple_items) + { + this->input_fail(i); return; + } + work_noutput_items = work_noutput_items/2; //backoff regime work_noutput_items += this->output_multiple_items-1; work_noutput_items /= this->output_multiple_items; work_noutput_items *= this->output_multiple_items; - if (work_noutput_items) goto forecast_again_you_jerk; - - //handle the case of forecast failing - this->mark_done(); - return; + goto forecast_again_you_jerk; } } //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ + //VAR(work_noutput_items); this->work_ret = -1; if (this->interruptible_thread) { @@ -209,6 +238,7 @@ void BlockActor::handle_task(void) this->task_work(); } const size_t noutput_items = size_t(work_ret); + //VAR(work_ret); if (work_ret == Block::WORK_DONE) { diff --git a/lib/gr_block.cpp b/lib/gr_block.cpp index 834bbd5..b4e7c04 100644 --- a/lib/gr_block.cpp +++ b/lib/gr_block.cpp @@ -80,7 +80,14 @@ bool gr_block::is_unaligned(void) size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items) { - return size_t((noutput_items/this->relative_rate())); + if (this->fixed_rate()) + { + return size_t(0.5 + (noutput_items/this->relative_rate())) + this->history() - 1; + } + else + { + return noutput_items + this->history() - 1; + } } size_t gr_block::interpolation(void) const @@ -91,6 +98,7 @@ size_t gr_block::interpolation(void) const void gr_block::set_interpolation(const size_t interp) { this->set_relative_rate(1.0*interp); + this->set_output_multiple(interp); } size_t gr_block::decimation(void) const @@ -118,19 +126,6 @@ void gr_block::set_history(unsigned history) this->set_input_config(config); } -unsigned gr_block::output_multiple(void) const -{ - return (*this)->block->output_multiple_items; -} - -void gr_block::set_output_multiple(unsigned multiple) -{ - (*this)->block->output_multiple_items = multiple; - gnuradio::OutputPortConfig config = this->output_config(); - config.reserve_items = multiple; - this->set_output_config(config); -} - int gr_block::max_noutput_items(void) const { return this->output_config().maximum_items; diff --git a/lib/gr_top_block.cpp b/lib/gr_top_block.cpp index 6c023b1..c51f9b3 100644 --- a/lib/gr_top_block.cpp +++ b/lib/gr_top_block.cpp @@ -34,16 +34,6 @@ gr_top_block_sptr gr_make_top_block(const std::string &name) return gr_top_block_sptr(new gr_top_block(name)); } -void gr_top_block::lock(void) -{ - //NOP -} - -void gr_top_block::unlock(void) -{ - this->commit(); -} - void gr_top_block::start(void) { gnuradio::TopBlock::start(); diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 5dd7421..d32c105 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -110,11 +110,12 @@ struct BlockActor : Apology::Worker void buffer_returner(const size_t index, SBuffer &buffer); void mark_done(void); void handle_task(void); + void input_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) { - if (this->inputs_done.none()) return false; + if (this->inputs_done.none() or this->input_queues.all_ready()) return false; for (size_t i = 0; i < this->get_num_inputs(); i++) { if (this->inputs_done[i] and not this->input_queues.ready(i)) @@ -130,7 +131,6 @@ struct BlockActor : Apology::Worker std::vector<size_t> output_items_sizes; std::vector<InputPortConfig> input_configs; std::vector<OutputPortConfig> output_configs; - std::vector<size_t> input_reserve_items; size_t output_multiple_items; //keeps track of production diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 5b610d1..d17c4f0 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -33,26 +33,44 @@ namespace gnuradio struct InputBufferQueues { enum {MAX_QUEUE_SIZE = 128}; + enum {MAX_AUX_BUFF_BYTES=(1<<16)}; ~InputBufferQueues(void) { this->resize(0); } - void init( - const std::vector<size_t> &input_history_items, - const std::vector<size_t> &input_reserve_items, - const std::vector<size_t> &input_item_sizes - ); + void update_history_bytes(const size_t i, const size_t hist_bytes); //! Call to get an input buffer for work - SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE); + GRAS_FORCE_INLINE SBuffer &front(const size_t i) + { + ASSERT(not _queues[i].empty()); + ASSERT(this->ready(i)); + + return _queues[i].front(); + } //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); void resize(const size_t size); + void accumulate(const size_t i, const size_t item_size); + + /*! + * Can we consider this queue's buffers to be accumulated? + * Either the first buffer holds all of the enqueued bytes + * or the first buffer is larger than we can accumulate. + */ + GRAS_FORCE_INLINE bool is_accumulated(const size_t i) const + { + ASSERT(not _queues[i].empty()); + return + (_queues[i].front().length == _enqueued_bytes[i]) or + (_queues[i].front().length >= MAX_AUX_BUFF_BYTES); + } + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer) { ASSERT(not _queues[i].full()); @@ -94,21 +112,16 @@ struct InputBufferQueues return _bitset.all(); } - void __prepare(const size_t i); - GRAS_FORCE_INLINE void __update(const size_t i) { - _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]); + _bitset.set(i, _enqueued_bytes[i] != 0); } BitSet _bitset; std::vector<size_t> _enqueued_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _history_bytes; - std::vector<size_t> _reserve_bytes; - std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; - std::vector<bool> _in_aux_buff; }; @@ -118,152 +131,76 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) _enqueued_bytes.resize(size, 0); _queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE)); _history_bytes.resize(size, 0); - _reserve_bytes.resize(size, 0); - _post_bytes.resize(size, 0); _aux_queues.resize(size); - _in_aux_buff.resize(size, false); -} - -GRAS_FORCE_INLINE void InputBufferQueues::init( - const std::vector<size_t> &input_history_items, - const std::vector<size_t> &input_reserve_items, - const std::vector<size_t> &input_item_sizes -){ - if (this->size() == 0) return; - - const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); for (size_t i = 0; i < this->size(); i++) { - ASSERT(input_reserve_items[i] > 0); - + if (_aux_queues[i]) continue; _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); - - //determine byte sizes for buffers and dealing with history - const size_t old_history = _history_bytes[i]; - _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; - - //calculate the input reserve aka reserve size - _reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i]; - _reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]); - - //calculate the input reserve aka reserve size - _reserve_bytes[i] = std::max( - _history_bytes[i] + _reserve_bytes[i], - _reserve_bytes[i] - ); - - //post bytes are the desired buffer size to escape the edge case - _post_bytes[i] = std::max( - input_item_sizes[i]*max_history_items + _reserve_bytes[i], - _reserve_bytes[i] - ); - - //allocate mini buffers for history edge conditions - size_t num_bytes = _post_bytes[i]; - _aux_queues[i]->allocate_one(num_bytes); - _aux_queues[i]->allocate_one(num_bytes); - - //there is history, so enqueue some initial history - if (_history_bytes[i] > old_history) - { - SBuffer buff = _aux_queues[i]->front(); - _aux_queues[i]->pop(); - - const size_t delta = _history_bytes[i] - old_history; - std::memset(buff.get_actual_memory(), 0, delta); - buff.offset = 0; - buff.length = delta; - - this->push(i, buff); - _in_aux_buff[i] = true; - } - if (_history_bytes[i] < old_history) - { - size_t delta = old_history - _history_bytes[i]; - delta = std::min(delta, _enqueued_bytes[i]); //FIXME - //TODO consume extra delta on push...? so we dont need std::min - this->consume(i, delta); - } + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); } -} +} -GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline) +inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes) { - //if (_queues[i].empty()) return BuffInfo(); - - ASSERT(not _queues[i].empty()); - ASSERT(this->ready(i)); - __prepare(i); - ASSERT(_queues[i].front().length >= _history_bytes[i]); - SBuffer &front = _queues[i].front(); - const bool unique = front.unique(); + //there is history, so enqueue some initial history + if (hist_bytes > _history_bytes[i]) + { + SBuffer buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); - //same buffer, different offset and length - SBuffer buff = front; - if (conserve_history) buff.length -= _history_bytes[i]; + const size_t delta = hist_bytes - _history_bytes[i]; + std::memset(buff.get_actual_memory(), 0, delta); + buff.offset = 0; + buff.length = delta; - //set the flag that this buffer *might* be inlined as an output buffer - potential_inline = unique and (buff.length == front.length); + this->push(i, buff); + } + if (hist_bytes < _history_bytes[i]) + { + size_t delta = _history_bytes[i] - hist_bytes; + delta = std::min(delta, _enqueued_bytes[i]); //FIXME + //TODO consume extra delta on push...? so we dont need std::min + this->consume(i, delta); + } - return buff; + _history_bytes[i] = hist_bytes; } -GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) +GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size) { - //HERE(); - //assumes that we are always pushing proper history buffs on front - //ASSERT(_queues[i].front().length >= _history_bytes[i]); + ASSERT(not _aux_queues[i]->empty()); + SBuffer accum_buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); + accum_buff.offset = 0; + accum_buff.length = 0; - while (_queues[i].front().length < _reserve_bytes[i]) + size_t free_bytes = accum_buff.get_actual_length(); + free_bytes /= item_size; free_bytes *= item_size; + + while (not _queues[i].empty() and free_bytes != 0) { SBuffer &front = _queues[i].front(); - SBuffer dst; - - //do we need a new buffer: - //- is the buffer unique (queue has only reference)? - //- can its remaining space meet reserve requirements? - const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset; - if (enough_space and front.unique()) - { - dst = _queues[i].front(); - _queues[i].pop_front(); - } - else - { - dst = _aux_queues[i]->front(); - _aux_queues[i]->pop(); - dst.offset = 0; - dst.length = 0; - _in_aux_buff[i] = true; - } - - SBuffer src = _queues[i].front(); - _queues[i].pop_front(); - const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length); - const size_t bytes = std::min(dst_tail, src.length); - //const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]); - std::memcpy(dst.get(dst.length), src.get(), bytes); - - //update buffer additions, consumptions - dst.length += bytes; - src.offset += bytes; - src.length -= bytes; - - //keep the source buffer if not fully consumed - if (src.length) _queues[i].push_front(src); - - //destination buffer is the new front of the queue - _queues[i].push_front(dst); + const size_t bytes = std::min(front.length, free_bytes); + std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes); + accum_buff.length += bytes; + free_bytes -= bytes; + front.length -= bytes; + front.offset += bytes; + if (front.length == 0) _queues[i].pop_front(); } -} + _queues[i].push_front(accum_buff); + + ASSERT(this->is_accumulated(i)); +} GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { - //if (bytes_consumed == 0) return true; - //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); @@ -271,43 +208,13 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b _queues[i].front().offset += bytes_consumed; _queues[i].front().length -= bytes_consumed; - //safe to pop here when the buffer is consumed and no history - if (_queues[i].front().length == 0 and _history_bytes[i] == 0) - { - _queues[i].pop_front(); - } - - else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i]) - { - const SBuffer buff = _queues[i].front(); - _queues[i].pop_front(); - - if (_queues[i].empty()) - { - _queues[i].push_front(buff); - } - else - { - _in_aux_buff[i] = false; - const size_t residual = buff.length; - _queues[i].front().length += residual; - _queues[i].front().offset -= residual; - } - } + ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length()); //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; - //we have consumed the history, change reqs - if (_enqueued_bytes[i] < _history_bytes[i]) - { - _history_bytes[i] = 0; - _reserve_bytes[i] = 1; //cant be 0 - } - __update(i); - } } //namespace gnuradio diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index a740b2d..1f07af0 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -76,9 +76,7 @@ struct OutputBufferQueues GRAS_FORCE_INLINE void flush_all(void) { - _queues.clear(); - _queues.resize(_bitset.size()); - _bitset.reset(); + for (size_t i = 0; i < this->size(); i++) this->flush(i); } GRAS_FORCE_INLINE bool ready(const size_t i) const diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index ef29cc2..422ed77 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -76,3 +76,8 @@ void HierBlock::disconnect_all(void) { (*this)->topology->clear_all(); } + +void HierBlock::commit(void) +{ + (*this)->topology->commit(); +} diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 574aee4..30bb613 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -60,6 +60,11 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther { this->mark_done(); } + //or re-enter handle task so forecast logic can mark done + else + { + this->handle_task(); + } } void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) diff --git a/lib/top_block.cpp b/lib/top_block.cpp index bcce1bf..a09744e 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -47,6 +47,8 @@ TopBlock::TopBlock(const std::string &name): void ElementImpl::top_block_cleanup(void) { this->executor->post_all(TopInertMessage()); + this->topology->clear_all();; + this->executor->commit(); if (ARMAGEDDON) std::cerr << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n" << "xx Top Block Destroyed: " << name << "\n" @@ -105,6 +107,7 @@ void TopBlock::run(void) { this->start(); this->wait(); + this->disconnect_all(); } GRAS_FORCE_INLINE void wait_thread_yield(void) diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index cc57ed6..5ad9378 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -106,18 +106,14 @@ void BlockActor::handle_update_inputs( const UpdateInputsMessage &, const Theron::Address ){ + MESSAGE_TRACER(); const size_t num_inputs = this->get_num_inputs(); + this->input_queues.resize(num_inputs); //impose input reserve requirements based on relative rate and output multiple - resize_fill_grow(this->input_reserve_items, num_inputs, 1); - std::vector<size_t> input_lookahead_items(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - input_lookahead_items[i] = this->input_configs[i].lookahead_items; - this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); - if (this->input_reserve_items[i] == 0) this->input_reserve_items[i] = 1; + const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items; + this->input_queues.update_history_bytes(i, hist_bytes); } - - //init the history comprehension on input queues - this->input_queues.init(input_lookahead_items, this->input_reserve_items, this->input_items_sizes); } |