summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-11-08 00:10:26 -0800
committerJosh Blum2012-11-08 00:10:26 -0800
commitd1d9b98d31d17b9c7b8b80105de890c287ccf96e (patch)
tree26944b48f8eb758fe5ecaaa41a5bcb9e7db6d495
parent4e6548ed237f3d6eda4383d6a07a4d1e99f404f6 (diff)
downloadsandhi-d1d9b98d31d17b9c7b8b80105de890c287ccf96e.tar.gz
sandhi-d1d9b98d31d17b9c7b8b80105de890c287ccf96e.tar.bz2
sandhi-d1d9b98d31d17b9c7b8b80105de890c287ccf96e.zip
lot of python locking hell...
-rw-r--r--include/gras/top_block.hpp4
-rw-r--r--lib/top_block.cpp2
-rw-r--r--lib/topology_handler.cpp2
-rw-r--r--python/gras/GRAS_Block.i102
-rw-r--r--python/gras/GRAS_HierBlock.i42
-rw-r--r--tests/block_test.py96
-rw-r--r--tests/demo_blocks.py57
7 files changed, 174 insertions, 131 deletions
diff --git a/include/gras/top_block.hpp b/include/gras/top_block.hpp
index f74b7a3..b1351e7 100644
--- a/include/gras/top_block.hpp
+++ b/include/gras/top_block.hpp
@@ -70,10 +70,10 @@ struct GRAS_API TopBlock : HierBlock
void run(void);
//! Start a flow graph execution (does not block)
- void start(void);
+ virtual void start(void);
//! Stop a flow graph execution (does not block)
- void stop(void);
+ virtual void stop(void);
/*!
* Wait for threads to exit after stop() or run().
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index c321612..f5fb1a4 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -47,7 +47,7 @@ TopBlock::TopBlock(const std::string &name):
void ElementImpl::top_block_cleanup(void)
{
this->executor->post_all(TopInertMessage());
- this->topology->clear_all();;
+ this->topology->clear_all();
this->executor->commit();
if (ARMAGEDDON) std::cerr
<< "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 9a1f243..1a19a38 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -51,7 +51,7 @@ void BlockActor::handle_topology(
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
- //call check_topology on block before committing settings
+ //call notify_topology on block before committing settings
this->block_ptr->notify_topology(num_inputs, num_outputs);
//fill the item sizes from the IO signatures
diff --git a/python/gras/GRAS_Block.i b/python/gras/GRAS_Block.i
index ca43589..2171129 100644
--- a/python/gras/GRAS_Block.i
+++ b/python/gras/GRAS_Block.i
@@ -29,6 +29,7 @@
%feature("nodirector") gras::BlockPython::notify_topology;
%feature("nodirector") gras::BlockPython::work;
+
////////////////////////////////////////////////////////////////////////
// http://www.swig.org/Doc2.0/Library.html#Library_stl_exceptions
////////////////////////////////////////////////////////////////////////
@@ -40,6 +41,10 @@
{
$action
}
+ catch (const Swig::DirectorException &e)
+ {
+ SWIG_fail;
+ }
catch (const std::exception& e)
{
SWIG_exception(SWIG_RuntimeError, e.what());
@@ -83,11 +88,9 @@ struct PyGILPhondler
////////////////////////////////////////////////////////////////////////
// SWIG up the representation for IO work arrays
////////////////////////////////////////////////////////////////////////
-%include <std_pair.i>
%include <std_vector.i>
-
-%template () std::pair<size_t, size_t>;
-%template () std::vector<std::pair<size_t, size_t> >;
+%template (IntVec) std::vector<size_t>;
+%template (VoidStarVec) std::vector<void *>;
////////////////////////////////////////////////////////////////////////
// Pull in the implementation goodies
@@ -123,7 +126,7 @@ struct BlockPython : Block
bool start(void)
{
- PyGILPhondler phil();
+ PyGILPhondler phil;
return this->_Py_start();
}
@@ -131,7 +134,7 @@ struct BlockPython : Block
bool stop(void)
{
- PyGILPhondler phil();
+ PyGILPhondler phil;
return this->_Py_stop();
}
@@ -139,14 +142,16 @@ struct BlockPython : Block
void notify_topology(const size_t num_inputs, const size_t num_outputs)
{
- _input_items.resize(num_inputs);
- _output_items.resize(num_outputs);
+ _input_addrs.resize(num_inputs);
+ _input_sizes.resize(num_inputs);
+ _output_addrs.resize(num_outputs);
+ _output_sizes.resize(num_outputs);
- PyGILPhondler phil();
+ PyGILPhondler phil;
return this->_Py_notify_topology(num_inputs, num_outputs);
}
- virtual void _Py_notify_topology(const size_t num_inputs, const size_t num_outputs) = 0;
+ virtual void _Py_notify_topology(const size_t, const size_t) = 0;
void work
(
@@ -156,28 +161,31 @@ struct BlockPython : Block
{
for (size_t i = 0; i < input_items.size(); i++)
{
- _input_items[i].first = size_t(input_items[i].get());
- _input_items[i].second = input_items[i].size();
+ _input_addrs[i] = (void *)(input_items[i].get());
+ _input_sizes[i] = input_items[i].size();
}
for (size_t i = 0; i < output_items.size(); i++)
{
- _output_items[i].first = size_t(output_items[i].get());
- _output_items[i].second = output_items[i].size();
+ _output_addrs[i] = (void *)(output_items[i].get());
+ _output_sizes[i] = output_items[i].size();
}
- PyGILPhondler phil();
- return this->_Py_work(_input_items, _output_items);
+ PyGILPhondler phil;
+ return this->_Py_work(_input_addrs, _input_sizes, _output_addrs, _output_sizes);
}
- typedef std::vector<std::pair<size_t, size_t> > IOPairVec;
- IOPairVec _input_items;
- IOPairVec _output_items;
+ std::vector<void *> _input_addrs;
+ std::vector<size_t> _input_sizes;
+ std::vector<void *> _output_addrs;
+ std::vector<size_t> _output_sizes;
virtual void _Py_work
(
- const IOPairVec &input_items,
- const IOPairVec &output_items
+ const std::vector<void *> &,
+ const std::vector<size_t> &,
+ const std::vector<void *> &,
+ const std::vector<size_t> &
) = 0;
};
@@ -233,28 +241,44 @@ class Block(BlockPython):
def input_signature(self): return self.__in_sig
def output_signature(self): return self.__out_sig
- def _Py_work(self, input_items, output_items):
-
- input_arrays = list()
- for i, item in enumerate(input_items):
- addr, nitems = item
- ndarray = pointer_to_ndarray(addr=addr, dtype=self.__in_sig[i], nitems=nitems, readonly=True)
- input_arrays.append(ndarray)
-
- output_arrays = list()
- for i, item in enumerate(output_items):
- addr, nitems = item
- ndarray = pointer_to_ndarray(addr=addr, dtype=self.__out_sig[i], nitems=nitems, readonly=False)
- output_arrays.append(ndarray)
-
- ret = self.work(input_arrays, output_arrays)
- if ret is not None:
- raise Exception, 'work return != None, did you call consume/produce?'
+ def _Py_work(self, input_addrs, input_sizes, output_addrs, output_sizes):
+
+ try:
+
+ input_arrays = list()
+ for i in self.__in_indexes:
+ addr = long(input_addrs[i])
+ nitems = input_sizes[i]
+ ndarray = pointer_to_ndarray(addr=addr, dtype=self.__in_sig[i], nitems=nitems, readonly=True)
+ input_arrays.append(ndarray)
+
+ output_arrays = list()
+ for i in self.__out_indexes:
+ addr = long(output_addrs[i])
+ nitems = output_sizes[i]
+ ndarray = pointer_to_ndarray(addr=addr, dtype=self.__out_sig[i], nitems=nitems, readonly=False)
+ output_arrays.append(ndarray)
+
+ if True:
+ #try:
+ ret = self.work(input_arrays, output_arrays)
+ if ret is not None:
+ raise Exception, 'work return != None, did you call consume/produce?'
+ #except Exception, e:
+ # print e
+ # raise
+ except Exception as ex:
+ import traceback; traceback.print_exc()
+ raise ex
def work(self, *args):
print 'Implement Work!'
- def _Py_notify_topology(self, *args): return self.notify_topology(*args)
+ def _Py_notify_topology(self, num_inputs, num_outputs):
+ self.__in_indexes = range(num_inputs)
+ self.__out_indexes = range(num_outputs)
+ return self.notify_topology(num_inputs, num_outputs)
+
def notify_topology(self, *args): return
def _Py_start(self): return self.start()
diff --git a/python/gras/GRAS_HierBlock.i b/python/gras/GRAS_HierBlock.i
index 16142e5..c84dc0e 100644
--- a/python/gras/GRAS_HierBlock.i
+++ b/python/gras/GRAS_HierBlock.i
@@ -33,6 +33,27 @@
}
}
+////////////////////////////////////////////////////////////////////////
+// Simple class to deal with smart save/restore of python thread state
+////////////////////////////////////////////////////////////////////////
+%{
+
+struct PyTSPhondler
+{
+ PyTSPhondler(void):
+ s(PyEval_SaveThread())
+ {
+ //NOP
+ }
+ ~PyTSPhondler(void)
+ {
+ PyEval_RestoreThread(s);
+ }
+ PyThreadState *s;
+};
+
+%}
+
%{
#include <gras/hier_block.hpp>
#include <gras/top_block.hpp>
@@ -64,19 +85,28 @@ struct TopBlockPython : TopBlock
//NOP
}
+ void start(void)
+ {
+ PyTSPhondler phil;
+ TopBlock::start();
+ }
+
+ void stop(void)
+ {
+ PyTSPhondler phil;
+ TopBlock::stop();
+ }
+
void wait(void)
{
- PyThreadState *s = PyEval_SaveThread();
+ PyTSPhondler phil;
TopBlock::wait();
- PyEval_RestoreThread(s);
}
bool wait(const double timeout)
{
- PyThreadState *s = PyEval_SaveThread();
- const bool ret = TopBlock::wait(timeout);
- PyEval_RestoreThread(s);
- return ret;
+ PyTSPhondler phil;
+ return TopBlock::wait(timeout);
}
};
diff --git a/tests/block_test.py b/tests/block_test.py
index c62ea6f..43c764e 100644
--- a/tests/block_test.py
+++ b/tests/block_test.py
@@ -2,59 +2,7 @@
import unittest
import gras
import numpy
-
-class NullSource(gras.Block):
- def __init__(self):
- gras.Block.__init__(self, 'NullSource')
- self.set_output_signature([numpy.int32])
-
- def work(self, ins, outs):
- self.mark_done()
-
-class NullSink(gras.Block):
- def __init__(self):
- gras.Block.__init__(self, 'NullSink')
- self.set_input_signature([numpy.int32])
-
- def work(self, ins, outs):
- self.mark_done()
-
-class VectorSource(gras.Block):
- def __init__(self, out_sig, vec):
- gras.Block.__init__(self,
- name='VectorSource',
- in_sig=None,
- out_sig=[out_sig],
- )
- self._vec = vec
-
- def work(self, ins, outs):
- print 'vector source work'
- num = min(len(outs[0]), len(self._vec))
- outs[0][:num] = self._vec[:num]
- self.produce(0, num)
- self._vec = self._vec[num:]
- if not self._vec:
- self.mark_done()
- print 'vector source work done'
-
-class VectorSink(gras.Block):
- def __init__(self, in_sig):
- gras.Block.__init__(self,
- name='VectorSink',
- in_sig=[in_sig],
- out_sig=None,
- )
- self._vec = list()
-
- def get_vector(self):
- return self._vec
-
- def work(self, ins, outs):
- print 'vector sink work'
- self._vec.extend(ins[0].copy())
- self.consume(0, len(ins[0]))
- print 'vector sink work done'
+from demo_blocks import *
class BlockTest(unittest.TestCase):
@@ -62,43 +10,27 @@ class BlockTest(unittest.TestCase):
vec_source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6])
vec_sink = VectorSink(numpy.uint32)
- print 'make tb'
tb = gras.TopBlock()
- print 'connect'
tb.connect(vec_source, vec_sink)
- print 'run'
tb.run()
- print 'done run'
+ tb.commit()
tb = None
- print vec_sink.get_vector()
- vec_source = None
- vec_sink = None
-
- def test_make_block(self):
- #return
- null_src = NullSource()
-
- null_sink = NullSink()
+ self.assertEqual(vec_sink.get_vector(), (0, 9, 8, 7, 6))
+ def test_add_f32(self):
tb = gras.TopBlock()
-
- print 'connect...'
- tb.connect(null_src, null_sink)
- print 'run'
-
- print 'py start'
- tb.start()
- import time; time.sleep(1)
- print 'py stop'
- tb.stop()
- print 'py wait'
- tb.wait()
- print 'ok'
-
- tb.disconnect(null_src, null_sink)
+ src0 = VectorSource(numpy.float32, [1, 3, 5, 7, 9])
+ src1 = VectorSource(numpy.float32, [0, 2, 4, 6, 8])
+ adder = add_2x(numpy.float32)
+ sink = VectorSink(numpy.float32)
+ tb.connect((src0, 0), (adder, 0))
+ tb.connect((src1, 0), (adder, 1))
+ tb.connect(adder, sink)
+ tb.run()
tb.commit()
- print 'done'
+ tb = None
+ self.assertEqual(sink.get_vector(), (1, 5, 9, 13, 17))
if __name__ == '__main__':
unittest.main()
diff --git a/tests/demo_blocks.py b/tests/demo_blocks.py
new file mode 100644
index 0000000..3a5d5cb
--- /dev/null
+++ b/tests/demo_blocks.py
@@ -0,0 +1,57 @@
+import gras
+import numpy
+
+class add_2x(gras.Block):
+ def __init__(self, sig):
+ gras.Block.__init__(self,
+ name = "add 2x",
+ in_sig = [sig, sig],
+ out_sig = [sig],
+ )
+
+ def work(self, ins, outs):
+ nitems = min(*map(len, (ins[0], ins[1], outs[0])))
+ outs[0][:nitems] = ins[0][:nitems] + ins[1][:nitems]
+ self.consume(0, nitems)
+ self.consume(1, nitems)
+ self.produce(0, nitems)
+
+class NullSource(gras.Block):
+ def __init__(self, out_sig):
+ gras.Block.__init__(self, 'NullSource', out_sig=[out_sig])
+
+ def work(self, ins, outs):
+ outs[0][:] = numpy.zeros(len(outs[0]))
+ self.produce(0, len(outs[0]))
+
+class NullSink(gras.Block):
+ def __init__(self, in_sig):
+ gras.Block.__init__(self, 'NullSink', in_sig=[in_sig])
+
+ def work(self, ins, outs):
+ self.consume(0, len(ins[0]))
+
+class VectorSource(gras.Block):
+ def __init__(self, out_sig, vec):
+ gras.Block.__init__(self, name='VectorSource', out_sig=[out_sig])
+ self._vec = vec
+
+ def work(self, ins, outs):
+ num = min(len(outs[0]), len(self._vec))
+ outs[0][:num] = self._vec[:num]
+ self.produce(0, num)
+ self._vec = self._vec[num:]
+ if not self._vec:
+ self.mark_done()
+
+class VectorSink(gras.Block):
+ def __init__(self, in_sig):
+ gras.Block.__init__(self, name='VectorSink', in_sig=[in_sig])
+ self._vec = list()
+
+ def get_vector(self):
+ return tuple(self._vec)
+
+ def work(self, ins, outs):
+ self._vec.extend(ins[0].copy())
+ self.consume(0, len(ins[0]))