summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-11-05 22:59:47 -0800
committerJosh Blum2012-11-05 22:59:47 -0800
commit37a863991f5e05035cce6738379de7a8ebc2b160 (patch)
treec7f2c7732074d69f1b3fe147f5fb32d92c34c9c8 /lib/block_task.cpp
parented68f79a7a2e3c1bdbe0a2b4c5498ac29e431cce (diff)
downloadsandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.gz
sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.bz2
sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.zip
polish up work and notify api
There is a formal api to mark done, and IO failures. Notify topology was created for consistency. So far, there output fail is unimplemented, and input fail requires an additional check.
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp24
1 files changed, 9 insertions, 15 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c763476..4806914 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -71,7 +71,7 @@ void BlockActor::mark_done(void)
<< std::flush;
}
-GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
+void BlockActor::input_fail(const size_t i)
{
//input failed, accumulate and try again
if (not this->input_queues.is_accumulated(i))
@@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
}
//otherwise check for done, else wait for more
if (this->inputs_done[i]) this->mark_done();
+
+ //TODO check if input buffer is max size and throw
+}
+
+void BlockActor::output_fail(const size_t i)
+{
+ //TODO
}
void BlockActor::handle_task(void)
@@ -103,7 +110,6 @@ void BlockActor::handle_task(void)
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
- this->work_io_ptr_mask = 0; //reset
//------------------------------------------------------------------
//-- initialize input buffers before work
@@ -160,7 +166,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- this->work_ret = -1;
if (this->interruptible_thread)
{
this->interruptible_thread->call();
@@ -170,24 +175,13 @@ void BlockActor::handle_task(void)
this->task_work();
}
- if (work_ret >= 0)
- {
- this->input_fail(work_ret);
- return;
- }
-
- if (work_ret == -1)
- {
- this->mark_done();
- return;
- }
-
//------------------------------------------------------------------
//-- process input consumption
//------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
const size_t items = this->consume_items[i];
+ if (items == 0) continue;
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];