diff options
author | Josh Blum | 2012-11-01 21:15:03 -0700 |
---|---|---|
committer | Josh Blum | 2012-11-01 21:15:03 -0700 |
commit | b22c63e756d50723188fdd25983868ea1f67bd05 (patch) | |
tree | 8c2f8672ca8e4284b9cd250d85b235011fc3d675 /lib/block_task.cpp | |
parent | 1581c3925a8d1e2057b9864f0bfe59e7f5fbedfb (diff) | |
download | sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.gz sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.bz2 sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.zip |
moved out a bunch of work, rate, fcast logic
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 80 |
1 files changed, 2 insertions, 78 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index f3ddb04..01a9072 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -17,8 +17,6 @@ #include <gras_impl/block_actor.hpp> #include "tag_handlers.hpp" -#define REALLY_BIG size_t(1 << 30) - using namespace gras; void BlockActor::mark_done(void) @@ -110,7 +108,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- initialize input buffers before work //------------------------------------------------------------------ - size_t num_input_items = REALLY_BIG; //so big that it must std::min size_t output_inline_index = 0; for (size_t i = 0; i < num_inputs; i++) { @@ -122,24 +119,10 @@ void BlockActor::handle_task(void) void *mem = buff.get(); size_t items = buff.length/this->input_items_sizes[i]; - this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i].get() = mem; this->input_items[i].size() = items; - this->work_input_items[i] = mem; - this->work_ninput_items[i] = items; - if (this->enable_fixed_rate) - { - if (items <= this->input_configs[i].lookahead_items) - { - this->input_fail(i); return; - } - items -= this->input_configs[i].lookahead_items; - } - - num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; - this->consume_called[i] = false; //inline dealings, how and when input buffers can be inlined into output buffers //continue; @@ -160,7 +143,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- initialize output buffers before work //------------------------------------------------------------------ - size_t num_output_items = REALLY_BIG; //so big that it must std::min for (size_t i = 0; i < num_outputs; i++) { ASSERT(this->output_queues.ready(i)); @@ -169,69 +151,15 @@ void BlockActor::handle_task(void) const size_t bytes = buff.get_actual_length() - buff.length - buff.offset; size_t items = bytes/this->output_items_sizes[i]; - this->work_io_ptr_mask |= ptrdiff_t(mem); this->output_items[i].get() = mem; this->output_items[i].size() = items; - this->work_output_items[i] = mem; - items /= this->output_multiple_items; - items *= this->output_multiple_items; - num_output_items = std::min(num_output_items, items); this->produce_items[i] = 0; } //------------------------------------------------------------------ - //-- calculate the work_noutput_items given: - //-- min of num_input_items - //-- min of num_output_items - //-- relative rate and output multiple items - //------------------------------------------------------------------ - work_noutput_items = num_output_items; - if (num_inputs and (this->enable_fixed_rate or not num_outputs)) - { - size_t calc_output_items = size_t(num_input_items*this->relative_rate); - calc_output_items += this->output_multiple_items-1; - calc_output_items /= this->output_multiple_items; - calc_output_items *= this->output_multiple_items; - if (calc_output_items and calc_output_items < work_noutput_items) - work_noutput_items = calc_output_items; - } - - //------------------------------------------------------------------ - //-- forecast - //------------------------------------------------------------------ - //VAR(work_noutput_items); - if (this->forecast_enable) - { - forecast_again_you_jerk: - fcast_ninput_items = work_ninput_items; //init for NOP case - block_ptr->forecast(work_noutput_items, fcast_ninput_items); - for (size_t i = 0; i < num_inputs; i++) - { - if (fcast_ninput_items[i] <= work_ninput_items[i]) continue; - - //handle the case of forecast failing - if (work_noutput_items <= this->output_multiple_items) - { - this->input_fail(i); return; - } - - work_noutput_items = work_noutput_items/2; //backoff regime - work_noutput_items += this->output_multiple_items-1; - work_noutput_items /= this->output_multiple_items; - work_noutput_items *= this->output_multiple_items; - goto forecast_again_you_jerk; - } - } - - //workaround: - if (num_outputs) output_items[0].size() = work_noutput_items; - else input_items[0].size() = work_noutput_items; - - //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - //VAR(work_noutput_items); this->work_ret = -1; if (this->interruptible_thread) { @@ -241,8 +169,6 @@ void BlockActor::handle_task(void) { this->task_work(); } - const size_t noutput_items = size_t(work_ret); - //VAR(work_ret); if (work_ret == Block::WORK_DONE) { @@ -255,9 +181,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { - ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); - const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]); - const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); + const size_t items = this->consume_items[i]; this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; @@ -271,7 +195,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { - const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; + const size_t items = this->produce_items[i]; if (items == 0) continue; SBuffer &buff = this->output_queues.front(i); |