summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-10-06 13:42:52 -0700
committerJosh Blum2012-10-06 13:42:52 -0700
commitfcf2261594e4bbc084c757e975e536428b4a2d0d (patch)
treeeb2116ccf00fbc4630c88b2993d3a016e3619914 /lib/block_task.cpp
parent5b7a0489def5c41c6b1a1569d4dd259788097605 (diff)
downloadsandhi-fcf2261594e4bbc084c757e975e536428b4a2d0d.tar.gz
sandhi-fcf2261594e4bbc084c757e975e536428b4a2d0d.tar.bz2
sandhi-fcf2261594e4bbc084c757e975e536428b4a2d0d.zip
reimplement the output multiple items/work output items calculations
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp36
1 files changed, 28 insertions, 8 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 39b0327..c1046c3 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -142,30 +142,53 @@ void BlockActor::handle_task(void)
const SBuffer &buff = this->output_queues.front(i);
void *mem = buff.get(buff.length);
const size_t bytes = buff.get_actual_length() - buff.length - buff.offset;
- const size_t items = bytes/this->output_items_sizes[i];
+ size_t items = bytes/this->output_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
this->work_output_items[i] = mem;
+
+ items /= this->output_multiple_items;
+ items *= this->output_multiple_items;
num_output_items = std::min(num_output_items, items);
this->produce_items[i] = 0;
}
//------------------------------------------------------------------
+ //-- calculate the work_noutput_items given:
+ //-- min of num_input_items
+ //-- min of num_output_items
+ //-- relative rate and output multiple items
+ //------------------------------------------------------------------
+ work_noutput_items = num_output_items;
+ if (num_inputs and (this->enable_fixed_rate or not num_outputs))
+ {
+ size_t calc_output_items = size_t(num_input_items*this->relative_rate);
+ calc_output_items += this->output_multiple_items-1;
+ calc_output_items /= this->output_multiple_items;
+ calc_output_items *= this->output_multiple_items;
+ if (calc_output_items and calc_output_items < work_noutput_items)
+ work_noutput_items = calc_output_items;
+ }
+
+ //------------------------------------------------------------------
//-- forecast
//------------------------------------------------------------------
if (this->forecast_enable)
{
forecast_again_you_jerk:
- fcast_ninput_items = work_ninput_items;
- block_ptr->forecast(num_output_items, fcast_ninput_items);
+ fcast_ninput_items = work_ninput_items; //init for NOP case
+ block_ptr->forecast(work_noutput_items, fcast_ninput_items);
for (size_t i = 0; i < num_inputs; i++)
{
if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
- num_output_items = num_output_items/2; //backoff regime
- if (num_output_items) goto forecast_again_you_jerk;
+ work_noutput_items = work_noutput_items/2; //backoff regime
+ work_noutput_items += this->output_multiple_items-1;
+ work_noutput_items /= this->output_multiple_items;
+ work_noutput_items *= this->output_multiple_items;
+ if (work_noutput_items) goto forecast_again_you_jerk;
//handle the case of forecast failing
this->mark_done();
@@ -176,9 +199,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- work_noutput_items = num_output_items;
- if (this->enable_fixed_rate) work_noutput_items = std::min(
- work_noutput_items, myulround((num_input_items)*this->relative_rate));
this->work_ret = -1;
if (this->interruptible_thread)
{