summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-11-06 21:52:30 -0800
committerJosh Blum2012-11-06 21:52:30 -0800
commit12c56a0845a7b8e5cbff6ed20eae8b3fa3e26b2d (patch)
tree7c3a029df201a4aca7d1f550618637673eddb71a
parentb89c9abb84974492b4746a6fab5fe6cea920decb (diff)
downloadsandhi-12c56a0845a7b8e5cbff6ed20eae8b3fa3e26b2d.tar.gz
sandhi-12c56a0845a7b8e5cbff6ed20eae8b3fa3e26b2d.tar.bz2
sandhi-12c56a0845a7b8e5cbff6ed20eae8b3fa3e26b2d.zip
consume and produce calls do it in-place
-rw-r--r--lib/block.cpp4
-rw-r--r--lib/block_task.cpp44
-rw-r--r--lib/gras_impl/block_actor.hpp6
-rw-r--r--lib/topology_handler.cpp2
-rw-r--r--tests/block_test.py14
5 files changed, 25 insertions, 45 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 7afb96b..2235f72 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -101,12 +101,12 @@ void Block::set_output_config(const OutputPortConfig &config, const size_t which
void Block::consume(const size_t which_input, const size_t how_many_items)
{
- (*this)->block->consume_items[which_input] += how_many_items;
+ (*this)->block->consume(which_input, how_many_items);
}
void Block::produce(const size_t which_output, const size_t how_many_items)
{
- (*this)->block->produce_items[which_output] += how_many_items;
+ (*this)->block->produce(which_output, how_many_items);
}
uint64_t Block::nitems_read(const size_t which_input)
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 4806914..8546731 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -128,8 +128,6 @@ void BlockActor::handle_task(void)
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
- this->consume_items[i] = 0;
-
//inline dealings, how and when input buffers can be inlined into output buffers
//continue;
if (
@@ -159,8 +157,6 @@ void BlockActor::handle_task(void)
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
-
- this->produce_items[i] = 0;
}
//------------------------------------------------------------------
@@ -176,32 +172,12 @@ void BlockActor::handle_task(void)
}
//------------------------------------------------------------------
- //-- process input consumption
- //------------------------------------------------------------------
- for (size_t i = 0; i < num_inputs; i++)
- {
- const size_t items = this->consume_items[i];
- if (items == 0) continue;
-
- this->items_consumed[i] += items;
- const size_t bytes = items*this->input_items_sizes[i];
- this->input_queues.consume(i, bytes);
-
- this->trim_tags(i);
- }
-
- //------------------------------------------------------------------
- //-- process output production
+ //-- Flush output buffers downstream
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
- const size_t items = this->produce_items[i];
- if (items == 0) continue;
-
+ if (not this->output_queues.ready(i)) continue;
SBuffer &buff = this->output_queues.front(i);
- this->items_produced[i] += items;
- const size_t bytes = items*this->output_items_sizes[i];
- buff.length += bytes;
//dont always pass output buffers downstream for the sake of efficiency
if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
@@ -226,3 +202,19 @@ void BlockActor::handle_task(void)
this->Push(SelfKickMessage(), Theron::Address());
}
}
+
+void BlockActor::consume(const size_t i, const size_t items)
+{
+ this->items_consumed[i] += items;
+ const size_t bytes = items*this->input_items_sizes[i];
+ this->input_queues.consume(i, bytes);
+ this->trim_tags(i);
+}
+
+void BlockActor::produce(const size_t i, const size_t items)
+{
+ SBuffer &buff = this->output_queues.front(i);
+ this->items_produced[i] += items;
+ const size_t bytes = items*this->output_items_sizes[i];
+ buff.length += bytes;
+}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 67e1815..93e3a2f 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -104,6 +104,8 @@ struct BlockActor : Apology::Worker
void output_fail(const size_t index);
void sort_tags(const size_t index);
void trim_tags(const size_t index);
+ void produce(const size_t index, const size_t items);
+ void consume(const size_t index, const size_t items);
GRAS_FORCE_INLINE bool any_inputs_done(void)
{
if (this->inputs_done.none() or this->input_queues.all_ready()) return false;
@@ -131,10 +133,6 @@ struct BlockActor : Apology::Worker
Block::InputItems input_items;
Block::OutputItems output_items;
- //track work's calls to produce and consume
- std::vector<size_t> produce_items;
- std::vector<size_t> consume_items;
-
//track the subscriber counts
std::vector<Token> input_tokens;
std::vector<Token> output_tokens;
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index d058c41..9261af1 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -69,8 +69,6 @@ void BlockActor::handle_topology(
//resize all work buffers to match current connections
this->input_items.resize(num_inputs);
this->output_items.resize(num_outputs);
- this->consume_items.resize(num_inputs, 0);
- this->produce_items.resize(num_outputs, 0);
this->input_queues.resize(num_inputs);
this->output_queues.resize(num_outputs);
diff --git a/tests/block_test.py b/tests/block_test.py
index a6dccfc..c62ea6f 100644
--- a/tests/block_test.py
+++ b/tests/block_test.py
@@ -31,19 +31,11 @@ class VectorSource(gras.Block):
def work(self, ins, outs):
print 'vector source work'
num = min(len(outs[0]), len(self._vec))
- if num == 0:
- self.mark_done()
- return
- #print 'outs[0][:num] is '
- #print len(outs)
- #print len(outs[0])
- #print outs[0][0]
- #print outs[0][:num]
outs[0][:num] = self._vec[:num]
self.produce(0, num)
self._vec = self._vec[num:]
- #if not self._vec:
- # self.mark_done()
+ if not self._vec:
+ self.mark_done()
print 'vector source work done'
class VectorSink(gras.Block):
@@ -84,7 +76,7 @@ class BlockTest(unittest.TestCase):
vec_sink = None
def test_make_block(self):
- return
+ #return
null_src = NullSource()
null_sink = NullSink()