summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-22 12:41:12 -0400
committerJosh Blum2012-09-22 12:41:12 -0400
commit89de2759ba95682f51865dd06b7509e48969dc96 (patch)
tree74778847d71d473db9f92c35af68328356a0876d /lib
parent8583c68cf63c8fbaaa01b9ee43b2b96c95c6e34e (diff)
downloadsandhi-89de2759ba95682f51865dd06b7509e48969dc96.tar.gz
sandhi-89de2759ba95682f51865dd06b7509e48969dc96.tar.bz2
sandhi-89de2759ba95682f51865dd06b7509e48969dc96.zip
work on consume/produce implementation
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp6
-rw-r--r--lib/block_task.cpp11
-rw-r--r--lib/element.cpp2
3 files changed, 10 insertions, 9 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index c391c05..b50c49d 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -95,7 +95,7 @@ void Block::set_output_multiple(const size_t multiple, const size_t which_output
void Block::consume(const size_t which_input, const size_t how_many_items)
{
- (*this)->consume_items[which_input] = how_many_items;
+ (*this)->consume_items[which_input] += how_many_items;
(*this)->consume_called[which_input] = true;
}
@@ -103,14 +103,14 @@ void Block::consume_each(const size_t how_many_items)
{
for (size_t i = 0; i < (*this)->consume_items.size(); i++)
{
- (*this)->consume_items[i] = how_many_items;
+ (*this)->consume_items[i] += how_many_items;
(*this)->consume_called[i] = true;
}
}
void Block::produce(const size_t which_output, const size_t how_many_items)
{
- (*this)->produce_items[which_output] = how_many_items;
+ (*this)->produce_items[which_output] += how_many_items;
}
void Block::set_input_inline(const size_t which_input, const bool enb)
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 96c6b7a..f6f3dcb 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -65,7 +65,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
if (ARMAGEDDON) std::cerr
<< "==================================================\n"
- << "== The " << name << " is done...\n"
+ << "== The " << name << " " << unique_id << " is done...\n"
<< "==================================================\n"
<< std::flush;
}
@@ -117,6 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
num_input_items = std::min(num_input_items, items);
+ this->consume_items[i] = 0;
this->consume_called[i] = false;
//inline dealings, how and when input buffers can be inlined into output buffers
@@ -155,6 +156,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->output_items[i].size() = items;
this->work_output_items[i] = mem;
num_output_items = std::min(num_output_items, items);
+ this->produce_items[i] = 0;
}
//if we have outputs and at least one port has no downstream subscibers, mark done
@@ -194,7 +196,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- the work
//------------------------------------------------------------------
work_noutput_items = num_output_items;
- if (this->enable_fixed_rate) work_noutput_items = std::min(
+ /*if (this->enable_fixed_rate)*/ work_noutput_items = std::min(
work_noutput_items, myulround((num_input_items)*this->relative_rate));
this->work_task_iface = task_iface;
this->work_ret = -1;
@@ -221,8 +223,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
for (size_t i = 0; i < num_inputs; i++)
{
ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
- const size_t items = (this->consume_called[i])? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
- this->consume_items[i] = 0;
+ const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]);
+ const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
@@ -237,7 +239,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
for (size_t i = 0; i < num_outputs; i++)
{
const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
- this->produce_items[i] = 0;
if (items == 0) continue;
SBuffer &buff = this->output_queues.front(i);
diff --git a/lib/element.cpp b/lib/element.cpp
index 5ae20e0..b416261 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -33,7 +33,7 @@ Element::Element(const std::string &name)
(*this)->name = name;
(*this)->unique_id = ++unique_id_pool;
- if (GENESIS) std::cerr << "New element: " << name << std::endl;
+ if (GENESIS) std::cerr << "New element: " << name << " " << (*this)->unique_id << std::endl;
//default io signature to something
IOSignature sig; sig.push_back(1);