From ac3857575c4c762f9a18ee18889740d4360a9aa8 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 28 Aug 2012 23:05:41 -0700 Subject: token work w/ messages to implement finite runs --- lib/block_task.cpp | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) (limited to 'lib/block_task.cpp') diff --git a/lib/block_task.cpp b/lib/block_task.cpp index f156ae0..577fdb7 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -20,8 +20,44 @@ using namespace gnuradio; +void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface) +{ + for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + { + while (task_iface.get_input_buffer(i)) + { + task_iface.pop_input_buffer(i); + } + } +} + +void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) +{ + if (not this->active) return; + this->active = false; + this->token_pool.clear(); + this->token.reset(); + this->free_inputs(task_iface); + for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + { + task_iface.post_upstream(i, CheckTokensMessage()); + } + for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + { + task_iface.post_downstream(i, CheckTokensMessage()); + } + HERE(); + VAR(name); +} + void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { + //FIXME in case we get called in the inactive state, assuming done? + if (not this->active) + { + this->free_inputs(task_iface); + } + //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: //-- Handle task may get called for incoming buffers, @@ -53,8 +89,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) std::cout << "calling work on " << name << std::endl; //reset work trackers for production/consumption + size_t input_tokens_count = 0; for (size_t i = 0; i < num_inputs; i++) { + input_tokens_count += this->input_tokens[i].use_count(); //this->consume_items[i] = 0; ASSERT(this->input_history_items[i] == 0); @@ -69,9 +107,12 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->work_input_items[i] = mem; this->work_ninput_items[i] = items; } + size_t num_output_items = ~0; //so big that it must std::min + size_t output_tokens_count = 0; for (size_t i = 0; i < num_outputs; i++) { + output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; const tsbe::Buffer &buff = task_iface.get_output_buffer(i); @@ -85,13 +126,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) num_output_items = std::min(num_output_items, items); } + //someone upstream or downstream holds no tokens, we are done! + if ( + (num_inputs != 0 and input_tokens_count == num_inputs) or + (num_outputs != 0 and output_tokens_count == num_outputs) + ){ + this->mark_done(task_iface); + return; + } + //start with source, this should be EZ int ret = 0; ret = block_ptr->Work(this->input_items, this->output_items); VAR(ret); if (ret == Block::WORK_DONE) { - this->active = false; + this->mark_done(task_iface); return; } const size_t noutput_items = size_t(ret); -- cgit