summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp48
1 files changed, 39 insertions, 9 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c1046c3..a291c01 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -25,6 +25,8 @@ void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ this->block_ptr->stop();
+
//flush partial output buffers to the downstream
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
@@ -71,6 +73,19 @@ void BlockActor::mark_done(void)
<< std::flush;
}
+GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
+{
+ //input failed, accumulate and try again
+ if (not this->input_queues.is_accumulated(i))
+ {
+ this->input_queues.accumulate(i, this->input_items_sizes[i]);
+ this->Push(SelfKickMessage(), Theron::Address());
+ return;
+ }
+ //otherwise check for done, else wait for more
+ if (this->inputs_done[i]) this->mark_done();
+}
+
void BlockActor::handle_task(void)
{
#ifdef WORK_DEBUG
@@ -102,16 +117,26 @@ void BlockActor::handle_task(void)
this->sort_tags(i);
ASSERT(this->input_queues.ready(i));
- bool potential_inline;
- const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline);
+ //this->input_queues.accumulate(i, this->input_items_sizes[i]);
+ const SBuffer &buff = this->input_queues.front(i);
void *mem = buff.get();
- const size_t items = buff.length/this->input_items_sizes[i];
+ size_t items = buff.length/this->input_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
+
+ if (this->enable_fixed_rate)
+ {
+ if (items <= this->input_configs[i].lookahead_items)
+ {
+ this->input_fail(i); return;
+ }
+ items -= this->input_configs[i].lookahead_items;
+ }
+
num_input_items = std::min(num_input_items, items);
this->consume_items[i] = 0;
this->consume_called[i] = false;
@@ -119,7 +144,7 @@ void BlockActor::handle_task(void)
//inline dealings, how and when input buffers can be inlined into output buffers
//continue;
if (
- potential_inline and
+ buff.unique() and
input_configs[i].inline_buffer and
output_inline_index < num_outputs and
buff.get_affinity() == this->buffer_affinity
@@ -175,6 +200,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- forecast
//------------------------------------------------------------------
+ //VAR(work_noutput_items);
if (this->forecast_enable)
{
forecast_again_you_jerk:
@@ -184,21 +210,24 @@ void BlockActor::handle_task(void)
{
if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
+ //handle the case of forecast failing
+ if (work_noutput_items <= this->output_multiple_items)
+ {
+ this->input_fail(i); return;
+ }
+
work_noutput_items = work_noutput_items/2; //backoff regime
work_noutput_items += this->output_multiple_items-1;
work_noutput_items /= this->output_multiple_items;
work_noutput_items *= this->output_multiple_items;
- if (work_noutput_items) goto forecast_again_you_jerk;
-
- //handle the case of forecast failing
- this->mark_done();
- return;
+ goto forecast_again_you_jerk;
}
}
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
+ //VAR(work_noutput_items);
this->work_ret = -1;
if (this->interruptible_thread)
{
@@ -209,6 +238,7 @@ void BlockActor::handle_task(void)
this->task_work();
}
const size_t noutput_items = size_t(work_ret);
+ //VAR(work_ret);
if (work_ret == Block::WORK_DONE)
{