summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/python')
-rw-r--r--gr-blocks/python/CMakeLists.txt48
-rw-r--r--gr-blocks/python/__init__.py37
-rw-r--r--gr-blocks/python/parse_file_metadata.py193
-rwxr-xr-xgr-blocks/python/qa_add_mult_div_sub.py212
-rwxr-xr-xgr-blocks/python/qa_add_mult_v.py360
-rwxr-xr-xgr-blocks/python/qa_boolean_operators.py195
-rw-r--r--gr-blocks/python/qa_conjugate.py54
-rwxr-xr-xgr-blocks/python/qa_delay.py65
-rw-r--r--gr-blocks/python/qa_file_metadata.py202
-rwxr-xr-xgr-blocks/python/qa_integrate.py76
-rwxr-xr-xgr-blocks/python/qa_interleave.py82
-rwxr-xr-xgr-blocks/python/qa_keep_m_in_n.py59
-rwxr-xr-xgr-blocks/python/qa_keep_one_in_n.py46
-rw-r--r--gr-blocks/python/qa_multiply_conjugate.py58
-rwxr-xr-xgr-blocks/python/qa_nlog10.py48
-rwxr-xr-xgr-blocks/python/qa_packed_to_unpacked.py344
-rwxr-xr-xgr-blocks/python/qa_patterned_interleaver.py56
-rw-r--r--gr-blocks/python/qa_peak_detector2.py58
-rwxr-xr-xgr-blocks/python/qa_pipe_fittings.py138
-rwxr-xr-xgr-blocks/python/qa_regenerate.py90
-rwxr-xr-xgr-blocks/python/qa_repeat.py49
-rw-r--r--gr-blocks/python/qa_rms.py83
-rwxr-xr-xgr-blocks/python/qa_stream_mux.py169
-rwxr-xr-xgr-blocks/python/qa_stretch.py67
-rw-r--r--gr-blocks/python/qa_threshold.py54
-rwxr-xr-xgr-blocks/python/qa_throttle.py39
-rw-r--r--gr-blocks/python/qa_transcendental.py90
-rwxr-xr-xgr-blocks/python/qa_type_conversions.py316
28 files changed, 3288 insertions, 0 deletions
diff --git a/gr-blocks/python/CMakeLists.txt b/gr-blocks/python/CMakeLists.txt
new file mode 100644
index 000000000..cab0b956f
--- /dev/null
+++ b/gr-blocks/python/CMakeLists.txt
@@ -0,0 +1,48 @@
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+
+########################################################################
+include(GrPython)
+
+GR_PYTHON_INSTALL(
+ FILES
+ __init__.py
+ parse_file_metadata.py
+ DESTINATION ${GR_PYTHON_DIR}/gnuradio/blocks
+ COMPONENT "blocks_python"
+)
+
+########################################################################
+# Handle the unit tests
+########################################################################
+if(ENABLE_TESTING)
+
+list(APPEND GR_TEST_PYTHON_DIRS
+ ${CMAKE_BINARY_DIR}/gr-blocks/python
+ ${CMAKE_BINARY_DIR}/gr-blocks/swig
+)
+list(APPEND GR_TEST_TARGET_DEPS gnuradio-blocks)
+
+include(GrTest)
+file(GLOB py_qa_test_files "qa_*.py")
+foreach(py_qa_test_file ${py_qa_test_files})
+ get_filename_component(py_qa_test_name ${py_qa_test_file} NAME_WE)
+ GR_ADD_TEST(${py_qa_test_name} ${PYTHON_EXECUTABLE} ${PYTHON_DASH_B} ${py_qa_test_file})
+endforeach(py_qa_test_file)
+endif(ENABLE_TESTING)
diff --git a/gr-blocks/python/__init__.py b/gr-blocks/python/__init__.py
new file mode 100644
index 000000000..6577d933e
--- /dev/null
+++ b/gr-blocks/python/__init__.py
@@ -0,0 +1,37 @@
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+'''
+This is the gr-blocks package. This package provides GNU Radio
+processing blocks common to many flowgraphs.
+'''
+
+from blocks_swig import *
+
+#alias old gr_add_vXX and gr_multiply_vXX
+add_vcc = add_cc
+add_vff = add_ff
+add_vii = add_ii
+add_vss = add_ss
+multiply_vcc = multiply_cc
+multiply_vff = multiply_ff
+multiply_vii = multiply_ii
+multiply_vss = multiply_ss
diff --git a/gr-blocks/python/parse_file_metadata.py b/gr-blocks/python/parse_file_metadata.py
new file mode 100644
index 000000000..c8ac2def9
--- /dev/null
+++ b/gr-blocks/python/parse_file_metadata.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import sys
+from gnuradio import gr
+
+try:
+ import pmt
+except ImportError:
+ from gruel import pmt
+
+
+try:
+ import blocks_swig as blocks
+except ImportError:
+ from gnuradio import blocks
+
+'''
+sr Sample rate (samples/second)
+time Time as uint64(secs), double(fractional secs)
+type Type of data (see gr_file_types enum)
+cplx is complex? (True or False)
+strt Start of data (or size of header) in bytes
+size Size of data in bytes
+'''
+
+HEADER_LENGTH = blocks.METADATA_HEADER_SIZE
+
+ftype_to_string = {blocks.GR_FILE_BYTE: "bytes",
+ blocks.GR_FILE_SHORT: "short",
+ blocks.GR_FILE_INT: "int",
+ blocks.GR_FILE_LONG: "long",
+ blocks.GR_FILE_LONG_LONG: "long long",
+ blocks.GR_FILE_FLOAT: "float",
+ blocks.GR_FILE_DOUBLE: "double" }
+
+ftype_to_size = {blocks.GR_FILE_BYTE: gr.sizeof_char,
+ blocks.GR_FILE_SHORT: gr.sizeof_short,
+ blocks.GR_FILE_INT: gr.sizeof_int,
+ blocks.GR_FILE_LONG: gr.sizeof_int,
+ blocks.GR_FILE_LONG_LONG: 2*gr.sizeof_int,
+ blocks.GR_FILE_FLOAT: gr.sizeof_float,
+ blocks.GR_FILE_DOUBLE: gr.sizeof_double}
+
+def parse_header(p, VERBOSE=False):
+ dump = pmt.PMT_NIL
+
+ info = dict()
+
+ if(pmt.pmt_is_dict(p) is False):
+ sys.stderr.write("Header is not a PMT dictionary: invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # GET FILE FORMAT VERSION NUMBER
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("version"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("version"), dump)
+ version = pmt.pmt_to_long(r)
+ if(VERBOSE):
+ print "Version Number: {0}".format(version)
+ else:
+ sys.stderr.write("Could not find key 'version': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT SAMPLE RATE
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_rate"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_rate"), dump)
+ samp_rate = pmt.pmt_to_double(r)
+ info["rx_rate"] = samp_rate
+ if(VERBOSE):
+ print "Sample Rate: {0:.2f} sps".format(samp_rate)
+ else:
+ sys.stderr.write("Could not find key 'sr': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT TIME STAMP
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_time"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_time"), dump)
+ pmt_secs = pmt.pmt_tuple_ref(r, 0)
+ pmt_fracs = pmt.pmt_tuple_ref(r, 1)
+ secs = float(pmt.pmt_to_uint64(pmt_secs))
+ fracs = pmt.pmt_to_double(pmt_fracs)
+ t = secs + fracs
+ info["rx_time"] = t
+ if(VERBOSE):
+ print "Seconds: {0:.6f}".format(t)
+ else:
+ sys.stderr.write("Could not find key 'time': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT ITEM SIZE
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("size"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("size"), dump)
+ dsize = pmt.pmt_to_long(r)
+ info["size"] = dsize
+ if(VERBOSE):
+ print "Item size: {0}".format(dsize)
+ else:
+ sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT DATA TYPE
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("type"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("type"), dump)
+ dtype = pmt.pmt_to_long(r)
+ stype = ftype_to_string[dtype]
+ info["type"] = stype
+ if(VERBOSE):
+ print "Data Type: {0} ({1})".format(stype, dtype)
+ else:
+ sys.stderr.write("Could not find key 'type': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT COMPLEX
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("cplx"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("cplx"), dump)
+ cplx = pmt.pmt_to_bool(r)
+ info["cplx"] = cplx
+ if(VERBOSE):
+ print "Complex? {0}".format(cplx)
+ else:
+ sys.stderr.write("Could not find key 'cplx': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT WHERE CURRENT SEGMENT STARTS
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("strt"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("strt"), dump)
+ seg_start = pmt.pmt_to_uint64(r)
+ info["hdr_len"] = seg_start
+ info["extra_len"] = seg_start - HEADER_LENGTH
+ info["has_extra"] = info["extra_len"] > 0
+ if(VERBOSE):
+ print "Header Length: {0} bytes".format(info["hdr_len"])
+ print "Extra Length: {0}".format((info["extra_len"]))
+ print "Extra Header? {0}".format(info["has_extra"])
+ else:
+ sys.stderr.write("Could not find key 'strt': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ # EXTRACT SIZE OF DATA
+ if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("bytes"))):
+ r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("bytes"), dump)
+ nbytes = pmt.pmt_to_uint64(r)
+
+ nitems = nbytes/dsize
+ info["nitems"] = nitems
+ info["nbytes"] = nbytes
+
+ if(VERBOSE):
+ print "Size of Data: {0} bytes".format(nbytes)
+ print " {0} items".format(nitems)
+ else:
+ sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ return info
+
+# IF THERE IS EXTRA DATA, PULL OUT THE DICTIONARY AND PARSE IT
+def parse_extra_dict(p, info, VERBOSE=False):
+ if(pmt.pmt_is_dict(p) is False):
+ sys.stderr.write("Extra header is not a PMT dictionary: invalid or corrupt data file.\n")
+ sys.exit(1)
+
+ items = pmt.pmt_dict_items(p)
+ nitems = pmt.pmt_length(items)
+ for i in xrange(nitems):
+ item = pmt.pmt_nth(i, items)
+ key = pmt.pmt_symbol_to_string(pmt.pmt_car(item))
+ val = pmt.pmt_cdr(item)
+ info[key] = val
+ if(VERBOSE):
+ print "{0}: ".format(key)
+ pmt.pmt_print(val)
+
+ return info
diff --git a/gr-blocks/python/qa_add_mult_div_sub.py b/gr-blocks/python/qa_add_mult_div_sub.py
new file mode 100755
index 000000000..0aca03d3f
--- /dev/null
+++ b/gr-blocks/python/qa_add_mult_div_sub.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_add_mult_div_sub(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def help_ii(self, src_data, exp_data, op):
+ for s in zip(range(len(src_data)), src_data):
+ src = gr.vector_source_i(s[1])
+ self.tb.connect(src, (op, s[0]))
+ dst = gr.vector_sink_i()
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_ss(self, src_data, exp_data, op):
+ for s in zip(range(len(src_data)), src_data):
+ src = gr.vector_source_s(s[1])
+ self.tb.connect(src, (op, s[0]))
+ dst = gr.vector_sink_s()
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_ff(self, src_data, exp_data, op):
+ for s in zip(range(len(src_data)), src_data):
+ src = gr.vector_source_f(s[1])
+ self.tb.connect(src, (op, s[0]))
+ dst = gr.vector_sink_f()
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_cc(self, src_data, exp_data, op):
+ for s in zip(range(len(src_data)), src_data):
+ src = gr.vector_source_c(s[1])
+ self.tb.connect(src, (op, s[0]))
+ dst = gr.vector_sink_c()
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ # add_XX
+
+ def test_add_ss(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (9, -1, 7, 12, 7)
+ op = blocks_swig.add_ss()
+ self.help_ss((src1_data, src2_data), expected_result, op)
+
+ def test_add_ii(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (9, -1, 7, 12, 7)
+ op = blocks_swig.add_ii()
+ self.help_ii((src1_data, src2_data), expected_result, op)
+
+ def test_add_ff(self):
+ src1_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src2_data = (8.0, -3.0, 4.0, 8.0, 2.0)
+ expected_result = (9.0, -1.0, 7.0, 12.0, 7.0)
+ op = blocks_swig.add_ff()
+ self.help_ff((src1_data, src2_data), expected_result, op)
+
+ def test_add_cc(self):
+ src1_data = (1+1j, 2+2j, 3+3j, 4+4j, 5+5j)
+ src2_data = (8+8j, -3-3j, 4+4j, 8+8j, 2+2j)
+ expected_result = (9+9j, -1-1j, 7+7j, 12+12j, 7+7j)
+ op = blocks_swig.add_cc()
+ self.help_cc((src1_data, src2_data), expected_result, op)
+
+ # add_const_XX
+
+ def test_add_const_ss(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (6, 7, 8, 9, 10)
+ op = blocks_swig.add_const_ss(5)
+ self.help_ss((src_data,), expected_result, op)
+
+ def test_add_const_ii(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (6, 7, 8, 9, 10)
+ op = blocks_swig.add_const_ii(5)
+ self.help_ii((src_data,), expected_result, op)
+
+ def test_add_const_ff(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (6, 7, 8, 9, 10)
+ op = blocks_swig.add_const_ff(5)
+ self.help_ff((src_data,), expected_result, op)
+
+ def test_add_const_cc(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j)
+ op = blocks_swig.add_const_cc(5j)
+ self.help_cc((src_data,), expected_result, op)
+
+ # multiply_XX
+
+ def test_multiply_ss(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (8, -6, 12, 32, 10)
+ op = blocks_swig.multiply_ss()
+ self.help_ss((src1_data, src2_data),
+ expected_result, op)
+
+ def test_multiply_ii(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (8, -6, 12, 32, 10)
+ op = blocks_swig.multiply_ii()
+ self.help_ii((src1_data, src2_data),
+ expected_result, op)
+
+ def test_multiply_ff(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (8, -6, 12, 32, 10)
+ op = blocks_swig.multiply_ff()
+ self.help_ff((src1_data, src2_data),
+ expected_result, op)
+
+ def test_multiply_cc(self):
+ src1_data = (1+1j, 2+2j, 3+3j, 4+4j, 5+5j)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (8+8j, -6-6j, 12+12j, 32+32j, 10+10j)
+ op = blocks_swig.multiply_cc()
+ self.help_cc((src1_data, src2_data),
+ expected_result, op)
+
+ # multiply_const_XX
+
+ def test_multiply_const_ss(self):
+ src_data = (-1, 0, 1, 2, 3)
+ expected_result = (-5, 0, 5, 10, 15)
+ op = blocks_swig.multiply_const_ss(5)
+ self.help_ss((src_data,), expected_result, op)
+
+ def test_multiply_const_ii(self):
+ src_data = (-1, 0, 1, 2, 3)
+ expected_result = (-5, 0, 5, 10, 15)
+ op = blocks_swig.multiply_const_ii(5)
+ self.help_ii((src_data,), expected_result, op)
+
+ def test_multiply_const_ff(self):
+ src_data = (-1, 0, 1, 2, 3)
+ expected_result = (-5, 0, 5, 10, 15)
+ op = blocks_swig.multiply_const_ff(5)
+ self.help_ff((src_data,), expected_result, op)
+
+ def test_multiply_const_cc(self):
+ src_data = (-1-1j, 0+0j, 1+1j, 2+2j, 3+3j)
+ expected_result = (-5-5j, 0+0j, 5+5j, 10+10j, 15+15j)
+ op = blocks_swig.multiply_const_cc(5)
+ self.help_cc((src_data,), expected_result, op)
+
+ def test_multiply_const_cc2(self):
+ src_data = (-1-1j, 0+0j, 1+1j, 2+2j, 3+3j)
+ expected_result = (-3-7j, 0+0j, 3+7j, 6+14j, 9+21j)
+ op = blocks_swig.multiply_const_cc(5+2j)
+ self.help_cc((src_data,), expected_result, op)
+
+ def test_sub_ii(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (-7, 5, -1, -4, 3)
+ op = blocks_swig.sub_ii()
+ self.help_ii((src1_data, src2_data),
+ expected_result, op)
+
+ def test_div_ff(self):
+ src1_data = ( 5, 9, -15, 1024)
+ src2_data = (10, 3, -5, 64)
+ expected_result = (0.5, 3, 3, 16)
+ op = blocks_swig.divide_ff()
+ self.help_ff((src1_data, src2_data), expected_result, op)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_add_mult_div_sub, "test_add_mult_div_sub.xml")
diff --git a/gr-blocks/python/qa_add_mult_v.py b/gr-blocks/python/qa_add_mult_v.py
new file mode 100755
index 000000000..d362cb885
--- /dev/null
+++ b/gr-blocks/python/qa_add_mult_v.py
@@ -0,0 +1,360 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_add_mult_v(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def help_ss(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_s(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_short, size)
+ self.tb.connect(src, srcv)
+ self.tb.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_short, size)
+ dst = gr.vector_sink_s()
+ self.tb.connect(op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_ii(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_i(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_int, size)
+ self.tb.connect(src, srcv)
+ self.tb.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_int, size)
+ dst = gr.vector_sink_i()
+ self.tb.connect(op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_ff(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_f(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_float, size)
+ self.tb.connect(src, srcv)
+ self.tb.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_float, size)
+ dst = gr.vector_sink_f()
+ self.tb.connect(op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_cc(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_c(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_gr_complex, size)
+ self.tb.connect(src, srcv)
+ self.tb.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_gr_complex, size)
+ dst = gr.vector_sink_c()
+ self.tb.connect(op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_ss(self, src_data, exp_data, op):
+ src = gr.vector_source_s(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_short, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_short, len(src_data))
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, srcv, op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_ii(self, src_data, exp_data, op):
+ src = gr.vector_source_i(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_int, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_int, len(src_data))
+ dst = gr.vector_sink_i()
+ self.tb.connect(src, srcv, op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_ff(self, src_data, exp_data, op):
+ src = gr.vector_source_f(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_float, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_float, len(src_data))
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, srcv, op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_cc(self, src_data, exp_data, op):
+ src = gr.vector_source_c(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_gr_complex, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_gr_complex, len(src_data))
+ dst = gr.vector_sink_c()
+ self.tb.connect(src, srcv, op, rhs, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ # add_vXX
+
+ def test_add_vss_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = blocks_swig.add_ss(1)
+ self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vss_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (18, 21, 24, 27, 30)
+ op = blocks_swig.add_ss(5)
+ self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vii_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = blocks_swig.add_ii(1)
+ self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vii_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (18, 21, 24, 27, 30)
+ op = blocks_swig.add_ii(5)
+ self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vff_one(self):
+ src1_data = (1.0,)
+ src2_data = (2.0,)
+ src3_data = (3.0,)
+ expected_result = (6.0,)
+ op = blocks_swig.add_ff(1)
+ self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vff_five(self):
+ src1_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src2_data = (6.0, 7.0, 8.0, 9.0, 10.0)
+ src3_data = (11.0, 12.0, 13.0, 14.0, 15.0)
+ expected_result = (18.0, 21.0, 24.0, 27.0, 30.0)
+ op = blocks_swig.add_ff(5)
+ self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vcc_one(self):
+ src1_data = (1.0+2.0j,)
+ src2_data = (3.0+4.0j,)
+ src3_data = (5.0+6.0j,)
+ expected_result = (9.0+12j,)
+ op = blocks_swig.add_cc(1)
+ self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vcc_five(self):
+ src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)
+ src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j)
+ expected_result = (33.0+36.0j, 39.0+42.0j, 45.0+48.0j, 51.0+54.0j, 57.0+60.0j)
+ op = blocks_swig.add_cc(5)
+ self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ # add_const_vXX
+
+ def test_add_const_vss_one(self):
+ src_data = (1,)
+ op = blocks_swig.add_const_vss((2,))
+ exp_data = (3,)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_add_const_vss_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = blocks_swig.add_const_vss((6, 7, 8, 9, 10))
+ exp_data = (7, 9, 11, 13, 15)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_add_const_vii_one(self):
+ src_data = (1,)
+ op = blocks_swig.add_const_vii((2,))
+ exp_data = (3,)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_add_const_vii_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = blocks_swig.add_const_vii((6, 7, 8, 9, 10))
+ exp_data = (7, 9, 11, 13, 15)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_add_const_vff_one(self):
+ src_data = (1.0,)
+ op = blocks_swig.add_const_vff((2.0,))
+ exp_data = (3.0,)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_add_const_vff_five(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ op = blocks_swig.add_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
+ exp_data = (7.0, 9.0, 11.0, 13.0, 15.0)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_add_const_vcc_one(self):
+ src_data = (1.0+2.0j,)
+ op = blocks_swig.add_const_vcc((2.0+3.0j,))
+ exp_data = (3.0+5.0j,)
+ self.help_const_cc(src_data, exp_data, op)
+
+ def test_add_const_vcc_five(self):
+ src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ op = blocks_swig.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
+ exp_data = (12.0+14.0j, 16.0+18.0j, 20.0+22.0j, 24.0+26.0j, 28.0+30.0j)
+ self.help_const_cc(src_data, exp_data, op)
+
+ # multiply_vXX
+
+ def test_multiply_vss_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = gr.multiply_vss(1)
+ self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vss_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (66, 168, 312, 504, 750)
+ op = gr.multiply_vss(5)
+ self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vii_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = gr.multiply_vii(1)
+ self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vii_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (66, 168, 312, 504, 750)
+ op = gr.multiply_vii(5)
+ self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vff_one(self):
+ src1_data = (1.0,)
+ src2_data = (2.0,)
+ src3_data = (3.0,)
+ expected_result = (6.0,)
+ op = gr.multiply_vff(1)
+ self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vff_five(self):
+ src1_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src2_data = (6.0, 7.0, 8.0, 9.0, 10.0)
+ src3_data = (11.0, 12.0, 13.0, 14.0, 15.0)
+ expected_result = (66.0, 168.0, 312.0, 504.0, 750.0)
+ op = gr.multiply_vff(5)
+ self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vcc_one(self):
+ src1_data = (1.0+2.0j,)
+ src2_data = (3.0+4.0j,)
+ src3_data = (5.0+6.0j,)
+ expected_result = (-85+20j,)
+ op = gr.multiply_vcc(1)
+ self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vcc_five(self):
+ src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)
+ src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j)
+ expected_result = (-1021.0+428.0j, -2647.0+1754.0j, -4945.0+3704.0j, -8011.0+6374.0j, -11941.0+9860.0j)
+ op = gr.multiply_vcc(5)
+ self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ # multiply_const_vXX
+
+ def test_multiply_const_vss_one(self):
+ src_data = (2,)
+ op = gr.multiply_const_vss((3,))
+ exp_data = (6,)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_multiply_const_vss_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = gr.multiply_const_vss((6, 7, 8, 9, 10))
+ exp_data = (6, 14, 24, 36, 50)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_multiply_const_vii_one(self):
+ src_data = (2,)
+ op = gr.multiply_const_vii((3,))
+ exp_data = (6,)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_multiply_const_vii_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = gr.multiply_const_vii((6, 7, 8, 9, 10))
+ exp_data = (6, 14, 24, 36, 50)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_multiply_const_vff_one(self):
+ src_data = (2.0,)
+ op = gr.multiply_const_vff((3.0,))
+ exp_data = (6.0,)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_multiply_const_vff_five(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ op = gr.multiply_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
+ exp_data = (6.0, 14.0, 24.0, 36.0, 50.0)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_multiply_const_vcc_one(self):
+ src_data = (1.0+2.0j,)
+ op = gr.multiply_const_vcc((2.0+3.0j,))
+ exp_data = (-4.0+7.0j,)
+ self.help_const_cc(src_data, exp_data, op)
+
+ def test_multiply_const_vcc_five(self):
+ src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ op = gr.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
+ exp_data = (-13.0+34.0j, -17.0+94.0j, -21.0+170.0j, -25.0+262.0j, -29.0+370.0j)
+ self.help_const_cc(src_data, exp_data, op)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_add_mult_v, "test_add_mult_v.xml")
diff --git a/gr-blocks/python/qa_boolean_operators.py b/gr-blocks/python/qa_boolean_operators.py
new file mode 100755
index 000000000..5572f60ac
--- /dev/null
+++ b/gr-blocks/python/qa_boolean_operators.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2007,2008,2010 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_boolean_operators (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def help_ss (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_s (s[1])
+ self.tb.connect (src, (op, s[0]))
+ dst = gr.vector_sink_s ()
+ self.tb.connect (op, dst)
+ self.tb.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def help_bb (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_b (s[1])
+ self.tb.connect (src, (op, s[0]))
+ dst = gr.vector_sink_b ()
+ self.tb.connect (op, dst)
+ self.tb.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def help_ii (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_i (s[1])
+ self.tb.connect (src, (op, s[0]))
+ dst = gr.vector_sink_i ()
+ self.tb.connect (op, dst)
+ self.tb.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def test_xor_ss (self):
+ src1_data = (1, 2, 3, 0x5004, 0x1150)
+ src2_data = (8, 2, 1 , 0x0508, 0x1105)
+ expected_result = (9, 0, 2, 0x550C, 0x0055)
+ op = blocks_swig.xor_ss ()
+ self.help_ss ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_xor_bb (self):
+ src1_data = (1, 2, 3, 4, 0x50)
+ src2_data = (8, 2, 1 , 8, 0x05)
+ expected_result = (9, 0, 2, 0xC, 0x55)
+ op = blocks_swig.xor_bb ()
+ self.help_bb ((src1_data, src2_data),
+ expected_result, op)
+
+
+ def test_xor_ii (self):
+ src1_data = (1, 2, 3, 0x5000004, 0x11000050)
+ src2_data = (8, 2, 1 , 0x0500008, 0x11000005)
+ expected_result = (9, 0, 2, 0x550000C, 0x00000055)
+ op = blocks_swig.xor_ii ()
+ self.help_ii ((src1_data, src2_data),
+ expected_result, op)
+
+
+ def test_and_ss (self):
+ src1_data = (1, 2, 3, 0x5004, 0x1150)
+ src2_data = (8, 2, 1 , 0x0508, 0x1105)
+ expected_result = (0, 2, 1, 0x0000, 0x1100)
+ op = blocks_swig.and_ss ()
+ self.help_ss ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_and_bb (self):
+ src1_data = (1, 2, 2, 3, 0x04, 0x50)
+ src2_data = (8, 2, 2, 1, 0x08, 0x05)
+ src3_data = (8, 2, 1, 1, 0x08, 0x05)
+ expected_result = (0, 2, 0, 1, 0x00, 0x00)
+ op = blocks_swig.and_bb ()
+ self.help_bb ((src1_data, src2_data, src3_data),
+ expected_result, op)
+
+ def test_and_ii (self):
+ src1_data = (1, 2, 3, 0x50005004, 0x11001150)
+ src2_data = (8, 2, 1 , 0x05000508, 0x11001105)
+ expected_result = (0, 2, 1, 0x00000000, 0x11001100)
+ op = blocks_swig.and_ii ()
+ self.help_ii ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_and_const_ss (self):
+ src_data = (1, 2, 3, 0x5004, 0x1150)
+ expected_result = (0, 2, 2, 0x5000, 0x1100)
+ src = gr.vector_source_s(src_data)
+ op = blocks_swig.and_const_ss (0x55AA)
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(dst.data(), expected_result)
+
+ def test_and_const_bb (self):
+ src_data = (1, 2, 3, 0x50, 0x11)
+ expected_result = (0, 2, 2, 0x00, 0x00)
+ src = gr.vector_source_b(src_data)
+ op = blocks_swig.and_const_bb (0xAA)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(dst.data(), expected_result)
+
+
+ def test_and_const_ii (self):
+ src_data = (1, 2, 3, 0x5004, 0x1150)
+ expected_result = (0, 2, 2, 0x5000, 0x1100)
+ src = gr.vector_source_i(src_data)
+ op = blocks_swig.and_const_ii (0x55AA)
+ dst = gr.vector_sink_i()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(dst.data(), expected_result)
+
+
+ def test_or_ss (self):
+ src1_data = (1, 2, 3, 0x5004, 0x1150)
+ src2_data = (8, 2, 1 , 0x0508, 0x1105)
+ expected_result = (9, 2, 3, 0x550C, 0x1155)
+ op = blocks_swig.or_ss ()
+ self.help_ss ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_or_bb (self):
+ src1_data = (1, 2, 2, 3, 0x04, 0x50)
+ src2_data = (8, 2, 2, 1 , 0x08, 0x05)
+ src3_data = (8, 2, 1, 1 , 0x08, 0x05)
+ expected_result = (9, 2, 3, 3, 0x0C, 0x55)
+ op = blocks_swig.or_bb ()
+ self.help_bb ((src1_data, src2_data, src3_data),
+ expected_result, op)
+
+ def test_or_ii (self):
+ src1_data = (1, 2, 3, 0x50005004, 0x11001150)
+ src2_data = (8, 2, 1 , 0x05000508, 0x11001105)
+ expected_result = (9, 2, 3, 0x5500550C, 0x11001155)
+ op = blocks_swig.or_ii ()
+ self.help_ii ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_not_ss (self):
+ src1_data = (1, 2, 3, 0x5004, 0x1150)
+ expected_result = (~1, ~2, ~3, ~0x5004, ~0x1150)
+ op = blocks_swig.not_ss ()
+ self.help_ss ((((src1_data),)),
+ expected_result, op)
+
+ def test_not_bb (self):
+ src1_data = (1, 2, 2, 3, 0x04, 0x50)
+ expected_result = (0xFE, 0xFD, 0xFD, 0xFC, 0xFB, 0xAF)
+ op = blocks_swig.not_bb ()
+ self.help_bb (((src1_data), ),
+ expected_result, op)
+
+ def test_not_ii (self):
+ src1_data = (1, 2, 3, 0x50005004, 0x11001150)
+ expected_result = (~1 , ~2, ~3, ~0x50005004, ~0x11001150)
+ op = blocks_swig.not_ii ()
+ self.help_ii (((src1_data),),
+ expected_result, op)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_boolean_operators, "test_boolean_operators.xml")
diff --git a/gr-blocks/python/qa_conjugate.py b/gr-blocks/python/qa_conjugate.py
new file mode 100644
index 000000000..1808aa9c0
--- /dev/null
+++ b/gr-blocks/python/qa_conjugate.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_conjugate (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_000 (self):
+ src_data = (-2-2j, -1-1j, -2+2j, -1+1j,
+ 2-2j, 1-1j, 2+2j, 1+1j,
+ 0+0j)
+
+ exp_data = (-2+2j, -1+1j, -2-2j, -1-1j,
+ 2+2j, 1+1j, 2-2j, 1-1j,
+ 0-0j)
+
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.conjugate_cc ()
+ dst = gr.vector_sink_c ()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_conjugate, "test_conjugate.xml")
diff --git a/gr-blocks/python/qa_delay.py b/gr-blocks/python/qa_delay.py
new file mode 100755
index 000000000..031cadb2d
--- /dev/null
+++ b/gr-blocks/python/qa_delay.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2007,2010,2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_delay(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_000(self):
+ delta_t = 0
+ tb = self.tb
+ src_data = [float(x) for x in range(0, 100)]
+ expected_result = tuple(delta_t*[0.0] + src_data)
+
+ src = gr.vector_source_f(src_data)
+ op = blocks.delay(gr.sizeof_float, delta_t)
+ dst = gr.vector_sink_f()
+
+ tb.connect(src, op, dst)
+ tb.run()
+ dst_data = dst.data()
+ self.assertEqual(expected_result, dst_data)
+
+ def test_010(self):
+ delta_t = 10
+ tb = self.tb
+ src_data = [float(x) for x in range(0, 100)]
+ expected_result = tuple(delta_t*[0.0] + src_data)
+
+ src = gr.vector_source_f(src_data)
+ op = blocks.delay(gr.sizeof_float, delta_t)
+ dst = gr.vector_sink_f()
+
+ tb.connect(src, op, dst)
+ tb.run()
+ dst_data = dst.data()
+ self.assertEqual(expected_result, dst_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_delay, "test_delay.xml")
diff --git a/gr-blocks/python/qa_file_metadata.py b/gr-blocks/python/qa_file_metadata.py
new file mode 100644
index 000000000..c7826b1d3
--- /dev/null
+++ b/gr-blocks/python/qa_file_metadata.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import parse_file_metadata
+import blocks_swig as blocks
+import pmt
+import os, math
+
+def sig_source_c(samp_rate, freq, amp, N):
+ t = map(lambda x: float(x)/samp_rate, xrange(N))
+ y = map(lambda x: amp*math.cos(2.*math.pi*freq*x) + \
+ 1j*amp*math.sin(2.*math.pi*freq*x), t)
+ return y
+
+class test_file_metadata(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001(self):
+ N = 1000
+ outfile = "test_out.dat"
+
+ detached = False
+ samp_rate = 200000
+ key = pmt.pmt_intern("samp_rate")
+ val = pmt.pmt_from_double(samp_rate)
+ extras = pmt.pmt_make_dict()
+ extras = pmt.pmt_dict_add(extras, key, val)
+ extras_str = pmt.pmt_serialize_str(extras)
+
+ data = sig_source_c(samp_rate, 1000, 1, N)
+ src = gr.vector_source_c(data)
+ fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile,
+ samp_rate, 1,
+ blocks.GR_FILE_FLOAT, True,
+ 1000000, extras_str, detached)
+ fsnk.set_unbuffered(True)
+
+ self.tb.connect(src, fsnk)
+ self.tb.run()
+ fsnk.close()
+
+ handle = open(outfile, "rb")
+ header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
+ if(len(header_str) == 0):
+ self.assertFalse()
+
+ try:
+ header = pmt.pmt_deserialize_str(header_str)
+ except RuntimeError:
+ self.assertFalse()
+
+ print header
+ info = parse_file_metadata.parse_header(header, False)
+
+ extra_str = handle.read(info["extra_len"])
+ self.assertGreater(len(extra_str), 0)
+ handle.close()
+
+ try:
+ extra = pmt.pmt_deserialize_str(extra_str)
+ except RuntimeError:
+ self.assertFalse()
+
+ extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
+
+ self.assertEqual(info['rx_rate'], samp_rate)
+ self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
+
+
+ # Test file metadata source
+ src.rewind()
+ fsrc = blocks.file_meta_source(outfile, False)
+ vsnk = gr.vector_sink_c()
+ tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA")
+ ssnk = gr.vector_sink_c()
+ self.tb.disconnect(src, fsnk)
+ self.tb.connect(fsrc, vsnk)
+ self.tb.connect(fsrc, tsnk)
+ self.tb.connect(src, ssnk)
+ self.tb.run()
+
+ # Test to make sure tags with 'samp_rate' and 'rx_rate' keys
+ # were generated and received correctly.
+ tags = tsnk.current_tags()
+ for t in tags:
+ if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
+ self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+ elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
+ self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+
+ # Test that the data portion was extracted and received correctly.
+ self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
+
+ os.remove(outfile)
+
+ def test_002(self):
+ N = 1000
+ outfile = "test_out.dat"
+ outfile_hdr = "test_out.dat.hdr"
+
+ detached = True
+ samp_rate = 200000
+ key = pmt.pmt_intern("samp_rate")
+ val = pmt.pmt_from_double(samp_rate)
+ extras = pmt.pmt_make_dict()
+ extras = pmt.pmt_dict_add(extras, key, val)
+ extras_str = pmt.pmt_serialize_str(extras)
+
+ data = sig_source_c(samp_rate, 1000, 1, N)
+ src = gr.vector_source_c(data)
+ fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile,
+ samp_rate, 1,
+ blocks.GR_FILE_FLOAT, True,
+ 1000000, extras_str, detached)
+ fsnk.set_unbuffered(True)
+
+ self.tb.connect(src, fsnk)
+ self.tb.run()
+ fsnk.close()
+
+ # Open detached header for reading
+ handle = open(outfile_hdr, "rb")
+ header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
+ if(len(header_str) == 0):
+ self.assertFalse()
+
+ try:
+ header = pmt.pmt_deserialize_str(header_str)
+ except RuntimeError:
+ self.assertFalse()
+
+ info = parse_file_metadata.parse_header(header, False)
+
+ extra_str = handle.read(info["extra_len"])
+ self.assertGreater(len(extra_str), 0)
+ handle.close()
+
+ try:
+ extra = pmt.pmt_deserialize_str(extra_str)
+ except RuntimeError:
+ self.assertFalse()
+
+ extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
+
+ self.assertEqual(info['rx_rate'], samp_rate)
+ self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
+
+
+ # Test file metadata source
+ src.rewind()
+ fsrc = blocks.file_meta_source(outfile, False, detached, outfile_hdr)
+ vsnk = gr.vector_sink_c()
+ tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA")
+ ssnk = gr.vector_sink_c()
+ self.tb.disconnect(src, fsnk)
+ self.tb.connect(fsrc, vsnk)
+ self.tb.connect(fsrc, tsnk)
+ self.tb.connect(src, ssnk)
+ self.tb.run()
+
+ # Test to make sure tags with 'samp_rate' and 'rx_rate' keys
+ # were generated and received correctly.
+ tags = tsnk.current_tags()
+ for t in tags:
+ if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
+ self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+ elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
+ self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+
+ # Test that the data portion was extracted and received correctly.
+ self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
+
+ os.remove(outfile)
+ os.remove(outfile_hdr)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_file_metadata, "test_file_metadata.xml")
diff --git a/gr-blocks/python/qa_integrate.py b/gr-blocks/python/qa_integrate.py
new file mode 100755
index 000000000..c404f1b30
--- /dev/null
+++ b/gr-blocks/python/qa_integrate.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+#
+# Copyright 2008,2010 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+import math
+
+class test_integrate (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_000_ss(self):
+ src_data = (1, 2, 3, 4, 5, 6)
+ dst_data = (6, 15)
+ src = gr.vector_source_s(src_data)
+ itg = blocks_swig.integrate_ss(3)
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, itg, dst)
+ self.tb.run()
+ self.assertEqual(dst_data, dst.data())
+
+ def test_001_ii(self):
+ src_data = (1, 2, 3, 4, 5, 6)
+ dst_data = (6, 15)
+ src = gr.vector_source_i(src_data)
+ itg = blocks_swig.integrate_ii(3)
+ dst = gr.vector_sink_i()
+ self.tb.connect(src, itg, dst)
+ self.tb.run()
+ self.assertEqual(dst_data, dst.data())
+
+ def test_002_ff(self):
+ src_data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
+ dst_data = [6.0, 15.0]
+ src = gr.vector_source_f(src_data)
+ itg = blocks_swig.integrate_ff(3)
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, itg, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(dst_data, dst.data(), 6)
+
+ def test_003_cc(self):
+ src_data = [1.0+1.0j, 2.0+2.0j, 3.0+3.0j, 4.0+4.0j, 5.0+5.0j, 6.0+6.0j]
+ dst_data = [6.0+6.0j, 15.0+15.0j]
+ src = gr.vector_source_c(src_data)
+ itg = blocks_swig.integrate_cc(3)
+ dst = gr.vector_sink_c()
+ self.tb.connect(src, itg, dst)
+ self.tb.run()
+ self.assertComplexTuplesAlmostEqual(dst_data, dst.data(), 6)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_integrate, "test_integrate.xml")
diff --git a/gr-blocks/python/qa_interleave.py b/gr-blocks/python/qa_interleave.py
new file mode 100755
index 000000000..376d487b1
--- /dev/null
+++ b/gr-blocks/python/qa_interleave.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+import math
+
+class test_interleave (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_int_001 (self):
+ lenx = 64
+ src0 = gr.vector_source_f (range (0, lenx, 4))
+ src1 = gr.vector_source_f (range (1, lenx, 4))
+ src2 = gr.vector_source_f (range (2, lenx, 4))
+ src3 = gr.vector_source_f (range (3, lenx, 4))
+ op = blocks_swig.interleave (gr.sizeof_float)
+ dst = gr.vector_sink_f ()
+
+ self.tb.connect (src0, (op, 0))
+ self.tb.connect (src1, (op, 1))
+ self.tb.connect (src2, (op, 2))
+ self.tb.connect (src3, (op, 3))
+ self.tb.connect (op, dst)
+ self.tb.run ()
+ expected_result = tuple (range (lenx))
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_deint_001 (self):
+ lenx = 64
+ src = gr.vector_source_f (range (lenx))
+ op = blocks_swig.deinterleave (gr.sizeof_float)
+ dst0 = gr.vector_sink_f ()
+ dst1 = gr.vector_sink_f ()
+ dst2 = gr.vector_sink_f ()
+ dst3 = gr.vector_sink_f ()
+
+ self.tb.connect (src, op)
+ self.tb.connect ((op, 0), dst0)
+ self.tb.connect ((op, 1), dst1)
+ self.tb.connect ((op, 2), dst2)
+ self.tb.connect ((op, 3), dst3)
+ self.tb.run ()
+
+ expected_result0 = tuple (range (0, lenx, 4))
+ expected_result1 = tuple (range (1, lenx, 4))
+ expected_result2 = tuple (range (2, lenx, 4))
+ expected_result3 = tuple (range (3, lenx, 4))
+
+ self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ())
+
+if __name__ == '__main__':
+ gr_unittest.run(test_interleave, "test_interleave.xml")
+
diff --git a/gr-blocks/python/qa_keep_m_in_n.py b/gr-blocks/python/qa_keep_m_in_n.py
new file mode 100755
index 000000000..0898217ba
--- /dev/null
+++ b/gr-blocks/python/qa_keep_m_in_n.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+#
+# Copyright 2008,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+import sys
+import random
+
+class test_keep_m_in_n(gr_unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_001(self):
+ self.maxDiff = None;
+ tb = gr.top_block()
+ src = gr.vector_source_b( range(0,100) )
+
+ # itemsize, M, N, offset
+ km2 = blocks_swig.keep_m_in_n( 1, 1, 2, 0 );
+ km3 = blocks_swig.keep_m_in_n( 1, 1, 3, 1 );
+ km7 = blocks_swig.keep_m_in_n( 1, 1, 7, 2 );
+ snk2 = gr.vector_sink_b();
+ snk3 = gr.vector_sink_b();
+ snk7 = gr.vector_sink_b();
+ tb.connect(src,km2,snk2);
+ tb.connect(src,km3,snk3);
+ tb.connect(src,km7,snk7);
+ tb.run();
+
+ self.assertEqual(range(0,100,2), list(snk2.data()));
+ self.assertEqual(range(1,100,3), list(snk3.data()));
+ self.assertEqual(range(2,100,7), list(snk7.data()));
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_keep_m_in_n, "test_keep_m_in_n.xml")
+
diff --git a/gr-blocks/python/qa_keep_one_in_n.py b/gr-blocks/python/qa_keep_one_in_n.py
new file mode 100755
index 000000000..8c5f44b84
--- /dev/null
+++ b/gr-blocks/python/qa_keep_one_in_n.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_keep_one_in_n(gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001(self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ expected_data = (5, 10)
+ src = gr.vector_source_b(src_data);
+ op = blocks_swig.keep_one_in_n(gr.sizeof_char, 5)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(dst.data(), expected_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_keep_one_in_n, "test_integrate.xml")
diff --git a/gr-blocks/python/qa_multiply_conjugate.py b/gr-blocks/python/qa_multiply_conjugate.py
new file mode 100644
index 000000000..f51563f85
--- /dev/null
+++ b/gr-blocks/python/qa_multiply_conjugate.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_multiply_conjugate (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_000 (self):
+ src_data0 = (-2-2j, -1-1j, -2+2j, -1+1j,
+ 2-2j, 1-1j, 2+2j, 1+1j,
+ 0+0j)
+ src_data1 = (-3-3j, -4-4j, -3+3j, -4+4j,
+ 3-3j, 4-4j, 3+3j, 4+4j,
+ 0+0j)
+
+ exp_data = (12+0j, 8+0j, 12+0j, 8+0j,
+ 12+0j, 8+0j, 12+0j, 8+0j,
+ 0+0j)
+ src0 = gr.vector_source_c(src_data0)
+ src1 = gr.vector_source_c(src_data1)
+ op = blocks_swig.multiply_conjugate_cc ()
+ dst = gr.vector_sink_c ()
+
+ self.tb.connect(src0, (op,0))
+ self.tb.connect(src1, (op,1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_multiply_conjugate, "test_multiply_conjugate.xml")
diff --git a/gr-blocks/python/qa_nlog10.py b/gr-blocks/python/qa_nlog10.py
new file mode 100755
index 000000000..cc2a3e8cc
--- /dev/null
+++ b/gr-blocks/python/qa_nlog10.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+#
+# Copyright 2005,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_nlog10(gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001(self):
+ src_data = (-10, 0, 10, 100, 1000, 10000, 100000)
+ expected_result = (-180, -180, 10, 20, 30, 40, 50)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.nlog10_ff(10)
+ dst = gr.vector_sink_f()
+ self.tb.connect (src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_nlog10, "test_nlog10.xml")
+
diff --git a/gr-blocks/python/qa_packed_to_unpacked.py b/gr-blocks/python/qa_packed_to_unpacked.py
new file mode 100755
index 000000000..d84f5dbd3
--- /dev/null
+++ b/gr-blocks/python/qa_packed_to_unpacked.py
@@ -0,0 +1,344 @@
+#!/usr/bin/env python
+#
+# Copyright 2005,2007,2010,2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import random
+
+class test_packing(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block ()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001(self):
+ src_data = (0x80,)
+ expected_results = (1,0,0,0,0,0,0,0)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_002(self):
+ src_data = (0x80,)
+ expected_results = (0,0,0,0,0,0,0,1)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.packed_to_unpacked_bb(1, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_003(self):
+ src_data = (0x11,)
+ expected_results = (4, 2)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.packed_to_unpacked_bb(3, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_004(self):
+ src_data = (0x11,)
+ expected_results = (0, 4)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_005(self):
+ src_data = (1,0,0,0,0,0,1,0,0,1,0,1,1,0,1,0)
+ expected_results = (0x82, 0x5a)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_006(self):
+ src_data = (0,1,0,0,0,0,0,1,0,1,0,1,1,0,1,0)
+ expected_results = (0x82, 0x5a)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.unpacked_to_packed_bb(1, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_007(self):
+ src_data = (4, 2, 0,0,0)
+ expected_results = (0x11,)
+ src = gr.vector_source_b(src_data, False)
+ op = blocks.unpacked_to_packed_bb(3, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_008(self):
+ src_data = (0, 4, 2,0,0)
+ expected_results = (0x11,)
+ src = gr.vector_source_b(src_data,False)
+ op = blocks.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_009(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(202):
+ src_data.append((random.randint(0,255)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+
+ src = gr.vector_source_b(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST)
+ op2 = blocks.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results[0:201], dst.data())
+
+ def test_010(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(56):
+ src_data.append((random.randint(0,255)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_b(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_bb(7, gr.GR_MSB_FIRST)
+ op2 = blocks.unpacked_to_packed_bb(7, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results[0:201], dst.data())
+
+ def test_011(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(56):
+ src_data.append((random.randint(0,255)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_b(tuple(src_data),False)
+ op1 = blocks.packed_to_unpacked_bb(7, gr.GR_LSB_FIRST)
+ op2 = blocks.unpacked_to_packed_bb(7, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_b()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results[0:201], dst.data())
+
+
+ # tests on shorts
+ def test_100a(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ss(1, gr.GR_MSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ss(1, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_s()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_100b(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ss(1, gr.GR_LSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ss(1, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_s()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_101a(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ss(8, gr.GR_MSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ss(8, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_s()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_101b(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ss(8, gr.GR_LSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ss(8, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_s()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+
+ # tests on ints
+ def test_200a(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ii(1, gr.GR_MSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ii(1, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_200b(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ii(1, gr.GR_LSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ii(1, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_201a(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ii(8, gr.GR_MSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ii(8, gr.GR_MSB_FIRST)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_201b(self):
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data), False)
+ op1 = blocks.packed_to_unpacked_ii(8, gr.GR_LSB_FIRST)
+ op2 = blocks.unpacked_to_packed_ii(8, gr.GR_LSB_FIRST)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1, op2)
+ self.tb.connect(op2, dst)
+ self.tb.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+if __name__ == '__main__':
+ gr_unittest.run(test_packing, "test_packing.xml")
+
diff --git a/gr-blocks/python/qa_patterned_interleaver.py b/gr-blocks/python/qa_patterned_interleaver.py
new file mode 100755
index 000000000..3cf29c917
--- /dev/null
+++ b/gr-blocks/python/qa_patterned_interleaver.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+#
+# Copyright 2008,2010 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+try:
+ import blocks_swig as blocks
+except:
+ from gnuradio import blocks
+import math
+
+class test_patterned_interleaver (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_000(self):
+ dst_data = [0,0,1,2,0,2,1,0];
+ src0 = gr.vector_source_f(200*[0])
+ src1 = gr.vector_source_f(200*[1])
+ src2 = gr.vector_source_f(200*[2])
+ itg = blocks.patterned_interleaver(gr.sizeof_float, dst_data)
+ dst = gr.vector_sink_f()
+ head = gr.head(gr.sizeof_float, 8);
+
+ self.tb.connect( src0, (itg,0) );
+ self.tb.connect( src1, (itg,1) );
+ self.tb.connect( src2, (itg,2) );
+ self.tb.connect( itg, head, dst );
+
+ self.tb.run()
+ self.assertEqual(list(dst_data), list(dst.data()))
+
+if __name__ == '__main__':
+ gr_unittest.run(test_patterned_interleaver, "test_patterned_interleaver.xml")
diff --git a/gr-blocks/python/qa_peak_detector2.py b/gr-blocks/python/qa_peak_detector2.py
new file mode 100644
index 000000000..4b864e4d7
--- /dev/null
+++ b/gr-blocks/python/qa_peak_detector2.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+#
+# Copyright 2007,2010,2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_peak_detector2(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_regen1(self):
+ tb = self.tb
+
+ data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+ 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
+
+ expected_result = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+
+
+ src = gr.vector_source_f(data, False)
+ regen = blocks.peak_detector2_fb()
+ dst = gr.vector_sink_b()
+
+ tb.connect(src, regen)
+ tb.connect(regen, dst)
+ tb.run()
+
+ dst_data = dst.data()
+ print dst_data
+
+ self.assertEqual(expected_result, dst_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_peak_detector2, "test_peak_detector2.xml")
diff --git a/gr-blocks/python/qa_pipe_fittings.py b/gr-blocks/python/qa_pipe_fittings.py
new file mode 100755
index 000000000..321660d5e
--- /dev/null
+++ b/gr-blocks/python/qa_pipe_fittings.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+#
+# Copyright 2005,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+def calc_expected_result(src_data, n):
+ assert (len(src_data) % n) == 0
+ result = [list() for x in range(n)]
+ #print "len(result) =", len(result)
+ for i in xrange(len(src_data)):
+ (result[i % n]).append(src_data[i])
+ return [tuple(x) for x in result]
+
+
+class test_pipe_fittings(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block ()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001(self):
+ """
+ Test stream_to_streams.
+ """
+ n = 8
+ src_len = n * 8
+ src_data = range(src_len)
+
+ expected_results = calc_expected_result(src_data, n)
+ #print "expected results: ", expected_results
+ src = gr.vector_source_i(src_data)
+ op = gr.stream_to_streams(gr.sizeof_int, n)
+ self.tb.connect(src, op)
+
+ dsts = []
+ for i in range(n):
+ dst = gr.vector_sink_i()
+ self.tb.connect((op, i), (dst, 0))
+ dsts.append(dst)
+
+ self.tb.run()
+
+ for d in range(n):
+ self.assertEqual(expected_results[d], dsts[d].data())
+
+ def test_002(self):
+
+ # Test streams_to_stream (using stream_to_streams).
+
+ n = 8
+ src_len = n * 8
+ src_data = tuple(range(src_len))
+ expected_results = src_data
+
+ src = gr.vector_source_i(src_data)
+ op1 = gr.stream_to_streams(gr.sizeof_int, n)
+ op2 = gr.streams_to_stream(gr.sizeof_int, n)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1)
+ for i in range(n):
+ self.tb.connect((op1, i), (op2, i))
+ self.tb.connect(op2, dst)
+
+ self.tb.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_003(self):
+
+ #Test streams_to_vector (using stream_to_streams & vector_to_stream).
+
+ n = 8
+ src_len = n * 8
+ src_data = tuple(range(src_len))
+ expected_results = src_data
+
+ src = gr.vector_source_i(src_data)
+ op1 = gr.stream_to_streams(gr.sizeof_int, n)
+ op2 = gr.streams_to_vector(gr.sizeof_int, n)
+ op3 = gr.vector_to_stream(gr.sizeof_int, n)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1)
+ for i in range(n):
+ self.tb.connect((op1, i), (op2, i))
+ self.tb.connect(op2, op3, dst)
+
+ self.tb.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_004(self):
+
+ #Test vector_to_streams.
+
+ n = 8
+ src_len = n * 8
+ src_data = tuple(range(src_len))
+ expected_results = src_data
+
+ src = gr.vector_source_i(src_data)
+ op1 = gr.stream_to_vector(gr.sizeof_int, n)
+ op2 = gr.vector_to_streams(gr.sizeof_int, n)
+ op3 = gr.streams_to_stream(gr.sizeof_int, n)
+ dst = gr.vector_sink_i()
+
+ self.tb.connect(src, op1, op2)
+ for i in range(n):
+ self.tb.connect((op2, i), (op3, i))
+ self.tb.connect(op3, dst)
+
+ self.tb.run()
+ self.assertEqual(expected_results, dst.data())
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_pipe_fittings, "test_pipe_fittings.xml")
diff --git a/gr-blocks/python/qa_regenerate.py b/gr-blocks/python/qa_regenerate.py
new file mode 100755
index 000000000..a57eeba2b
--- /dev/null
+++ b/gr-blocks/python/qa_regenerate.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Copyright 2007,2010,2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_regenerate(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_regen1(self):
+ tb = self.tb
+
+ data = [0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+
+ expected_result = (0, 0, 0,
+ 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
+ 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+
+
+ src = gr.vector_source_b(data, False)
+ regen = blocks.regenerate_bb(5, 2)
+ dst = gr.vector_sink_b()
+
+ tb.connect(src, regen)
+ tb.connect(regen, dst)
+ tb.run()
+
+ dst_data = dst.data()
+
+ self.assertEqual(expected_result, dst_data)
+
+ def test_regen2(self):
+ tb = self.tb
+
+ data = 200*[0,]
+ data[9] = 1
+ data[99] = 1
+
+ expected_result = 200*[0,]
+ expected_result[9] = 1
+ expected_result[19] = 1
+ expected_result[29] = 1
+ expected_result[39] = 1
+
+ expected_result[99] = 1
+ expected_result[109] = 1
+ expected_result[119] = 1
+ expected_result[129] = 1
+
+ src = gr.vector_source_b(data, False)
+ regen = blocks.regenerate_bb(10, 3)
+ dst = gr.vector_sink_b()
+
+ tb.connect(src, regen)
+ tb.connect(regen, dst)
+ tb.run ()
+
+ dst_data = dst.data()
+
+ self.assertEqual(tuple(expected_result), dst_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_regenerate, "test_regenerate.xml")
diff --git a/gr-blocks/python/qa_repeat.py b/gr-blocks/python/qa_repeat.py
new file mode 100755
index 000000000..69fb3ef72
--- /dev/null
+++ b/gr-blocks/python/qa_repeat.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+#
+# Copyright 2008,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+import math
+
+class test_repeat (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_float(self):
+ src_data = [n*1.0 for n in range(100)];
+ dst_data = []
+ for n in range(100):
+ dst_data += [1.0*n, 1.0*n, 1.0*n]
+
+ src = gr.vector_source_f(src_data)
+ rpt = blocks_swig.repeat(gr.sizeof_float, 3)
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, rpt, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(dst_data, dst.data(), 6)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_repeat, "test_repeat.xml")
diff --git a/gr-blocks/python/qa_rms.py b/gr-blocks/python/qa_rms.py
new file mode 100644
index 000000000..f3386668a
--- /dev/null
+++ b/gr-blocks/python/qa_rms.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import math
+
+def sig_source_f(samp_rate, freq, amp, N):
+ t = map(lambda x: float(x)/samp_rate, xrange(N))
+ y = map(lambda x: amp*math.cos(2.*math.pi*freq*x), t)
+ return y
+
+def sig_source_c(samp_rate, freq, amp, N):
+ t = map(lambda x: float(x)/samp_rate, xrange(N))
+ y = map(lambda x: amp*math.cos(2.*math.pi*freq*x) + \
+ 1j*amp*math.sin(2.*math.pi*freq*x), t)
+ return y
+
+class test_rms(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_ff(self):
+ amp = 2
+ src_data = sig_source_f(1, 0.01, amp, 200)
+ N = 750000
+
+ expected_data = amp/math.sqrt(2.0)
+
+ src = gr.vector_source_f(src_data, True)
+ head = gr.head(gr.sizeof_float, N)
+ op = blocks.rms_ff(0.0001)
+ dst = gr.vector_sink_f()
+
+ self.tb.connect(src, head, op, dst)
+ self.tb.run()
+
+ dst_data = dst.data()
+ self.assertAlmostEqual(dst_data[-1], expected_data, 4)
+
+ def test_cf(self):
+ amp = 4
+ src_data = sig_source_c(1, 0.01, amp, 200)
+ N = 750000
+
+ expected_data = amp
+
+ src = gr.vector_source_c(src_data, True)
+ head = gr.head(gr.sizeof_gr_complex, N)
+ op = blocks.rms_cf(0.0001)
+ dst = gr.vector_sink_f()
+
+ self.tb.connect(src, head, op, dst)
+ self.tb.run()
+
+ dst_data = dst.data()
+ self.assertAlmostEqual(dst_data[-1], expected_data, 4)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_rms, "test_rms.xml")
diff --git a/gr-blocks/python/qa_stream_mux.py b/gr-blocks/python/qa_stream_mux.py
new file mode 100755
index 000000000..f21a9bbbc
--- /dev/null
+++ b/gr-blocks/python/qa_stream_mux.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2005,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+
+class test_stream_mux (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def help_stream_2ff(self, N, stream_sizes):
+ v0 = gr.vector_source_f(N*[1,], False)
+ v1 = gr.vector_source_f(N*[2,], False)
+
+ mux = blocks_swig.stream_mux(gr.sizeof_float, stream_sizes)
+
+ dst = gr.vector_sink_f ()
+
+ self.tb.connect (v0, (mux,0))
+ self.tb.connect (v1, (mux,1))
+ self.tb.connect (mux, dst)
+ self.tb.run ()
+
+ return dst.data ()
+
+ def help_stream_ramp_2ff(self, N, stream_sizes):
+ r1 = range(N)
+ r2 = range(N)
+ r2.reverse()
+
+ v0 = gr.vector_source_f(r1, False)
+ v1 = gr.vector_source_f(r2, False)
+
+ mux = blocks_swig.stream_mux(gr.sizeof_float, stream_sizes)
+
+ dst = gr.vector_sink_f ()
+
+ self.tb.connect (v0, (mux,0))
+ self.tb.connect (v1, (mux,1))
+ self.tb.connect (mux, dst)
+ self.tb.run ()
+
+ return dst.data ()
+
+ def test_stream_2NN_ff(self):
+ N = 40
+ stream_sizes = [10, 10]
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0)
+ self.assertEqual (exp_data, result_data)
+
+ def test_stream_ramp_2NN_ff(self):
+ N = 40
+ stream_sizes = [10, 10]
+ result_data = self.help_stream_ramp_2ff(N, stream_sizes)
+
+ exp_data = ( 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
+ 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0,
+ 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0,
+ 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0,
+ 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0,
+ 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0,
+ 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0,
+ 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0)
+ self.assertEqual (exp_data, result_data)
+
+ def test_stream_2NM_ff(self):
+ N = 40
+ stream_sizes = [7, 9]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0)
+
+ self.assertEqual (exp_data, result_data)
+
+
+ def test_stream_2MN_ff(self):
+ N = 37
+ stream_sizes = [7, 9]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0)
+
+ self.assertEqual (exp_data, result_data)
+
+ def test_stream_2N0_ff(self):
+ N = 30
+ stream_sizes = [7, 0]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0)
+
+ self.assertEqual (exp_data, result_data)
+
+ def test_stream_20N_ff(self):
+ N = 30
+ stream_sizes = [0, 9]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data = (2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0)
+
+ self.assertEqual (exp_data, result_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_stream_mux, "test_stream_mux.xml")
diff --git a/gr-blocks/python/qa_stretch.py b/gr-blocks/python/qa_stretch.py
new file mode 100755
index 000000000..013d878a8
--- /dev/null
+++ b/gr-blocks/python/qa_stretch.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_stretch(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_stretch_01(self):
+ tb = self.tb
+
+ data = 10*[1,]
+ data0 = map(lambda x: x/20.0, data)
+ data1 = map(lambda x: x/10.0, data)
+
+ expected_result0 = 10*[0.05,]
+ expected_result1 = 10*[0.1,]
+
+ src0 = gr.vector_source_f(data0, False)
+ src1 = gr.vector_source_f(data1, False)
+ inter = gr.streams_to_vector(gr.sizeof_float, 2)
+ op = blocks.stretch_ff(0.1, 2)
+ deinter = gr.vector_to_streams(gr.sizeof_float, 2)
+ dst0 = gr.vector_sink_f()
+ dst1 = gr.vector_sink_f()
+
+ tb.connect(src0, (inter,0))
+ tb.connect(src1, (inter,1))
+ tb.connect(inter, op)
+ tb.connect(op, deinter)
+ tb.connect((deinter,0), dst0)
+ tb.connect((deinter,1), dst1)
+ tb.run()
+
+ dst0_data = dst0.data()
+ dst1_data = dst1.data()
+
+ self.assertFloatTuplesAlmostEqual(expected_result0, dst0_data, 4)
+ self.assertFloatTuplesAlmostEqual(expected_result1, dst1_data, 4)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_stretch, "test_stretch.xml")
diff --git a/gr-blocks/python/qa_threshold.py b/gr-blocks/python/qa_threshold.py
new file mode 100644
index 000000000..f91af739a
--- /dev/null
+++ b/gr-blocks/python/qa_threshold.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_threshold(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_01(self):
+ tb = self.tb
+
+ data = [0, 0, 0, 2, 2, 2, 2, 0, 0, 0, 2, 2, 2]
+
+ expected_result = (0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1)
+
+ src = gr.vector_source_f(data, False)
+ op = blocks.threshold_ff(1, 1)
+ dst = gr.vector_sink_f()
+
+ tb.connect(src, op)
+ tb.connect(op, dst)
+ tb.run()
+
+ dst_data = dst.data()
+
+ self.assertEqual(expected_result, dst_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_threshold, "test_threshold.xml")
diff --git a/gr-blocks/python/qa_throttle.py b/gr-blocks/python/qa_throttle.py
new file mode 100755
index 000000000..5d462f2c9
--- /dev/null
+++ b/gr-blocks/python/qa_throttle.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_throttle(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_01(self):
+ # Test that we can make the block
+ op = blocks.throttle(gr.sizeof_gr_complex, 1)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_throttle, "test_throttle.xml")
diff --git a/gr-blocks/python/qa_transcendental.py b/gr-blocks/python/qa_transcendental.py
new file mode 100644
index 000000000..8174f7963
--- /dev/null
+++ b/gr-blocks/python/qa_transcendental.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import math
+
+class test_transcendental(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_01(self):
+ tb = self.tb
+
+ data = 100*[0,]
+ expected_result = 100*[1,]
+
+ src = gr.vector_source_f(data, False)
+ op = blocks.transcendental("cos", "float")
+ dst = gr.vector_sink_f()
+
+ tb.connect(src, op)
+ tb.connect(op, dst)
+ tb.run()
+
+ dst_data = dst.data()
+
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5)
+
+ def test_02(self):
+ tb = self.tb
+
+ data = 100*[3,]
+ expected_result = 100*[math.log10(3),]
+
+ src = gr.vector_source_f(data, False)
+ op = blocks.transcendental("log10", "float")
+ dst = gr.vector_sink_f()
+
+ tb.connect(src, op)
+ tb.connect(op, dst)
+ tb.run()
+
+ dst_data = dst.data()
+
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5)
+
+ def test_03(self):
+ tb = self.tb
+
+ data = 100*[3,]
+ expected_result = 100*[math.tanh(3),]
+
+ src = gr.vector_source_f(data, False)
+ op = blocks.transcendental("tanh", "float")
+ dst = gr.vector_sink_f()
+
+ tb.connect(src, op)
+ tb.connect(op, dst)
+ tb.run()
+
+ dst_data = dst.data()
+
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_transcendental, "test_transcendental.xml")
diff --git a/gr-blocks/python/qa_type_conversions.py b/gr-blocks/python/qa_type_conversions.py
new file mode 100755
index 000000000..eb1b42b63
--- /dev/null
+++ b/gr-blocks/python/qa_type_conversions.py
@@ -0,0 +1,316 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig
+from math import sqrt, atan2
+
+class test_type_conversions(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_char_to_float_identity(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src = gr.vector_source_b(src_data)
+ op = blocks_swig.char_to_float()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_char_to_float_scale(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (0.5, 1.0, 1.5, 2.0, 2.5)
+ src = gr.vector_source_b(src_data)
+ op = blocks_swig.char_to_float(scale=2.0)
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_char_to_short(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (256, 512, 768, 1024, 1280)
+ src = gr.vector_source_b(src_data)
+ op = blocks_swig.char_to_short()
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_complex_to_interleaved_short(self):
+ src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ expected_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_interleaved_short()
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_complex_to_float_1(self):
+ src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ expected_data = (1.0, 3.0, 5.0, 7.0, 9.0)
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_float()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_complex_to_float_2(self):
+ src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ expected_data1 = (1.0, 3.0, 5.0, 7.0, 9.0)
+ expected_data2 = (2.0, 4.0, 6.0, 8.0, 10.0)
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_float()
+ dst1 = gr.vector_sink_f()
+ dst2 = gr.vector_sink_f()
+ self.tb.connect(src, op)
+ self.tb.connect((op, 0), dst1)
+ self.tb.connect((op, 1), dst2)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data1, dst1.data())
+ self.assertFloatTuplesAlmostEqual(expected_data2, dst2.data())
+
+ def test_complex_to_real(self):
+ src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ expected_data = (1.0, 3.0, 5.0, 7.0, 9.0)
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_real()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_complex_to_imag(self):
+ src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ expected_data = (2.0, 4.0, 6.0, 8.0, 10.0)
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_imag()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_complex_to_mag(self):
+ src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j)
+ expected_data = (sqrt(5), sqrt(25), sqrt(61), sqrt(113), sqrt(181))
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_mag()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data(), 5)
+
+ def test_complex_to_mag_squared(self):
+ src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j)
+ expected_data = (5.0, 25.0, 61.0, 113.0, 181.0)
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_mag_squared()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_complex_to_arg(self):
+ src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j)
+ expected_data = (atan2(2, 1), atan2(-4,3), atan2(6, 5), atan2(-8, 7), atan2(10,-9))
+ src = gr.vector_source_c(src_data)
+ op = blocks_swig.complex_to_arg()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data(), 2)
+
+ def test_float_to_char_identity(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ expected_data = (1, 2, 3, 4, 5)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_char()
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_float_to_char_scale(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ expected_data = (5, 10, 15, 20, 25)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_char(1, 5)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_float_to_complex_1(self):
+ src_data = (1.0, 3.0, 5.0, 7.0, 9.0)
+ expected_data = (1+0j, 3+0j, 5+0j, 7+0j, 9+0j)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_complex()
+ dst = gr.vector_sink_c()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_float_to_complex_2(self):
+ src1_data = (1.0, 3.0, 5.0, 7.0, 9.0)
+ src2_data = (2.0, 4.0, 6.0, 8.0, 10.0)
+ expected_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ src1 = gr.vector_source_f(src1_data)
+ src2 = gr.vector_source_f(src2_data)
+ op = blocks_swig.float_to_complex()
+ dst = gr.vector_sink_c()
+ self.tb.connect(src1, (op, 0))
+ self.tb.connect(src2, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_float_to_int_identity(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ expected_data = (1, 2, 3, 4, 5)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_int()
+ dst = gr.vector_sink_i()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_float_to_int_scale(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ expected_data = (5, 10, 15, 20, 25)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_int(1, 5)
+ dst = gr.vector_sink_i()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_float_to_short_identity(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ expected_data = (1, 2, 3, 4, 5)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_short()
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_float_to_short_scale(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ expected_data = (5, 10, 15, 20, 25)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_short(1, 5)
+ dst = gr.vector_sink_s()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_float_to_uchar(self):
+ src_data = (1.0, -2.0, 3.0, -4.0, 256.0)
+ expected_data = (1, 0, 3, 0, 255)
+ src = gr.vector_source_f(src_data)
+ op = blocks_swig.float_to_uchar()
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_int_to_float_identity(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src = gr.vector_source_i(src_data)
+ op = blocks_swig.int_to_float()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_int_to_float_scale(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (0.2, 0.4, 0.6, 0.8, 1.0)
+ src = gr.vector_source_i(src_data)
+ op = blocks_swig.int_to_float(1, 5)
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertFloatTuplesAlmostEqual(expected_data, dst.data())
+
+ def test_interleaved_short_to_complex(self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ expected_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j)
+ src = gr.vector_source_s(src_data)
+ op = blocks_swig.interleaved_short_to_complex()
+ dst = gr.vector_sink_c()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_short_to_char(self):
+ src_data = (256, 512, 768, 1024, 1280)
+ expected_data = (1, 2, 3, 4, 5)
+ src = gr.vector_source_s(src_data)
+ op = blocks_swig.short_to_char()
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_short_to_float_identity(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src = gr.vector_source_s(src_data)
+ op = blocks_swig.short_to_float()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_short_to_float_scale(self):
+ src_data = (5, 10, 15, 20, 25)
+ expected_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src = gr.vector_source_s(src_data)
+ op = blocks_swig.short_to_float(1, 5)
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+ def test_uchar_to_float(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src = gr.vector_source_b(src_data)
+ op = blocks_swig.uchar_to_float()
+ dst = gr.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ self.assertEqual(expected_data, dst.data())
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_type_conversions, "test_type_conversions.xml")