diff options
Diffstat (limited to 'grc/src/grc_gnuradio/blks2/error_rate.py')
-rw-r--r-- | grc/src/grc_gnuradio/blks2/error_rate.py | 137 |
1 files changed, 0 insertions, 137 deletions
diff --git a/grc/src/grc_gnuradio/blks2/error_rate.py b/grc/src/grc_gnuradio/blks2/error_rate.py deleted file mode 100644 index 9b2df58ef..000000000 --- a/grc/src/grc_gnuradio/blks2/error_rate.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright 2008 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -default_win_size = 1000 - -from gnuradio import gr -import gnuradio.gr.gr_threading as _threading -import numpy - -#generate 1s counts array -_1s_counts = [sum([1&(i>>j) for j in range(8)]) for i in range(2**8)] - -class input_watcher(_threading.Thread): - """ - Read samples from the message queue and hand them to the callback. - """ - - def __init__(self, msgq, callback): - self._msgq = msgq - self._callback = callback - _threading.Thread.__init__(self) - self.setDaemon(1) - self.keep_running = True - self.start() - - def run(self): - r = '' - while True: - msg = self._msgq.delete_head() - itemsize = int(msg.arg1()) - nitems = int(msg.arg2()) - s = r + msg.to_string() - i = (nitems-nitems%2)*itemsize - r = s[i:] - s = s[:i] - samples = numpy.fromstring(s, numpy.int8) - self._callback(samples) - -class error_rate(gr.hier_block2): - """ - Sample the incoming data streams (byte) and calculate the bit or symbol error rate. - Write the running rate to the output data stream (float). - """ - - def __init__(self, type='BER', win_size=default_win_size, bits_per_symbol=2): - """ - Error rate constructor. - @param type a string 'BER' or 'SER' - @param win_size the number of samples to calculate over - @param bits_per_symbol the number of information bits per symbol (BER only) - """ - #init - gr.hier_block2.__init__( - self, 'error_rate', - gr.io_signature(2, 2, gr.sizeof_char), - gr.io_signature(1, 1, gr.sizeof_float), - ) - assert type in ('BER', 'SER') - self._max_samples = win_size - self._bits_per_symbol = bits_per_symbol - #setup message queue - msg_source = gr.message_source(gr.sizeof_float, 1) - self._msgq_source = msg_source.msgq() - msgq_sink = gr.msg_queue(2) - msg_sink = gr.message_sink(gr.sizeof_char, msgq_sink, False) #False -> blocking - inter = gr.interleave(gr.sizeof_char) - #start thread - self._num_errs = 0 - self._err_index = 0 - self._num_samps = 0 - self._err_array = numpy.zeros(self._max_samples, numpy.int8) - if type == 'BER': - input_watcher(msgq_sink, self._handler_ber) - elif type == 'SER': - input_watcher(msgq_sink, self._handler_ser) - #connect - self.connect(msg_source, self) - self.connect((self, 0), (inter, 0)) - self.connect((self, 1), (inter, 1)) - self.connect(inter, msg_sink) - - def _handler_ber(self, samples): - num = len(samples)/2 - arr = numpy.zeros(num, numpy.float32) - for i in range(num): - old_err = self._err_array[self._err_index] - #record error - self._err_array[self._err_index] = _1s_counts[samples[i*2] ^ samples[i*2 + 1]] - self._num_errs = self._num_errs + self._err_array[self._err_index] - old_err - #increment index - self._err_index = (self._err_index + 1)%self._max_samples - self._num_samps = min(self._num_samps + 1, self._max_samples) - #write sample - arr[i] = float(self._num_errs)/float(self._num_samps*self._bits_per_symbol) - #write message - msg = gr.message_from_string(arr.tostring(), 0, gr.sizeof_float, num) - self._msgq_source.insert_tail(msg) - - def _handler_ser(self, samples): - num = len(samples)/2 - arr = numpy.zeros(num, numpy.float32) - for i in range(num): - old_err = self._err_array[self._err_index] - #record error - ref = samples[i*2] - res = samples[i*2 + 1] - if ref == res: - self._err_array[self._err_index] = 0 - else: - self._err_array[self._err_index] = 1 - #update number of errors - self._num_errs = self._num_errs + self._err_array[self._err_index] - old_err - #increment index - self._err_index = (self._err_index + 1)%self._max_samples - self._num_samps = min(self._num_samps + 1, self._max_samples) - #write sample - arr[i] = float(self._num_errs)/float(self._num_samps) - #write message - msg = gr.message_from_string(arr.tostring(), 0, gr.sizeof_float, num) - self._msgq_source.insert_tail(msg) |