diff options
-rw-r--r-- | lib/block.cpp | 4 | ||||
-rw-r--r-- | lib/block_task.cpp | 44 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 6 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 2 | ||||
-rw-r--r-- | tests/block_test.py | 14 |
5 files changed, 25 insertions, 45 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 7afb96b..2235f72 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -101,12 +101,12 @@ void Block::set_output_config(const OutputPortConfig &config, const size_t which void Block::consume(const size_t which_input, const size_t how_many_items) { - (*this)->block->consume_items[which_input] += how_many_items; + (*this)->block->consume(which_input, how_many_items); } void Block::produce(const size_t which_output, const size_t how_many_items) { - (*this)->block->produce_items[which_output] += how_many_items; + (*this)->block->produce(which_output, how_many_items); } uint64_t Block::nitems_read(const size_t which_input) diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 4806914..8546731 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -128,8 +128,6 @@ void BlockActor::handle_task(void) this->input_items[i].get() = mem; this->input_items[i].size() = items; - this->consume_items[i] = 0; - //inline dealings, how and when input buffers can be inlined into output buffers //continue; if ( @@ -159,8 +157,6 @@ void BlockActor::handle_task(void) this->output_items[i].get() = mem; this->output_items[i].size() = items; - - this->produce_items[i] = 0; } //------------------------------------------------------------------ @@ -176,32 +172,12 @@ void BlockActor::handle_task(void) } //------------------------------------------------------------------ - //-- process input consumption - //------------------------------------------------------------------ - for (size_t i = 0; i < num_inputs; i++) - { - const size_t items = this->consume_items[i]; - if (items == 0) continue; - - this->items_consumed[i] += items; - const size_t bytes = items*this->input_items_sizes[i]; - this->input_queues.consume(i, bytes); - - this->trim_tags(i); - } - - //------------------------------------------------------------------ - //-- process output production + //-- Flush output buffers downstream //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { - const size_t items = this->produce_items[i]; - if (items == 0) continue; - + if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); - this->items_produced[i] += items; - const size_t bytes = items*this->output_items_sizes[i]; - buff.length += bytes; //dont always pass output buffers downstream for the sake of efficiency if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) @@ -226,3 +202,19 @@ void BlockActor::handle_task(void) this->Push(SelfKickMessage(), Theron::Address()); } } + +void BlockActor::consume(const size_t i, const size_t items) +{ + this->items_consumed[i] += items; + const size_t bytes = items*this->input_items_sizes[i]; + this->input_queues.consume(i, bytes); + this->trim_tags(i); +} + +void BlockActor::produce(const size_t i, const size_t items) +{ + SBuffer &buff = this->output_queues.front(i); + this->items_produced[i] += items; + const size_t bytes = items*this->output_items_sizes[i]; + buff.length += bytes; +} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 67e1815..93e3a2f 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -104,6 +104,8 @@ struct BlockActor : Apology::Worker void output_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); + void produce(const size_t index, const size_t items); + void consume(const size_t index, const size_t items); GRAS_FORCE_INLINE bool any_inputs_done(void) { if (this->inputs_done.none() or this->input_queues.all_ready()) return false; @@ -131,10 +133,6 @@ struct BlockActor : Apology::Worker Block::InputItems input_items; Block::OutputItems output_items; - //track work's calls to produce and consume - std::vector<size_t> produce_items; - std::vector<size_t> consume_items; - //track the subscriber counts std::vector<Token> input_tokens; std::vector<Token> output_tokens; diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index d058c41..9261af1 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -69,8 +69,6 @@ void BlockActor::handle_topology( //resize all work buffers to match current connections this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); - this->consume_items.resize(num_inputs, 0); - this->produce_items.resize(num_outputs, 0); this->input_queues.resize(num_inputs); this->output_queues.resize(num_outputs); diff --git a/tests/block_test.py b/tests/block_test.py index a6dccfc..c62ea6f 100644 --- a/tests/block_test.py +++ b/tests/block_test.py @@ -31,19 +31,11 @@ class VectorSource(gras.Block): def work(self, ins, outs): print 'vector source work' num = min(len(outs[0]), len(self._vec)) - if num == 0: - self.mark_done() - return - #print 'outs[0][:num] is ' - #print len(outs) - #print len(outs[0]) - #print outs[0][0] - #print outs[0][:num] outs[0][:num] = self._vec[:num] self.produce(0, num) self._vec = self._vec[num:] - #if not self._vec: - # self.mark_done() + if not self._vec: + self.mark_done() print 'vector source work done' class VectorSink(gras.Block): @@ -84,7 +76,7 @@ class BlockTest(unittest.TestCase): vec_sink = None def test_make_block(self): - return + #return null_src = NullSource() null_sink = NullSink() |