summaryrefslogtreecommitdiff
path: root/lib/gras_impl/block_actor.hpp
diff options
context:
space:
mode:
authorJosh Blum2013-04-14 02:07:40 -0700
committerJosh Blum2013-04-14 02:07:40 -0700
commit277dd31b08afcadceec7852012aa8b3c2cecbea7 (patch)
treeb5c06f46c13365dbe2489638496d867221e3ef8c /lib/gras_impl/block_actor.hpp
parent82af15c5e7a69b116214cb6de99f9095852934d0 (diff)
downloadsandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.tar.gz
sandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.tar.bz2
sandhi-277dd31b08afcadceec7852012aa8b3c2cecbea7.zip
gras: move code into component files
Diffstat (limited to 'lib/gras_impl/block_actor.hpp')
-rw-r--r--lib/gras_impl/block_actor.hpp86
1 files changed, 56 insertions, 30 deletions
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 00f48f4..a8586c0 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -96,7 +96,7 @@ struct BlockActor : Apology::Worker
//helpers
void buffer_returner(const size_t index, SBuffer &buffer);
void mark_done(void);
- void handle_task(void);
+ void task_main(void);
void input_fail(const size_t index);
void output_fail(const size_t index);
void sort_tags(const size_t index);
@@ -106,35 +106,10 @@ struct BlockActor : Apology::Worker
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
void flush_output(const size_t index);
-
- GRAS_FORCE_INLINE void task_kicker(void)
- {
- if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
- }
-
- GRAS_FORCE_INLINE void update_input_avail(const size_t i)
- {
- const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
- const bool has_input_msgs = not this->input_msgs[i].empty();
- this->inputs_available.set(i, has_input_bufs or has_input_msgs);
- this->input_queues.update_has_msg(i, has_input_msgs);
- }
-
- GRAS_FORCE_INLINE bool is_input_done(const size_t i)
- {
- return this->inputs_done[i] and not this->inputs_available[i];
- }
-
- GRAS_FORCE_INLINE bool is_work_allowed(void)
- {
- return (
- this->prio_token.unique() and
- this->block_state == BLOCK_STATE_LIVE and
- this->inputs_available.any() and
- this->input_queues.all_ready() and
- this->output_queues.all_ready()
- );
- }
+ void task_kicker(void);
+ void update_input_avail(const size_t index);
+ bool is_input_done(const size_t index);
+ bool is_work_allowed(void);
//per port properties
std::vector<InputPortConfig> input_configs;
@@ -192,6 +167,57 @@ struct BlockActor : Apology::Worker
BlockStats stats;
};
+//-------------- common functions from this BlockActor class ---------//
+
+GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
+{
+ if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
+ SBuffer &buff = this->output_queues.front(i);
+ if GRAS_LIKELY(this->produce_outputs[i])
+ {
+ this->produce_outputs[i] = false;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
+ }
+
+ //increment buffer for next use
+ buff.offset += buff.length;
+ buff.length = 0;
+
+ //release whatever has been used of the output buffer
+ this->output_queues.pop(i);
+}
+
+GRAS_FORCE_INLINE void BlockActor::task_kicker(void)
+{
+ if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
+}
+
+GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
+{
+ const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
+ const bool has_input_msgs = not this->input_msgs[i].empty();
+ this->inputs_available.set(i, has_input_bufs or has_input_msgs);
+ this->input_queues.update_has_msg(i, has_input_msgs);
+}
+
+GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
+{
+ return this->inputs_done[i] and not this->inputs_available[i];
+}
+
+GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
+{
+ return (
+ this->prio_token.unique() and
+ this->block_state == BLOCK_STATE_LIVE and
+ this->inputs_available.any() and
+ this->input_queues.all_ready() and
+ this->output_queues.all_ready()
+ );
+}
+
} //namespace gras
#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/