diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/circular_buffer.cpp | 2 | ||||
-rw-r--r-- | lib/task_main.cpp | 17 |
2 files changed, 12 insertions, 7 deletions
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp index 0dda400..a193ad1 100644 --- a/lib/circular_buffer.cpp +++ b/lib/circular_buffer.cpp @@ -194,7 +194,7 @@ SBuffer make_circular_buffer(const size_t num_bytes) std::cerr << boost::format( "GRAS: make_circular_buffer threw ipc exception on attempt %u\n%s" ) % trial_count % ex.what() << std::endl; - if (trial_count== 3) throw ex; + if (trial_count == 3) throw ex; } catch(...) { diff --git a/lib/task_main.cpp b/lib/task_main.cpp index ca05979..2e496fc 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -153,7 +153,6 @@ void BlockActor::task_main(void) } data->stats.time_last_work = time_now(); TimerAccumulate ta_post(data->stats.total_time_post); - bool mark_done = false; //------------------------------------------------------------------ //-- Post-work input tasks @@ -168,9 +167,7 @@ void BlockActor::task_main(void) //update the inputs available bit field this->update_input_avail(i); - //missing at least one upstream provider? - //since nothing else is coming in, its safe to mark done - mark_done = mark_done or this->is_input_done(i); + //finally update consumed count --affects get_consumed data->total_items_consumed[i] += data->num_input_items_read[i]; } @@ -191,11 +188,19 @@ void BlockActor::task_main(void) //So this explicitly after consuming the output queues so pop is called. //This is because pop may have special hooks in it to prepare the buffer. if GRAS_LIKELY(data->num_output_items_read[i]) worker->post_downstream(i, buff_msg); + + //finally update produced count --affects get_produced data->total_items_produced[i] += data->num_output_items_read[i]; } - //marked done by post work logic - if (mark_done) this->mark_done(); + //now recheck the status, mark block done if an input is done + if GRAS_UNLIKELY(data->inputs_done.any()) + { + for (size_t i = 0; i < num_inputs; i++) + { + if (this->is_input_done(i)) this->mark_done(); + } + } //still have IO ready? kick off another task this->task_kicker(); |