diff options
author | Josh Blum | 2012-10-13 13:22:18 -0700 |
---|---|---|
committer | Josh Blum | 2012-10-13 13:22:18 -0700 |
commit | 8354c9c4c92687904cedae0b0feb702c2d7e8408 (patch) | |
tree | 4c063baf0dfa3a06a6f090b2346b593babaa2efb /lib/block_task.cpp | |
parent | f9c854d3cdd0a9337b41842d15a13d6df4ece408 (diff) | |
download | sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.gz sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.bz2 sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.zip |
lots of mini tweaks for QA passing, WIP...
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 66669d1..1570064 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -25,6 +25,8 @@ void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + this->block_ptr->stop(); + //flush partial output buffers to the downstream for (size_t i = 0; i < this->get_num_outputs(); i++) { @@ -103,15 +105,19 @@ void BlockActor::handle_task(void) ASSERT(this->input_queues.ready(i)); bool potential_inline; - const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline); + const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate and num_outputs, potential_inline); void *mem = buff.get(); - const size_t items = buff.length/this->input_items_sizes[i]; + size_t items = buff.length/this->input_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i].get() = mem; this->input_items[i].size() = items; this->work_input_items[i] = mem; this->work_ninput_items[i] = items; + if (this->enable_fixed_rate) + { + items = std::max<int>(0, int(items) - int(this->input_configs[i].lookahead_items)); + } num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; this->consume_called[i] = false; @@ -184,11 +190,12 @@ void BlockActor::handle_task(void) { if (fcast_ninput_items[i] <= work_ninput_items[i]) continue; + const size_t work_noutput_items_last = work_noutput_items; work_noutput_items = work_noutput_items/2; //backoff regime work_noutput_items += this->output_multiple_items-1; work_noutput_items /= this->output_multiple_items; work_noutput_items *= this->output_multiple_items; - if (work_noutput_items) goto forecast_again_you_jerk; + if (work_noutput_items and work_noutput_items_last != work_noutput_items) goto forecast_again_you_jerk; //handle the case of forecast failing //TODO accumulate input here, only done if inputs done and already accumulated @@ -210,6 +217,7 @@ void BlockActor::handle_task(void) this->task_work(); } const size_t noutput_items = size_t(work_ret); + VAR(work_ret); if (work_ret == Block::WORK_DONE) { |