diff options
author | Tom Rondeau | 2012-12-07 12:11:46 -0500 |
---|---|---|
committer | Tom Rondeau | 2012-12-07 12:11:46 -0500 |
commit | 619a167471d060e2f1b49a9aac55a23b327afa88 (patch) | |
tree | d75171f1b1f6b9881bee20c7100883909e0f7b3c /gnuradio-core | |
parent | e8cf359e43f533fd3309389e5d62642f3080ac15 (diff) | |
download | gnuradio-619a167471d060e2f1b49a9aac55a23b327afa88.tar.gz gnuradio-619a167471d060e2f1b49a9aac55a23b327afa88.tar.bz2 gnuradio-619a167471d060e2f1b49a9aac55a23b327afa88.zip |
core: metadata file sink can set detached header.
This breaks away from subclassing gr_file_sink_base since we have to keep track of two files.
Diffstat (limited to 'gnuradio-core')
-rw-r--r-- | gnuradio-core/src/lib/io/gr_file_meta_sink.cc | 280 | ||||
-rw-r--r-- | gnuradio-core/src/lib/io/gr_file_meta_sink.h | 41 | ||||
-rw-r--r-- | gnuradio-core/src/lib/io/gr_file_meta_sink.i | 6 | ||||
-rw-r--r-- | gnuradio-core/src/lib/io/gr_file_sink_base.cc | 4 |
4 files changed, 261 insertions, 70 deletions
diff --git a/gnuradio-core/src/lib/io/gr_file_meta_sink.cc b/gnuradio-core/src/lib/io/gr_file_meta_sink.cc index da3be8060..43900bcd9 100644 --- a/gnuradio-core/src/lib/io/gr_file_meta_sink.cc +++ b/gnuradio-core/src/lib/io/gr_file_meta_sink.cc @@ -26,35 +26,70 @@ #include <gr_file_meta_sink.h> #include <gr_io_signature.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <gruel/thread.h> #include <stdexcept> +// win32 (mingw/msvc) specific +#ifdef HAVE_IO_H +#include <io.h> +#endif +#ifdef O_BINARY +#define OUR_O_BINARY O_BINARY +#else +#define OUR_O_BINARY 0 +#endif + +// should be handled via configure +#ifdef O_LARGEFILE +#define OUR_O_LARGEFILE O_LARGEFILE +#else +#define OUR_O_LARGEFILE 0 +#endif + gr_file_meta_sink_sptr gr_make_file_meta_sink(size_t itemsize, const char *filename, double samp_rate, double relative_rate, gr_file_types type, bool complex, size_t max_segment_size, - const std::string &extra_dict) + const std::string &extra_dict, + bool detached_header) { return gnuradio::get_initial_sptr (new gr_file_meta_sink(itemsize, filename, samp_rate, relative_rate, type, complex, max_segment_size, - extra_dict)); + extra_dict, + detached_header)); } gr_file_meta_sink::gr_file_meta_sink(size_t itemsize, const char *filename, double samp_rate, double relative_rate, gr_file_types type, bool complex, size_t max_segment_size, - const std::string &extra_dict) + const std::string &extra_dict, + bool detached_header) : gr_sync_block("file_meta_sink", gr_make_io_signature(1, 1, itemsize), gr_make_io_signature(0, 0, 0)), - gr_file_sink_base(filename, true), d_itemsize(itemsize), + d_itemsize(itemsize), d_samp_rate(samp_rate), d_relative_rate(relative_rate), - d_max_seg_size(max_segment_size), d_total_seg_size(0) + d_max_seg_size(max_segment_size), d_total_seg_size(0), + d_updated(false), d_unbuffered(false) { + d_fp = 0; + d_new_fp = 0; + d_hdr_fp = 0; + d_new_hdr_fp = 0; + + if(detached_header == true) + d_state = STATE_DETACHED; + else + d_state = STATE_INLINE; + if(!open(filename)) throw std::runtime_error("file_meta_sink: can't open file\n"); @@ -86,14 +121,120 @@ gr_file_meta_sink::gr_file_meta_sink(size_t itemsize, const char *filename, d_header = pmt_dict_add(d_header, mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size)); d_header = pmt_dict_add(d_header, mp("size"), pmt_from_uint64(0)); - write_header(d_header, d_extra); + do_update(); + + if(d_state == STATE_DETACHED) + write_header(d_hdr_fp, d_header, d_extra); + else + write_header(d_fp, d_header, d_extra); +} + +gr_file_meta_sink::~gr_file_meta_sink() +{ + update_last_header(); + + close(); + + if(d_fp) { + fclose(d_fp); + d_fp = 0; + } + + if(d_state == STATE_DETACHED) { + if(d_hdr_fp) { + fclose(d_hdr_fp); + d_hdr_fp = 0; + } + } +} + +bool +gr_file_meta_sink::open(const char *filename) +{ + bool ret = true; + if(d_state == STATE_DETACHED) { + std::stringstream s; + s << filename << ".hdr"; + ret = _open(&d_new_hdr_fp, s.str().c_str()); + } + + ret = ret && _open(&d_new_fp, filename); + d_updated = true; + return ret; +} + +bool +gr_file_meta_sink::_open(FILE **fp, const char *filename) +{ + gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + + bool ret = true; + int fd; + + if((fd = ::open(filename, + O_WRONLY|O_CREAT|O_TRUNC|OUR_O_LARGEFILE|OUR_O_BINARY, + 0664)) < 0){ + perror(filename); + return false; + } + + if(*fp) { // if we've already got a new one open, close it + fclose(*fp); + fp = 0; + } + + if((*fp = fdopen(fd, "wb")) == NULL) { + perror(filename); + ::close(fd); // don't leak file descriptor if fdopen fails. + } + + ret = fp != 0; + + return ret; } void -gr_file_meta_sink::write_header(pmt_t header, pmt_t extra) +gr_file_meta_sink::close() { - do_update(); + gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + if(d_state == STATE_DETACHED) { + if(d_new_hdr_fp) { + fclose(d_new_hdr_fp); + d_new_hdr_fp = 0; + } + } + + if(d_new_fp) { + fclose(d_new_fp); + d_new_fp = 0; + } + d_updated = true; +} + +void +gr_file_meta_sink::do_update() +{ + if(d_updated) { + gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this block + if(d_state == STATE_DETACHED) { + if(d_hdr_fp) + fclose(d_hdr_fp); + d_hdr_fp = d_new_hdr_fp; // install new file pointer + d_new_hdr_fp = 0; + } + if(d_fp) + fclose(d_fp); + d_fp = d_new_fp; // install new file pointer + d_new_fp = 0; + + d_updated = false; + } +} + +void +gr_file_meta_sink::write_header(FILE *fp, pmt_t header, pmt_t extra) +{ std::string header_str = pmt_serialize_str(header); std::string extra_str = pmt_serialize_str(extra); @@ -103,10 +244,10 @@ gr_file_meta_sink::write_header(pmt_t header, pmt_t extra) size_t nwritten = 0; while(nwritten < header_str.size()) { std::string sub = header_str.substr(nwritten); - int count = fwrite(sub.c_str(), sizeof(char), sub.size(), d_fp); + int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp); nwritten += count; - if((count == 0) && (ferror(d_fp))) { - fclose(d_fp); + if((count == 0) && (ferror(fp))) { + fclose(fp); throw std::runtime_error("file_meta_sink: error writing header to file.\n"); } } @@ -114,10 +255,10 @@ gr_file_meta_sink::write_header(pmt_t header, pmt_t extra) nwritten = 0; while(nwritten < extra_str.size()) { std::string sub = extra_str.substr(nwritten); - int count = fwrite(sub.c_str(), sizeof(char), sub.size(), d_fp); + int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp); nwritten += count; - if((count == 0) && (ferror(d_fp))) { - fclose(d_fp); + if((count == 0) && (ferror(fp))) { + fclose(fp); throw std::runtime_error("file_meta_sink: error writing extra to file.\n"); } } @@ -142,26 +283,49 @@ gr_file_meta_sink::update_header(pmt_t key, pmt_t value) else { d_extra = pmt_dict_add(d_extra, key, value); d_extra_size = pmt_serialize_str(d_extra).size(); - d_header = pmt_dict_add(d_header, mp("strt"), - pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size)); } } void gr_file_meta_sink::update_last_header() { + if(d_state == STATE_DETACHED) + update_last_header_detached(); + else + update_last_header_inline(); +} + +void +gr_file_meta_sink::update_last_header_inline() +{ // Update the last header info with the number of samples this // block represents. - size_t hdrlen = METADATA_HEADER_SIZE+d_extra_size; + + size_t hdrlen = pmt_to_uint64(pmt_dict_ref(d_header, mp("strt"), PMT_NIL)); size_t seg_size = d_itemsize*d_total_seg_size; pmt_t s = pmt_from_uint64(seg_size); update_header(mp("size"), s); + update_header(mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size)); fseek(d_fp, -seg_size-hdrlen, SEEK_CUR); - write_header(d_header, d_extra); + write_header(d_fp, d_header, d_extra); fseek(d_fp, seg_size, SEEK_CUR); } void +gr_file_meta_sink::update_last_header_detached() +{ + // Update the last header info with the number of samples this + // block represents. + size_t hdrlen = pmt_to_uint64(pmt_dict_ref(d_header, mp("strt"), PMT_NIL)); + size_t seg_size = d_itemsize*d_total_seg_size; + pmt_t s = pmt_from_uint64(seg_size); + update_header(mp("size"), s); + update_header(mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size)); + fseek(d_hdr_fp, -hdrlen, SEEK_CUR); + write_header(d_hdr_fp, d_header, d_extra); +} + +void gr_file_meta_sink::write_and_update() { // New header, so set current size of chunk to 0 and start of chunk @@ -175,7 +339,11 @@ gr_file_meta_sink::write_and_update() // of creating a new header per tag. s = pmt_from_uint64(METADATA_HEADER_SIZE + d_extra_size); update_header(mp("strt"), s); - write_header(d_header, d_extra); + + if(d_state == STATE_DETACHED) + write_header(d_hdr_fp, d_header, d_extra); + else + write_header(d_fp, d_header, d_extra); } void @@ -204,11 +372,6 @@ gr_file_meta_sink::update_rx_time() d_header = pmt_dict_add(d_header, rx_time, r); } -gr_file_meta_sink::~gr_file_meta_sink() -{ - update_last_header(); -} - int gr_file_meta_sink::work(int noutput_items, gr_vector_const_void_star &input_items, @@ -229,49 +392,40 @@ gr_file_meta_sink::work(int noutput_items, std::vector<gr_tag_t>::iterator itr; for(itr = all_tags.begin(); itr != all_tags.end(); itr++) { - // Special case where info is carried on the first tag, so we just - // overwrite the first header. - if(itr->offset == 0) { - update_header(itr->key, itr->value); - fseek(d_fp, 0, SEEK_SET); - write_header(d_header, d_extra); - } - else { - int item_offset = (int)(itr->offset - abs_N); - - // Write date to file up to the next tag location - while(nwritten < item_offset) { - size_t towrite = std::min(d_max_seg_size - d_total_seg_size, - (size_t)(item_offset - nwritten)); - int count = fwrite(inbuf, d_itemsize, towrite, d_fp); - if(count == 0) // FIXME add error handling - break; - nwritten += count; - inbuf += count * d_itemsize; - - d_total_seg_size += count; - - // Only add a new header if we are not at the position of the - // next tag - if((d_total_seg_size == d_max_seg_size) && - (nwritten < item_offset)) { - update_last_header(); - update_rx_time(); - write_and_update(); - d_total_seg_size = 0; - } - } - - if(d_total_seg_size > 0) { + int item_offset = (int)(itr->offset - abs_N); + + // Write date to file up to the next tag location + while(nwritten < item_offset) { + size_t towrite = std::min(d_max_seg_size - d_total_seg_size, + (size_t)(item_offset - nwritten)); + int count = fwrite(inbuf, d_itemsize, towrite, d_fp); + if(count == 0) // FIXME add error handling + break; + nwritten += count; + inbuf += count * d_itemsize; + + d_total_seg_size += count; + + // Only add a new header if we are not at the position of the + // next tag + if((d_total_seg_size == d_max_seg_size) && + (nwritten < item_offset)) { update_last_header(); - update_header(itr->key, itr->value); + update_rx_time(); write_and_update(); d_total_seg_size = 0; } - else { - update_header(itr->key, itr->value); - update_last_header(); - } + } + + if(d_total_seg_size > 0) { + update_last_header(); + update_header(itr->key, itr->value); + write_and_update(); + d_total_seg_size = 0; + } + else { + update_header(itr->key, itr->value); + update_last_header(); } } diff --git a/gnuradio-core/src/lib/io/gr_file_meta_sink.h b/gnuradio-core/src/lib/io/gr_file_meta_sink.h index 929c9634e..c0636d66a 100644 --- a/gnuradio-core/src/lib/io/gr_file_meta_sink.h +++ b/gnuradio-core/src/lib/io/gr_file_meta_sink.h @@ -52,7 +52,8 @@ gr_make_file_meta_sink(size_t itemsize, const char *filename, double samp_rate=1, double relative_rate=1, gr_file_types type=GR_FILE_FLOAT, bool complex=true, size_t max_segment_size=1000000, - const std::string &extra_dict=""); + const std::string &extra_dict="", + bool detached_header=false); /*! * \brief Write stream to file with meta-data headers. @@ -76,7 +77,7 @@ gr_make_file_meta_sink(size_t itemsize, const char *filename, * segment starts plus the data segment size. Following will either be * a new header or EOF. */ -class GR_CORE_API gr_file_meta_sink : public gr_sync_block, public gr_file_sink_base +class GR_CORE_API gr_file_meta_sink : public gr_sync_block { /*! * \brief Create a meta-data file sink. @@ -94,15 +95,24 @@ class GR_CORE_API gr_file_meta_sink : public gr_sync_block, public gr_file_sink_ * before the header is repeated (in items). * \param extra_dict (string): a serialized PMT dictionary of extra * information. Currently not supported. + * \param detached_header (bool): Set to true to store the header + * info in a separate file (named filename.hdr) */ friend GR_CORE_API gr_file_meta_sink_sptr gr_make_file_meta_sink(size_t itemsize, const char *filename, double samp_rate, double relative_rate, gr_file_types type, bool complex, size_t max_segment_size, - const std::string &extra_dict); + const std::string &extra_dict, + bool detached_header); private: + enum meta_state_t { + STATE_INLINE=0, + STATE_DETACHED + }; + + size_t d_itemsize; double d_samp_rate; double d_relative_rate; @@ -111,23 +121,44 @@ class GR_CORE_API gr_file_meta_sink : public gr_sync_block, public gr_file_sink_ pmt_t d_header; pmt_t d_extra; size_t d_extra_size; + bool d_updated; + bool d_unbuffered; + + boost::mutex d_mutex; + FILE *d_new_fp, *d_new_hdr_fp; + FILE *d_fp, *d_hdr_fp; + meta_state_t d_state; protected: gr_file_meta_sink(size_t itemsize, const char *filename, double samp_rate=1, double relative_rate=1, gr_file_types type=GR_FILE_FLOAT, bool complex=true, size_t max_segment_size=1000000, - const std::string &extra_dict=""); + const std::string &extra_dict="", + bool detached_header=false); - void write_header(pmt_t header, pmt_t extra); + void write_header(FILE *fp, pmt_t header, pmt_t extra); void update_header(pmt_t key, pmt_t value); void update_last_header(); + void update_last_header_inline(); + void update_last_header_detached(); void write_and_update(); void update_rx_time(); + bool _open(FILE **fp, const char *filename); + public: ~gr_file_meta_sink(); + bool open(const char *filename); + void close(); + void do_update(); + + void set_unbuffered(bool unbuffered) + { + d_unbuffered = unbuffered; + } + //FIXME: add setters/getters for properties. int work(int noutput_items, diff --git a/gnuradio-core/src/lib/io/gr_file_meta_sink.i b/gnuradio-core/src/lib/io/gr_file_meta_sink.i index 6cced3f38..ed3eda0f8 100644 --- a/gnuradio-core/src/lib/io/gr_file_meta_sink.i +++ b/gnuradio-core/src/lib/io/gr_file_meta_sink.i @@ -41,7 +41,8 @@ gr_make_file_meta_sink(size_t itemsize, const char *filename, double samp_rate=1, double relative_rate=1, gr_file_types type=GR_FILE_FLOAT, bool complex=true, size_t max_segment_size=1000000, - const std::string & extra_dict=""); + const std::string & extra_dict="", + bool detached_header=false); class gr_file_meta_sink : public gr_sync_block, public gr_file_sink_base { @@ -50,7 +51,8 @@ class gr_file_meta_sink : public gr_sync_block, public gr_file_sink_base double samp_rate, double relative_rate, gr_file_types type, bool complex, size_t max_segment_size, - const std::string & extra_dict); + const std::string & extra_dict, + bool detached_header); public: ~gr_file_meta_sink(); diff --git a/gnuradio-core/src/lib/io/gr_file_sink_base.cc b/gnuradio-core/src/lib/io/gr_file_sink_base.cc index b2dcc1be5..cb67bbb6c 100644 --- a/gnuradio-core/src/lib/io/gr_file_sink_base.cc +++ b/gnuradio-core/src/lib/io/gr_file_sink_base.cc @@ -79,6 +79,8 @@ gr_file_sink_base::open(const char *filename) perror (filename); return false; } + std::cerr << "OPENING NEW FILE: " << filename << std::endl; + std::cerr << "FD: " << fd << std::endl; if (d_new_fp){ // if we've already got a new one open, close it fclose(d_new_fp); @@ -90,6 +92,8 @@ gr_file_sink_base::open(const char *filename) ::close(fd); // don't leak file descriptor if fdopen fails. } + std::cerr << "D_NEW_FD: " << d_new_fp << std::endl; + d_updated = true; return d_new_fp != 0; } |