summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-04-14 02:07:40 -0700
committerJosh Blum2013-04-14 02:07:40 -0700
commit277dd31b08afcadceec7852012aa8b3c2cecbea7 (patch)
treeb5c06f46c13365dbe2489638496d867221e3ef8c /lib
parent82af15c5e7a69b116214cb6de99f9095852934d0 (diff)
downloadsandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.tar.gz
sandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.tar.bz2
sandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.zip
gras: move code into component files
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt7
-rw-r--r--lib/block.cpp122
-rw-r--r--lib/block_consume.cpp44
-rw-r--r--lib/block_handlers.cpp2
-rw-r--r--lib/block_message.cpp49
-rw-r--r--lib/block_produce.cpp70
-rw-r--r--lib/block_task.cpp278
-rw-r--r--lib/gras_impl/block_actor.hpp86
-rw-r--r--lib/input_handlers.cpp6
-rw-r--r--lib/output_handlers.cpp2
-rw-r--r--lib/task_done.cpp68
-rw-r--r--lib/task_fail.cpp58
-rw-r--r--lib/task_main.cpp124
13 files changed, 480 insertions, 436 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 5d059e7..ba7e76e 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -45,9 +45,14 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_message.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_consume.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_produce.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_props.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/task_done.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/task_fail.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/task_main.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/topology_handler.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index 6de4208..2506cc8 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -2,7 +2,6 @@
#include "element_impl.hpp"
#include <gras/block.hpp>
-#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp> //yield
using namespace gras;
@@ -126,88 +125,6 @@ void Block::commit_config(void)
}
-void Block::consume(const size_t which_input, const size_t num_items)
-{
- ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
- (*this)->block->consume(which_input, num_items);
-}
-
-void Block::produce(const size_t which_output, const size_t num_items)
-{
- ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
- (*this)->block->produce(which_output, num_items);
-}
-
-void Block::consume(const size_t num_items)
-{
- const size_t num_inputs = (*this)->block->get_num_inputs();
- for (size_t i = 0; i < num_inputs; i++)
- {
- (*this)->block->consume(i, num_items);
- }
-}
-
-void Block::produce(const size_t num_items)
-{
- const size_t num_outputs = (*this)->block->get_num_outputs();
- for (size_t o = 0; o < num_outputs; o++)
- {
- (*this)->block->produce(o, num_items);
- }
-}
-
-item_index_t Block::get_consumed(const size_t which_input)
-{
- return (*this)->block->stats.items_consumed[which_input];
-}
-
-item_index_t Block::get_produced(const size_t which_output)
-{
- return (*this)->block->stats.items_produced[which_output];
-}
-
-void Block::post_output_tag(const size_t which_output, const Tag &tag)
-{
- (*this)->block->stats.tags_produced[which_output]++;
- (*this)->block->post_downstream(which_output, InputTagMessage(tag));
-}
-
-TagIter Block::get_input_tags(const size_t which_input)
-{
- const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
- return TagIter(input_tags.begin(), input_tags.end());
-}
-
-void Block::post_output_msg(const size_t which_output, const PMCC &msg)
-{
- (*this)->block->stats.msgs_produced[which_output]++;
- (*this)->block->post_downstream(which_output, InputMsgMessage(msg));
-}
-
-PMCC Block::pop_input_msg(const size_t which_input)
-{
- std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input];
- size_t &num_read = (*this)->block->num_input_msgs_read[which_input];
- if (num_read >= input_msgs.size()) return PMCC();
- PMCC p = input_msgs[num_read++];
- (*this)->block->stats.msgs_consumed[which_input]++;
- return p;
-}
-
-void Block::propagate_tags(const size_t i, const TagIter &iter)
-{
- const size_t num_outputs = (*this)->block->get_num_outputs();
- for (size_t o = 0; o < num_outputs; o++)
- {
- BOOST_FOREACH(gras::Tag t, iter)
- {
- t.offset -= this->get_consumed(i);
- t.offset += this->get_produced(o);
- this->post_output_tag(o, t);
- }
- }
-}
-
void Block::notify_active(void)
{
//NOP
@@ -232,42 +149,3 @@ void Block::set_interruptible_work(const bool enb)
{
(*this)->block->interruptible_work = enb;
}
-
-void Block::mark_output_fail(const size_t which_output)
-{
- (*this)->block->output_fail(which_output);
-}
-
-void Block::mark_input_fail(const size_t which_input)
-{
- (*this)->block->input_fail(which_input);
-}
-
-void Block::mark_done(void)
-{
- (*this)->block->mark_done();
-}
-
-SBuffer Block::get_input_buffer(const size_t which_input) const
-{
- return (*this)->block->input_queues.front(which_input);
-}
-
-SBuffer Block::get_output_buffer(const size_t which_output) const
-{
- SBuffer &buff = (*this)->block->output_queues.front(which_output);
- //increment length to auto pop full buffer size,
- //when user doesnt call pop_output_buffer()
- buff.length = buff.get_actual_length();
- return buff;
-}
-
-void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
-{
- (*this)->block->output_queues.front(which_output).length = num_bytes;
-}
-
-void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
-{
- (*this)->block->produce_buffer(which_output, buffer);
-}
diff --git a/lib/block_consume.cpp b/lib/block_consume.cpp
new file mode 100644
index 0000000..efdb07e
--- /dev/null
+++ b/lib/block_consume.cpp
@@ -0,0 +1,44 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
+#include <gras/block.hpp>
+#include "tag_handlers.hpp"
+
+using namespace gras;
+
+void Block::consume(const size_t which_input, const size_t num_items)
+{
+ ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
+ (*this)->block->consume(which_input, num_items);
+}
+
+void Block::consume(const size_t num_items)
+{
+ const size_t num_inputs = (*this)->block->get_num_inputs();
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ (*this)->block->consume(i, num_items);
+ }
+}
+
+item_index_t Block::get_consumed(const size_t which_input)
+{
+ return (*this)->block->stats.items_consumed[which_input];
+}
+
+SBuffer Block::get_input_buffer(const size_t which_input) const
+{
+ return (*this)->block->input_queues.front(which_input);
+}
+
+GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items)
+{
+ #ifdef ITEM_CONSPROD
+ std::cerr << name << " consume " << items << std::endl;
+ #endif
+ this->stats.items_consumed[i] += items;
+ const size_t bytes = items*this->input_configs[i].item_size;
+ this->input_queues.consume(i, bytes);
+ this->trim_tags(i);
+}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 0be6b85..ffa400e 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -126,7 +126,7 @@ void BlockActor::handle_self_kick(
const Theron::Address
){
MESSAGE_TRACER();
- this->handle_task();
+ this->task_main();
}
void BlockActor::handle_get_stats(
diff --git a/lib/block_message.cpp b/lib/block_message.cpp
new file mode 100644
index 0000000..43dc7b4
--- /dev/null
+++ b/lib/block_message.cpp
@@ -0,0 +1,49 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras/block.hpp>
+#include <boost/foreach.hpp>
+
+using namespace gras;
+
+void Block::post_output_tag(const size_t which_output, const Tag &tag)
+{
+ (*this)->block->stats.tags_produced[which_output]++;
+ (*this)->block->post_downstream(which_output, InputTagMessage(tag));
+}
+
+void Block::post_output_msg(const size_t which_output, const PMCC &msg)
+{
+ (*this)->block->stats.msgs_produced[which_output]++;
+ (*this)->block->post_downstream(which_output, InputMsgMessage(msg));
+}
+
+TagIter Block::get_input_tags(const size_t which_input)
+{
+ const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
+ return TagIter(input_tags.begin(), input_tags.end());
+}
+
+PMCC Block::pop_input_msg(const size_t which_input)
+{
+ std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input];
+ size_t &num_read = (*this)->block->num_input_msgs_read[which_input];
+ if (num_read >= input_msgs.size()) return PMCC();
+ PMCC p = input_msgs[num_read++];
+ (*this)->block->stats.msgs_consumed[which_input]++;
+ return p;
+}
+
+void Block::propagate_tags(const size_t i, const TagIter &iter)
+{
+ const size_t num_outputs = (*this)->block->get_num_outputs();
+ for (size_t o = 0; o < num_outputs; o++)
+ {
+ BOOST_FOREACH(gras::Tag t, iter)
+ {
+ t.offset -= this->get_consumed(i);
+ t.offset += this->get_produced(o);
+ this->post_output_tag(o, t);
+ }
+ }
+}
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
new file mode 100644
index 0000000..9fbc468
--- /dev/null
+++ b/lib/block_produce.cpp
@@ -0,0 +1,70 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
+#include <gras/block.hpp>
+
+using namespace gras;
+
+void Block::produce(const size_t which_output, const size_t num_items)
+{
+ ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
+ (*this)->block->produce(which_output, num_items);
+}
+
+void Block::produce(const size_t num_items)
+{
+ const size_t num_outputs = (*this)->block->get_num_outputs();
+ for (size_t o = 0; o < num_outputs; o++)
+ {
+ (*this)->block->produce(o, num_items);
+ }
+}
+
+item_index_t Block::get_produced(const size_t which_output)
+{
+ return (*this)->block->stats.items_produced[which_output];
+}
+
+SBuffer Block::get_output_buffer(const size_t which_output) const
+{
+ SBuffer &buff = (*this)->block->output_queues.front(which_output);
+ //increment length to auto pop full buffer size,
+ //when user doesnt call pop_output_buffer()
+ buff.length = buff.get_actual_length();
+ return buff;
+}
+
+void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
+{
+ (*this)->block->output_queues.front(which_output).length = num_bytes;
+}
+
+void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
+{
+ (*this)->block->produce_buffer(which_output, buffer);
+}
+
+GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
+{
+ #ifdef ITEM_CONSPROD
+ std::cerr << name << " produce " << items << std::endl;
+ #endif
+ SBuffer &buff = this->output_queues.front(i);
+ ASSERT((buff.length % output_configs[i].item_size) == 0);
+ this->stats.items_produced[i] += items;
+ const size_t bytes = items*this->output_configs[i].item_size;
+ buff.length += bytes;
+ this->produce_outputs[i] = true;
+}
+
+GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
+{
+ this->flush_output(i);
+ ASSERT((buffer.length % output_configs[i].item_size) == 0);
+ const size_t items = buffer.length/output_configs[i].item_size;
+ this->stats.items_produced[i] += items;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buffer;
+ this->post_downstream(i, buff_msg);
+}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
deleted file mode 100644
index cc707a8..0000000
--- a/lib/block_task.cpp
+++ /dev/null
@@ -1,278 +0,0 @@
-// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-
-#include <gras_impl/block_actor.hpp>
-#include "tag_handlers.hpp"
-
-using namespace gras;
-
-void BlockActor::mark_done(void)
-{
- if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
-
- this->stats.stop_time = time_now();
- this->block_ptr->notify_inactive();
-
- //flush partial output buffers to the downstream
- for (size_t i = 0; i < this->get_num_outputs(); i++)
- {
- if (not this->output_queues.ready(i)) continue;
- SBuffer &buff = this->output_queues.front(i);
- if (buff.length == 0) continue;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
- }
-
- this->interruptible_thread.reset();
-
- //mark down the new state
- this->block_state = BLOCK_STATE_DONE;
-
- //release upstream, downstream, and executor tokens
- this->token_pool.clear();
-
- //release all buffers in queues
- this->input_queues.flush_all();
- this->output_queues.flush_all();
-
- //release all tags and msgs
- for (size_t i = 0; i < this->get_num_inputs(); i++)
- {
- this->input_msgs[i].clear();
- this->input_tags[i].clear();
- }
-
- //tell the upstream and downstram to re-check their tokens
- //this is how the other blocks know who is interested,
- //and can decide based on interest to set done or not
- for (size_t i = 0; i < this->get_num_inputs(); i++)
- {
- this->post_upstream(i, OutputCheckMessage());
- }
- for (size_t i = 0; i < this->get_num_outputs(); i++)
- {
- this->post_downstream(i, InputCheckMessage());
- }
-
- if (ARMAGEDDON) std::cerr
- << "==================================================\n"
- << "== The " << block_ptr->to_string() << " is done...\n"
- << "==================================================\n"
- << std::flush;
-}
-
-void BlockActor::input_fail(const size_t i)
-{
- //input failed, accumulate and try again
- if (not this->input_queues.is_accumulated(i))
- {
- this->input_queues.accumulate(i);
- this->task_kicker();
- return;
- }
-
- //otherwise check for done, else wait for more
- if (this->inputs_done[i])
- {
- this->mark_done();
- return;
- }
-
- //check that the input is not already maxed
- if (this->input_queues.is_front_maximal(i))
- {
- throw std::runtime_error("input_fail called on maximum_items buffer");
- }
-
- //mark fail: not ready until a new buffer appears
- this->input_queues.fail(i);
-}
-
-void BlockActor::output_fail(const size_t i)
-{
- SBuffer &buff = this->output_queues.front(i);
-
- //check that the input is not already maxed
- const size_t front_items = buff.length/this->output_configs[i].item_size;
- if (front_items >= this->output_configs[i].maximum_items)
- {
- throw std::runtime_error("output_fail called on maximum_items buffer");
- }
-
- //mark fail: not ready until a new buffer appears
- this->output_queues.fail(i);
-}
-
-void BlockActor::handle_task(void)
-{
- TimerAccumulate ta_prep(this->stats.total_time_prep);
-
- //------------------------------------------------------------------
- //-- Decide if its possible to continue any processing:
- //-- Handle task may get called for incoming buffers,
- //-- however, not all ports may have available buffers.
- //------------------------------------------------------------------
- if GRAS_UNLIKELY(not this->is_work_allowed()) return;
-
- const size_t num_inputs = this->get_num_inputs();
- const size_t num_outputs = this->get_num_outputs();
-
- //------------------------------------------------------------------
- //-- initialize input buffers before work
- //------------------------------------------------------------------
- size_t output_inline_index = 0;
- this->input_items.min() = ~0;
- this->input_items.max() = 0;
- for (size_t i = 0; i < num_inputs; i++)
- {
- this->sort_tags(i);
- this->num_input_msgs_read[i] = 0;
-
- ASSERT(this->input_queues.ready(i));
- const SBuffer &buff = this->input_queues.front(i);
- const void *mem = buff.get();
- size_t items = buff.length/this->input_configs[i].item_size;
-
- this->input_items[i].get() = mem;
- this->input_items[i].size() = items;
- this->input_items.min() = std::min(this->input_items.min(), items);
- this->input_items.max() = std::max(this->input_items.max(), items);
-
- //inline dealings, how and when input buffers can be inlined into output buffers
- //*
- if GRAS_UNLIKELY(
- buff.unique() and
- input_configs[i].inline_buffer and
- output_inline_index < num_outputs and
- buff.get_affinity() == this->buffer_affinity
- ){
- //copy buffer reference but push with zero length, same offset
- SBuffer new_obuff = buff;
- new_obuff.length = 0;
- this->flush_output(output_inline_index);
- this->output_queues.push(output_inline_index, new_obuff); //you got inlined!
- output_inline_index++; //done do this output port again
- }
- //*/
- }
-
- //------------------------------------------------------------------
- //-- initialize output buffers before work
- //------------------------------------------------------------------
- this->output_items.min() = ~0;
- this->output_items.max() = 0;
- for (size_t i = 0; i < num_outputs; i++)
- {
- ASSERT(this->output_queues.ready(i));
- SBuffer &buff = this->output_queues.front(i);
- ASSERT(buff.length == 0); //assumes it was flushed last call
- void *mem = buff.get();
- const size_t bytes = buff.get_actual_length() - buff.offset;
- size_t items = bytes/this->output_configs[i].item_size;
-
- this->output_items[i].get() = mem;
- this->output_items[i].size() = items;
- this->output_items.min() = std::min(this->output_items.min(), items);
- this->output_items.max() = std::max(this->output_items.max(), items);
- }
-
- //------------------------------------------------------------------
- //-- the work
- //------------------------------------------------------------------
- ta_prep.done();
- this->stats.work_count++;
- if GRAS_UNLIKELY(this->interruptible_thread)
- {
- TimerAccumulate ta_work(this->stats.total_time_work);
- this->interruptible_thread->call();
- }
- else
- {
- TimerAccumulate ta_work(this->stats.total_time_work);
- this->task_work();
- }
- this->stats.time_last_work = time_now();
- TimerAccumulate ta_post(this->stats.total_time_post);
-
- //------------------------------------------------------------------
- //-- Post-work output tasks
- //------------------------------------------------------------------
- for (size_t i = 0; i < num_outputs; i++)
- {
- this->flush_output(i);
- }
-
- //------------------------------------------------------------------
- //-- Post-work input tasks
- //------------------------------------------------------------------
- for (size_t i = 0; i < num_inputs; i++)
- {
- this->trim_msgs(i);
-
- //update the inputs available bit field
- this->update_input_avail(i);
-
- //missing at least one upstream provider?
- //since nothing else is coming in, its safe to mark done
- if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
- }
-
- //still have IO ready? kick off another task
- this->task_kicker();
-}
-
-void BlockActor::consume(const size_t i, const size_t items)
-{
- #ifdef ITEM_CONSPROD
- std::cerr << name << " consume " << items << std::endl;
- #endif
- this->stats.items_consumed[i] += items;
- const size_t bytes = items*this->input_configs[i].item_size;
- this->input_queues.consume(i, bytes);
- this->trim_tags(i);
-}
-
-void BlockActor::produce(const size_t i, const size_t items)
-{
- #ifdef ITEM_CONSPROD
- std::cerr << name << " produce " << items << std::endl;
- #endif
- SBuffer &buff = this->output_queues.front(i);
- ASSERT((buff.length % output_configs[i].item_size) == 0);
- this->stats.items_produced[i] += items;
- const size_t bytes = items*this->output_configs[i].item_size;
- buff.length += bytes;
- this->produce_outputs[i] = true;
-}
-
-void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
-{
- this->flush_output(i);
- ASSERT((buffer.length % output_configs[i].item_size) == 0);
- const size_t items = buffer.length/output_configs[i].item_size;
- this->stats.items_produced[i] += items;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buffer;
- this->post_downstream(i, buff_msg);
-}
-
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
-{
- if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
- SBuffer &buff = this->output_queues.front(i);
- if GRAS_LIKELY(this->produce_outputs[i])
- {
- this->produce_outputs[i] = false;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- }
-
- //increment buffer for next use
- buff.offset += buff.length;
- buff.length = 0;
-
- //release whatever has been used of the output buffer
- this->output_queues.pop(i);
-}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 00f48f4..a8586c0 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -96,7 +96,7 @@ struct BlockActor : Apology::Worker
//helpers
void buffer_returner(const size_t index, SBuffer &buffer);
void mark_done(void);
- void handle_task(void);
+ void task_main(void);
void input_fail(const size_t index);
void output_fail(const size_t index);
void sort_tags(const size_t index);
@@ -106,35 +106,10 @@ struct BlockActor : Apology::Worker
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
void flush_output(const size_t index);
-
- GRAS_FORCE_INLINE void task_kicker(void)
- {
- if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
- }
-
- GRAS_FORCE_INLINE void update_input_avail(const size_t i)
- {
- const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
- const bool has_input_msgs = not this->input_msgs[i].empty();
- this->inputs_available.set(i, has_input_bufs or has_input_msgs);
- this->input_queues.update_has_msg(i, has_input_msgs);
- }
-
- GRAS_FORCE_INLINE bool is_input_done(const size_t i)
- {
- return this->inputs_done[i] and not this->inputs_available[i];
- }
-
- GRAS_FORCE_INLINE bool is_work_allowed(void)
- {
- return (
- this->prio_token.unique() and
- this->block_state == BLOCK_STATE_LIVE and
- this->inputs_available.any() and
- this->input_queues.all_ready() and
- this->output_queues.all_ready()
- );
- }
+ void task_kicker(void);
+ void update_input_avail(const size_t index);
+ bool is_input_done(const size_t index);
+ bool is_work_allowed(void);
//per port properties
std::vector<InputPortConfig> input_configs;
@@ -192,6 +167,57 @@ struct BlockActor : Apology::Worker
BlockStats stats;
};
+//-------------- common functions from this BlockActor class ---------//
+
+GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
+{
+ if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
+ SBuffer &buff = this->output_queues.front(i);
+ if GRAS_LIKELY(this->produce_outputs[i])
+ {
+ this->produce_outputs[i] = false;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
+ }
+
+ //increment buffer for next use
+ buff.offset += buff.length;
+ buff.length = 0;
+
+ //release whatever has been used of the output buffer
+ this->output_queues.pop(i);
+}
+
+GRAS_FORCE_INLINE void BlockActor::task_kicker(void)
+{
+ if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
+}
+
+GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
+{
+ const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
+ const bool has_input_msgs = not this->input_msgs[i].empty();
+ this->inputs_available.set(i, has_input_bufs or has_input_msgs);
+ this->input_queues.update_has_msg(i, has_input_msgs);
+}
+
+GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
+{
+ return this->inputs_done[i] and not this->inputs_available[i];
+}
+
+GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
+{
+ return (
+ this->prio_token.unique() and
+ this->block_state == BLOCK_STATE_LIVE and
+ this->inputs_available.any() and
+ this->input_queues.all_ready() and
+ this->output_queues.all_ready()
+ );
+}
+
} //namespace gras
#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index c82efb3..da16ba4 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -29,7 +29,7 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::
this->update_input_avail(index);
ta.done();
- this->handle_task();
+ this->task_main();
}
void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address)
@@ -44,7 +44,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
this->update_input_avail(index);
ta.done();
- this->handle_task();
+ this->task_main();
}
void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address)
@@ -68,7 +68,7 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
//upstream done, give it one more attempt at task handling
ta.done();
- this->handle_task();
+ this->task_main();
//now recheck the status, mark block done if the input is done
if (this->is_input_done(index)) this->mark_done();
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index 17244ee..3c2f971 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -18,7 +18,7 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const
this->output_queues.push(index, message.buffer);
ta.done();
- this->handle_task();
+ this->task_main();
}
void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address)
diff --git a/lib/task_done.cpp b/lib/task_done.cpp
new file mode 100644
index 0000000..d116202
--- /dev/null
+++ b/lib/task_done.cpp
@@ -0,0 +1,68 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
+
+using namespace gras;
+
+void Block::mark_done(void)
+{
+ (*this)->block->mark_done();
+}
+
+void BlockActor::mark_done(void)
+{
+ if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+
+ this->stats.stop_time = time_now();
+ this->block_ptr->notify_inactive();
+
+ //flush partial output buffers to the downstream
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
+ {
+ if (not this->output_queues.ready(i)) continue;
+ SBuffer &buff = this->output_queues.front(i);
+ if (buff.length == 0) continue;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
+ this->output_queues.pop(i);
+ }
+
+ this->interruptible_thread.reset();
+
+ //mark down the new state
+ this->block_state = BLOCK_STATE_DONE;
+
+ //release upstream, downstream, and executor tokens
+ this->token_pool.clear();
+
+ //release all buffers in queues
+ this->input_queues.flush_all();
+ this->output_queues.flush_all();
+
+ //release all tags and msgs
+ for (size_t i = 0; i < this->get_num_inputs(); i++)
+ {
+ this->input_msgs[i].clear();
+ this->input_tags[i].clear();
+ }
+
+ //tell the upstream and downstram to re-check their tokens
+ //this is how the other blocks know who is interested,
+ //and can decide based on interest to set done or not
+ for (size_t i = 0; i < this->get_num_inputs(); i++)
+ {
+ this->post_upstream(i, OutputCheckMessage());
+ }
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
+ {
+ this->post_downstream(i, InputCheckMessage());
+ }
+
+ if (ARMAGEDDON) std::cerr
+ << "==================================================\n"
+ << "== The " << block_ptr->to_string() << " is done...\n"
+ << "==================================================\n"
+ << std::flush;
+}
diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp
new file mode 100644
index 0000000..c30b668
--- /dev/null
+++ b/lib/task_fail.cpp
@@ -0,0 +1,58 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
+
+using namespace gras;
+
+void Block::mark_output_fail(const size_t which_output)
+{
+ (*this)->block->output_fail(which_output);
+}
+
+void Block::mark_input_fail(const size_t which_input)
+{
+ (*this)->block->input_fail(which_input);
+}
+
+void BlockActor::input_fail(const size_t i)
+{
+ //input failed, accumulate and try again
+ if (not this->input_queues.is_accumulated(i))
+ {
+ this->input_queues.accumulate(i);
+ this->task_kicker();
+ return;
+ }
+
+ //otherwise check for done, else wait for more
+ if (this->inputs_done[i])
+ {
+ this->mark_done();
+ return;
+ }
+
+ //check that the input is not already maxed
+ if (this->input_queues.is_front_maximal(i))
+ {
+ throw std::runtime_error("input_fail called on maximum_items buffer");
+ }
+
+ //mark fail: not ready until a new buffer appears
+ this->input_queues.fail(i);
+}
+
+void BlockActor::output_fail(const size_t i)
+{
+ SBuffer &buff = this->output_queues.front(i);
+
+ //check that the input is not already maxed
+ const size_t front_items = buff.length/this->output_configs[i].item_size;
+ if (front_items >= this->output_configs[i].maximum_items)
+ {
+ throw std::runtime_error("output_fail called on maximum_items buffer");
+ }
+
+ //mark fail: not ready until a new buffer appears
+ this->output_queues.fail(i);
+}
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
new file mode 100644
index 0000000..0e855a8
--- /dev/null
+++ b/lib/task_main.cpp
@@ -0,0 +1,124 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras_impl/block_actor.hpp>
+#include "tag_handlers.hpp"
+
+using namespace gras;
+
+void BlockActor::task_main(void)
+{
+ TimerAccumulate ta_prep(this->stats.total_time_prep);
+
+ //------------------------------------------------------------------
+ //-- Decide if its possible to continue any processing:
+ //-- Handle task may get called for incoming buffers,
+ //-- however, not all ports may have available buffers.
+ //------------------------------------------------------------------
+ if GRAS_UNLIKELY(not this->is_work_allowed()) return;
+
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
+
+ //------------------------------------------------------------------
+ //-- initialize input buffers before work
+ //------------------------------------------------------------------
+ size_t output_inline_index = 0;
+ this->input_items.min() = ~0;
+ this->input_items.max() = 0;
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ this->sort_tags(i);
+ this->num_input_msgs_read[i] = 0;
+
+ ASSERT(this->input_queues.ready(i));
+ const SBuffer &buff = this->input_queues.front(i);
+ const void *mem = buff.get();
+ size_t items = buff.length/this->input_configs[i].item_size;
+
+ this->input_items[i].get() = mem;
+ this->input_items[i].size() = items;
+ this->input_items.min() = std::min(this->input_items.min(), items);
+ this->input_items.max() = std::max(this->input_items.max(), items);
+
+ //inline dealings, how and when input buffers can be inlined into output buffers
+ //*
+ if GRAS_UNLIKELY(
+ buff.unique() and
+ input_configs[i].inline_buffer and
+ output_inline_index < num_outputs and
+ buff.get_affinity() == this->buffer_affinity
+ ){
+ //copy buffer reference but push with zero length, same offset
+ SBuffer new_obuff = buff;
+ new_obuff.length = 0;
+ this->flush_output(output_inline_index);
+ this->output_queues.push(output_inline_index, new_obuff); //you got inlined!
+ output_inline_index++; //done do this output port again
+ }
+ //*/
+ }
+
+ //------------------------------------------------------------------
+ //-- initialize output buffers before work
+ //------------------------------------------------------------------
+ this->output_items.min() = ~0;
+ this->output_items.max() = 0;
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ ASSERT(this->output_queues.ready(i));
+ SBuffer &buff = this->output_queues.front(i);
+ ASSERT(buff.length == 0); //assumes it was flushed last call
+ void *mem = buff.get();
+ const size_t bytes = buff.get_actual_length() - buff.offset;
+ size_t items = bytes/this->output_configs[i].item_size;
+
+ this->output_items[i].get() = mem;
+ this->output_items[i].size() = items;
+ this->output_items.min() = std::min(this->output_items.min(), items);
+ this->output_items.max() = std::max(this->output_items.max(), items);
+ }
+
+ //------------------------------------------------------------------
+ //-- the work
+ //------------------------------------------------------------------
+ ta_prep.done();
+ this->stats.work_count++;
+ if GRAS_UNLIKELY(this->interruptible_thread)
+ {
+ TimerAccumulate ta_work(this->stats.total_time_work);
+ this->interruptible_thread->call();
+ }
+ else
+ {
+ TimerAccumulate ta_work(this->stats.total_time_work);
+ this->task_work();
+ }
+ this->stats.time_last_work = time_now();
+ TimerAccumulate ta_post(this->stats.total_time_post);
+
+ //------------------------------------------------------------------
+ //-- Post-work output tasks
+ //------------------------------------------------------------------
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ this->flush_output(i);
+ }
+
+ //------------------------------------------------------------------
+ //-- Post-work input tasks
+ //------------------------------------------------------------------
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ this->trim_msgs(i);
+
+ //update the inputs available bit field
+ this->update_input_avail(i);
+
+ //missing at least one upstream provider?
+ //since nothing else is coming in, its safe to mark done
+ if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done();
+ }
+
+ //still have IO ready? kick off another task
+ this->task_kicker();
+}