From 89de2759ba95682f51865dd06b7509e48969dc96 Mon Sep 17 00:00:00 2001
From: Josh Blum
Date: Sat, 22 Sep 2012 12:41:12 -0400
Subject: work on consume/produce implementation

---
 lib/block.cpp      |  6 +++---
 lib/block_task.cpp | 11 ++++++-----
 lib/element.cpp    |  2 +-
 3 files changed, 10 insertions(+), 9 deletions(-)

(limited to 'lib')

diff --git a/lib/block.cpp b/lib/block.cpp
index c391c05..b50c49d 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -95,7 +95,7 @@ void Block::set_output_multiple(const size_t multiple, const size_t which_output
 
 void Block::consume(const size_t which_input, const size_t how_many_items)
 {
-    (*this)->consume_items[which_input] = how_many_items;
+    (*this)->consume_items[which_input] += how_many_items;
     (*this)->consume_called[which_input] = true;
 }
 
@@ -103,14 +103,14 @@ void Block::consume_each(const size_t how_many_items)
 {
     for (size_t i = 0; i < (*this)->consume_items.size(); i++)
     {
-        (*this)->consume_items[i] = how_many_items;
+        (*this)->consume_items[i] += how_many_items;
         (*this)->consume_called[i] = true;
     }
 }
 
 void Block::produce(const size_t which_output, const size_t how_many_items)
 {
-    (*this)->produce_items[which_output] = how_many_items;
+    (*this)->produce_items[which_output] += how_many_items;
 }
 
 void Block::set_input_inline(const size_t which_input, const bool enb)
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 96c6b7a..f6f3dcb 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -65,7 +65,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
 
     if (ARMAGEDDON) std::cerr
         << "==================================================\n"
-        << "== The " << name << " is done...\n"
+        << "== The " << name << " " << unique_id << " is done...\n"
         << "==================================================\n"
         << std::flush;
 }
@@ -117,6 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
         this->work_input_items[i] = mem;
         this->work_ninput_items[i] = items;
         num_input_items = std::min(num_input_items, items);
+        this->consume_items[i] = 0;
         this->consume_called[i] = false;
 
         //inline dealings, how and when input buffers can be inlined into output buffers
@@ -155,6 +156,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
         this->output_items[i].size() = items;
         this->work_output_items[i] = mem;
         num_output_items = std::min(num_output_items, items);
+        this->produce_items[i] = 0;
     }
 
     //if we have outputs and at least one port has no downstream subscibers, mark done
@@ -194,7 +196,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     //-- the work
     //------------------------------------------------------------------
     work_noutput_items = num_output_items;
-    if (this->enable_fixed_rate) work_noutput_items = std::min(
+    /*if (this->enable_fixed_rate)*/ work_noutput_items = std::min(
         work_noutput_items, myulround((num_input_items)*this->relative_rate));
     this->work_task_iface = task_iface;
     this->work_ret = -1;
@@ -221,8 +223,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     for (size_t i = 0; i < num_inputs; i++)
     {
         ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
-        const size_t items = (this->consume_called[i])? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
-        this->consume_items[i] = 0;
+        const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]);
+        const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
 
         this->items_consumed[i] += items;
         const size_t bytes = items*this->input_items_sizes[i];
@@ -237,7 +239,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     for (size_t i = 0; i < num_outputs; i++)
     {
         const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
-        this->produce_items[i] = 0;
         if (items == 0) continue;
 
         SBuffer &buff = this->output_queues.front(i);
diff --git a/lib/element.cpp b/lib/element.cpp
index 5ae20e0..b416261 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -33,7 +33,7 @@ Element::Element(const std::string &name)
     (*this)->name = name;
     (*this)->unique_id = ++unique_id_pool;
 
-    if (GENESIS) std::cerr << "New element: " << name << std::endl;
+    if (GENESIS) std::cerr << "New element: " << name << " " << (*this)->unique_id << std::endl;
 
     //default io signature to something
     IOSignature sig; sig.push_back(1);
-- 
cgit