diff options
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index b97dabf..52cf505 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -9,6 +9,7 @@ void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + this->stats.stop_time = time_now(); this->block_ptr->notify_inactive(); //flush partial output buffers to the downstream @@ -98,18 +99,13 @@ void BlockActor::output_fail(const size_t i) void BlockActor::handle_task(void) { + const time_ticks_t task_start = time_now(); //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - this->handle_task_count++; if GRAS_UNLIKELY(not this->is_work_allowed()) return; - this->work_count++; - - #ifdef WORK_DEBUG - WorkDebugPrinter WDP(block_ptr->to_string()); - #endif //------------------------------------------------------------------ //-- Asynchronous notification through atomic variable @@ -185,6 +181,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ + const time_ticks_t work_start = time_now(); if GRAS_UNLIKELY(this->interruptible_thread) { this->interruptible_thread->call(); @@ -193,6 +190,7 @@ void BlockActor::handle_task(void) { this->task_work(); } + const time_ticks_t work_stop = time_now(); //------------------------------------------------------------------ //-- Flush output buffers downstream @@ -216,6 +214,14 @@ void BlockActor::handle_task(void) //still have IO ready? kick off another task this->task_kicker(); + + //save stats + const time_ticks_t task_time = time_now() - task_start; + const time_ticks_t work_time = work_stop - work_start; + this->stats.work_count++; + this->stats.total_time_work += work_time; + this->stats.total_time_work_other += task_time - work_time; + this->stats.time_last_work = work_stop; } void BlockActor::consume(const size_t i, const size_t items) @@ -223,7 +229,7 @@ void BlockActor::consume(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " consume " << items << std::endl; #endif - this->items_consumed[i] += items; + this->stats.items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; this->input_queues.consume(i, bytes); this->trim_tags(i); @@ -235,7 +241,7 @@ void BlockActor::produce(const size_t i, const size_t items) std::cerr << name << " produce " << items << std::endl; #endif SBuffer &buff = this->output_queues.front(i); - this->items_produced[i] += items; + this->stats.items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; buff.length += bytes; } @@ -244,7 +250,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { this->flush_output(i); const size_t items = buffer.length/output_items_sizes[i]; - this->items_produced[i] += items; + this->stats.items_produced[i] += items; InputBufferMessage buff_msg; buff_msg.buffer = buffer; this->post_downstream(i, buff_msg); |