summaryrefslogtreecommitdiff
path: root/lib/task_main.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/task_main.cpp')
-rw-r--r--lib/task_main.cpp124
1 files changed, 124 insertions, 0 deletions
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
new file mode 100644
index 0000000..0e855a8
--- /dev/null
+++ b/lib/task_main.cpp
@@ -0,0 +1,124 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras_impl/block_actor.hpp>
+#include "tag_handlers.hpp"
+
+using namespace gras;
+
+void BlockActor::task_main(void)
+{
+ TimerAccumulate ta_prep(this->stats.total_time_prep);
+
+ //------------------------------------------------------------------
+ //-- Decide if its possible to continue any processing:
+ //-- Handle task may get called for incoming buffers,
+ //-- however, not all ports may have available buffers.
+ //------------------------------------------------------------------
+ if GRAS_UNLIKELY(not this->is_work_allowed()) return;
+
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
+
+ //------------------------------------------------------------------
+ //-- initialize input buffers before work
+ //------------------------------------------------------------------
+ size_t output_inline_index = 0;
+ this->input_items.min() = ~0;
+ this->input_items.max() = 0;
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ this->sort_tags(i);
+ this->num_input_msgs_read[i] = 0;
+
+ ASSERT(this->input_queues.ready(i));
+ const SBuffer &buff = this->input_queues.front(i);
+ const void *mem = buff.get();
+ size_t items = buff.length/this->input_configs[i].item_size;
+
+ this->input_items[i].get() = mem;
+ this->input_items[i].size() = items;
+ this->input_items.min() = std::min(this->input_items.min(), items);
+ this->input_items.max() = std::max(this->input_items.max(), items);
+
+ //inline dealings, how and when input buffers can be inlined into output buffers
+ //*
+ if GRAS_UNLIKELY(
+ buff.unique() and
+ input_configs[i].inline_buffer and
+ output_inline_index < num_outputs and
+ buff.get_affinity() == this->buffer_affinity
+ ){
+ //copy buffer reference but push with zero length, same offset
+ SBuffer new_obuff = buff;
+ new_obuff.length = 0;
+ this->flush_output(output_inline_index);
+ this->output_queues.push(output_inline_index, new_obuff); //you got inlined!
+ output_inline_index++; //done do this output port again
+ }
+ //*/
+ }
+
+ //------------------------------------------------------------------
+ //-- initialize output buffers before work
+ //------------------------------------------------------------------
+ this->output_items.min() = ~0;
+ this->output_items.max() = 0;
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ ASSERT(this->output_queues.ready(i));
+ SBuffer &buff = this->output_queues.front(i);
+ ASSERT(buff.length == 0); //assumes it was flushed last call
+ void *mem = buff.get();
+ const size_t bytes = buff.get_actual_length() - buff.offset;
+ size_t items = bytes/this->output_configs[i].item_size;
+
+ this->output_items[i].get() = mem;
+ this->output_items[i].size() = items;
+ this->output_items.min() = std::min(this->output_items.min(), items);
+ this->output_items.max() = std::max(this->output_items.max(), items);
+ }
+
+ //------------------------------------------------------------------
+ //-- the work
+ //------------------------------------------------------------------
+ ta_prep.done();
+ this->stats.work_count++;
+ if GRAS_UNLIKELY(this->interruptible_thread)
+ {
+ TimerAccumulate ta_work(this->stats.total_time_work);
+ this->interruptible_thread->call();
+ }
+ else
+ {
+ TimerAccumulate ta_work(this->stats.total_time_work);
+ this->task_work();
+ }
+ this->stats.time_last_work = time_now();
+ TimerAccumulate ta_post(this->stats.total_time_post);
+
+ //------------------------------------------------------------------
+ //-- Post-work output tasks
+ //------------------------------------------------------------------
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ this->flush_output(i);
+ }
+
+ //------------------------------------------------------------------
+ //-- Post-work input tasks
+ //------------------------------------------------------------------
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ this->trim_msgs(i);
+
+ //update the inputs available bit field
+ this->update_input_avail(i);
+
+ //missing at least one upstream provider?
+ //since nothing else is coming in, its safe to mark done
+ if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
+ }
+
+ //still have IO ready? kick off another task
+ this->task_kicker();
+}