summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-02-17 23:01:03 -0600
committerJosh Blum2013-02-17 23:01:03 -0600
commitf8cca035aacf1578b99f6ffa1bb99ffb052ac27c (patch)
tree37f906338284ec6a48886079f4d8621549b0971b /lib
parentafd0575bc2eef7f2a97c8ce4cadb328ceb491d10 (diff)
downloadsandhi-f8cca035aacf1578b99f6ffa1bb99ffb052ac27c.tar.gz
sandhi-f8cca035aacf1578b99f6ffa1bb99ffb052ac27c.tar.bz2
sandhi-f8cca035aacf1578b99f6ffa1bb99ffb052ac27c.zip
gras: added stats for produced/consumed
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt1
-rw-r--r--lib/block.cpp6
-rw-r--r--lib/block_task.cpp6
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/gras_impl/stats.hpp8
-rw-r--r--lib/tag_handlers.hpp3
-rw-r--r--lib/top_block.cpp37
-rw-r--r--lib/top_block_stats.cpp62
-rw-r--r--lib/topology_handler.cpp6
9 files changed, 83 insertions, 50 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index bc38d06..69ada7c 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -55,6 +55,7 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/output_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/top_block_stats.cpp
${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp
)
diff --git a/lib/block.cpp b/lib/block.cpp
index 716873b..f5ba6ce 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -142,16 +142,17 @@ void Block::produce(const size_t num_items)
item_index_t Block::get_consumed(const size_t which_input)
{
- return (*this)->block->items_consumed[which_input];
+ return (*this)->block->stats.items_consumed[which_input];
}
item_index_t Block::get_produced(const size_t which_output)
{
- return (*this)->block->items_produced[which_output];
+ return (*this)->block->stats.items_produced[which_output];
}
void Block::post_output_tag(const size_t which_output, const Tag &tag)
{
+ (*this)->block->stats.items_produced[which_output]++;
(*this)->block->post_downstream(which_output, InputTagMessage(tag));
}
@@ -172,6 +173,7 @@ PMCC Block::pop_input_msg(const size_t which_input)
if (input_tags.empty()) return PMCC();
PMCC p = input_tags.front().object;
input_tags.erase(input_tags.begin());
+ (*this)->block->stats.items_consumed[which_input]++;
return p;
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 67ff308..ca106ab 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -229,7 +229,7 @@ void BlockActor::consume(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " consume " << items << std::endl;
#endif
- this->items_consumed[i] += items;
+ this->stats.items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
this->input_queues.consume(i, bytes);
this->trim_tags(i);
@@ -241,7 +241,7 @@ void BlockActor::produce(const size_t i, const size_t items)
std::cerr << name << " produce " << items << std::endl;
#endif
SBuffer &buff = this->output_queues.front(i);
- this->items_produced[i] += items;
+ this->stats.items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
buff.length += bytes;
}
@@ -250,7 +250,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
this->flush_output(i);
const size_t items = buffer.length/output_items_sizes[i];
- this->items_produced[i] += items;
+ this->stats.items_produced[i] += items;
InputBufferMessage buff_msg;
buff_msg.buffer = buffer;
this->post_downstream(i, buff_msg);
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 2aee666..ba69567 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -127,10 +127,6 @@ struct BlockActor : Apology::Worker
std::vector<InputPortConfig> input_configs;
std::vector<OutputPortConfig> output_configs;
- //keeps track of production
- std::vector<item_index_t> items_consumed;
- std::vector<item_index_t> items_produced;
-
//work buffers for the new work interface
Block::InputItems input_items;
Block::OutputItems output_items;
diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp
index 138b231..9c9c8e1 100644
--- a/lib/gras_impl/stats.hpp
+++ b/lib/gras_impl/stats.hpp
@@ -4,6 +4,7 @@
#define INCLUDED_LIBGRAS_IMPL_STATS_HPP
#include <gras/chrono.hpp>
+#include <vector>
namespace gras
{
@@ -13,7 +14,12 @@ struct BlockStats
time_ticks_t start_time;
time_ticks_t stop_time;
- size_t work_count;
+ std::vector<item_index_t> items_consumed;
+ std::vector<item_index_t> tags_consumed;
+ std::vector<item_index_t> items_produced;
+ std::vector<item_index_t> tags_produced;
+
+ item_index_t work_count;
time_ticks_t time_last_work;
time_ticks_t total_time_work;
time_ticks_t total_time_work_other;
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index e2eb3b9..d42ce02 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//------------------------------------------------------------------
std::vector<Tag> &tags_i = this->input_tags[i];
- const size_t items_consumed_i = this->items_consumed[i];
+ const size_t items_consumed_i = this->stats.items_consumed[i];
size_t last = 0;
while (last < tags_i.size() and tags_i[last].offset < items_consumed_i)
{
@@ -39,6 +39,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//now its safe to perform the erasure
tags_i.erase(tags_i.begin(), tags_i.begin()+last);
+ this->stats.items_consumed[i] += last;
}
} //namespace gras
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 714ff4a..4d2f956 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -3,8 +3,6 @@
#include "element_impl.hpp"
#include <gras/top_block.hpp>
#include <boost/thread/thread.hpp> //sleep
-#include <boost/foreach.hpp>
-#include <boost/format.hpp>
using namespace gras;
@@ -139,41 +137,6 @@ bool TopBlock::wait(const double timeout)
return (*this)->token.unique();
}
-///////////////////////// Stats gathering interface ////////////////////////
-
-struct GetStatsReceiver : Theron::Receiver
-{
- GetStatsReceiver(void)
- {
- this->RegisterHandler(this, &GetStatsReceiver::handle_get_stats);
- }
-
- void handle_get_stats(const GetStatsMessage &message, const Theron::Address)
- {
- this->messages.push_back(message);
- }
-
- std::vector<GetStatsMessage> messages;
-};
-
-std::string TopBlock::get_stats_xml(void)
-{
- GetStatsReceiver receiver;
- (*this)->executor->post_all(GetStatsMessage(), receiver);
- std::string xml;
- xml += str(boost::format(" <now>%ull</now>\n") % time_now());
- xml += str(boost::format(" <tps>%ull</tps>\n") % time_tps());
- BOOST_FOREACH(const GetStatsMessage &message, receiver.messages)
- {
- std::string block_xml;
- block_xml += str(boost::format(" <id>%s</id>\n") % message.block_id);
- block_xml += str(boost::format(" <start_time>%llu</start_time>\n") % message.stats.start_time);
- block_xml += str(boost::format(" <stop_time>%llu</stop_time>\n") % message.stats.stop_time);
- xml += str(boost::format(" <block>\n%s</block>\n") % block_xml);
- }
- return str(boost::format("<gras_stats>\n%s</gras_stats>") % xml);
-}
-
///////////////////////// Deprecated interfaces ////////////////////////
void TopBlock::start(const size_t max_items)
diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp
new file mode 100644
index 0000000..5a8790e
--- /dev/null
+++ b/lib/top_block_stats.cpp
@@ -0,0 +1,62 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras/top_block.hpp>
+#include <boost/foreach.hpp>
+#include <boost/format.hpp>
+
+using namespace gras;
+
+struct GetStatsReceiver : Theron::Receiver
+{
+ GetStatsReceiver(void)
+ {
+ this->RegisterHandler(this, &GetStatsReceiver::handle_get_stats);
+ }
+
+ void handle_get_stats(const GetStatsMessage &message, const Theron::Address)
+ {
+ this->messages.push_back(message);
+ }
+
+ std::vector<GetStatsMessage> messages;
+};
+
+std::string TopBlock::get_stats_xml(void)
+{
+ GetStatsReceiver receiver;
+ (*this)->executor->post_all(GetStatsMessage(), receiver);
+ std::string xml;
+ xml += str(boost::format(" <now>%ull</now>\n") % time_now());
+ xml += str(boost::format(" <tps>%ull</tps>\n") % time_tps());
+ BOOST_FOREACH(const GetStatsMessage &message, receiver.messages)
+ {
+ const BlockStats &stats = message.stats;
+ std::string block_xml;
+ block_xml += str(boost::format(" <id>%s</id>\n") % message.block_id);
+ block_xml += str(boost::format(" <start_time>%llu</start_time>\n") % stats.start_time);
+ block_xml += str(boost::format(" <stop_time>%llu</stop_time>\n") % stats.stop_time);
+ block_xml += str(boost::format(" <work_count>%llu</work_count>\n") % stats.work_count);
+ block_xml += str(boost::format(" <time_last_work>%llu</time_last_work>\n") % stats.time_last_work);
+ block_xml += str(boost::format(" <total_time_work>%llu</total_time_work>\n") % stats.total_time_work);
+ block_xml += str(boost::format(" <total_time_work_other>%llu</total_time_work_other>\n") % stats.total_time_work_other);
+ for (size_t i = 0; i < stats.items_consumed.size(); i++)
+ {
+ block_xml += str(boost::format(" <input_items_consumed>%llu</input_items_consumed>\n") % stats.items_consumed[i]);
+ }
+ for (size_t i = 0; i < stats.tags_consumed.size(); i++)
+ {
+ block_xml += str(boost::format(" <tags_consumed>%llu</tags_consumed>\n") % stats.tags_consumed[i]);
+ }
+ for (size_t i = 0; i < stats.items_produced.size(); i++)
+ {
+ block_xml += str(boost::format(" <items_produced>%llu</items_produced>\n") % stats.items_produced[i]);
+ }
+ for (size_t i = 0; i < stats.tags_produced.size(); i++)
+ {
+ block_xml += str(boost::format(" <tags_produced>%llu</tags_produced>\n") % stats.tags_produced[i]);
+ }
+ xml += str(boost::format(" <block>\n%s</block>\n") % block_xml);
+ }
+ return str(boost::format("<gras_stats>\n%s</gras_stats>") % xml);
+}
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index f813b57..06e0b60 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -52,8 +52,10 @@ void BlockActor::handle_topology(
resize_fill_back(this->output_configs, num_outputs);
//resize the bytes consumed/produced
- resize_fill_grow(this->items_consumed, num_inputs, 0);
- resize_fill_grow(this->items_produced, num_outputs, 0);
+ resize_fill_grow(this->stats.items_consumed, num_inputs, 0);
+ resize_fill_grow(this->stats.tags_consumed, num_inputs, 0);
+ resize_fill_grow(this->stats.items_produced, num_outputs, 0);
+ resize_fill_grow(this->stats.tags_produced, num_outputs, 0);
//resize all work buffers to match current connections
this->input_items.resize(num_inputs);