diff options
Diffstat (limited to 'gr-blocks/lib/file_meta_source_impl.cc')
-rw-r--r-- | gr-blocks/lib/file_meta_source_impl.cc | 433 |
1 files changed, 433 insertions, 0 deletions
diff --git a/gr-blocks/lib/file_meta_source_impl.cc b/gr-blocks/lib/file_meta_source_impl.cc new file mode 100644 index 000000000..fb39b205b --- /dev/null +++ b/gr-blocks/lib/file_meta_source_impl.cc @@ -0,0 +1,433 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "file_meta_source_impl.h" +#include <gr_io_signature.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdexcept> +#include <cstdio> + +namespace gr { + namespace blocks { + +// win32 (mingw/msvc) specific +#ifdef HAVE_IO_H +#include <io.h> +#endif +#ifdef O_BINARY +#define OUR_O_BINARY O_BINARY +#else +#define OUR_O_BINARY 0 +#endif + +// should be handled via configure +#ifdef O_LARGEFILE +#define OUR_O_LARGEFILE O_LARGEFILE +#else +#define OUR_O_LARGEFILE 0 +#endif + + file_meta_source::sptr + file_meta_source::make(const std::string &filename, + bool repeat, + bool detached_header, + const std::string &hdr_filename) + { + return gnuradio::get_initial_sptr + (new file_meta_source_impl(filename, + repeat, + detached_header, + hdr_filename)); + } + + file_meta_source_impl::file_meta_source_impl(const std::string &filename, + bool repeat, + bool detached_header, + const std::string &hdr_filename) + : gr_sync_block("file_meta_source", + gr_make_io_signature(0, 0, 0), + gr_make_io_signature(1, 1, 1)), + d_itemsize(0), d_samp_rate(0), + d_seg_size(0), + d_updated(false), d_repeat(repeat) + { + d_fp = 0; + d_new_fp = 0; + d_hdr_fp = 0; + d_new_hdr_fp = 0; + + if(detached_header == true) { + d_state = STATE_DETACHED; + } + else + d_state = STATE_INLINE; + + if(!open(filename, hdr_filename)) + throw std::runtime_error("file_meta_source: can't open file\n"); + + do_update(); + + pmt_t hdr = PMT_NIL, extras = PMT_NIL; + if(read_header(hdr, extras)) { + parse_header(hdr, 0, d_tags); + parse_extras(extras, 0, d_tags); + } + else + throw std::runtime_error("file_meta_source: could not read header.\n"); + + // Set output signature based on itemsize info in header + set_output_signature(gr_make_io_signature(1, 1, d_itemsize)); + } + + file_meta_source_impl::~file_meta_source_impl() + { + close(); + + if(d_fp) { + fclose(d_fp); + d_fp = 0; + } + + if(d_state == STATE_DETACHED) { + if(d_hdr_fp) { + fclose(d_hdr_fp); + d_hdr_fp = 0; + } + } + } + + bool + file_meta_source_impl::read_header(pmt_t &hdr, pmt_t &extras) + { + // Select which file handle to read from. + FILE *fp; + if(d_state == STATE_DETACHED) + fp = d_hdr_fp; + else + fp = d_fp; + + size_t ret; + size_t size = 0; + std::string str; + char *hdr_buffer = new char[METADATA_HEADER_SIZE]; + while(size < METADATA_HEADER_SIZE) { + ret = fread(&hdr_buffer[size], sizeof(char), METADATA_HEADER_SIZE-size, fp); + if(ret == 0) { + delete [] hdr_buffer; + if(feof(fp)) + return false; + else { + std::stringstream s; + s << "file_meta_source: error occurred extracting header: " + << strerror(errno) << std::endl; + throw std::runtime_error(s.str()); + } + } + size += ret; + } + + // Convert to string or the char array gets confused by the \0 + str.insert(0, hdr_buffer, METADATA_HEADER_SIZE); + hdr = pmt_deserialize_str(str); + delete [] hdr_buffer; + + uint64_t seg_start, extra_len; + pmt_t r, dump; + if(pmt_dict_has_key(hdr, pmt_string_to_symbol("strt"))) { + r = pmt_dict_ref(hdr, pmt_string_to_symbol("strt"), dump); + seg_start = pmt_to_uint64(r); + extra_len = seg_start - METADATA_HEADER_SIZE; + } + + if(extra_len > 0) { + size = 0; + hdr_buffer = new char[extra_len]; + while(size < extra_len) { + ret = fread(&hdr_buffer[size], sizeof(char), extra_len-size, fp); + if(ret == 0) { + delete [] hdr_buffer; + if(feof(fp)) + return false; + else { + std::stringstream s; + s << "file_meta_source: error occurred extracting extras: " + << strerror(errno) << std::endl; + throw std::runtime_error(s.str()); + } + } + size += ret; + } + + str.clear(); + str.insert(0, hdr_buffer, extra_len); + extras = pmt_deserialize_str(str); + delete [] hdr_buffer; + } + + return true; + } + + void + file_meta_source_impl::parse_header(pmt_t hdr, uint64_t offset, + std::vector<gr_tag_t> &tags) + { + pmt_t r, key; + + // GET SAMPLE RATE + key = pmt_string_to_symbol("rx_rate"); + if(pmt_dict_has_key(hdr, key)) { + r = pmt_dict_ref(hdr, key, PMT_NIL); + d_samp_rate = pmt_to_double(r); + + gr_tag_t t; + t.offset = offset; + t.key = key; + t.value = r; + t.srcid = alias_pmt(); + tags.push_back(t); + } + else { + throw std::runtime_error("file_meta_source: Could not extract sample rate.\n"); + } + + // GET TIME STAMP + key = pmt_string_to_symbol("rx_time"); + if(pmt_dict_has_key(hdr, key)) { + d_time_stamp = pmt_dict_ref(hdr, key, PMT_NIL); + + gr_tag_t t; + t.offset = offset; + t.key = key; + t.value = d_time_stamp; + t.srcid = alias_pmt(); + tags.push_back(t); + } + else { + throw std::runtime_error("file_meta_source: Could not extract time stamp.\n"); + } + + // GET ITEM SIZE OF DATA + if(pmt_dict_has_key(hdr, pmt_string_to_symbol("size"))) { + d_itemsize = pmt_to_long(pmt_dict_ref(hdr, pmt_string_to_symbol("size"), PMT_NIL)); + } + else { + throw std::runtime_error("file_meta_source: Could not extract item size.\n"); + } + + // GET SEGMENT SIZE + if(pmt_dict_has_key(hdr, pmt_string_to_symbol("bytes"))) { + d_seg_size = pmt_to_uint64(pmt_dict_ref(hdr, pmt_string_to_symbol("bytes"), PMT_NIL)); + + // Convert from bytes to items + d_seg_size /= d_itemsize; + } + else { + throw std::runtime_error("file_meta_source: Could not extract segment size.\n"); + } + } + + void + file_meta_source_impl::parse_extras(pmt_t extras, uint64_t offset, + std::vector<gr_tag_t> &tags) + { + pmt_t item, key, val; + + size_t nitems = pmt_length(extras); + for(size_t i = 0; i < nitems; i++) { + item = pmt_nth(i, extras); + key = pmt_car(item); + val = pmt_cdr(item); + + gr_tag_t t; + t.offset = offset; + t.key = key; + t.value = val; + t.srcid = alias_pmt(); + tags.push_back(t); + } + } + + bool + file_meta_source_impl::open(const std::string &filename, + const std::string &hdr_filename) + { + bool ret = true; + if(d_state == STATE_DETACHED) { + std::string s; + if(hdr_filename == "") + s = filename + ".hdr"; + else + s = hdr_filename; + ret = _open(&d_new_hdr_fp, s.c_str()); + } + + ret = ret && _open(&d_new_fp, filename.c_str()); + d_updated = true; + return ret; + } + + bool + file_meta_source_impl::_open(FILE **fp, const char *filename) + { + gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + + bool ret = true; + int fd; + + if((fd = ::open(filename, + O_RDONLY|OUR_O_LARGEFILE|OUR_O_BINARY)) < 0) { + perror(filename); + return false; + } + + if(*fp) { // if we've already got a new one open, close it + fclose(*fp); + fp = 0; + } + + if((*fp = fdopen(fd, "rb")) == NULL) { + perror(filename); + ::close(fd); // don't leak file descriptor if fdopen fails. + } + + ret = fp != 0; + + return ret; + } + + void + file_meta_source_impl::close() + { + gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + if(d_state == STATE_DETACHED) { + if(d_new_hdr_fp) { + fclose(d_new_hdr_fp); + d_new_hdr_fp = 0; + } + } + + if(d_new_fp) { + fclose(d_new_fp); + d_new_fp = 0; + } + d_updated = true; + } + + void + file_meta_source_impl::do_update() + { + if(d_updated) { + gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this block + if(d_state == STATE_DETACHED) { + if(d_hdr_fp) + fclose(d_hdr_fp); + d_hdr_fp = d_new_hdr_fp; // install new file pointer + d_new_hdr_fp = 0; + } + + if(d_fp) + fclose(d_fp); + d_fp = d_new_fp; // install new file pointer + d_new_fp = 0; + + d_updated = false; + } + } + + int + file_meta_source_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + // We've reached the end of a segment; parse the next header and get + // the new tags to send and set the next segment size. + if(d_seg_size == 0) { + pmt_t hdr=PMT_NIL, extras=PMT_NIL; + if(read_header(hdr, extras)) { + parse_header(hdr, nitems_written(0), d_tags); + parse_extras(extras, nitems_written(0), d_tags); + } + else { + return -1; + } + } + + char *out = (char*)output_items[0]; + int i; + int seg_size = std::min(noutput_items, (int)d_seg_size); + int size = seg_size; + + do_update(); // update d_fp is reqd + if(d_fp == NULL) + throw std::runtime_error("work with file not open"); + + // Push all tags onto the stream and remove them from the vector + while(!d_tags.empty()) { + add_item_tag(0, d_tags.back()); + d_tags.pop_back(); + } + + gruel::scoped_lock lock(d_mutex); // hold for the rest of this function + while(size) { + i = fread(out, d_itemsize, size, d_fp); + + size -= i; + d_seg_size -= i; + out += i * d_itemsize; + + if(size == 0) // done + break; + + if(i > 0) // short read, try again + continue; + + // We got a zero from fread. This is either EOF or error. In + // any event, if we're in repeat mode, seek back to the beginning + // of the file and try again, else break + + if(!d_repeat) + break; + + if(fseek(d_fp, 0, SEEK_SET) == -1) { + std::stringstream s; + s << "[" << __FILE__ << "]" << " fseek failed" << std::endl; + throw std::runtime_error(s.str()); + } + } + + if(size > 0) { // EOF or error + if(size == seg_size) // we didn't read anything; say we're done + return -1; + return seg_size - size; // else return partial result + } + + return seg_size; + } + + } /* namespace blocks */ +} /* namespace gr */ |