From e09df5e1170d4282d89e7b62cc75baa311c18da9 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Fri, 31 Aug 2012 01:18:45 -0700 Subject: done logic tweaks, comments, cleanup --- lib/block_task.cpp | 77 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 33 deletions(-) (limited to 'lib/block_task.cpp') diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 3ec9113..61ed5dc 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -34,8 +34,22 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) this->output_buffer_tokens.clear(); //release all buffers in queues - this->input_queues.clear(); - this->output_queues.clear(); + for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + { + while (not this->input_queues[i].empty()) + { + this->input_queues[i].pop(); + } + this->inputs_ready.set(i, false); + } + for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + { + while (not this->output_queues[i].empty()) + { + this->output_queues[i].pop(); + } + this->outputs_ready.set(i, false); + } //tell the upstream and downstram to re-check their tokens //this is how the other blocks know who is interested, @@ -64,6 +78,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- however, not all ports may have available buffers. //------------------------------------------------------------------ if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return; + //std::cout << "calling work on " << name << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -81,12 +96,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) } //------------------------------------------------------------------ - //-- Processing time! + //-- initialize input buffers before work //------------------------------------------------------------------ - - std::cout << "calling work on " << name << std::endl; - - //reset work trackers for production/consumption size_t input_tokens_count = 0; for (size_t i = 0; i < num_inputs; i++) { @@ -109,6 +120,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->work_ninput_items[i] = items; } + //------------------------------------------------------------------ + //-- initialize output buffers before work + //------------------------------------------------------------------ size_t num_output_items = ~0; //so big that it must std::min size_t output_tokens_count = 0; for (size_t i = 0; i < num_outputs; i++) @@ -129,27 +143,27 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) num_output_items = std::min(num_output_items, items); } - //someone upstream or downstream holds no tokens, we are done! - if ( - (num_inputs != 0 and input_tokens_count == num_inputs) or - (num_outputs != 0 and output_tokens_count == num_outputs) - ){ + //if we have outputs and at least one port has no downstream subscibers, mark done + if ((num_outputs != 0 and output_tokens_count == num_outputs)){ this->mark_done(task_iface); return; } - //start with source, this should be EZ - int ret = 0; - ret = block_ptr->Work(this->input_items, this->output_items); - //VAR(ret); + //------------------------------------------------------------------ + //-- forecast (TODO) and work + //------------------------------------------------------------------ + const int ret = block_ptr->Work(this->input_items, this->output_items); + const size_t noutput_items = size_t(ret); + if (ret == Block::WORK_DONE) { this->mark_done(task_iface); return; } - const size_t noutput_items = size_t(ret); - //now to deal with consumption and production + //------------------------------------------------------------------ + //-- process input consumption + //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE); @@ -168,6 +182,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->input_buff_offsets[i] = 0; } } + + //------------------------------------------------------------------ + //-- process output production + //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { const size_t items = (ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; @@ -182,19 +200,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->outputs_ready.set(i, not this->output_queues[i].empty()); } - //0) figure out what we have for input data - //1) calculate the possible num output items - //2) take into account the item multiple - //3) allocate some buffers - //4) work.... - - //block_ptr->forecast(100 - - - //TODO set deactive when work returns DONE - - //TODO source blocks should call work again until exhausted output buffers - //------------------------------------------------------------------ //-- trim the input tags that are past the consumption zone //-- and post trimmed tags to the downstream based on policy @@ -253,7 +258,13 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->output_tags[i].clear(); } - //create a self-kick so we get called again - //TODO, we could just steal this thread context... + //if there are inputs, and not all are provided for, and we have an empty queue, mark done + if ((num_inputs != 0 and input_tokens_count == num_inputs and not (~this->inputs_ready).none())) + { + this->mark_done(task_iface); + return; + } + + //still have IO ready? kick off another task if (this->all_io_ready()) this->block.post_msg(SelfKickMessage()); } -- cgit