From f8cca035aacf1578b99f6ffa1bb99ffb052ac27c Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 17 Feb 2013 23:01:03 -0600 Subject: gras: added stats for produced/consumed --- lib/CMakeLists.txt | 1 + lib/block.cpp | 6 +++-- lib/block_task.cpp | 6 ++--- lib/gras_impl/block_actor.hpp | 4 --- lib/gras_impl/stats.hpp | 8 +++++- lib/tag_handlers.hpp | 3 ++- lib/top_block.cpp | 37 -------------------------- lib/top_block_stats.cpp | 62 +++++++++++++++++++++++++++++++++++++++++++ lib/topology_handler.cpp | 6 +++-- 9 files changed, 83 insertions(+), 50 deletions(-) create mode 100644 lib/top_block_stats.cpp (limited to 'lib') diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index bc38d06..69ada7c 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -55,6 +55,7 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/output_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/top_block_stats.cpp ${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp ) diff --git a/lib/block.cpp b/lib/block.cpp index 716873b..f5ba6ce 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -142,16 +142,17 @@ void Block::produce(const size_t num_items) item_index_t Block::get_consumed(const size_t which_input) { - return (*this)->block->items_consumed[which_input]; + return (*this)->block->stats.items_consumed[which_input]; } item_index_t Block::get_produced(const size_t which_output) { - return (*this)->block->items_produced[which_output]; + return (*this)->block->stats.items_produced[which_output]; } void Block::post_output_tag(const size_t which_output, const Tag &tag) { + (*this)->block->stats.items_produced[which_output]++; (*this)->block->post_downstream(which_output, InputTagMessage(tag)); } @@ -172,6 +173,7 @@ PMCC Block::pop_input_msg(const size_t which_input) if (input_tags.empty()) return PMCC(); PMCC p = input_tags.front().object; input_tags.erase(input_tags.begin()); + (*this)->block->stats.items_consumed[which_input]++; return p; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 67ff308..ca106ab 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -229,7 +229,7 @@ void BlockActor::consume(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " consume " << items << std::endl; #endif - this->items_consumed[i] += items; + this->stats.items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; this->input_queues.consume(i, bytes); this->trim_tags(i); @@ -241,7 +241,7 @@ void BlockActor::produce(const size_t i, const size_t items) std::cerr << name << " produce " << items << std::endl; #endif SBuffer &buff = this->output_queues.front(i); - this->items_produced[i] += items; + this->stats.items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; buff.length += bytes; } @@ -250,7 +250,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { this->flush_output(i); const size_t items = buffer.length/output_items_sizes[i]; - this->items_produced[i] += items; + this->stats.items_produced[i] += items; InputBufferMessage buff_msg; buff_msg.buffer = buffer; this->post_downstream(i, buff_msg); diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 2aee666..ba69567 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -127,10 +127,6 @@ struct BlockActor : Apology::Worker std::vector input_configs; std::vector output_configs; - //keeps track of production - std::vector items_consumed; - std::vector items_produced; - //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp index 138b231..9c9c8e1 100644 --- a/lib/gras_impl/stats.hpp +++ b/lib/gras_impl/stats.hpp @@ -4,6 +4,7 @@ #define INCLUDED_LIBGRAS_IMPL_STATS_HPP #include +#include namespace gras { @@ -13,7 +14,12 @@ struct BlockStats time_ticks_t start_time; time_ticks_t stop_time; - size_t work_count; + std::vector items_consumed; + std::vector tags_consumed; + std::vector items_produced; + std::vector tags_produced; + + item_index_t work_count; time_ticks_t time_last_work; time_ticks_t total_time_work; time_ticks_t total_time_work_other; diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index e2eb3b9..d42ce02 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //------------------------------------------------------------------ std::vector &tags_i = this->input_tags[i]; - const size_t items_consumed_i = this->items_consumed[i]; + const size_t items_consumed_i = this->stats.items_consumed[i]; size_t last = 0; while (last < tags_i.size() and tags_i[last].offset < items_consumed_i) { @@ -39,6 +39,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //now its safe to perform the erasure tags_i.erase(tags_i.begin(), tags_i.begin()+last); + this->stats.items_consumed[i] += last; } } //namespace gras diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 714ff4a..4d2f956 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -3,8 +3,6 @@ #include "element_impl.hpp" #include #include //sleep -#include -#include using namespace gras; @@ -139,41 +137,6 @@ bool TopBlock::wait(const double timeout) return (*this)->token.unique(); } -///////////////////////// Stats gathering interface //////////////////////// - -struct GetStatsReceiver : Theron::Receiver -{ - GetStatsReceiver(void) - { - this->RegisterHandler(this, &GetStatsReceiver::handle_get_stats); - } - - void handle_get_stats(const GetStatsMessage &message, const Theron::Address) - { - this->messages.push_back(message); - } - - std::vector messages; -}; - -std::string TopBlock::get_stats_xml(void) -{ - GetStatsReceiver receiver; - (*this)->executor->post_all(GetStatsMessage(), receiver); - std::string xml; - xml += str(boost::format(" %ull\n") % time_now()); - xml += str(boost::format(" %ull\n") % time_tps()); - BOOST_FOREACH(const GetStatsMessage &message, receiver.messages) - { - std::string block_xml; - block_xml += str(boost::format(" %s\n") % message.block_id); - block_xml += str(boost::format(" %llu\n") % message.stats.start_time); - block_xml += str(boost::format(" %llu\n") % message.stats.stop_time); - xml += str(boost::format(" \n%s\n") % block_xml); - } - return str(boost::format("\n%s") % xml); -} - ///////////////////////// Deprecated interfaces //////////////////////// void TopBlock::start(const size_t max_items) diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp new file mode 100644 index 0000000..5a8790e --- /dev/null +++ b/lib/top_block_stats.cpp @@ -0,0 +1,62 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include +#include +#include + +using namespace gras; + +struct GetStatsReceiver : Theron::Receiver +{ + GetStatsReceiver(void) + { + this->RegisterHandler(this, &GetStatsReceiver::handle_get_stats); + } + + void handle_get_stats(const GetStatsMessage &message, const Theron::Address) + { + this->messages.push_back(message); + } + + std::vector messages; +}; + +std::string TopBlock::get_stats_xml(void) +{ + GetStatsReceiver receiver; + (*this)->executor->post_all(GetStatsMessage(), receiver); + std::string xml; + xml += str(boost::format(" %ull\n") % time_now()); + xml += str(boost::format(" %ull\n") % time_tps()); + BOOST_FOREACH(const GetStatsMessage &message, receiver.messages) + { + const BlockStats &stats = message.stats; + std::string block_xml; + block_xml += str(boost::format(" %s\n") % message.block_id); + block_xml += str(boost::format(" %llu\n") % stats.start_time); + block_xml += str(boost::format(" %llu\n") % stats.stop_time); + block_xml += str(boost::format(" %llu\n") % stats.work_count); + block_xml += str(boost::format(" %llu\n") % stats.time_last_work); + block_xml += str(boost::format(" %llu\n") % stats.total_time_work); + block_xml += str(boost::format(" %llu\n") % stats.total_time_work_other); + for (size_t i = 0; i < stats.items_consumed.size(); i++) + { + block_xml += str(boost::format(" %llu\n") % stats.items_consumed[i]); + } + for (size_t i = 0; i < stats.tags_consumed.size(); i++) + { + block_xml += str(boost::format(" %llu\n") % stats.tags_consumed[i]); + } + for (size_t i = 0; i < stats.items_produced.size(); i++) + { + block_xml += str(boost::format(" %llu\n") % stats.items_produced[i]); + } + for (size_t i = 0; i < stats.tags_produced.size(); i++) + { + block_xml += str(boost::format(" %llu\n") % stats.tags_produced[i]); + } + xml += str(boost::format(" \n%s\n") % block_xml); + } + return str(boost::format("\n%s") % xml); +} diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index f813b57..06e0b60 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -52,8 +52,10 @@ void BlockActor::handle_topology( resize_fill_back(this->output_configs, num_outputs); //resize the bytes consumed/produced - resize_fill_grow(this->items_consumed, num_inputs, 0); - resize_fill_grow(this->items_produced, num_outputs, 0); + resize_fill_grow(this->stats.items_consumed, num_inputs, 0); + resize_fill_grow(this->stats.tags_consumed, num_inputs, 0); + resize_fill_grow(this->stats.items_produced, num_outputs, 0); + resize_fill_grow(this->stats.tags_produced, num_outputs, 0); //resize all work buffers to match current connections this->input_items.resize(num_inputs); -- cgit