diff options
author | Josh Blum | 2012-11-05 22:59:47 -0800 |
---|---|---|
committer | Josh Blum | 2012-11-05 22:59:47 -0800 |
commit | 37a863991f5e05035cce6738379de7a8ebc2b160 (patch) | |
tree | c7f2c7732074d69f1b3fe147f5fb32d92c34c9c8 /lib/block_task.cpp | |
parent | ed68f79a7a2e3c1bdbe0a2b4c5498ac29e431cce (diff) | |
download | sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.gz sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.bz2 sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.zip |
polish up work and notify api
There is a formal api to mark done, and IO failures.
Notify topology was created for consistency.
So far, there output fail is unimplemented,
and input fail requires an additional check.
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c763476..4806914 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -71,7 +71,7 @@ void BlockActor::mark_done(void) << std::flush; } -GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) +void BlockActor::input_fail(const size_t i) { //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) @@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) } //otherwise check for done, else wait for more if (this->inputs_done[i]) this->mark_done(); + + //TODO check if input buffer is max size and throw +} + +void BlockActor::output_fail(const size_t i) +{ + //TODO } void BlockActor::handle_task(void) @@ -103,7 +110,6 @@ void BlockActor::handle_task(void) const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); - this->work_io_ptr_mask = 0; //reset //------------------------------------------------------------------ //-- initialize input buffers before work @@ -160,7 +166,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - this->work_ret = -1; if (this->interruptible_thread) { this->interruptible_thread->call(); @@ -170,24 +175,13 @@ void BlockActor::handle_task(void) this->task_work(); } - if (work_ret >= 0) - { - this->input_fail(work_ret); - return; - } - - if (work_ret == -1) - { - this->mark_done(); - return; - } - //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { const size_t items = this->consume_items[i]; + if (items == 0) continue; this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; |