diff options
Diffstat (limited to 'gr-blocks/python')
28 files changed, 3288 insertions, 0 deletions
diff --git a/gr-blocks/python/CMakeLists.txt b/gr-blocks/python/CMakeLists.txt new file mode 100644 index 000000000..cab0b956f --- /dev/null +++ b/gr-blocks/python/CMakeLists.txt @@ -0,0 +1,48 @@ +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. + +######################################################################## +include(GrPython) + +GR_PYTHON_INSTALL( + FILES + __init__.py + parse_file_metadata.py + DESTINATION ${GR_PYTHON_DIR}/gnuradio/blocks + COMPONENT "blocks_python" +) + +######################################################################## +# Handle the unit tests +######################################################################## +if(ENABLE_TESTING) + +list(APPEND GR_TEST_PYTHON_DIRS + ${CMAKE_BINARY_DIR}/gr-blocks/python + ${CMAKE_BINARY_DIR}/gr-blocks/swig +) +list(APPEND GR_TEST_TARGET_DEPS gnuradio-blocks) + +include(GrTest) +file(GLOB py_qa_test_files "qa_*.py") +foreach(py_qa_test_file ${py_qa_test_files}) + get_filename_component(py_qa_test_name ${py_qa_test_file} NAME_WE) + GR_ADD_TEST(${py_qa_test_name} ${PYTHON_EXECUTABLE} ${PYTHON_DASH_B} ${py_qa_test_file}) +endforeach(py_qa_test_file) +endif(ENABLE_TESTING) diff --git a/gr-blocks/python/__init__.py b/gr-blocks/python/__init__.py new file mode 100644 index 000000000..6577d933e --- /dev/null +++ b/gr-blocks/python/__init__.py @@ -0,0 +1,37 @@ +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +''' +This is the gr-blocks package. This package provides GNU Radio +processing blocks common to many flowgraphs. +''' + +from blocks_swig import * + +#alias old gr_add_vXX and gr_multiply_vXX +add_vcc = add_cc +add_vff = add_ff +add_vii = add_ii +add_vss = add_ss +multiply_vcc = multiply_cc +multiply_vff = multiply_ff +multiply_vii = multiply_ii +multiply_vss = multiply_ss diff --git a/gr-blocks/python/parse_file_metadata.py b/gr-blocks/python/parse_file_metadata.py new file mode 100644 index 000000000..c8ac2def9 --- /dev/null +++ b/gr-blocks/python/parse_file_metadata.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import sys +from gnuradio import gr + +try: + import pmt +except ImportError: + from gruel import pmt + + +try: + import blocks_swig as blocks +except ImportError: + from gnuradio import blocks + +''' +sr Sample rate (samples/second) +time Time as uint64(secs), double(fractional secs) +type Type of data (see gr_file_types enum) +cplx is complex? (True or False) +strt Start of data (or size of header) in bytes +size Size of data in bytes +''' + +HEADER_LENGTH = blocks.METADATA_HEADER_SIZE + +ftype_to_string = {blocks.GR_FILE_BYTE: "bytes", + blocks.GR_FILE_SHORT: "short", + blocks.GR_FILE_INT: "int", + blocks.GR_FILE_LONG: "long", + blocks.GR_FILE_LONG_LONG: "long long", + blocks.GR_FILE_FLOAT: "float", + blocks.GR_FILE_DOUBLE: "double" } + +ftype_to_size = {blocks.GR_FILE_BYTE: gr.sizeof_char, + blocks.GR_FILE_SHORT: gr.sizeof_short, + blocks.GR_FILE_INT: gr.sizeof_int, + blocks.GR_FILE_LONG: gr.sizeof_int, + blocks.GR_FILE_LONG_LONG: 2*gr.sizeof_int, + blocks.GR_FILE_FLOAT: gr.sizeof_float, + blocks.GR_FILE_DOUBLE: gr.sizeof_double} + +def parse_header(p, VERBOSE=False): + dump = pmt.PMT_NIL + + info = dict() + + if(pmt.pmt_is_dict(p) is False): + sys.stderr.write("Header is not a PMT dictionary: invalid or corrupt data file.\n") + sys.exit(1) + + # GET FILE FORMAT VERSION NUMBER + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("version"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("version"), dump) + version = pmt.pmt_to_long(r) + if(VERBOSE): + print "Version Number: {0}".format(version) + else: + sys.stderr.write("Could not find key 'version': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT SAMPLE RATE + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_rate"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_rate"), dump) + samp_rate = pmt.pmt_to_double(r) + info["rx_rate"] = samp_rate + if(VERBOSE): + print "Sample Rate: {0:.2f} sps".format(samp_rate) + else: + sys.stderr.write("Could not find key 'sr': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT TIME STAMP + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_time"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_time"), dump) + pmt_secs = pmt.pmt_tuple_ref(r, 0) + pmt_fracs = pmt.pmt_tuple_ref(r, 1) + secs = float(pmt.pmt_to_uint64(pmt_secs)) + fracs = pmt.pmt_to_double(pmt_fracs) + t = secs + fracs + info["rx_time"] = t + if(VERBOSE): + print "Seconds: {0:.6f}".format(t) + else: + sys.stderr.write("Could not find key 'time': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT ITEM SIZE + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("size"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("size"), dump) + dsize = pmt.pmt_to_long(r) + info["size"] = dsize + if(VERBOSE): + print "Item size: {0}".format(dsize) + else: + sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT DATA TYPE + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("type"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("type"), dump) + dtype = pmt.pmt_to_long(r) + stype = ftype_to_string[dtype] + info["type"] = stype + if(VERBOSE): + print "Data Type: {0} ({1})".format(stype, dtype) + else: + sys.stderr.write("Could not find key 'type': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT COMPLEX + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("cplx"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("cplx"), dump) + cplx = pmt.pmt_to_bool(r) + info["cplx"] = cplx + if(VERBOSE): + print "Complex? {0}".format(cplx) + else: + sys.stderr.write("Could not find key 'cplx': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT WHERE CURRENT SEGMENT STARTS + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("strt"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("strt"), dump) + seg_start = pmt.pmt_to_uint64(r) + info["hdr_len"] = seg_start + info["extra_len"] = seg_start - HEADER_LENGTH + info["has_extra"] = info["extra_len"] > 0 + if(VERBOSE): + print "Header Length: {0} bytes".format(info["hdr_len"]) + print "Extra Length: {0}".format((info["extra_len"])) + print "Extra Header? {0}".format(info["has_extra"]) + else: + sys.stderr.write("Could not find key 'strt': invalid or corrupt data file.\n") + sys.exit(1) + + # EXTRACT SIZE OF DATA + if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("bytes"))): + r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("bytes"), dump) + nbytes = pmt.pmt_to_uint64(r) + + nitems = nbytes/dsize + info["nitems"] = nitems + info["nbytes"] = nbytes + + if(VERBOSE): + print "Size of Data: {0} bytes".format(nbytes) + print " {0} items".format(nitems) + else: + sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n") + sys.exit(1) + + return info + +# IF THERE IS EXTRA DATA, PULL OUT THE DICTIONARY AND PARSE IT +def parse_extra_dict(p, info, VERBOSE=False): + if(pmt.pmt_is_dict(p) is False): + sys.stderr.write("Extra header is not a PMT dictionary: invalid or corrupt data file.\n") + sys.exit(1) + + items = pmt.pmt_dict_items(p) + nitems = pmt.pmt_length(items) + for i in xrange(nitems): + item = pmt.pmt_nth(i, items) + key = pmt.pmt_symbol_to_string(pmt.pmt_car(item)) + val = pmt.pmt_cdr(item) + info[key] = val + if(VERBOSE): + print "{0}: ".format(key) + pmt.pmt_print(val) + + return info diff --git a/gr-blocks/python/qa_add_mult_div_sub.py b/gr-blocks/python/qa_add_mult_div_sub.py new file mode 100755 index 000000000..0aca03d3f --- /dev/null +++ b/gr-blocks/python/qa_add_mult_div_sub.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python +# +# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_add_mult_div_sub(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def help_ii(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_i(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_i() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ss(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_s(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_s() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ff(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_f(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_f() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_cc(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_c(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_c() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + # add_XX + + def test_add_ss(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (9, -1, 7, 12, 7) + op = blocks_swig.add_ss() + self.help_ss((src1_data, src2_data), expected_result, op) + + def test_add_ii(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (9, -1, 7, 12, 7) + op = blocks_swig.add_ii() + self.help_ii((src1_data, src2_data), expected_result, op) + + def test_add_ff(self): + src1_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src2_data = (8.0, -3.0, 4.0, 8.0, 2.0) + expected_result = (9.0, -1.0, 7.0, 12.0, 7.0) + op = blocks_swig.add_ff() + self.help_ff((src1_data, src2_data), expected_result, op) + + def test_add_cc(self): + src1_data = (1+1j, 2+2j, 3+3j, 4+4j, 5+5j) + src2_data = (8+8j, -3-3j, 4+4j, 8+8j, 2+2j) + expected_result = (9+9j, -1-1j, 7+7j, 12+12j, 7+7j) + op = blocks_swig.add_cc() + self.help_cc((src1_data, src2_data), expected_result, op) + + # add_const_XX + + def test_add_const_ss(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (6, 7, 8, 9, 10) + op = blocks_swig.add_const_ss(5) + self.help_ss((src_data,), expected_result, op) + + def test_add_const_ii(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (6, 7, 8, 9, 10) + op = blocks_swig.add_const_ii(5) + self.help_ii((src_data,), expected_result, op) + + def test_add_const_ff(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (6, 7, 8, 9, 10) + op = blocks_swig.add_const_ff(5) + self.help_ff((src_data,), expected_result, op) + + def test_add_const_cc(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + op = blocks_swig.add_const_cc(5j) + self.help_cc((src_data,), expected_result, op) + + # multiply_XX + + def test_multiply_ss(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (8, -6, 12, 32, 10) + op = blocks_swig.multiply_ss() + self.help_ss((src1_data, src2_data), + expected_result, op) + + def test_multiply_ii(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (8, -6, 12, 32, 10) + op = blocks_swig.multiply_ii() + self.help_ii((src1_data, src2_data), + expected_result, op) + + def test_multiply_ff(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (8, -6, 12, 32, 10) + op = blocks_swig.multiply_ff() + self.help_ff((src1_data, src2_data), + expected_result, op) + + def test_multiply_cc(self): + src1_data = (1+1j, 2+2j, 3+3j, 4+4j, 5+5j) + src2_data = (8, -3, 4, 8, 2) + expected_result = (8+8j, -6-6j, 12+12j, 32+32j, 10+10j) + op = blocks_swig.multiply_cc() + self.help_cc((src1_data, src2_data), + expected_result, op) + + # multiply_const_XX + + def test_multiply_const_ss(self): + src_data = (-1, 0, 1, 2, 3) + expected_result = (-5, 0, 5, 10, 15) + op = blocks_swig.multiply_const_ss(5) + self.help_ss((src_data,), expected_result, op) + + def test_multiply_const_ii(self): + src_data = (-1, 0, 1, 2, 3) + expected_result = (-5, 0, 5, 10, 15) + op = blocks_swig.multiply_const_ii(5) + self.help_ii((src_data,), expected_result, op) + + def test_multiply_const_ff(self): + src_data = (-1, 0, 1, 2, 3) + expected_result = (-5, 0, 5, 10, 15) + op = blocks_swig.multiply_const_ff(5) + self.help_ff((src_data,), expected_result, op) + + def test_multiply_const_cc(self): + src_data = (-1-1j, 0+0j, 1+1j, 2+2j, 3+3j) + expected_result = (-5-5j, 0+0j, 5+5j, 10+10j, 15+15j) + op = blocks_swig.multiply_const_cc(5) + self.help_cc((src_data,), expected_result, op) + + def test_multiply_const_cc2(self): + src_data = (-1-1j, 0+0j, 1+1j, 2+2j, 3+3j) + expected_result = (-3-7j, 0+0j, 3+7j, 6+14j, 9+21j) + op = blocks_swig.multiply_const_cc(5+2j) + self.help_cc((src_data,), expected_result, op) + + def test_sub_ii(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (-7, 5, -1, -4, 3) + op = blocks_swig.sub_ii() + self.help_ii((src1_data, src2_data), + expected_result, op) + + def test_div_ff(self): + src1_data = ( 5, 9, -15, 1024) + src2_data = (10, 3, -5, 64) + expected_result = (0.5, 3, 3, 16) + op = blocks_swig.divide_ff() + self.help_ff((src1_data, src2_data), expected_result, op) + +if __name__ == '__main__': + gr_unittest.run(test_add_mult_div_sub, "test_add_mult_div_sub.xml") diff --git a/gr-blocks/python/qa_add_mult_v.py b/gr-blocks/python/qa_add_mult_v.py new file mode 100755 index 000000000..d362cb885 --- /dev/null +++ b/gr-blocks/python/qa_add_mult_v.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python +# +# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_add_mult_v(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def help_ss(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_s(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_short, size) + self.tb.connect(src, srcv) + self.tb.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_short, size) + dst = gr.vector_sink_s() + self.tb.connect(op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ii(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_i(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_int, size) + self.tb.connect(src, srcv) + self.tb.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_int, size) + dst = gr.vector_sink_i() + self.tb.connect(op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ff(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_f(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_float, size) + self.tb.connect(src, srcv) + self.tb.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_float, size) + dst = gr.vector_sink_f() + self.tb.connect(op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_cc(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_c(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_gr_complex, size) + self.tb.connect(src, srcv) + self.tb.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_gr_complex, size) + dst = gr.vector_sink_c() + self.tb.connect(op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_ss(self, src_data, exp_data, op): + src = gr.vector_source_s(src_data) + srcv = gr.stream_to_vector(gr.sizeof_short, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_short, len(src_data)) + dst = gr.vector_sink_s() + self.tb.connect(src, srcv, op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_ii(self, src_data, exp_data, op): + src = gr.vector_source_i(src_data) + srcv = gr.stream_to_vector(gr.sizeof_int, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_int, len(src_data)) + dst = gr.vector_sink_i() + self.tb.connect(src, srcv, op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_ff(self, src_data, exp_data, op): + src = gr.vector_source_f(src_data) + srcv = gr.stream_to_vector(gr.sizeof_float, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_float, len(src_data)) + dst = gr.vector_sink_f() + self.tb.connect(src, srcv, op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_cc(self, src_data, exp_data, op): + src = gr.vector_source_c(src_data) + srcv = gr.stream_to_vector(gr.sizeof_gr_complex, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_gr_complex, len(src_data)) + dst = gr.vector_sink_c() + self.tb.connect(src, srcv, op, rhs, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + # add_vXX + + def test_add_vss_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = blocks_swig.add_ss(1) + self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vss_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (18, 21, 24, 27, 30) + op = blocks_swig.add_ss(5) + self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vii_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = blocks_swig.add_ii(1) + self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vii_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (18, 21, 24, 27, 30) + op = blocks_swig.add_ii(5) + self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vff_one(self): + src1_data = (1.0,) + src2_data = (2.0,) + src3_data = (3.0,) + expected_result = (6.0,) + op = blocks_swig.add_ff(1) + self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vff_five(self): + src1_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src2_data = (6.0, 7.0, 8.0, 9.0, 10.0) + src3_data = (11.0, 12.0, 13.0, 14.0, 15.0) + expected_result = (18.0, 21.0, 24.0, 27.0, 30.0) + op = blocks_swig.add_ff(5) + self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vcc_one(self): + src1_data = (1.0+2.0j,) + src2_data = (3.0+4.0j,) + src3_data = (5.0+6.0j,) + expected_result = (9.0+12j,) + op = blocks_swig.add_cc(1) + self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vcc_five(self): + src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j) + src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j) + expected_result = (33.0+36.0j, 39.0+42.0j, 45.0+48.0j, 51.0+54.0j, 57.0+60.0j) + op = blocks_swig.add_cc(5) + self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op) + + # add_const_vXX + + def test_add_const_vss_one(self): + src_data = (1,) + op = blocks_swig.add_const_vss((2,)) + exp_data = (3,) + self.help_const_ss(src_data, exp_data, op) + + def test_add_const_vss_five(self): + src_data = (1, 2, 3, 4, 5) + op = blocks_swig.add_const_vss((6, 7, 8, 9, 10)) + exp_data = (7, 9, 11, 13, 15) + self.help_const_ss(src_data, exp_data, op) + + def test_add_const_vii_one(self): + src_data = (1,) + op = blocks_swig.add_const_vii((2,)) + exp_data = (3,) + self.help_const_ii(src_data, exp_data, op) + + def test_add_const_vii_five(self): + src_data = (1, 2, 3, 4, 5) + op = blocks_swig.add_const_vii((6, 7, 8, 9, 10)) + exp_data = (7, 9, 11, 13, 15) + self.help_const_ii(src_data, exp_data, op) + + def test_add_const_vff_one(self): + src_data = (1.0,) + op = blocks_swig.add_const_vff((2.0,)) + exp_data = (3.0,) + self.help_const_ff(src_data, exp_data, op) + + def test_add_const_vff_five(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + op = blocks_swig.add_const_vff((6.0, 7.0, 8.0, 9.0, 10.0)) + exp_data = (7.0, 9.0, 11.0, 13.0, 15.0) + self.help_const_ff(src_data, exp_data, op) + + def test_add_const_vcc_one(self): + src_data = (1.0+2.0j,) + op = blocks_swig.add_const_vcc((2.0+3.0j,)) + exp_data = (3.0+5.0j,) + self.help_const_cc(src_data, exp_data, op) + + def test_add_const_vcc_five(self): + src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + op = blocks_swig.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)) + exp_data = (12.0+14.0j, 16.0+18.0j, 20.0+22.0j, 24.0+26.0j, 28.0+30.0j) + self.help_const_cc(src_data, exp_data, op) + + # multiply_vXX + + def test_multiply_vss_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = gr.multiply_vss(1) + self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vss_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (66, 168, 312, 504, 750) + op = gr.multiply_vss(5) + self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vii_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = gr.multiply_vii(1) + self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vii_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (66, 168, 312, 504, 750) + op = gr.multiply_vii(5) + self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vff_one(self): + src1_data = (1.0,) + src2_data = (2.0,) + src3_data = (3.0,) + expected_result = (6.0,) + op = gr.multiply_vff(1) + self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vff_five(self): + src1_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src2_data = (6.0, 7.0, 8.0, 9.0, 10.0) + src3_data = (11.0, 12.0, 13.0, 14.0, 15.0) + expected_result = (66.0, 168.0, 312.0, 504.0, 750.0) + op = gr.multiply_vff(5) + self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vcc_one(self): + src1_data = (1.0+2.0j,) + src2_data = (3.0+4.0j,) + src3_data = (5.0+6.0j,) + expected_result = (-85+20j,) + op = gr.multiply_vcc(1) + self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vcc_five(self): + src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j) + src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j) + expected_result = (-1021.0+428.0j, -2647.0+1754.0j, -4945.0+3704.0j, -8011.0+6374.0j, -11941.0+9860.0j) + op = gr.multiply_vcc(5) + self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op) + + # multiply_const_vXX + + def test_multiply_const_vss_one(self): + src_data = (2,) + op = gr.multiply_const_vss((3,)) + exp_data = (6,) + self.help_const_ss(src_data, exp_data, op) + + def test_multiply_const_vss_five(self): + src_data = (1, 2, 3, 4, 5) + op = gr.multiply_const_vss((6, 7, 8, 9, 10)) + exp_data = (6, 14, 24, 36, 50) + self.help_const_ss(src_data, exp_data, op) + + def test_multiply_const_vii_one(self): + src_data = (2,) + op = gr.multiply_const_vii((3,)) + exp_data = (6,) + self.help_const_ii(src_data, exp_data, op) + + def test_multiply_const_vii_five(self): + src_data = (1, 2, 3, 4, 5) + op = gr.multiply_const_vii((6, 7, 8, 9, 10)) + exp_data = (6, 14, 24, 36, 50) + self.help_const_ii(src_data, exp_data, op) + + def test_multiply_const_vff_one(self): + src_data = (2.0,) + op = gr.multiply_const_vff((3.0,)) + exp_data = (6.0,) + self.help_const_ff(src_data, exp_data, op) + + def test_multiply_const_vff_five(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + op = gr.multiply_const_vff((6.0, 7.0, 8.0, 9.0, 10.0)) + exp_data = (6.0, 14.0, 24.0, 36.0, 50.0) + self.help_const_ff(src_data, exp_data, op) + + def test_multiply_const_vcc_one(self): + src_data = (1.0+2.0j,) + op = gr.multiply_const_vcc((2.0+3.0j,)) + exp_data = (-4.0+7.0j,) + self.help_const_cc(src_data, exp_data, op) + + def test_multiply_const_vcc_five(self): + src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + op = gr.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)) + exp_data = (-13.0+34.0j, -17.0+94.0j, -21.0+170.0j, -25.0+262.0j, -29.0+370.0j) + self.help_const_cc(src_data, exp_data, op) + + +if __name__ == '__main__': + gr_unittest.run(test_add_mult_v, "test_add_mult_v.xml") diff --git a/gr-blocks/python/qa_boolean_operators.py b/gr-blocks/python/qa_boolean_operators.py new file mode 100755 index 000000000..5572f60ac --- /dev/null +++ b/gr-blocks/python/qa_boolean_operators.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# +# Copyright 2004,2007,2008,2010 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_boolean_operators (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def help_ss (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_s (s[1]) + self.tb.connect (src, (op, s[0])) + dst = gr.vector_sink_s () + self.tb.connect (op, dst) + self.tb.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def help_bb (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_b (s[1]) + self.tb.connect (src, (op, s[0])) + dst = gr.vector_sink_b () + self.tb.connect (op, dst) + self.tb.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def help_ii (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_i (s[1]) + self.tb.connect (src, (op, s[0])) + dst = gr.vector_sink_i () + self.tb.connect (op, dst) + self.tb.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def test_xor_ss (self): + src1_data = (1, 2, 3, 0x5004, 0x1150) + src2_data = (8, 2, 1 , 0x0508, 0x1105) + expected_result = (9, 0, 2, 0x550C, 0x0055) + op = blocks_swig.xor_ss () + self.help_ss ((src1_data, src2_data), + expected_result, op) + + def test_xor_bb (self): + src1_data = (1, 2, 3, 4, 0x50) + src2_data = (8, 2, 1 , 8, 0x05) + expected_result = (9, 0, 2, 0xC, 0x55) + op = blocks_swig.xor_bb () + self.help_bb ((src1_data, src2_data), + expected_result, op) + + + def test_xor_ii (self): + src1_data = (1, 2, 3, 0x5000004, 0x11000050) + src2_data = (8, 2, 1 , 0x0500008, 0x11000005) + expected_result = (9, 0, 2, 0x550000C, 0x00000055) + op = blocks_swig.xor_ii () + self.help_ii ((src1_data, src2_data), + expected_result, op) + + + def test_and_ss (self): + src1_data = (1, 2, 3, 0x5004, 0x1150) + src2_data = (8, 2, 1 , 0x0508, 0x1105) + expected_result = (0, 2, 1, 0x0000, 0x1100) + op = blocks_swig.and_ss () + self.help_ss ((src1_data, src2_data), + expected_result, op) + + def test_and_bb (self): + src1_data = (1, 2, 2, 3, 0x04, 0x50) + src2_data = (8, 2, 2, 1, 0x08, 0x05) + src3_data = (8, 2, 1, 1, 0x08, 0x05) + expected_result = (0, 2, 0, 1, 0x00, 0x00) + op = blocks_swig.and_bb () + self.help_bb ((src1_data, src2_data, src3_data), + expected_result, op) + + def test_and_ii (self): + src1_data = (1, 2, 3, 0x50005004, 0x11001150) + src2_data = (8, 2, 1 , 0x05000508, 0x11001105) + expected_result = (0, 2, 1, 0x00000000, 0x11001100) + op = blocks_swig.and_ii () + self.help_ii ((src1_data, src2_data), + expected_result, op) + + def test_and_const_ss (self): + src_data = (1, 2, 3, 0x5004, 0x1150) + expected_result = (0, 2, 2, 0x5000, 0x1100) + src = gr.vector_source_s(src_data) + op = blocks_swig.and_const_ss (0x55AA) + dst = gr.vector_sink_s() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(dst.data(), expected_result) + + def test_and_const_bb (self): + src_data = (1, 2, 3, 0x50, 0x11) + expected_result = (0, 2, 2, 0x00, 0x00) + src = gr.vector_source_b(src_data) + op = blocks_swig.and_const_bb (0xAA) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(dst.data(), expected_result) + + + def test_and_const_ii (self): + src_data = (1, 2, 3, 0x5004, 0x1150) + expected_result = (0, 2, 2, 0x5000, 0x1100) + src = gr.vector_source_i(src_data) + op = blocks_swig.and_const_ii (0x55AA) + dst = gr.vector_sink_i() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(dst.data(), expected_result) + + + def test_or_ss (self): + src1_data = (1, 2, 3, 0x5004, 0x1150) + src2_data = (8, 2, 1 , 0x0508, 0x1105) + expected_result = (9, 2, 3, 0x550C, 0x1155) + op = blocks_swig.or_ss () + self.help_ss ((src1_data, src2_data), + expected_result, op) + + def test_or_bb (self): + src1_data = (1, 2, 2, 3, 0x04, 0x50) + src2_data = (8, 2, 2, 1 , 0x08, 0x05) + src3_data = (8, 2, 1, 1 , 0x08, 0x05) + expected_result = (9, 2, 3, 3, 0x0C, 0x55) + op = blocks_swig.or_bb () + self.help_bb ((src1_data, src2_data, src3_data), + expected_result, op) + + def test_or_ii (self): + src1_data = (1, 2, 3, 0x50005004, 0x11001150) + src2_data = (8, 2, 1 , 0x05000508, 0x11001105) + expected_result = (9, 2, 3, 0x5500550C, 0x11001155) + op = blocks_swig.or_ii () + self.help_ii ((src1_data, src2_data), + expected_result, op) + + def test_not_ss (self): + src1_data = (1, 2, 3, 0x5004, 0x1150) + expected_result = (~1, ~2, ~3, ~0x5004, ~0x1150) + op = blocks_swig.not_ss () + self.help_ss ((((src1_data),)), + expected_result, op) + + def test_not_bb (self): + src1_data = (1, 2, 2, 3, 0x04, 0x50) + expected_result = (0xFE, 0xFD, 0xFD, 0xFC, 0xFB, 0xAF) + op = blocks_swig.not_bb () + self.help_bb (((src1_data), ), + expected_result, op) + + def test_not_ii (self): + src1_data = (1, 2, 3, 0x50005004, 0x11001150) + expected_result = (~1 , ~2, ~3, ~0x50005004, ~0x11001150) + op = blocks_swig.not_ii () + self.help_ii (((src1_data),), + expected_result, op) + + +if __name__ == '__main__': + gr_unittest.run(test_boolean_operators, "test_boolean_operators.xml") diff --git a/gr-blocks/python/qa_conjugate.py b/gr-blocks/python/qa_conjugate.py new file mode 100644 index 000000000..1808aa9c0 --- /dev/null +++ b/gr-blocks/python/qa_conjugate.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_conjugate (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_000 (self): + src_data = (-2-2j, -1-1j, -2+2j, -1+1j, + 2-2j, 1-1j, 2+2j, 1+1j, + 0+0j) + + exp_data = (-2+2j, -1+1j, -2-2j, -1-1j, + 2+2j, 1+1j, 2-2j, 1-1j, + 0-0j) + + src = gr.vector_source_c(src_data) + op = blocks_swig.conjugate_cc () + dst = gr.vector_sink_c () + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data () + self.assertEqual (exp_data, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_conjugate, "test_conjugate.xml") diff --git a/gr-blocks/python/qa_delay.py b/gr-blocks/python/qa_delay.py new file mode 100755 index 000000000..031cadb2d --- /dev/null +++ b/gr-blocks/python/qa_delay.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# +# Copyright 2004,2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_delay(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_000(self): + delta_t = 0 + tb = self.tb + src_data = [float(x) for x in range(0, 100)] + expected_result = tuple(delta_t*[0.0] + src_data) + + src = gr.vector_source_f(src_data) + op = blocks.delay(gr.sizeof_float, delta_t) + dst = gr.vector_sink_f() + + tb.connect(src, op, dst) + tb.run() + dst_data = dst.data() + self.assertEqual(expected_result, dst_data) + + def test_010(self): + delta_t = 10 + tb = self.tb + src_data = [float(x) for x in range(0, 100)] + expected_result = tuple(delta_t*[0.0] + src_data) + + src = gr.vector_source_f(src_data) + op = blocks.delay(gr.sizeof_float, delta_t) + dst = gr.vector_sink_f() + + tb.connect(src, op, dst) + tb.run() + dst_data = dst.data() + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_delay, "test_delay.xml") diff --git a/gr-blocks/python/qa_file_metadata.py b/gr-blocks/python/qa_file_metadata.py new file mode 100644 index 000000000..c7826b1d3 --- /dev/null +++ b/gr-blocks/python/qa_file_metadata.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import parse_file_metadata +import blocks_swig as blocks +import pmt +import os, math + +def sig_source_c(samp_rate, freq, amp, N): + t = map(lambda x: float(x)/samp_rate, xrange(N)) + y = map(lambda x: amp*math.cos(2.*math.pi*freq*x) + \ + 1j*amp*math.sin(2.*math.pi*freq*x), t) + return y + +class test_file_metadata(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + N = 1000 + outfile = "test_out.dat" + + detached = False + samp_rate = 200000 + key = pmt.pmt_intern("samp_rate") + val = pmt.pmt_from_double(samp_rate) + extras = pmt.pmt_make_dict() + extras = pmt.pmt_dict_add(extras, key, val) + extras_str = pmt.pmt_serialize_str(extras) + + data = sig_source_c(samp_rate, 1000, 1, N) + src = gr.vector_source_c(data) + fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile, + samp_rate, 1, + blocks.GR_FILE_FLOAT, True, + 1000000, extras_str, detached) + fsnk.set_unbuffered(True) + + self.tb.connect(src, fsnk) + self.tb.run() + fsnk.close() + + handle = open(outfile, "rb") + header_str = handle.read(parse_file_metadata.HEADER_LENGTH) + if(len(header_str) == 0): + self.assertFalse() + + try: + header = pmt.pmt_deserialize_str(header_str) + except RuntimeError: + self.assertFalse() + + print header + info = parse_file_metadata.parse_header(header, False) + + extra_str = handle.read(info["extra_len"]) + self.assertGreater(len(extra_str), 0) + handle.close() + + try: + extra = pmt.pmt_deserialize_str(extra_str) + except RuntimeError: + self.assertFalse() + + extra_info = parse_file_metadata.parse_extra_dict(extra, info, False) + + self.assertEqual(info['rx_rate'], samp_rate) + self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate) + + + # Test file metadata source + src.rewind() + fsrc = blocks.file_meta_source(outfile, False) + vsnk = gr.vector_sink_c() + tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA") + ssnk = gr.vector_sink_c() + self.tb.disconnect(src, fsnk) + self.tb.connect(fsrc, vsnk) + self.tb.connect(fsrc, tsnk) + self.tb.connect(src, ssnk) + self.tb.run() + + # Test to make sure tags with 'samp_rate' and 'rx_rate' keys + # were generated and received correctly. + tags = tsnk.current_tags() + for t in tags: + if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))): + self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) + elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))): + self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) + + # Test that the data portion was extracted and received correctly. + self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5) + + os.remove(outfile) + + def test_002(self): + N = 1000 + outfile = "test_out.dat" + outfile_hdr = "test_out.dat.hdr" + + detached = True + samp_rate = 200000 + key = pmt.pmt_intern("samp_rate") + val = pmt.pmt_from_double(samp_rate) + extras = pmt.pmt_make_dict() + extras = pmt.pmt_dict_add(extras, key, val) + extras_str = pmt.pmt_serialize_str(extras) + + data = sig_source_c(samp_rate, 1000, 1, N) + src = gr.vector_source_c(data) + fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile, + samp_rate, 1, + blocks.GR_FILE_FLOAT, True, + 1000000, extras_str, detached) + fsnk.set_unbuffered(True) + + self.tb.connect(src, fsnk) + self.tb.run() + fsnk.close() + + # Open detached header for reading + handle = open(outfile_hdr, "rb") + header_str = handle.read(parse_file_metadata.HEADER_LENGTH) + if(len(header_str) == 0): + self.assertFalse() + + try: + header = pmt.pmt_deserialize_str(header_str) + except RuntimeError: + self.assertFalse() + + info = parse_file_metadata.parse_header(header, False) + + extra_str = handle.read(info["extra_len"]) + self.assertGreater(len(extra_str), 0) + handle.close() + + try: + extra = pmt.pmt_deserialize_str(extra_str) + except RuntimeError: + self.assertFalse() + + extra_info = parse_file_metadata.parse_extra_dict(extra, info, False) + + self.assertEqual(info['rx_rate'], samp_rate) + self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate) + + + # Test file metadata source + src.rewind() + fsrc = blocks.file_meta_source(outfile, False, detached, outfile_hdr) + vsnk = gr.vector_sink_c() + tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA") + ssnk = gr.vector_sink_c() + self.tb.disconnect(src, fsnk) + self.tb.connect(fsrc, vsnk) + self.tb.connect(fsrc, tsnk) + self.tb.connect(src, ssnk) + self.tb.run() + + # Test to make sure tags with 'samp_rate' and 'rx_rate' keys + # were generated and received correctly. + tags = tsnk.current_tags() + for t in tags: + if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))): + self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) + elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))): + self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) + + # Test that the data portion was extracted and received correctly. + self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5) + + os.remove(outfile) + os.remove(outfile_hdr) + +if __name__ == '__main__': + gr_unittest.run(test_file_metadata, "test_file_metadata.xml") diff --git a/gr-blocks/python/qa_integrate.py b/gr-blocks/python/qa_integrate.py new file mode 100755 index 000000000..c404f1b30 --- /dev/null +++ b/gr-blocks/python/qa_integrate.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig +import math + +class test_integrate (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_000_ss(self): + src_data = (1, 2, 3, 4, 5, 6) + dst_data = (6, 15) + src = gr.vector_source_s(src_data) + itg = blocks_swig.integrate_ss(3) + dst = gr.vector_sink_s() + self.tb.connect(src, itg, dst) + self.tb.run() + self.assertEqual(dst_data, dst.data()) + + def test_001_ii(self): + src_data = (1, 2, 3, 4, 5, 6) + dst_data = (6, 15) + src = gr.vector_source_i(src_data) + itg = blocks_swig.integrate_ii(3) + dst = gr.vector_sink_i() + self.tb.connect(src, itg, dst) + self.tb.run() + self.assertEqual(dst_data, dst.data()) + + def test_002_ff(self): + src_data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0] + dst_data = [6.0, 15.0] + src = gr.vector_source_f(src_data) + itg = blocks_swig.integrate_ff(3) + dst = gr.vector_sink_f() + self.tb.connect(src, itg, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(dst_data, dst.data(), 6) + + def test_003_cc(self): + src_data = [1.0+1.0j, 2.0+2.0j, 3.0+3.0j, 4.0+4.0j, 5.0+5.0j, 6.0+6.0j] + dst_data = [6.0+6.0j, 15.0+15.0j] + src = gr.vector_source_c(src_data) + itg = blocks_swig.integrate_cc(3) + dst = gr.vector_sink_c() + self.tb.connect(src, itg, dst) + self.tb.run() + self.assertComplexTuplesAlmostEqual(dst_data, dst.data(), 6) + +if __name__ == '__main__': + gr_unittest.run(test_integrate, "test_integrate.xml") diff --git a/gr-blocks/python/qa_interleave.py b/gr-blocks/python/qa_interleave.py new file mode 100755 index 000000000..376d487b1 --- /dev/null +++ b/gr-blocks/python/qa_interleave.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# +# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig +import math + +class test_interleave (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_int_001 (self): + lenx = 64 + src0 = gr.vector_source_f (range (0, lenx, 4)) + src1 = gr.vector_source_f (range (1, lenx, 4)) + src2 = gr.vector_source_f (range (2, lenx, 4)) + src3 = gr.vector_source_f (range (3, lenx, 4)) + op = blocks_swig.interleave (gr.sizeof_float) + dst = gr.vector_sink_f () + + self.tb.connect (src0, (op, 0)) + self.tb.connect (src1, (op, 1)) + self.tb.connect (src2, (op, 2)) + self.tb.connect (src3, (op, 3)) + self.tb.connect (op, dst) + self.tb.run () + expected_result = tuple (range (lenx)) + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_deint_001 (self): + lenx = 64 + src = gr.vector_source_f (range (lenx)) + op = blocks_swig.deinterleave (gr.sizeof_float) + dst0 = gr.vector_sink_f () + dst1 = gr.vector_sink_f () + dst2 = gr.vector_sink_f () + dst3 = gr.vector_sink_f () + + self.tb.connect (src, op) + self.tb.connect ((op, 0), dst0) + self.tb.connect ((op, 1), dst1) + self.tb.connect ((op, 2), dst2) + self.tb.connect ((op, 3), dst3) + self.tb.run () + + expected_result0 = tuple (range (0, lenx, 4)) + expected_result1 = tuple (range (1, lenx, 4)) + expected_result2 = tuple (range (2, lenx, 4)) + expected_result3 = tuple (range (3, lenx, 4)) + + self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ()) + self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ()) + self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ()) + self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ()) + +if __name__ == '__main__': + gr_unittest.run(test_interleave, "test_interleave.xml") + diff --git a/gr-blocks/python/qa_keep_m_in_n.py b/gr-blocks/python/qa_keep_m_in_n.py new file mode 100755 index 000000000..0898217ba --- /dev/null +++ b/gr-blocks/python/qa_keep_m_in_n.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig +import sys +import random + +class test_keep_m_in_n(gr_unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_001(self): + self.maxDiff = None; + tb = gr.top_block() + src = gr.vector_source_b( range(0,100) ) + + # itemsize, M, N, offset + km2 = blocks_swig.keep_m_in_n( 1, 1, 2, 0 ); + km3 = blocks_swig.keep_m_in_n( 1, 1, 3, 1 ); + km7 = blocks_swig.keep_m_in_n( 1, 1, 7, 2 ); + snk2 = gr.vector_sink_b(); + snk3 = gr.vector_sink_b(); + snk7 = gr.vector_sink_b(); + tb.connect(src,km2,snk2); + tb.connect(src,km3,snk3); + tb.connect(src,km7,snk7); + tb.run(); + + self.assertEqual(range(0,100,2), list(snk2.data())); + self.assertEqual(range(1,100,3), list(snk3.data())); + self.assertEqual(range(2,100,7), list(snk7.data())); + + +if __name__ == '__main__': + gr_unittest.run(test_keep_m_in_n, "test_keep_m_in_n.xml") + diff --git a/gr-blocks/python/qa_keep_one_in_n.py b/gr-blocks/python/qa_keep_one_in_n.py new file mode 100755 index 000000000..8c5f44b84 --- /dev/null +++ b/gr-blocks/python/qa_keep_one_in_n.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_keep_one_in_n(gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001(self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_data = (5, 10) + src = gr.vector_source_b(src_data); + op = blocks_swig.keep_one_in_n(gr.sizeof_char, 5) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(dst.data(), expected_data) + + +if __name__ == '__main__': + gr_unittest.run(test_keep_one_in_n, "test_integrate.xml") diff --git a/gr-blocks/python/qa_multiply_conjugate.py b/gr-blocks/python/qa_multiply_conjugate.py new file mode 100644 index 000000000..f51563f85 --- /dev/null +++ b/gr-blocks/python/qa_multiply_conjugate.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_multiply_conjugate (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_000 (self): + src_data0 = (-2-2j, -1-1j, -2+2j, -1+1j, + 2-2j, 1-1j, 2+2j, 1+1j, + 0+0j) + src_data1 = (-3-3j, -4-4j, -3+3j, -4+4j, + 3-3j, 4-4j, 3+3j, 4+4j, + 0+0j) + + exp_data = (12+0j, 8+0j, 12+0j, 8+0j, + 12+0j, 8+0j, 12+0j, 8+0j, + 0+0j) + src0 = gr.vector_source_c(src_data0) + src1 = gr.vector_source_c(src_data1) + op = blocks_swig.multiply_conjugate_cc () + dst = gr.vector_sink_c () + + self.tb.connect(src0, (op,0)) + self.tb.connect(src1, (op,1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data () + self.assertEqual (exp_data, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_multiply_conjugate, "test_multiply_conjugate.xml") diff --git a/gr-blocks/python/qa_nlog10.py b/gr-blocks/python/qa_nlog10.py new file mode 100755 index 000000000..cc2a3e8cc --- /dev/null +++ b/gr-blocks/python/qa_nlog10.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# +# Copyright 2005,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_nlog10(gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001(self): + src_data = (-10, 0, 10, 100, 1000, 10000, 100000) + expected_result = (-180, -180, 10, 20, 30, 40, 50) + src = gr.vector_source_f(src_data) + op = blocks_swig.nlog10_ff(10) + dst = gr.vector_sink_f() + self.tb.connect (src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + +if __name__ == '__main__': + gr_unittest.run(test_nlog10, "test_nlog10.xml") + diff --git a/gr-blocks/python/qa_packed_to_unpacked.py b/gr-blocks/python/qa_packed_to_unpacked.py new file mode 100755 index 000000000..d84f5dbd3 --- /dev/null +++ b/gr-blocks/python/qa_packed_to_unpacked.py @@ -0,0 +1,344 @@ +#!/usr/bin/env python +# +# Copyright 2005,2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import random + +class test_packing(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block () + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = (0x80,) + expected_results = (1,0,0,0,0,0,0,0) + src = gr.vector_source_b(src_data, False) + op = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_002(self): + src_data = (0x80,) + expected_results = (0,0,0,0,0,0,0,1) + src = gr.vector_source_b(src_data, False) + op = blocks.packed_to_unpacked_bb(1, gr.GR_LSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_003(self): + src_data = (0x11,) + expected_results = (4, 2) + src = gr.vector_source_b(src_data, False) + op = blocks.packed_to_unpacked_bb(3, gr.GR_LSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_004(self): + src_data = (0x11,) + expected_results = (0, 4) + src = gr.vector_source_b(src_data, False) + op = blocks.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_005(self): + src_data = (1,0,0,0,0,0,1,0,0,1,0,1,1,0,1,0) + expected_results = (0x82, 0x5a) + src = gr.vector_source_b(src_data, False) + op = blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_006(self): + src_data = (0,1,0,0,0,0,0,1,0,1,0,1,1,0,1,0) + expected_results = (0x82, 0x5a) + src = gr.vector_source_b(src_data, False) + op = blocks.unpacked_to_packed_bb(1, gr.GR_LSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_007(self): + src_data = (4, 2, 0,0,0) + expected_results = (0x11,) + src = gr.vector_source_b(src_data, False) + op = blocks.unpacked_to_packed_bb(3, gr.GR_LSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_008(self): + src_data = (0, 4, 2,0,0) + expected_results = (0x11,) + src = gr.vector_source_b(src_data,False) + op = blocks.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_009(self): + random.seed(0) + src_data = [] + for i in xrange(202): + src_data.append((random.randint(0,255))) + src_data = tuple(src_data) + expected_results = src_data + + src = gr.vector_source_b(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST) + op2 = blocks.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results[0:201], dst.data()) + + def test_010(self): + random.seed(0) + src_data = [] + for i in xrange(56): + src_data.append((random.randint(0,255))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_b(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_bb(7, gr.GR_MSB_FIRST) + op2 = blocks.unpacked_to_packed_bb(7, gr.GR_MSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results[0:201], dst.data()) + + def test_011(self): + random.seed(0) + src_data = [] + for i in xrange(56): + src_data.append((random.randint(0,255))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_b(tuple(src_data),False) + op1 = blocks.packed_to_unpacked_bb(7, gr.GR_LSB_FIRST) + op2 = blocks.unpacked_to_packed_bb(7, gr.GR_LSB_FIRST) + dst = gr.vector_sink_b() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results[0:201], dst.data()) + + + # tests on shorts + def test_100a(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ss(1, gr.GR_MSB_FIRST) + op2 = blocks.unpacked_to_packed_ss(1, gr.GR_MSB_FIRST) + dst = gr.vector_sink_s() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_100b(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ss(1, gr.GR_LSB_FIRST) + op2 = blocks.unpacked_to_packed_ss(1, gr.GR_LSB_FIRST) + dst = gr.vector_sink_s() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_101a(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ss(8, gr.GR_MSB_FIRST) + op2 = blocks.unpacked_to_packed_ss(8, gr.GR_MSB_FIRST) + dst = gr.vector_sink_s() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_101b(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ss(8, gr.GR_LSB_FIRST) + op2 = blocks.unpacked_to_packed_ss(8, gr.GR_LSB_FIRST) + dst = gr.vector_sink_s() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + + # tests on ints + def test_200a(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ii(1, gr.GR_MSB_FIRST) + op2 = blocks.unpacked_to_packed_ii(1, gr.GR_MSB_FIRST) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_200b(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ii(1, gr.GR_LSB_FIRST) + op2 = blocks.unpacked_to_packed_ii(1, gr.GR_LSB_FIRST) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_201a(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ii(8, gr.GR_MSB_FIRST) + op2 = blocks.unpacked_to_packed_ii(8, gr.GR_MSB_FIRST) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + + def test_201b(self): + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data), False) + op1 = blocks.packed_to_unpacked_ii(8, gr.GR_LSB_FIRST) + op2 = blocks.unpacked_to_packed_ii(8, gr.GR_LSB_FIRST) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1, op2) + self.tb.connect(op2, dst) + self.tb.run() + + self.assertEqual(expected_results, dst.data()) + +if __name__ == '__main__': + gr_unittest.run(test_packing, "test_packing.xml") + diff --git a/gr-blocks/python/qa_patterned_interleaver.py b/gr-blocks/python/qa_patterned_interleaver.py new file mode 100755 index 000000000..3cf29c917 --- /dev/null +++ b/gr-blocks/python/qa_patterned_interleaver.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +try: + import blocks_swig as blocks +except: + from gnuradio import blocks +import math + +class test_patterned_interleaver (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_000(self): + dst_data = [0,0,1,2,0,2,1,0]; + src0 = gr.vector_source_f(200*[0]) + src1 = gr.vector_source_f(200*[1]) + src2 = gr.vector_source_f(200*[2]) + itg = blocks.patterned_interleaver(gr.sizeof_float, dst_data) + dst = gr.vector_sink_f() + head = gr.head(gr.sizeof_float, 8); + + self.tb.connect( src0, (itg,0) ); + self.tb.connect( src1, (itg,1) ); + self.tb.connect( src2, (itg,2) ); + self.tb.connect( itg, head, dst ); + + self.tb.run() + self.assertEqual(list(dst_data), list(dst.data())) + +if __name__ == '__main__': + gr_unittest.run(test_patterned_interleaver, "test_patterned_interleaver.xml") diff --git a/gr-blocks/python/qa_peak_detector2.py b/gr-blocks/python/qa_peak_detector2.py new file mode 100644 index 000000000..4b864e4d7 --- /dev/null +++ b/gr-blocks/python/qa_peak_detector2.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_peak_detector2(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_regen1(self): + tb = self.tb + + data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + expected_result = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + + src = gr.vector_source_f(data, False) + regen = blocks.peak_detector2_fb() + dst = gr.vector_sink_b() + + tb.connect(src, regen) + tb.connect(regen, dst) + tb.run() + + dst_data = dst.data() + print dst_data + + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_peak_detector2, "test_peak_detector2.xml") diff --git a/gr-blocks/python/qa_pipe_fittings.py b/gr-blocks/python/qa_pipe_fittings.py new file mode 100755 index 000000000..321660d5e --- /dev/null +++ b/gr-blocks/python/qa_pipe_fittings.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python +# +# Copyright 2005,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +def calc_expected_result(src_data, n): + assert (len(src_data) % n) == 0 + result = [list() for x in range(n)] + #print "len(result) =", len(result) + for i in xrange(len(src_data)): + (result[i % n]).append(src_data[i]) + return [tuple(x) for x in result] + + +class test_pipe_fittings(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block () + + def tearDown(self): + self.tb = None + + def test_001(self): + """ + Test stream_to_streams. + """ + n = 8 + src_len = n * 8 + src_data = range(src_len) + + expected_results = calc_expected_result(src_data, n) + #print "expected results: ", expected_results + src = gr.vector_source_i(src_data) + op = gr.stream_to_streams(gr.sizeof_int, n) + self.tb.connect(src, op) + + dsts = [] + for i in range(n): + dst = gr.vector_sink_i() + self.tb.connect((op, i), (dst, 0)) + dsts.append(dst) + + self.tb.run() + + for d in range(n): + self.assertEqual(expected_results[d], dsts[d].data()) + + def test_002(self): + + # Test streams_to_stream (using stream_to_streams). + + n = 8 + src_len = n * 8 + src_data = tuple(range(src_len)) + expected_results = src_data + + src = gr.vector_source_i(src_data) + op1 = gr.stream_to_streams(gr.sizeof_int, n) + op2 = gr.streams_to_stream(gr.sizeof_int, n) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1) + for i in range(n): + self.tb.connect((op1, i), (op2, i)) + self.tb.connect(op2, dst) + + self.tb.run() + self.assertEqual(expected_results, dst.data()) + + def test_003(self): + + #Test streams_to_vector (using stream_to_streams & vector_to_stream). + + n = 8 + src_len = n * 8 + src_data = tuple(range(src_len)) + expected_results = src_data + + src = gr.vector_source_i(src_data) + op1 = gr.stream_to_streams(gr.sizeof_int, n) + op2 = gr.streams_to_vector(gr.sizeof_int, n) + op3 = gr.vector_to_stream(gr.sizeof_int, n) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1) + for i in range(n): + self.tb.connect((op1, i), (op2, i)) + self.tb.connect(op2, op3, dst) + + self.tb.run() + self.assertEqual(expected_results, dst.data()) + + def test_004(self): + + #Test vector_to_streams. + + n = 8 + src_len = n * 8 + src_data = tuple(range(src_len)) + expected_results = src_data + + src = gr.vector_source_i(src_data) + op1 = gr.stream_to_vector(gr.sizeof_int, n) + op2 = gr.vector_to_streams(gr.sizeof_int, n) + op3 = gr.streams_to_stream(gr.sizeof_int, n) + dst = gr.vector_sink_i() + + self.tb.connect(src, op1, op2) + for i in range(n): + self.tb.connect((op2, i), (op3, i)) + self.tb.connect(op3, dst) + + self.tb.run() + self.assertEqual(expected_results, dst.data()) + + +if __name__ == '__main__': + gr_unittest.run(test_pipe_fittings, "test_pipe_fittings.xml") diff --git a/gr-blocks/python/qa_regenerate.py b/gr-blocks/python/qa_regenerate.py new file mode 100755 index 000000000..a57eeba2b --- /dev/null +++ b/gr-blocks/python/qa_regenerate.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_regenerate(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_regen1(self): + tb = self.tb + + data = [0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + expected_result = (0, 0, 0, + 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + + src = gr.vector_source_b(data, False) + regen = blocks.regenerate_bb(5, 2) + dst = gr.vector_sink_b() + + tb.connect(src, regen) + tb.connect(regen, dst) + tb.run() + + dst_data = dst.data() + + self.assertEqual(expected_result, dst_data) + + def test_regen2(self): + tb = self.tb + + data = 200*[0,] + data[9] = 1 + data[99] = 1 + + expected_result = 200*[0,] + expected_result[9] = 1 + expected_result[19] = 1 + expected_result[29] = 1 + expected_result[39] = 1 + + expected_result[99] = 1 + expected_result[109] = 1 + expected_result[119] = 1 + expected_result[129] = 1 + + src = gr.vector_source_b(data, False) + regen = blocks.regenerate_bb(10, 3) + dst = gr.vector_sink_b() + + tb.connect(src, regen) + tb.connect(regen, dst) + tb.run () + + dst_data = dst.data() + + self.assertEqual(tuple(expected_result), dst_data) + + +if __name__ == '__main__': + gr_unittest.run(test_regenerate, "test_regenerate.xml") diff --git a/gr-blocks/python/qa_repeat.py b/gr-blocks/python/qa_repeat.py new file mode 100755 index 000000000..69fb3ef72 --- /dev/null +++ b/gr-blocks/python/qa_repeat.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig +import math + +class test_repeat (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_float(self): + src_data = [n*1.0 for n in range(100)]; + dst_data = [] + for n in range(100): + dst_data += [1.0*n, 1.0*n, 1.0*n] + + src = gr.vector_source_f(src_data) + rpt = blocks_swig.repeat(gr.sizeof_float, 3) + dst = gr.vector_sink_f() + self.tb.connect(src, rpt, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(dst_data, dst.data(), 6) + +if __name__ == '__main__': + gr_unittest.run(test_repeat, "test_repeat.xml") diff --git a/gr-blocks/python/qa_rms.py b/gr-blocks/python/qa_rms.py new file mode 100644 index 000000000..f3386668a --- /dev/null +++ b/gr-blocks/python/qa_rms.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import math + +def sig_source_f(samp_rate, freq, amp, N): + t = map(lambda x: float(x)/samp_rate, xrange(N)) + y = map(lambda x: amp*math.cos(2.*math.pi*freq*x), t) + return y + +def sig_source_c(samp_rate, freq, amp, N): + t = map(lambda x: float(x)/samp_rate, xrange(N)) + y = map(lambda x: amp*math.cos(2.*math.pi*freq*x) + \ + 1j*amp*math.sin(2.*math.pi*freq*x), t) + return y + +class test_rms(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_ff(self): + amp = 2 + src_data = sig_source_f(1, 0.01, amp, 200) + N = 750000 + + expected_data = amp/math.sqrt(2.0) + + src = gr.vector_source_f(src_data, True) + head = gr.head(gr.sizeof_float, N) + op = blocks.rms_ff(0.0001) + dst = gr.vector_sink_f() + + self.tb.connect(src, head, op, dst) + self.tb.run() + + dst_data = dst.data() + self.assertAlmostEqual(dst_data[-1], expected_data, 4) + + def test_cf(self): + amp = 4 + src_data = sig_source_c(1, 0.01, amp, 200) + N = 750000 + + expected_data = amp + + src = gr.vector_source_c(src_data, True) + head = gr.head(gr.sizeof_gr_complex, N) + op = blocks.rms_cf(0.0001) + dst = gr.vector_sink_f() + + self.tb.connect(src, head, op, dst) + self.tb.run() + + dst_data = dst.data() + self.assertAlmostEqual(dst_data[-1], expected_data, 4) + +if __name__ == '__main__': + gr_unittest.run(test_rms, "test_rms.xml") diff --git a/gr-blocks/python/qa_stream_mux.py b/gr-blocks/python/qa_stream_mux.py new file mode 100755 index 000000000..f21a9bbbc --- /dev/null +++ b/gr-blocks/python/qa_stream_mux.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# +# Copyright 2004,2005,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig + +class test_stream_mux (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def help_stream_2ff(self, N, stream_sizes): + v0 = gr.vector_source_f(N*[1,], False) + v1 = gr.vector_source_f(N*[2,], False) + + mux = blocks_swig.stream_mux(gr.sizeof_float, stream_sizes) + + dst = gr.vector_sink_f () + + self.tb.connect (v0, (mux,0)) + self.tb.connect (v1, (mux,1)) + self.tb.connect (mux, dst) + self.tb.run () + + return dst.data () + + def help_stream_ramp_2ff(self, N, stream_sizes): + r1 = range(N) + r2 = range(N) + r2.reverse() + + v0 = gr.vector_source_f(r1, False) + v1 = gr.vector_source_f(r2, False) + + mux = blocks_swig.stream_mux(gr.sizeof_float, stream_sizes) + + dst = gr.vector_sink_f () + + self.tb.connect (v0, (mux,0)) + self.tb.connect (v1, (mux,1)) + self.tb.connect (mux, dst) + self.tb.run () + + return dst.data () + + def test_stream_2NN_ff(self): + N = 40 + stream_sizes = [10, 10] + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0) + self.assertEqual (exp_data, result_data) + + def test_stream_ramp_2NN_ff(self): + N = 40 + stream_sizes = [10, 10] + result_data = self.help_stream_ramp_2ff(N, stream_sizes) + + exp_data = ( 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0, + 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, + 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0, + 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, + 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0, + 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, + 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0) + self.assertEqual (exp_data, result_data) + + def test_stream_2NM_ff(self): + N = 40 + stream_sizes = [7, 9] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0) + + self.assertEqual (exp_data, result_data) + + + def test_stream_2MN_ff(self): + N = 37 + stream_sizes = [7, 9] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0) + + self.assertEqual (exp_data, result_data) + + def test_stream_2N0_ff(self): + N = 30 + stream_sizes = [7, 0] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data = (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0) + + self.assertEqual (exp_data, result_data) + + def test_stream_20N_ff(self): + N = 30 + stream_sizes = [0, 9] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data = (2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0) + + self.assertEqual (exp_data, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_stream_mux, "test_stream_mux.xml") diff --git a/gr-blocks/python/qa_stretch.py b/gr-blocks/python/qa_stretch.py new file mode 100755 index 000000000..013d878a8 --- /dev/null +++ b/gr-blocks/python/qa_stretch.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_stretch(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_stretch_01(self): + tb = self.tb + + data = 10*[1,] + data0 = map(lambda x: x/20.0, data) + data1 = map(lambda x: x/10.0, data) + + expected_result0 = 10*[0.05,] + expected_result1 = 10*[0.1,] + + src0 = gr.vector_source_f(data0, False) + src1 = gr.vector_source_f(data1, False) + inter = gr.streams_to_vector(gr.sizeof_float, 2) + op = blocks.stretch_ff(0.1, 2) + deinter = gr.vector_to_streams(gr.sizeof_float, 2) + dst0 = gr.vector_sink_f() + dst1 = gr.vector_sink_f() + + tb.connect(src0, (inter,0)) + tb.connect(src1, (inter,1)) + tb.connect(inter, op) + tb.connect(op, deinter) + tb.connect((deinter,0), dst0) + tb.connect((deinter,1), dst1) + tb.run() + + dst0_data = dst0.data() + dst1_data = dst1.data() + + self.assertFloatTuplesAlmostEqual(expected_result0, dst0_data, 4) + self.assertFloatTuplesAlmostEqual(expected_result1, dst1_data, 4) + +if __name__ == '__main__': + gr_unittest.run(test_stretch, "test_stretch.xml") diff --git a/gr-blocks/python/qa_threshold.py b/gr-blocks/python/qa_threshold.py new file mode 100644 index 000000000..f91af739a --- /dev/null +++ b/gr-blocks/python/qa_threshold.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_threshold(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_01(self): + tb = self.tb + + data = [0, 0, 0, 2, 2, 2, 2, 0, 0, 0, 2, 2, 2] + + expected_result = (0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1) + + src = gr.vector_source_f(data, False) + op = blocks.threshold_ff(1, 1) + dst = gr.vector_sink_f() + + tb.connect(src, op) + tb.connect(op, dst) + tb.run() + + dst_data = dst.data() + + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_threshold, "test_threshold.xml") diff --git a/gr-blocks/python/qa_throttle.py b/gr-blocks/python/qa_throttle.py new file mode 100755 index 000000000..5d462f2c9 --- /dev/null +++ b/gr-blocks/python/qa_throttle.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_throttle(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_01(self): + # Test that we can make the block + op = blocks.throttle(gr.sizeof_gr_complex, 1) + +if __name__ == '__main__': + gr_unittest.run(test_throttle, "test_throttle.xml") diff --git a/gr-blocks/python/qa_transcendental.py b/gr-blocks/python/qa_transcendental.py new file mode 100644 index 000000000..8174f7963 --- /dev/null +++ b/gr-blocks/python/qa_transcendental.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import math + +class test_transcendental(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_01(self): + tb = self.tb + + data = 100*[0,] + expected_result = 100*[1,] + + src = gr.vector_source_f(data, False) + op = blocks.transcendental("cos", "float") + dst = gr.vector_sink_f() + + tb.connect(src, op) + tb.connect(op, dst) + tb.run() + + dst_data = dst.data() + + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) + + def test_02(self): + tb = self.tb + + data = 100*[3,] + expected_result = 100*[math.log10(3),] + + src = gr.vector_source_f(data, False) + op = blocks.transcendental("log10", "float") + dst = gr.vector_sink_f() + + tb.connect(src, op) + tb.connect(op, dst) + tb.run() + + dst_data = dst.data() + + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) + + def test_03(self): + tb = self.tb + + data = 100*[3,] + expected_result = 100*[math.tanh(3),] + + src = gr.vector_source_f(data, False) + op = blocks.transcendental("tanh", "float") + dst = gr.vector_sink_f() + + tb.connect(src, op) + tb.connect(op, dst) + tb.run() + + dst_data = dst.data() + + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) + +if __name__ == '__main__': + gr_unittest.run(test_transcendental, "test_transcendental.xml") diff --git a/gr-blocks/python/qa_type_conversions.py b/gr-blocks/python/qa_type_conversions.py new file mode 100755 index 000000000..eb1b42b63 --- /dev/null +++ b/gr-blocks/python/qa_type_conversions.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python +# +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig +from math import sqrt, atan2 + +class test_type_conversions(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_char_to_float_identity(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src = gr.vector_source_b(src_data) + op = blocks_swig.char_to_float() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_char_to_float_scale(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (0.5, 1.0, 1.5, 2.0, 2.5) + src = gr.vector_source_b(src_data) + op = blocks_swig.char_to_float(scale=2.0) + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_char_to_short(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (256, 512, 768, 1024, 1280) + src = gr.vector_source_b(src_data) + op = blocks_swig.char_to_short() + dst = gr.vector_sink_s() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_complex_to_interleaved_short(self): + src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + expected_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_interleaved_short() + dst = gr.vector_sink_s() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_complex_to_float_1(self): + src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + expected_data = (1.0, 3.0, 5.0, 7.0, 9.0) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_float() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_complex_to_float_2(self): + src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + expected_data1 = (1.0, 3.0, 5.0, 7.0, 9.0) + expected_data2 = (2.0, 4.0, 6.0, 8.0, 10.0) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_float() + dst1 = gr.vector_sink_f() + dst2 = gr.vector_sink_f() + self.tb.connect(src, op) + self.tb.connect((op, 0), dst1) + self.tb.connect((op, 1), dst2) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data1, dst1.data()) + self.assertFloatTuplesAlmostEqual(expected_data2, dst2.data()) + + def test_complex_to_real(self): + src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + expected_data = (1.0, 3.0, 5.0, 7.0, 9.0) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_real() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_complex_to_imag(self): + src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + expected_data = (2.0, 4.0, 6.0, 8.0, 10.0) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_imag() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_complex_to_mag(self): + src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j) + expected_data = (sqrt(5), sqrt(25), sqrt(61), sqrt(113), sqrt(181)) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_mag() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data(), 5) + + def test_complex_to_mag_squared(self): + src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j) + expected_data = (5.0, 25.0, 61.0, 113.0, 181.0) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_mag_squared() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_complex_to_arg(self): + src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j) + expected_data = (atan2(2, 1), atan2(-4,3), atan2(6, 5), atan2(-8, 7), atan2(10,-9)) + src = gr.vector_source_c(src_data) + op = blocks_swig.complex_to_arg() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data(), 2) + + def test_float_to_char_identity(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + expected_data = (1, 2, 3, 4, 5) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_char() + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_float_to_char_scale(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + expected_data = (5, 10, 15, 20, 25) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_char(1, 5) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_float_to_complex_1(self): + src_data = (1.0, 3.0, 5.0, 7.0, 9.0) + expected_data = (1+0j, 3+0j, 5+0j, 7+0j, 9+0j) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_complex() + dst = gr.vector_sink_c() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_float_to_complex_2(self): + src1_data = (1.0, 3.0, 5.0, 7.0, 9.0) + src2_data = (2.0, 4.0, 6.0, 8.0, 10.0) + expected_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src1 = gr.vector_source_f(src1_data) + src2 = gr.vector_source_f(src2_data) + op = blocks_swig.float_to_complex() + dst = gr.vector_sink_c() + self.tb.connect(src1, (op, 0)) + self.tb.connect(src2, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_float_to_int_identity(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + expected_data = (1, 2, 3, 4, 5) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_int() + dst = gr.vector_sink_i() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_float_to_int_scale(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + expected_data = (5, 10, 15, 20, 25) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_int(1, 5) + dst = gr.vector_sink_i() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_float_to_short_identity(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + expected_data = (1, 2, 3, 4, 5) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_short() + dst = gr.vector_sink_s() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_float_to_short_scale(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + expected_data = (5, 10, 15, 20, 25) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_short(1, 5) + dst = gr.vector_sink_s() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_float_to_uchar(self): + src_data = (1.0, -2.0, 3.0, -4.0, 256.0) + expected_data = (1, 0, 3, 0, 255) + src = gr.vector_source_f(src_data) + op = blocks_swig.float_to_uchar() + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_int_to_float_identity(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src = gr.vector_source_i(src_data) + op = blocks_swig.int_to_float() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_int_to_float_scale(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (0.2, 0.4, 0.6, 0.8, 1.0) + src = gr.vector_source_i(src_data) + op = blocks_swig.int_to_float(1, 5) + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) + + def test_interleaved_short_to_complex(self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src = gr.vector_source_s(src_data) + op = blocks_swig.interleaved_short_to_complex() + dst = gr.vector_sink_c() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_short_to_char(self): + src_data = (256, 512, 768, 1024, 1280) + expected_data = (1, 2, 3, 4, 5) + src = gr.vector_source_s(src_data) + op = blocks_swig.short_to_char() + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_short_to_float_identity(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src = gr.vector_source_s(src_data) + op = blocks_swig.short_to_float() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_short_to_float_scale(self): + src_data = (5, 10, 15, 20, 25) + expected_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src = gr.vector_source_s(src_data) + op = blocks_swig.short_to_float(1, 5) + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + def test_uchar_to_float(self): + src_data = (1, 2, 3, 4, 5) + expected_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src = gr.vector_source_b(src_data) + op = blocks_swig.uchar_to_float() + dst = gr.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_data, dst.data()) + + +if __name__ == '__main__': + gr_unittest.run(test_type_conversions, "test_type_conversions.xml") |