diff options
author | Josh Blum | 2012-11-05 22:59:47 -0800 |
---|---|---|
committer | Josh Blum | 2012-11-05 22:59:47 -0800 |
commit | 37a863991f5e05035cce6738379de7a8ebc2b160 (patch) | |
tree | c7f2c7732074d69f1b3fe147f5fb32d92c34c9c8 | |
parent | ed68f79a7a2e3c1bdbe0a2b4c5498ac29e431cce (diff) | |
download | sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.gz sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.tar.bz2 sandhi-37a863991f5e05035cce6738379de7a8ebc2b160.zip |
polish up work and notify api
There is a formal api to mark done, and IO failures.
Notify topology was created for consistency.
So far, there output fail is unimplemented,
and input fail requires an additional check.
m--------- | gnuradio | 0 | ||||
-rw-r--r-- | include/gras/block.hpp | 52 | ||||
-rw-r--r-- | lib/block.cpp | 19 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 12 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 6 | ||||
-rw-r--r-- | python/gras/GRAS_Block.i | 21 | ||||
-rw-r--r-- | tests/block_test.py | 12 |
8 files changed, 93 insertions, 53 deletions
diff --git a/gnuradio b/gnuradio -Subproject ff90bae8a04dc7b6375d7f9301061d1a1e98d61 +Subproject 2ea57794854cc0ee121cac8d33c81e120c9ced0 diff --git a/include/gras/block.hpp b/include/gras/block.hpp index dcdd505..6ad162b 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -194,13 +194,59 @@ struct GRAS_API Block : Element typedef std::vector<WorkBuffer<void *> > OutputItems; //! The official call into the work routine (overload please) - virtual int work( + virtual void work( const InputItems &input_items, const OutputItems &output_items ) = 0; - //! scheduler calls when the topology is updated, can be overloaded - virtual bool check_topology(int ninputs, int noutputs); + /*! + * Tell the scheduler that an output requirement could not be met. + * + * - If the output buffer was partially filled (ie, not flushed downstream), + * this will cause the output buffer to flush to the downstream. + * The next call to work will be with a full size output buffer. + * + * - If the output buffer was not partially filled, this call will throw. + * In this case, the user should set larger reserve_items on this port. + * \param which_output the output port index + */ + void mark_output_fail(const size_t which_output); + + /*! + * Tell the scheduler that an input requirement could not be met. + * + * - If there are more inputs enqueued ahead of this buffer, + * the enqueued inputs will be accumulated into a larger buffer. + * The next call to work will be with a larger input buffer. + * + * - If the buffer is already accumlated and the upstream provider + * is no longer producing, then the scheduler will mark this block done. + * + * - If the input buffer at the maximum size, this call will throw. + * In this case, the user should set larger reserve_items on this port. + * + * If the output buffer was partially filled (ie, not flushed downstream), + * this will cause the output buffer to flush to the downstream. + * The next call to work will be with a full size output buffer. + * If the output buffer was not partially filled, this call will throw. + * In this case, the user should set larger reserve_items on this port. + * \param which_output the output port index + */ + void mark_input_fail(const size_t which_input); + + /*! + * Mark this block as done. + * The scheduler will no longer call the work() routine. + * Downstream consumers and upstream providers will be notified. + */ + void mark_done(void); + + /*! + * Overload notify_topology to get called on topological changes. + * Use notify_topology to perform one-time resizing operations + * to avoid a conditional resizing operation inside the work(). + */ + virtual void notify_topology(const size_t num_inputs, const size_t num_outputs); /*! * Set if the work call should be interruptible by stop(). diff --git a/lib/block.cpp b/lib/block.cpp index 3732fdb..51c5f08 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -156,9 +156,9 @@ bool Block::stop(void) return true; } -bool Block::check_topology(int, int) +void Block::notify_topology(const size_t, const size_t) { - return true; + return; } void Block::set_buffer_affinity(const long affinity) @@ -170,3 +170,18 @@ void Block::set_interruptible_work(const bool enb) { (*this)->block->interruptible_work = enb; } + +void Block::mark_output_fail(const size_t which_output) +{ + (*this)->block->output_fail(which_output); +} + +void Block::mark_input_fail(const size_t which_input) +{ + (*this)->block->input_fail(which_input); +} + +void Block::mark_done(void) +{ + (*this)->block->mark_done(); +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index c763476..4806914 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -71,7 +71,7 @@ void BlockActor::mark_done(void) << std::flush; } -GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) +void BlockActor::input_fail(const size_t i) { //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) @@ -82,6 +82,13 @@ GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i) } //otherwise check for done, else wait for more if (this->inputs_done[i]) this->mark_done(); + + //TODO check if input buffer is max size and throw +} + +void BlockActor::output_fail(const size_t i) +{ + //TODO } void BlockActor::handle_task(void) @@ -103,7 +110,6 @@ void BlockActor::handle_task(void) const size_t num_inputs = this->get_num_inputs(); const size_t num_outputs = this->get_num_outputs(); - this->work_io_ptr_mask = 0; //reset //------------------------------------------------------------------ //-- initialize input buffers before work @@ -160,7 +166,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - this->work_ret = -1; if (this->interruptible_thread) { this->interruptible_thread->call(); @@ -170,24 +175,13 @@ void BlockActor::handle_task(void) this->task_work(); } - if (work_ret >= 0) - { - this->input_fail(work_ret); - return; - } - - if (work_ret == -1) - { - this->mark_done(); - return; - } - //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { const size_t items = this->consume_items[i]; + if (items == 0) continue; this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 18c13e2..67e1815 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -101,6 +101,7 @@ struct BlockActor : Apology::Worker void mark_done(void); void handle_task(void); void input_fail(const size_t index); + void output_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) @@ -126,17 +127,9 @@ struct BlockActor : Apology::Worker std::vector<uint64_t> items_consumed; std::vector<uint64_t> items_produced; - //work buffers for the classic interface - size_t work_noutput_items; - std::vector<const void *> work_input_items; - std::vector<void *> work_output_items; - std::vector<int> work_ninput_items; - std::vector<int> fcast_ninput_items; - //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; - ptrdiff_t work_io_ptr_mask; //track work's calls to produce and consume std::vector<size_t> produce_items; @@ -164,10 +157,9 @@ struct BlockActor : Apology::Worker boost::shared_ptr<InterruptibleThread> interruptible_thread; //work helpers - int work_ret; inline void task_work(void) { - this->work_ret = block_ptr->work(this->input_items, this->output_items); + block_ptr->work(this->input_items, this->output_items); } //is the fg running? diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index d1512e9..d058c41 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -52,7 +52,7 @@ void BlockActor::handle_topology( const size_t num_outputs = this->get_num_outputs(); //call check_topology on block before committing settings - this->block_ptr->check_topology(num_inputs, num_outputs); + this->block_ptr->notify_topology(num_inputs, num_outputs); //fill the item sizes from the IO signatures fill_item_sizes_from_sig(this->input_items_sizes, block_ptr->input_signature(), num_inputs); @@ -67,10 +67,6 @@ void BlockActor::handle_topology( resize_fill_grow(this->items_produced, num_outputs, 0); //resize all work buffers to match current connections - this->work_input_items.resize(num_inputs); - this->work_output_items.resize(num_outputs); - this->work_ninput_items.resize(num_inputs); - this->fcast_ninput_items.resize(num_inputs); this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); this->consume_items.resize(num_inputs, 0); diff --git a/python/gras/GRAS_Block.i b/python/gras/GRAS_Block.i index 9e917b4..ab92b26 100644 --- a/python/gras/GRAS_Block.i +++ b/python/gras/GRAS_Block.i @@ -26,7 +26,7 @@ %feature("nodirector") gras::BlockPython::propagate_tags; %feature("nodirector") gras::BlockPython::start; %feature("nodirector") gras::BlockPython::stop; -%feature("nodirector") gras::BlockPython::check_topology; +%feature("nodirector") gras::BlockPython::notify_topology; %feature("nodirector") gras::BlockPython::work; //////////////////////////////////////////////////////////////////////// @@ -137,15 +137,15 @@ struct BlockPython : Block virtual bool _Py_stop(void) = 0; - bool check_topology(int ninputs, int noutputs) + void notify_topology(const size_t num_inputs, const size_t num_outputs) { PyGILPhondler phil(); - return this->_Py_check_topology(ninputs, noutputs); + return this->_Py_notify_topology(num_inputs, num_outputs); } - virtual bool _Py_check_topology(int ninputs, int noutputs) = 0; + virtual void _Py_notify_topology(const size_t num_inputs, const size_t num_outputs) = 0; - int work + void work ( const InputItems &input_items, const OutputItems &output_items @@ -173,7 +173,7 @@ struct BlockPython : Block IOPairVec _input_items; IOPairVec _output_items; - virtual int _Py_work + virtual void _Py_work ( const IOPairVec &input_items, const IOPairVec &output_items @@ -247,14 +247,15 @@ class Block(BlockPython): ndarray = pointer_to_ndarray(addr=addr, dtype=self.__out_sig[i], nitems=nitems, readonly=False) output_arrays.append(ndarray) - return self.work(input_arrays, output_arrays) + ret = self.work(input_arrays, output_arrays) + if ret is not None: + raise Exception, 'work return != None, did you call consume/produce?' def work(self, *args): print 'Implement Work!' - return -1 - def _Py_check_topology(self, *args): return self.check_topology(*args) - def check_topology(self, *args): return True + def _Py_notify_topology(self, *args): return self.notify_topology(*args) + def notify_topology(self, *args): return def _Py_start(self): return self.start() def start(self): return True diff --git a/tests/block_test.py b/tests/block_test.py index d510b6e..b598b27 100644 --- a/tests/block_test.py +++ b/tests/block_test.py @@ -8,16 +8,16 @@ class NullSource(gras.Block): gras.Block.__init__(self, 'NullSource') self.set_output_signature([numpy.int32]) - def callback(self, x): - print x + def work(self, ins, outs): + self.mark_done() class NullSink(gras.Block): def __init__(self): gras.Block.__init__(self, 'NullSink') self.set_input_signature([numpy.int32]) - def callback(self, x): - print x + def work(self, ins, outs): + self.mark_done() class BlockTest(unittest.TestCase): @@ -26,10 +26,6 @@ class BlockTest(unittest.TestCase): null_sink = NullSink() - #null_src.doit(321) - - #return - tb = gras.TopBlock() print 'connect...' |