diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | lib/block.cpp | 6 | ||||
-rw-r--r-- | lib/block_actor.cpp | 7 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 12 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/element.cpp | 3 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 11 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 32 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/stats.hpp | 30 | ||||
-rw-r--r-- | lib/register_messages.cpp | 1 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 3 | ||||
-rw-r--r-- | lib/top_block_stats.cpp | 63 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 6 |
15 files changed, 149 insertions, 60 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 715f844..76e4f2d 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -57,6 +57,7 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/output_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/top_block_stats.cpp ${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp ) diff --git a/lib/block.cpp b/lib/block.cpp index 330e1a6..424df83 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -164,16 +164,17 @@ void Block::produce(const size_t num_items) item_index_t Block::get_consumed(const size_t which_input) { - return (*this)->block->items_consumed[which_input]; + return (*this)->block->stats.items_consumed[which_input]; } item_index_t Block::get_produced(const size_t which_output) { - return (*this)->block->items_produced[which_output]; + return (*this)->block->stats.items_produced[which_output]; } void Block::post_output_tag(const size_t which_output, const Tag &tag) { + (*this)->block->stats.items_produced[which_output]++; (*this)->block->post_downstream(which_output, InputTagMessage(tag)); } @@ -194,6 +195,7 @@ PMCC Block::pop_input_msg(const size_t which_input) if (input_tags.empty()) return PMCC(); PMCC p = input_tags.front().object; input_tags.erase(input_tags.begin()); + (*this)->block->stats.items_consumed[which_input]++; return p; } diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index 19f8f97..5b1c5db 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -88,16 +88,9 @@ BlockActor::BlockActor(void): active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only } this->register_handlers(); - this->handle_task_count = 0; - this->work_count = 0; } BlockActor::~BlockActor(void) { this->mark_done(); - #ifdef WORK_COUNTS - if (work_count == 0) std::cerr << "\n WORK FAIL!!!" << std::endl; - std::cerr << name << " handle_task_count " << handle_task_count << std::endl; - std::cerr << name << " work_count " << work_count << std::endl; - #endif } diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 1290a67..73a22da 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -16,6 +16,7 @@ void BlockActor::handle_top_active( if (this->block_state != BLOCK_STATE_LIVE) { this->block_ptr->notify_active(); + this->stats.start_time = time_now(); } this->block_state = BLOCK_STATE_LIVE; this->active_token = message.token; @@ -128,3 +129,14 @@ void BlockActor::handle_self_kick( MESSAGE_TRACER(); this->handle_task(); } + +void BlockActor::handle_get_stats( + const GetStatsMessage &, + const Theron::Address from +){ + GetStatsMessage message; + message.block_id = this->block_ptr->to_string(); + message.stats = this->stats; + message.stats_time = time_now(); + this->Send(message, from); //ACK +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index b97dabf..52cf505 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -9,6 +9,7 @@ void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + this->stats.stop_time = time_now(); this->block_ptr->notify_inactive(); //flush partial output buffers to the downstream @@ -98,18 +99,13 @@ void BlockActor::output_fail(const size_t i) void BlockActor::handle_task(void) { + const time_ticks_t task_start = time_now(); //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - this->handle_task_count++; if GRAS_UNLIKELY(not this->is_work_allowed()) return; - this->work_count++; - - #ifdef WORK_DEBUG - WorkDebugPrinter WDP(block_ptr->to_string()); - #endif //------------------------------------------------------------------ //-- Asynchronous notification through atomic variable @@ -185,6 +181,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ + const time_ticks_t work_start = time_now(); if GRAS_UNLIKELY(this->interruptible_thread) { this->interruptible_thread->call(); @@ -193,6 +190,7 @@ void BlockActor::handle_task(void) { this->task_work(); } + const time_ticks_t work_stop = time_now(); //------------------------------------------------------------------ //-- Flush output buffers downstream @@ -216,6 +214,14 @@ void BlockActor::handle_task(void) //still have IO ready? kick off another task this->task_kicker(); + + //save stats + const time_ticks_t task_time = time_now() - task_start; + const time_ticks_t work_time = work_stop - work_start; + this->stats.work_count++; + this->stats.total_time_work += work_time; + this->stats.total_time_work_other += task_time - work_time; + this->stats.time_last_work = work_stop; } void BlockActor::consume(const size_t i, const size_t items) @@ -223,7 +229,7 @@ void BlockActor::consume(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " consume " << items << std::endl; #endif - this->items_consumed[i] += items; + this->stats.items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; this->input_queues.consume(i, bytes); this->trim_tags(i); @@ -235,7 +241,7 @@ void BlockActor::produce(const size_t i, const size_t items) std::cerr << name << " produce " << items << std::endl; #endif SBuffer &buff = this->output_queues.front(i); - this->items_produced[i] += items; + this->stats.items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; buff.length += bytes; } @@ -244,7 +250,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { this->flush_output(i); const size_t items = buffer.length/output_items_sizes[i]; - this->items_produced[i] += items; + this->stats.items_produced[i] += items; InputBufferMessage buff_msg; buff_msg.buffer = buffer; this->post_downstream(i, buff_msg); diff --git a/lib/element.cpp b/lib/element.cpp index 71e4e69..368dbea 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -19,6 +19,7 @@ Element::Element(const std::string &name) this->reset(new ElementImpl()); (*this)->name = name; (*this)->unique_id = ++unique_id_pool; + (*this)->id = str(boost::format("%s(%d)") % this->name() % this->unique_id()); if (GENESIS) std::cerr << "New element: " << to_string() << std::endl; } @@ -42,5 +43,5 @@ std::string Element::name(void) const std::string Element::to_string(void) const { - return str(boost::format("%s(%d)") % this->name() % this->unique_id()); + return (*this)->id; } diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 9e3c9b3..c707268 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -26,6 +26,7 @@ struct ElementImpl //common element properties std::string name; long unique_id; + std::string id; //top block stuff SharedThreadGroup thread_group; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index dca21e6..ba69567 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -11,6 +11,7 @@ #include <gras/thread_pool.hpp> #include <Apology/Worker.hpp> #include <gras_impl/token.hpp> +#include <gras_impl/stats.hpp> #include <gras_impl/messages.hpp> #include <gras_impl/output_buffer_queues.hpp> #include <gras_impl/input_buffer_queues.hpp> @@ -56,6 +57,7 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_output_update); this->RegisterHandler(this, &BlockActor::handle_self_kick); + this->RegisterHandler(this, &BlockActor::handle_get_stats); } //handlers @@ -83,6 +85,7 @@ struct BlockActor : Apology::Worker void handle_output_update(const OutputUpdateMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); + void handle_get_stats(const GetStatsMessage &, const Theron::Address); //helpers void buffer_returner(const size_t index, SBuffer &buffer); @@ -124,10 +127,6 @@ struct BlockActor : Apology::Worker std::vector<InputPortConfig> input_configs; std::vector<OutputPortConfig> output_configs; - //keeps track of production - std::vector<item_index_t> items_consumed; - std::vector<item_index_t> items_produced; - //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; @@ -171,9 +170,7 @@ struct BlockActor : Apology::Worker std::vector<std::vector<OutputHintMessage> > output_allocation_hints; - //status keepers - size_t handle_task_count; - size_t work_count; + BlockStats stats; }; } //namespace gras diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 61376d5..c01e7d4 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -24,11 +24,9 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); //---------------------------------------------------------------------- //-- define to enable these debugs: //---------------------------------------------------------------------- -//#define WORK_DEBUG -//#define ASSERTING +#define ASSERTING //#define MESSAGE_TRACING //#define ITEM_CONSPROD -//#define WORK_COUNTS //---------------------------------------------------------------------- //-- time accumulation printer @@ -68,32 +66,4 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); #define ASSERT(x) #endif -//---------------------------------------------------------------------- -//-- implementation for work debug -//---------------------------------------------------------------------- -#ifdef WORK_DEBUG - -#include <boost/thread/mutex.hpp> - -static boost::mutex work_debug_mutex; - -struct WorkDebugPrinter -{ - WorkDebugPrinter(const std::string &name): - lock(work_debug_mutex), name(name) - { - std::cerr << "-----> begin work on " << name << std::endl; - } - - ~WorkDebugPrinter(void) - { - std::cerr << "<----- end work on " << name << std::endl; - } - - boost::mutex::scoped_lock lock; - std::string name; -}; - -#endif - #endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/ diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 57f308b..386cd0a 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -8,6 +8,7 @@ #include <gras/tags.hpp> #include <gras/sbuffer.hpp> #include <gras_impl/token.hpp> +#include <gras_impl/stats.hpp> namespace gras { @@ -128,6 +129,13 @@ struct SelfKickMessage //empty }; +struct GetStatsMessage +{ + std::string block_id; + BlockStats stats; + time_ticks_t stats_time; +}; + } //namespace gras #include <Theron/Register.h> @@ -157,5 +165,6 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::GetStatsMessage); #endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/ diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp new file mode 100644 index 0000000..9c9c8e1 --- /dev/null +++ b/lib/gras_impl/stats.hpp @@ -0,0 +1,30 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#ifndef INCLUDED_LIBGRAS_IMPL_STATS_HPP +#define INCLUDED_LIBGRAS_IMPL_STATS_HPP + +#include <gras/chrono.hpp> +#include <vector> + +namespace gras +{ + +struct BlockStats +{ + time_ticks_t start_time; + time_ticks_t stop_time; + + std::vector<item_index_t> items_consumed; + std::vector<item_index_t> tags_consumed; + std::vector<item_index_t> items_produced; + std::vector<item_index_t> tags_produced; + + item_index_t work_count; + time_ticks_t time_last_work; + time_ticks_t total_time_work; + time_ticks_t total_time_work_other; +}; + +} //namespace gras + +#endif /*INCLUDED_LIBGRAS_IMPL_STATS_HPP*/ diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp index caa8ed8..f68bc14 100644 --- a/lib/register_messages.cpp +++ b/lib/register_messages.cpp @@ -26,3 +26,4 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputAllocMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::SelfKickMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::GetStatsMessage); diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index e2eb3b9..d42ce02 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //------------------------------------------------------------------ std::vector<Tag> &tags_i = this->input_tags[i]; - const size_t items_consumed_i = this->items_consumed[i]; + const size_t items_consumed_i = this->stats.items_consumed[i]; size_t last = 0; while (last < tags_i.size() and tags_i[last].offset < items_consumed_i) { @@ -39,6 +39,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //now its safe to perform the erasure tags_i.erase(tags_i.begin(), tags_i.begin()+last); + this->stats.items_consumed[i] += last; } } //namespace gras diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp new file mode 100644 index 0000000..5d206e2 --- /dev/null +++ b/lib/top_block_stats.cpp @@ -0,0 +1,63 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include "element_impl.hpp" +#include <gras/top_block.hpp> +#include <boost/foreach.hpp> +#include <boost/format.hpp> + +using namespace gras; + +struct GetStatsReceiver : Theron::Receiver +{ + GetStatsReceiver(void) + { + this->RegisterHandler(this, &GetStatsReceiver::handle_get_stats); + } + + void handle_get_stats(const GetStatsMessage &message, const Theron::Address) + { + this->messages.push_back(message); + } + + std::vector<GetStatsMessage> messages; +}; + +std::string TopBlock::get_stats_xml(void) +{ + GetStatsReceiver receiver; + (*this)->executor->post_all(GetStatsMessage(), receiver); + std::string xml; + xml += str(boost::format(" <now>%llu</now>\n") % time_now()); + xml += str(boost::format(" <tps>%llu</tps>\n") % time_tps()); + BOOST_FOREACH(const GetStatsMessage &message, receiver.messages) + { + const BlockStats &stats = message.stats; + std::string block_xml; + block_xml += str(boost::format(" <tps>%llu</tps>\n") % time_tps()); + block_xml += str(boost::format(" <stats_time>%llu</stats_time>\n") % message.stats_time); + block_xml += str(boost::format(" <start_time>%llu</start_time>\n") % stats.start_time); + block_xml += str(boost::format(" <stop_time>%llu</stop_time>\n") % stats.stop_time); + block_xml += str(boost::format(" <work_count>%llu</work_count>\n") % stats.work_count); + block_xml += str(boost::format(" <time_last_work>%llu</time_last_work>\n") % stats.time_last_work); + block_xml += str(boost::format(" <total_time_work>%llu</total_time_work>\n") % stats.total_time_work); + block_xml += str(boost::format(" <total_time_work_other>%llu</total_time_work_other>\n") % stats.total_time_work_other); + for (size_t i = 0; i < stats.items_consumed.size(); i++) + { + block_xml += str(boost::format(" <items_consumed>%llu</items_consumed>\n") % stats.items_consumed[i]); + } + for (size_t i = 0; i < stats.tags_consumed.size(); i++) + { + block_xml += str(boost::format(" <tags_consumed>%llu</tags_consumed>\n") % stats.tags_consumed[i]); + } + for (size_t i = 0; i < stats.items_produced.size(); i++) + { + block_xml += str(boost::format(" <items_produced>%llu</items_produced>\n") % stats.items_produced[i]); + } + for (size_t i = 0; i < stats.tags_produced.size(); i++) + { + block_xml += str(boost::format(" <tags_produced>%llu</tags_produced>\n") % stats.tags_produced[i]); + } + xml += str(boost::format(" <block id=\"%s\">\n%s</block>\n") % message.block_id % block_xml); + } + return str(boost::format("<gras_stats>\n%s</gras_stats>") % xml); +} diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index af4dead..4f12619 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -39,8 +39,10 @@ void BlockActor::handle_topology( resize_fill_back(this->output_configs, num_outputs); //resize the bytes consumed/produced - resize_fill_grow(this->items_consumed, num_inputs, 0); - resize_fill_grow(this->items_produced, num_outputs, 0); + resize_fill_grow(this->stats.items_consumed, num_inputs, 0); + resize_fill_grow(this->stats.tags_consumed, num_inputs, 0); + resize_fill_grow(this->stats.items_produced, num_outputs, 0); + resize_fill_grow(this->stats.tags_produced, num_outputs, 0); //resize all work buffers to match current connections this->input_items.resize(num_inputs); |