summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-11-01 21:15:03 -0700
committerJosh Blum2012-11-01 21:15:03 -0700
commitb22c63e756d50723188fdd25983868ea1f67bd05 (patch)
tree8c2f8672ca8e4284b9cd250d85b235011fc3d675 /lib/block_task.cpp
parent1581c3925a8d1e2057b9864f0bfe59e7f5fbedfb (diff)
downloadsandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.gz
sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.bz2
sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.zip
moved out a bunch of work, rate, fcast logic
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp80
1 files changed, 2 insertions, 78 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index f3ddb04..01a9072 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -17,8 +17,6 @@
#include <gras_impl/block_actor.hpp>
#include "tag_handlers.hpp"
-#define REALLY_BIG size_t(1 << 30)
-
using namespace gras;
void BlockActor::mark_done(void)
@@ -110,7 +108,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- initialize input buffers before work
//------------------------------------------------------------------
- size_t num_input_items = REALLY_BIG; //so big that it must std::min
size_t output_inline_index = 0;
for (size_t i = 0; i < num_inputs; i++)
{
@@ -122,24 +119,10 @@ void BlockActor::handle_task(void)
void *mem = buff.get();
size_t items = buff.length/this->input_items_sizes[i];
- this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
- this->work_input_items[i] = mem;
- this->work_ninput_items[i] = items;
- if (this->enable_fixed_rate)
- {
- if (items <= this->input_configs[i].lookahead_items)
- {
- this->input_fail(i); return;
- }
- items -= this->input_configs[i].lookahead_items;
- }
-
- num_input_items = std::min(num_input_items, items);
this->consume_items[i] = 0;
- this->consume_called[i] = false;
//inline dealings, how and when input buffers can be inlined into output buffers
//continue;
@@ -160,7 +143,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- initialize output buffers before work
//------------------------------------------------------------------
- size_t num_output_items = REALLY_BIG; //so big that it must std::min
for (size_t i = 0; i < num_outputs; i++)
{
ASSERT(this->output_queues.ready(i));
@@ -169,69 +151,15 @@ void BlockActor::handle_task(void)
const size_t bytes = buff.get_actual_length() - buff.length - buff.offset;
size_t items = bytes/this->output_items_sizes[i];
- this->work_io_ptr_mask |= ptrdiff_t(mem);
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
- this->work_output_items[i] = mem;
- items /= this->output_multiple_items;
- items *= this->output_multiple_items;
- num_output_items = std::min(num_output_items, items);
this->produce_items[i] = 0;
}
//------------------------------------------------------------------
- //-- calculate the work_noutput_items given:
- //-- min of num_input_items
- //-- min of num_output_items
- //-- relative rate and output multiple items
- //------------------------------------------------------------------
- work_noutput_items = num_output_items;
- if (num_inputs and (this->enable_fixed_rate or not num_outputs))
- {
- size_t calc_output_items = size_t(num_input_items*this->relative_rate);
- calc_output_items += this->output_multiple_items-1;
- calc_output_items /= this->output_multiple_items;
- calc_output_items *= this->output_multiple_items;
- if (calc_output_items and calc_output_items < work_noutput_items)
- work_noutput_items = calc_output_items;
- }
-
- //------------------------------------------------------------------
- //-- forecast
- //------------------------------------------------------------------
- //VAR(work_noutput_items);
- if (this->forecast_enable)
- {
- forecast_again_you_jerk:
- fcast_ninput_items = work_ninput_items; //init for NOP case
- block_ptr->forecast(work_noutput_items, fcast_ninput_items);
- for (size_t i = 0; i < num_inputs; i++)
- {
- if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
-
- //handle the case of forecast failing
- if (work_noutput_items <= this->output_multiple_items)
- {
- this->input_fail(i); return;
- }
-
- work_noutput_items = work_noutput_items/2; //backoff regime
- work_noutput_items += this->output_multiple_items-1;
- work_noutput_items /= this->output_multiple_items;
- work_noutput_items *= this->output_multiple_items;
- goto forecast_again_you_jerk;
- }
- }
-
- //workaround:
- if (num_outputs) output_items[0].size() = work_noutput_items;
- else input_items[0].size() = work_noutput_items;
-
- //------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- //VAR(work_noutput_items);
this->work_ret = -1;
if (this->interruptible_thread)
{
@@ -241,8 +169,6 @@ void BlockActor::handle_task(void)
{
this->task_work();
}
- const size_t noutput_items = size_t(work_ret);
- //VAR(work_ret);
if (work_ret == Block::WORK_DONE)
{
@@ -255,9 +181,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
- ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
- const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]);
- const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
+ const size_t items = this->consume_items[i];
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
@@ -271,7 +195,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
- const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
+ const size_t items = this->produce_items[i];
if (items == 0) continue;
SBuffer &buff = this->output_queues.front(i);