summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp24
1 files changed, 15 insertions, 9 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index b97dabf..52cf505 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -9,6 +9,7 @@ void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ this->stats.stop_time = time_now();
this->block_ptr->notify_inactive();
//flush partial output buffers to the downstream
@@ -98,18 +99,13 @@ void BlockActor::output_fail(const size_t i)
void BlockActor::handle_task(void)
{
+ const time_ticks_t task_start = time_now();
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
- this->handle_task_count++;
if GRAS_UNLIKELY(not this->is_work_allowed()) return;
- this->work_count++;
-
- #ifdef WORK_DEBUG
- WorkDebugPrinter WDP(block_ptr->to_string());
- #endif
//------------------------------------------------------------------
//-- Asynchronous notification through atomic variable
@@ -185,6 +181,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
+ const time_ticks_t work_start = time_now();
if GRAS_UNLIKELY(this->interruptible_thread)
{
this->interruptible_thread->call();
@@ -193,6 +190,7 @@ void BlockActor::handle_task(void)
{
this->task_work();
}
+ const time_ticks_t work_stop = time_now();
//------------------------------------------------------------------
//-- Flush output buffers downstream
@@ -216,6 +214,14 @@ void BlockActor::handle_task(void)
//still have IO ready? kick off another task
this->task_kicker();
+
+ //save stats
+ const time_ticks_t task_time = time_now() - task_start;
+ const time_ticks_t work_time = work_stop - work_start;
+ this->stats.work_count++;
+ this->stats.total_time_work += work_time;
+ this->stats.total_time_work_other += task_time - work_time;
+ this->stats.time_last_work = work_stop;
}
void BlockActor::consume(const size_t i, const size_t items)
@@ -223,7 +229,7 @@ void BlockActor::consume(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " consume " << items << std::endl;
#endif
- this->items_consumed[i] += items;
+ this->stats.items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
this->input_queues.consume(i, bytes);
this->trim_tags(i);
@@ -235,7 +241,7 @@ void BlockActor::produce(const size_t i, const size_t items)
std::cerr << name << " produce " << items << std::endl;
#endif
SBuffer &buff = this->output_queues.front(i);
- this->items_produced[i] += items;
+ this->stats.items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
buff.length += bytes;
}
@@ -244,7 +250,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
this->flush_output(i);
const size_t items = buffer.length/output_items_sizes[i];
- this->items_produced[i] += items;
+ this->stats.items_produced[i] += items;
InputBufferMessage buff_msg;
buff_msg.buffer = buffer;
this->post_downstream(i, buff_msg);