diff options
author | Tom Rondeau | 2012-05-02 18:59:23 -0400 |
---|---|---|
committer | Tom Rondeau | 2012-05-02 18:59:23 -0400 |
commit | 26531c2da601a1c21c50e3644350c604c27f8658 (patch) | |
tree | 1ddb4469afc96e508cc54b236c79657dd0c73363 /gr-filter/python | |
parent | 32f807a8c8f1bcadfd8f8ad4c5a46c1b099f8c8f (diff) | |
download | gnuradio-26531c2da601a1c21c50e3644350c604c27f8658.tar.gz gnuradio-26531c2da601a1c21c50e3644350c604c27f8658.tar.bz2 gnuradio-26531c2da601a1c21c50e3644350c604c27f8658.zip |
filter: fixed FIR filter taps and added complex FFT filter.
Diffstat (limited to 'gr-filter/python')
-rwxr-xr-x | gr-filter/python/qa_fft_filter.py | 380 | ||||
-rwxr-xr-x | gr-filter/python/qa_fir_filter.py | 22 |
2 files changed, 399 insertions, 3 deletions
diff --git a/gr-filter/python/qa_fft_filter.py b/gr-filter/python/qa_fft_filter.py new file mode 100755 index 000000000..33d3d870b --- /dev/null +++ b/gr-filter/python/qa_fft_filter.py @@ -0,0 +1,380 @@ +#!/usr/bin/env python +# +# Copyright 2004,2005,2007,2010,2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import filter_swig as filter +import sys +import random + +def make_random_complex_tuple(L): + result = [] + for x in range(L): + result.append(complex(random.uniform(-1000,1000), + random.uniform(-1000,1000))) + return tuple(result) + +def make_random_float_tuple(L): + result = [] + for x in range(L): + result.append(float(int(random.uniform(-1000,1000)))) + return tuple(result) + + +def reference_filter_ccc(dec, taps, input): + """ + compute result using conventional fir filter + """ + tb = gr.top_block() + #src = gr.vector_source_c(((0,) * (len(taps) - 1)) + input) + src = gr.vector_source_c(input) + op = filter.fir_filter_ccc(dec, taps) + dst = gr.vector_sink_c() + tb.connect(src, op, dst) + tb.run() + return dst.data() + +def reference_filter_fff(dec, taps, input): + """ + compute result using conventional fir filter + """ + tb = gr.top_block() + #src = gr.vector_source_f(((0,) * (len(taps) - 1)) + input) + src = gr.vector_source_f(input) + op = filter.fir_filter_fff(dec, taps) + dst = gr.vector_sink_f() + tb.connect(src, op, dst) + tb.run() + return dst.data() + + +def print_complex(x): + for i in x: + i = complex(i) + sys.stdout.write("(%6.3f,%6.3fj), " % (i.real, i.imag)) + sys.stdout.write('\n') + + +class test_fft_filter(gr_unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def assert_fft_ok2(self, expected_result, result_data): + expected_result = expected_result[:len(result_data)] + self.assertComplexTuplesAlmostEqual2 (expected_result, result_data, + abs_eps=1e-9, rel_eps=4e-4) + + def assert_fft_float_ok2(self, expected_result, result_data, abs_eps=1e-9, rel_eps=4e-4): + expected_result = expected_result[:len(result_data)] + self.assertFloatTuplesAlmostEqual2 (expected_result, result_data, + abs_eps, rel_eps) + + def test_ccc_001(self): + tb = gr.top_block() + src_data = (0,1,2,3,4,5,6,7) + taps = (1,) + expected_result = tuple([complex(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(1, taps) + dst = gr.vector_sink_c() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + + def test_ccc_002(self): + # Test nthreads + tb = gr.top_block() + src_data = (0,1,2,3,4,5,6,7) + taps = (2,) + nthreads = 2 + expected_result = tuple([2 * complex(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(1, taps, nthreads) + dst = gr.vector_sink_c() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + def test_ccc_003(self): + tb = gr.top_block() + src_data = (0,1,2,3,4,5,6,7) + taps = (2,) + expected_result = tuple([2 * complex(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(1, taps) + dst = gr.vector_sink_c() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + + def test_ccc_004(self): + random.seed(0) + for i in xrange(25): + # sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + src_len = 4*1024 + src_data = make_random_complex_tuple(src_len) + ntaps = int(random.uniform(2, 1000)) + taps = make_random_complex_tuple(ntaps) + expected_result = reference_filter_ccc(1, taps, src_data) + + src = gr.vector_source_c(src_data) + op = filter.fft_filter_ccc(1, taps) + dst = gr.vector_sink_c() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + del tb + self.assert_fft_ok2(expected_result, result_data) + + def test_ccc_005(self): + random.seed(0) + for i in xrange(25): + # sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + dec = i + 1 + src_len = 4*1024 + src_data = make_random_complex_tuple(src_len) + ntaps = int(random.uniform(2, 100)) + taps = make_random_complex_tuple(ntaps) + expected_result = reference_filter_ccc(dec, taps, src_data) + + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(dec, taps) + dst = gr.vector_sink_c() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + del tb + result_data = dst.data() + + self.assert_fft_ok2(expected_result, result_data) + + def test_ccc_006(self): + # Test decimating with nthreads=2 + random.seed(0) + nthreads = 2 + for i in xrange(25): + # sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + dec = i + 1 + src_len = 4*1024 + src_data = make_random_complex_tuple(src_len) + ntaps = int(random.uniform(2, 100)) + taps = make_random_complex_tuple(ntaps) + expected_result = reference_filter_ccc(dec, taps, src_data) + + src = gr.vector_source_c(src_data) + op = filter.fft_filter_ccc(dec, taps, nthreads) + dst = gr.vector_sink_c() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + del tb + result_data = dst.data() + + self.assert_fft_ok2(expected_result, result_data) + + # ---------------------------------------------------------------- + # test _fff version + # ---------------------------------------------------------------- + + def test_fff_001(self): + tb = gr.top_block() + src_data = (0,1,2,3,4,5,6,7) + taps = (1,) + expected_result = tuple([float(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5) + + + def test_fff_002(self): + tb = gr.top_block() + src_data = (0,1,2,3,4,5,6,7) + taps = (2,) + expected_result = tuple([2 * float(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5) + + def test_fff_003(self): + # Test 02 with nthreads + tb = gr.top_block() + src_data = (0,1,2,3,4,5,6,7) + taps = (2,) + nthreads = 2 + expected_result = tuple([2 * float(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps, nthreads) + dst = gr.vector_sink_f() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5) + + def xtest_fff_004(self): + random.seed(0) + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + src_len = 4096 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 1000)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(1, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + + #print "src_len =", src_len, " ntaps =", ntaps + try: + self.assert_fft_float_ok2(expected_result, result_data, abs_eps=1.0) + except: + expected = open('expected', 'w') + for x in expected_result: + expected.write(`x` + '\n') + actual = open('actual', 'w') + for x in result_data: + actual.write(`x` + '\n') + raise + + def xtest_fff_005(self): + random.seed(0) + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + src_len = 4*1024 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 1000)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(1, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + + self.assert_fft_float_ok2(expected_result, result_data, abs_eps=2.0) + + def xtest_fff_006(self): + random.seed(0) + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + dec = i + 1 + src_len = 4*1024 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 100)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(dec, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(dec, taps) + dst = gr.vector_sink_f() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + + self.assert_fft_float_ok2(expected_result, result_data) + + def xtest_fff_007(self): + # test decimation with nthreads + random.seed(0) + nthreads = 2 + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + dec = i + 1 + src_len = 4*1024 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 100)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(dec, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(dec, taps, nthreads) + dst = gr.vector_sink_f() + tb = gr.top_block() + tb.connect(src, op, dst) + tb.run() + result_data = dst.data() + + self.assert_fft_float_ok2(expected_result, result_data) + + def test_fff_get0(self): + random.seed(0) + for i in xrange(25): + ntaps = int(random.uniform(2, 100)) + taps = make_random_float_tuple(ntaps) + + op = gr.fft_filter_fff(1, taps) + result_data = op.taps() + #print result_data + + self.assertEqual(taps, result_data) + + def test_ccc_get0(self): + random.seed(0) + for i in xrange(25): + ntaps = int(random.uniform(2, 100)) + taps = make_random_complex_tuple(ntaps) + + op = gr.fft_filter_ccc(1, taps) + result_data = op.taps() + #print result_data + + self.assertComplexTuplesAlmostEqual(taps, result_data, 4) + + +if __name__ == '__main__': + gr_unittest.run(test_fft_filter, "test_fft_filter.xml") + diff --git a/gr-filter/python/qa_fir_filter.py b/gr-filter/python/qa_fir_filter.py index 2c88e7830..f0f08afca 100755 --- a/gr-filter/python/qa_fir_filter.py +++ b/gr-filter/python/qa_fir_filter.py @@ -32,7 +32,7 @@ class test_filter(gr_unittest.TestCase): def test_fir_filter_fff_001(self): src_data = [1, 2, 3, 4] - expected_data = [0, 0.5, 1.5, 2.5] + expected_data = [0.5, 1.5, 2.5, 3.5] src = gr.vector_source_f(src_data) op = filter.fir_filter_fff(1, [0.5, 0.5]) dst = gr.vector_sink_f() @@ -43,7 +43,7 @@ class test_filter(gr_unittest.TestCase): def test_fir_filter_ccf_001(self): src_data = [1+1j, 2+2j, 3+3j, 4+4j] - expected_data = [0+0j, 0.5+0.5j, 1.5+1.5j, 2.5+2.5j] + expected_data = [0.5+0.5j, 1.5+1.5j, 2.5+2.5j, 3.5+3.5j] src = gr.vector_source_c(src_data) op = filter.fir_filter_ccf(1, [0.5, 0.5]) dst = gr.vector_sink_c() @@ -54,7 +54,7 @@ class test_filter(gr_unittest.TestCase): def test_fir_filter_ccc_001(self): src_data = [1+1j, 2+2j, 3+3j, 4+4j] - expected_data = [0+0j, -0.5+1.5j, -1.5+4.5j, -2.5+7.5j] + expected_data = [-0.5+1.5j, -1.5+4.5j, -2.5+7.5j, -3.5+10.5j] src = gr.vector_source_c(src_data) op = filter.fir_filter_ccc(1, [0.5+1j, 0.5+1j]) dst = gr.vector_sink_c() @@ -63,6 +63,22 @@ class test_filter(gr_unittest.TestCase): result_data = dst.data() self.assertComplexTuplesAlmostEqual(expected_data, result_data, 5) + + def test_fir_filter_ccc_002(self): + src_data = 10*[1+1j, 2+2j, 3+3j, 4+4j] + + # results derived from original gr.fir_filter_ccc + expected_data = ((7.537424837948042e-20+7.537424837948042e-20j), (9.131923434324563e-05+9.131923434324563e-05j), (0.0003317668742965907+0.0003317668742965907j), (0.0007230418268591166+0.0007230418268591166j), (0.0012087896466255188+0.0012087896466255188j), (0.0013292605290189385+0.0013292605290189385j), (0.001120875240303576+0.001120875240303576j), (0.000744672492146492+0.000744672492146492j), (0.000429437990533188+0.000429437990533188j), (2.283908543176949e-05+2.283908543176949e-05j), (-0.0002245186478830874-0.0002245186478830874j), (-0.0001157080550910905-0.0001157080550910905j), (0.00041409023106098175+0.00041409023106098175j), (0.0009017843985930085+0.0009017843985930085j), (0.0012520025484263897+0.0012520025484263897j), (0.0014116164529696107+0.0014116164529696107j), (0.001393353333696723+0.001393353333696723j), (0.000912194955162704+0.000912194955162704j), (0.00022649182938039303+0.00022649182938039303j), (-0.00031363096786662936-0.00031363096786662936j), (-0.0003966730728279799-0.0003966730728279799j), (-0.00023757052258588374-0.00023757052258588374j), (0.00021952332463115454+0.00021952332463115454j), (0.0009092430118471384+0.0009092430118471384j), (0.001662317430600524+0.001662317430600524j), (0.0019024648936465383+0.0019024648936465383j), (0.0015955769922584295+0.0015955769922584295j), (0.0009144138311967254+0.0009144138311967254j), (0.0001872836146503687+0.0001872836146503687j), (-0.000581968342885375-0.000581968342885375j), (-0.0009886166080832481-0.0009886166080832481j), (-0.0007480768254026771-0.0007480768254026771j), (0.00018211957649327815+0.00018211957649327815j), (0.0012042406015098095+0.0012042406015098095j), (0.0020200139842927456+0.0020200139842927456j), (0.0023816542234271765+0.0023816542234271765j), (0.002195809967815876+0.002195809967815876j), (0.0012113333214074373+0.0012113333214074373j), (-0.00014088614261709154-0.00014088614261709154j), (-0.0012574587017297745-0.0012574587017297745j)) + + taps = gr.firdes.low_pass(1, 1, 0.1, 0.01) + src = gr.vector_source_c(src_data) + op = filter.fir_filter_ccc(1, taps) + dst = gr.vector_sink_c() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertComplexTuplesAlmostEqual(expected_data, result_data, 5) + if __name__ == '__main__': gr_unittest.run(test_filter, "test_filter.xml") |