diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 48 |
1 files changed, 39 insertions, 9 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c1046c3..a291c01 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -25,6 +25,8 @@ void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + this->block_ptr->stop(); + //flush partial output buffers to the downstream for (size_t i = 0; i < this->get_num_outputs(); i++) { @@ -71,6 +73,19 @@ void BlockActor::mark_done(void) << std::flush; } +GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) +{ + //input failed, accumulate and try again + if (not this->input_queues.is_accumulated(i)) + { + this->input_queues.accumulate(i, this->input_items_sizes[i]); + this->Push(SelfKickMessage(), Theron::Address()); + return; + } + //otherwise check for done, else wait for more + if (this->inputs_done[i]) this->mark_done(); +} + void BlockActor::handle_task(void) { #ifdef WORK_DEBUG @@ -102,16 +117,26 @@ void BlockActor::handle_task(void) this->sort_tags(i); ASSERT(this->input_queues.ready(i)); - bool potential_inline; - const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline); + //this->input_queues.accumulate(i, this->input_items_sizes[i]); + const SBuffer &buff = this->input_queues.front(i); void *mem = buff.get(); - const size_t items = buff.length/this->input_items_sizes[i]; + size_t items = buff.length/this->input_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i].get() = mem; this->input_items[i].size() = items; this->work_input_items[i] = mem; this->work_ninput_items[i] = items; + + if (this->enable_fixed_rate) + { + if (items <= this->input_configs[i].lookahead_items) + { + this->input_fail(i); return; + } + items -= this->input_configs[i].lookahead_items; + } + num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; this->consume_called[i] = false; @@ -119,7 +144,7 @@ void BlockActor::handle_task(void) //inline dealings, how and when input buffers can be inlined into output buffers //continue; if ( - potential_inline and + buff.unique() and input_configs[i].inline_buffer and output_inline_index < num_outputs and buff.get_affinity() == this->buffer_affinity @@ -175,6 +200,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ + //VAR(work_noutput_items); if (this->forecast_enable) { forecast_again_you_jerk: @@ -184,21 +210,24 @@ void BlockActor::handle_task(void) { if (fcast_ninput_items[i] <= work_ninput_items[i]) continue; + //handle the case of forecast failing + if (work_noutput_items <= this->output_multiple_items) + { + this->input_fail(i); return; + } + work_noutput_items = work_noutput_items/2; //backoff regime work_noutput_items += this->output_multiple_items-1; work_noutput_items /= this->output_multiple_items; work_noutput_items *= this->output_multiple_items; - if (work_noutput_items) goto forecast_again_you_jerk; - - //handle the case of forecast failing - this->mark_done(); - return; + goto forecast_again_you_jerk; } } //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ + //VAR(work_noutput_items); this->work_ret = -1; if (this->interruptible_thread) { @@ -209,6 +238,7 @@ void BlockActor::handle_task(void) this->task_work(); } const size_t noutput_items = size_t(work_ret); + //VAR(work_ret); if (work_ret == Block::WORK_DONE) { |