diff options
Diffstat (limited to 'grc/src/grc_gnuradio/blks2/probe.py')
-rw-r--r-- | grc/src/grc_gnuradio/blks2/probe.py | 123 |
1 files changed, 0 insertions, 123 deletions
diff --git a/grc/src/grc_gnuradio/blks2/probe.py b/grc/src/grc_gnuradio/blks2/probe.py deleted file mode 100644 index 8db81f057..000000000 --- a/grc/src/grc_gnuradio/blks2/probe.py +++ /dev/null @@ -1,123 +0,0 @@ -# -# Copyright 2008 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -from gnuradio import gr -import threading -import numpy -import time - -####################################################################################### -## Probe: Function -####################################################################################### -class probe_function(gr.hier_block2, threading.Thread): - """ - The thread polls the function for values and writes to a message source. - """ - - def __init__(self, probe_callback, probe_rate): - #init hier block - gr.hier_block2.__init__( - self, 'probe_function', - gr.io_signature(0, 0, 0), - gr.io_signature(1, 1, gr.sizeof_float), - ) - self._probe_callback = probe_callback - self.set_probe_rate(probe_rate) - #create message source - message_source = gr.message_source(gr.sizeof_float, 1) - self._msgq = message_source.msgq() - #connect - self.connect(message_source, self) - #setup thread - threading.Thread.__init__(self) - self.setDaemon(True) - self.start() - - def run(self): - """ - Infinite polling loop. - """ - while True: - time.sleep(1.0/self._probe_rate) - arr = numpy.array(self._probe_callback(), numpy.float32) - msg = gr.message_from_string(arr.tostring(), 0, gr.sizeof_float, 1) - self._msgq.insert_tail(msg) - - def set_probe_rate(self, probe_rate): - self._probe_rate = probe_rate - -class _probe_base(gr.hier_block2): - def __init__(self, probe_block, probe_callback, probe_rate): - #init hier block - gr.hier_block2.__init__( - self, 'probe', - gr.io_signature(1, 1, probe_block.input_signature().sizeof_stream_items()[0]), - gr.io_signature(1, 1, gr.sizeof_float), - ) - probe_function_block = probe_function(probe_callback, probe_rate) - #forward callbacks - self.set_probe_rate = probe_function_block.set_probe_rate - #connect - self.connect(self, probe_block) - self.connect(probe_function_block, self) - -####################################################################################### -## Probe: Average Magnitude Squared -####################################################################################### -class _probe_avg_mag_sqrd_base(_probe_base): - def __init__(self, threshold, alpha, probe_rate): - #create block - probe_block = self._probe_block_contructor[0](threshold, alpha) - #forward callbacks - self.set_alpha = probe_block.set_alpha - self.set_threshold = probe_block.set_threshold - #init - _probe_base.__init__(self, probe_block, probe_block.level, probe_rate) - -class probe_avg_mag_sqrd_c(_probe_avg_mag_sqrd_base): _probe_block_contructor = (gr.probe_avg_mag_sqrd_c,) -class probe_avg_mag_sqrd_f(_probe_avg_mag_sqrd_base): _probe_block_contructor = (gr.probe_avg_mag_sqrd_f,) - -####################################################################################### -## Probe: Density -####################################################################################### -class probe_density_b(_probe_base): - def __init__(self, alpha, probe_rate): - #create block - probe_block = gr.probe_density_b(alpha) - #forward callbacks - self.set_alpha = probe_block.set_alpha - #init - _probe_base.__init__(self, probe_block, probe_block.density, probe_rate) - -####################################################################################### -## Probe: MPSK SNR -####################################################################################### -class probe_mpsk_snr_c(_probe_base): - def __init__(self, type, alpha, probe_rate): - """ - Type can be "snr", "signal_mean", or "noise_variance" - """ - #create block - probe_block = gr.probe_mpsk_snr_c(alpha) - #forward callbacks - self.set_alpha = probe_block.set_alpha - #init - _probe_base.__init__(self, probe_block, getattr(probe_block, type), probe_rate) |