From 26531c2da601a1c21c50e3644350c604c27f8658 Mon Sep 17 00:00:00 2001
From: Tom Rondeau
Date: Wed, 2 May 2012 18:59:23 -0400
Subject: filter: fixed FIR filter taps and added complex FFT filter.

---
 gr-filter/python/qa_fft_filter.py | 380 ++++++++++++++++++++++++++++++++++++++
 gr-filter/python/qa_fir_filter.py |  22 ++-
 2 files changed, 399 insertions(+), 3 deletions(-)
 create mode 100755 gr-filter/python/qa_fft_filter.py

(limited to 'gr-filter/python')

diff --git a/gr-filter/python/qa_fft_filter.py b/gr-filter/python/qa_fft_filter.py
new file mode 100755
index 000000000..33d3d870b
--- /dev/null
+++ b/gr-filter/python/qa_fft_filter.py
@@ -0,0 +1,380 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2005,2007,2010,2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING.  If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import filter_swig as filter
+import sys
+import random
+
+def make_random_complex_tuple(L):
+    result = []
+    for x in range(L):
+        result.append(complex(random.uniform(-1000,1000),
+                              random.uniform(-1000,1000)))
+    return tuple(result)
+
+def make_random_float_tuple(L):
+    result = []
+    for x in range(L):
+        result.append(float(int(random.uniform(-1000,1000))))
+    return tuple(result)
+
+
+def reference_filter_ccc(dec, taps, input):
+    """
+    compute result using conventional fir filter
+    """
+    tb = gr.top_block()
+    #src = gr.vector_source_c(((0,) * (len(taps) - 1)) + input)
+    src = gr.vector_source_c(input)
+    op = filter.fir_filter_ccc(dec, taps)
+    dst = gr.vector_sink_c()
+    tb.connect(src, op, dst)
+    tb.run()
+    return dst.data()
+
+def reference_filter_fff(dec, taps, input):
+    """
+    compute result using conventional fir filter
+    """
+    tb = gr.top_block()
+    #src = gr.vector_source_f(((0,) * (len(taps) - 1)) + input)
+    src = gr.vector_source_f(input)
+    op = filter.fir_filter_fff(dec, taps)
+    dst = gr.vector_sink_f()
+    tb.connect(src, op, dst)
+    tb.run()
+    return dst.data()
+
+
+def print_complex(x):
+    for i in x:
+        i = complex(i)
+        sys.stdout.write("(%6.3f,%6.3fj), " % (i.real, i.imag))
+    sys.stdout.write('\n')
+
+
+class test_fft_filter(gr_unittest.TestCase):
+
+    def setUp(self):
+	pass
+
+    def tearDown(self):
+	pass
+
+    def assert_fft_ok2(self, expected_result, result_data):
+        expected_result = expected_result[:len(result_data)]
+        self.assertComplexTuplesAlmostEqual2 (expected_result, result_data,
+                                              abs_eps=1e-9, rel_eps=4e-4)
+
+    def assert_fft_float_ok2(self, expected_result, result_data, abs_eps=1e-9, rel_eps=4e-4):
+        expected_result = expected_result[:len(result_data)]
+        self.assertFloatTuplesAlmostEqual2 (expected_result, result_data,
+                                            abs_eps, rel_eps)
+
+    def test_ccc_001(self):
+	tb = gr.top_block()
+        src_data = (0,1,2,3,4,5,6,7)
+        taps = (1,)
+        expected_result = tuple([complex(x) for x in (0,1,2,3,4,5,6,7)])
+        src = gr.vector_source_c(src_data)
+        op = gr.fft_filter_ccc(1, taps)
+        dst = gr.vector_sink_c()
+        tb.connect(src, op, dst)
+        tb.run()
+        result_data = dst.data()
+        #print 'expected:', expected_result
+        #print 'results: ', result_data
+        self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+    def test_ccc_002(self):
+        # Test nthreads
+	tb = gr.top_block()
+        src_data = (0,1,2,3,4,5,6,7)
+        taps = (2,)
+        nthreads = 2
+        expected_result = tuple([2 * complex(x) for x in (0,1,2,3,4,5,6,7)])
+        src = gr.vector_source_c(src_data)
+        op = gr.fft_filter_ccc(1, taps, nthreads)
+        dst = gr.vector_sink_c()
+        tb.connect(src, op, dst)
+        tb.run()
+        result_data = dst.data()
+        #print 'expected:', expected_result
+        #print 'results: ', result_data
+        self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+    def test_ccc_003(self):
+	tb = gr.top_block()
+        src_data = (0,1,2,3,4,5,6,7)
+        taps = (2,)
+        expected_result = tuple([2 * complex(x) for x in (0,1,2,3,4,5,6,7)])
+        src = gr.vector_source_c(src_data)
+        op = gr.fft_filter_ccc(1, taps)
+        dst = gr.vector_sink_c()
+        tb.connect(src, op, dst)
+        tb.run()
+        result_data = dst.data()
+        #print 'expected:', expected_result
+        #print 'results: ', result_data
+        self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+    def test_ccc_004(self):
+        random.seed(0)
+        for i in xrange(25):
+            # sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            src_len = 4*1024
+            src_data = make_random_complex_tuple(src_len)
+            ntaps = int(random.uniform(2, 1000))
+            taps = make_random_complex_tuple(ntaps)
+            expected_result = reference_filter_ccc(1, taps, src_data)
+
+            src = gr.vector_source_c(src_data)
+            op = filter.fft_filter_ccc(1, taps)
+            dst = gr.vector_sink_c()
+	    tb = gr.top_block()
+            tb.connect(src, op, dst)
+            tb.run()
+            result_data = dst.data()
+	    del tb
+            self.assert_fft_ok2(expected_result, result_data)
+
+    def test_ccc_005(self):
+        random.seed(0)
+        for i in xrange(25):
+            # sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            dec = i + 1
+            src_len = 4*1024
+            src_data = make_random_complex_tuple(src_len)
+            ntaps = int(random.uniform(2, 100))
+            taps = make_random_complex_tuple(ntaps)
+            expected_result = reference_filter_ccc(dec, taps, src_data)
+
+            src = gr.vector_source_c(src_data)
+            op = gr.fft_filter_ccc(dec, taps)
+            dst = gr.vector_sink_c()
+            tb = gr.top_block()
+	    tb.connect(src, op, dst)
+            tb.run()
+            del tb
+	    result_data = dst.data()
+
+            self.assert_fft_ok2(expected_result, result_data)
+
+    def test_ccc_006(self):
+        # Test decimating with nthreads=2
+        random.seed(0)
+        nthreads = 2
+        for i in xrange(25):
+            # sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            dec = i + 1
+            src_len = 4*1024
+            src_data = make_random_complex_tuple(src_len)
+            ntaps = int(random.uniform(2, 100))
+            taps = make_random_complex_tuple(ntaps)
+            expected_result = reference_filter_ccc(dec, taps, src_data)
+
+            src = gr.vector_source_c(src_data)
+            op = filter.fft_filter_ccc(dec, taps, nthreads)
+            dst = gr.vector_sink_c()
+            tb = gr.top_block()
+	    tb.connect(src, op, dst)
+            tb.run()
+            del tb
+	    result_data = dst.data()
+
+            self.assert_fft_ok2(expected_result, result_data)
+
+    # ----------------------------------------------------------------
+    # test _fff version
+    # ----------------------------------------------------------------
+
+    def test_fff_001(self):
+        tb = gr.top_block()
+        src_data = (0,1,2,3,4,5,6,7)
+        taps = (1,)
+        expected_result = tuple([float(x) for x in (0,1,2,3,4,5,6,7)])
+        src = gr.vector_source_f(src_data)
+        op = gr.fft_filter_fff(1, taps)
+        dst = gr.vector_sink_f()
+        tb.connect(src, op, dst)
+        tb.run()
+        result_data = dst.data()
+        #print 'expected:', expected_result
+        #print 'results: ', result_data
+        self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+    def test_fff_002(self):
+        tb = gr.top_block()
+        src_data = (0,1,2,3,4,5,6,7)
+        taps = (2,)
+        expected_result = tuple([2 * float(x) for x in (0,1,2,3,4,5,6,7)])
+        src = gr.vector_source_f(src_data)
+        op = gr.fft_filter_fff(1, taps)
+        dst = gr.vector_sink_f()
+        tb.connect(src, op, dst)
+        tb.run()
+        result_data = dst.data()
+        #print 'expected:', expected_result
+        #print 'results: ', result_data
+        self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5)
+
+    def test_fff_003(self):
+        # Test 02 with nthreads
+        tb = gr.top_block()
+        src_data = (0,1,2,3,4,5,6,7)
+        taps = (2,)
+        nthreads = 2
+        expected_result = tuple([2 * float(x) for x in (0,1,2,3,4,5,6,7)])
+        src = gr.vector_source_f(src_data)
+        op = gr.fft_filter_fff(1, taps, nthreads)
+        dst = gr.vector_sink_f()
+        tb.connect(src, op, dst)
+        tb.run()
+        result_data = dst.data()
+        self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5)
+
+    def xtest_fff_004(self):
+        random.seed(0)
+        for i in xrange(25):
+            sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            src_len = 4096
+            src_data = make_random_float_tuple(src_len)
+            ntaps = int(random.uniform(2, 1000))
+            taps = make_random_float_tuple(ntaps)
+            expected_result = reference_filter_fff(1, taps, src_data)
+
+            src = gr.vector_source_f(src_data)
+            op = gr.fft_filter_fff(1, taps)
+            dst = gr.vector_sink_f()
+    	    tb = gr.top_block()
+            tb.connect(src, op, dst)
+            tb.run()
+            result_data = dst.data()
+
+            #print "src_len =", src_len, " ntaps =", ntaps
+            try:
+                self.assert_fft_float_ok2(expected_result, result_data, abs_eps=1.0)
+            except:
+                expected = open('expected', 'w')
+                for x in expected_result:
+                    expected.write(`x` + '\n')
+                actual = open('actual', 'w')
+                for x in result_data:
+                    actual.write(`x` + '\n')
+                raise
+
+    def xtest_fff_005(self):
+        random.seed(0)
+        for i in xrange(25):
+            sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            src_len = 4*1024
+            src_data = make_random_float_tuple(src_len)
+            ntaps = int(random.uniform(2, 1000))
+            taps = make_random_float_tuple(ntaps)
+            expected_result = reference_filter_fff(1, taps, src_data)
+
+            src = gr.vector_source_f(src_data)
+            op = gr.fft_filter_fff(1, taps)
+            dst = gr.vector_sink_f()
+    	    tb = gr.top_block()
+            tb.connect(src, op, dst)
+            tb.run()
+            result_data = dst.data()
+
+            self.assert_fft_float_ok2(expected_result, result_data, abs_eps=2.0)
+
+    def xtest_fff_006(self):
+        random.seed(0)
+        for i in xrange(25):
+            sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            dec = i + 1
+            src_len = 4*1024
+            src_data = make_random_float_tuple(src_len)
+            ntaps = int(random.uniform(2, 100))
+            taps = make_random_float_tuple(ntaps)
+            expected_result = reference_filter_fff(dec, taps, src_data)
+
+            src = gr.vector_source_f(src_data)
+            op = gr.fft_filter_fff(dec, taps)
+            dst = gr.vector_sink_f()
+    	    tb = gr.top_block()
+            tb.connect(src, op, dst)
+            tb.run()
+            result_data = dst.data()
+
+            self.assert_fft_float_ok2(expected_result, result_data)
+
+    def xtest_fff_007(self):
+        # test decimation with nthreads
+        random.seed(0)
+        nthreads = 2
+        for i in xrange(25):
+            sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+            dec = i + 1
+            src_len = 4*1024
+            src_data = make_random_float_tuple(src_len)
+            ntaps = int(random.uniform(2, 100))
+            taps = make_random_float_tuple(ntaps)
+            expected_result = reference_filter_fff(dec, taps, src_data)
+
+            src = gr.vector_source_f(src_data)
+            op = gr.fft_filter_fff(dec, taps, nthreads)
+            dst = gr.vector_sink_f()
+    	    tb = gr.top_block()
+            tb.connect(src, op, dst)
+            tb.run()
+            result_data = dst.data()
+
+            self.assert_fft_float_ok2(expected_result, result_data)
+
+    def test_fff_get0(self):
+        random.seed(0)
+        for i in xrange(25):
+            ntaps = int(random.uniform(2, 100))
+            taps = make_random_float_tuple(ntaps)
+
+            op = gr.fft_filter_fff(1, taps)
+            result_data = op.taps()
+            #print result_data
+
+            self.assertEqual(taps, result_data)
+
+    def test_ccc_get0(self):
+        random.seed(0)
+        for i in xrange(25):
+            ntaps = int(random.uniform(2, 100))
+            taps = make_random_complex_tuple(ntaps)
+
+            op = gr.fft_filter_ccc(1, taps)
+            result_data = op.taps()
+            #print result_data
+
+            self.assertComplexTuplesAlmostEqual(taps, result_data, 4)
+
+
+if __name__ == '__main__':
+    gr_unittest.run(test_fft_filter, "test_fft_filter.xml")
+
diff --git a/gr-filter/python/qa_fir_filter.py b/gr-filter/python/qa_fir_filter.py
index 2c88e7830..f0f08afca 100755
--- a/gr-filter/python/qa_fir_filter.py
+++ b/gr-filter/python/qa_fir_filter.py
@@ -32,7 +32,7 @@ class test_filter(gr_unittest.TestCase):
 
     def test_fir_filter_fff_001(self):
         src_data = [1, 2, 3, 4]
-        expected_data = [0, 0.5, 1.5, 2.5]
+        expected_data = [0.5, 1.5, 2.5, 3.5]
         src = gr.vector_source_f(src_data)
         op  = filter.fir_filter_fff(1, [0.5, 0.5])
         dst = gr.vector_sink_f()
@@ -43,7 +43,7 @@ class test_filter(gr_unittest.TestCase):
 
     def test_fir_filter_ccf_001(self):
         src_data = [1+1j, 2+2j, 3+3j, 4+4j]
-        expected_data = [0+0j, 0.5+0.5j, 1.5+1.5j, 2.5+2.5j]
+        expected_data = [0.5+0.5j, 1.5+1.5j, 2.5+2.5j, 3.5+3.5j]
         src = gr.vector_source_c(src_data)
         op  = filter.fir_filter_ccf(1, [0.5, 0.5])
         dst = gr.vector_sink_c()
@@ -54,7 +54,7 @@ class test_filter(gr_unittest.TestCase):
 
     def test_fir_filter_ccc_001(self):
         src_data = [1+1j, 2+2j, 3+3j, 4+4j]
-        expected_data = [0+0j, -0.5+1.5j, -1.5+4.5j, -2.5+7.5j]
+        expected_data = [-0.5+1.5j, -1.5+4.5j, -2.5+7.5j, -3.5+10.5j]
         src = gr.vector_source_c(src_data)
         op  = filter.fir_filter_ccc(1, [0.5+1j, 0.5+1j])
         dst = gr.vector_sink_c()
@@ -63,6 +63,22 @@ class test_filter(gr_unittest.TestCase):
         result_data = dst.data()
         self.assertComplexTuplesAlmostEqual(expected_data, result_data, 5)
 
+
+    def test_fir_filter_ccc_002(self):
+        src_data = 10*[1+1j, 2+2j, 3+3j, 4+4j]
+        
+        # results derived from original gr.fir_filter_ccc
+        expected_data = ((7.537424837948042e-20+7.537424837948042e-20j), (9.131923434324563e-05+9.131923434324563e-05j), (0.0003317668742965907+0.0003317668742965907j), (0.0007230418268591166+0.0007230418268591166j), (0.0012087896466255188+0.0012087896466255188j), (0.0013292605290189385+0.0013292605290189385j), (0.001120875240303576+0.001120875240303576j), (0.000744672492146492+0.000744672492146492j), (0.000429437990533188+0.000429437990533188j), (2.283908543176949e-05+2.283908543176949e-05j), (-0.0002245186478830874-0.0002245186478830874j), (-0.0001157080550910905-0.0001157080550910905j), (0.00041409023106098175+0.00041409023106098175j), (0.0009017843985930085+0.0009017843985930085j), (0.0012520025484263897+0.0012520025484263897j), (0.0014116164529696107+0.0014116164529696107j), (0.001393353333696723+0.001393353333696723j), (0.000912194955162704+0.000912194955162704j), (0.00022649182938039303+0.00022649182938039303j), (-0.00031363096786662936-0.00031363096786662936j), (-0.0003966730728279799-0.0003966730728279799j), (-0.00023757052258588374-0.00023757052258588374j), (0.00021952332463115454+0.00021952332463115454j), (0.0009092430118471384+0.0009092430118471384j), (0.001662317430600524+0.001662317430600524j), (0.0019024648936465383+0.0019024648936465383j), (0.0015955769922584295+0.0015955769922584295j), (0.0009144138311967254+0.0009144138311967254j), (0.0001872836146503687+0.0001872836146503687j), (-0.000581968342885375-0.000581968342885375j), (-0.0009886166080832481-0.0009886166080832481j), (-0.0007480768254026771-0.0007480768254026771j), (0.00018211957649327815+0.00018211957649327815j), (0.0012042406015098095+0.0012042406015098095j), (0.0020200139842927456+0.0020200139842927456j), (0.0023816542234271765+0.0023816542234271765j), (0.002195809967815876+0.002195809967815876j), (0.0012113333214074373+0.0012113333214074373j), (-0.00014088614261709154-0.00014088614261709154j), (-0.0012574587017297745-0.0012574587017297745j))
+
+        taps = gr.firdes.low_pass(1, 1, 0.1, 0.01)
+        src = gr.vector_source_c(src_data)
+        op  = filter.fir_filter_ccc(1, taps)
+        dst = gr.vector_sink_c()
+        self.tb.connect(src, op, dst)
+        self.tb.run()
+        result_data = dst.data()
+        self.assertComplexTuplesAlmostEqual(expected_data, result_data, 5)
+
 if __name__ == '__main__':
     gr_unittest.run(test_filter, "test_filter.xml")
 
-- 
cgit