diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 19 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 12 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 6 |
4 files changed, 29 insertions, 32 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 3732fdb..51c5f08 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -156,9 +156,9 @@ bool Block::stop(void) return true; } -bool Block::check_topology(int, int) +void Block::notify_topology(const size_t, const size_t) { - return true; + return; } void Block::set_buffer_affinity(const long affinity) @@ -170,3 +170,18 @@ void Block::set_interruptible_work(const bool enb) { (*this)->block->interruptible_work = enb; } + +void Block::mark_output_fail(const size_t which_output) +{ + (*this)->block->output_fail(which_output); +} + +void Block::mark_input_fail(const size_t which_input) +{ + (*this)->block->input_fail(which_input); +} + +void Block::mark_done(void) +{ + (*this)->block->mark_done(); +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c763476..4806914 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -71,7 +71,7 @@ void BlockActor::mark_done(void) << std::flush; } -GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) +void BlockActor::input_fail(const size_t i) { //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) @@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) } //otherwise check for done, else wait for more if (this->inputs_done[i]) this->mark_done(); + + //TODO check if input buffer is max size and throw +} + +void BlockActor::output_fail(const size_t i) +{ + //TODO } void BlockActor::handle_task(void) @@ -103,7 +110,6 @@ void BlockActor::handle_task(void) const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); - this->work_io_ptr_mask = 0; //reset //------------------------------------------------------------------ //-- initialize input buffers before work @@ -160,7 +166,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - this->work_ret = -1; if (this->interruptible_thread) { this->interruptible_thread->call(); @@ -170,24 +175,13 @@ void BlockActor::handle_task(void) this->task_work(); } - if (work_ret >= 0) - { - this->input_fail(work_ret); - return; - } - - if (work_ret == -1) - { - this->mark_done(); - return; - } - //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { const size_t items = this->consume_items[i]; + if (items == 0) continue; this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 18c13e2..67e1815 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker void mark_done(void); void handle_task(void); void input_fail(const size_t index); + void output_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) @@ -126,17 +127,9 @@ struct BlockActor : Apology::Worker std::vector<uint64_t> items_consumed; std::vector<uint64_t> items_produced; - //work buffers for the classic interface - size_t work_noutput_items; - std::vector<const void *> work_input_items; - std::vector<void *> work_output_items; - std::vector<int> work_ninput_items; - std::vector<int> fcast_ninput_items; - //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; - ptrdiff_t work_io_ptr_mask; //track work's calls to produce and consume std::vector<size_t> produce_items; @@ -164,10 +157,9 @@ struct BlockActor : Apology::Worker boost::shared_ptr<InterruptibleThread> interruptible_thread; //work helpers - int work_ret; inline void task_work(void) { - this->work_ret = block_ptr->work(this->input_items, this->output_items); + block_ptr->work(this->input_items, this->output_items); } //is the fg running? diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index d1512e9..d058c41 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -52,7 +52,7 @@ void BlockActor::handle_topology( const size_t num_outputs = this->get_num_outputs(); //call check_topology on block before committing settings - this->block_ptr->check_topology(num_inputs, num_outputs); + this->block_ptr->notify_topology(num_inputs, num_outputs); //fill the item sizes from the IO signatures fill_item_sizes_from_sig(this->input_items_sizes, block_ptr->input_signature(), num_inputs); @@ -67,10 +67,6 @@ void BlockActor::handle_topology( resize_fill_grow(this->items_produced, num_outputs, 0); //resize all work buffers to match current connections - this->work_input_items.resize(num_inputs); - this->work_output_items.resize(num_outputs); - this->work_ninput_items.resize(num_inputs); - this->fcast_ninput_items.resize(num_inputs); this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); this->consume_items.resize(num_inputs, 0); |