summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp44
1 files changed, 18 insertions, 26 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 4806914..8546731 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -128,8 +128,6 @@ void BlockActor::handle_task(void)
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
- this->consume_items[i] = 0;
-
//inline dealings, how and when input buffers can be inlined into output buffers
//continue;
if (
@@ -159,8 +157,6 @@ void BlockActor::handle_task(void)
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
-
- this->produce_items[i] = 0;
}
//------------------------------------------------------------------
@@ -176,32 +172,12 @@ void BlockActor::handle_task(void)
}
//------------------------------------------------------------------
- //-- process input consumption
- //------------------------------------------------------------------
- for (size_t i = 0; i < num_inputs; i++)
- {
- const size_t items = this->consume_items[i];
- if (items == 0) continue;
-
- this->items_consumed[i] += items;
- const size_t bytes = items*this->input_items_sizes[i];
- this->input_queues.consume(i, bytes);
-
- this->trim_tags(i);
- }
-
- //------------------------------------------------------------------
- //-- process output production
+ //-- Flush output buffers downstream
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
- const size_t items = this->produce_items[i];
- if (items == 0) continue;
-
+ if (not this->output_queues.ready(i)) continue;
SBuffer &buff = this->output_queues.front(i);
- this->items_produced[i] += items;
- const size_t bytes = items*this->output_items_sizes[i];
- buff.length += bytes;
//dont always pass output buffers downstream for the sake of efficiency
if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
@@ -226,3 +202,19 @@ void BlockActor::handle_task(void)
this->Push(SelfKickMessage(), Theron::Address());
}
}
+
+void BlockActor::consume(const size_t i, const size_t items)
+{
+ this->items_consumed[i] += items;
+ const size_t bytes = items*this->input_items_sizes[i];
+ this->input_queues.consume(i, bytes);
+ this->trim_tags(i);
+}
+
+void BlockActor::produce(const size_t i, const size_t items)
+{
+ SBuffer &buff = this->output_queues.front(i);
+ this->items_produced[i] += items;
+ const size_t bytes = items*this->output_items_sizes[i];
+ buff.length += bytes;
+}