//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see .
#define GRAS_API
////////////////////////////////////////////////////////////////////////
// SWIG director shit - be explicit with all virtual methods
////////////////////////////////////////////////////////////////////////
%module(directors="1") GRAS_Block
%feature("director") gras::BlockPython;
%feature("nodirector") gras::BlockPython::input_buffer_allocator;
%feature("nodirector") gras::BlockPython::output_buffer_allocator;
%feature("nodirector") gras::BlockPython::propagate_tags;
%feature("nodirector") gras::BlockPython::start;
%feature("nodirector") gras::BlockPython::stop;
%feature("nodirector") gras::BlockPython::notify_topology;
%feature("nodirector") gras::BlockPython::work;
////////////////////////////////////////////////////////////////////////
// http://www.swig.org/Doc2.0/Library.html#Library_stl_exceptions
////////////////////////////////////////////////////////////////////////
%include "exception.i"
%exception
{
try
{
$action
}
catch (const Swig::DirectorException &e)
{
SWIG_fail;
}
catch (const std::exception& e)
{
SWIG_exception(SWIG_RuntimeError, e.what());
}
}
%feature("director:except")
{
if ($error != NULL)
{
throw Swig::DirectorMethodException();
}
}
%{
#include
#include
%}
////////////////////////////////////////////////////////////////////////
// Simple class to deal with smart locking/unlocking of python GIL
////////////////////////////////////////////////////////////////////////
%{
struct PyGILPhondler
{
PyGILPhondler(void):
s(PyGILState_Ensure())
{
//NOP
}
~PyGILPhondler(void)
{
PyGILState_Release(s);
}
PyGILState_STATE s;
};
%}
////////////////////////////////////////////////////////////////////////
// SWIG up the representation for IO work arrays
////////////////////////////////////////////////////////////////////////
%include
%template (IntVec) std::vector;
%template (VoidStarVec) std::vector;
////////////////////////////////////////////////////////////////////////
// Pull in the implementation goodies
////////////////////////////////////////////////////////////////////////
%include
%include
%include
%include
%include
%include
////////////////////////////////////////////////////////////////////////
// Make a special block with safe overloads
////////////////////////////////////////////////////////////////////////
%inline %{
namespace gras
{
struct BlockPython : Block
{
BlockPython(const std::string &name):
Block(name)
{
//NOP
}
virtual ~BlockPython(void)
{
//NOP
}
bool start(void)
{
PyGILPhondler phil;
return this->_Py_start();
}
virtual bool _Py_start(void) = 0;
bool stop(void)
{
PyGILPhondler phil;
return this->_Py_stop();
}
virtual bool _Py_stop(void) = 0;
void notify_topology(const size_t num_inputs, const size_t num_outputs)
{
_input_addrs.resize(num_inputs);
_input_sizes.resize(num_inputs);
_output_addrs.resize(num_outputs);
_output_sizes.resize(num_outputs);
PyGILPhondler phil;
return this->_Py_notify_topology(num_inputs, num_outputs);
}
virtual void _Py_notify_topology(const size_t, const size_t) = 0;
void work
(
const InputItems &input_items,
const OutputItems &output_items
)
{
for (size_t i = 0; i < input_items.size(); i++)
{
_input_addrs[i] = (void *)(input_items[i].get());
_input_sizes[i] = input_items[i].size();
}
for (size_t i = 0; i < output_items.size(); i++)
{
_output_addrs[i] = (void *)(output_items[i].get());
_output_sizes[i] = output_items[i].size();
}
PyGILPhondler phil;
return this->_Py_work(_input_addrs, _input_sizes, _output_addrs, _output_sizes);
}
std::vector _input_addrs;
std::vector _input_sizes;
std::vector _output_addrs;
std::vector _output_sizes;
virtual void _Py_work
(
const std::vector &,
const std::vector &,
const std::vector &,
const std::vector &
) = 0;
};
}
%}
////////////////////////////////////////////////////////////////////////
// Conversion to the numpy array
////////////////////////////////////////////////////////////////////////
%pythoncode %{
import numpy
def pointer_to_ndarray(addr, dtype, nitems, readonly=False):
class array_like:
__array_interface__ = {
'data' : (addr, readonly),
'typestr' : dtype.base.str,
'descr' : dtype.base.descr,
'shape' : (nitems,) + dtype.shape,
'strides' : None,
'version' : 3,
}
return numpy.asarray(array_like()).view(dtype.base)
%}
////////////////////////////////////////////////////////////////////////
// Python overload for adding pythonic interfaces
////////////////////////////////////////////////////////////////////////
%pythoncode %{
import numpy
import traceback
def sig_to_dtype_sig(sig):
if sig is None: sig = ()
return map(numpy.dtype, sig)
class Block(BlockPython):
def __init__(self, name='Block', in_sig=None, out_sig=None):
BlockPython.__init__(self, name)
self.set_input_signature(in_sig)
self.set_output_signature(out_sig)
def set_input_signature(self, sig):
self.__in_sig = sig_to_dtype_sig(sig)
BlockPython.set_input_signature(self, IOSignature([s.itemsize for s in self.__in_sig]))
def set_output_signature(self, sig):
self.__out_sig = sig_to_dtype_sig(sig)
BlockPython.set_output_signature(self, IOSignature([s.itemsize for s in self.__out_sig]))
def input_signature(self): return self.__in_sig
def output_signature(self): return self.__out_sig
def _Py_work(self, input_addrs, input_sizes, output_addrs, output_sizes):
try:
input_arrays = list()
for i in self.__in_indexes:
addr = long(input_addrs[i])
nitems = input_sizes[i]
ndarray = pointer_to_ndarray(addr=addr, dtype=self.__in_sig[i], nitems=nitems, readonly=True)
input_arrays.append(ndarray)
output_arrays = list()
for i in self.__out_indexes:
addr = long(output_addrs[i])
nitems = output_sizes[i]
ndarray = pointer_to_ndarray(addr=addr, dtype=self.__out_sig[i], nitems=nitems, readonly=False)
output_arrays.append(ndarray)
ret = self.work(input_arrays, output_arrays)
if ret is not None:
raise Exception, 'work return != None, did you call consume/produce?'
except: traceback.print_exc(); raise
def work(self, *args):
print 'Implement Work!'
def _Py_notify_topology(self, num_inputs, num_outputs):
self.__in_indexes = range(num_inputs)
self.__out_indexes = range(num_outputs)
try: return self.notify_topology(num_inputs, num_outputs)
except: traceback.print_exc(); raise
def notify_topology(self, *args): return
def _Py_start(self):
try: return self.start()
except: traceback.print_exc(); raise
def start(self): return True
def _Py_stop(self):
try: return self.stop()
except: traceback.print_exc(); raise
def stop(self): return True
%}