summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2013-03-11 00:43:43 -0700
committerJosh Blum2013-03-11 00:43:43 -0700
commit9ded08b79b35fdfd67ad4968121d282e6c3dccda (patch)
treec47ba09f6db23a6f7a07442ab56a9d7ad96ecc66
parent83d78cdcaefe18555e83dc7cc810422616054527 (diff)
downloadsandhi-9ded08b79b35fdfd67ad4968121d282e6c3dccda.tar.gz
sandhi-9ded08b79b35fdfd67ad4968121d282e6c3dccda.tar.bz2
sandhi-9ded08b79b35fdfd67ad4968121d282e6c3dccda.zip
gras: separate queue for messages - #54
-rw-r--r--lib/block.cpp14
-rw-r--r--lib/gras_impl/block_actor.hpp3
-rw-r--r--lib/gras_impl/messages.hpp8
-rw-r--r--lib/gras_impl/stats.hpp2
-rw-r--r--lib/input_handlers.cpp14
-rw-r--r--lib/register_messages.cpp1
-rw-r--r--lib/tag_handlers.hpp2
-rw-r--r--lib/top_block_stats.cpp8
-rw-r--r--lib/topology_handler.cpp3
9 files changed, 45 insertions, 10 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index e9e170b..9bf3b9c 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -162,7 +162,7 @@ item_index_t Block::get_produced(const size_t which_output)
void Block::post_output_tag(const size_t which_output, const Tag &tag)
{
- (*this)->block->stats.items_produced[which_output]++;
+ (*this)->block->stats.tags_produced[which_output]++;
(*this)->block->post_downstream(which_output, InputTagMessage(tag));
}
@@ -174,15 +174,17 @@ TagIter Block::get_input_tags(const size_t which_input)
void Block::post_output_msg(const size_t which_output, const PMCC &msg)
{
- this->post_output_tag(which_output, Tag(0, msg));
+ (*this)->block->stats.msgs_produced[which_output]++;
+ (*this)->block->post_downstream(which_output, InputMsgMessage(msg));
}
PMCC Block::pop_input_msg(const size_t which_input)
{
- std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
- if (input_tags.empty()) return PMCC();
- PMCC p = input_tags.front().object;
- input_tags.erase(input_tags.begin());
+ std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input];
+ if (input_msgs.empty()) return PMCC();
+ PMCC p = input_msgs.front();
+ input_msgs.erase(input_msgs.begin());
+ (*this)->block->stats.msgs_consumed[which_input]++;
return p;
}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index d40152a..ff285e0 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -43,6 +43,7 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_top_thread_group);
this->RegisterHandler(this, &BlockActor::handle_input_tag);
+ this->RegisterHandler(this, &BlockActor::handle_input_msg);
this->RegisterHandler(this, &BlockActor::handle_input_buffer);
this->RegisterHandler(this, &BlockActor::handle_input_token);
this->RegisterHandler(this, &BlockActor::handle_input_check);
@@ -71,6 +72,7 @@ struct BlockActor : Apology::Worker
void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address);
void handle_input_tag(const InputTagMessage &, const Theron::Address);
+ void handle_input_msg(const InputMsgMessage &, const Theron::Address);
void handle_input_buffer(const InputBufferMessage &, const Theron::Address);
void handle_input_token(const InputTokenMessage &, const Theron::Address);
void handle_input_check(const InputCheckMessage &, const Theron::Address);
@@ -145,6 +147,7 @@ struct BlockActor : Apology::Worker
//tag tracking
std::vector<bool> input_tags_changed;
std::vector<std::vector<Tag> > input_tags;
+ std::vector<std::vector<PMCC> > input_msgs;
//interruptible thread stuff
bool interruptible_work;
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 386cd0a..86b6a93 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -50,6 +50,13 @@ struct InputTagMessage
Tag tag;
};
+struct InputMsgMessage
+{
+ InputMsgMessage(const PMCC &msg):msg(msg){}
+ size_t index;
+ PMCC msg;
+};
+
struct InputBufferMessage
{
size_t index;
@@ -151,6 +158,7 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::GlobalBlockConfig);
THERON_DECLARE_REGISTERED_MESSAGE(gras::SharedThreadGroup);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTagMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::InputMsgMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage);
diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp
index 03fdf3a..b7b4553 100644
--- a/lib/gras_impl/stats.hpp
+++ b/lib/gras_impl/stats.hpp
@@ -29,8 +29,10 @@ struct BlockStats
std::vector<item_index_t> items_consumed;
std::vector<item_index_t> tags_consumed;
+ std::vector<item_index_t> msgs_consumed;
std::vector<item_index_t> items_produced;
std::vector<item_index_t> tags_produced;
+ std::vector<item_index_t> msgs_produced;
item_index_t work_count;
time_ticks_t time_last_work;
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 56ead46..e55f72e 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -13,11 +13,19 @@ void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::
//handle incoming stream tag, push into the tag storage
this->input_tags[index].push_back(message.tag);
+ this->input_tags_changed[index] = true;
+}
- //Changed is a boolean to enable sorting of tags: If the offset is 0, there is nothing to sort,
- //because tags are being used for message passing, or this is just the first tag in a stream.
- this->input_tags_changed[index] = this->input_tags_changed[index] or message.tag.offset != 0;
+void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address)
+{
+ TimerAccumulate ta(this->stats.total_time_input);
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //handle incoming async message, push into the msg storage
+ this->input_msgs[index].push_back(message.msg);
this->inputs_available.set(index);
+
ta.done();
this->handle_task();
}
diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp
index f68bc14..e502e5b 100644
--- a/lib/register_messages.cpp
+++ b/lib/register_messages.cpp
@@ -12,6 +12,7 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::GlobalBlockConfig);
THERON_DEFINE_REGISTERED_MESSAGE(gras::SharedThreadGroup);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTagMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::InputMsgMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputCheckMessage);
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index d42ce02..73c9fb7 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -39,7 +39,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//now its safe to perform the erasure
tags_i.erase(tags_i.begin(), tags_i.begin()+last);
- this->stats.items_consumed[i] += last;
+ this->stats.tags_consumed[i] += last;
}
} //namespace gras
diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp
index 3cadfcb..76e87ad 100644
--- a/lib/top_block_stats.cpp
+++ b/lib/top_block_stats.cpp
@@ -52,6 +52,10 @@ std::string TopBlock::get_stats(const std::string &)
{
block_xml += str(boost::format(" <tags_consumed>%llu</tags_consumed>\n") % stats.tags_consumed[i]);
}
+ for (size_t i = 0; i < stats.msgs_consumed.size(); i++)
+ {
+ block_xml += str(boost::format(" <msgs_consumed>%llu</msgs_consumed>\n") % stats.msgs_consumed[i]);
+ }
for (size_t i = 0; i < stats.items_produced.size(); i++)
{
block_xml += str(boost::format(" <items_produced>%llu</items_produced>\n") % stats.items_produced[i]);
@@ -60,6 +64,10 @@ std::string TopBlock::get_stats(const std::string &)
{
block_xml += str(boost::format(" <tags_produced>%llu</tags_produced>\n") % stats.tags_produced[i]);
}
+ for (size_t i = 0; i < stats.msgs_produced.size(); i++)
+ {
+ block_xml += str(boost::format(" <msgs_produced>%llu</msgs_produced>\n") % stats.msgs_produced[i]);
+ }
xml += str(boost::format(" <block id=\"%s\">\n%s</block>\n") % message.block_id % block_xml);
}
return str(boost::format("<gras_stats id=\"%s\">\n%s</gras_stats>") % this->to_string() % xml);
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 1696f27..ac4d859 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -37,8 +37,10 @@ void BlockActor::handle_topology(
//resize the bytes consumed/produced
resize_fill_grow(this->stats.items_consumed, num_inputs, 0);
resize_fill_grow(this->stats.tags_consumed, num_inputs, 0);
+ resize_fill_grow(this->stats.msgs_consumed, num_inputs, 0);
resize_fill_grow(this->stats.items_produced, num_outputs, 0);
resize_fill_grow(this->stats.tags_produced, num_outputs, 0);
+ resize_fill_grow(this->stats.msgs_produced, num_outputs, 0);
//resize all work buffers to match current connections
this->input_items.resize(num_inputs);
@@ -62,6 +64,7 @@ void BlockActor::handle_topology(
//resize tags vector to match sizes
this->input_tags_changed.resize(num_inputs);
this->input_tags.resize(num_inputs);
+ this->input_msgs.resize(num_inputs);
//a block looses all connections, allow it to free
if (num_inputs == 0 and num_outputs == 0)