summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--vrt/lib/socket_rx_buffer.cc228
-rw-r--r--vrt/lib/socket_rx_buffer.h28
2 files changed, 52 insertions, 204 deletions
diff --git a/vrt/lib/socket_rx_buffer.cc b/vrt/lib/socket_rx_buffer.cc
index 6ed211b9a..9c3dde177 100644
--- a/vrt/lib/socket_rx_buffer.cc
+++ b/vrt/lib/socket_rx_buffer.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2008,2009 Free Software Foundation, Inc.
+ * Copyright 2008,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -25,16 +25,12 @@
#include "socket_rx_buffer.h"
#include "data_handler.h"
-#include <linux/if_packet.h>
+#include <sys/types.h>
#include <sys/socket.h>
-#include <sys/mman.h>
-#include <sys/poll.h>
-#include <iostream>
-#include <cmath>
#include <errno.h>
-#include <stdexcept>
-#include <string.h>
#include <fcntl.h>
+#include <iostream>
+#include <stdexcept>
#include <cstdio>
@@ -47,7 +43,6 @@
#define DEFAULT_MEM_SIZE 62.5e6 // ~0.5s @ 125 MB/s
#define MAX_MEM_SIZE 1000e6 // ~10.00s @ 100 MB/s.
-#define MAX_SLAB_SIZE 131072 // 128 KB (FIXME fish out of /proc/slabinfo)
namespace vrt {
@@ -56,15 +51,14 @@ namespace vrt {
const unsigned int socket_rx_buffer::MIN_PKTLEN = 64;
socket_rx_buffer::socket_rx_buffer(int socket_fd, size_t rx_bufsize)
- : d_fd(socket_fd), d_using_tpring(false), d_buflen(0), d_buf(0), d_frame_nr(0),
- d_frame_size(0), d_head(0), d_ring(0)
+ : d_fd(socket_fd)
{
if (rx_bufsize == 0)
- d_buflen = (size_t)DEFAULT_MEM_SIZE;
+ rx_bufsize = (size_t)DEFAULT_MEM_SIZE;
else
- d_buflen = std::min((size_t)MAX_MEM_SIZE, rx_bufsize);
+ rx_bufsize = std::min((size_t)MAX_MEM_SIZE, rx_bufsize);
- if (!open()){
+ if (!open(rx_bufsize)){
throw std::runtime_error("socket_rx_buffer::open failed");
}
}
@@ -75,88 +69,34 @@ namespace vrt {
}
bool
- socket_rx_buffer::open()
+ socket_rx_buffer::open(size_t buflen)
{
- if (try_packet_ring()){
- d_using_tpring = true;
- // fprintf(stderr, "socket_rx_buffer: using memory mapped interface\n");
- }
- else {
- d_using_tpring = false;
- // fprintf(stderr, "socket_rx_buffer: NOT using memory mapped interface\n");
+ // Increase socket buffer if possible
- // Increase socket buffer if possible
+ int rcvbuf_size = buflen;
- int rcvbuf_size = d_buflen;
#if defined(SO_RCVBUFFORCE)
- if (setsockopt(d_fd, SOL_SOCKET, SO_RCVBUFFORCE, &rcvbuf_size, sizeof(rcvbuf_size)) != 0){
- perror("setsockopt(SO_RCVBUFFORCE)");
- fprintf(stderr, "Are you running as root? If not, please do.\n");
- }
- else {
- fprintf(stderr, "SO_RCVBUFFORCE = %zd\n", d_buflen);
- }
-#endif
+ // If we've got CAP_NET_ADMIN or root, this will allow the
+ // rmem_max limit to be overridden
+ if (setsockopt(d_fd, SOL_SOCKET, SO_RCVBUFFORCE,
+ &rcvbuf_size, sizeof(rcvbuf_size)) != 0){
+ perror("setsockopt(SO_RCVBUFFORCE)");
+ }
+ else {
+ fprintf(stderr, "SO_RCVBUFFORCE = %zd\n", buflen);
}
-
- return true;
- }
-
- bool
- socket_rx_buffer::try_packet_ring()
- {
- struct tpacket_req req;
- size_t page_size = getpagesize();
-
- // Calculate minimum power-of-two aligned size for frames
- req.tp_frame_size =
- (unsigned int)rint(pow(2, ceil(log2(TPACKET_ALIGN(TPACKET_HDRLEN)+TPACKET_ALIGN(MAX_PKTLEN)))));
- d_frame_size = req.tp_frame_size;
-
- // Calculate minimum contiguous pages needed to enclose a frame
- int npages = (page_size > req.tp_frame_size) ? 1 : ((req.tp_frame_size+page_size-1)/page_size);
- req.tp_block_size = page_size << (int)ceil(log2(npages));
-
- // Calculate number of blocks
- req.tp_block_nr = (int)(d_buflen/req.tp_block_size);
-
-
- // Recalculate buffer length
- d_buflen = req.tp_block_nr*req.tp_block_size;
-
- // Finally, calculate total number of frames. Since frames, blocks,
- // and pages are all power-of-two aligned, frames are contiguous
- req.tp_frame_nr = d_buflen/req.tp_frame_size;
- d_frame_nr = req.tp_frame_nr;
-
-#if 0
- if (SOCKET_RX_BUFFER_DEBUG)
- std::cerr << "socket_rx_buffer:"
- << " frame_size=" << req.tp_frame_size
- << " block_size=" << req.tp_block_size
- << " block_nr=" << req.tp_block_nr
- << " frame_nr=" << req.tp_frame_nr
- << " buflen=" << d_buflen
- << std::endl;
#endif
- // Try to get kernel shared memory buffer
- if (setsockopt(d_fd, SOL_PACKET, PACKET_RX_RING, (void *)&req, sizeof(req)) != 0){
- // perror("socket_rx_buffer: setsockopt");
- return false;
+ if (setsockopt(d_fd, SOL_SOCKET, SO_RCVBUF,
+ &rcvbuf_size, sizeof(rcvbuf_size)) != 0){
+ perror("setsockopt(SO_RCVBUF)");
+ fprintf(stderr,
+ "FIXME: message about configuring /proc/sys/net/core/rmem_max to %zd\n",
+ buflen);
}
-
- void *p = mmap(0, d_buflen, PROT_READ|PROT_WRITE, MAP_SHARED, d_fd, 0);
- if (p == MAP_FAILED){
- perror("socket_rx_buffer: mmap");
- return false;
+ else {
+ fprintf(stderr, "SO_RCVBUF = %zd\n", buflen);
}
- d_buf = (uint8_t *) p;
-
- // Initialize our pointers into the packet ring
- d_ring.resize(req.tp_frame_nr);
- for (unsigned int i=0; i < req.tp_frame_nr; i++)
- d_ring[i] = (uint8_t *)(d_buf+i*req.tp_frame_size);
return true;
}
@@ -167,111 +107,43 @@ namespace vrt {
return true;
}
- inline bool
- socket_rx_buffer::frame_available()
- {
- return (((tpacket_hdr *)d_ring[d_head])->tp_status != TP_STATUS_KERNEL);
- }
-
socket_rx_buffer::result
socket_rx_buffer::rx_frames(data_handler *f, int timeout_in_ms)
{
- if (!d_using_tpring){
-
- // ----------------------------------------------------------------
- // Use recv instead of kernel Rx packet ring
- // ----------------------------------------------------------------
-
- unsigned char buf[MAX_PKTLEN];
- bool dont_wait = timeout_in_ms == 0; // FIXME treating timeout as 0 or inf
- int flags = dont_wait ? MSG_DONTWAIT : 0;
-
- ssize_t rr = recv(d_fd, buf, sizeof(buf), flags);
- if (rr == -1){ // error?
- if (errno == EAGAIN){ // non-blocking, nothing there
- return EB_WOULD_BLOCK;
- }
- perror("rx_frames: recv");
- return EB_ERROR;
- }
-
- // Got first packet. Call handler
-
- data_handler::result r = (*f)(buf, rr);
- if (r & data_handler::DONE)
- return EB_OK;
-
- // Now do as many as we can without blocking
-
- while (1){
- rr = recv(d_fd, buf, sizeof(buf), MSG_DONTWAIT);
- if (rr == -1){ // error?
- if (errno == EAGAIN) // non-blocking, nothing there
- return EB_OK; // return OK; we've processed >= 1 packets
- perror("rx_frames: recv");
- return EB_ERROR;
- }
-
- r = (*f)(buf, rr);
- if (r & data_handler::DONE)
- break;
+ unsigned char buf[MAX_PKTLEN];
+ bool dont_wait = timeout_in_ms == 0; // FIXME treating timeout as 0 or inf
+ int flags = dont_wait ? MSG_DONTWAIT : 0;
+
+ ssize_t rr = recv(d_fd, buf, sizeof(buf), flags);
+ if (rr == -1){ // error?
+ if (errno == EAGAIN){ // non-blocking, nothing there
+ return EB_WOULD_BLOCK;
}
- return EB_OK;
+ perror("rx_frames: recv");
+ return EB_ERROR;
}
- // ----------------------------------------------------------------
- // Use kernel Rx packet ring
- // ----------------------------------------------------------------
+ // Got first packet. Call handler
- DEBUG_LOG("\n");
-
- while (!frame_available()) {
- if (timeout_in_ms == 0) {
- DEBUG_LOG("w");
- return EB_WOULD_BLOCK;
- }
-
- struct pollfd pfd;
- pfd.fd = d_fd;
- pfd.revents = 0;
- pfd.events = POLLIN;
+ data_handler::result r = (*f)(buf, rr);
+ if (r & data_handler::DONE)
+ return EB_OK;
- // DEBUG_LOG("P");
+ // Now do as many as we can without blocking
- int pres = poll(&pfd, 1, timeout_in_ms);
- if (pres == -1) {
- perror("poll");
+ while (1){
+ rr = recv(d_fd, buf, sizeof(buf), MSG_DONTWAIT);
+ if (rr == -1){ // error?
+ if (errno == EAGAIN) // non-blocking, nothing there
+ return EB_OK; // return OK; we've processed >= 1 packets
+ perror("rx_frames: recv");
return EB_ERROR;
}
- if (pres == 0) {
- DEBUG_LOG("t");
- return EB_TIMED_OUT;
- }
- }
-
- // Iterate through available packets
- while (frame_available()) {
- // Get start of ethernet frame and length
- tpacket_hdr *hdr = (tpacket_hdr *)d_ring[d_head];
- void *base = (uint8_t *)hdr+hdr->tp_mac;
- size_t len = hdr->tp_len;
-
- if (1)
- fprintf(stderr, "socket_rx_buffer: base = %p tp_mac = %3d tp_net = %3d\n",
- base, hdr->tp_mac, hdr->tp_net);
-
- // Invoke data handler
- data_handler::result r = (*f)(base, len);
- hdr->tp_status = TP_STATUS_KERNEL; // mark it free
-
- inc_head();
-
+ r = (*f)(buf, rr);
if (r & data_handler::DONE)
- break;
+ break;
}
-
- DEBUG_LOG("|");
return EB_OK;
}
diff --git a/vrt/lib/socket_rx_buffer.h b/vrt/lib/socket_rx_buffer.h
index 053c30c12..36c18c1a5 100644
--- a/vrt/lib/socket_rx_buffer.h
+++ b/vrt/lib/socket_rx_buffer.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2008,2009 Free Software Foundation, Inc.
+ * Copyright 2008,2009,2010 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -41,28 +41,9 @@ namespace vrt {
{
int d_fd; // socket file descriptor
- bool d_using_tpring; // using kernel mapped packet ring
- size_t d_buflen; // length of our buffer
- uint8_t *d_buf; // packet ring
- unsigned int d_frame_nr; // max frames on ring
- size_t d_frame_size; // frame storage size
- unsigned int d_head; // pointer to next frame
- std::vector<uint8_t *> d_ring; // pointers into buffer
-
- bool frame_available();
-
- void inc_head()
- {
- if (d_head + 1 >= d_frame_nr)
- d_head = 0;
- else
- d_head = d_head + 1;
- }
-
- bool open();
+ bool open(size_t buflen);
bool close();
- bool try_packet_ring();
public:
@@ -110,11 +91,6 @@ namespace vrt {
* \returns EB_ERROR if there was an unrecoverable error.
*/
result rx_frames(data_handler *f, int timeout=-1);
-
- /*
- * \brief Returns maximum possible number of frames in buffer
- */
- unsigned int max_frames() const { return d_using_tpring ? d_frame_nr : 0; }
};
}; // namespace vrt