diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 44 |
1 files changed, 18 insertions, 26 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 4806914..8546731 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -128,8 +128,6 @@ void BlockActor::handle_task(void) this->input_items[i].get() = mem; this->input_items[i].size() = items; - this->consume_items[i] = 0; - //inline dealings, how and when input buffers can be inlined into output buffers //continue; if ( @@ -159,8 +157,6 @@ void BlockActor::handle_task(void) this->output_items[i].get() = mem; this->output_items[i].size() = items; - - this->produce_items[i] = 0; } //------------------------------------------------------------------ @@ -176,32 +172,12 @@ void BlockActor::handle_task(void) } //------------------------------------------------------------------ - //-- process input consumption - //------------------------------------------------------------------ - for (size_t i = 0; i < num_inputs; i++) - { - const size_t items = this->consume_items[i]; - if (items == 0) continue; - - this->items_consumed[i] += items; - const size_t bytes = items*this->input_items_sizes[i]; - this->input_queues.consume(i, bytes); - - this->trim_tags(i); - } - - //------------------------------------------------------------------ - //-- process output production + //-- Flush output buffers downstream //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { - const size_t items = this->produce_items[i]; - if (items == 0) continue; - + if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); - this->items_produced[i] += items; - const size_t bytes = items*this->output_items_sizes[i]; - buff.length += bytes; //dont always pass output buffers downstream for the sake of efficiency if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) @@ -226,3 +202,19 @@ void BlockActor::handle_task(void) this->Push(SelfKickMessage(), Theron::Address()); } } + +void BlockActor::consume(const size_t i, const size_t items) +{ + this->items_consumed[i] += items; + const size_t bytes = items*this->input_items_sizes[i]; + this->input_queues.consume(i, bytes); + this->trim_tags(i); +} + +void BlockActor::produce(const size_t i, const size_t items) +{ + SBuffer &buff = this->output_queues.front(i); + this->items_produced[i] += items; + const size_t bytes = items*this->output_items_sizes[i]; + buff.length += bytes; +} |