From b0d32c6c20cadaa544aeaa7b5257919674e8d0ad Mon Sep 17 00:00:00 2001 From: Don Ward Date: Thu, 15 Apr 2010 14:37:04 -0400 Subject: Ignore ENOPROTOOPT return from setsockopt(SO_LINGER) SO_LINGER is not valid for SOCK_DGRAM sockets on Windows, so we expect setsockopt to return ENOPROTOOPT (invalid option for protocol) on Cygwin and MinGW. If it happens on any other system it should probably be ignored there, too. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 6 ++++-- gnuradio-core/src/lib/io/gr_udp_source.cc | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_sink.cc mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_source.cc (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc old mode 100644 new mode 100755 index d37adfb8a..a837e731e --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -132,8 +132,10 @@ gr_udp_sink::open() lngr.l_onoff = 1; lngr.l_linger = 0; if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) { - perror("SO_LINGER"); - throw std::runtime_error("can't set socket option SO_LINGER"); + if(errno != ENOPROTOOPT) { // no SO_LINGER for SOCK_DGRAM on Windows + perror("SO_LINGER"); + throw std::runtime_error("can't set socket option SO_LINGER"); + } } // bind socket to an address and port number to listen on diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc old mode 100644 new mode 100755 index d76d0ee32..fed5b6142 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -110,8 +110,10 @@ gr_udp_source::open() lngr.l_onoff = 1; lngr.l_linger = 0; if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) { - perror("SO_LINGER"); - throw std::runtime_error("can't set socket option SO_LINGER"); + if(errno != ENOPROTOOPT) { // no SO_LINGER for SOCK_DGRAM on Windows + perror("SO_LINGER"); + throw std::runtime_error("can't set socket option SO_LINGER"); + } } // Set a timeout on the receive function to not block indefinitely -- cgit From 545901e335f27600c460f749b66d60155a179492 Mon Sep 17 00:00:00 2001 From: U-DON-WORKBENCH\Don Date: Sun, 18 Apr 2010 10:19:30 -0400 Subject: Changes to gr_udp_{source,sink} for MinGW Initialize and cleanup after winsock DLL. Interpret winsock error codes. Use DWORD instead of timeval for setting timeout. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 80 +++++++++++++++++++++++------- gnuradio-core/src/lib/io/gr_udp_source.cc | 82 +++++++++++++++++++++++++------ 2 files changed, 130 insertions(+), 32 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index a837e731e..dff066288 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -31,15 +31,52 @@ #include typedef void* optval_t; #else +#define USING_WINSOCK #define SHUT_RDWR 2 #define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) ) typedef char* optval_t; +#define ENOPROTOOPT 109 #endif #include #define SNK_VERBOSE 0 +static int is_error( int perr ) +{ + // Compare error to posix error code; return nonzero if match. +#if defined(USING_WINSOCK) + // All codes to be checked for must be defined below + int werr = WSAGetLastError(); + switch( werr ) { + case WSAETIMEDOUT: + return( perr == EAGAIN ); + case WSAENOPROTOOPT: + return( perr == ENOPROTOOPT ); + default: + fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr ); + throw std::runtime_error("internal error"); + } + return 0; +#else + return( perr == errno ); +#endif +} + +static void report_error( char *msg1, char *msg2 ) +{ + // Deal with errors, both posix and winsock +#if defined(USING_WINSOCK) + int werr = WSAGetLastError(); + fprintf(stderr, "%s: winsock error %d\n", msg1, werr ); +#else + perror(msg1); +#endif + if( msg2 != NULL ) + throw std::runtime_error(msg2); + return; +} + gr_udp_sink::gr_udp_sink (size_t itemsize, const char *src, unsigned short port_src, const char *dst, unsigned short port_dst, @@ -50,6 +87,15 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size) { int ret = 0; + +#if !defined(HAVE_SOCKET) // for Windows (with MinGW) + // initialize winsock DLL + WSADATA wsaData; + int iResult = WSAStartup( MAKEWORD(2,2), &wsaData ); + if( iResult != NO_ERROR ) { + report_error( "gr_udp_source WSAStartup", "can't open socket" ); + } +#endif // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name @@ -59,8 +105,8 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, } else { // assume it was specified as an IP address if((ret=inet_aton(src, &d_ip_src)) == 0) { // format IP address - perror("Not a valid source IP address or host name"); - throw std::runtime_error("can't initialize source socket"); + report_error("Not a valid source IP address or host name", + "can't initialize source socket"); } } @@ -71,8 +117,8 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, } else { // assume it was specified as an IP address if((ret=inet_aton(dst, &d_ip_dst)) == 0) { // format IP address - perror("Not a valid destination IP address or host name"); - throw std::runtime_error("can't initialize destination socket"); + report_error("Not a valid destination IP address or host name", + "can't initialize destination socket"); } } @@ -107,6 +153,11 @@ gr_make_udp_sink (size_t itemsize, gr_udp_sink::~gr_udp_sink () { close(); + +#if !defined(HAVE_SOCKET) // for Windows (with MinGW) + // free winsock resources + WSACleanup(); +#endif } bool @@ -116,15 +167,13 @@ gr_udp_sink::open() // create socket if((d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - perror("socket open"); - throw std::runtime_error("can't open socket"); + report_error("socket open","can't open socket"); } // Turn on reuse address int opt_val = true; if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) { - perror("SO_REUSEADDR"); - throw std::runtime_error("can't set socket option SO_REUSEADDR"); + report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR"); } // Don't wait when shutting down @@ -132,22 +181,19 @@ gr_udp_sink::open() lngr.l_onoff = 1; lngr.l_linger = 0; if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) { - if(errno != ENOPROTOOPT) { // no SO_LINGER for SOCK_DGRAM on Windows - perror("SO_LINGER"); - throw std::runtime_error("can't set socket option SO_LINGER"); + if( !is_error(ENOPROTOOPT) ) { // no SO_LINGER for SOCK_DGRAM on Windows + report_error("SO_LINGER","can't set socket option SO_LINGER"); } } // bind socket to an address and port number to listen on if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) { - perror("socket bind"); - throw std::runtime_error("can't bind socket"); + report_error("socket bind","can't bind socket"); } // Not sure if we should throw here or allow retries if(connect(d_socket, (sockaddr*)&d_sockaddr_dst, sizeof(struct sockaddr)) == -1) { - perror("socket connect"); - throw std::runtime_error("can't connect to socket"); + report_error("socket connect","can't connect to socket"); } d_updated = true; @@ -184,8 +230,8 @@ gr_udp_sink::work (int noutput_items, r = send(d_socket, (in+bytes_sent), bytes_to_send, 0); if(r == -1) { // error on send command - perror("udp_sink"); // there should be no error case where this function - return -1; // should not exit immediately + report_error("udp_sink",NULL); // there should be no error case where + return -1; // this function should not exit immediately } bytes_sent += r; diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index fed5b6142..9df47da2e 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -33,13 +33,51 @@ #include typedef void* optval_t; #else +// Not posix, assume winsock +#define USING_WINSOCK #define SHUT_RDWR 2 #define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) ) typedef char* optval_t; +#define ENOPROTOOPT 109 #endif #define SRC_VERBOSE 0 +static int is_error( int perr ) +{ + // Compare error to posix error code; return nonzero if match. +#if defined(USING_WINSOCK) + // All codes to be checked for must be defined below + int werr = WSAGetLastError(); + switch( werr ) { + case WSAETIMEDOUT: + return( perr == EAGAIN ); + case WSAENOPROTOOPT: + return( perr == ENOPROTOOPT ); + default: + fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr ); + throw std::runtime_error("internal error"); + } + return 0; +#else + return( perr == errno ); +#endif +} + +static void report_error( char *msg1, char *msg2 ) +{ + // Deal with errors, both posix and winsock +#if defined(USING_WINSOCK) + int werr = WSAGetLastError(); + fprintf(stderr, "%s: winsock error %d\n", msg1, werr ); +#else + perror(msg1); +#endif + if( msg2 != NULL ) + throw std::runtime_error(msg2); + return; +} + gr_udp_source::gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size) : gr_sync_block ("udp_source", @@ -48,6 +86,15 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0) { int ret = 0; + +#if !defined(HAVE_SOCKET) // for Windows (with MinGW) + // initialize winsock DLL + WSADATA wsaData; + int iResult = WSAStartup( MAKEWORD(2,2), &wsaData ); + if( iResult != NO_ERROR ) { + report_error( "gr_udp_source WSAStartup", "can't open socket" ); + } +#endif // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name @@ -57,8 +104,8 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, } else { // assume it was specified as an IP address if((ret=inet_aton(src, &d_ip_src)) == 0) { // format IP address - perror("Not a valid source IP address or host name"); - throw std::runtime_error("can't initialize source socket"); + report_error("Not a valid source IP address or host name", + "can't initialize source socket"); } } @@ -85,6 +132,11 @@ gr_udp_source::~gr_udp_source () { delete [] d_temp_buff; close(); + +#if !defined(HAVE_SOCKET) // for Windows (with MinGW) + // free winsock resources + WSACleanup(); +#endif } bool @@ -94,15 +146,13 @@ gr_udp_source::open() // create socket d_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); if(d_socket == -1) { - perror("socket open"); - throw std::runtime_error("can't open socket"); + report_error("socket open","can't open socket"); } // Turn on reuse address int opt_val = 1; if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) { - perror("SO_REUSEADDR"); - throw std::runtime_error("can't set socket option SO_REUSEADDR"); + report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR"); } // Don't wait when shutting down @@ -110,26 +160,28 @@ gr_udp_source::open() lngr.l_onoff = 1; lngr.l_linger = 0; if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) { - if(errno != ENOPROTOOPT) { // no SO_LINGER for SOCK_DGRAM on Windows - perror("SO_LINGER"); - throw std::runtime_error("can't set socket option SO_LINGER"); + if( !is_error(ENOPROTOOPT) ) { // no SO_LINGER for SOCK_DGRAM on Windows + report_error("SO_LINGER","can't set socket option SO_LINGER"); } } // Set a timeout on the receive function to not block indefinitely // This value can (and probably should) be changed + // Ignored on Cygwin +#if defined(USING_WINSOCK) + DWORD timeout = 1000; // milliseconds +#else timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; +#endif if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) { - perror("SO_RCVTIMEO"); - throw std::runtime_error("can't set socket option SO_RCVTIMEO"); + report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO"); } // bind socket to an address and port number to listen on if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) { - perror("socket bind"); - throw std::runtime_error("can't bind socket"); + report_error("socket bind","can't bind socket"); } d_updated = true; @@ -187,7 +239,7 @@ gr_udp_source::work (int noutput_items, // Check if there was a problem; forget it if the operation just timed out if(r == -1) { - if(errno == EAGAIN) { // handle non-blocking call timeout + if( is_error(EAGAIN) ) { // handle non-blocking call timeout #if SRC_VERBOSE printf("UDP receive timed out\n"); #endif @@ -196,7 +248,7 @@ gr_udp_source::work (int noutput_items, break; } else { - perror("udp_source"); + report_error("udp_source",NULL); return -1; } } -- cgit From d1ae6560ab2b8b5d474e58f865314a6cf18b958c Mon Sep 17 00:00:00 2001 From: Don Ward Date: Sun, 18 Apr 2010 19:01:56 -0400 Subject: Use getaddrinfo in gr_udp_{source,sink} Using getaddrinfo allows more common code between posix and winsock systems. Remove unused variables and #include files. Close sockets when done. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 69 ++++++++++++++----------------- gnuradio-core/src/lib/io/gr_udp_sink.h | 21 +++------- gnuradio-core/src/lib/io/gr_udp_source.cc | 46 ++++++++++----------- gnuradio-core/src/lib/io/gr_udp_source.h | 16 +++---- 4 files changed, 66 insertions(+), 86 deletions(-) mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_source.h (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index dff066288..3d8d65145 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -26,14 +26,15 @@ #include #include #include -#if defined(HAVE_SOCKET) -#include +#include #include +#include +#if defined(HAVE_NETDB_H) typedef void* optval_t; #else +// if not posix, assume winsock #define USING_WINSOCK #define SHUT_RDWR 2 -#define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) ) typedef char* optval_t; #define ENOPROTOOPT 109 #endif @@ -99,39 +100,24 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name - struct hostent *hsrc = gethostbyname(src); - if(hsrc) { // if the source was provided as a host namex - d_ip_src = *(struct in_addr*)hsrc->h_addr_list[0]; - } - else { // assume it was specified as an IP address - if((ret=inet_aton(src, &d_ip_src)) == 0) { // format IP address - report_error("Not a valid source IP address or host name", - "can't initialize source socket"); - } - } + struct addrinfo hints; + memset( (void*)&hints, 0, sizeof(hints) ); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = IPPROTO_UDP; + char port_str[7]; + sprintf( port_str, "%d", port_src ); + ret = getaddrinfo( src, port_str, &hints, &d_ip_src ); + if( ret != 0 ) + report_error("gr_udp_source/getaddrinfo", + "can't initialize source socket" ); // Get the destination IP address from the host name - struct hostent *hdst = gethostbyname(dst); - if(hdst) { // if the source was provided as a host namex - d_ip_dst = *(struct in_addr*)hdst->h_addr_list[0]; - } - else { // assume it was specified as an IP address - if((ret=inet_aton(dst, &d_ip_dst)) == 0) { // format IP address - report_error("Not a valid destination IP address or host name", - "can't initialize destination socket"); - } - } - - d_port_src = htons(port_src); // format port number - d_port_dst = htons(port_dst); // format port number - - d_sockaddr_src.sin_family = AF_INET; - d_sockaddr_src.sin_addr = d_ip_src; - d_sockaddr_src.sin_port = d_port_src; - - d_sockaddr_dst.sin_family = AF_INET; - d_sockaddr_dst.sin_addr = d_ip_dst; - d_sockaddr_dst.sin_port = d_port_dst; + sprintf( port_str, "%d", port_dst ); + ret = getaddrinfo( dst, port_str, &hints, &d_ip_dst ); + if( ret != 0 ) + report_error("gr_udp_source/getaddrinfo", + "can't initialize destination socket" ); open(); } @@ -152,6 +138,8 @@ gr_make_udp_sink (size_t itemsize, gr_udp_sink::~gr_udp_sink () { + freeaddrinfo(d_ip_src); + freeaddrinfo(d_ip_dst); close(); #if !defined(HAVE_SOCKET) // for Windows (with MinGW) @@ -166,7 +154,9 @@ gr_udp_sink::open() gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function // create socket - if((d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { + d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype, + d_ip_src->ai_protocol); + if(d_socket == -1) { report_error("socket open","can't open socket"); } @@ -187,12 +177,12 @@ gr_udp_sink::open() } // bind socket to an address and port number to listen on - if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) { + if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) { report_error("socket bind","can't bind socket"); } // Not sure if we should throw here or allow retries - if(connect(d_socket, (sockaddr*)&d_sockaddr_dst, sizeof(struct sockaddr)) == -1) { + if(connect(d_socket, d_ip_dst->ai_addr, d_ip_dst->ai_addrlen) == -1) { report_error("socket connect","can't connect to socket"); } @@ -207,6 +197,11 @@ gr_udp_sink::close() if (d_socket){ shutdown(d_socket, SHUT_RDWR); +#if defined(USING_WINSOCK) + closesocket(d_socket); +#else + ::close(d_socket); +#endif d_socket = 0; } d_updated = true; diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.h b/gnuradio-core/src/lib/io/gr_udp_sink.h index f22b92dd0..9f50ed7f0 100644 --- a/gnuradio-core/src/lib/io/gr_udp_sink.h +++ b/gnuradio-core/src/lib/io/gr_udp_sink.h @@ -24,16 +24,12 @@ #define INCLUDED_GR_UDP_SINK_H #include -#include -#if defined(HAVE_SOCKET) -#include -#include +#if defined(HAVE_NETDB_H) +#include +#include // usually #included by ? #elif defined(HAVE_WINDOWS_H) #include -#include -#endif -#if defined(HAVE_NETINET_IN_H) -#include +#include #endif #include @@ -75,13 +71,8 @@ class gr_udp_sink : public gr_sync_block int d_payload_size; // maximum transmission unit (packet length) int d_socket; // handle to socket - int d_socket_rcv; // handle to socket retuned in the accept call - struct in_addr d_ip_src; // store the source ip info - struct in_addr d_ip_dst; // store the destination ip info - unsigned short d_port_src; // the port number to open for connections to this service - unsigned short d_port_dst; // port number of the remove system - struct sockaddr_in d_sockaddr_src; // store the source sockaddr data (formatted IP address and port number) - struct sockaddr_in d_sockaddr_dst; // store the destination sockaddr data (formatted IP address and port number) + struct addrinfo *d_ip_src; // store the source ip info + struct addrinfo *d_ip_dst; // store the destination ip info protected: /*! diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 9df47da2e..f459e7f13 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -29,14 +29,12 @@ #include #include #include -#if defined(HAVE_SOCKET) -#include +#if defined(HAVE_NETDB_H) typedef void* optval_t; #else -// Not posix, assume winsock +// if not posix, assume winsock #define USING_WINSOCK #define SHUT_RDWR 2 -#define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) ) typedef char* optval_t; #define ENOPROTOOPT 109 #endif @@ -98,22 +96,17 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name - struct hostent *hsrc = gethostbyname(src); - if(hsrc) { // if the source was provided as a host namex - d_ip_src = *(struct in_addr*)hsrc->h_addr_list[0]; - } - else { // assume it was specified as an IP address - if((ret=inet_aton(src, &d_ip_src)) == 0) { // format IP address - report_error("Not a valid source IP address or host name", - "can't initialize source socket"); - } - } - - d_port_src = htons(port_src); // format port number - - d_sockaddr_src.sin_family = AF_INET; - d_sockaddr_src.sin_addr = d_ip_src; - d_sockaddr_src.sin_port = d_port_src; + struct addrinfo hints; + memset( (void*)&hints, 0, sizeof(hints) ); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = IPPROTO_UDP; + char port_str[7]; + sprintf( port_str, "%d", port_src ); + ret = getaddrinfo( src, port_str, &hints, &d_ip_src ); + if( ret != 0 ) + report_error("gr_udp_source/getaddrinfo", + "can't initialize source socket" ); d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes @@ -130,6 +123,7 @@ gr_make_udp_source (size_t itemsize, const char *ipaddr, gr_udp_source::~gr_udp_source () { + freeaddrinfo(d_ip_src); delete [] d_temp_buff; close(); @@ -144,7 +138,8 @@ gr_udp_source::open() { gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function // create socket - d_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); + d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype, + d_ip_src->ai_protocol); if(d_socket == -1) { report_error("socket open","can't open socket"); } @@ -180,10 +175,10 @@ gr_udp_source::open() } // bind socket to an address and port number to listen on - if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) { + if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) { report_error("socket bind","can't bind socket"); } - + d_updated = true; return d_socket != 0; } @@ -195,6 +190,11 @@ gr_udp_source::close() if (d_socket){ shutdown(d_socket, SHUT_RDWR); +#if defined(USING_WINSOCK) + closesocket(d_socket); +#else + ::close(d_socket); +#endif d_socket = 0; } d_updated = true; diff --git a/gnuradio-core/src/lib/io/gr_udp_source.h b/gnuradio-core/src/lib/io/gr_udp_source.h old mode 100644 new mode 100755 index 61d719e4d..14d521dac --- a/gnuradio-core/src/lib/io/gr_udp_source.h +++ b/gnuradio-core/src/lib/io/gr_udp_source.h @@ -24,15 +24,12 @@ #define INCLUDED_GR_UDP_SOURCE_H #include -#if defined(HAVE_SOCKET) -#include -#include +#if defined(HAVE_NETDB_H) +#include +#include // usually #included by ? #elif defined(HAVE_WINDOWS_H) #include -#include -#endif -#if defined(HAVE_NETINET_IN_H) -#include +#include #endif #include @@ -68,10 +65,7 @@ class gr_udp_source : public gr_sync_block int d_payload_size; // maximum transmission unit (packet length) int d_socket; // handle to socket - int d_socket_rcv; // handle to socket retuned in the accept call - struct in_addr d_ip_src; // store the source IP address to use - unsigned short d_port_src; // the port number to open for connections to this service - struct sockaddr_in d_sockaddr_src; // store the source sockaddr data (formatted IP address and port number) + struct addrinfo *d_ip_src; // store the source IP address to use char *d_temp_buff; // hold buffer between calls ssize_t d_residual; // hold information about number of bytes stored in the temp buffer size_t d_temp_offset; // point to temp buffer location offset -- cgit From 3b8fcaa640d75573d314fb8616969ad2adf2a099 Mon Sep 17 00:00:00 2001 From: Don Ward Date: Mon, 19 Apr 2010 17:02:14 -0400 Subject: Discard data in gr_udp_sink until receiver is started. Also fixes warnings from gcc 4.3 and adds for usrp2. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 17 ++++++++++++----- gnuradio-core/src/lib/io/gr_udp_source.cc | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index 3d8d65145..b447dd3b3 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -36,7 +36,6 @@ typedef void* optval_t; #define USING_WINSOCK #define SHUT_RDWR 2 typedef char* optval_t; -#define ENOPROTOOPT 109 #endif #include @@ -47,6 +46,8 @@ static int is_error( int perr ) { // Compare error to posix error code; return nonzero if match. #if defined(USING_WINSOCK) +#define ENOPROTOOPT 109 +#define ECONNREFUSED 111 // All codes to be checked for must be defined below int werr = WSAGetLastError(); switch( werr ) { @@ -54,6 +55,8 @@ static int is_error( int perr ) return( perr == EAGAIN ); case WSAENOPROTOOPT: return( perr == ENOPROTOOPT ); + case WSAECONNREFUSED: + return( perr == ECONNREFUSED ); default: fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr ); throw std::runtime_error("internal error"); @@ -64,7 +67,7 @@ static int is_error( int perr ) #endif } -static void report_error( char *msg1, char *msg2 ) +static void report_error( const char *msg1, const char *msg2 ) { // Deal with errors, both posix and winsock #if defined(USING_WINSOCK) @@ -217,7 +220,7 @@ gr_udp_sink::work (int noutput_items, ssize_t total_size = noutput_items*d_itemsize; #if SNK_VERBOSE - printf("Entered upd_sink\n"); + printf("Entered udp_sink\n"); #endif while(bytes_sent < total_size) { @@ -225,8 +228,12 @@ gr_udp_sink::work (int noutput_items, r = send(d_socket, (in+bytes_sent), bytes_to_send, 0); if(r == -1) { // error on send command - report_error("udp_sink",NULL); // there should be no error case where - return -1; // this function should not exit immediately + if( is_error(ECONNREFUSED) ) + r = bytes_to_send; // discard data until receiver is started + else { + report_error("udp_sink",NULL); // there should be no error case where + return -1; // this function should not exit immediately + } } bytes_sent += r; diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index f459e7f13..56499258c 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -36,7 +36,6 @@ typedef void* optval_t; #define USING_WINSOCK #define SHUT_RDWR 2 typedef char* optval_t; -#define ENOPROTOOPT 109 #endif #define SRC_VERBOSE 0 @@ -45,6 +44,7 @@ static int is_error( int perr ) { // Compare error to posix error code; return nonzero if match. #if defined(USING_WINSOCK) +#define ENOPROTOOPT 109 // All codes to be checked for must be defined below int werr = WSAGetLastError(); switch( werr ) { -- cgit From 3ff43f7487b43436cd0f49de80ebff2c1ff1a188 Mon Sep 17 00:00:00 2001 From: Don Ward Date: Fri, 30 Apr 2010 14:48:56 -0400 Subject: Updates to udp source/sink (select(), wait, cleanup) Use select() to avoid blocking on recv() in gr_udp_source (only known way to avoid blocking on Cygwin). Add wait argument to gr_udp_source to allow waiting for connection or accepting lack of connection as EOF; add --no-wait option to dial_tone_sink.py. Remove system dependencies from .h files; remove unused data members and (useless?) public open and close functions. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 85 +++++++++------------- gnuradio-core/src/lib/io/gr_udp_sink.h | 21 +----- gnuradio-core/src/lib/io/gr_udp_source.cc | 117 ++++++++++++++++++------------ gnuradio-core/src/lib/io/gr_udp_source.h | 32 +++----- gnuradio-core/src/lib/io/gr_udp_source.i | 7 +- 5 files changed, 121 insertions(+), 141 deletions(-) mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_source.i (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index b447dd3b3..263d3dd4f 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -31,9 +31,11 @@ #include #if defined(HAVE_NETDB_H) typedef void* optval_t; -#else +#elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK +#include +#include #define SHUT_RDWR 2 typedef char* optval_t; #endif @@ -88,11 +90,13 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, : gr_sync_block ("udp_sink", gr_make_io_signature (1, 1, itemsize), gr_make_io_signature (0, 0, 0)), - d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size) + d_itemsize (itemsize), d_payload_size(payload_size) { int ret = 0; + struct addrinfo *ip_src; // store the source ip info + struct addrinfo *ip_dst; // store the destination ip info -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) +#if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL WSADATA wsaData; int iResult = WSAStartup( MAKEWORD(2,2), &wsaData ); @@ -110,55 +114,21 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, hints.ai_protocol = IPPROTO_UDP; char port_str[7]; sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &d_ip_src ); + ret = getaddrinfo( src, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); // Get the destination IP address from the host name sprintf( port_str, "%d", port_dst ); - ret = getaddrinfo( dst, port_str, &hints, &d_ip_dst ); + ret = getaddrinfo( dst, port_str, &hints, &ip_dst ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize destination socket" ); - - open(); -} - -// public constructor that returns a shared_ptr - -gr_udp_sink_sptr -gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size) -{ - return gr_udp_sink_sptr (new gr_udp_sink (itemsize, - src, port_src, - dst, port_dst, - payload_size)); -} - -gr_udp_sink::~gr_udp_sink () -{ - freeaddrinfo(d_ip_src); - freeaddrinfo(d_ip_dst); - close(); - -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) - // free winsock resources - WSACleanup(); -#endif -} - -bool -gr_udp_sink::open() -{ - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function // create socket - d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype, - d_ip_src->ai_protocol); + d_socket = socket(ip_src->ai_family, ip_src->ai_socktype, + ip_src->ai_protocol); if(d_socket == -1) { report_error("socket open","can't open socket"); } @@ -180,24 +150,35 @@ gr_udp_sink::open() } // bind socket to an address and port number to listen on - if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) { + if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) { report_error("socket bind","can't bind socket"); } // Not sure if we should throw here or allow retries - if(connect(d_socket, d_ip_dst->ai_addr, d_ip_dst->ai_addrlen) == -1) { + if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) { report_error("socket connect","can't connect to socket"); } - d_updated = true; - return d_socket != 0; + freeaddrinfo(ip_src); + freeaddrinfo(ip_dst); } -void -gr_udp_sink::close() +// public constructor that returns a shared_ptr + +gr_udp_sink_sptr +gr_make_udp_sink (size_t itemsize, + const char *src, unsigned short port_src, + const char *dst, unsigned short port_dst, + int payload_size) { - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + return gr_udp_sink_sptr (new gr_udp_sink (itemsize, + src, port_src, + dst, port_dst, + payload_size)); +} +gr_udp_sink::~gr_udp_sink () +{ if (d_socket){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) @@ -207,7 +188,11 @@ gr_udp_sink::close() #endif d_socket = 0; } - d_updated = true; + +#if defined(USING_WINSOCK) // for Windows (with MinGW) + // free winsock resources + WSACleanup(); +#endif } int diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.h b/gnuradio-core/src/lib/io/gr_udp_sink.h index 9f50ed7f0..6b6ee40fe 100644 --- a/gnuradio-core/src/lib/io/gr_udp_sink.h +++ b/gnuradio-core/src/lib/io/gr_udp_sink.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -66,13 +66,9 @@ class gr_udp_sink : public gr_sync_block int payload_size); private: size_t d_itemsize; - bool d_updated; - gruel::mutex d_mutex; int d_payload_size; // maximum transmission unit (packet length) int d_socket; // handle to socket - struct addrinfo *d_ip_src; // store the source ip info - struct addrinfo *d_ip_dst; // store the destination ip info protected: /*! @@ -96,21 +92,6 @@ class gr_udp_sink : public gr_sync_block public: ~gr_udp_sink (); - /*! - * \brief open a socket specified by the port and ip address info - * - * Opens a socket, binds to the address, and makes connectionless association - * over UDP. If any of these fail, the fuction retuns the error and exits. - */ - bool open(); - - /*! - * \brief Close current socket. - * - * Shuts down read/write on the socket - */ - void close(); - /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 56499258c..ce870d481 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -30,14 +30,19 @@ #include #include #if defined(HAVE_NETDB_H) +#include typedef void* optval_t; -#else +#elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK +#include +#include #define SHUT_RDWR 2 typedef char* optval_t; #endif +#define USE_SELECT 1 // non-blocking receive on all platforms +#define USE_RCV_TIMEO 0 // non-blocking receive on all but Cygwin #define SRC_VERBOSE 0 static int is_error( int perr ) @@ -77,15 +82,17 @@ static void report_error( char *msg1, char *msg2 ) } gr_udp_source::gr_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size) + unsigned short port_src, int payload_size, + bool wait) : gr_sync_block ("udp_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0) + d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0) { int ret = 0; + struct addrinfo *ip_src; // store the source IP address to use -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) +#if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL WSADATA wsaData; int iResult = WSAStartup( MAKEWORD(2,2), &wsaData ); @@ -103,43 +110,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, hints.ai_protocol = IPPROTO_UDP; char port_str[7]; sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &d_ip_src ); + ret = getaddrinfo( src, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes - - open(); -} - -gr_udp_source_sptr -gr_make_udp_source (size_t itemsize, const char *ipaddr, - unsigned short port, int payload_size) -{ - return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, - port, payload_size)); -} - -gr_udp_source::~gr_udp_source () -{ - freeaddrinfo(d_ip_src); - delete [] d_temp_buff; - close(); -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) - // free winsock resources - WSACleanup(); -#endif -} - -bool -gr_udp_source::open() -{ - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function // create socket - d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype, - d_ip_src->ai_protocol); + d_socket = socket(ip_src->ai_family, ip_src->ai_socktype, + ip_src->ai_protocol); if(d_socket == -1) { report_error("socket open","can't open socket"); } @@ -160,6 +140,7 @@ gr_udp_source::open() } } +#if USE_RCV_TIMEO // Set a timeout on the receive function to not block indefinitely // This value can (and probably should) be changed // Ignored on Cygwin @@ -173,20 +154,27 @@ gr_udp_source::open() if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) { report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO"); } +#endif // USE_RCV_TIMEO // bind socket to an address and port number to listen on - if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) { + if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) { report_error("socket bind","can't bind socket"); } + freeaddrinfo(ip_src); - d_updated = true; - return d_socket != 0; } -void -gr_udp_source::close() +gr_udp_source_sptr +gr_make_udp_source (size_t itemsize, const char *ipaddr, + unsigned short port, int payload_size, bool wait) { - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, + port, payload_size, wait)); +} + +gr_udp_source::~gr_udp_source () +{ + delete [] d_temp_buff; if (d_socket){ shutdown(d_socket, SHUT_RDWR); @@ -197,7 +185,11 @@ gr_udp_source::close() #endif d_socket = 0; } - d_updated = true; + +#if defined(USING_WINSOCK) // for Windows (with MinGW) + // free winsock resources + WSACleanup(); +#endif } int @@ -232,8 +224,38 @@ gr_udp_source::work (int noutput_items, d_temp_offset = d_temp_offset+d_residual; } +#if USE_SELECT + // Use select() to determine when socket is readable + fd_set readfds; + timeval timeout; + timeout.tv_sec = 1; + timeout.tv_usec = 0; +#endif + while(1) { // get the data into our output buffer and record the number of bytes + +#if USE_SELECT + // RCV_TIMEO doesn't work on all systems (e.g., Cygwin) + // use select() instead of, or in addition to RCV_TIMEO + FD_ZERO(&readfds); + FD_SET(d_socket, &readfds); + r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout); + if(r < 0) { + report_error("udp_source/select",NULL); + return -1; + } + else if(r == 0 ) { // timed out + if( d_wait ) { + // Allow boost thread interrupt, then try again + boost::this_thread::interruption_point(); + continue; + } + else + return -1; + } +#endif // USE_SELECT + // This is a non-blocking call with a timeout set in the constructor r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available @@ -244,11 +266,16 @@ gr_udp_source::work (int noutput_items, printf("UDP receive timed out\n"); #endif - // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks - break; + if( d_wait ) { + // Allow boost thread interrupt, then try again + boost::this_thread::interruption_point(); + continue; + } + else + return -1; } else { - report_error("udp_source",NULL); + report_error("udp_source/recv",NULL); return -1; } } diff --git a/gnuradio-core/src/lib/io/gr_udp_source.h b/gnuradio-core/src/lib/io/gr_udp_source.h index 14d521dac..b06536d6a 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.h +++ b/gnuradio-core/src/lib/io/gr_udp_source.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -38,7 +38,8 @@ class gr_udp_source; typedef boost::shared_ptr gr_udp_source_sptr; gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size=1472); + unsigned short port_src, + int payload_size=1472, bool wait=true); /*! * \brief Read stream from an UDP socket. @@ -50,22 +51,22 @@ gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, * \param port_src The port number on which the socket listens for data * \param payload_size UDP payload size by default set to * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param wait Wait for data if not immediately available (default: true) * */ class gr_udp_source : public gr_sync_block { friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size); + unsigned short port_src, + int payload_size, bool wait); private: size_t d_itemsize; - bool d_updated; - gruel::mutex d_mutex; int d_payload_size; // maximum transmission unit (packet length) + bool d_wait; // wait if data if not immediately available int d_socket; // handle to socket - struct addrinfo *d_ip_src; // store the source IP address to use char *d_temp_buff; // hold buffer between calls ssize_t d_residual; // hold information about number of bytes stored in the temp buffer size_t d_temp_offset; // point to temp buffer location offset @@ -80,27 +81,14 @@ class gr_udp_source : public gr_sync_block * \param port_src The port number on which the socket listens for data * \param payload_size UDP payload size by default set to * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param wait Wait for data if not immediately available (default: true) */ - gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size); + gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, + int payload_size, bool wait); public: ~gr_udp_source(); - /*! - * \brief open a socket specified by the port and ip address info - * - * Opens a socket, binds to the address, and waits for a connection - * over UDP. If any of these fail, the fuction retuns the error and exits. - */ - bool open(); - - /*! - * \brief Close current socket. - * - * Shuts down read/write on the socket - */ - void close(); - /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } diff --git a/gnuradio-core/src/lib/io/gr_udp_source.i b/gnuradio-core/src/lib/io/gr_udp_source.i old mode 100644 new mode 100755 index fb39dad68..efaa57c27 --- a/gnuradio-core/src/lib/io/gr_udp_source.i +++ b/gnuradio-core/src/lib/io/gr_udp_source.i @@ -24,19 +24,18 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source) gr_udp_source_sptr gr_make_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size=1472); + unsigned short port_src, int payload_size=1472, + bool wait=true); class gr_udp_source : public gr_sync_block { protected: gr_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size); + unsigned short port_src, int payload_size, bool wait); public: ~gr_udp_source (); - bool open(); - void close(); int payload_size() { return d_payload_size; } }; -- cgit From d702e27d1f3b0e76ef3734ee6b5b6ac1333cdbff Mon Sep 17 00:00:00 2001 From: Don Ward Date: Tue, 4 May 2010 12:41:52 -0400 Subject: Rework UDP source and sink, with incompatible API changes Remove source address specifications for sink; add connect() and disconnect() to sink; add get_port() to source; add optional EOF signaling (using zero-length packets) to sink and source; modify dial_tone, vector, and audio examples to match new code; add qa test case. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 161 +++++++++++++-------- gnuradio-core/src/lib/io/gr_udp_sink.h | 72 ++++----- gnuradio-core/src/lib/io/gr_udp_sink.i | 21 ++- gnuradio-core/src/lib/io/gr_udp_source.cc | 62 ++++++-- gnuradio-core/src/lib/io/gr_udp_source.h | 70 +++++---- gnuradio-core/src/lib/io/gr_udp_source.i | 14 +- gnuradio-core/src/python/gnuradio/gr/Makefile.am | 1 + .../src/python/gnuradio/gr/qa_udp_sink_source.py | 99 +++++++++++++ 8 files changed, 343 insertions(+), 157 deletions(-) mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_sink.h mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_sink.i mode change 100644 => 100755 gnuradio-core/src/python/gnuradio/gr/Makefile.am create mode 100755 gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index 263d3dd4f..a9cb87a21 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -30,6 +30,8 @@ #include #include #if defined(HAVE_NETDB_H) +#include +#include //usually included by ? typedef void* optval_t; #elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock @@ -84,18 +86,14 @@ static void report_error( const char *msg1, const char *msg2 ) } gr_udp_sink::gr_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size) + const char *host, unsigned short port, + int payload_size, bool eof) : gr_sync_block ("udp_sink", gr_make_io_signature (1, 1, itemsize), gr_make_io_signature (0, 0, 0)), - d_itemsize (itemsize), d_payload_size(payload_size) + d_itemsize (itemsize), d_payload_size(payload_size), d_eof(eof), + d_connected(false) { - int ret = 0; - struct addrinfo *ip_src; // store the source ip info - struct addrinfo *ip_dst; // store the destination ip info - #if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL WSADATA wsaData; @@ -104,41 +102,13 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, report_error( "gr_udp_source WSAStartup", "can't open socket" ); } #endif - - // Set up the address stucture for the source address and port numbers - // Get the source IP address from the host name - struct addrinfo hints; - memset( (void*)&hints, 0, sizeof(hints) ); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; - char port_str[7]; - sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &ip_src ); - if( ret != 0 ) - report_error("gr_udp_source/getaddrinfo", - "can't initialize source socket" ); - - // Get the destination IP address from the host name - sprintf( port_str, "%d", port_dst ); - ret = getaddrinfo( dst, port_str, &hints, &ip_dst ); - if( ret != 0 ) - report_error("gr_udp_source/getaddrinfo", - "can't initialize destination socket" ); // create socket - d_socket = socket(ip_src->ai_family, ip_src->ai_socktype, - ip_src->ai_protocol); + d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(d_socket == -1) { report_error("socket open","can't open socket"); } - // Turn on reuse address - int opt_val = true; - if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) { - report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR"); - } - // Don't wait when shutting down linger lngr; lngr.l_onoff = 1; @@ -149,36 +119,27 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, } } - // bind socket to an address and port number to listen on - if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) { - report_error("socket bind","can't bind socket"); - } - - // Not sure if we should throw here or allow retries - if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) { - report_error("socket connect","can't connect to socket"); - } - - freeaddrinfo(ip_src); - freeaddrinfo(ip_dst); + // Get the destination address + connect(host, port); } // public constructor that returns a shared_ptr gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size) + const char *host, unsigned short port, + int payload_size, bool eof) { return gr_udp_sink_sptr (new gr_udp_sink (itemsize, - src, port_src, - dst, port_dst, - payload_size)); + host, port, + payload_size, eof)); } gr_udp_sink::~gr_udp_sink () { + if (d_connected) + disconnect(); + if (d_socket){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) @@ -208,22 +169,28 @@ gr_udp_sink::work (int noutput_items, printf("Entered udp_sink\n"); #endif + gruel::scoped_lock guard(d_mutex); // protect d_socket + while(bytes_sent < total_size) { bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent)); - r = send(d_socket, (in+bytes_sent), bytes_to_send, 0); - if(r == -1) { // error on send command - if( is_error(ECONNREFUSED) ) - r = bytes_to_send; // discard data until receiver is started - else { - report_error("udp_sink",NULL); // there should be no error case where - return -1; // this function should not exit immediately + if(d_connected) { + r = send(d_socket, (in+bytes_sent), bytes_to_send, 0); + if(r == -1) { // error on send command + if( is_error(ECONNREFUSED) ) + r = bytes_to_send; // discard data until receiver is started + else { + report_error("udp_sink",NULL); // there should be no error case where + return -1; // this function should not exit immediately + } } } + else + r = bytes_to_send; // discarded for lack of connection bytes_sent += r; #if SNK_VERBOSE - printf("\tbyte sent: %d bytes\n", bytes); + printf("\tbyte sent: %d bytes\n", r); #endif } @@ -233,3 +200,71 @@ gr_udp_sink::work (int noutput_items, return noutput_items; } + +void gr_udp_sink::connect( const char *host, unsigned short port ) +{ + if(d_connected) + disconnect(); + + if(host != NULL ) { + // Get the destination address + struct addrinfo *ip_dst; + struct addrinfo hints; + memset( (void*)&hints, 0, sizeof(hints) ); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = IPPROTO_UDP; + char port_str[12]; + sprintf( port_str, "%d", port ); + int ret = getaddrinfo( host, port_str, &hints, &ip_dst ); + if( ret != 0 ) + report_error("gr_udp_source/getaddrinfo", + "can't initialize destination socket" ); + + // don't need d_mutex lock when !d_connected + if(::connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) { + report_error("socket connect","can't connect to socket"); + } + d_connected = true; + + freeaddrinfo(ip_dst); + } + + return; +} + +void gr_udp_sink::disconnect() +{ + if(!d_connected) + return; + + #if SNK_VERBOSE + printf("gr_udp_sink disconnecting\n"); + #endif + + gruel::scoped_lock guard(d_mutex); // protect d_socket from work() + + // Send a few zero-length packets to signal receiver we are done + if(d_eof) { + int i; + for( i = 0; i < 3; i++ ) + (void) send( d_socket, NULL, 0, 0 ); // ignore errors + } + + // Since I can't find any way to disconnect a datagram socket in Cygwin, + // we just leave it connected but disable sending. +#if 0 + // zeroed address structure should reset connection + struct sockaddr addr; + memset( (void*)&addr, 0, sizeof(addr) ); + // addr.sa_family = AF_UNSPEC; // doesn't work on Cygwin + // addr.sa_family = AF_INET; // doesn't work on Cygwin + + if(::connect(d_socket, &addr, sizeof(addr)) == -1) + report_error("socket connect","can't connect to socket"); +#endif + + d_connected = false; + + return; +} diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.h b/gnuradio-core/src/lib/io/gr_udp_sink.h old mode 100644 new mode 100755 index 6b6ee40fe..421d514a4 --- a/gnuradio-core/src/lib/io/gr_udp_sink.h +++ b/gnuradio-core/src/lib/io/gr_udp_sink.h @@ -24,14 +24,6 @@ #define INCLUDED_GR_UDP_SINK_H #include -#if defined(HAVE_NETDB_H) -#include -#include // usually #included by ? -#elif defined(HAVE_WINDOWS_H) -#include -#include -#endif - #include class gr_udp_sink; @@ -39,55 +31,52 @@ typedef boost::shared_ptr gr_udp_sink_sptr; gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size=1472); + const char *host, unsigned short port, + int payload_size=1472, bool eof=true); /*! * \brief Write stream to an UDP socket. * \ingroup sink_blk * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src Destination port to bind to (0 allows socket to choose an appropriate port) - * \param dst The destination address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_dst Destination port to connect to - * \param payload_size UDP payload size by default set to - * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param host The name or IP address of the receiving host; use + * NULL or None for no connection + * \param port Destination port to connect to on receiving host + * \param payload_size UDP payload size by default set to 1472 = + * (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Send zero-length packet on disconnect */ class gr_udp_sink : public gr_sync_block { friend gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size); + const char *host, + unsigned short port, + int payload_size, bool eof); private: size_t d_itemsize; - int d_payload_size; // maximum transmission unit (packet length) - int d_socket; // handle to socket + int d_payload_size; // maximum transmission unit (packet length) + bool d_eof; // send zero-length packet on disconnect + int d_socket; // handle to socket + bool d_connected; // are we connected? + gruel::mutex d_mutex; // protects d_socket and d_connected protected: /*! * \brief UDP Sink Constructor * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src Destination port to bind to (0 allows socket to choose an appropriate port) - * \param dst The destination address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_dst Destination port to connect to + * \param host The name or IP address of the receiving host; use + * NULL or None for no connection + * \param port Destination port to connect to on receiving host * \param payload_size UDP payload size by default set to * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Send zero-length packet on disconnect */ gr_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size); + const char *host, unsigned short port, + int payload_size, bool eof); public: ~gr_udp_sink (); @@ -95,6 +84,23 @@ class gr_udp_sink : public gr_sync_block /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } + /*! \brief Change the connection to a new destination + * + * \param host The name or IP address of the receiving host; use + * NULL or None to break the connection without closing + * \param port Destination port to connect to on receiving host + * + * Calls disconnect() to terminate any current connection first. + */ + void connect( const char *host, unsigned short port ); + + /*! \brief Send zero-length packet (if eof is requested) then stop sending + * + * Zero-byte packets can be interpreted as EOF by gr_udp_source. Note that + * disconnect occurs automatically when the sink is destroyed, but not when + * its top_block stops.*/ + void disconnect(); + // should we export anything else? int work (int noutput_items, diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.i b/gnuradio-core/src/lib/io/gr_udp_sink.i old mode 100644 new mode 100755 index 0f37b477b..fc8059f36 --- a/gnuradio-core/src/lib/io/gr_udp_sink.i +++ b/gnuradio-core/src/lib/io/gr_udp_sink.i @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007 Free Software Foundation, Inc. + * Copyright 2007,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -25,22 +25,21 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_sink) gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size=1472); + const char *host, unsigned short port, + int payload_size=1472, bool eof=true); class gr_udp_sink : public gr_sync_block { protected: gr_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size); - - bool open(); - void close(); - int payload_size() { return d_payload_size; } + const char *host, unsigned short port, + int payload_size, bool eof); public: ~gr_udp_sink (); + + int payload_size() { return d_payload_size; } + void connect( const char *host, unsigned short port ); + void disconnect(); + }; diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index ce870d481..880388e5e 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -29,9 +29,19 @@ #include #include #include + #if defined(HAVE_NETDB_H) #include typedef void* optval_t; + +// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order +#if defined(HAVE_NETINET_IN_H) +#include +#endif +#if defined(HAVE_ARPA_INET_H) +#include +#endif + #elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK @@ -67,7 +77,7 @@ static int is_error( int perr ) #endif } -static void report_error( char *msg1, char *msg2 ) +static void report_error( const char *msg1, const char *msg2 ) { // Deal with errors, both posix and winsock #if defined(USING_WINSOCK) @@ -81,16 +91,16 @@ static void report_error( char *msg1, char *msg2 ) return; } -gr_udp_source::gr_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size, - bool wait) +gr_udp_source::gr_udp_source(size_t itemsize, const char *host, + unsigned short port, int payload_size, + bool eof, bool wait) : gr_sync_block ("udp_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0) + d_itemsize(itemsize), d_payload_size(payload_size), + d_eof(eof), d_wait(wait), d_residual(0), d_temp_offset(0) { int ret = 0; - struct addrinfo *ip_src; // store the source IP address to use #if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL @@ -103,14 +113,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name + struct addrinfo *ip_src; // store the source IP address to use struct addrinfo hints; memset( (void*)&hints, 0, sizeof(hints) ); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; hints.ai_protocol = IPPROTO_UDP; - char port_str[7]; - sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &ip_src ); + hints.ai_flags = AI_PASSIVE; + char port_str[12]; + sprintf( port_str, "%d", port ); + ret = getaddrinfo( host, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); @@ -166,10 +178,10 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, gr_udp_source_sptr gr_make_udp_source (size_t itemsize, const char *ipaddr, - unsigned short port, int payload_size, bool wait) + unsigned short port, int payload_size, bool eof, bool wait) { return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, - port, payload_size, wait)); + port, payload_size, eof, wait)); } gr_udp_source::~gr_udp_source () @@ -279,6 +291,22 @@ gr_udp_source::work (int noutput_items, return -1; } } + else if(r==0) { + if(d_eof) { + // zero-length packet interpreted as EOF + + #if SNK_VERBOSE + printf("\tzero-length packet received; returning EOF\n"); + #endif + + return -1; + } + else{ + // do we need to allow boost thread interrupt? + boost::this_thread::interruption_point(); + continue; + } + } else { // Calculate the number of bytes we can take from the buffer in this call nbytes = std::min(r, total_bytes-bytes_received); @@ -316,3 +344,15 @@ gr_udp_source::work (int noutput_items, return bytes_received/d_itemsize; } +// Return port number of d_socket +int gr_udp_source::get_port(void) +{ + sockaddr_in name; + socklen_t len = sizeof(name); + int ret = getsockname( d_socket, (sockaddr*)&name, &len ); + if( ret ) { + report_error("gr_udp_source/getsockname",NULL); + return -1; + } + return ntohs(name.sin_port); +} diff --git a/gnuradio-core/src/lib/io/gr_udp_source.h b/gnuradio-core/src/lib/io/gr_udp_source.h index b06536d6a..e23231aa7 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.h +++ b/gnuradio-core/src/lib/io/gr_udp_source.h @@ -24,49 +24,48 @@ #define INCLUDED_GR_UDP_SOURCE_H #include -#if defined(HAVE_NETDB_H) -#include -#include // usually #included by ? -#elif defined(HAVE_WINDOWS_H) -#include -#include -#endif - #include class gr_udp_source; typedef boost::shared_ptr gr_udp_source_sptr; -gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, - int payload_size=1472, bool wait=true); +gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *host, + unsigned short port, + int payload_size=1472, + bool eof=true, bool wait=true); /*! * \brief Read stream from an UDP socket. * \ingroup source_blk * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src The port number on which the socket listens for data - * \param payload_size UDP payload size by default set to - * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) - * \param wait Wait for data if not immediately available (default: true) + * \param host The name or IP address of the receiving host; can be + * NULL, None, or "0.0.0.0" to allow reading from any + * interface on the host + * \param port The port number on which to receive data; use 0 to + * have the system assign an unused port number + * \param payload_size UDP payload size by default set to 1472 = + * (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Interpret zero-length packet as EOF (default: true) + * \param wait Wait for data if not immediately available + * (default: true) * */ class gr_udp_source : public gr_sync_block { - friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, - int payload_size, bool wait); + friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, + const char *host, + unsigned short port, + int payload_size, + bool eof, bool wait); private: size_t d_itemsize; - - int d_payload_size; // maximum transmission unit (packet length) - bool d_wait; // wait if data if not immediately available - int d_socket; // handle to socket + int d_payload_size; // maximum transmission unit (packet length) + bool d_eof; // zero-length packet is EOF + bool d_wait; // wait if data if not immediately available + int d_socket; // handle to socket char *d_temp_buff; // hold buffer between calls ssize_t d_residual; // hold information about number of bytes stored in the temp buffer size_t d_temp_offset; // point to temp buffer location offset @@ -76,15 +75,19 @@ class gr_udp_source : public gr_sync_block * \brief UDP Source Constructor * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src The port number on which the socket listens for data - * \param payload_size UDP payload size by default set to - * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) - * \param wait Wait for data if not immediately available (default: true) + * \param host The name or IP address of the receiving host; can be + * NULL, None, or "0.0.0.0" to allow reading from any + * interface on the host + * \param port The port number on which to receive data; use 0 to + * have the system assign an unused port number + * \param payload_size UDP payload size by default set to 1472 = + * (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Interpret zero-length packet as EOF (default: true) + * \param wait Wait for data if not immediately available + * (default: true) */ - gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, - int payload_size, bool wait); + gr_udp_source(size_t itemsize, const char *host, unsigned short port, + int payload_size, bool eof, bool wait); public: ~gr_udp_source(); @@ -92,6 +95,9 @@ class gr_udp_source : public gr_sync_block /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } + /*! \breif return the port number of the socket */ + int get_port(); + // should we export anything else? int work(int noutput_items, diff --git a/gnuradio-core/src/lib/io/gr_udp_source.i b/gnuradio-core/src/lib/io/gr_udp_source.i index efaa57c27..e1b23074d 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.i +++ b/gnuradio-core/src/lib/io/gr_udp_source.i @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007 Free Software Foundation, Inc. + * Copyright 2007,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -23,19 +23,19 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source) gr_udp_source_sptr -gr_make_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size=1472, - bool wait=true); +gr_make_udp_source (size_t itemsize, const char *host, + unsigned short port, int payload_size=1472, + bool eof=true, bool wait=true); class gr_udp_source : public gr_sync_block { protected: - gr_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size, bool wait); + gr_udp_source (size_t itemsize, const char *host, + unsigned short port, int payload_size, bool eof, bool wait); public: ~gr_udp_source (); int payload_size() { return d_payload_size; } - + int get_port(); }; diff --git a/gnuradio-core/src/python/gnuradio/gr/Makefile.am b/gnuradio-core/src/python/gnuradio/gr/Makefile.am old mode 100644 new mode 100755 index 3aff89ee7..74c46afb1 --- a/gnuradio-core/src/python/gnuradio/gr/Makefile.am +++ b/gnuradio-core/src/python/gnuradio/gr/Makefile.am @@ -97,4 +97,5 @@ noinst_PYTHON = \ qa_unpack_k_bits.py \ qa_repeat.py \ qa_scrambler.py \ + qa_udp_sink_source.py \ qa_vector_sink_source.py diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py b/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py new file mode 100755 index 000000000..e85d6eebf --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# +# Copyright 2008 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from threading import Timer + +class test_sink_source(gr_unittest.TestCase): + + def setUp(self): + self.tb_snd = gr.top_block() + self.tb_rcv = gr.top_block() + + def tearDown(self): + self.tb_rcv = None + self.tb_snd = None + + def test_001(self): + port = 65500 + + n_data = 16 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_f(src_data) + udp_snd = gr.udp_sink( gr.sizeof_float, 'localhost', port ) + self.tb_snd.connect( src, udp_snd ) + + udp_rcv = gr.udp_source( gr.sizeof_float, 'localhost', port ) + dst = gr.vector_sink_f() + self.tb_rcv.connect( udp_rcv, dst ) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(3.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(not self.timeout) + + def test_002(self): + udp_rcv = gr.udp_source( gr.sizeof_float, '0.0.0.0', 0, eof=False ) + rcv_port = udp_rcv.get_port() + + udp_snd = gr.udp_sink( gr.sizeof_float, '127.0.0.1', 65500 ) + udp_snd.connect( 'localhost', rcv_port ) + + n_data = 16 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_f(src_data) + dst = gr.vector_sink_f() + + self.tb_snd.connect( src, udp_snd ) + self.tb_rcv.connect( udp_rcv, dst ) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(3.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(self.timeout) # source ignores EOF? + + def stop_rcv(self): + self.timeout = True + self.tb_rcv.stop() + #print "tb_rcv stopped by Timer" + +if __name__ == '__main__': + gr_unittest.main () + -- cgit From a61fc516f5deeef67b48a704c5426c3969d36248 Mon Sep 17 00:00:00 2001 From: Don Ward Date: Thu, 6 May 2010 10:02:35 -0400 Subject: Flush pending errors in gr_udp_sink on disconnect() On some systems (e.g., Debian/lenny) UDP errors are reported on the following send() or recv() call. To avoid having errors (such as ECONNREFUSED) from an old connection showing up on the first write to a new connection, we do a recv() on disconnect() to flush them. This may not work for all errors on all systems, but it works in some simple cases of interest. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index a9cb87a21..a323aef98 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -251,6 +251,31 @@ void gr_udp_sink::disconnect() (void) send( d_socket, NULL, 0, 0 ); // ignore errors } + // Sending EOF can produce ERRCONNREFUSED errors that won't show up + // until the next send or recv, which might confuse us if it happens + // on a new connection. The following does a nonblocking recv to + // clear any such errors. + timeval timeout; + timeout.tv_sec = 0; // zero time for immediate return + timeout.tv_usec = 0; + fd_set readfds; + FD_ZERO(&readfds); + FD_SET(d_socket, &readfds); + int r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout); + if(r < 0) { + #if SNK_VERBOSE + report_error("udp_sink/select",NULL); + #endif + } + else if(r > 0) { // call recv() to get error return + r = recv(d_socket, (char*)&readfds, sizeof(readfds), 0); + if(r < 0) { + #if SNK_VERBOSE + report_error("udp_sink/recv",NULL); + #endif + } + } + // Since I can't find any way to disconnect a datagram socket in Cygwin, // we just leave it connected but disable sending. #if 0 -- cgit From 68ce6b4a9b90d4d53310c4271e6011b796fc1bff Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 19:57:10 -0700 Subject: Add additional conditionalization of networking includes --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 5 +++++ gnuradio-core/src/lib/io/gr_udp_source.cc | 6 ++++++ 2 files changed, 11 insertions(+) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index a323aef98..73e6d0283 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -31,7 +31,12 @@ #include #if defined(HAVE_NETDB_H) #include +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H #include //usually included by ? +#endif typedef void* optval_t; #elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 880388e5e..15f83892c 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -32,6 +32,12 @@ #if defined(HAVE_NETDB_H) #include +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif typedef void* optval_t; // ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order -- cgit From 0058f55926c8e9f6ecb37f153ff7464b93838484 Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:02:26 -0700 Subject: Use -1 as file descriptor "not open" value instead of 0 --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 6 +++--- gnuradio-core/src/lib/io/gr_udp_source.cc | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index 73e6d0283..2ee16b79f 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -97,7 +97,7 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, gr_make_io_signature (1, 1, itemsize), gr_make_io_signature (0, 0, 0)), d_itemsize (itemsize), d_payload_size(payload_size), d_eof(eof), - d_connected(false) + d_socket(-1), d_connected(false) { #if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL @@ -145,14 +145,14 @@ gr_udp_sink::~gr_udp_sink () if (d_connected) disconnect(); - if (d_socket){ + if (d_socket != -1){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) closesocket(d_socket); #else ::close(d_socket); #endif - d_socket = 0; + d_socket = -1; } #if defined(USING_WINSOCK) // for Windows (with MinGW) diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 15f83892c..b1c25382c 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -104,7 +104,7 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *host, gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), d_itemsize(itemsize), d_payload_size(payload_size), - d_eof(eof), d_wait(wait), d_residual(0), d_temp_offset(0) + d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0) { int ret = 0; @@ -194,14 +194,14 @@ gr_udp_source::~gr_udp_source () { delete [] d_temp_buff; - if (d_socket){ + if (d_socket != -1){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) closesocket(d_socket); #else ::close(d_socket); #endif - d_socket = 0; + d_socket = -1; } #if defined(USING_WINSOCK) // for Windows (with MinGW) -- cgit From 99a39a4351ca250dcbeeface0ab7b9de6e301d49 Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:06:26 -0700 Subject: Identify memory leaks that occur on error conditions --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 2 ++ gnuradio-core/src/lib/io/gr_udp_source.cc | 3 +++ 2 files changed, 5 insertions(+) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index 2ee16b79f..3084a848b 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -221,6 +221,8 @@ void gr_udp_sink::connect( const char *host, unsigned short port ) hints.ai_protocol = IPPROTO_UDP; char port_str[12]; sprintf( port_str, "%d", port ); + + // FIXME leaks if report_error throws below int ret = getaddrinfo( host, port_str, &hints, &ip_dst ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index b1c25382c..063cd7b50 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -128,11 +128,14 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *host, hints.ai_flags = AI_PASSIVE; char port_str[12]; sprintf( port_str, "%d", port ); + + // FIXME leaks if report_error throws below ret = getaddrinfo( host, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); + // FIXME leaks if report_error throws below d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes // create socket -- cgit From 1e963cd9b6e2a1687bbff86df66c5efbcb2be363 Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:11:35 -0700 Subject: Correct update of d_temp_offset (parallel construction) --- gnuradio-core/src/lib/io/gr_udp_source.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 063cd7b50..cc9849280 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -242,7 +242,8 @@ gr_udp_source::work (int noutput_items, // Update indexing of amount of bytes left in the buffer d_residual -= nbytes; - d_temp_offset = d_temp_offset+d_residual; + d_temp_offset += nbytes; + // FIXME? Returning here could simplify life... } #if USE_SELECT -- cgit From 34e0be1f96cb24e302269c008444bc18e418b653 Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:15:41 -0700 Subject: Move initialization of select timeout --- gnuradio-core/src/lib/io/gr_udp_source.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index cc9849280..f8727c4dc 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -250,8 +250,6 @@ gr_udp_source::work (int noutput_items, // Use select() to determine when socket is readable fd_set readfds; timeval timeout; - timeout.tv_sec = 1; - timeout.tv_usec = 0; #endif while(1) { @@ -260,6 +258,8 @@ gr_udp_source::work (int noutput_items, #if USE_SELECT // RCV_TIMEO doesn't work on all systems (e.g., Cygwin) // use select() instead of, or in addition to RCV_TIMEO + timeout.tv_sec = 1; // Init timeout each iteration. Select can modify it. + timeout.tv_usec = 0; FD_ZERO(&readfds); FD_SET(d_socket, &readfds); r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout); -- cgit From 4267b714f4276671f718136a1279f681a4231aee Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:27:10 -0700 Subject: Defend against a peer that sends an invalid message length. --- gnuradio-core/src/lib/io/gr_udp_source.cc | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index f8727c4dc..1197a0c43 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -281,6 +281,11 @@ gr_udp_source::work (int noutput_items, // This is a non-blocking call with a timeout set in the constructor r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available + // If r > 0, round it down to a multiple of d_itemsize + // (If sender is broken, don't propagate problem) + if (r > 0) + r = (r/d_itemsize) * d_itemsize; + // Check if there was a problem; forget it if the operation just timed out if(r == -1) { if( is_error(EAGAIN) ) { // handle non-blocking call timeout -- cgit From 91054ed9fe7f08cac9738a6a6af5a9ad476ba24b Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:38:17 -0700 Subject: Return immediately when using d_residual. (Otherwise recv may overwrite valid data in d_temp_buff.) --- gnuradio-core/src/lib/io/gr_udp_source.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 1197a0c43..da5b8a191 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -243,7 +243,10 @@ gr_udp_source::work (int noutput_items, // Update indexing of amount of bytes left in the buffer d_residual -= nbytes; d_temp_offset += nbytes; - // FIXME? Returning here could simplify life... + + // Return now with what we've got. + assert(nbytes % d_itemsize == 0); + return nbytes/d_itemsize; } #if USE_SELECT -- cgit From 26185e9077e5e20f71fb515c0e847a5dfd57986c Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Wed, 19 May 2010 20:43:48 -0700 Subject: Simplify USE_SELECT usage --- gnuradio-core/src/lib/io/gr_udp_source.cc | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index da5b8a191..fea9a26ba 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -249,18 +249,14 @@ gr_udp_source::work (int noutput_items, return nbytes/d_itemsize; } -#if USE_SELECT - // Use select() to determine when socket is readable - fd_set readfds; - timeval timeout; -#endif - while(1) { // get the data into our output buffer and record the number of bytes #if USE_SELECT // RCV_TIMEO doesn't work on all systems (e.g., Cygwin) // use select() instead of, or in addition to RCV_TIMEO + fd_set readfds; + timeval timeout; timeout.tv_sec = 1; // Init timeout each iteration. Select can modify it. timeout.tv_usec = 0; FD_ZERO(&readfds); -- cgit From 625a12540792512c968a6f45418a694a5f6aef76 Mon Sep 17 00:00:00 2001 From: Johnathan Corgan Date: Fri, 21 May 2010 17:23:06 -0700 Subject: gnuradio-core: allow swig to handle exceptions in UDP source/sink --- gnuradio-core/src/lib/io/gr_udp_sink.i | 5 +++-- gnuradio-core/src/lib/io/gr_udp_source.i | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.i b/gnuradio-core/src/lib/io/gr_udp_sink.i index fc8059f36..a71006ae0 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.i +++ b/gnuradio-core/src/lib/io/gr_udp_sink.i @@ -26,14 +26,15 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_sink) gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, const char *host, unsigned short port, - int payload_size=1472, bool eof=true); + int payload_size=1472, bool eof=true) throw (std::runtime_error); class gr_udp_sink : public gr_sync_block { protected: gr_udp_sink (size_t itemsize, const char *host, unsigned short port, - int payload_size, bool eof); + int payload_size, bool eof) + throw (std::runtime_error); public: ~gr_udp_sink (); diff --git a/gnuradio-core/src/lib/io/gr_udp_source.i b/gnuradio-core/src/lib/io/gr_udp_source.i index e1b23074d..2001f33e9 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.i +++ b/gnuradio-core/src/lib/io/gr_udp_source.i @@ -25,13 +25,13 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source) gr_udp_source_sptr gr_make_udp_source (size_t itemsize, const char *host, unsigned short port, int payload_size=1472, - bool eof=true, bool wait=true); + bool eof=true, bool wait=true) throw (std::runtime_error); class gr_udp_source : public gr_sync_block { protected: gr_udp_source (size_t itemsize, const char *host, - unsigned short port, int payload_size, bool eof, bool wait); + unsigned short port, int payload_size, bool eof, bool wait) throw (std::runtime_error); public: ~gr_udp_source (); -- cgit From b32e803b1bee283033c976a4656bc0af4fe9461f Mon Sep 17 00:00:00 2001 From: Johnathan Corgan Date: Fri, 21 May 2010 17:28:02 -0700 Subject: gnuradio-core: update copyrights --- gnuradio-core/src/python/gnuradio/gr/Makefile.am | 2 +- gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'gnuradio-core') diff --git a/gnuradio-core/src/python/gnuradio/gr/Makefile.am b/gnuradio-core/src/python/gnuradio/gr/Makefile.am index 74c46afb1..341f58812 100755 --- a/gnuradio-core/src/python/gnuradio/gr/Makefile.am +++ b/gnuradio-core/src/python/gnuradio/gr/Makefile.am @@ -1,5 +1,5 @@ # -# Copyright 2004,2005,2006,2008 Free Software Foundation, Inc. +# Copyright 2004,2005,2006,2008,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py b/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py index e85d6eebf..b00b26bbe 100755 --- a/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py +++ b/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2008 Free Software Foundation, Inc. +# Copyright 2008,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # -- cgit