summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------gnuradio0
-rw-r--r--include/gras/block.hpp52
-rw-r--r--lib/block.cpp19
-rw-r--r--lib/block_task.cpp24
-rw-r--r--lib/gras_impl/block_actor.hpp12
-rw-r--r--lib/topology_handler.cpp6
-rw-r--r--python/gras/GRAS_Block.i21
-rw-r--r--tests/block_test.py12
8 files changed, 93 insertions, 53 deletions
diff --git a/gnuradio b/gnuradio
-Subproject ff90bae8a04dc7b6375d7f9301061d1a1e98d61
+Subproject 2ea57794854cc0ee121cac8d33c81e120c9ced0
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index dcdd505..6ad162b 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -194,13 +194,59 @@ struct GRAS_API Block : Element
typedef std::vector<WorkBuffer<void *> > OutputItems;
//! The official call into the work routine (overload please)
- virtual int work(
+ virtual void work(
const InputItems &input_items,
const OutputItems &output_items
) = 0;
- //! scheduler calls when the topology is updated, can be overloaded
- virtual bool check_topology(int ninputs, int noutputs);
+ /*!
+ * Tell the scheduler that an output requirement could not be met.
+ *
+ * - If the output buffer was partially filled (ie, not flushed downstream),
+ * this will cause the output buffer to flush to the downstream.
+ * The next call to work will be with a full size output buffer.
+ *
+ * - If the output buffer was not partially filled, this call will throw.
+ * In this case, the user should set larger reserve_items on this port.
+ * \param which_output the output port index
+ */
+ void mark_output_fail(const size_t which_output);
+
+ /*!
+ * Tell the scheduler that an input requirement could not be met.
+ *
+ * - If there are more inputs enqueued ahead of this buffer,
+ * the enqueued inputs will be accumulated into a larger buffer.
+ * The next call to work will be with a larger input buffer.
+ *
+ * - If the buffer is already accumlated and the upstream provider
+ * is no longer producing, then the scheduler will mark this block done.
+ *
+ * - If the input buffer at the maximum size, this call will throw.
+ * In this case, the user should set larger reserve_items on this port.
+ *
+ * If the output buffer was partially filled (ie, not flushed downstream),
+ * this will cause the output buffer to flush to the downstream.
+ * The next call to work will be with a full size output buffer.
+ * If the output buffer was not partially filled, this call will throw.
+ * In this case, the user should set larger reserve_items on this port.
+ * \param which_output the output port index
+ */
+ void mark_input_fail(const size_t which_input);
+
+ /*!
+ * Mark this block as done.
+ * The scheduler will no longer call the work() routine.
+ * Downstream consumers and upstream providers will be notified.
+ */
+ void mark_done(void);
+
+ /*!
+ * Overload notify_topology to get called on topological changes.
+ * Use notify_topology to perform one-time resizing operations
+ * to avoid a conditional resizing operation inside the work().
+ */
+ virtual void notify_topology(const size_t num_inputs, const size_t num_outputs);
/*!
* Set if the work call should be interruptible by stop().
diff --git a/lib/block.cpp b/lib/block.cpp
index 3732fdb..51c5f08 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -156,9 +156,9 @@ bool Block::stop(void)
return true;
}
-bool Block::check_topology(int, int)
+void Block::notify_topology(const size_t, const size_t)
{
- return true;
+ return;
}
void Block::set_buffer_affinity(const long affinity)
@@ -170,3 +170,18 @@ void Block::set_interruptible_work(const bool enb)
{
(*this)->block->interruptible_work = enb;
}
+
+void Block::mark_output_fail(const size_t which_output)
+{
+ (*this)->block->output_fail(which_output);
+}
+
+void Block::mark_input_fail(const size_t which_input)
+{
+ (*this)->block->input_fail(which_input);
+}
+
+void Block::mark_done(void)
+{
+ (*this)->block->mark_done();
+}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c763476..4806914 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -71,7 +71,7 @@ void BlockActor::mark_done(void)
<< std::flush;
}
-GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
+void BlockActor::input_fail(const size_t i)
{
//input failed, accumulate and try again
if (not this->input_queues.is_accumulated(i))
@@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
}
//otherwise check for done, else wait for more
if (this->inputs_done[i]) this->mark_done();
+
+ //TODO check if input buffer is max size and throw
+}
+
+void BlockActor::output_fail(const size_t i)
+{
+ //TODO
}
void BlockActor::handle_task(void)
@@ -103,7 +110,6 @@ void BlockActor::handle_task(void)
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
- this->work_io_ptr_mask = 0; //reset
//------------------------------------------------------------------
//-- initialize input buffers before work
@@ -160,7 +166,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- this->work_ret = -1;
if (this->interruptible_thread)
{
this->interruptible_thread->call();
@@ -170,24 +175,13 @@ void BlockActor::handle_task(void)
this->task_work();
}
- if (work_ret >= 0)
- {
- this->input_fail(work_ret);
- return;
- }
-
- if (work_ret == -1)
- {
- this->mark_done();
- return;
- }
-
//------------------------------------------------------------------
//-- process input consumption
//------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
const size_t items = this->consume_items[i];
+ if (items == 0) continue;
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 18c13e2..67e1815 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker
void mark_done(void);
void handle_task(void);
void input_fail(const size_t index);
+ void output_fail(const size_t index);
void sort_tags(const size_t index);
void trim_tags(const size_t index);
GRAS_FORCE_INLINE bool any_inputs_done(void)
@@ -126,17 +127,9 @@ struct BlockActor : Apology::Worker
std::vector<uint64_t> items_consumed;
std::vector<uint64_t> items_produced;
- //work buffers for the classic interface
- size_t work_noutput_items;
- std::vector<const void *> work_input_items;
- std::vector<void *> work_output_items;
- std::vector<int> work_ninput_items;
- std::vector<int> fcast_ninput_items;
-
//work buffers for the new work interface
Block::InputItems input_items;
Block::OutputItems output_items;
- ptrdiff_t work_io_ptr_mask;
//track work's calls to produce and consume
std::vector<size_t> produce_items;
@@ -164,10 +157,9 @@ struct BlockActor : Apology::Worker
boost::shared_ptr<InterruptibleThread> interruptible_thread;
//work helpers
- int work_ret;
inline void task_work(void)
{
- this->work_ret = block_ptr->work(this->input_items, this->output_items);
+ block_ptr->work(this->input_items, this->output_items);
}
//is the fg running?
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index d1512e9..d058c41 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -52,7 +52,7 @@ void BlockActor::handle_topology(
const size_t num_outputs = this->get_num_outputs();
//call check_topology on block before committing settings
- this->block_ptr->check_topology(num_inputs, num_outputs);
+ this->block_ptr->notify_topology(num_inputs, num_outputs);
//fill the item sizes from the IO signatures
fill_item_sizes_from_sig(this->input_items_sizes, block_ptr->input_signature(), num_inputs);
@@ -67,10 +67,6 @@ void BlockActor::handle_topology(
resize_fill_grow(this->items_produced, num_outputs, 0);
//resize all work buffers to match current connections
- this->work_input_items.resize(num_inputs);
- this->work_output_items.resize(num_outputs);
- this->work_ninput_items.resize(num_inputs);
- this->fcast_ninput_items.resize(num_inputs);
this->input_items.resize(num_inputs);
this->output_items.resize(num_outputs);
this->consume_items.resize(num_inputs, 0);
diff --git a/python/gras/GRAS_Block.i b/python/gras/GRAS_Block.i
index 9e917b4..ab92b26 100644
--- a/python/gras/GRAS_Block.i
+++ b/python/gras/GRAS_Block.i
@@ -26,7 +26,7 @@
%feature("nodirector") gras::BlockPython::propagate_tags;
%feature("nodirector") gras::BlockPython::start;
%feature("nodirector") gras::BlockPython::stop;
-%feature("nodirector") gras::BlockPython::check_topology;
+%feature("nodirector") gras::BlockPython::notify_topology;
%feature("nodirector") gras::BlockPython::work;
////////////////////////////////////////////////////////////////////////
@@ -137,15 +137,15 @@ struct BlockPython : Block
virtual bool _Py_stop(void) = 0;
- bool check_topology(int ninputs, int noutputs)
+ void notify_topology(const size_t num_inputs, const size_t num_outputs)
{
PyGILPhondler phil();
- return this->_Py_check_topology(ninputs, noutputs);
+ return this->_Py_notify_topology(num_inputs, num_outputs);
}
- virtual bool _Py_check_topology(int ninputs, int noutputs) = 0;
+ virtual void _Py_notify_topology(const size_t num_inputs, const size_t num_outputs) = 0;
- int work
+ void work
(
const InputItems &input_items,
const OutputItems &output_items
@@ -173,7 +173,7 @@ struct BlockPython : Block
IOPairVec _input_items;
IOPairVec _output_items;
- virtual int _Py_work
+ virtual void _Py_work
(
const IOPairVec &input_items,
const IOPairVec &output_items
@@ -247,14 +247,15 @@ class Block(BlockPython):
ndarray = pointer_to_ndarray(addr=addr, dtype=self.__out_sig[i], nitems=nitems, readonly=False)
output_arrays.append(ndarray)
- return self.work(input_arrays, output_arrays)
+ ret = self.work(input_arrays, output_arrays)
+ if ret is not None:
+ raise Exception, 'work return != None, did you call consume/produce?'
def work(self, *args):
print 'Implement Work!'
- return -1
- def _Py_check_topology(self, *args): return self.check_topology(*args)
- def check_topology(self, *args): return True
+ def _Py_notify_topology(self, *args): return self.notify_topology(*args)
+ def notify_topology(self, *args): return
def _Py_start(self): return self.start()
def start(self): return True
diff --git a/tests/block_test.py b/tests/block_test.py
index d510b6e..b598b27 100644
--- a/tests/block_test.py
+++ b/tests/block_test.py
@@ -8,16 +8,16 @@ class NullSource(gras.Block):
gras.Block.__init__(self, 'NullSource')
self.set_output_signature([numpy.int32])
- def callback(self, x):
- print x
+ def work(self, ins, outs):
+ self.mark_done()
class NullSink(gras.Block):
def __init__(self):
gras.Block.__init__(self, 'NullSink')
self.set_input_signature([numpy.int32])
- def callback(self, x):
- print x
+ def work(self, ins, outs):
+ self.mark_done()
class BlockTest(unittest.TestCase):
@@ -26,10 +26,6 @@ class BlockTest(unittest.TestCase):
null_sink = NullSink()
- #null_src.doit(321)
-
- #return
-
tb = gras.TopBlock()
print 'connect...'