summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp19
-rw-r--r--lib/block_task.cpp24
-rw-r--r--lib/gras_impl/block_actor.hpp12
-rw-r--r--lib/topology_handler.cpp6
4 files changed, 29 insertions, 32 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 3732fdb..51c5f08 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -156,9 +156,9 @@ bool Block::stop(void)
return true;
}
-bool Block::check_topology(int, int)
+void Block::notify_topology(const size_t, const size_t)
{
- return true;
+ return;
}
void Block::set_buffer_affinity(const long affinity)
@@ -170,3 +170,18 @@ void Block::set_interruptible_work(const bool enb)
{
(*this)->block->interruptible_work = enb;
}
+
+void Block::mark_output_fail(const size_t which_output)
+{
+ (*this)->block->output_fail(which_output);
+}
+
+void Block::mark_input_fail(const size_t which_input)
+{
+ (*this)->block->input_fail(which_input);
+}
+
+void Block::mark_done(void)
+{
+ (*this)->block->mark_done();
+}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c763476..4806914 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -71,7 +71,7 @@ void BlockActor::mark_done(void)
<< std::flush;
}
-GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
+void BlockActor::input_fail(const size_t i)
{
//input failed, accumulate and try again
if (not this->input_queues.is_accumulated(i))
@@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
}
//otherwise check for done, else wait for more
if (this->inputs_done[i]) this->mark_done();
+
+ //TODO check if input buffer is max size and throw
+}
+
+void BlockActor::output_fail(const size_t i)
+{
+ //TODO
}
void BlockActor::handle_task(void)
@@ -103,7 +110,6 @@ void BlockActor::handle_task(void)
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
- this->work_io_ptr_mask = 0; //reset
//------------------------------------------------------------------
//-- initialize input buffers before work
@@ -160,7 +166,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- this->work_ret = -1;
if (this->interruptible_thread)
{
this->interruptible_thread->call();
@@ -170,24 +175,13 @@ void BlockActor::handle_task(void)
this->task_work();
}
- if (work_ret >= 0)
- {
- this->input_fail(work_ret);
- return;
- }
-
- if (work_ret == -1)
- {
- this->mark_done();
- return;
- }
-
//------------------------------------------------------------------
//-- process input consumption
//------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
const size_t items = this->consume_items[i];
+ if (items == 0) continue;
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 18c13e2..67e1815 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker
void mark_done(void);
void handle_task(void);
void input_fail(const size_t index);
+ void output_fail(const size_t index);
void sort_tags(const size_t index);
void trim_tags(const size_t index);
GRAS_FORCE_INLINE bool any_inputs_done(void)
@@ -126,17 +127,9 @@ struct BlockActor : Apology::Worker
std::vector<uint64_t> items_consumed;
std::vector<uint64_t> items_produced;
- //work buffers for the classic interface
- size_t work_noutput_items;
- std::vector<const void *> work_input_items;
- std::vector<void *> work_output_items;
- std::vector<int> work_ninput_items;
- std::vector<int> fcast_ninput_items;
-
//work buffers for the new work interface
Block::InputItems input_items;
Block::OutputItems output_items;
- ptrdiff_t work_io_ptr_mask;
//track work's calls to produce and consume
std::vector<size_t> produce_items;
@@ -164,10 +157,9 @@ struct BlockActor : Apology::Worker
boost::shared_ptr<InterruptibleThread> interruptible_thread;
//work helpers
- int work_ret;
inline void task_work(void)
{
- this->work_ret = block_ptr->work(this->input_items, this->output_items);
+ block_ptr->work(this->input_items, this->output_items);
}
//is the fg running?
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index d1512e9..d058c41 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -52,7 +52,7 @@ void BlockActor::handle_topology(
const size_t num_outputs = this->get_num_outputs();
//call check_topology on block before committing settings
- this->block_ptr->check_topology(num_inputs, num_outputs);
+ this->block_ptr->notify_topology(num_inputs, num_outputs);
//fill the item sizes from the IO signatures
fill_item_sizes_from_sig(this->input_items_sizes, block_ptr->input_signature(), num_inputs);
@@ -67,10 +67,6 @@ void BlockActor::handle_topology(
resize_fill_grow(this->items_produced, num_outputs, 0);
//resize all work buffers to match current connections
- this->work_input_items.resize(num_inputs);
- this->work_output_items.resize(num_outputs);
- this->work_ninput_items.resize(num_inputs);
- this->fcast_ninput_items.resize(num_inputs);
this->input_items.resize(num_inputs);
this->output_items.resize(num_outputs);
this->consume_items.resize(num_inputs, 0);