summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt1
-rw-r--r--lib/block.cpp6
-rw-r--r--lib/block_actor.cpp7
-rw-r--r--lib/block_handlers.cpp12
-rw-r--r--lib/block_task.cpp24
-rw-r--r--lib/element.cpp3
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/block_actor.hpp11
-rw-r--r--lib/gras_impl/debug.hpp32
-rw-r--r--lib/gras_impl/messages.hpp9
-rw-r--r--lib/gras_impl/stats.hpp30
-rw-r--r--lib/register_messages.cpp1
-rw-r--r--lib/tag_handlers.hpp3
-rw-r--r--lib/top_block_stats.cpp63
-rw-r--r--lib/topology_handler.cpp6
15 files changed, 149 insertions, 60 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 715f844..76e4f2d 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -57,6 +57,7 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/output_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/top_block_stats.cpp
${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp
)
diff --git a/lib/block.cpp b/lib/block.cpp
index 330e1a6..424df83 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -164,16 +164,17 @@ void Block::produce(const size_t num_items)
item_index_t Block::get_consumed(const size_t which_input)
{
- return (*this)->block->items_consumed[which_input];
+ return (*this)->block->stats.items_consumed[which_input];
}
item_index_t Block::get_produced(const size_t which_output)
{
- return (*this)->block->items_produced[which_output];
+ return (*this)->block->stats.items_produced[which_output];
}
void Block::post_output_tag(const size_t which_output, const Tag &tag)
{
+ (*this)->block->stats.items_produced[which_output]++;
(*this)->block->post_downstream(which_output, InputTagMessage(tag));
}
@@ -194,6 +195,7 @@ PMCC Block::pop_input_msg(const size_t which_input)
if (input_tags.empty()) return PMCC();
PMCC p = input_tags.front().object;
input_tags.erase(input_tags.begin());
+ (*this)->block->stats.items_consumed[which_input]++;
return p;
}
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 19f8f97..5b1c5db 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -88,16 +88,9 @@ BlockActor::BlockActor(void):
active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only
}
this->register_handlers();
- this->handle_task_count = 0;
- this->work_count = 0;
}
BlockActor::~BlockActor(void)
{
this->mark_done();
- #ifdef WORK_COUNTS
- if (work_count == 0) std::cerr << "\n WORK FAIL!!!" << std::endl;
- std::cerr << name << " handle_task_count " << handle_task_count << std::endl;
- std::cerr << name << " work_count " << work_count << std::endl;
- #endif
}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 1290a67..73a22da 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -16,6 +16,7 @@ void BlockActor::handle_top_active(
if (this->block_state != BLOCK_STATE_LIVE)
{
this->block_ptr->notify_active();
+ this->stats.start_time = time_now();
}
this->block_state = BLOCK_STATE_LIVE;
this->active_token = message.token;
@@ -128,3 +129,14 @@ void BlockActor::handle_self_kick(
MESSAGE_TRACER();
this->handle_task();
}
+
+void BlockActor::handle_get_stats(
+ const GetStatsMessage &,
+ const Theron::Address from
+){
+ GetStatsMessage message;
+ message.block_id = this->block_ptr->to_string();
+ message.stats = this->stats;
+ message.stats_time = time_now();
+ this->Send(message, from); //ACK
+}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index b97dabf..52cf505 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -9,6 +9,7 @@ void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ this->stats.stop_time = time_now();
this->block_ptr->notify_inactive();
//flush partial output buffers to the downstream
@@ -98,18 +99,13 @@ void BlockActor::output_fail(const size_t i)
void BlockActor::handle_task(void)
{
+ const time_ticks_t task_start = time_now();
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
- this->handle_task_count++;
if GRAS_UNLIKELY(not this->is_work_allowed()) return;
- this->work_count++;
-
- #ifdef WORK_DEBUG
- WorkDebugPrinter WDP(block_ptr->to_string());
- #endif
//------------------------------------------------------------------
//-- Asynchronous notification through atomic variable
@@ -185,6 +181,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
+ const time_ticks_t work_start = time_now();
if GRAS_UNLIKELY(this->interruptible_thread)
{
this->interruptible_thread->call();
@@ -193,6 +190,7 @@ void BlockActor::handle_task(void)
{
this->task_work();
}
+ const time_ticks_t work_stop = time_now();
//------------------------------------------------------------------
//-- Flush output buffers downstream
@@ -216,6 +214,14 @@ void BlockActor::handle_task(void)
//still have IO ready? kick off another task
this->task_kicker();
+
+ //save stats
+ const time_ticks_t task_time = time_now() - task_start;
+ const time_ticks_t work_time = work_stop - work_start;
+ this->stats.work_count++;
+ this->stats.total_time_work += work_time;
+ this->stats.total_time_work_other += task_time - work_time;
+ this->stats.time_last_work = work_stop;
}
void BlockActor::consume(const size_t i, const size_t items)
@@ -223,7 +229,7 @@ void BlockActor::consume(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " consume " << items << std::endl;
#endif
- this->items_consumed[i] += items;
+ this->stats.items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
this->input_queues.consume(i, bytes);
this->trim_tags(i);
@@ -235,7 +241,7 @@ void BlockActor::produce(const size_t i, const size_t items)
std::cerr << name << " produce " << items << std::endl;
#endif
SBuffer &buff = this->output_queues.front(i);
- this->items_produced[i] += items;
+ this->stats.items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
buff.length += bytes;
}
@@ -244,7 +250,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
this->flush_output(i);
const size_t items = buffer.length/output_items_sizes[i];
- this->items_produced[i] += items;
+ this->stats.items_produced[i] += items;
InputBufferMessage buff_msg;
buff_msg.buffer = buffer;
this->post_downstream(i, buff_msg);
diff --git a/lib/element.cpp b/lib/element.cpp
index 71e4e69..368dbea 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -19,6 +19,7 @@ Element::Element(const std::string &name)
this->reset(new ElementImpl());
(*this)->name = name;
(*this)->unique_id = ++unique_id_pool;
+ (*this)->id = str(boost::format("%s(%d)") % this->name() % this->unique_id());
if (GENESIS) std::cerr << "New element: " << to_string() << std::endl;
}
@@ -42,5 +43,5 @@ std::string Element::name(void) const
std::string Element::to_string(void) const
{
- return str(boost::format("%s(%d)") % this->name() % this->unique_id());
+ return (*this)->id;
}
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 9e3c9b3..c707268 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -26,6 +26,7 @@ struct ElementImpl
//common element properties
std::string name;
long unique_id;
+ std::string id;
//top block stuff
SharedThreadGroup thread_group;
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index dca21e6..ba69567 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -11,6 +11,7 @@
#include <gras/thread_pool.hpp>
#include <Apology/Worker.hpp>
#include <gras_impl/token.hpp>
+#include <gras_impl/stats.hpp>
#include <gras_impl/messages.hpp>
#include <gras_impl/output_buffer_queues.hpp>
#include <gras_impl/input_buffer_queues.hpp>
@@ -56,6 +57,7 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_output_update);
this->RegisterHandler(this, &BlockActor::handle_self_kick);
+ this->RegisterHandler(this, &BlockActor::handle_get_stats);
}
//handlers
@@ -83,6 +85,7 @@ struct BlockActor : Apology::Worker
void handle_output_update(const OutputUpdateMessage &, const Theron::Address);
void handle_self_kick(const SelfKickMessage &, const Theron::Address);
+ void handle_get_stats(const GetStatsMessage &, const Theron::Address);
//helpers
void buffer_returner(const size_t index, SBuffer &buffer);
@@ -124,10 +127,6 @@ struct BlockActor : Apology::Worker
std::vector<InputPortConfig> input_configs;
std::vector<OutputPortConfig> output_configs;
- //keeps track of production
- std::vector<item_index_t> items_consumed;
- std::vector<item_index_t> items_produced;
-
//work buffers for the new work interface
Block::InputItems input_items;
Block::OutputItems output_items;
@@ -171,9 +170,7 @@ struct BlockActor : Apology::Worker
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
- //status keepers
- size_t handle_task_count;
- size_t work_count;
+ BlockStats stats;
};
} //namespace gras
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index 61376d5..c01e7d4 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -24,11 +24,9 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
//----------------------------------------------------------------------
//-- define to enable these debugs:
//----------------------------------------------------------------------
-//#define WORK_DEBUG
-//#define ASSERTING
+#define ASSERTING
//#define MESSAGE_TRACING
//#define ITEM_CONSPROD
-//#define WORK_COUNTS
//----------------------------------------------------------------------
//-- time accumulation printer
@@ -68,32 +66,4 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
#define ASSERT(x)
#endif
-//----------------------------------------------------------------------
-//-- implementation for work debug
-//----------------------------------------------------------------------
-#ifdef WORK_DEBUG
-
-#include <boost/thread/mutex.hpp>
-
-static boost::mutex work_debug_mutex;
-
-struct WorkDebugPrinter
-{
- WorkDebugPrinter(const std::string &name):
- lock(work_debug_mutex), name(name)
- {
- std::cerr << "-----> begin work on " << name << std::endl;
- }
-
- ~WorkDebugPrinter(void)
- {
- std::cerr << "<----- end work on " << name << std::endl;
- }
-
- boost::mutex::scoped_lock lock;
- std::string name;
-};
-
-#endif
-
#endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 57f308b..386cd0a 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -8,6 +8,7 @@
#include <gras/tags.hpp>
#include <gras/sbuffer.hpp>
#include <gras_impl/token.hpp>
+#include <gras_impl/stats.hpp>
namespace gras
{
@@ -128,6 +129,13 @@ struct SelfKickMessage
//empty
};
+struct GetStatsMessage
+{
+ std::string block_id;
+ BlockStats stats;
+ time_ticks_t stats_time;
+};
+
} //namespace gras
#include <Theron/Register.h>
@@ -157,5 +165,6 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::GetStatsMessage);
#endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/
diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp
new file mode 100644
index 0000000..9c9c8e1
--- /dev/null
+++ b/lib/gras_impl/stats.hpp
@@ -0,0 +1,30 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_STATS_HPP
+#define INCLUDED_LIBGRAS_IMPL_STATS_HPP
+
+#include <gras/chrono.hpp>
+#include <vector>
+
+namespace gras
+{
+
+struct BlockStats
+{
+ time_ticks_t start_time;
+ time_ticks_t stop_time;
+
+ std::vector<item_index_t> items_consumed;
+ std::vector<item_index_t> tags_consumed;
+ std::vector<item_index_t> items_produced;
+ std::vector<item_index_t> tags_produced;
+
+ item_index_t work_count;
+ time_ticks_t time_last_work;
+ time_ticks_t total_time_work;
+ time_ticks_t total_time_work_other;
+};
+
+} //namespace gras
+
+#endif /*INCLUDED_LIBGRAS_IMPL_STATS_HPP*/
diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp
index caa8ed8..f68bc14 100644
--- a/lib/register_messages.cpp
+++ b/lib/register_messages.cpp
@@ -26,3 +26,4 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::SelfKickMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::GetStatsMessage);
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index e2eb3b9..d42ce02 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//------------------------------------------------------------------
std::vector<Tag> &tags_i = this->input_tags[i];
- const size_t items_consumed_i = this->items_consumed[i];
+ const size_t items_consumed_i = this->stats.items_consumed[i];
size_t last = 0;
while (last < tags_i.size() and tags_i[last].offset < items_consumed_i)
{
@@ -39,6 +39,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//now its safe to perform the erasure
tags_i.erase(tags_i.begin(), tags_i.begin()+last);
+ this->stats.items_consumed[i] += last;
}
} //namespace gras
diff --git a/lib/top_block_stats.cpp b/lib/top_block_stats.cpp
new file mode 100644
index 0000000..5d206e2
--- /dev/null
+++ b/lib/top_block_stats.cpp
@@ -0,0 +1,63 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include "element_impl.hpp"
+#include <gras/top_block.hpp>
+#include <boost/foreach.hpp>
+#include <boost/format.hpp>
+
+using namespace gras;
+
+struct GetStatsReceiver : Theron::Receiver
+{
+ GetStatsReceiver(void)
+ {
+ this->RegisterHandler(this, &GetStatsReceiver::handle_get_stats);
+ }
+
+ void handle_get_stats(const GetStatsMessage &message, const Theron::Address)
+ {
+ this->messages.push_back(message);
+ }
+
+ std::vector<GetStatsMessage> messages;
+};
+
+std::string TopBlock::get_stats_xml(void)
+{
+ GetStatsReceiver receiver;
+ (*this)->executor->post_all(GetStatsMessage(), receiver);
+ std::string xml;
+ xml += str(boost::format(" <now>%llu</now>\n") % time_now());
+ xml += str(boost::format(" <tps>%llu</tps>\n") % time_tps());
+ BOOST_FOREACH(const GetStatsMessage &message, receiver.messages)
+ {
+ const BlockStats &stats = message.stats;
+ std::string block_xml;
+ block_xml += str(boost::format(" <tps>%llu</tps>\n") % time_tps());
+ block_xml += str(boost::format(" <stats_time>%llu</stats_time>\n") % message.stats_time);
+ block_xml += str(boost::format(" <start_time>%llu</start_time>\n") % stats.start_time);
+ block_xml += str(boost::format(" <stop_time>%llu</stop_time>\n") % stats.stop_time);
+ block_xml += str(boost::format(" <work_count>%llu</work_count>\n") % stats.work_count);
+ block_xml += str(boost::format(" <time_last_work>%llu</time_last_work>\n") % stats.time_last_work);
+ block_xml += str(boost::format(" <total_time_work>%llu</total_time_work>\n") % stats.total_time_work);
+ block_xml += str(boost::format(" <total_time_work_other>%llu</total_time_work_other>\n") % stats.total_time_work_other);
+ for (size_t i = 0; i < stats.items_consumed.size(); i++)
+ {
+ block_xml += str(boost::format(" <items_consumed>%llu</items_consumed>\n") % stats.items_consumed[i]);
+ }
+ for (size_t i = 0; i < stats.tags_consumed.size(); i++)
+ {
+ block_xml += str(boost::format(" <tags_consumed>%llu</tags_consumed>\n") % stats.tags_consumed[i]);
+ }
+ for (size_t i = 0; i < stats.items_produced.size(); i++)
+ {
+ block_xml += str(boost::format(" <items_produced>%llu</items_produced>\n") % stats.items_produced[i]);
+ }
+ for (size_t i = 0; i < stats.tags_produced.size(); i++)
+ {
+ block_xml += str(boost::format(" <tags_produced>%llu</tags_produced>\n") % stats.tags_produced[i]);
+ }
+ xml += str(boost::format(" <block id=\"%s\">\n%s</block>\n") % message.block_id % block_xml);
+ }
+ return str(boost::format("<gras_stats>\n%s</gras_stats>") % xml);
+}
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index af4dead..4f12619 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -39,8 +39,10 @@ void BlockActor::handle_topology(
resize_fill_back(this->output_configs, num_outputs);
//resize the bytes consumed/produced
- resize_fill_grow(this->items_consumed, num_inputs, 0);
- resize_fill_grow(this->items_produced, num_outputs, 0);
+ resize_fill_grow(this->stats.items_consumed, num_inputs, 0);
+ resize_fill_grow(this->stats.tags_consumed, num_inputs, 0);
+ resize_fill_grow(this->stats.items_produced, num_outputs, 0);
+ resize_fill_grow(this->stats.tags_produced, num_outputs, 0);
//resize all work buffers to match current connections
this->input_items.resize(num_inputs);