diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/CMakeLists.txt | 7 | ||||
-rw-r--r-- | lib/block.cpp | 122 | ||||
-rw-r--r-- | lib/block_consume.cpp | 44 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/block_message.cpp | 49 | ||||
-rw-r--r-- | lib/block_produce.cpp | 70 | ||||
-rw-r--r-- | lib/block_task.cpp | 278 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 86 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 6 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/task_done.cpp | 68 | ||||
-rw-r--r-- | lib/task_fail.cpp | 58 | ||||
-rw-r--r-- | lib/task_main.cpp | 124 |
13 files changed, 480 insertions, 436 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 5d059e7..ba7e76e 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -45,9 +45,14 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_message.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_consume.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_produce.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_props.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/task_done.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/task_fail.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/task_main.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/topology_handler.cpp diff --git a/lib/block.cpp b/lib/block.cpp index 6de4208..2506cc8 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -2,7 +2,6 @@ #include "element_impl.hpp" #include <gras/block.hpp> -#include <boost/foreach.hpp> #include <boost/thread/thread.hpp> //yield using namespace gras; @@ -126,88 +125,6 @@ void Block::commit_config(void) } -void Block::consume(const size_t which_input, const size_t num_items) -{ - ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative - (*this)->block->consume(which_input, num_items); -} - -void Block::produce(const size_t which_output, const size_t num_items) -{ - ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative - (*this)->block->produce(which_output, num_items); -} - -void Block::consume(const size_t num_items) -{ - const size_t num_inputs = (*this)->block->get_num_inputs(); - for (size_t i = 0; i < num_inputs; i++) - { - (*this)->block->consume(i, num_items); - } -} - -void Block::produce(const size_t num_items) -{ - const size_t num_outputs = (*this)->block->get_num_outputs(); - for (size_t o = 0; o < num_outputs; o++) - { - (*this)->block->produce(o, num_items); - } -} - -item_index_t Block::get_consumed(const size_t which_input) -{ - return (*this)->block->stats.items_consumed[which_input]; -} - -item_index_t Block::get_produced(const size_t which_output) -{ - return (*this)->block->stats.items_produced[which_output]; -} - -void Block::post_output_tag(const size_t which_output, const Tag &tag) -{ - (*this)->block->stats.tags_produced[which_output]++; - (*this)->block->post_downstream(which_output, InputTagMessage(tag)); -} - -TagIter Block::get_input_tags(const size_t which_input) -{ - const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input]; - return TagIter(input_tags.begin(), input_tags.end()); -} - -void Block::post_output_msg(const size_t which_output, const PMCC &msg) -{ - (*this)->block->stats.msgs_produced[which_output]++; - (*this)->block->post_downstream(which_output, InputMsgMessage(msg)); -} - -PMCC Block::pop_input_msg(const size_t which_input) -{ - std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input]; - size_t &num_read = (*this)->block->num_input_msgs_read[which_input]; - if (num_read >= input_msgs.size()) return PMCC(); - PMCC p = input_msgs[num_read++]; - (*this)->block->stats.msgs_consumed[which_input]++; - return p; -} - -void Block::propagate_tags(const size_t i, const TagIter &iter) -{ - const size_t num_outputs = (*this)->block->get_num_outputs(); - for (size_t o = 0; o < num_outputs; o++) - { - BOOST_FOREACH(gras::Tag t, iter) - { - t.offset -= this->get_consumed(i); - t.offset += this->get_produced(o); - this->post_output_tag(o, t); - } - } -} - void Block::notify_active(void) { //NOP @@ -232,42 +149,3 @@ void Block::set_interruptible_work(const bool enb) { (*this)->block->interruptible_work = enb; } - -void Block::mark_output_fail(const size_t which_output) -{ - (*this)->block->output_fail(which_output); -} - -void Block::mark_input_fail(const size_t which_input) -{ - (*this)->block->input_fail(which_input); -} - -void Block::mark_done(void) -{ - (*this)->block->mark_done(); -} - -SBuffer Block::get_input_buffer(const size_t which_input) const -{ - return (*this)->block->input_queues.front(which_input); -} - -SBuffer Block::get_output_buffer(const size_t which_output) const -{ - SBuffer &buff = (*this)->block->output_queues.front(which_output); - //increment length to auto pop full buffer size, - //when user doesnt call pop_output_buffer() - buff.length = buff.get_actual_length(); - return buff; -} - -void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) -{ - (*this)->block->output_queues.front(which_output).length = num_bytes; -} - -void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) -{ - (*this)->block->produce_buffer(which_output, buffer); -} diff --git a/lib/block_consume.cpp b/lib/block_consume.cpp new file mode 100644 index 0000000..efdb07e --- /dev/null +++ b/lib/block_consume.cpp @@ -0,0 +1,44 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> +#include <gras/block.hpp> +#include "tag_handlers.hpp" + +using namespace gras; + +void Block::consume(const size_t which_input, const size_t num_items) +{ + ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative + (*this)->block->consume(which_input, num_items); +} + +void Block::consume(const size_t num_items) +{ + const size_t num_inputs = (*this)->block->get_num_inputs(); + for (size_t i = 0; i < num_inputs; i++) + { + (*this)->block->consume(i, num_items); + } +} + +item_index_t Block::get_consumed(const size_t which_input) +{ + return (*this)->block->stats.items_consumed[which_input]; +} + +SBuffer Block::get_input_buffer(const size_t which_input) const +{ + return (*this)->block->input_queues.front(which_input); +} + +GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items) +{ + #ifdef ITEM_CONSPROD + std::cerr << name << " consume " << items << std::endl; + #endif + this->stats.items_consumed[i] += items; + const size_t bytes = items*this->input_configs[i].item_size; + this->input_queues.consume(i, bytes); + this->trim_tags(i); +} diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 0be6b85..ffa400e 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -126,7 +126,7 @@ void BlockActor::handle_self_kick( const Theron::Address ){ MESSAGE_TRACER(); - this->handle_task(); + this->task_main(); } void BlockActor::handle_get_stats( diff --git a/lib/block_message.cpp b/lib/block_message.cpp new file mode 100644 index 0000000..43dc7b4 --- /dev/null +++ b/lib/block_message.cpp @@ -0,0 +1,49 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras/block.hpp> +#include <boost/foreach.hpp> + +using namespace gras; + +void Block::post_output_tag(const size_t which_output, const Tag &tag) +{ + (*this)->block->stats.tags_produced[which_output]++; + (*this)->block->post_downstream(which_output, InputTagMessage(tag)); +} + +void Block::post_output_msg(const size_t which_output, const PMCC &msg) +{ + (*this)->block->stats.msgs_produced[which_output]++; + (*this)->block->post_downstream(which_output, InputMsgMessage(msg)); +} + +TagIter Block::get_input_tags(const size_t which_input) +{ + const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input]; + return TagIter(input_tags.begin(), input_tags.end()); +} + +PMCC Block::pop_input_msg(const size_t which_input) +{ + std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input]; + size_t &num_read = (*this)->block->num_input_msgs_read[which_input]; + if (num_read >= input_msgs.size()) return PMCC(); + PMCC p = input_msgs[num_read++]; + (*this)->block->stats.msgs_consumed[which_input]++; + return p; +} + +void Block::propagate_tags(const size_t i, const TagIter &iter) +{ + const size_t num_outputs = (*this)->block->get_num_outputs(); + for (size_t o = 0; o < num_outputs; o++) + { + BOOST_FOREACH(gras::Tag t, iter) + { + t.offset -= this->get_consumed(i); + t.offset += this->get_produced(o); + this->post_output_tag(o, t); + } + } +} diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp new file mode 100644 index 0000000..9fbc468 --- /dev/null +++ b/lib/block_produce.cpp @@ -0,0 +1,70 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> +#include <gras/block.hpp> + +using namespace gras; + +void Block::produce(const size_t which_output, const size_t num_items) +{ + ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative + (*this)->block->produce(which_output, num_items); +} + +void Block::produce(const size_t num_items) +{ + const size_t num_outputs = (*this)->block->get_num_outputs(); + for (size_t o = 0; o < num_outputs; o++) + { + (*this)->block->produce(o, num_items); + } +} + +item_index_t Block::get_produced(const size_t which_output) +{ + return (*this)->block->stats.items_produced[which_output]; +} + +SBuffer Block::get_output_buffer(const size_t which_output) const +{ + SBuffer &buff = (*this)->block->output_queues.front(which_output); + //increment length to auto pop full buffer size, + //when user doesnt call pop_output_buffer() + buff.length = buff.get_actual_length(); + return buff; +} + +void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) +{ + (*this)->block->output_queues.front(which_output).length = num_bytes; +} + +void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) +{ + (*this)->block->produce_buffer(which_output, buffer); +} + +GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) +{ + #ifdef ITEM_CONSPROD + std::cerr << name << " produce " << items << std::endl; + #endif + SBuffer &buff = this->output_queues.front(i); + ASSERT((buff.length % output_configs[i].item_size) == 0); + this->stats.items_produced[i] += items; + const size_t bytes = items*this->output_configs[i].item_size; + buff.length += bytes; + this->produce_outputs[i] = true; +} + +GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) +{ + this->flush_output(i); + ASSERT((buffer.length % output_configs[i].item_size) == 0); + const size_t items = buffer.length/output_configs[i].item_size; + this->stats.items_produced[i] += items; + InputBufferMessage buff_msg; + buff_msg.buffer = buffer; + this->post_downstream(i, buff_msg); +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp deleted file mode 100644 index cc707a8..0000000 --- a/lib/block_task.cpp +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. - -#include <gras_impl/block_actor.hpp> -#include "tag_handlers.hpp" - -using namespace gras; - -void BlockActor::mark_done(void) -{ - if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first - - this->stats.stop_time = time_now(); - this->block_ptr->notify_inactive(); - - //flush partial output buffers to the downstream - for (size_t i = 0; i < this->get_num_outputs(); i++) - { - if (not this->output_queues.ready(i)) continue; - SBuffer &buff = this->output_queues.front(i); - if (buff.length == 0) continue; - InputBufferMessage buff_msg; - buff_msg.buffer = buff; - this->post_downstream(i, buff_msg); - this->output_queues.pop(i); - } - - this->interruptible_thread.reset(); - - //mark down the new state - this->block_state = BLOCK_STATE_DONE; - - //release upstream, downstream, and executor tokens - this->token_pool.clear(); - - //release all buffers in queues - this->input_queues.flush_all(); - this->output_queues.flush_all(); - - //release all tags and msgs - for (size_t i = 0; i < this->get_num_inputs(); i++) - { - this->input_msgs[i].clear(); - this->input_tags[i].clear(); - } - - //tell the upstream and downstram to re-check their tokens - //this is how the other blocks know who is interested, - //and can decide based on interest to set done or not - for (size_t i = 0; i < this->get_num_inputs(); i++) - { - this->post_upstream(i, OutputCheckMessage()); - } - for (size_t i = 0; i < this->get_num_outputs(); i++) - { - this->post_downstream(i, InputCheckMessage()); - } - - if (ARMAGEDDON) std::cerr - << "==================================================\n" - << "== The " << block_ptr->to_string() << " is done...\n" - << "==================================================\n" - << std::flush; -} - -void BlockActor::input_fail(const size_t i) -{ - //input failed, accumulate and try again - if (not this->input_queues.is_accumulated(i)) - { - this->input_queues.accumulate(i); - this->task_kicker(); - return; - } - - //otherwise check for done, else wait for more - if (this->inputs_done[i]) - { - this->mark_done(); - return; - } - - //check that the input is not already maxed - if (this->input_queues.is_front_maximal(i)) - { - throw std::runtime_error("input_fail called on maximum_items buffer"); - } - - //mark fail: not ready until a new buffer appears - this->input_queues.fail(i); -} - -void BlockActor::output_fail(const size_t i) -{ - SBuffer &buff = this->output_queues.front(i); - - //check that the input is not already maxed - const size_t front_items = buff.length/this->output_configs[i].item_size; - if (front_items >= this->output_configs[i].maximum_items) - { - throw std::runtime_error("output_fail called on maximum_items buffer"); - } - - //mark fail: not ready until a new buffer appears - this->output_queues.fail(i); -} - -void BlockActor::handle_task(void) -{ - TimerAccumulate ta_prep(this->stats.total_time_prep); - - //------------------------------------------------------------------ - //-- Decide if its possible to continue any processing: - //-- Handle task may get called for incoming buffers, - //-- however, not all ports may have available buffers. - //------------------------------------------------------------------ - if GRAS_UNLIKELY(not this->is_work_allowed()) return; - - const size_t num_inputs = this->get_num_inputs(); - const size_t num_outputs = this->get_num_outputs(); - - //------------------------------------------------------------------ - //-- initialize input buffers before work - //------------------------------------------------------------------ - size_t output_inline_index = 0; - this->input_items.min() = ~0; - this->input_items.max() = 0; - for (size_t i = 0; i < num_inputs; i++) - { - this->sort_tags(i); - this->num_input_msgs_read[i] = 0; - - ASSERT(this->input_queues.ready(i)); - const SBuffer &buff = this->input_queues.front(i); - const void *mem = buff.get(); - size_t items = buff.length/this->input_configs[i].item_size; - - this->input_items[i].get() = mem; - this->input_items[i].size() = items; - this->input_items.min() = std::min(this->input_items.min(), items); - this->input_items.max() = std::max(this->input_items.max(), items); - - //inline dealings, how and when input buffers can be inlined into output buffers - //* - if GRAS_UNLIKELY( - buff.unique() and - input_configs[i].inline_buffer and - output_inline_index < num_outputs and - buff.get_affinity() == this->buffer_affinity - ){ - //copy buffer reference but push with zero length, same offset - SBuffer new_obuff = buff; - new_obuff.length = 0; - this->flush_output(output_inline_index); - this->output_queues.push(output_inline_index, new_obuff); //you got inlined! - output_inline_index++; //done do this output port again - } - //*/ - } - - //------------------------------------------------------------------ - //-- initialize output buffers before work - //------------------------------------------------------------------ - this->output_items.min() = ~0; - this->output_items.max() = 0; - for (size_t i = 0; i < num_outputs; i++) - { - ASSERT(this->output_queues.ready(i)); - SBuffer &buff = this->output_queues.front(i); - ASSERT(buff.length == 0); //assumes it was flushed last call - void *mem = buff.get(); - const size_t bytes = buff.get_actual_length() - buff.offset; - size_t items = bytes/this->output_configs[i].item_size; - - this->output_items[i].get() = mem; - this->output_items[i].size() = items; - this->output_items.min() = std::min(this->output_items.min(), items); - this->output_items.max() = std::max(this->output_items.max(), items); - } - - //------------------------------------------------------------------ - //-- the work - //------------------------------------------------------------------ - ta_prep.done(); - this->stats.work_count++; - if GRAS_UNLIKELY(this->interruptible_thread) - { - TimerAccumulate ta_work(this->stats.total_time_work); - this->interruptible_thread->call(); - } - else - { - TimerAccumulate ta_work(this->stats.total_time_work); - this->task_work(); - } - this->stats.time_last_work = time_now(); - TimerAccumulate ta_post(this->stats.total_time_post); - - //------------------------------------------------------------------ - //-- Post-work output tasks - //------------------------------------------------------------------ - for (size_t i = 0; i < num_outputs; i++) - { - this->flush_output(i); - } - - //------------------------------------------------------------------ - //-- Post-work input tasks - //------------------------------------------------------------------ - for (size_t i = 0; i < num_inputs; i++) - { - this->trim_msgs(i); - - //update the inputs available bit field - this->update_input_avail(i); - - //missing at least one upstream provider? - //since nothing else is coming in, its safe to mark done - if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done(); - } - - //still have IO ready? kick off another task - this->task_kicker(); -} - -void BlockActor::consume(const size_t i, const size_t items) -{ - #ifdef ITEM_CONSPROD - std::cerr << name << " consume " << items << std::endl; - #endif - this->stats.items_consumed[i] += items; - const size_t bytes = items*this->input_configs[i].item_size; - this->input_queues.consume(i, bytes); - this->trim_tags(i); -} - -void BlockActor::produce(const size_t i, const size_t items) -{ - #ifdef ITEM_CONSPROD - std::cerr << name << " produce " << items << std::endl; - #endif - SBuffer &buff = this->output_queues.front(i); - ASSERT((buff.length % output_configs[i].item_size) == 0); - this->stats.items_produced[i] += items; - const size_t bytes = items*this->output_configs[i].item_size; - buff.length += bytes; - this->produce_outputs[i] = true; -} - -void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) -{ - this->flush_output(i); - ASSERT((buffer.length % output_configs[i].item_size) == 0); - const size_t items = buffer.length/output_configs[i].item_size; - this->stats.items_produced[i] += items; - InputBufferMessage buff_msg; - buff_msg.buffer = buffer; - this->post_downstream(i, buff_msg); -} - -GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) -{ - if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return; - SBuffer &buff = this->output_queues.front(i); - if GRAS_LIKELY(this->produce_outputs[i]) - { - this->produce_outputs[i] = false; - InputBufferMessage buff_msg; - buff_msg.buffer = buff; - this->post_downstream(i, buff_msg); - } - - //increment buffer for next use - buff.offset += buff.length; - buff.length = 0; - - //release whatever has been used of the output buffer - this->output_queues.pop(i); -} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 00f48f4..a8586c0 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -96,7 +96,7 @@ struct BlockActor : Apology::Worker //helpers void buffer_returner(const size_t index, SBuffer &buffer); void mark_done(void); - void handle_task(void); + void task_main(void); void input_fail(const size_t index); void output_fail(const size_t index); void sort_tags(const size_t index); @@ -106,35 +106,10 @@ struct BlockActor : Apology::Worker void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); void flush_output(const size_t index); - - GRAS_FORCE_INLINE void task_kicker(void) - { - if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress()); - } - - GRAS_FORCE_INLINE void update_input_avail(const size_t i) - { - const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); - const bool has_input_msgs = not this->input_msgs[i].empty(); - this->inputs_available.set(i, has_input_bufs or has_input_msgs); - this->input_queues.update_has_msg(i, has_input_msgs); - } - - GRAS_FORCE_INLINE bool is_input_done(const size_t i) - { - return this->inputs_done[i] and not this->inputs_available[i]; - } - - GRAS_FORCE_INLINE bool is_work_allowed(void) - { - return ( - this->prio_token.unique() and - this->block_state == BLOCK_STATE_LIVE and - this->inputs_available.any() and - this->input_queues.all_ready() and - this->output_queues.all_ready() - ); - } + void task_kicker(void); + void update_input_avail(const size_t index); + bool is_input_done(const size_t index); + bool is_work_allowed(void); //per port properties std::vector<InputPortConfig> input_configs; @@ -192,6 +167,57 @@ struct BlockActor : Apology::Worker BlockStats stats; }; +//-------------- common functions from this BlockActor class ---------// + +GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) +{ + if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return; + SBuffer &buff = this->output_queues.front(i); + if GRAS_LIKELY(this->produce_outputs[i]) + { + this->produce_outputs[i] = false; + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + } + + //increment buffer for next use + buff.offset += buff.length; + buff.length = 0; + + //release whatever has been used of the output buffer + this->output_queues.pop(i); +} + +GRAS_FORCE_INLINE void BlockActor::task_kicker(void) +{ + if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress()); +} + +GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) +{ + const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); + const bool has_input_msgs = not this->input_msgs[i].empty(); + this->inputs_available.set(i, has_input_bufs or has_input_msgs); + this->input_queues.update_has_msg(i, has_input_msgs); +} + +GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) +{ + return this->inputs_done[i] and not this->inputs_available[i]; +} + +GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) +{ + return ( + this->prio_token.unique() and + this->block_state == BLOCK_STATE_LIVE and + this->inputs_available.any() and + this->input_queues.all_ready() and + this->output_queues.all_ready() + ); +} + } //namespace gras #endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/ diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index c82efb3..da16ba4 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -29,7 +29,7 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: this->update_input_avail(index); ta.done(); - this->handle_task(); + this->task_main(); } void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address) @@ -44,7 +44,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th this->update_input_avail(index); ta.done(); - this->handle_task(); + this->task_main(); } void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address) @@ -68,7 +68,7 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther //upstream done, give it one more attempt at task handling ta.done(); - this->handle_task(); + this->task_main(); //now recheck the status, mark block done if the input is done if (this->is_input_done(index)) this->mark_done(); diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index 17244ee..3c2f971 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -18,7 +18,7 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const this->output_queues.push(index, message.buffer); ta.done(); - this->handle_task(); + this->task_main(); } void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address) diff --git a/lib/task_done.cpp b/lib/task_done.cpp new file mode 100644 index 0000000..d116202 --- /dev/null +++ b/lib/task_done.cpp @@ -0,0 +1,68 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> + +using namespace gras; + +void Block::mark_done(void) +{ + (*this)->block->mark_done(); +} + +void BlockActor::mark_done(void) +{ + if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + + this->stats.stop_time = time_now(); + this->block_ptr->notify_inactive(); + + //flush partial output buffers to the downstream + for (size_t i = 0; i < this->get_num_outputs(); i++) + { + if (not this->output_queues.ready(i)) continue; + SBuffer &buff = this->output_queues.front(i); + if (buff.length == 0) continue; + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + this->output_queues.pop(i); + } + + this->interruptible_thread.reset(); + + //mark down the new state + this->block_state = BLOCK_STATE_DONE; + + //release upstream, downstream, and executor tokens + this->token_pool.clear(); + + //release all buffers in queues + this->input_queues.flush_all(); + this->output_queues.flush_all(); + + //release all tags and msgs + for (size_t i = 0; i < this->get_num_inputs(); i++) + { + this->input_msgs[i].clear(); + this->input_tags[i].clear(); + } + + //tell the upstream and downstram to re-check their tokens + //this is how the other blocks know who is interested, + //and can decide based on interest to set done or not + for (size_t i = 0; i < this->get_num_inputs(); i++) + { + this->post_upstream(i, OutputCheckMessage()); + } + for (size_t i = 0; i < this->get_num_outputs(); i++) + { + this->post_downstream(i, InputCheckMessage()); + } + + if (ARMAGEDDON) std::cerr + << "==================================================\n" + << "== The " << block_ptr->to_string() << " is done...\n" + << "==================================================\n" + << std::flush; +} diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp new file mode 100644 index 0000000..c30b668 --- /dev/null +++ b/lib/task_fail.cpp @@ -0,0 +1,58 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> + +using namespace gras; + +void Block::mark_output_fail(const size_t which_output) +{ + (*this)->block->output_fail(which_output); +} + +void Block::mark_input_fail(const size_t which_input) +{ + (*this)->block->input_fail(which_input); +} + +void BlockActor::input_fail(const size_t i) +{ + //input failed, accumulate and try again + if (not this->input_queues.is_accumulated(i)) + { + this->input_queues.accumulate(i); + this->task_kicker(); + return; + } + + //otherwise check for done, else wait for more + if (this->inputs_done[i]) + { + this->mark_done(); + return; + } + + //check that the input is not already maxed + if (this->input_queues.is_front_maximal(i)) + { + throw std::runtime_error("input_fail called on maximum_items buffer"); + } + + //mark fail: not ready until a new buffer appears + this->input_queues.fail(i); +} + +void BlockActor::output_fail(const size_t i) +{ + SBuffer &buff = this->output_queues.front(i); + + //check that the input is not already maxed + const size_t front_items = buff.length/this->output_configs[i].item_size; + if (front_items >= this->output_configs[i].maximum_items) + { + throw std::runtime_error("output_fail called on maximum_items buffer"); + } + + //mark fail: not ready until a new buffer appears + this->output_queues.fail(i); +} diff --git a/lib/task_main.cpp b/lib/task_main.cpp new file mode 100644 index 0000000..0e855a8 --- /dev/null +++ b/lib/task_main.cpp @@ -0,0 +1,124 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras_impl/block_actor.hpp> +#include "tag_handlers.hpp" + +using namespace gras; + +void BlockActor::task_main(void) +{ + TimerAccumulate ta_prep(this->stats.total_time_prep); + + //------------------------------------------------------------------ + //-- Decide if its possible to continue any processing: + //-- Handle task may get called for incoming buffers, + //-- however, not all ports may have available buffers. + //------------------------------------------------------------------ + if GRAS_UNLIKELY(not this->is_work_allowed()) return; + + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); + + //------------------------------------------------------------------ + //-- initialize input buffers before work + //------------------------------------------------------------------ + size_t output_inline_index = 0; + this->input_items.min() = ~0; + this->input_items.max() = 0; + for (size_t i = 0; i < num_inputs; i++) + { + this->sort_tags(i); + this->num_input_msgs_read[i] = 0; + + ASSERT(this->input_queues.ready(i)); + const SBuffer &buff = this->input_queues.front(i); + const void *mem = buff.get(); + size_t items = buff.length/this->input_configs[i].item_size; + + this->input_items[i].get() = mem; + this->input_items[i].size() = items; + this->input_items.min() = std::min(this->input_items.min(), items); + this->input_items.max() = std::max(this->input_items.max(), items); + + //inline dealings, how and when input buffers can be inlined into output buffers + //* + if GRAS_UNLIKELY( + buff.unique() and + input_configs[i].inline_buffer and + output_inline_index < num_outputs and + buff.get_affinity() == this->buffer_affinity + ){ + //copy buffer reference but push with zero length, same offset + SBuffer new_obuff = buff; + new_obuff.length = 0; + this->flush_output(output_inline_index); + this->output_queues.push(output_inline_index, new_obuff); //you got inlined! + output_inline_index++; //done do this output port again + } + //*/ + } + + //------------------------------------------------------------------ + //-- initialize output buffers before work + //------------------------------------------------------------------ + this->output_items.min() = ~0; + this->output_items.max() = 0; + for (size_t i = 0; i < num_outputs; i++) + { + ASSERT(this->output_queues.ready(i)); + SBuffer &buff = this->output_queues.front(i); + ASSERT(buff.length == 0); //assumes it was flushed last call + void *mem = buff.get(); + const size_t bytes = buff.get_actual_length() - buff.offset; + size_t items = bytes/this->output_configs[i].item_size; + + this->output_items[i].get() = mem; + this->output_items[i].size() = items; + this->output_items.min() = std::min(this->output_items.min(), items); + this->output_items.max() = std::max(this->output_items.max(), items); + } + + //------------------------------------------------------------------ + //-- the work + //------------------------------------------------------------------ + ta_prep.done(); + this->stats.work_count++; + if GRAS_UNLIKELY(this->interruptible_thread) + { + TimerAccumulate ta_work(this->stats.total_time_work); + this->interruptible_thread->call(); + } + else + { + TimerAccumulate ta_work(this->stats.total_time_work); + this->task_work(); + } + this->stats.time_last_work = time_now(); + TimerAccumulate ta_post(this->stats.total_time_post); + + //------------------------------------------------------------------ + //-- Post-work output tasks + //------------------------------------------------------------------ + for (size_t i = 0; i < num_outputs; i++) + { + this->flush_output(i); + } + + //------------------------------------------------------------------ + //-- Post-work input tasks + //------------------------------------------------------------------ + for (size_t i = 0; i < num_inputs; i++) + { + this->trim_msgs(i); + + //update the inputs available bit field + this->update_input_avail(i); + + //missing at least one upstream provider? + //since nothing else is coming in, its safe to mark done + if GRAS_UNLIKELY(this->is_input_done(i)) this->mark_done(); + } + + //still have IO ready? kick off another task + this->task_kicker(); +} |