summaryrefslogtreecommitdiff
path: root/lib/task_fail.cpp
diff options
context:
space:
mode:
authorJosh Blum2013-04-14 02:07:40 -0700
committerJosh Blum2013-04-14 02:07:40 -0700
commit277dd31b08afcadceec7852012aa8b3c2cecbea7 (patch)
treeb5c06f46c13365dbe2489638496d867221e3ef8c /lib/task_fail.cpp
parent82af15c5e7a69b116214cb6de99f9095852934d0 (diff)
downloadsandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.tar.gz
sandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.tar.bz2
sandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.zip
gras: move code into component files
Diffstat (limited to 'lib/task_fail.cpp')
-rw-r--r--lib/task_fail.cpp58
1 files changed, 58 insertions, 0 deletions
diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp
new file mode 100644
index 0000000..c30b668
--- /dev/null
+++ b/lib/task_fail.cpp
@@ -0,0 +1,58 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
+
+using namespace gras;
+
+void Block::mark_output_fail(const size_t which_output)
+{
+ (*this)->block->output_fail(which_output);
+}
+
+void Block::mark_input_fail(const size_t which_input)
+{
+ (*this)->block->input_fail(which_input);
+}
+
+void BlockActor::input_fail(const size_t i)
+{
+ //input failed, accumulate and try again
+ if (not this->input_queues.is_accumulated(i))
+ {
+ this->input_queues.accumulate(i);
+ this->task_kicker();
+ return;
+ }
+
+ //otherwise check for done, else wait for more
+ if (this->inputs_done[i])
+ {
+ this->mark_done();
+ return;
+ }
+
+ //check that the input is not already maxed
+ if (this->input_queues.is_front_maximal(i))
+ {
+ throw std::runtime_error("input_fail called on maximum_items buffer");
+ }
+
+ //mark fail: not ready until a new buffer appears
+ this->input_queues.fail(i);
+}
+
+void BlockActor::output_fail(const size_t i)
+{
+ SBuffer &buff = this->output_queues.front(i);
+
+ //check that the input is not already maxed
+ const size_t front_items = buff.length/this->output_configs[i].item_size;
+ if (front_items >= this->output_configs[i].maximum_items)
+ {
+ throw std::runtime_error("output_fail called on maximum_items buffer");
+ }
+
+ //mark fail: not ready until a new buffer appears
+ this->output_queues.fail(i);
+}