summaryrefslogtreecommitdiff
path: root/gnuradio-core
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core')
-rwxr-xr-xgnuradio-core/src/lib/io/gr_udp_sink.cc85
-rw-r--r--gnuradio-core/src/lib/io/gr_udp_sink.h21
-rwxr-xr-xgnuradio-core/src/lib/io/gr_udp_source.cc117
-rwxr-xr-xgnuradio-core/src/lib/io/gr_udp_source.h32
-rwxr-xr-x[-rw-r--r--]gnuradio-core/src/lib/io/gr_udp_source.i7
5 files changed, 121 insertions, 141 deletions
diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc
index b447dd3b3..263d3dd4f 100755
--- a/gnuradio-core/src/lib/io/gr_udp_sink.cc
+++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -31,9 +31,11 @@
#include <string.h>
#if defined(HAVE_NETDB_H)
typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
// if not posix, assume winsock
#define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
#define SHUT_RDWR 2
typedef char* optval_t;
#endif
@@ -88,11 +90,13 @@ gr_udp_sink::gr_udp_sink (size_t itemsize,
: gr_sync_block ("udp_sink",
gr_make_io_signature (1, 1, itemsize),
gr_make_io_signature (0, 0, 0)),
- d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size)
+ d_itemsize (itemsize), d_payload_size(payload_size)
{
int ret = 0;
+ struct addrinfo *ip_src; // store the source ip info
+ struct addrinfo *ip_dst; // store the destination ip info
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
// initialize winsock DLL
WSADATA wsaData;
int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
@@ -110,55 +114,21 @@ gr_udp_sink::gr_udp_sink (size_t itemsize,
hints.ai_protocol = IPPROTO_UDP;
char port_str[7];
sprintf( port_str, "%d", port_src );
- ret = getaddrinfo( src, port_str, &hints, &d_ip_src );
+ ret = getaddrinfo( src, port_str, &hints, &ip_src );
if( ret != 0 )
report_error("gr_udp_source/getaddrinfo",
"can't initialize source socket" );
// Get the destination IP address from the host name
sprintf( port_str, "%d", port_dst );
- ret = getaddrinfo( dst, port_str, &hints, &d_ip_dst );
+ ret = getaddrinfo( dst, port_str, &hints, &ip_dst );
if( ret != 0 )
report_error("gr_udp_source/getaddrinfo",
"can't initialize destination socket" );
-
- open();
-}
-
-// public constructor that returns a shared_ptr
-
-gr_udp_sink_sptr
-gr_make_udp_sink (size_t itemsize,
- const char *src, unsigned short port_src,
- const char *dst, unsigned short port_dst,
- int payload_size)
-{
- return gr_udp_sink_sptr (new gr_udp_sink (itemsize,
- src, port_src,
- dst, port_dst,
- payload_size));
-}
-
-gr_udp_sink::~gr_udp_sink ()
-{
- freeaddrinfo(d_ip_src);
- freeaddrinfo(d_ip_dst);
- close();
-
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
- // free winsock resources
- WSACleanup();
-#endif
-}
-
-bool
-gr_udp_sink::open()
-{
- gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
// create socket
- d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype,
- d_ip_src->ai_protocol);
+ d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+ ip_src->ai_protocol);
if(d_socket == -1) {
report_error("socket open","can't open socket");
}
@@ -180,24 +150,35 @@ gr_udp_sink::open()
}
// bind socket to an address and port number to listen on
- if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) {
+ if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
report_error("socket bind","can't bind socket");
}
// Not sure if we should throw here or allow retries
- if(connect(d_socket, d_ip_dst->ai_addr, d_ip_dst->ai_addrlen) == -1) {
+ if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
report_error("socket connect","can't connect to socket");
}
- d_updated = true;
- return d_socket != 0;
+ freeaddrinfo(ip_src);
+ freeaddrinfo(ip_dst);
}
-void
-gr_udp_sink::close()
+// public constructor that returns a shared_ptr
+
+gr_udp_sink_sptr
+gr_make_udp_sink (size_t itemsize,
+ const char *src, unsigned short port_src,
+ const char *dst, unsigned short port_dst,
+ int payload_size)
{
- gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+ return gr_udp_sink_sptr (new gr_udp_sink (itemsize,
+ src, port_src,
+ dst, port_dst,
+ payload_size));
+}
+gr_udp_sink::~gr_udp_sink ()
+{
if (d_socket){
shutdown(d_socket, SHUT_RDWR);
#if defined(USING_WINSOCK)
@@ -207,7 +188,11 @@ gr_udp_sink::close()
#endif
d_socket = 0;
}
- d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+ // free winsock resources
+ WSACleanup();
+#endif
}
int
diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.h b/gnuradio-core/src/lib/io/gr_udp_sink.h
index 9f50ed7f0..6b6ee40fe 100644
--- a/gnuradio-core/src/lib/io/gr_udp_sink.h
+++ b/gnuradio-core/src/lib/io/gr_udp_sink.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -66,13 +66,9 @@ class gr_udp_sink : public gr_sync_block
int payload_size);
private:
size_t d_itemsize;
- bool d_updated;
- gruel::mutex d_mutex;
int d_payload_size; // maximum transmission unit (packet length)
int d_socket; // handle to socket
- struct addrinfo *d_ip_src; // store the source ip info
- struct addrinfo *d_ip_dst; // store the destination ip info
protected:
/*!
@@ -96,21 +92,6 @@ class gr_udp_sink : public gr_sync_block
public:
~gr_udp_sink ();
- /*!
- * \brief open a socket specified by the port and ip address info
- *
- * Opens a socket, binds to the address, and makes connectionless association
- * over UDP. If any of these fail, the fuction retuns the error and exits.
- */
- bool open();
-
- /*!
- * \brief Close current socket.
- *
- * Shuts down read/write on the socket
- */
- void close();
-
/*! \brief return the PAYLOAD_SIZE of the socket */
int payload_size() { return d_payload_size; }
diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc
index 56499258c..ce870d481 100755
--- a/gnuradio-core/src/lib/io/gr_udp_source.cc
+++ b/gnuradio-core/src/lib/io/gr_udp_source.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -30,14 +30,19 @@
#include <stdio.h>
#include <string.h>
#if defined(HAVE_NETDB_H)
+#include <netdb.h>
typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
// if not posix, assume winsock
#define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
#define SHUT_RDWR 2
typedef char* optval_t;
#endif
+#define USE_SELECT 1 // non-blocking receive on all platforms
+#define USE_RCV_TIMEO 0 // non-blocking receive on all but Cygwin
#define SRC_VERBOSE 0
static int is_error( int perr )
@@ -77,15 +82,17 @@ static void report_error( char *msg1, char *msg2 )
}
gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
- unsigned short port_src, int payload_size)
+ unsigned short port_src, int payload_size,
+ bool wait)
: gr_sync_block ("udp_source",
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
- d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0)
+ d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
{
int ret = 0;
+ struct addrinfo *ip_src; // store the source IP address to use
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
// initialize winsock DLL
WSADATA wsaData;
int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
@@ -103,43 +110,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
hints.ai_protocol = IPPROTO_UDP;
char port_str[7];
sprintf( port_str, "%d", port_src );
- ret = getaddrinfo( src, port_str, &hints, &d_ip_src );
+ ret = getaddrinfo( src, port_str, &hints, &ip_src );
if( ret != 0 )
report_error("gr_udp_source/getaddrinfo",
"can't initialize source socket" );
d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes
-
- open();
-}
-
-gr_udp_source_sptr
-gr_make_udp_source (size_t itemsize, const char *ipaddr,
- unsigned short port, int payload_size)
-{
- return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr,
- port, payload_size));
-}
-
-gr_udp_source::~gr_udp_source ()
-{
- freeaddrinfo(d_ip_src);
- delete [] d_temp_buff;
- close();
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
- // free winsock resources
- WSACleanup();
-#endif
-}
-
-bool
-gr_udp_source::open()
-{
- gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
// create socket
- d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype,
- d_ip_src->ai_protocol);
+ d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+ ip_src->ai_protocol);
if(d_socket == -1) {
report_error("socket open","can't open socket");
}
@@ -160,6 +140,7 @@ gr_udp_source::open()
}
}
+#if USE_RCV_TIMEO
// Set a timeout on the receive function to not block indefinitely
// This value can (and probably should) be changed
// Ignored on Cygwin
@@ -173,20 +154,27 @@ gr_udp_source::open()
if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) {
report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO");
}
+#endif // USE_RCV_TIMEO
// bind socket to an address and port number to listen on
- if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) {
+ if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
report_error("socket bind","can't bind socket");
}
+ freeaddrinfo(ip_src);
- d_updated = true;
- return d_socket != 0;
}
-void
-gr_udp_source::close()
+gr_udp_source_sptr
+gr_make_udp_source (size_t itemsize, const char *ipaddr,
+ unsigned short port, int payload_size, bool wait)
{
- gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+ return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr,
+ port, payload_size, wait));
+}
+
+gr_udp_source::~gr_udp_source ()
+{
+ delete [] d_temp_buff;
if (d_socket){
shutdown(d_socket, SHUT_RDWR);
@@ -197,7 +185,11 @@ gr_udp_source::close()
#endif
d_socket = 0;
}
- d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+ // free winsock resources
+ WSACleanup();
+#endif
}
int
@@ -232,8 +224,38 @@ gr_udp_source::work (int noutput_items,
d_temp_offset = d_temp_offset+d_residual;
}
+#if USE_SELECT
+ // Use select() to determine when socket is readable
+ fd_set readfds;
+ timeval timeout;
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+#endif
+
while(1) {
// get the data into our output buffer and record the number of bytes
+
+#if USE_SELECT
+ // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
+ // use select() instead of, or in addition to RCV_TIMEO
+ FD_ZERO(&readfds);
+ FD_SET(d_socket, &readfds);
+ r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
+ if(r < 0) {
+ report_error("udp_source/select",NULL);
+ return -1;
+ }
+ else if(r == 0 ) { // timed out
+ if( d_wait ) {
+ // Allow boost thread interrupt, then try again
+ boost::this_thread::interruption_point();
+ continue;
+ }
+ else
+ return -1;
+ }
+#endif // USE_SELECT
+
// This is a non-blocking call with a timeout set in the constructor
r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available
@@ -244,11 +266,16 @@ gr_udp_source::work (int noutput_items,
printf("UDP receive timed out\n");
#endif
- // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks
- break;
+ if( d_wait ) {
+ // Allow boost thread interrupt, then try again
+ boost::this_thread::interruption_point();
+ continue;
+ }
+ else
+ return -1;
}
else {
- report_error("udp_source",NULL);
+ report_error("udp_source/recv",NULL);
return -1;
}
}
diff --git a/gnuradio-core/src/lib/io/gr_udp_source.h b/gnuradio-core/src/lib/io/gr_udp_source.h
index 14d521dac..b06536d6a 100755
--- a/gnuradio-core/src/lib/io/gr_udp_source.h
+++ b/gnuradio-core/src/lib/io/gr_udp_source.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -38,7 +38,8 @@ class gr_udp_source;
typedef boost::shared_ptr<gr_udp_source> gr_udp_source_sptr;
gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src,
- unsigned short port_src, int payload_size=1472);
+ unsigned short port_src,
+ int payload_size=1472, bool wait=true);
/*!
* \brief Read stream from an UDP socket.
@@ -50,22 +51,22 @@ gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src,
* \param port_src The port number on which the socket listens for data
* \param payload_size UDP payload size by default set to
* 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+ * \param wait Wait for data if not immediately available (default: true)
*
*/
class gr_udp_source : public gr_sync_block
{
friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src,
- unsigned short port_src, int payload_size);
+ unsigned short port_src,
+ int payload_size, bool wait);
private:
size_t d_itemsize;
- bool d_updated;
- gruel::mutex d_mutex;
int d_payload_size; // maximum transmission unit (packet length)
+ bool d_wait; // wait if data if not immediately available
int d_socket; // handle to socket
- struct addrinfo *d_ip_src; // store the source IP address to use
char *d_temp_buff; // hold buffer between calls
ssize_t d_residual; // hold information about number of bytes stored in the temp buffer
size_t d_temp_offset; // point to temp buffer location offset
@@ -80,27 +81,14 @@ class gr_udp_source : public gr_sync_block
* \param port_src The port number on which the socket listens for data
* \param payload_size UDP payload size by default set to
* 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+ * \param wait Wait for data if not immediately available (default: true)
*/
- gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size);
+ gr_udp_source(size_t itemsize, const char *src, unsigned short port_src,
+ int payload_size, bool wait);
public:
~gr_udp_source();
- /*!
- * \brief open a socket specified by the port and ip address info
- *
- * Opens a socket, binds to the address, and waits for a connection
- * over UDP. If any of these fail, the fuction retuns the error and exits.
- */
- bool open();
-
- /*!
- * \brief Close current socket.
- *
- * Shuts down read/write on the socket
- */
- void close();
-
/*! \brief return the PAYLOAD_SIZE of the socket */
int payload_size() { return d_payload_size; }
diff --git a/gnuradio-core/src/lib/io/gr_udp_source.i b/gnuradio-core/src/lib/io/gr_udp_source.i
index fb39dad68..efaa57c27 100644..100755
--- a/gnuradio-core/src/lib/io/gr_udp_source.i
+++ b/gnuradio-core/src/lib/io/gr_udp_source.i
@@ -24,19 +24,18 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source)
gr_udp_source_sptr
gr_make_udp_source (size_t itemsize, const char *src,
- unsigned short port_src, int payload_size=1472);
+ unsigned short port_src, int payload_size=1472,
+ bool wait=true);
class gr_udp_source : public gr_sync_block
{
protected:
gr_udp_source (size_t itemsize, const char *src,
- unsigned short port_src, int payload_size);
+ unsigned short port_src, int payload_size, bool wait);
public:
~gr_udp_source ();
- bool open();
- void close();
int payload_size() { return d_payload_size; }
};