From 9ded08b79b35fdfd67ad4968121d282e6c3dccda Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 11 Mar 2013 00:43:43 -0700 Subject: gras: separate queue for messages - #54 --- lib/block.cpp | 14 ++++++++------ lib/gras_impl/block_actor.hpp | 3 +++ lib/gras_impl/messages.hpp | 8 ++++++++ lib/gras_impl/stats.hpp | 2 ++ lib/input_handlers.cpp | 14 +++++++++++--- lib/register_messages.cpp | 1 + lib/tag_handlers.hpp | 2 +- lib/top_block_stats.cpp | 8 ++++++++ lib/topology_handler.cpp | 3 +++ 9 files changed, 45 insertions(+), 10 deletions(-) (limited to 'lib') diff --git a/lib/block.cpp b/lib/block.cpp index e9e170b..9bf3b9c 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -162,7 +162,7 @@ item_index_t Block::get_produced(const size_t which_output) void Block::post_output_tag(const size_t which_output, const Tag &tag) { - (*this)->block->stats.items_produced[which_output]++; + (*this)->block->stats.tags_produced[which_output]++; (*this)->block->post_downstream(which_output, InputTagMessage(tag)); } @@ -174,15 +174,17 @@ TagIter Block::get_input_tags(const size_t which_input) void Block::post_output_msg(const size_t which_output, const PMCC &msg) { - this->post_output_tag(which_output, Tag(0, msg)); + (*this)->block->stats.msgs_produced[which_output]++; + (*this)->block->post_downstream(which_output, InputMsgMessage(msg)); } PMCC Block::pop_input_msg(const size_t which_input) { - std::vector &input_tags = (*this)->block->input_tags[which_input]; - if (input_tags.empty()) return PMCC(); - PMCC p = input_tags.front().object; - input_tags.erase(input_tags.begin()); + std::vector &input_msgs = (*this)->block->input_msgs[which_input]; + if (input_msgs.empty()) return PMCC(); + PMCC p = input_msgs.front(); + input_msgs.erase(input_msgs.begin()); + (*this)->block->stats.msgs_consumed[which_input]++; return p; } diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index d40152a..ff285e0 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -43,6 +43,7 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_top_thread_group); this->RegisterHandler(this, &BlockActor::handle_input_tag); + this->RegisterHandler(this, &BlockActor::handle_input_msg); this->RegisterHandler(this, &BlockActor::handle_input_buffer); this->RegisterHandler(this, &BlockActor::handle_input_token); this->RegisterHandler(this, &BlockActor::handle_input_check); @@ -71,6 +72,7 @@ struct BlockActor : Apology::Worker void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address); void handle_input_tag(const InputTagMessage &, const Theron::Address); + void handle_input_msg(const InputMsgMessage &, const Theron::Address); void handle_input_buffer(const InputBufferMessage &, const Theron::Address); void handle_input_token(const InputTokenMessage &, const Theron::Address); void handle_input_check(const InputCheckMessage &, const Theron::Address); @@ -145,6 +147,7 @@ struct BlockActor : Apology::Worker //tag tracking std::vector input_tags_changed; std::vector > input_tags; + std::vector > input_msgs; //interruptible thread stuff bool interruptible_work; diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 386cd0a..86b6a93 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -50,6 +50,13 @@ struct InputTagMessage Tag tag; }; +struct InputMsgMessage +{ + InputMsgMessage(const PMCC &msg):msg(msg){} + size_t index; + PMCC msg; +}; + struct InputBufferMessage { size_t index; @@ -151,6 +158,7 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::GlobalBlockConfig); THERON_DECLARE_REGISTERED_MESSAGE(gras::SharedThreadGroup); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTagMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::InputMsgMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage); diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp index 03fdf3a..b7b4553 100644 --- a/lib/gras_impl/stats.hpp +++ b/lib/gras_impl/stats.hpp @@ -29,8 +29,10 @@ struct BlockStats std::vector items_consumed; std::vector tags_consumed; + std::vector msgs_consumed; std::vector items_produced; std::vector tags_produced; + std::vector msgs_produced; item_index_t work_count; time_ticks_t time_last_work; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 56ead46..e55f72e 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -13,11 +13,19 @@ void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron:: //handle incoming stream tag, push into the tag storage this->input_tags[index].push_back(message.tag); + this->input_tags_changed[index] = true; +} - //Changed is a boolean to enable sorting of tags: If the offset is 0, there is nothing to sort, - //because tags are being used for message passing, or this is just the first tag in a stream. - this->input_tags_changed[index] = this->input_tags_changed[index] or message.tag.offset != 0; +void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address) +{ + TimerAccumulate ta(this->stats.total_time_input); + MESSAGE_TRACER(); + const size_t index = message.index; + + //handle incoming async message, push into the msg storage + this->input_msgs[index].push_back(message.msg); this->inputs_available.set(index); + ta.done(); this->handle_task(); } diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp index f68bc14..e502e5b 100644 --- a/lib/register_messages.cpp +++ b/lib/register_messages.cpp @@ -12,6 +12,7 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::GlobalBlockConfig); THERON_DEFINE_REGISTERED_MESSAGE(gras::SharedThreadGroup); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTagMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::InputMsgMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputCheckMessage); diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index d42ce02..73c9fb7 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -39,7 +39,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //now its safe to perform the erasure tags_i.erase(tags_i.begin(), tags_i.begin()+last); - this->stats.items_consumed[i] += last; + this->stats.tags_consumed[i] += last; } } //namespace gras diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp index 3cadfcb..76e87ad 100644 --- a/lib/top_block_stats.cpp +++ b/lib/top_block_stats.cpp @@ -52,6 +52,10 @@ std::string TopBlock::get_stats(const std::string &) { block_xml += str(boost::format(" %llu\n") % stats.tags_consumed[i]); } + for (size_t i = 0; i < stats.msgs_consumed.size(); i++) + { + block_xml += str(boost::format(" %llu\n") % stats.msgs_consumed[i]); + } for (size_t i = 0; i < stats.items_produced.size(); i++) { block_xml += str(boost::format(" %llu\n") % stats.items_produced[i]); @@ -60,6 +64,10 @@ std::string TopBlock::get_stats(const std::string &) { block_xml += str(boost::format(" %llu\n") % stats.tags_produced[i]); } + for (size_t i = 0; i < stats.msgs_produced.size(); i++) + { + block_xml += str(boost::format(" %llu\n") % stats.msgs_produced[i]); + } xml += str(boost::format(" \n%s\n") % message.block_id % block_xml); } return str(boost::format("\n%s") % this->to_string() % xml); diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 1696f27..ac4d859 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -37,8 +37,10 @@ void BlockActor::handle_topology( //resize the bytes consumed/produced resize_fill_grow(this->stats.items_consumed, num_inputs, 0); resize_fill_grow(this->stats.tags_consumed, num_inputs, 0); + resize_fill_grow(this->stats.msgs_consumed, num_inputs, 0); resize_fill_grow(this->stats.items_produced, num_outputs, 0); resize_fill_grow(this->stats.tags_produced, num_outputs, 0); + resize_fill_grow(this->stats.msgs_produced, num_outputs, 0); //resize all work buffers to match current connections this->input_items.resize(num_inputs); @@ -62,6 +64,7 @@ void BlockActor::handle_topology( //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); this->input_tags.resize(num_inputs); + this->input_msgs.resize(num_inputs); //a block looses all connections, allow it to free if (num_inputs == 0 and num_outputs == 0) -- cgit