summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gr-filter/python/CMakeLists.txt2
-rw-r--r--gr-filter/python/__init__.py2
-rw-r--r--gr-filter/python/optfir.py339
-rw-r--r--gr-filter/python/pfb.py203
-rw-r--r--gr-filter/python/rational_resampler.py131
5 files changed, 674 insertions, 3 deletions
diff --git a/gr-filter/python/CMakeLists.txt b/gr-filter/python/CMakeLists.txt
index 99a1aa3a9..07f03fac4 100644
--- a/gr-filter/python/CMakeLists.txt
+++ b/gr-filter/python/CMakeLists.txt
@@ -23,7 +23,9 @@ include(GrPython)
GR_PYTHON_INSTALL(
FILES
__init__.py
+ optfir.py
pfb.py
+ rational_resampler.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio/filter
COMPONENT "filter_python"
)
diff --git a/gr-filter/python/__init__.py b/gr-filter/python/__init__.py
index 90b5ce0a4..65a62d828 100644
--- a/gr-filter/python/__init__.py
+++ b/gr-filter/python/__init__.py
@@ -25,4 +25,6 @@ processing blocks for FILTER and related functions.
'''
from filter_swig import *
+from rational_resampler import *
import pfb
+import optfir
diff --git a/gr-filter/python/optfir.py b/gr-filter/python/optfir.py
new file mode 100644
index 000000000..bccb8c68d
--- /dev/null
+++ b/gr-filter/python/optfir.py
@@ -0,0 +1,339 @@
+#
+# Copyright 2004,2005,2009 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+'''
+Routines for designing optimal FIR filters.
+
+For a great intro to how all this stuff works, see section 6.6 of
+"Digital Signal Processing: A Practical Approach", Emmanuael C. Ifeachor
+and Barrie W. Jervis, Adison-Wesley, 1993. ISBN 0-201-54413-X.
+'''
+
+import math, cmath
+import filter_swig as filter
+
+# ----------------------------------------------------------------
+
+## Builds a low pass filter.
+# @param gain Filter gain in the passband (linear)
+# @param Fs Sampling rate (sps)
+# @param freq1 End of pass band (in Hz)
+# @param freq2 Start of stop band (in Hz)
+# @param passband_ripple_db Pass band ripple in dB (should be small, < 1)
+# @param stopband_atten_db Stop band attenuation in dB (should be large, >= 60)
+# @param nextra_taps Extra taps to use in the filter (default=2)
+def low_pass (gain, Fs, freq1, freq2, passband_ripple_db, stopband_atten_db,
+ nextra_taps=2):
+ passband_dev = passband_ripple_to_dev (passband_ripple_db)
+ stopband_dev = stopband_atten_to_dev (stopband_atten_db)
+ desired_ampls = (gain, 0)
+ (n, fo, ao, w) = remezord ([freq1, freq2], desired_ampls,
+ [passband_dev, stopband_dev], Fs)
+ # The remezord typically under-estimates the filter order, so add 2 taps by default
+ taps = filter.pm_remez (n + nextra_taps, fo, ao, w, "bandpass")
+ return taps
+
+## Builds a band pass filter.
+# @param gain Filter gain in the passband (linear)
+# @param Fs Sampling rate (sps)
+# @param freq_sb1 End of stop band (in Hz)
+# @param freq_pb1 Start of pass band (in Hz)
+# @param freq_pb2 End of pass band (in Hz)
+# @param freq_sb2 Start of stop band (in Hz)
+# @param passband_ripple_db Pass band ripple in dB (should be small, < 1)
+# @param stopband_atten_db Stop band attenuation in dB (should be large, >= 60)
+# @param nextra_taps Extra taps to use in the filter (default=2)
+def band_pass (gain, Fs, freq_sb1, freq_pb1, freq_pb2, freq_sb2,
+ passband_ripple_db, stopband_atten_db,
+ nextra_taps=2):
+ passband_dev = passband_ripple_to_dev (passband_ripple_db)
+ stopband_dev = stopband_atten_to_dev (stopband_atten_db)
+ desired_ampls = (0, gain, 0)
+ desired_freqs = [freq_sb1, freq_pb1, freq_pb2, freq_sb2]
+ desired_ripple = [stopband_dev, passband_dev, stopband_dev]
+ (n, fo, ao, w) = remezord (desired_freqs, desired_ampls,
+ desired_ripple, Fs)
+ # The remezord typically under-estimates the filter order, so add 2 taps by default
+ taps = filter.pm_remez (n + nextra_taps, fo, ao, w, "bandpass")
+ return taps
+
+
+## Builds a band pass filter with complex taps by making an LPF and
+# spinning it up to the right center frequency
+# @param gain Filter gain in the passband (linear)
+# @param Fs Sampling rate (sps)
+# @param freq_sb1 End of stop band (in Hz)
+# @param freq_pb1 Start of pass band (in Hz)
+# @param freq_pb2 End of pass band (in Hz)
+# @param freq_sb2 Start of stop band (in Hz)
+# @param passband_ripple_db Pass band ripple in dB (should be small, < 1)
+# @param stopband_atten_db Stop band attenuation in dB (should be large, >= 60)
+# @param nextra_taps Extra taps to use in the filter (default=2)
+def complex_band_pass (gain, Fs, freq_sb1, freq_pb1, freq_pb2, freq_sb2,
+ passband_ripple_db, stopband_atten_db,
+ nextra_taps=2):
+ center_freq = (freq_pb2 + freq_pb1) / 2.0
+ lp_pb = (freq_pb2 - center_freq)/1.0
+ lp_sb = freq_sb2 - center_freq
+ lptaps = low_pass(gain, Fs, lp_pb, lp_sb, passband_ripple_db,
+ stopband_atten_db, nextra_taps)
+ spinner = [cmath.exp(2j*cmath.pi*center_freq/Fs*i) for i in xrange(len(lptaps))]
+ taps = [s*t for s,t in zip(spinner, lptaps)]
+ return taps
+
+
+## Builds a band reject filter
+# spinning it up to the right center frequency
+# @param gain Filter gain in the passband (linear)
+# @param Fs Sampling rate (sps)
+# @param freq_pb1 End of pass band (in Hz)
+# @param freq_sb1 Start of stop band (in Hz)
+# @param freq_sb2 End of stop band (in Hz)
+# @param freq_pb2 Start of pass band (in Hz)
+# @param passband_ripple_db Pass band ripple in dB (should be small, < 1)
+# @param stopband_atten_db Stop band attenuation in dB (should be large, >= 60)
+# @param nextra_taps Extra taps to use in the filter (default=2)
+def band_reject (gain, Fs, freq_pb1, freq_sb1, freq_sb2, freq_pb2,
+ passband_ripple_db, stopband_atten_db,
+ nextra_taps=2):
+ passband_dev = passband_ripple_to_dev (passband_ripple_db)
+ stopband_dev = stopband_atten_to_dev (stopband_atten_db)
+ desired_ampls = (gain, 0, gain)
+ desired_freqs = [freq_pb1, freq_sb1, freq_sb2, freq_pb2]
+ desired_ripple = [passband_dev, stopband_dev, passband_dev]
+ (n, fo, ao, w) = remezord (desired_freqs, desired_ampls,
+ desired_ripple, Fs)
+ # Make sure we use an odd number of taps
+ if((n+nextra_taps)%2 == 1):
+ n += 1
+ # The remezord typically under-estimates the filter order, so add 2 taps by default
+ taps = filter.pm_remez (n + nextra_taps, fo, ao, w, "bandpass")
+ return taps
+
+
+## Builds a high pass filter.
+# @param gain Filter gain in the passband (linear)
+# @param Fs Sampling rate (sps)
+# @param freq1 End of stop band (in Hz)
+# @param freq2 Start of pass band (in Hz)
+# @param passband_ripple_db Pass band ripple in dB (should be small, < 1)
+# @param stopband_atten_db Stop band attenuation in dB (should be large, >= 60)
+# @param nextra_taps Extra taps to use in the filter (default=2)
+def high_pass (gain, Fs, freq1, freq2, passband_ripple_db, stopband_atten_db,
+ nextra_taps=2):
+ passband_dev = passband_ripple_to_dev (passband_ripple_db)
+ stopband_dev = stopband_atten_to_dev (stopband_atten_db)
+ desired_ampls = (0, 1)
+ (n, fo, ao, w) = remezord ([freq1, freq2], desired_ampls,
+ [stopband_dev, passband_dev], Fs)
+ # For a HPF, we need to use an odd number of taps
+ # In filter.remez, ntaps = n+1, so n must be even
+ if((n+nextra_taps)%2 == 1):
+ n += 1
+
+ # The remezord typically under-estimates the filter order, so add 2 taps by default
+ taps = filter.pm_remez (n + nextra_taps, fo, ao, w, "bandpass")
+ return taps
+
+# ----------------------------------------------------------------
+
+def stopband_atten_to_dev (atten_db):
+ """Convert a stopband attenuation in dB to an absolute value"""
+ return 10**(-atten_db/20)
+
+def passband_ripple_to_dev (ripple_db):
+ """Convert passband ripple spec expressed in dB to an absolute value"""
+ return (10**(ripple_db/20)-1)/(10**(ripple_db/20)+1)
+
+# ----------------------------------------------------------------
+
+def remezord (fcuts, mags, devs, fsamp = 2):
+ '''
+ FIR order estimator (lowpass, highpass, bandpass, mulitiband).
+
+ (n, fo, ao, w) = remezord (f, a, dev)
+ (n, fo, ao, w) = remezord (f, a, dev, fs)
+
+ (n, fo, ao, w) = remezord (f, a, dev) finds the approximate order,
+ normalized frequency band edges, frequency band amplitudes, and
+ weights that meet input specifications f, a, and dev, to use with
+ the remez command.
+
+ * f is a sequence of frequency band edges (between 0 and Fs/2, where
+ Fs is the sampling frequency), and a is a sequence specifying the
+ desired amplitude on the bands defined by f. The length of f is
+ twice the length of a, minus 2. The desired function is
+ piecewise constant.
+
+ * dev is a sequence the same size as a that specifies the maximum
+ allowable deviation or ripples between the frequency response
+ and the desired amplitude of the output filter, for each band.
+
+ Use remez with the resulting order n, frequency sequence fo,
+ amplitude response sequence ao, and weights w to design the filter b
+ which approximately meets the specifications given by remezord
+ input parameters f, a, and dev:
+
+ b = remez (n, fo, ao, w)
+
+ (n, fo, ao, w) = remezord (f, a, dev, Fs) specifies a sampling frequency Fs.
+
+ Fs defaults to 2 Hz, implying a Nyquist frequency of 1 Hz. You can
+ therefore specify band edges scaled to a particular applications
+ sampling frequency.
+
+ In some cases remezord underestimates the order n. If the filter
+ does not meet the specifications, try a higher order such as n+1
+ or n+2.
+ '''
+ # get local copies
+ fcuts = fcuts[:]
+ mags = mags[:]
+ devs = devs[:]
+
+ for i in range (len (fcuts)):
+ fcuts[i] = float (fcuts[i]) / fsamp
+
+ nf = len (fcuts)
+ nm = len (mags)
+ nd = len (devs)
+ nbands = nm
+
+ if nm != nd:
+ raise ValueError, "Length of mags and devs must be equal"
+
+ if nf != 2 * (nbands - 1):
+ raise ValueError, "Length of f must be 2 * len (mags) - 2"
+
+ for i in range (len (mags)):
+ if mags[i] != 0: # if not stopband, get relative deviation
+ devs[i] = devs[i] / mags[i]
+
+ # separate the passband and stopband edges
+ f1 = fcuts[0::2]
+ f2 = fcuts[1::2]
+
+ n = 0
+ min_delta = 2
+ for i in range (len (f1)):
+ if f2[i] - f1[i] < min_delta:
+ n = i
+ min_delta = f2[i] - f1[i]
+
+ if nbands == 2:
+ # lowpass or highpass case (use formula)
+ l = lporder (f1[n], f2[n], devs[0], devs[1])
+ else:
+ # bandpass or multipass case
+ # try different lowpasses and take the worst one that
+ # goes through the BP specs
+ l = 0
+ for i in range (1, nbands-1):
+ l1 = lporder (f1[i-1], f2[i-1], devs[i], devs[i-1])
+ l2 = lporder (f1[i], f2[i], devs[i], devs[i+1])
+ l = max (l, l1, l2)
+
+ n = int (math.ceil (l)) - 1 # need order, not length for remez
+
+ # cook up remez compatible result
+ ff = [0] + fcuts + [1]
+ for i in range (1, len (ff) - 1):
+ ff[i] *= 2
+
+ aa = []
+ for a in mags:
+ aa = aa + [a, a]
+
+ max_dev = max (devs)
+ wts = [1] * len(devs)
+ for i in range (len (wts)):
+ wts[i] = max_dev / devs[i]
+
+ return (n, ff, aa, wts)
+
+# ----------------------------------------------------------------
+
+def lporder (freq1, freq2, delta_p, delta_s):
+ '''
+ FIR lowpass filter length estimator. freq1 and freq2 are
+ normalized to the sampling frequency. delta_p is the passband
+ deviation (ripple), delta_s is the stopband deviation (ripple).
+
+ Note, this works for high pass filters too (freq1 > freq2), but
+ doesnt work well if the transition is near f == 0 or f == fs/2
+
+ From Herrmann et al (1973), Practical design rules for optimum
+ finite impulse response filters. Bell System Technical J., 52, 769-99
+ '''
+ df = abs (freq2 - freq1)
+ ddp = math.log10 (delta_p)
+ dds = math.log10 (delta_s)
+
+ a1 = 5.309e-3
+ a2 = 7.114e-2
+ a3 = -4.761e-1
+ a4 = -2.66e-3
+ a5 = -5.941e-1
+ a6 = -4.278e-1
+
+ b1 = 11.01217
+ b2 = 0.5124401
+
+ t1 = a1 * ddp * ddp
+ t2 = a2 * ddp
+ t3 = a4 * ddp * ddp
+ t4 = a5 * ddp
+
+ dinf=((t1 + t2 + a3) * dds) + (t3 + t4 + a6)
+ ff = b1 + b2 * (ddp - dds)
+ n = dinf / df - ff * df + 1
+ return n
+
+
+def bporder (freq1, freq2, delta_p, delta_s):
+ '''
+ FIR bandpass filter length estimator. freq1 and freq2 are
+ normalized to the sampling frequency. delta_p is the passband
+ deviation (ripple), delta_s is the stopband deviation (ripple).
+
+ From Mintzer and Liu (1979)
+ '''
+ df = abs (freq2 - freq1)
+ ddp = math.log10 (delta_p)
+ dds = math.log10 (delta_s)
+
+ a1 = 0.01201
+ a2 = 0.09664
+ a3 = -0.51325
+ a4 = 0.00203
+ a5 = -0.57054
+ a6 = -0.44314
+
+ t1 = a1 * ddp * ddp
+ t2 = a2 * ddp
+ t3 = a4 * ddp * ddp
+ t4 = a5 * ddp
+
+ cinf = dds * (t1 + t2 + a3) + t3 + t4 + a6
+ ginf = -14.6 * math.log10 (delta_p / delta_s) - 16.9
+ n = cinf / df + ginf * df + 1
+ return n
+
diff --git a/gr-filter/python/pfb.py b/gr-filter/python/pfb.py
index 9c7e18e31..ddf289982 100644
--- a/gr-filter/python/pfb.py
+++ b/gr-filter/python/pfb.py
@@ -20,8 +20,9 @@
# Boston, MA 02110-1301, USA.
#
-from gnuradio import gr, optfir
+from gnuradio import gr
import filter_swig as filter
+import optfir
class channelizer_ccf(gr.hier_block2):
'''
@@ -32,8 +33,8 @@ class channelizer_ccf(gr.hier_block2):
'''
def __init__(self, numchans, taps=None, oversample_rate=1, atten=100):
gr.hier_block2.__init__(self, "pfb_channelizer_ccf",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
- gr.io_signature(numchans, numchans, gr.sizeof_gr_complex)) # Output signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(numchans, numchans, gr.sizeof_gr_complex))
self._nchans = numchans
self._oversample_rate = oversample_rate
@@ -72,3 +73,199 @@ class channelizer_ccf(gr.hier_block2):
self.pfb.set_channel_map(newmap)
+
+class interpolator_ccf(gr.hier_block2):
+ '''
+ Make a Polyphase Filter interpolator (complex in, complex out, floating-point taps)
+
+ The block takes a single complex stream in and outputs a single complex
+ stream out. As such, it requires no extra glue to handle the input/output
+ streams. This block is provided to be consistent with the interface to the
+ other PFB block.
+ '''
+ def __init__(self, interp, taps=None, atten=100):
+ gr.hier_block2.__init__(self, "pfb_interpolator_ccf",
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
+
+ self._interp = interp
+ self._taps = taps
+
+ if taps is not None:
+ self._taps = taps
+ else:
+ # Create a filter that covers the full bandwidth of the input signal
+ bw = 0.4
+ tb = 0.2
+ ripple = 0.99
+ made = False
+ while not made:
+ try:
+ self._taps = optfir.low_pass(self._interp, self._interp, bw, bw+tb, ripple, atten)
+ made = True
+ except RuntimeError:
+ ripple += 0.01
+ made = False
+ print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple))
+
+ # Build in an exit strategy; if we've come this far, it ain't working.
+ if(ripple >= 1.0):
+ raise RuntimeError("optfir could not generate an appropriate filter.")
+
+ self.pfb = filter.pfb_interpolator_ccf(self._interp, self._taps)
+
+ self.connect(self, self.pfb)
+ self.connect(self.pfb, self)
+
+
+class decimator_ccf(gr.hier_block2):
+ '''
+ Make a Polyphase Filter decimator (complex in, complex out, floating-point taps)
+
+ This simplifies the interface by allowing a single input stream to connect to this block.
+ It will then output a stream that is the decimated output stream.
+ '''
+ def __init__(self, decim, taps=None, channel=0, atten=100):
+ gr.hier_block2.__init__(self, "pfb_decimator_ccf",
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
+
+ self._decim = decim
+ self._channel = channel
+
+ if taps is not None:
+ self._taps = taps
+ else:
+ # Create a filter that covers the full bandwidth of the input signal
+ bw = 0.4
+ tb = 0.2
+ ripple = 0.1
+ made = False
+ while not made:
+ try:
+ self._taps = optfir.low_pass(1, self._decim, bw, bw+tb, ripple, atten)
+ made = True
+ except RuntimeError:
+ ripple += 0.01
+ made = False
+ print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple))
+
+ # Build in an exit strategy; if we've come this far, it ain't working.
+ if(ripple >= 1.0):
+ raise RuntimeError("optfir could not generate an appropriate filter.")
+
+ self.s2ss = gr.stream_to_streams(gr.sizeof_gr_complex, self._decim)
+ self.pfb = filter.pfb_decimator_ccf(self._decim, self._taps, self._channel)
+
+ self.connect(self, self.s2ss)
+
+ for i in xrange(self._decim):
+ self.connect((self.s2ss,i), (self.pfb,i))
+
+ self.connect(self.pfb, self)
+
+
+class arb_resampler_ccf(gr.hier_block2):
+ '''
+ Convenience wrapper for the polyphase filterbank arbitrary resampler.
+
+ The block takes a single complex stream in and outputs a single complex
+ stream out. As such, it requires no extra glue to handle the input/output
+ streams. This block is provided to be consistent with the interface to the
+ other PFB block.
+ '''
+ def __init__(self, rate, taps=None, flt_size=32, atten=100):
+ gr.hier_block2.__init__(self, "pfb_arb_resampler_ccf",
+ gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+
+ self._rate = rate
+ self._size = flt_size
+
+ if taps is not None:
+ self._taps = taps
+ else:
+ # Create a filter that covers the full bandwidth of the input signal
+ bw = 0.4
+ tb = 0.2
+ ripple = 0.1
+ #self._taps = filter.firdes.low_pass_2(self._size, self._size, bw, tb, atten)
+ made = False
+ while not made:
+ try:
+ self._taps = optfir.low_pass(self._size, self._size, bw, bw+tb, ripple, atten)
+ made = True
+ except RuntimeError:
+ ripple += 0.01
+ made = False
+ print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple))
+
+ # Build in an exit strategy; if we've come this far, it ain't working.
+ if(ripple >= 1.0):
+ raise RuntimeError("optfir could not generate an appropriate filter.")
+
+ self.pfb = filter.pfb_arb_resampler_ccf(self._rate, self._taps, self._size)
+ #print "PFB has %d taps\n" % (len(self._taps),)
+
+ self.connect(self, self.pfb)
+ self.connect(self.pfb, self)
+
+ # Note -- set_taps not implemented in base class yet
+ def set_taps(self, taps):
+ self.pfb.set_taps(taps)
+
+ def set_rate(self, rate):
+ self.pfb.set_rate(rate)
+
+
+class arb_resampler_fff(gr.hier_block2):
+ '''
+ Convenience wrapper for the polyphase filterbank arbitrary resampler.
+
+ The block takes a single float stream in and outputs a single float
+ stream out. As such, it requires no extra glue to handle the input/output
+ streams. This block is provided to be consistent with the interface to the
+ other PFB block.
+ '''
+ def __init__(self, rate, taps=None, flt_size=32, atten=100):
+ gr.hier_block2.__init__(self, "pfb_arb_resampler_fff",
+ gr.io_signature(1, 1, gr.sizeof_float), # Input signature
+ gr.io_signature(1, 1, gr.sizeof_float)) # Output signature
+
+ self._rate = rate
+ self._size = flt_size
+
+ if taps is not None:
+ self._taps = taps
+ else:
+ # Create a filter that covers the full bandwidth of the input signal
+ bw = 0.4
+ tb = 0.2
+ ripple = 0.1
+ #self._taps = filter.firdes.low_pass_2(self._size, self._size, bw, tb, atten)
+ made = False
+ while not made:
+ try:
+ self._taps = optfir.low_pass(self._size, self._size, bw, bw+tb, ripple, atten)
+ made = True
+ except RuntimeError:
+ ripple += 0.01
+ made = False
+ print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple))
+
+ # Build in an exit strategy; if we've come this far, it ain't working.
+ if(ripple >= 1.0):
+ raise RuntimeError("optfir could not generate an appropriate filter.")
+
+ self.pfb = filter.pfb_arb_resampler_fff(self._rate, self._taps, self._size)
+ #print "PFB has %d taps\n" % (len(self._taps),)
+
+ self.connect(self, self.pfb)
+ self.connect(self.pfb, self)
+
+ # Note -- set_taps not implemented in base class yet
+ def set_taps(self, taps):
+ self.pfb.set_taps(taps)
+
+ def set_rate(self, rate):
+ self.pfb.set_rate(rate)
diff --git a/gr-filter/python/rational_resampler.py b/gr-filter/python/rational_resampler.py
new file mode 100644
index 000000000..eea12af95
--- /dev/null
+++ b/gr-filter/python/rational_resampler.py
@@ -0,0 +1,131 @@
+#
+# Copyright 2005,2007 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gru
+
+_plot = None
+
+def design_filter(interpolation, decimation, fractional_bw):
+ """
+ Given the interpolation rate, decimation rate and a fractional bandwidth,
+ design a set of taps.
+
+ @param interpolation: interpolation factor
+ @type interpolation: integer > 0
+ @param decimation: decimation factor
+ @type decimation: integer > 0
+ @param fractional_bw: fractional bandwidth in (0, 0.5) 0.4 works well.
+ @type fractional_bw: float
+ @returns: sequence of numbers
+ """
+
+ if fractional_bw >= 0.5 or fractional_bw <= 0:
+ raise ValueError, "Invalid fractional_bandwidth, must be in (0, 0.5)"
+
+ beta = 5.0
+ trans_width = 0.5 - fractional_bw
+ mid_transition_band = 0.5 - trans_width/2
+
+ taps = gr.firdes.low_pass(interpolation, # gain
+ 1, # Fs
+ mid_transition_band/interpolation, # trans mid point
+ trans_width/interpolation, # transition width
+ gr.firdes.WIN_KAISER,
+ beta # beta
+ )
+
+ return taps
+
+
+
+class _rational_resampler_base(gr.hier_block2):
+ """
+ base class for all rational resampler variants.
+ """
+ def __init__(self, resampler_base,
+ interpolation, decimation, taps=None, fractional_bw=None):
+ """
+ Rational resampling polyphase FIR filter.
+
+ Either taps or fractional_bw may be specified, but not both.
+ If neither is specified, a reasonable default, 0.4, is used as
+ the fractional_bw.
+
+ @param interpolation: interpolation factor
+ @type interpolation: integer > 0
+ @param decimation: decimation factor
+ @type decimation: integer > 0
+ @param taps: optional filter coefficients
+ @type taps: sequence
+ @param fractional_bw: fractional bandwidth in (0, 0.5), measured at final freq (use 0.4)
+ @type fractional_bw: float
+ """
+
+ if not isinstance(interpolation, int) or interpolation < 1:
+ raise ValueError, "interpolation must be an integer >= 1"
+
+ if not isinstance(decimation, int) or decimation < 1:
+ raise ValueError, "decimation must be an integer >= 1"
+
+ if taps is None and fractional_bw is None:
+ fractional_bw = 0.4
+
+ d = gru.gcd(interpolation, decimation)
+ interpolation = interpolation // d
+ decimation = decimation // d
+
+ if taps is None:
+ taps = design_filter(interpolation, decimation, fractional_bw)
+
+ resampler = resampler_base(interpolation, decimation, taps)
+ gr.hier_block2.__init__(self, "rational_resampler",
+ gr.io_signature(1, 1, resampler.input_signature().sizeof_stream_item(0)),
+ gr.io_signature(1, 1, resampler.output_signature().sizeof_stream_item(0)))
+
+ self.connect(self, resampler, self)
+
+
+class rational_resampler_fff(_rational_resampler_base):
+ def __init__(self, interpolation, decimation, taps=None, fractional_bw=None):
+ """
+ Rational resampling polyphase FIR filter with
+ float input, float output and float taps.
+ """
+ _rational_resampler_base.__init__(self, gr.rational_resampler_base_fff,
+ interpolation, decimation, taps, fractional_bw)
+
+class rational_resampler_ccf(_rational_resampler_base):
+ def __init__(self, interpolation, decimation, taps=None, fractional_bw=None):
+ """
+ Rational resampling polyphase FIR filter with
+ complex input, complex output and float taps.
+ """
+ _rational_resampler_base.__init__(self, gr.rational_resampler_base_ccf,
+ interpolation, decimation, taps, fractional_bw)
+
+class rational_resampler_ccc(_rational_resampler_base):
+ def __init__(self, interpolation, decimation, taps=None, fractional_bw=None):
+ """
+ Rational resampling polyphase FIR filter with
+ complex input, complex output and complex taps.
+ """
+ _rational_resampler_base.__init__(self, gr.rational_resampler_base_ccc,
+ interpolation, decimation, taps, fractional_bw)