From 89de2759ba95682f51865dd06b7509e48969dc96 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sat, 22 Sep 2012 12:41:12 -0400 Subject: work on consume/produce implementation --- lib/block.cpp | 6 +++--- lib/block_task.cpp | 11 ++++++----- lib/element.cpp | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) (limited to 'lib') diff --git a/lib/block.cpp b/lib/block.cpp index c391c05..b50c49d 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -95,7 +95,7 @@ void Block::set_output_multiple(const size_t multiple, const size_t which_output void Block::consume(const size_t which_input, const size_t how_many_items) { - (*this)->consume_items[which_input] = how_many_items; + (*this)->consume_items[which_input] += how_many_items; (*this)->consume_called[which_input] = true; } @@ -103,14 +103,14 @@ void Block::consume_each(const size_t how_many_items) { for (size_t i = 0; i < (*this)->consume_items.size(); i++) { - (*this)->consume_items[i] = how_many_items; + (*this)->consume_items[i] += how_many_items; (*this)->consume_called[i] = true; } } void Block::produce(const size_t which_output, const size_t how_many_items) { - (*this)->produce_items[which_output] = how_many_items; + (*this)->produce_items[which_output] += how_many_items; } void Block::set_input_inline(const size_t which_input, const bool enb) diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 96c6b7a..f6f3dcb 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -65,7 +65,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) if (ARMAGEDDON) std::cerr << "==================================================\n" - << "== The " << name << " is done...\n" + << "== The " << name << " " << unique_id << " is done...\n" << "==================================================\n" << std::flush; } @@ -117,6 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->work_input_items[i] = mem; this->work_ninput_items[i] = items; num_input_items = std::min(num_input_items, items); + this->consume_items[i] = 0; this->consume_called[i] = false; //inline dealings, how and when input buffers can be inlined into output buffers @@ -155,6 +156,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->output_items[i].size() = items; this->work_output_items[i] = mem; num_output_items = std::min(num_output_items, items); + this->produce_items[i] = 0; } //if we have outputs and at least one port has no downstream subscibers, mark done @@ -194,7 +196,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- the work //------------------------------------------------------------------ work_noutput_items = num_output_items; - if (this->enable_fixed_rate) work_noutput_items = std::min( + /*if (this->enable_fixed_rate)*/ work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); this->work_task_iface = task_iface; this->work_ret = -1; @@ -221,8 +223,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); - const size_t items = (this->consume_called[i])? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); - this->consume_items[i] = 0; + const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]); + const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; @@ -237,7 +239,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_outputs; i++) { const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; - this->produce_items[i] = 0; if (items == 0) continue; SBuffer &buff = this->output_queues.front(i); diff --git a/lib/element.cpp b/lib/element.cpp index 5ae20e0..b416261 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -33,7 +33,7 @@ Element::Element(const std::string &name) (*this)->name = name; (*this)->unique_id = ++unique_id_pool; - if (GENESIS) std::cerr << "New element: " << name << std::endl; + if (GENESIS) std::cerr << "New element: " << name << " " << (*this)->unique_id << std::endl; //default io signature to something IOSignature sig; sig.push_back(1); -- cgit