summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-10-13 13:22:18 -0700
committerJosh Blum2012-10-13 13:22:18 -0700
commit8354c9c4c92687904cedae0b0feb702c2d7e8408 (patch)
tree4c063baf0dfa3a06a6f090b2346b593babaa2efb /lib/block_task.cpp
parentf9c854d3cdd0a9337b41842d15a13d6df4ece408 (diff)
downloadsandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.gz
sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.bz2
sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.zip
lots of mini tweaks for QA passing, WIP...
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp14
1 files changed, 11 insertions, 3 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 66669d1..1570064 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -25,6 +25,8 @@ void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ this->block_ptr->stop();
+
//flush partial output buffers to the downstream
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
@@ -103,15 +105,19 @@ void BlockActor::handle_task(void)
ASSERT(this->input_queues.ready(i));
bool potential_inline;
- const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline);
+ const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate and num_outputs, potential_inline);
void *mem = buff.get();
- const size_t items = buff.length/this->input_items_sizes[i];
+ size_t items = buff.length/this->input_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
+ if (this->enable_fixed_rate)
+ {
+ items = std::max<int>(0, int(items) - int(this->input_configs[i].lookahead_items));
+ }
num_input_items = std::min(num_input_items, items);
this->consume_items[i] = 0;
this->consume_called[i] = false;
@@ -184,11 +190,12 @@ void BlockActor::handle_task(void)
{
if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
+ const size_t work_noutput_items_last = work_noutput_items;
work_noutput_items = work_noutput_items/2; //backoff regime
work_noutput_items += this->output_multiple_items-1;
work_noutput_items /= this->output_multiple_items;
work_noutput_items *= this->output_multiple_items;
- if (work_noutput_items) goto forecast_again_you_jerk;
+ if (work_noutput_items and work_noutput_items_last != work_noutput_items) goto forecast_again_you_jerk;
//handle the case of forecast failing
//TODO accumulate input here, only done if inputs done and already accumulated
@@ -210,6 +217,7 @@ void BlockActor::handle_task(void)
this->task_work();
}
const size_t noutput_items = size_t(work_ret);
+ VAR(work_ret);
if (work_ret == Block::WORK_DONE)
{