summaryrefslogtreecommitdiff
path: root/gnuradio-core
diff options
context:
space:
mode:
authorTom Rondeau2012-12-07 12:11:46 -0500
committerTom Rondeau2012-12-07 12:11:46 -0500
commit619a167471d060e2f1b49a9aac55a23b327afa88 (patch)
treed75171f1b1f6b9881bee20c7100883909e0f7b3c /gnuradio-core
parente8cf359e43f533fd3309389e5d62642f3080ac15 (diff)
downloadgnuradio-619a167471d060e2f1b49a9aac55a23b327afa88.tar.gz
gnuradio-619a167471d060e2f1b49a9aac55a23b327afa88.tar.bz2
gnuradio-619a167471d060e2f1b49a9aac55a23b327afa88.zip
core: metadata file sink can set detached header.
This breaks away from subclassing gr_file_sink_base since we have to keep track of two files.
Diffstat (limited to 'gnuradio-core')
-rw-r--r--gnuradio-core/src/lib/io/gr_file_meta_sink.cc280
-rw-r--r--gnuradio-core/src/lib/io/gr_file_meta_sink.h41
-rw-r--r--gnuradio-core/src/lib/io/gr_file_meta_sink.i6
-rw-r--r--gnuradio-core/src/lib/io/gr_file_sink_base.cc4
4 files changed, 261 insertions, 70 deletions
diff --git a/gnuradio-core/src/lib/io/gr_file_meta_sink.cc b/gnuradio-core/src/lib/io/gr_file_meta_sink.cc
index da3be8060..43900bcd9 100644
--- a/gnuradio-core/src/lib/io/gr_file_meta_sink.cc
+++ b/gnuradio-core/src/lib/io/gr_file_meta_sink.cc
@@ -26,35 +26,70 @@
#include <gr_file_meta_sink.h>
#include <gr_io_signature.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <gruel/thread.h>
#include <stdexcept>
+// win32 (mingw/msvc) specific
+#ifdef HAVE_IO_H
+#include <io.h>
+#endif
+#ifdef O_BINARY
+#define OUR_O_BINARY O_BINARY
+#else
+#define OUR_O_BINARY 0
+#endif
+
+// should be handled via configure
+#ifdef O_LARGEFILE
+#define OUR_O_LARGEFILE O_LARGEFILE
+#else
+#define OUR_O_LARGEFILE 0
+#endif
+
gr_file_meta_sink_sptr
gr_make_file_meta_sink(size_t itemsize, const char *filename,
double samp_rate, double relative_rate,
gr_file_types type, bool complex,
size_t max_segment_size,
- const std::string &extra_dict)
+ const std::string &extra_dict,
+ bool detached_header)
{
return gnuradio::get_initial_sptr
(new gr_file_meta_sink(itemsize, filename,
samp_rate, relative_rate,
type, complex,
max_segment_size,
- extra_dict));
+ extra_dict,
+ detached_header));
}
gr_file_meta_sink::gr_file_meta_sink(size_t itemsize, const char *filename,
double samp_rate, double relative_rate,
gr_file_types type, bool complex,
size_t max_segment_size,
- const std::string &extra_dict)
+ const std::string &extra_dict,
+ bool detached_header)
: gr_sync_block("file_meta_sink",
gr_make_io_signature(1, 1, itemsize),
gr_make_io_signature(0, 0, 0)),
- gr_file_sink_base(filename, true), d_itemsize(itemsize),
+ d_itemsize(itemsize),
d_samp_rate(samp_rate), d_relative_rate(relative_rate),
- d_max_seg_size(max_segment_size), d_total_seg_size(0)
+ d_max_seg_size(max_segment_size), d_total_seg_size(0),
+ d_updated(false), d_unbuffered(false)
{
+ d_fp = 0;
+ d_new_fp = 0;
+ d_hdr_fp = 0;
+ d_new_hdr_fp = 0;
+
+ if(detached_header == true)
+ d_state = STATE_DETACHED;
+ else
+ d_state = STATE_INLINE;
+
if(!open(filename))
throw std::runtime_error("file_meta_sink: can't open file\n");
@@ -86,14 +121,120 @@ gr_file_meta_sink::gr_file_meta_sink(size_t itemsize, const char *filename,
d_header = pmt_dict_add(d_header, mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
d_header = pmt_dict_add(d_header, mp("size"), pmt_from_uint64(0));
- write_header(d_header, d_extra);
+ do_update();
+
+ if(d_state == STATE_DETACHED)
+ write_header(d_hdr_fp, d_header, d_extra);
+ else
+ write_header(d_fp, d_header, d_extra);
+}
+
+gr_file_meta_sink::~gr_file_meta_sink()
+{
+ update_last_header();
+
+ close();
+
+ if(d_fp) {
+ fclose(d_fp);
+ d_fp = 0;
+ }
+
+ if(d_state == STATE_DETACHED) {
+ if(d_hdr_fp) {
+ fclose(d_hdr_fp);
+ d_hdr_fp = 0;
+ }
+ }
+}
+
+bool
+gr_file_meta_sink::open(const char *filename)
+{
+ bool ret = true;
+ if(d_state == STATE_DETACHED) {
+ std::stringstream s;
+ s << filename << ".hdr";
+ ret = _open(&d_new_hdr_fp, s.str().c_str());
+ }
+
+ ret = ret && _open(&d_new_fp, filename);
+ d_updated = true;
+ return ret;
+}
+
+bool
+gr_file_meta_sink::_open(FILE **fp, const char *filename)
+{
+ gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+
+ bool ret = true;
+ int fd;
+
+ if((fd = ::open(filename,
+ O_WRONLY|O_CREAT|O_TRUNC|OUR_O_LARGEFILE|OUR_O_BINARY,
+ 0664)) < 0){
+ perror(filename);
+ return false;
+ }
+
+ if(*fp) { // if we've already got a new one open, close it
+ fclose(*fp);
+ fp = 0;
+ }
+
+ if((*fp = fdopen(fd, "wb")) == NULL) {
+ perror(filename);
+ ::close(fd); // don't leak file descriptor if fdopen fails.
+ }
+
+ ret = fp != 0;
+
+ return ret;
}
void
-gr_file_meta_sink::write_header(pmt_t header, pmt_t extra)
+gr_file_meta_sink::close()
{
- do_update();
+ gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+ if(d_state == STATE_DETACHED) {
+ if(d_new_hdr_fp) {
+ fclose(d_new_hdr_fp);
+ d_new_hdr_fp = 0;
+ }
+ }
+
+ if(d_new_fp) {
+ fclose(d_new_fp);
+ d_new_fp = 0;
+ }
+ d_updated = true;
+}
+
+void
+gr_file_meta_sink::do_update()
+{
+ if(d_updated) {
+ gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this block
+ if(d_state == STATE_DETACHED) {
+ if(d_hdr_fp)
+ fclose(d_hdr_fp);
+ d_hdr_fp = d_new_hdr_fp; // install new file pointer
+ d_new_hdr_fp = 0;
+ }
+ if(d_fp)
+ fclose(d_fp);
+ d_fp = d_new_fp; // install new file pointer
+ d_new_fp = 0;
+
+ d_updated = false;
+ }
+}
+
+void
+gr_file_meta_sink::write_header(FILE *fp, pmt_t header, pmt_t extra)
+{
std::string header_str = pmt_serialize_str(header);
std::string extra_str = pmt_serialize_str(extra);
@@ -103,10 +244,10 @@ gr_file_meta_sink::write_header(pmt_t header, pmt_t extra)
size_t nwritten = 0;
while(nwritten < header_str.size()) {
std::string sub = header_str.substr(nwritten);
- int count = fwrite(sub.c_str(), sizeof(char), sub.size(), d_fp);
+ int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
nwritten += count;
- if((count == 0) && (ferror(d_fp))) {
- fclose(d_fp);
+ if((count == 0) && (ferror(fp))) {
+ fclose(fp);
throw std::runtime_error("file_meta_sink: error writing header to file.\n");
}
}
@@ -114,10 +255,10 @@ gr_file_meta_sink::write_header(pmt_t header, pmt_t extra)
nwritten = 0;
while(nwritten < extra_str.size()) {
std::string sub = extra_str.substr(nwritten);
- int count = fwrite(sub.c_str(), sizeof(char), sub.size(), d_fp);
+ int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
nwritten += count;
- if((count == 0) && (ferror(d_fp))) {
- fclose(d_fp);
+ if((count == 0) && (ferror(fp))) {
+ fclose(fp);
throw std::runtime_error("file_meta_sink: error writing extra to file.\n");
}
}
@@ -142,26 +283,49 @@ gr_file_meta_sink::update_header(pmt_t key, pmt_t value)
else {
d_extra = pmt_dict_add(d_extra, key, value);
d_extra_size = pmt_serialize_str(d_extra).size();
- d_header = pmt_dict_add(d_header, mp("strt"),
- pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
}
}
void
gr_file_meta_sink::update_last_header()
{
+ if(d_state == STATE_DETACHED)
+ update_last_header_detached();
+ else
+ update_last_header_inline();
+}
+
+void
+gr_file_meta_sink::update_last_header_inline()
+{
// Update the last header info with the number of samples this
// block represents.
- size_t hdrlen = METADATA_HEADER_SIZE+d_extra_size;
+
+ size_t hdrlen = pmt_to_uint64(pmt_dict_ref(d_header, mp("strt"), PMT_NIL));
size_t seg_size = d_itemsize*d_total_seg_size;
pmt_t s = pmt_from_uint64(seg_size);
update_header(mp("size"), s);
+ update_header(mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
fseek(d_fp, -seg_size-hdrlen, SEEK_CUR);
- write_header(d_header, d_extra);
+ write_header(d_fp, d_header, d_extra);
fseek(d_fp, seg_size, SEEK_CUR);
}
void
+gr_file_meta_sink::update_last_header_detached()
+{
+ // Update the last header info with the number of samples this
+ // block represents.
+ size_t hdrlen = pmt_to_uint64(pmt_dict_ref(d_header, mp("strt"), PMT_NIL));
+ size_t seg_size = d_itemsize*d_total_seg_size;
+ pmt_t s = pmt_from_uint64(seg_size);
+ update_header(mp("size"), s);
+ update_header(mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
+ fseek(d_hdr_fp, -hdrlen, SEEK_CUR);
+ write_header(d_hdr_fp, d_header, d_extra);
+}
+
+void
gr_file_meta_sink::write_and_update()
{
// New header, so set current size of chunk to 0 and start of chunk
@@ -175,7 +339,11 @@ gr_file_meta_sink::write_and_update()
// of creating a new header per tag.
s = pmt_from_uint64(METADATA_HEADER_SIZE + d_extra_size);
update_header(mp("strt"), s);
- write_header(d_header, d_extra);
+
+ if(d_state == STATE_DETACHED)
+ write_header(d_hdr_fp, d_header, d_extra);
+ else
+ write_header(d_fp, d_header, d_extra);
}
void
@@ -204,11 +372,6 @@ gr_file_meta_sink::update_rx_time()
d_header = pmt_dict_add(d_header, rx_time, r);
}
-gr_file_meta_sink::~gr_file_meta_sink()
-{
- update_last_header();
-}
-
int
gr_file_meta_sink::work(int noutput_items,
gr_vector_const_void_star &input_items,
@@ -229,49 +392,40 @@ gr_file_meta_sink::work(int noutput_items,
std::vector<gr_tag_t>::iterator itr;
for(itr = all_tags.begin(); itr != all_tags.end(); itr++) {
- // Special case where info is carried on the first tag, so we just
- // overwrite the first header.
- if(itr->offset == 0) {
- update_header(itr->key, itr->value);
- fseek(d_fp, 0, SEEK_SET);
- write_header(d_header, d_extra);
- }
- else {
- int item_offset = (int)(itr->offset - abs_N);
-
- // Write date to file up to the next tag location
- while(nwritten < item_offset) {
- size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
- (size_t)(item_offset - nwritten));
- int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
- if(count == 0) // FIXME add error handling
- break;
- nwritten += count;
- inbuf += count * d_itemsize;
-
- d_total_seg_size += count;
-
- // Only add a new header if we are not at the position of the
- // next tag
- if((d_total_seg_size == d_max_seg_size) &&
- (nwritten < item_offset)) {
- update_last_header();
- update_rx_time();
- write_and_update();
- d_total_seg_size = 0;
- }
- }
-
- if(d_total_seg_size > 0) {
+ int item_offset = (int)(itr->offset - abs_N);
+
+ // Write date to file up to the next tag location
+ while(nwritten < item_offset) {
+ size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
+ (size_t)(item_offset - nwritten));
+ int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
+ if(count == 0) // FIXME add error handling
+ break;
+ nwritten += count;
+ inbuf += count * d_itemsize;
+
+ d_total_seg_size += count;
+
+ // Only add a new header if we are not at the position of the
+ // next tag
+ if((d_total_seg_size == d_max_seg_size) &&
+ (nwritten < item_offset)) {
update_last_header();
- update_header(itr->key, itr->value);
+ update_rx_time();
write_and_update();
d_total_seg_size = 0;
}
- else {
- update_header(itr->key, itr->value);
- update_last_header();
- }
+ }
+
+ if(d_total_seg_size > 0) {
+ update_last_header();
+ update_header(itr->key, itr->value);
+ write_and_update();
+ d_total_seg_size = 0;
+ }
+ else {
+ update_header(itr->key, itr->value);
+ update_last_header();
}
}
diff --git a/gnuradio-core/src/lib/io/gr_file_meta_sink.h b/gnuradio-core/src/lib/io/gr_file_meta_sink.h
index 929c9634e..c0636d66a 100644
--- a/gnuradio-core/src/lib/io/gr_file_meta_sink.h
+++ b/gnuradio-core/src/lib/io/gr_file_meta_sink.h
@@ -52,7 +52,8 @@ gr_make_file_meta_sink(size_t itemsize, const char *filename,
double samp_rate=1, double relative_rate=1,
gr_file_types type=GR_FILE_FLOAT, bool complex=true,
size_t max_segment_size=1000000,
- const std::string &extra_dict="");
+ const std::string &extra_dict="",
+ bool detached_header=false);
/*!
* \brief Write stream to file with meta-data headers.
@@ -76,7 +77,7 @@ gr_make_file_meta_sink(size_t itemsize, const char *filename,
* segment starts plus the data segment size. Following will either be
* a new header or EOF.
*/
-class GR_CORE_API gr_file_meta_sink : public gr_sync_block, public gr_file_sink_base
+class GR_CORE_API gr_file_meta_sink : public gr_sync_block
{
/*!
* \brief Create a meta-data file sink.
@@ -94,15 +95,24 @@ class GR_CORE_API gr_file_meta_sink : public gr_sync_block, public gr_file_sink_
* before the header is repeated (in items).
* \param extra_dict (string): a serialized PMT dictionary of extra
* information. Currently not supported.
+ * \param detached_header (bool): Set to true to store the header
+ * info in a separate file (named filename.hdr)
*/
friend GR_CORE_API gr_file_meta_sink_sptr
gr_make_file_meta_sink(size_t itemsize, const char *filename,
double samp_rate, double relative_rate,
gr_file_types type, bool complex,
size_t max_segment_size,
- const std::string &extra_dict);
+ const std::string &extra_dict,
+ bool detached_header);
private:
+ enum meta_state_t {
+ STATE_INLINE=0,
+ STATE_DETACHED
+ };
+
+
size_t d_itemsize;
double d_samp_rate;
double d_relative_rate;
@@ -111,23 +121,44 @@ class GR_CORE_API gr_file_meta_sink : public gr_sync_block, public gr_file_sink_
pmt_t d_header;
pmt_t d_extra;
size_t d_extra_size;
+ bool d_updated;
+ bool d_unbuffered;
+
+ boost::mutex d_mutex;
+ FILE *d_new_fp, *d_new_hdr_fp;
+ FILE *d_fp, *d_hdr_fp;
+ meta_state_t d_state;
protected:
gr_file_meta_sink(size_t itemsize, const char *filename,
double samp_rate=1, double relative_rate=1,
gr_file_types type=GR_FILE_FLOAT, bool complex=true,
size_t max_segment_size=1000000,
- const std::string &extra_dict="");
+ const std::string &extra_dict="",
+ bool detached_header=false);
- void write_header(pmt_t header, pmt_t extra);
+ void write_header(FILE *fp, pmt_t header, pmt_t extra);
void update_header(pmt_t key, pmt_t value);
void update_last_header();
+ void update_last_header_inline();
+ void update_last_header_detached();
void write_and_update();
void update_rx_time();
+ bool _open(FILE **fp, const char *filename);
+
public:
~gr_file_meta_sink();
+ bool open(const char *filename);
+ void close();
+ void do_update();
+
+ void set_unbuffered(bool unbuffered)
+ {
+ d_unbuffered = unbuffered;
+ }
+
//FIXME: add setters/getters for properties.
int work(int noutput_items,
diff --git a/gnuradio-core/src/lib/io/gr_file_meta_sink.i b/gnuradio-core/src/lib/io/gr_file_meta_sink.i
index 6cced3f38..ed3eda0f8 100644
--- a/gnuradio-core/src/lib/io/gr_file_meta_sink.i
+++ b/gnuradio-core/src/lib/io/gr_file_meta_sink.i
@@ -41,7 +41,8 @@ gr_make_file_meta_sink(size_t itemsize, const char *filename,
double samp_rate=1, double relative_rate=1,
gr_file_types type=GR_FILE_FLOAT, bool complex=true,
size_t max_segment_size=1000000,
- const std::string & extra_dict="");
+ const std::string & extra_dict="",
+ bool detached_header=false);
class gr_file_meta_sink : public gr_sync_block, public gr_file_sink_base
{
@@ -50,7 +51,8 @@ class gr_file_meta_sink : public gr_sync_block, public gr_file_sink_base
double samp_rate, double relative_rate,
gr_file_types type, bool complex,
size_t max_segment_size,
- const std::string & extra_dict);
+ const std::string & extra_dict,
+ bool detached_header);
public:
~gr_file_meta_sink();
diff --git a/gnuradio-core/src/lib/io/gr_file_sink_base.cc b/gnuradio-core/src/lib/io/gr_file_sink_base.cc
index b2dcc1be5..cb67bbb6c 100644
--- a/gnuradio-core/src/lib/io/gr_file_sink_base.cc
+++ b/gnuradio-core/src/lib/io/gr_file_sink_base.cc
@@ -79,6 +79,8 @@ gr_file_sink_base::open(const char *filename)
perror (filename);
return false;
}
+ std::cerr << "OPENING NEW FILE: " << filename << std::endl;
+ std::cerr << "FD: " << fd << std::endl;
if (d_new_fp){ // if we've already got a new one open, close it
fclose(d_new_fp);
@@ -90,6 +92,8 @@ gr_file_sink_base::open(const char *filename)
::close(fd); // don't leak file descriptor if fdopen fails.
}
+ std::cerr << "D_NEW_FD: " << d_new_fp << std::endl;
+
d_updated = true;
return d_new_fp != 0;
}