summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/qa_constellation.py88
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py144
2 files changed, 190 insertions, 42 deletions
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py b/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py
index 054194789..6ac3d95b6 100644
--- a/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py
@@ -58,7 +58,7 @@ def threed_constell():
dim = 3
return gr.constellation_calcdist(points, [], rot_sym, dim)
-tested_constellations = (
+tested_constellation_info = (
(blks2.psk_constellation,
{'m': (2, 4, 8, 16, 32, 64),
'mod_code': tested_mod_codes, },
@@ -76,6 +76,36 @@ tested_constellations = (
(threed_constell, {}, True, None),
)
+def tested_constellations():
+ """
+ Generator to produce (constellation, differential) tuples for testing purposes.
+ """
+ for constructor, poss_args, differential, diff_argname in tested_constellation_info:
+ if differential:
+ diff_poss = (True, False)
+ else:
+ diff_poss = (False,)
+ poss_args = [[argname, argvalues, 0] for argname, argvalues in poss_args.items()]
+ for current_diff in diff_poss:
+ # Add an index into args to keep track of current position in argvalues
+ while True:
+ current_args = dict([(argname, argvalues[argindex])
+ for argname, argvalues, argindex in poss_args])
+ if diff_argname is not None:
+ current_args[diff_argname] = current_diff
+ constellation = constructor(**current_args)
+ yield (constellation, current_diff)
+ for this_poss_arg in poss_args:
+ argname, argvalues, argindex = this_poss_arg
+ if argindex < len(argvalues) - 1:
+ this_poss_arg[2] += 1
+ break
+ else:
+ this_poss_arg[2] = 0
+ if sum([argindex for argname, argvalues, argindex in poss_args]) == 0:
+ break
+
+
class test_constellation (gr_unittest.TestCase):
src_length = 256
@@ -88,49 +118,23 @@ class test_constellation (gr_unittest.TestCase):
pass
def test_hard_decision(self):
- for constructor, poss_args, differential, diff_argname in tested_constellations:
+ for constellation, differential in tested_constellations():
if differential:
- diff_poss = (True, False)
+ rs = constellation.rotational_symmetry()
+ rotations = [exp(i*2*pi*(0+1j)/rs) for i in range(0, rs)]
else:
- diff_poss = (False,)
- poss_args = [[argname, argvalues, 0] for argname, argvalues in poss_args.items()]
- for current_diff in diff_poss:
- # Add an index into args to keep track of current position in argvalues
- while True:
- current_args = dict([(argname, argvalues[argindex])
- for argname, argvalues, argindex in poss_args])
- if diff_argname is not None:
- current_args[diff_argname] = current_diff
- constellation = constructor(**current_args)
- # If we're differentially encoding test for possible rotations
- # Testing for every possible rotation seems a bit overkill
- # but better safe than sorry.
- if current_diff:
- rs = constellation.rotational_symmetry()
- rotations = [exp(i*2*pi*(0+1j)/rs) for i in range(0, rs)]
- else:
- rotations = [None]
- for rotation in rotations:
- src = gr.vector_source_b(self.src_data)
- content = mod_demod(constellation, current_diff, rotation)
- dst = gr.vector_sink_b()
- self.tb = gr.top_block()
- self.tb.connect(src, content, dst)
- self.tb.run()
- data = dst.data()
- # Don't worry about cut off data for now.
- first = constellation.bits_per_symbol()
- self.assertEqual (self.src_data[first:len(data)], data[first:])
- # Move to next arg combination
- for this_poss_arg in poss_args:
- argname, argvalues, argindex = this_poss_arg
- if argindex < len(argvalues) - 1:
- this_poss_arg[2] += 1
- break
- else:
- this_poss_arg[2] = 0
- if sum([argindex for argname, argvalues, argindex in poss_args]) == 0:
- break
+ rotations = [None]
+ for rotation in rotations:
+ src = gr.vector_source_b(self.src_data)
+ content = mod_demod(constellation, differential, rotation)
+ dst = gr.vector_sink_b()
+ self.tb = gr.top_block()
+ self.tb.connect(src, content, dst)
+ self.tb.run()
+ data = dst.data()
+ # Don't worry about cut off data for now.
+ first = constellation.bits_per_symbol()
+ self.assertEqual (self.src_data[first:len(data)], data[first:])
class mod_demod(gr.hier_block2):
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py
new file mode 100644
index 000000000..8d7719739
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import random
+
+from gnuradio import gr, blks2, packet_utils, gr_unittest
+from gnuradio.utils import mod_codes, alignment
+
+from qa_constellation import tested_constellations, twod_constell
+
+# Set a seed so that if errors turn up they are reproducible.
+# 1234 fails
+random.seed(1239)
+
+# TESTING PARAMETERS
+# The number of symbols to test with.
+# We need this many to let the frequency recovery block converge.
+DATA_LENGTH = 200000
+# Test fails if fraction of output that is correct is less than this.
+REQ_CORRECT = 0.8
+
+# CHANNEL PARAMETERS
+NOISE_VOLTAGE = 0.01
+FREQUENCY_OFFSET = 0.01
+TIMING_OFFSET = 1.0
+
+# RECEIVER PARAMETERS
+# Increased from normal default of 0.01 to speed things up.
+FREQ_ALPHA = 0.02
+# Decreased from normal default of 0.1 is required for the constellations
+# with smaller point separations.
+PHASE_ALPHA = 0.02
+
+
+class test_constellation_receiver (gr_unittest.TestCase):
+
+ # We ignore the first half of the output data since often it takes
+ # a while for the receiver to lock on.
+ ignore_fraction = 0.8
+ seed = 1234
+ max_data_length = DATA_LENGTH * 6
+ max_num_samples = 1000
+
+ def test_basic(self):
+ """
+ Tests a bunch of different constellations by using generic
+ modulation, a channel, and generic demodulation. The generic
+ demodulation uses constellation_receiver which is what
+ we're really trying to test.
+ """
+ # Assumes not more than 64 points in a constellation
+ # Generates some random input data to use.
+ self.src_data = tuple(
+ [random.randint(0,1) for i in range(0, self.max_data_length)])
+ # Generates some random indices to use for comparing input and
+ # output data (a full comparison is too slow in python).
+ self.indices = alignment.random_sample(
+ self.max_data_length, self.max_num_samples, self.seed)
+
+ for constellation, differential in tested_constellations():
+ # The constellation_receiver doesn't work for constellations
+ # of multple dimensions (i.e. multiple complex numbers to a
+ # single symbol).
+ # That is not implemented since the receiver has no way of
+ # knowing where the beginning of a symbol is.
+ # It also doesn't work for non-differential modulation.
+ if constellation.dimensionality() != 1 or not differential:
+ continue
+ data_length = DATA_LENGTH * constellation.bits_per_symbol()
+ tb = rec_test_tb(constellation, differential,
+ src_data=self.src_data[:data_length])
+ tb.run()
+ data = tb.dst.data()
+ d1 = tb.src_data[:int(len(tb.src_data)*self.ignore_fraction)]
+ d2 = data[:int(len(data)*self.ignore_fraction)]
+ correct, overlap, offset, indices = alignment.align_sequences(
+ d1, d2, indices=self.indices)
+ print(constellation, constellation.arity(), differential, correct, overlap, offset)
+ self.assertTrue(correct > REQ_CORRECT)
+
+ def single(self):
+ constellation = blks2.psk_constellation(64, mod_codes.NO_CODE)
+ tb = rec_test_tb(constellation, True, DATA_LENGTH)
+ tb.run()
+ data = tb.dst.data()
+ d1 = tb.src_data[:int(len(tb.src_data)*self.ignore_fraction)]
+ d2 = data[:int(len(data)*self.ignore_fraction)]
+ seed = random.randint(0, 256)
+ correct, overlap, offset, indices = alignment.align_sequences(d1, d2, seed=seed)
+ print(constellation, constellation.arity(), correct, overlap, offset)
+
+
+class rec_test_tb (gr.top_block):
+ """
+ Takes a constellation an runs a generic modulation, channel,
+ and generic demodulation.
+ """
+ def __init__(self, constellation, differential,
+ data_length=None, src_data=None):
+ """
+ constellation -- a constellation object
+ differential -- whether differential encoding is used
+ data_length -- the number of bits of data to use
+ src_data -- a list of the bits to use
+ """
+ super(rec_test_tb, self).__init__()
+ # Transmission Blocks
+ if src_data is None:
+ self.src_data = tuple([random.randint(0,1) for i in range(0, data_length)])
+ else:
+ self.src_data = src_data
+ packer = gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)
+ src = gr.vector_source_b(self.src_data)
+ mod = blks2.generic_mod(constellation, differential=differential)
+ # Channel
+ channel = gr.channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET)
+ # Receiver Blocks
+ demod = blks2.generic_demod(constellation, differential=differential, log=True,
+ freq_alpha=FREQ_ALPHA,
+ phase_alpha=PHASE_ALPHA)
+ self.dst = gr.vector_sink_b()
+ self.connect(src, packer, mod, channel, demod, self.dst)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_constellation_receiver, "test_constellation_receiver.xml")