Diffstat (limited to 'gr-blocks/python')
-rw-r--r--  gr-blocks/python/CMakeLists.txt           |   1
-rw-r--r--  gr-blocks/python/parse_file_metadata.py   | 188
-rw-r--r--  gr-blocks/python/qa_file_metadata.py      | 197
3 files changed, 386 insertions(+), 0 deletions(-)
diff --git a/gr-blocks/python/CMakeLists.txt b/gr-blocks/python/CMakeLists.txt
index 710ab155c..cab0b956f 100644
--- a/gr-blocks/python/CMakeLists.txt
+++ b/gr-blocks/python/CMakeLists.txt
@@ -23,6 +23,7 @@ include(GrPython)
 GR_PYTHON_INSTALL(
     FILES
     __init__.py
+    parse_file_metadata.py
     DESTINATION ${GR_PYTHON_DIR}/gnuradio/blocks
     COMPONENT "blocks_python"
 )
diff --git a/gr-blocks/python/parse_file_metadata.py b/gr-blocks/python/parse_file_metadata.py
new file mode 100644
index 000000000..ec7bf6e80
--- /dev/null
+++ b/gr-blocks/python/parse_file_metadata.py
@@ -0,0 +1,188 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import sys
+from gnuradio import gr
+from gruel import pmt
+
+try:
+    import blocks_swig as blocks
+except:
+    from gnuradio import blocks
+
+'''
+rx_rate  Sample rate (samples/second)
+rx_time  Time as uint64(secs), double(fractional secs)
+size     Item size in bytes; 'type' gives the data type (gr_file_types enum)
+cplx     Is the data complex? (True or False)
+strt     Start of data (or size of header) in bytes
+bytes    Size of the following data segment in bytes
+'''
+
+HEADER_LENGTH = blocks.METADATA_HEADER_SIZE
+
+ftype_to_string = {blocks.GR_FILE_BYTE: "bytes",
+                   blocks.GR_FILE_SHORT: "short",
+                   blocks.GR_FILE_INT: "int",
+                   blocks.GR_FILE_LONG: "long",
+                   blocks.GR_FILE_LONG_LONG: "long long",
+                   blocks.GR_FILE_FLOAT: "float",
+                   blocks.GR_FILE_DOUBLE: "double"}
+
+ftype_to_size = {blocks.GR_FILE_BYTE: gr.sizeof_char,
+                 blocks.GR_FILE_SHORT: gr.sizeof_short,
+                 blocks.GR_FILE_INT: gr.sizeof_int,
+                 blocks.GR_FILE_LONG: gr.sizeof_int,
+                 blocks.GR_FILE_LONG_LONG: 2*gr.sizeof_int,
+                 blocks.GR_FILE_FLOAT: gr.sizeof_float,
+                 blocks.GR_FILE_DOUBLE: gr.sizeof_double}
+
+def parse_header(p, VERBOSE=False):
+    dump = pmt.PMT_NIL
+
+    info = dict()
+
+    if(pmt.pmt_is_dict(p) is False):
+        sys.stderr.write("Header is not a PMT dictionary: invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # GET FILE FORMAT VERSION NUMBER
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("version"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("version"), dump)
+        version = pmt.pmt_to_long(r)
+        if(VERBOSE):
+            print "Version Number: {0}".format(version)
+    else:
+        sys.stderr.write("Could not find key 'version': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT SAMPLE RATE
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_rate"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_rate"), dump)
+        samp_rate = pmt.pmt_to_double(r)
+        info["rx_rate"] = samp_rate
+        if(VERBOSE):
+            print "Sample Rate: {0:.2f} sps".format(samp_rate)
+    else:
+        sys.stderr.write("Could not find key 'rx_rate': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT TIME STAMP
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_time"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_time"), dump)
+        pmt_secs = pmt.pmt_tuple_ref(r, 0)
+        pmt_fracs = pmt.pmt_tuple_ref(r, 1)
+        secs = float(pmt.pmt_to_uint64(pmt_secs))
+        fracs = pmt.pmt_to_double(pmt_fracs)
+        t = secs + fracs
+        info["rx_time"] = t
+        if(VERBOSE):
+            print "Seconds: {0:.6f}".format(t)
+    else:
+        sys.stderr.write("Could not find key 'rx_time': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT ITEM SIZE
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("size"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("size"), dump)
+        dsize = pmt.pmt_to_long(r)
+        info["size"] = dsize
+        if(VERBOSE):
+            print "Item size: {0}".format(dsize)
+    else:
+        sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT DATA TYPE
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("type"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("type"), dump)
+        dtype = pmt.pmt_to_long(r)
+        stype = ftype_to_string[dtype]
+        info["type"] = stype
+        if(VERBOSE):
+            print "Data Type: {0} ({1})".format(stype, dtype)
+    else:
+        sys.stderr.write("Could not find key 'type': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT COMPLEX
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("cplx"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("cplx"), dump)
+        cplx = pmt.pmt_to_bool(r)
+        info["cplx"] = cplx
+        if(VERBOSE):
+            print "Complex? {0}".format(cplx)
+    else:
+        sys.stderr.write("Could not find key 'cplx': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT WHERE CURRENT SEGMENT STARTS
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("strt"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("strt"), dump)
+        seg_start = pmt.pmt_to_uint64(r)
+        info["hdr_len"] = seg_start
+        info["extra_len"] = seg_start - HEADER_LENGTH
+        info["has_extra"] = info["extra_len"] > 0
+        if(VERBOSE):
+            print "Header Length: {0} bytes".format(info["hdr_len"])
+            print "Extra Length:  {0}".format((info["extra_len"]))
+            print "Extra Header?  {0}".format(info["has_extra"])
+    else:
+        sys.stderr.write("Could not find key 'strt': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    # EXTRACT SIZE OF DATA
+    if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("bytes"))):
+        r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("bytes"), dump)
+        nbytes = pmt.pmt_to_uint64(r)
+
+        nitems = nbytes/dsize
+        info["nitems"] = nitems
+        info["nbytes"] = nbytes
+
+        if(VERBOSE):
+            print "Size of Data: {0} bytes".format(nbytes)
+            print "              {0} items".format(nitems)
+    else:
+        sys.stderr.write("Could not find key 'bytes': invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    return info
+
+# IF THERE IS EXTRA DATA, PULL OUT THE DICTIONARY AND PARSE IT
+def parse_extra_dict(p, info, VERBOSE=False):
+    if(pmt.pmt_is_dict(p) is False):
+        sys.stderr.write("Extra header is not a PMT dictionary: invalid or corrupt data file.\n")
+        sys.exit(1)
+
+    items = pmt.pmt_dict_items(p)
+    nitems = pmt.pmt_length(items)
+    for i in xrange(nitems):
+        item = pmt.pmt_nth(i, items)
+        key = pmt.pmt_symbol_to_string(pmt.pmt_car(item))
+        val = pmt.pmt_cdr(item)
+        info[key] = val
+        if(VERBOSE):
+            print "{0}: ".format(key)
+            pmt.pmt_print(val)
+
+    return info
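As a quick orientation to the new module, here is a minimal sketch of inspecting a recorded file's header from a stand-alone script. It assumes the module has been installed into gnuradio.blocks (as the CMakeLists.txt change above arranges) and that "mydata.dat" is a hypothetical capture written by blocks.file_meta_sink with an inline, non-detached header; every call below appears in parse_file_metadata.py above or in qa_file_metadata.py below.

    from gruel import pmt
    from gnuradio.blocks import parse_file_metadata

    # Read the fixed-size header that file_meta_sink writes at the start of the file.
    handle = open("mydata.dat", "rb")
    header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
    header = pmt.pmt_deserialize_str(header_str)

    # Parse the fixed header; VERBOSE=True prints each field as it is extracted.
    info = parse_file_metadata.parse_header(header, VERBOSE=True)

    # If the header advertises extra user data, read and parse that as well.
    if(info["has_extra"]):
        extra_str = handle.read(info["extra_len"])
        extra = pmt.pmt_deserialize_str(extra_str)
        info = parse_file_metadata.parse_extra_dict(extra, info, VERBOSE=True)
    handle.close()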
diff --git a/gr-blocks/python/qa_file_metadata.py b/gr-blocks/python/qa_file_metadata.py
new file mode 100644
index 000000000..9f4a331d6
--- /dev/null
+++ b/gr-blocks/python/qa_file_metadata.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import parse_file_metadata
+import blocks_swig as blocks
+import pmt
+import os, time
+
+class test_file_metadata(gr_unittest.TestCase):
+
+    def setUp(self):
+        self.tb = gr.top_block()
+
+    def tearDown(self):
+        self.tb = None
+
+    def test_001(self):
+        outfile = "test_out.dat"
+
+        detached = False
+        samp_rate = 200000
+        key = pmt.pmt_intern("samp_rate")
+        val = pmt.pmt_from_double(samp_rate)
+        extras = pmt.pmt_make_dict()
+        extras = pmt.pmt_dict_add(extras, key, val)
+        extras_str = pmt.pmt_serialize_str(extras)
+
+        src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
+        head = gr.head(gr.sizeof_gr_complex, 1000)
+        fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile,
+                                     samp_rate, 1,
+                                     blocks.GR_FILE_FLOAT, True,
+                                     1000000, extras_str, detached)
+        fsnk.set_unbuffered(True)
+
+        self.tb.connect(src, head, fsnk)
+        self.tb.run()
+        fsnk.close()
+
+        handle = open(outfile, "rb")
+        header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
+        if(len(header_str) == 0):
+            self.fail("could not read header from file")
+
+        try:
+            header = pmt.pmt_deserialize_str(header_str)
+        except RuntimeError:
+            self.fail("could not deserialize header")
+
+        info = parse_file_metadata.parse_header(header, False)
+
+        extra_str = handle.read(info["extra_len"])
+        self.assertGreater(len(extra_str), 0)
+        handle.close()
+
+        try:
+            extra = pmt.pmt_deserialize_str(extra_str)
+        except RuntimeError:
+            self.fail("could not deserialize extra header")
+
+        extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
+
+        self.assertEqual(info['rx_rate'], samp_rate)
+        self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
+
+
+        # Test file metadata source.
+        # Create a new sig source to start from the beginning.
+        src2 = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
+        fsrc = blocks.file_meta_source(outfile, False)
+        vsnk = gr.vector_sink_c()
+        tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA")
+        ssnk = gr.vector_sink_c()
+        head.reset()
+        self.tb.disconnect(src, head, fsnk)
+        self.tb.connect(fsrc, vsnk)
+        self.tb.connect(fsrc, tsnk)
+        self.tb.connect(src2, head, ssnk)
+        self.tb.run()
+
+        # Test to make sure tags with 'samp_rate' and 'rx_rate' keys
+        # were generated and received correctly.
+        tags = tsnk.current_tags()
+        for t in tags:
+            if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
+                self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+            elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
+                self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+
+        # Test that the data portion was extracted and received correctly.
+        self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
+
+        os.remove(outfile)
+
+    def test_002(self):
+        outfile = "test_out.dat"
+        outfile_hdr = "test_out.dat.hdr"
+
+        detached = True
+        samp_rate = 200000
+        key = pmt.pmt_intern("samp_rate")
+        val = pmt.pmt_from_double(samp_rate)
+        extras = pmt.pmt_make_dict()
+        extras = pmt.pmt_dict_add(extras, key, val)
+        extras_str = pmt.pmt_serialize_str(extras)
+
+        src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
+        head = gr.head(gr.sizeof_gr_complex, 1000)
+        fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile,
+                                     samp_rate, 1,
+                                     blocks.GR_FILE_FLOAT, True,
+                                     1000000, extras_str, detached)
+        fsnk.set_unbuffered(True)
+
+        self.tb.connect(src, head, fsnk)
+        self.tb.run()
+        fsnk.close()
+
+        # Open detached header for reading
+        handle = open(outfile_hdr, "rb")
+        header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
+        if(len(header_str) == 0):
+            self.fail("could not read header from file")
+
+        try:
+            header = pmt.pmt_deserialize_str(header_str)
+        except RuntimeError:
+            self.fail("could not deserialize header")
+
+        info = parse_file_metadata.parse_header(header, False)
+
+        extra_str = handle.read(info["extra_len"])
+        self.assertGreater(len(extra_str), 0)
+        handle.close()
+
+        try:
+            extra = pmt.pmt_deserialize_str(extra_str)
+        except RuntimeError:
+            self.fail("could not deserialize extra header")
+
+        extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
+
+        self.assertEqual(info['rx_rate'], samp_rate)
+        self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
+
+
+        # Test file metadata source.
+        # Create a new sig source to start from the beginning.
+        src2 = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
+        fsrc = blocks.file_meta_source(outfile, False, detached, outfile_hdr)
+        vsnk = gr.vector_sink_c()
+        tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA")
+        ssnk = gr.vector_sink_c()
+        head.reset()
+        self.tb.disconnect(src, head, fsnk)
+        self.tb.connect(fsrc, vsnk)
+        self.tb.connect(fsrc, tsnk)
+        self.tb.connect(src2, head, ssnk)
+        self.tb.run()
+
+        # Test to make sure tags with 'samp_rate' and 'rx_rate' keys
+        # were generated and received correctly.
+        tags = tsnk.current_tags()
+        for t in tags:
+            if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
+                self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+            elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
+                self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+
+        # Test that the data portion was extracted and received correctly.
+        self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
+
+        os.remove(outfile)
+        os.remove(outfile_hdr)
+
+if __name__ == '__main__':
+    gr_unittest.run(test_file_metadata, "test_file_metadata.xml")
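For the writing side, the following stand-alone sketch mirrors the setup of test_001 and test_002 above: 1000 complex samples of a cosine are written through blocks.file_meta_sink with a serialized extra 'samp_rate' entry and an inline header. The output name "example_out.dat" is hypothetical, and "from gnuradio import blocks" assumes an installed tree (the in-tree QA imports blocks_swig instead); all block and pmt calls are the ones already used in the tests.

    from gnuradio import gr
    from gnuradio import blocks
    from gruel import pmt

    samp_rate = 200000

    # Pack an extra 'samp_rate' entry into a PMT dict and serialize it so the
    # sink stores it in the optional extra header.
    extras = pmt.pmt_make_dict()
    extras = pmt.pmt_dict_add(extras, pmt.pmt_intern("samp_rate"),
                              pmt.pmt_from_double(samp_rate))
    extras_str = pmt.pmt_serialize_str(extras)

    # 1000 complex cosine samples -> file_meta_sink with an inline header.
    tb = gr.top_block()
    src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
    head = gr.head(gr.sizeof_gr_complex, 1000)
    fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, "example_out.dat",
                                 samp_rate, 1,
                                 blocks.GR_FILE_FLOAT, True,
                                 1000000, extras_str, False)
    fsnk.set_unbuffered(True)

    tb.connect(src, head, fsnk)
    tb.run()
    fsnk.close()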