diff options
author | Josh Blum | 2012-09-22 12:41:12 -0400 |
---|---|---|
committer | Josh Blum | 2012-09-22 12:41:12 -0400 |
commit | 89de2759ba95682f51865dd06b7509e48969dc96 (patch) | |
tree | 74778847d71d473db9f92c35af68328356a0876d /lib/block_task.cpp | |
parent | 8583c68cf63c8fbaaa01b9ee43b2b96c95c6e34e (diff) | |
download | sandhi-89de2759ba95682f51865dd06b7509e48969dc96.tar.gz sandhi-89de2759ba95682f51865dd06b7509e48969dc96.tar.bz2 sandhi-89de2759ba95682f51865dd06b7509e48969dc96.zip |
work on consume/produce implementation
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 96c6b7a..f6f3dcb 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -65,7 +65,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) if (ARMAGEDDON) std::cerr << "==================================================\n" - << "== The " << name << " is done...\n" + << "== The " << name << " " << unique_id << " is done...\n" << "==================================================\n" << std::flush; } @@ -117,6 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->work_input_items[i] = mem; this->work_ninput_items[i] = items; num_input_items = std::min(num_input_items, items); + this->consume_items[i] = 0; this->consume_called[i] = false; //inline dealings, how and when input buffers can be inlined into output buffers @@ -155,6 +156,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->output_items[i].size() = items; this->work_output_items[i] = mem; num_output_items = std::min(num_output_items, items); + this->produce_items[i] = 0; } //if we have outputs and at least one port has no downstream subscibers, mark done @@ -194,7 +196,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- the work //------------------------------------------------------------------ work_noutput_items = num_output_items; - if (this->enable_fixed_rate) work_noutput_items = std::min( + /*if (this->enable_fixed_rate)*/ work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); this->work_task_iface = task_iface; this->work_ret = -1; @@ -221,8 +223,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); - const size_t items = (this->consume_called[i])? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); - this->consume_items[i] = 0; + const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]); + const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; @@ -237,7 +239,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_outputs; i++) { const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; - this->produce_items[i] = 0; if (items == 0) continue; SBuffer &buff = this->output_queues.front(i); |