diff options
-rw-r--r-- | gnuradio-core/src/python/gnuradio/gr/qa_constellation.py | 88 | ||||
-rw-r--r-- | gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py | 144 |
2 files changed, 190 insertions, 42 deletions
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py b/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py index 054194789..6ac3d95b6 100644 --- a/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py +++ b/gnuradio-core/src/python/gnuradio/gr/qa_constellation.py @@ -58,7 +58,7 @@ def threed_constell(): dim = 3 return gr.constellation_calcdist(points, [], rot_sym, dim) -tested_constellations = ( +tested_constellation_info = ( (blks2.psk_constellation, {'m': (2, 4, 8, 16, 32, 64), 'mod_code': tested_mod_codes, }, @@ -76,6 +76,36 @@ tested_constellations = ( (threed_constell, {}, True, None), ) +def tested_constellations(): + """ + Generator to produce (constellation, differential) tuples for testing purposes. + """ + for constructor, poss_args, differential, diff_argname in tested_constellation_info: + if differential: + diff_poss = (True, False) + else: + diff_poss = (False,) + poss_args = [[argname, argvalues, 0] for argname, argvalues in poss_args.items()] + for current_diff in diff_poss: + # Add an index into args to keep track of current position in argvalues + while True: + current_args = dict([(argname, argvalues[argindex]) + for argname, argvalues, argindex in poss_args]) + if diff_argname is not None: + current_args[diff_argname] = current_diff + constellation = constructor(**current_args) + yield (constellation, current_diff) + for this_poss_arg in poss_args: + argname, argvalues, argindex = this_poss_arg + if argindex < len(argvalues) - 1: + this_poss_arg[2] += 1 + break + else: + this_poss_arg[2] = 0 + if sum([argindex for argname, argvalues, argindex in poss_args]) == 0: + break + + class test_constellation (gr_unittest.TestCase): src_length = 256 @@ -88,49 +118,23 @@ class test_constellation (gr_unittest.TestCase): pass def test_hard_decision(self): - for constructor, poss_args, differential, diff_argname in tested_constellations: + for constellation, differential in tested_constellations(): if differential: - diff_poss = (True, False) + rs = constellation.rotational_symmetry() + rotations = [exp(i*2*pi*(0+1j)/rs) for i in range(0, rs)] else: - diff_poss = (False,) - poss_args = [[argname, argvalues, 0] for argname, argvalues in poss_args.items()] - for current_diff in diff_poss: - # Add an index into args to keep track of current position in argvalues - while True: - current_args = dict([(argname, argvalues[argindex]) - for argname, argvalues, argindex in poss_args]) - if diff_argname is not None: - current_args[diff_argname] = current_diff - constellation = constructor(**current_args) - # If we're differentially encoding test for possible rotations - # Testing for every possible rotation seems a bit overkill - # but better safe than sorry. - if current_diff: - rs = constellation.rotational_symmetry() - rotations = [exp(i*2*pi*(0+1j)/rs) for i in range(0, rs)] - else: - rotations = [None] - for rotation in rotations: - src = gr.vector_source_b(self.src_data) - content = mod_demod(constellation, current_diff, rotation) - dst = gr.vector_sink_b() - self.tb = gr.top_block() - self.tb.connect(src, content, dst) - self.tb.run() - data = dst.data() - # Don't worry about cut off data for now. - first = constellation.bits_per_symbol() - self.assertEqual (self.src_data[first:len(data)], data[first:]) - # Move to next arg combination - for this_poss_arg in poss_args: - argname, argvalues, argindex = this_poss_arg - if argindex < len(argvalues) - 1: - this_poss_arg[2] += 1 - break - else: - this_poss_arg[2] = 0 - if sum([argindex for argname, argvalues, argindex in poss_args]) == 0: - break + rotations = [None] + for rotation in rotations: + src = gr.vector_source_b(self.src_data) + content = mod_demod(constellation, differential, rotation) + dst = gr.vector_sink_b() + self.tb = gr.top_block() + self.tb.connect(src, content, dst) + self.tb.run() + data = dst.data() + # Don't worry about cut off data for now. + first = constellation.bits_per_symbol() + self.assertEqual (self.src_data[first:len(data)], data[first:]) class mod_demod(gr.hier_block2): diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py new file mode 100644 index 000000000..8d7719739 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_receiver.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python +# +# Copyright 2011 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import random + +from gnuradio import gr, blks2, packet_utils, gr_unittest +from gnuradio.utils import mod_codes, alignment + +from qa_constellation import tested_constellations, twod_constell + +# Set a seed so that if errors turn up they are reproducible. +# 1234 fails +random.seed(1239) + +# TESTING PARAMETERS +# The number of symbols to test with. +# We need this many to let the frequency recovery block converge. +DATA_LENGTH = 200000 +# Test fails if fraction of output that is correct is less than this. +REQ_CORRECT = 0.8 + +# CHANNEL PARAMETERS +NOISE_VOLTAGE = 0.01 +FREQUENCY_OFFSET = 0.01 +TIMING_OFFSET = 1.0 + +# RECEIVER PARAMETERS +# Increased from normal default of 0.01 to speed things up. +FREQ_ALPHA = 0.02 +# Decreased from normal default of 0.1 is required for the constellations +# with smaller point separations. +PHASE_ALPHA = 0.02 + + +class test_constellation_receiver (gr_unittest.TestCase): + + # We ignore the first half of the output data since often it takes + # a while for the receiver to lock on. + ignore_fraction = 0.8 + seed = 1234 + max_data_length = DATA_LENGTH * 6 + max_num_samples = 1000 + + def test_basic(self): + """ + Tests a bunch of different constellations by using generic + modulation, a channel, and generic demodulation. The generic + demodulation uses constellation_receiver which is what + we're really trying to test. + """ + # Assumes not more than 64 points in a constellation + # Generates some random input data to use. + self.src_data = tuple( + [random.randint(0,1) for i in range(0, self.max_data_length)]) + # Generates some random indices to use for comparing input and + # output data (a full comparison is too slow in python). + self.indices = alignment.random_sample( + self.max_data_length, self.max_num_samples, self.seed) + + for constellation, differential in tested_constellations(): + # The constellation_receiver doesn't work for constellations + # of multple dimensions (i.e. multiple complex numbers to a + # single symbol). + # That is not implemented since the receiver has no way of + # knowing where the beginning of a symbol is. + # It also doesn't work for non-differential modulation. + if constellation.dimensionality() != 1 or not differential: + continue + data_length = DATA_LENGTH * constellation.bits_per_symbol() + tb = rec_test_tb(constellation, differential, + src_data=self.src_data[:data_length]) + tb.run() + data = tb.dst.data() + d1 = tb.src_data[:int(len(tb.src_data)*self.ignore_fraction)] + d2 = data[:int(len(data)*self.ignore_fraction)] + correct, overlap, offset, indices = alignment.align_sequences( + d1, d2, indices=self.indices) + print(constellation, constellation.arity(), differential, correct, overlap, offset) + self.assertTrue(correct > REQ_CORRECT) + + def single(self): + constellation = blks2.psk_constellation(64, mod_codes.NO_CODE) + tb = rec_test_tb(constellation, True, DATA_LENGTH) + tb.run() + data = tb.dst.data() + d1 = tb.src_data[:int(len(tb.src_data)*self.ignore_fraction)] + d2 = data[:int(len(data)*self.ignore_fraction)] + seed = random.randint(0, 256) + correct, overlap, offset, indices = alignment.align_sequences(d1, d2, seed=seed) + print(constellation, constellation.arity(), correct, overlap, offset) + + +class rec_test_tb (gr.top_block): + """ + Takes a constellation an runs a generic modulation, channel, + and generic demodulation. + """ + def __init__(self, constellation, differential, + data_length=None, src_data=None): + """ + constellation -- a constellation object + differential -- whether differential encoding is used + data_length -- the number of bits of data to use + src_data -- a list of the bits to use + """ + super(rec_test_tb, self).__init__() + # Transmission Blocks + if src_data is None: + self.src_data = tuple([random.randint(0,1) for i in range(0, data_length)]) + else: + self.src_data = src_data + packer = gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) + src = gr.vector_source_b(self.src_data) + mod = blks2.generic_mod(constellation, differential=differential) + # Channel + channel = gr.channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET) + # Receiver Blocks + demod = blks2.generic_demod(constellation, differential=differential, log=True, + freq_alpha=FREQ_ALPHA, + phase_alpha=PHASE_ALPHA) + self.dst = gr.vector_sink_b() + self.connect(src, packer, mod, channel, demod, self.dst) + +if __name__ == '__main__': + gr_unittest.run(test_constellation_receiver, "test_constellation_receiver.xml") |