diff options
author | Josh Blum | 2012-11-05 22:59:47 -0800 |
---|---|---|
committer | Josh Blum | 2012-11-05 22:59:47 -0800 |
commit | 37a863991f5e05035cce6738379de7a8ebc2b160 (patch) | |
tree | c7f2c7732074d69f1b3fe147f5fb32d92c34c9c8 /lib | |
parent | ed68f79a7a2e3c1bdbe0a2b4c5498ac29e431cce (diff) | |
download | sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.gz sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.bz2 sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.zip |
polish up work and notify api
There is a formal api to mark done, and IO failures.
Notify topology was created for consistency.
So far, there output fail is unimplemented,
and input fail requires an additional check.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 19 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 12 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 6 |
4 files changed, 29 insertions, 32 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 3732fdb..51c5f08 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -156,9 +156,9 @@ bool Block::stop(void) return true; } -bool Block::check_topology(int, int) +void Block::notify_topology(const size_t, const size_t) { - return true; + return; } void Block::set_buffer_affinity(const long affinity) @@ -170,3 +170,18 @@ void Block::set_interruptible_work(const bool enb) { (*this)->block->interruptible_work = enb; } + +void Block::mark_output_fail(const size_t which_output) +{ + (*this)->block->output_fail(which_output); +} + +void Block::mark_input_fail(const size_t which_input) +{ + (*this)->block->input_fail(which_input); +} + +void Block::mark_done(void) +{ + (*this)->block->mark_done(); +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c763476..4806914 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -71,7 +71,7 @@ void BlockActor::mark_done(void) << std::flush; } -GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) +void BlockActor::input_fail(const size_t i) { //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) @@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) } //otherwise check for done, else wait for more if (this->inputs_done[i]) this->mark_done(); + + //TODO check if input buffer is max size and throw +} + +void BlockActor::output_fail(const size_t i) +{ + //TODO } void BlockActor::handle_task(void) @@ -103,7 +110,6 @@ void BlockActor::handle_task(void) const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); - this->work_io_ptr_mask = 0; //reset //------------------------------------------------------------------ //-- initialize input buffers before work @@ -160,7 +166,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - this->work_ret = -1; if (this->interruptible_thread) { this->interruptible_thread->call(); @@ -170,24 +175,13 @@ void BlockActor::handle_task(void) this->task_work(); } - if (work_ret >= 0) - { - this->input_fail(work_ret); - return; - } - - if (work_ret == -1) - { - this->mark_done(); - return; - } - //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { const size_t items = this->consume_items[i]; + if (items == 0) continue; this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 18c13e2..67e1815 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker void mark_done(void); void handle_task(void); void input_fail(const size_t index); + void output_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) @@ -126,17 +127,9 @@ struct BlockActor : Apology::Worker std::vector<uint64_t> items_consumed; std::vector<uint64_t> items_produced; - //work buffers for the classic interface - size_t work_noutput_items; - std::vector<const void *> work_input_items; - std::vector<void *> work_output_items; - std::vector<int> work_ninput_items; - std::vector<int> fcast_ninput_items; - //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; - ptrdiff_t work_io_ptr_mask; //track work's calls to produce and consume std::vector<size_t> produce_items; @@ -164,10 +157,9 @@ struct BlockActor : Apology::Worker boost::shared_ptr<InterruptibleThread> interruptible_thread; //work helpers - int work_ret; inline void task_work(void) { - this->work_ret = block_ptr->work(this->input_items, this->output_items); + block_ptr->work(this->input_items, this->output_items); } //is the fg running? diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index d1512e9..d058c41 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -52,7 +52,7 @@ void BlockActor::handle_topology( const size_t num_outputs = this->get_num_outputs(); //call check_topology on block before committing settings - this->block_ptr->check_topology(num_inputs, num_outputs); + this->block_ptr->notify_topology(num_inputs, num_outputs); //fill the item sizes from the IO signatures fill_item_sizes_from_sig(this->input_items_sizes, block_ptr->input_signature(), num_inputs); @@ -67,10 +67,6 @@ void BlockActor::handle_topology( resize_fill_grow(this->items_produced, num_outputs, 0); //resize all work buffers to match current connections - this->work_input_items.resize(num_inputs); - this->work_output_items.resize(num_outputs); - this->work_ninput_items.resize(num_inputs); - this->fcast_ninput_items.resize(num_inputs); this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); this->consume_items.resize(num_inputs, 0); |