From 277dd31b08afcadceec7852012aa8b3c2cecbea7 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 14 Apr 2013 02:07:40 -0700 Subject: gras: move code into component files --- lib/gras_impl/block_actor.hpp | 86 ++++++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 30 deletions(-) (limited to 'lib/gras_impl') diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 00f48f4..a8586c0 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -96,7 +96,7 @@ struct BlockActor : Apology::Worker //helpers void buffer_returner(const size_t index, SBuffer &buffer); void mark_done(void); - void handle_task(void); + void task_main(void); void input_fail(const size_t index); void output_fail(const size_t index); void sort_tags(const size_t index); @@ -106,35 +106,10 @@ struct BlockActor : Apology::Worker void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); void flush_output(const size_t index); - - GRAS_FORCE_INLINE void task_kicker(void) - { - if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress()); - } - - GRAS_FORCE_INLINE void update_input_avail(const size_t i) - { - const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); - const bool has_input_msgs = not this->input_msgs[i].empty(); - this->inputs_available.set(i, has_input_bufs or has_input_msgs); - this->input_queues.update_has_msg(i, has_input_msgs); - } - - GRAS_FORCE_INLINE bool is_input_done(const size_t i) - { - return this->inputs_done[i] and not this->inputs_available[i]; - } - - GRAS_FORCE_INLINE bool is_work_allowed(void) - { - return ( - this->prio_token.unique() and - this->block_state == BLOCK_STATE_LIVE and - this->inputs_available.any() and - this->input_queues.all_ready() and - this->output_queues.all_ready() - ); - } + void task_kicker(void); + void update_input_avail(const size_t index); + bool is_input_done(const size_t index); + bool is_work_allowed(void); //per port properties std::vector input_configs; @@ -192,6 +167,57 @@ struct BlockActor : Apology::Worker BlockStats stats; }; +//-------------- common functions from this BlockActor class ---------// + +GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) +{ + if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return; + SBuffer &buff = this->output_queues.front(i); + if GRAS_LIKELY(this->produce_outputs[i]) + { + this->produce_outputs[i] = false; + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + } + + //increment buffer for next use + buff.offset += buff.length; + buff.length = 0; + + //release whatever has been used of the output buffer + this->output_queues.pop(i); +} + +GRAS_FORCE_INLINE void BlockActor::task_kicker(void) +{ + if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress()); +} + +GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) +{ + const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); + const bool has_input_msgs = not this->input_msgs[i].empty(); + this->inputs_available.set(i, has_input_bufs or has_input_msgs); + this->input_queues.update_has_msg(i, has_input_msgs); +} + +GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) +{ + return this->inputs_done[i] and not this->inputs_available[i]; +} + +GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) +{ + return ( + this->prio_token.unique() and + this->block_state == BLOCK_STATE_LIVE and + this->inputs_available.any() and + this->input_queues.all_ready() and + this->output_queues.all_ready() + ); +} + } //namespace gras #endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/ -- cgit