summaryrefslogtreecommitdiff
path: root/gnuradio-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src')
-rw-r--r--gnuradio-core/src/lib/io/CMakeLists.txt4
-rw-r--r--gnuradio-core/src/lib/io/gr_message_debug.cc34
-rw-r--r--gnuradio-core/src/lib/io/gr_message_debug.h1
-rw-r--r--gnuradio-core/src/lib/io/gr_pdu.cc70
-rw-r--r--gnuradio-core/src/lib/io/gr_pdu.h1
-rw-r--r--gnuradio-core/src/lib/io/gr_socket_pdu.cc157
-rw-r--r--gnuradio-core/src/lib/io/gr_socket_pdu.h203
-rw-r--r--gnuradio-core/src/lib/io/gr_socket_pdu.i33
-rw-r--r--gnuradio-core/src/lib/io/gr_stream_pdu_base.cc117
-rw-r--r--gnuradio-core/src/lib/io/gr_stream_pdu_base.h62
-rw-r--r--gnuradio-core/src/lib/io/gr_tuntap_pdu.cc143
-rw-r--r--gnuradio-core/src/lib/io/gr_tuntap_pdu.h66
-rw-r--r--gnuradio-core/src/lib/io/gr_tuntap_pdu.i30
-rw-r--r--gnuradio-core/src/lib/io/io.i5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc41
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h406
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc34
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.cc40
-rw-r--r--gnuradio-core/src/lib/runtime/gr_flowgraph.h61
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.cc5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.h33
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2.i6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc90
-rw-r--r--gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h1
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/hier_block2.py13
26 files changed, 1392 insertions, 268 deletions
diff --git a/gnuradio-core/src/lib/io/CMakeLists.txt b/gnuradio-core/src/lib/io/CMakeLists.txt
index 7041f2820..59ca06b5a 100644
--- a/gnuradio-core/src/lib/io/CMakeLists.txt
+++ b/gnuradio-core/src/lib/io/CMakeLists.txt
@@ -39,6 +39,7 @@ list(APPEND gnuradio_core_sources
${CMAKE_CURRENT_SOURCE_DIR}/ppio_ppdev.cc
${CMAKE_CURRENT_SOURCE_DIR}/gri_wavfile.cc
${CMAKE_CURRENT_SOURCE_DIR}/gr_pdu.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/gr_stream_pdu_base.cc
)
########################################################################
@@ -61,6 +62,7 @@ install(FILES
${CMAKE_CURRENT_SOURCE_DIR}/ppio_ppdev.h
${CMAKE_CURRENT_SOURCE_DIR}/gri_wavfile.h
${CMAKE_CURRENT_SOURCE_DIR}/gr_pdu.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/gr_stream_pdu_base.h
DESTINATION ${GR_INCLUDE_DIR}/gnuradio
COMPONENT "core_devel"
)
@@ -103,6 +105,8 @@ set(gr_core_io_triple_threats
gr_wavfile_sink
gr_tagged_file_sink
gr_tagged_stream_to_pdu
+ gr_tuntap_pdu
+ gr_socket_pdu
)
foreach(file_tt ${gr_core_io_triple_threats})
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc
index 7d28ff18e..27f4c65fd 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.cc
+++ b/gnuradio-core/src/lib/io/gr_message_debug.cc
@@ -58,6 +58,30 @@ gr_message_debug::store(pmt::pmt_t msg)
d_messages.push_back(msg);
}
+void
+gr_message_debug::print_verbose(pmt::pmt_t msg)
+{
+ pmt::pmt_t meta = pmt::pmt_car(msg);
+ pmt::pmt_t vector = pmt::pmt_cdr(msg);
+ std::cout << "* MESSAGE DEBUG PRINT PDU VERBOSE *\n";
+ pmt::pmt_print(meta);
+ size_t len = pmt::pmt_length(vector);
+ std::cout << "pdu_length = " << len << std::endl;
+ std::cout << "contents = " << std::endl;
+ size_t offset(0);
+ const uint8_t* d = (const uint8_t*) pmt_uniform_vector_elements(vector, offset);
+ for(size_t i=0; i<len; i+=16){
+ printf("%04x: ", i);
+ for(size_t j=i; j<std::min(i+16,len); j++){
+ printf("%02x ",d[j] );
+ }
+
+ std::cout << std::endl;
+ }
+
+ std::cout << "***********************************\n";
+}
+
int
gr_message_debug::num_messages()
{
@@ -81,11 +105,11 @@ gr_message_debug::gr_message_debug()
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(0, 0, 0))
{
- message_port_register_in(pmt::mp("print"));
- set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1));
-
- message_port_register_in(pmt::mp("store"));
- set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1));
+ message_port_register_in(pmt::mp("print"));
+ set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1));
+
+ message_port_register_in(pmt::mp("store"));
+ set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1));
}
gr_message_debug::~gr_message_debug()
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.h b/gnuradio-core/src/lib/io/gr_message_debug.h
index 1ffef1b02..6e6e5103c 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.h
+++ b/gnuradio-core/src/lib/io/gr_message_debug.h
@@ -55,6 +55,7 @@ class GR_CORE_API gr_message_debug : public gr_block
* \param msg A pmt message passed from the scheduler's message handling.
*/
void print(pmt::pmt_t msg);
+ void print_verbose(pmt::pmt_t msg);
/*!
* \brief Messages received in this port are stored in a vector.
diff --git a/gnuradio-core/src/lib/io/gr_pdu.cc b/gnuradio-core/src/lib/io/gr_pdu.cc
index f33eed0a3..b2757c307 100644
--- a/gnuradio-core/src/lib/io/gr_pdu.cc
+++ b/gnuradio-core/src/lib/io/gr_pdu.cc
@@ -28,42 +28,52 @@
size_t
gr_pdu_itemsize(gr_pdu_vector_type type){
- switch(type){
- case BYTE:
- return 1;
- case FLOAT:
- return sizeof(float);
- case COMPLEX:
- return sizeof(gr_complex);
- default:
- throw std::runtime_error("bad type!");
- }
+ switch(type){
+ case BYTE:
+ return 1;
+ case FLOAT:
+ return sizeof(float);
+ case COMPLEX:
+ return sizeof(gr_complex);
+ default:
+ throw std::runtime_error("bad type!");
+ }
}
bool
gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v){
- switch(type){
- case BYTE:
- return pmt::pmt_is_u8vector(v);
- case FLOAT:
- return pmt::pmt_is_f32vector(v);
- case COMPLEX:
- return pmt::pmt_is_c32vector(v);
- default:
- throw std::runtime_error("bad type!");
- }
+ switch(type){
+ case BYTE:
+ return pmt::pmt_is_u8vector(v);
+ case FLOAT:
+ return pmt::pmt_is_f32vector(v);
+ case COMPLEX:
+ return pmt::pmt_is_c32vector(v);
+ default:
+ throw std::runtime_error("bad type!");
+ }
}
pmt::pmt_t
gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items){
- switch(type){
- case BYTE:
- return pmt::pmt_init_u8vector(items, buf);
- case FLOAT:
- return pmt::pmt_init_f32vector(items, (const float*)buf);
- case COMPLEX:
- return pmt::pmt_init_c32vector(items, (const gr_complex*)buf);
- default:
- throw std::runtime_error("bad type!");
- }
+ switch(type){
+ case BYTE:
+ return pmt::pmt_init_u8vector(items, buf);
+ case FLOAT:
+ return pmt::pmt_init_f32vector(items, (const float*)buf);
+ case COMPLEX:
+ return pmt::pmt_init_c32vector(items, (const gr_complex*)buf);
+ default:
+ throw std::runtime_error("bad type!");
+ }
+}
+
+gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector){
+ if(pmt_is_u8vector(vector))
+ return BYTE;
+ if(pmt_is_f32vector(vector))
+ return FLOAT;
+ if(pmt_is_c32vector(vector))
+ return COMPLEX;
+ throw std::runtime_error("bad type!");
}
diff --git a/gnuradio-core/src/lib/io/gr_pdu.h b/gnuradio-core/src/lib/io/gr_pdu.h
index 67519c89d..5ed9cdded 100644
--- a/gnuradio-core/src/lib/io/gr_pdu.h
+++ b/gnuradio-core/src/lib/io/gr_pdu.h
@@ -34,5 +34,6 @@ enum gr_pdu_vector_type { BYTE, FLOAT, COMPLEX };
size_t gr_pdu_itemsize(gr_pdu_vector_type type);
bool gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v);
pmt::pmt_t gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items);
+gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector);
#endif
diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.cc b/gnuradio-core/src/lib/io/gr_socket_pdu.cc
new file mode 100644
index 000000000..bb374b300
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_socket_pdu.cc
@@ -0,0 +1,157 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_socket_pdu.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+#include <gr_pdu.h>
+#include <boost/format.hpp>
+
+// public constructor that returns a shared_ptr
+gr_socket_pdu_sptr
+gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU)
+{
+ return gnuradio::get_initial_sptr(new gr_socket_pdu(type,addr,port,MTU));
+}
+
+gr_socket_pdu::gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU)
+ : gr_stream_pdu_base(MTU)
+{
+
+ if( (type == "TCP_SERVER") || (type == "TCP_CLIENT")){
+ boost::asio::ip::tcp::resolver resolver(_io_service);
+ boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), addr, port);
+ _tcp_endpoint = *resolver.resolve(query);
+ }
+ if( (type == "UDP_SERVER") || (type == "UDP_CLIENT")){
+ boost::asio::ip::udp::resolver resolver(_io_service);
+ boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
+ if( (type == "UDP_SERVER") ){
+ _udp_endpoint = *resolver.resolve(query);
+ } else {
+ _udp_endpoint_other = *resolver.resolve(query);
+ }
+ }
+
+ // register ports
+ message_port_register_out(pmt::mp("pdus"));
+ message_port_register_in(pmt::mp("pdus"));
+
+ // set up socketry
+ if (type == "TCP_SERVER"){
+ _acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(_io_service, _tcp_endpoint));
+ _acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+ start_tcp_accept();
+ // bind tcp server send handler
+ set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::tcp_server_send, this, _1));
+ } else if(type =="TCP_CLIENT"){
+ boost::system::error_code error = boost::asio::error::host_not_found;
+ _tcp_socket.reset(new boost::asio::ip::tcp::socket(_io_service));
+ _tcp_socket->connect(_tcp_endpoint, error);
+ if(error){
+ throw boost::system::system_error(error);
+ }
+ set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::tcp_client_send, this, _1));
+ _tcp_socket->async_read_some(
+ boost::asio::buffer(rxbuf),
+ boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+
+ } else if(type =="UDP_SERVER"){
+ _udp_socket.reset(new boost::asio::ip::udp::socket(_io_service, _udp_endpoint));
+ _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other,
+ boost::bind(&gr_socket_pdu::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::udp_send, this, _1));
+ } else if(type =="UDP_CLIENT"){
+ _udp_socket.reset(new boost::asio::ip::udp::socket(_io_service, _udp_endpoint));
+ _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other,
+ boost::bind(&gr_socket_pdu::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::udp_send, this, _1));
+ } else {
+ throw std::runtime_error("unknown socket type!");
+ }
+
+ // start thread for io_service
+ d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_socket_pdu::run_io_service, this)));
+ d_started = true;
+}
+
+void tcp_connection::handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred)
+ {
+ if(!error)
+ {
+ pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&buf[0]);
+ pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+
+ d_block->message_port_pub( pmt::mp("pdus"), pdu );
+
+ socket_.async_read_some(
+ boost::asio::buffer(buf),
+ boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+
+ } else {
+ std::cout << "error occurred\n";
+ }
+
+ }
+
+
+void gr_socket_pdu::tcp_server_send(pmt::pmt_t msg){
+ pmt::pmt_t vector = pmt::pmt_cdr(msg);
+ for(size_t i=0; i<d_tcp_connections.size(); i++){
+ d_tcp_connections[i]->send(vector);
+ }
+}
+
+void gr_socket_pdu::tcp_client_send(pmt::pmt_t msg){
+ pmt::pmt_t vector = pmt::pmt_cdr(msg);
+ size_t len = pmt::pmt_length(vector);
+ size_t offset(0);
+ boost::array<char, 10000> txbuf;
+ memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
+ _tcp_socket->send(boost::asio::buffer(txbuf,len));
+}
+
+void gr_socket_pdu::udp_send(pmt::pmt_t msg){
+ pmt::pmt_t vector = pmt::pmt_cdr(msg);
+ size_t len = pmt::pmt_length(vector);
+ size_t offset(0);
+ boost::array<char, 10000> txbuf;
+ memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
+ if(_udp_endpoint_other.address().to_string() != "0.0.0.0")
+ _udp_socket->send_to(boost::asio::buffer(txbuf,len), _udp_endpoint_other);
+}
diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.h b/gnuradio-core/src/lib/io/gr_socket_pdu.h
new file mode 100644
index 000000000..3a96a3f97
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_socket_pdu.h
@@ -0,0 +1,203 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_SOCKET_PDU_H
+#define INCLUDED_GR_SOCKET_PDU_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+#include <gr_stream_pdu_base.h>
+#include <boost/asio.hpp>
+
+#include <linux/if_tun.h>
+
+class gr_socket_pdu;
+typedef boost::shared_ptr<gr_socket_pdu> gr_socket_pdu_sptr;
+
+GR_CORE_API gr_socket_pdu_sptr gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000);
+
+class tcp_connection
+ : public boost::enable_shared_from_this<tcp_connection>
+{
+public:
+ typedef boost::shared_ptr<tcp_connection> pointer;
+ gr_socket_pdu *d_block;
+ boost::array<char, 10000> buf;
+
+ static pointer create(boost::asio::io_service& io_service)
+ {
+ return pointer(new tcp_connection(io_service));
+ }
+
+ boost::asio::ip::tcp::socket& socket()
+ {
+ return socket_;
+ }
+
+ void start(gr_socket_pdu* parent)
+ {
+ d_block = parent;
+// message_ = "connected to gr_socket_pdu\n";
+// boost::asio::async_write(socket_, boost::asio::buffer(message_),
+// boost::bind(&tcp_connection::handle_write, shared_from_this(),
+// boost::asio::placeholders::error,
+// boost::asio::placeholders::bytes_transferred));
+
+ socket_.async_read_some(
+ boost::asio::buffer(buf),
+ boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+ }
+ void send(pmt::pmt_t vector){
+ size_t len = pmt::pmt_length(vector);
+ size_t offset(0);
+ boost::array<char, 10000> txbuf;
+ memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
+ boost::asio::async_write(socket_, boost::asio::buffer(txbuf, len),
+ boost::bind(&tcp_connection::handle_write, shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ }
+
+ ~tcp_connection(){
+// std::cout << "tcp_connection destroyed\n";
+ }
+
+private:
+ tcp_connection(boost::asio::io_service& io_service)
+ : socket_(io_service)
+ {
+ }
+
+ void handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred);
+
+ void handle_write(const boost::system::error_code& /*error*/,
+ size_t /*bytes_transferred*/)
+ {
+ }
+
+ boost::asio::ip::tcp::socket socket_;
+ std::string message_;
+};
+
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_socket_pdu : public gr_stream_pdu_base
+{
+ private:
+ friend GR_CORE_API gr_socket_pdu_sptr
+ gr_make_socket_pdu(std::string type, std::string addr, std::string port, int MTU);
+
+ boost::asio::io_service _io_service;
+
+ boost::array<char, 10000> rxbuf;
+
+ // tcp specific
+ boost::asio::ip::tcp::endpoint _tcp_endpoint;
+
+ // specific to tcp server
+ boost::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor_tcp;
+ std::vector<tcp_connection::pointer> d_tcp_connections;
+ void tcp_server_send(pmt::pmt_t msg);
+ void tcp_client_send(pmt::pmt_t msg);
+ void udp_send(pmt::pmt_t msg);
+
+ // specific to tcp client
+ boost::shared_ptr<boost::asio::ip::tcp::socket> _tcp_socket;
+
+ // specific to udp client/server
+ boost::asio::ip::udp::endpoint _udp_endpoint;
+ boost::asio::ip::udp::endpoint _udp_endpoint_other;
+ boost::shared_ptr<boost::asio::ip::udp::socket> _udp_socket;
+
+ void handle_receive(const boost::system::error_code& error, std::size_t ){
+ }
+
+ void start_tcp_accept(){
+ tcp_connection::pointer new_connection =
+ tcp_connection::create(_acceptor_tcp->get_io_service());
+
+ _acceptor_tcp->async_accept(new_connection->socket(),
+ boost::bind(&gr_socket_pdu::handle_tcp_accept, this, new_connection,
+ boost::asio::placeholders::error));
+ }
+
+ void handle_tcp_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error){
+ if (!error)
+ {
+ new_connection->start(this);
+ d_tcp_connections.push_back(new_connection);
+ start_tcp_accept();
+ } else {
+ std::cout << error << std::endl;
+ }
+ }
+
+ void run_io_service(){
+ _io_service.run();
+ }
+
+ void handle_udp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){
+ if(!error){
+ pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]);
+ pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+
+ message_port_pub( pmt::mp("pdus"), pdu );
+
+ _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other,
+ boost::bind(&gr_socket_pdu::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ } else {
+ throw boost::system::system_error(error);
+// std::cout << "error occurred\n";
+ }
+ }
+ void handle_tcp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){
+ if(!error)
+ {
+ pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]);
+ pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+
+ message_port_pub( pmt::mp("pdus"), pdu );
+
+ _tcp_socket->async_read_some(
+ boost::asio::buffer(rxbuf),
+ boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+
+ } else {
+ //std::cout << "error occurred\n";
+ throw boost::system::system_error(error);
+ }
+ }
+
+ protected:
+ gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000);
+ public:
+ ~gr_socket_pdu () {}
+};
+
+#endif /* INCLUDED_GR_TUNTAP_PDU_H */
diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.i b/gnuradio-core/src/lib/io/gr_socket_pdu.i
new file mode 100644
index 000000000..3e20b63e2
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_socket_pdu.i
@@ -0,0 +1,33 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+GR_SWIG_BLOCK_MAGIC(gr,socket_pdu);
+
+%ignore tcp_connection;
+
+%{
+#include <gr_socket_pdu.h>
+%}
+
+%include "gr_stream_pdu_base.h"
+%include "gr_socket_pdu.h"
+
diff --git a/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc b/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc
new file mode 100644
index 000000000..cff7296cb
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc
@@ -0,0 +1,117 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_stream_pdu_base.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+#include <gr_pdu.h>
+#include <boost/format.hpp>
+
+static const long timeout_us = 100*1000; //100ms
+
+gr_stream_pdu_base::gr_stream_pdu_base (int MTU)
+ : gr_sync_block("stream_pdu_base",
+ gr_make_io_signature(0, 0, 0),
+ gr_make_io_signature(0, 0, 0)),
+ d_finished(false), d_started(false), d_fd(-1)
+{
+ // reserve space for rx buffer
+ d_rxbuf.resize(MTU,0);
+}
+
+gr_stream_pdu_base::~gr_stream_pdu_base()
+{
+ stop_rxthread();
+}
+
+void gr_stream_pdu_base::stop_rxthread(){
+ d_finished = true;
+ if(d_started){
+ d_thread->interrupt();
+ d_thread->join();
+ }
+ }
+
+void gr_stream_pdu_base::start_rxthread(pmt::pmt_t _rxport){
+ rxport = _rxport;
+ d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_stream_pdu_base::run, this)));
+ d_started = true;
+ }
+
+void gr_stream_pdu_base::run(){
+ while(!d_finished) {
+ if(not wait_ready()){ continue; }
+ const int result = read( d_fd, &d_rxbuf[0], d_rxbuf.size() );
+ if(result <= 0){ throw std::runtime_error("gr_stream_pdu_base, bad socket read!"); }
+ pmt::pmt_t vector = pmt::pmt_init_u8vector(result, &d_rxbuf[0]);
+ pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+ message_port_pub(rxport, pdu);
+ }
+}
+
+void gr_stream_pdu_base::send(pmt::pmt_t msg){
+ pmt::pmt_t vector = pmt::pmt_cdr(msg);
+ size_t offset(0);
+ size_t itemsize(gr_pdu_itemsize(type_from_pmt(vector)));
+ int len( pmt::pmt_length(vector)*itemsize );
+
+ const int rv = write(d_fd, pmt::pmt_uniform_vector_elements(vector, offset), len);
+ if(rv != len){
+ std::cerr << boost::format("WARNING: gr_stream_pdu_base::send(pdu) write failed! (d_fd=%d, len=%d, rv=%d)")
+ % d_fd % len % rv << std::endl;
+ }
+}
+
+int
+gr_stream_pdu_base::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+{
+ throw std::runtime_error("should not be called.\n");
+ return 0;
+}
+
+bool gr_stream_pdu_base::wait_ready(){
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = timeout_us;
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(d_fd, &rset);
+
+ //call select with timeout on receive socket
+ return ::select(d_fd+1, &rset, NULL, NULL, &tv) > 0;
+}
diff --git a/gnuradio-core/src/lib/io/gr_stream_pdu_base.h b/gnuradio-core/src/lib/io/gr_stream_pdu_base.h
new file mode 100644
index 000000000..dc5dc5c2e
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_stream_pdu_base.h
@@ -0,0 +1,62 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_STREAM_PDU_BASE_H
+#define INCLUDED_GR_STREAM_PDU_BASE_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+
+#include <linux/if_tun.h>
+
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_stream_pdu_base : public gr_sync_block
+{
+ public:
+ boost::shared_ptr<boost::thread> d_thread;
+ bool d_finished;
+ bool d_started;
+ std::vector<uint8_t> d_rxbuf;
+ void run();
+ int d_fd;
+ gr_stream_pdu_base (int MTU=10000);
+ ~gr_stream_pdu_base ();
+ void send(pmt::pmt_t msg);
+ bool wait_ready();
+ int work (int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ void start_rxthread(pmt::pmt_t _rxport);
+ void stop_rxthread();
+ private:
+ pmt::pmt_t rxport;
+};
+
+typedef boost::shared_ptr<gr_stream_pdu_base> gr_stream_pdu_base_sptr;
+
+#endif /* INCLUDED_GR_TUNTAP_PDU_H */
diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc b/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc
new file mode 100644
index 000000000..44de1a5f7
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc
@@ -0,0 +1,143 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_tuntap_pdu.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+#include <gr_pdu.h>
+#include <boost/format.hpp>
+
+#if (defined(linux) || defined(__linux) || defined(__linux__))
+
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <arpa/inet.h>
+#include <linux/if.h>
+
+
+// public constructor that returns a shared_ptr
+
+gr_tuntap_pdu_sptr
+gr_make_tuntap_pdu (std::string dev, int MTU)
+{
+ return gnuradio::get_initial_sptr(new gr_tuntap_pdu(dev, MTU));
+}
+
+gr_tuntap_pdu::gr_tuntap_pdu (std::string dev, int MTU)
+ : gr_stream_pdu_base(MTU)
+{
+
+ // make the tuntap
+ char dev_cstr[1024];
+ memset(dev_cstr, 0x00, 1024);
+ strncpy(dev_cstr, dev.c_str(), std::min(sizeof(dev_cstr), dev.size()));
+ d_fd = tun_alloc(dev_cstr);
+ if(d_fd <= 0){
+ throw std::runtime_error("TunTap make: tun_alloc failed (are you running as root?)");
+ }
+
+ std::cout << boost::format(
+ "Allocated virtual ethernet interface: %s\n"
+ "You must now use ifconfig to set its IP address. E.g.,\n"
+ " $ sudo ifconfig %s 192.168.200.1\n"
+ "Be sure to use a different address in the same subnet for each machine.\n"
+ ) % dev % dev << std::endl;
+
+ // set up output message port
+ message_port_register_out(pmt::mp("pdus"));
+ start_rxthread(pmt::mp("pdus"));
+
+ // set up input message port
+ message_port_register_in(pmt::mp("pdus"));
+ set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_tuntap_pdu::send, this, _1));
+}
+
+
+int gr_tuntap_pdu::tun_alloc(char *dev, int flags) {
+ struct ifreq ifr;
+ int fd, err;
+ const char *clonedev = "/dev/net/tun";
+
+ /* Arguments taken by the function:
+ *
+ * char *dev: the name of an interface (or '\0'). MUST have enough
+ * space to hold the interface name if '\0' is passed
+ * int flags: interface flags (eg, IFF_TUN etc.)
+ */
+
+ /* open the clone device */
+ if( (fd = open(clonedev, O_RDWR)) < 0 ) {
+ return fd;
+ }
+
+ /* preparation of the struct ifr, of type "struct ifreq" */
+ memset(&ifr, 0, sizeof(ifr));
+
+ ifr.ifr_flags = flags; /* IFF_TUN or IFF_TAP, plus maybe IFF_NO_PI */
+
+ if (*dev) {
+ /* if a device name was specified, put it in the structure; otherwise,
+ * the kernel will try to allocate the "next" device of the
+ * specified type */
+ strncpy(ifr.ifr_name, dev, IFNAMSIZ);
+ }
+
+ /* try to create the device */
+ if( (err = ioctl(fd, TUNSETIFF, (void *) &ifr)) < 0 ) {
+ close(fd);
+ return err;
+ }
+
+ /* if the operation was successful, write back the name of the
+ * interface to the variable "dev", so the caller can know
+ * it. Note that the caller MUST reserve space in *dev (see calling
+ * code below) */
+ strcpy(dev, ifr.ifr_name);
+
+ /* this is the special file descriptor that the caller will use to talk
+ * with the virtual interface */
+ return fd;
+}
+
+#else //if not linux
+
+boost::shared_ptr<gr_block> gr_make_tuntap_pdu (std::string dev, int MTU){
+ boost::shared_ptr<gr_block> rv;
+ throw std::runtime_error("tuntap only implemented on linux");
+ return rv;
+}
+
+#endif
diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.h b/gnuradio-core/src/lib/io/gr_tuntap_pdu.h
new file mode 100644
index 000000000..0e8071c30
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.h
@@ -0,0 +1,66 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_TUNTAP_PDU_H
+#define INCLUDED_GR_TUNTAP_PDU_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+#include <gr_stream_pdu_base.h>
+
+#if (defined(linux) || defined(__linux) || defined(__linux__))
+
+#include <linux/if_tun.h>
+
+class gr_tuntap_pdu;
+typedef boost::shared_ptr<gr_tuntap_pdu> gr_tuntap_pdu_sptr;
+
+GR_CORE_API gr_tuntap_pdu_sptr gr_make_tuntap_pdu (std::string dev, int MTU=10000);
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_tuntap_pdu : public gr_stream_pdu_base
+{
+ private:
+ friend GR_CORE_API gr_tuntap_pdu_sptr
+ gr_make_tuntap_pdu(std::string dev, int MTU);
+ int tun_alloc(char* dev, int flags = IFF_TAP | IFF_NO_PI);
+ std::string d_dev;
+ protected:
+ gr_tuntap_pdu (std::string dev, int MTU=10000);
+
+ public:
+ ~gr_tuntap_pdu () {}
+
+};
+
+#else // if not linux
+
+GR_CORE_API boost::shared_ptr<gr_block> gr_make_tuntap_pdu (std::string dev, int MTU=0);
+
+#endif
+
+#endif /* INCLUDED_GR_TUNTAP_PDU_H */
diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.i b/gnuradio-core/src/lib/io/gr_tuntap_pdu.i
new file mode 100644
index 000000000..589bbc385
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.i
@@ -0,0 +1,30 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+GR_SWIG_BLOCK_MAGIC(gr,tuntap_pdu);
+
+%{
+#include <gr_tuntap_pdu.h>
+%}
+
+%include "gr_tuntap_pdu.h"
+
diff --git a/gnuradio-core/src/lib/io/io.i b/gnuradio-core/src/lib/io/io.i
index 871ce1356..e2de4eb97 100644
--- a/gnuradio-core/src/lib/io/io.i
+++ b/gnuradio-core/src/lib/io/io.i
@@ -49,6 +49,8 @@
#include <gr_tagged_stream_to_pdu.h>
#include <gr_message_debug.h>
#include <gr_pdu.h>
+#include <gr_tuntap_pdu.h>
+#include <gr_socket_pdu.h>
%}
%include "gr_file_sink_base.i"
@@ -75,4 +77,7 @@
%include "gr_tagged_stream_to_pdu.i"
%include "gr_message_debug.i"
%include "gr_pdu.i"
+%include "gr_tuntap_pdu.i"
+%include "gr_socket_pdu.i"
+
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 0f7875a12..6ff57a1d6 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -38,7 +38,7 @@ static long s_ncurrently_allocated = 0;
long
gr_basic_block_ncurrently_allocated()
{
- return s_ncurrently_allocated;
+ return s_ncurrently_allocated;
}
gr_basic_block::gr_basic_block(const std::string &name,
@@ -53,25 +53,25 @@ gr_basic_block::gr_basic_block(const std::string &name,
d_color(WHITE),
message_subscribers(pmt::pmt_make_dict())
{
- s_ncurrently_allocated++;
+ s_ncurrently_allocated++;
}
gr_basic_block::~gr_basic_block()
{
- s_ncurrently_allocated--;
- global_block_registry.block_unregister(this);
+ s_ncurrently_allocated--;
+ global_block_registry.block_unregister(this);
}
gr_basic_block_sptr
gr_basic_block::to_basic_block()
{
- return shared_from_this();
+ return shared_from_this();
}
void
gr_basic_block::set_block_alias(std::string name)
{
- global_block_registry.register_symbolic_name(this, name);
+ global_block_registry.register_symbolic_name(this, name);
}
// ** Message passing interface **
@@ -147,28 +147,29 @@ void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg)
}
// - subscribe to a message port
-void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target)
-{
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+void
+gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
std::stringstream ss;
- ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
- << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
throw std::runtime_error(ss.str());
}
-
pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
- message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
+
+ // ignore re-adds of the same target
+ if(!pmt::pmt_list_has(currlist, target))
+ message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
}
-void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target)
-{
- if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+void
+gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
+ if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){
std::stringstream ss;
- ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
- << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+ ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
throw std::runtime_error(ss.str());
}
-
+
+ // ignore unsubs of unknown targets
pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
}
@@ -224,5 +225,3 @@ gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
msg_queue[which_port].pop_front();
return m;
}
-
-
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 00e9c2192..f3b7b835b 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -36,6 +36,7 @@
#include <gruel/thread.h>
#include <boost/foreach.hpp>
#include <boost/thread/condition_variable.hpp>
+#include <iostream>
/*!
* \brief The abstract base class for all signal processing blocks.
@@ -50,202 +51,215 @@
class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
{
- typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
-
-private:
- /*
- * This function is called by the runtime system to dispatch messages.
- *
- * The thread-safety guarantees mentioned in set_msg_handler are implemented
- * by the callers of this method.
- */
- void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
- {
- // AA Update this
- if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
- d_msg_handlers[which_port](msg); // Yes, invoke it.
- };
-
- //msg_handler_t d_msg_handler;
- typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
- d_msg_handlers_t d_msg_handlers;
-
- typedef std::deque<pmt::pmt_t> msg_queue_t;
- typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
- typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
- msg_queue_map_t msg_queue;
-// boost::condition_variable msg_queue_ready;
- std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
-
- gruel::mutex mutex; //< protects all vars
-
-
-protected:
- friend class gr_flowgraph;
- friend class gr_flat_flowgraph; // TODO: will be redundant
- friend class gr_tpb_thread_body;
-
- enum vcolor { WHITE, GREY, BLACK };
-
- std::string d_name;
- gr_io_signature_sptr d_input_signature;
- gr_io_signature_sptr d_output_signature;
- long d_unique_id;
- long d_symbolic_id;
- std::string d_symbol_name;
- std::string d_symbol_alias;
- vcolor d_color;
-
- gr_basic_block(void){} //allows pure virtual interface sub-classes
-
- //! Protected constructor prevents instantiation by non-derived classes
- gr_basic_block(const std::string &name,
- gr_io_signature_sptr input_signature,
- gr_io_signature_sptr output_signature);
-
- //! may only be called during constructor
- void set_input_signature(gr_io_signature_sptr iosig) {
- d_input_signature = iosig;
+ typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
+
+ private:
+ /*
+ * This function is called by the runtime system to dispatch messages.
+ *
+ * The thread-safety guarantees mentioned in set_msg_handler are implemented
+ * by the callers of this method.
+ */
+ void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+ {
+ // AA Update this
+ if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
+ };
+
+ //msg_handler_t d_msg_handler;
+ typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
+ d_msg_handlers_t d_msg_handlers;
+
+ typedef std::deque<pmt::pmt_t> msg_queue_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
+ std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+
+ gruel::mutex mutex; //< protects all vars
+
+ protected:
+ friend class gr_flowgraph;
+ friend class gr_flat_flowgraph; // TODO: will be redundant
+ friend class gr_tpb_thread_body;
+
+ enum vcolor { WHITE, GREY, BLACK };
+
+ std::string d_name;
+ gr_io_signature_sptr d_input_signature;
+ gr_io_signature_sptr d_output_signature;
+ long d_unique_id;
+ long d_symbolic_id;
+ std::string d_symbol_name;
+ std::string d_symbol_alias;
+ vcolor d_color;
+ msg_queue_map_t msg_queue;
+
+ gr_basic_block(void){} //allows pure virtual interface sub-classes
+
+ //! Protected constructor prevents instantiation by non-derived classes
+ gr_basic_block(const std::string &name,
+ gr_io_signature_sptr input_signature,
+ gr_io_signature_sptr output_signature);
+
+ //! may only be called during constructor
+ void set_input_signature(gr_io_signature_sptr iosig) {
+ d_input_signature = iosig;
+ }
+
+ //! may only be called during constructor
+ void set_output_signature(gr_io_signature_sptr iosig) {
+ d_output_signature = iosig;
+ }
+
+ /*!
+ * \brief Allow the flowgraph to set for sorting and partitioning
+ */
+ void set_color(vcolor color) { d_color = color; }
+ vcolor color() const { return d_color; }
+
+ // Message passing interface
+ pmt::pmt_t message_subscribers;
+
+ public:
+ virtual ~gr_basic_block();
+ long unique_id() const { return d_unique_id; }
+ long symbolic_id() const { return d_symbolic_id; }
+ std::string name() const { return d_name; }
+ std::string symbol_name() const { return d_symbol_name; }
+ gr_io_signature_sptr input_signature() const { return d_input_signature; }
+ gr_io_signature_sptr output_signature() const { return d_output_signature; }
+ gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
+ bool alias_set() { return !d_symbol_alias.empty(); }
+ std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
+ pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
+ void set_block_alias(std::string name);
+
+ // ** Message passing interface **
+ void message_port_register_in(pmt::pmt_t port_id);
+ void message_port_register_out(pmt::pmt_t port_id);
+ void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
+ void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
+ void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
+
+ virtual bool message_port_is_hier(pmt::pmt_t port_id) { std::cout << "is_hier\n"; return false; }
+ virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { std::cout << "is_hier_in\n"; return false; }
+ virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { std::cout << "is_hier_out\n"; return false; }
+
+ /*!
+ * \brief Get input message port names.
+ *
+ * Returns the available input message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_in();
+
+ /*!
+ * \brief Get output message port names.
+ *
+ * Returns the available output message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_out();
+
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
+
+ //! is the queue empty?
+ //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
+ bool empty_p(pmt::pmt_t which_port) {
+ if(msg_queue.find(which_port) == msg_queue.end())
+ throw std::runtime_error("port does not exist!");
+ return msg_queue[which_port].empty();
+ }
+ bool empty_p() {
+ bool rv = true;
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+ return rv;
+ }
+
+ //| Acquires and release the mutex
+ void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+
+ msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
+ return msg_queue[which_port].begin();
+ }
+
+ void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
+ msg_queue[which_port].erase(it);
+ }
+
+ virtual bool has_msg_port(pmt::pmt_t which_port){
+ if(msg_queue.find(which_port) != msg_queue.end()){
+ return true;
}
-
- //! may only be called during constructor
- void set_output_signature(gr_io_signature_sptr iosig) {
- d_output_signature = iosig;
- }
-
- /*!
- * \brief Allow the flowgraph to set for sorting and partitioning
- */
- void set_color(vcolor color) { d_color = color; }
- vcolor color() const { return d_color; }
-
- // Message passing interface
- pmt::pmt_t message_subscribers;
-
-public:
- virtual ~gr_basic_block();
- long unique_id() const { return d_unique_id; }
- long symbolic_id() const { return d_symbolic_id; }
- std::string name() const { return d_name; }
- std::string symbol_name() const { return d_symbol_name; }
- gr_io_signature_sptr input_signature() const { return d_input_signature; }
- gr_io_signature_sptr output_signature() const { return d_output_signature; }
- gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
- bool alias_set() { return !d_symbol_alias.empty(); }
- std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
- pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
- void set_block_alias(std::string name);
-
- // ** Message passing interface **
- void message_port_register_in(pmt::pmt_t port_id);
- void message_port_register_out(pmt::pmt_t port_id);
- void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
- void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
- void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
-
- /*!
- * \brief Get input message port names.
- *
- * Returns the available input message ports for a block. The
- * return object is a PMT vector that is filled with PMT symbols.
- */
- pmt::pmt_t message_ports_in();
-
- /*!
- * \brief Get output message port names.
- *
- * Returns the available output message ports for a block. The
- * return object is a PMT vector that is filled with PMT symbols.
- */
- pmt::pmt_t message_ports_out();
-
- /*!
- * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
- */
- void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
-
- //! is the queue empty?
- //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
- bool empty_p(pmt::pmt_t which_port) {
- if(msg_queue.find(which_port) == msg_queue.end())
- throw std::runtime_error("port does not exist!");
- return msg_queue[which_port].empty();
- }
- bool empty_p() {
- bool rv = true;
- BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
- return rv;
- }
-
- //| Acquires and release the mutex
- void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- */
- pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
-
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- */
- pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
-
- msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
- return msg_queue[which_port].begin();
- }
- void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
- msg_queue[which_port].erase(it);
- }
-
-
- /*!
- * \brief Confirm that ninputs and noutputs is an acceptable combination.
- *
- * \param ninputs number of input streams connected
- * \param noutputs number of output streams connected
- *
- * \returns true if this is a valid configuration for this block.
- *
- * This function is called by the runtime system whenever the
- * topology changes. Most classes do not need to override this.
- * This check is in addition to the constraints specified by the input
- * and output gr_io_signatures.
- */
- virtual bool check_topology(int ninputs, int noutputs) { return true; }
-
- /*!
- * \brief Set the callback that is fired when messages are available.
- *
- * \p msg_handler can be any kind of function pointer or function object
- * that has the signature:
- * <pre>
- * void msg_handler(pmt::pmt msg);
- * </pre>
- *
- * (You may want to use boost::bind to massage your callable into the
- * correct form. See gr_nop.{h,cc} for an example that sets up a class
- * method as the callback.)
- *
- * Blocks that desire to handle messages must call this method in their
- * constructors to register the handler that will be invoked when messages
- * are available.
- *
- * If the block inherits from gr_block, the runtime system will ensure that
- * msg_handler is called in a thread-safe manner, such that work and
- * msg_handler will never be called concurrently. This allows msg_handler
- * to update state variables without having to worry about thread-safety
- * issues with work, general_work or another invocation of msg_handler.
- *
- * If the block inherits from gr_hier_block2, the runtime system will
- * ensure that no reentrant calls are made to msg_handler.
- */
- //template <typename T> void set_msg_handler(T msg_handler){
- // d_msg_handler = msg_handler_t(msg_handler);
- //}
- template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
- if(msg_queue.find(which_port) == msg_queue.end()){
- throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
- d_msg_handlers[which_port] = msg_handler_t(msg_handler);
+ if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
+ return true;
}
+ return false;
+ }
+
+
+ /*!
+ * \brief Confirm that ninputs and noutputs is an acceptable combination.
+ *
+ * \param ninputs number of input streams connected
+ * \param noutputs number of output streams connected
+ *
+ * \returns true if this is a valid configuration for this block.
+ *
+ * This function is called by the runtime system whenever the
+ * topology changes. Most classes do not need to override this.
+ * This check is in addition to the constraints specified by the input
+ * and output gr_io_signatures.
+ */
+ virtual bool check_topology(int ninputs, int noutputs) { return true; }
+
+ /*!
+ * \brief Set the callback that is fired when messages are available.
+ *
+ * \p msg_handler can be any kind of function pointer or function object
+ * that has the signature:
+ * <pre>
+ * void msg_handler(pmt::pmt msg);
+ * </pre>
+ *
+ * (You may want to use boost::bind to massage your callable into the
+ * correct form. See gr_nop.{h,cc} for an example that sets up a class
+ * method as the callback.)
+ *
+ * Blocks that desire to handle messages must call this method in their
+ * constructors to register the handler that will be invoked when messages
+ * are available.
+ *
+ * If the block inherits from gr_block, the runtime system will ensure that
+ * msg_handler is called in a thread-safe manner, such that work and
+ * msg_handler will never be called concurrently. This allows msg_handler
+ * to update state variables without having to worry about thread-safety
+ * issues with work, general_work or another invocation of msg_handler.
+ *
+ * If the block inherits from gr_hier_block2, the runtime system will
+ * ensure that no reentrant calls are made to msg_handler.
+ */
+ //template <typename T> void set_msg_handler(T msg_handler){
+ // d_msg_handler = msg_handler_t(msg_handler);
+ //}
+ template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
+ if(msg_queue.find(which_port) == msg_queue.end()){
+ throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
+ d_msg_handlers[which_port] = msg_handler_t(msg_handler);
+ }
};
inline bool operator<(gr_basic_block_sptr lhs, gr_basic_block_sptr rhs)
@@ -260,8 +274,8 @@ GR_CORE_API long gr_basic_block_ncurrently_allocated();
inline std::ostream &operator << (std::ostream &os, gr_basic_block_sptr basic_block)
{
- os << basic_block->name() << "(" << basic_block->unique_id() << ")";
- return os;
+ os << basic_block->name() << "(" << basic_block->unique_id() << ")";
+ return os;
}
#endif /* INCLUDED_GR_BASIC_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index e04deb948..c19863f34 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -31,8 +31,9 @@
#include <volk/volk.h>
#include <iostream>
#include <map>
+#include <boost/format.hpp>
-#define GR_FLAT_FLOWGRAPH_DEBUG 0
+#define GR_FLAT_FLOWGRAPH_DEBUG 0
// 32Kbyte buffer size between blocks
#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
@@ -71,6 +72,15 @@ gr_flat_flowgraph::setup_connections()
block->set_is_unaligned(false);
}
+ // Connect message ports connetions
+ for(gr_msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++){
+ if(GR_FLAT_FLOWGRAPH_DEBUG)
+ std::cout << boost::format("flat_fg connecting msg primitives: (%s, %s)->(%s, %s)\n") %
+ i->src().block() % i->src().port() %
+ i->dst().block() % i->dst().port();
+ i->src().block()->message_port_sub( i->src().port(), pmt::pmt_cons(i->dst().block()->alias_pmt(), i->dst().port()) );
+ }
+
}
gr_block_detail_sptr
@@ -350,3 +360,25 @@ gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks)
return result;
}
+
+
+void gr_flat_flowgraph::replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src){
+ size_t n_replr(0);
+ if(GR_FLAT_FLOWGRAPH_DEBUG)
+ std::cout << boost::format("gr_flat_flowgraph::replace_endpoint( %s, %s, %d )\n") % e.block()% r.block()% is_src;
+ for(size_t i=0; i<d_msg_edges.size(); i++){
+ if(is_src){
+ if(d_msg_edges[i].src() == e){
+ d_msg_edges[i] = gr_msg_edge(r, d_msg_edges[i].dst() );
+ n_replr++;
+ }
+ } else {
+ if(d_msg_edges[i].dst() == e){
+ d_msg_edges[i] = gr_msg_edge(d_msg_edges[i].src(), r );
+ n_replr++;
+ }
+ }
+ }
+// std::cout << "n_repl = " << n_repl <<"\n";
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 0926bcc8f..52f202334 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -46,7 +46,7 @@ public:
// Wire gr_blocks together in new flat_flowgraph
void setup_connections();
-
+
// Merge applicable connections from existing flat flowgraph
void merge_connections(gr_flat_flowgraph_sptr sfg);
@@ -57,6 +57,8 @@ public:
*/
static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks);
+ void replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src);
+
private:
gr_flat_flowgraph();
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 69c304a3d..63a208480 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -149,6 +149,16 @@ gr_flowgraph::check_valid_port(gr_io_signature_sptr sig, int port)
}
}
+void
+gr_flowgraph::check_valid_port(const gr_msg_endpoint &e)
+{
+ if (GR_FLOWGRAPH_DEBUG)
+ std::cout << "check_valid_port( " << e.block() << ", " << e.port() << ")\n";
+
+ if(!e.block()->has_msg_port(e.port()))
+ throw std::invalid_argument("invalid msg port in connect() or disconnect()");
+}
+
void
gr_flowgraph::check_dst_not_used(const gr_endpoint &dst)
{
@@ -181,8 +191,10 @@ gr_flowgraph::calc_used_blocks()
gr_basic_block_vector_t tmp;
// make sure free standing message blocks are included
- for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
- tmp.push_back(*it);
+ for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+// for now only blocks receiving messages get a thread context - uncomment to allow senders to also obtain one
+// tmp.push_back(p->src().block());
+ tmp.push_back(p->dst().block());
}
// Collect all blocks in the edge list
@@ -477,7 +489,27 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
output.push_back(block);
}
-void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
- d_msgblocks.push_back(blk);
+void gr_flowgraph::connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){
+ check_valid_port(src);
+ check_valid_port(dst);
+ for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+ if(p->src() == src && p->dst() == dst){
+ throw std::runtime_error("connect called on already connected edge!");
+ }
+ }
+ d_msg_edges.push_back(gr_msg_edge(src,dst));
}
+void gr_flowgraph::disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){
+ check_valid_port(src);
+ check_valid_port(dst);
+ for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+ if(p->src() == src && p->dst() == dst){
+ d_msg_edges.erase(p);
+ return;
+ }
+ }
+ throw std::runtime_error("disconnect called on non-connected edge!");
+}
+
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index 860cb0ff1..bef70f626 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -52,6 +52,31 @@ inline bool gr_endpoint::operator==(const gr_endpoint &other) const
d_port == other.d_port);
}
+class GR_CORE_API gr_msg_endpoint
+{
+private:
+ gr_basic_block_sptr d_basic_block;
+ pmt::pmt_t d_port;
+ bool d_is_hier;
+public:
+ gr_msg_endpoint() : d_basic_block(), d_port(pmt::PMT_NIL) { }
+ gr_msg_endpoint(gr_basic_block_sptr block, pmt::pmt_t port, bool is_hier=false){ d_basic_block = block; d_port = port; d_is_hier = is_hier;}
+ gr_basic_block_sptr block() const { return d_basic_block; }
+ pmt::pmt_t port() const { return d_port; }
+ bool is_hier() const { return d_is_hier; }
+ void set_hier(bool h) { d_is_hier = h; }
+
+ bool operator==(const gr_msg_endpoint &other) const;
+
+};
+
+inline bool gr_msg_endpoint::operator==(const gr_msg_endpoint &other) const
+{
+ return (d_basic_block == other.d_basic_block &&
+ pmt::pmt_equal(d_port, other.d_port));
+}
+
+
// Hold vectors of gr_endpoint objects
typedef std::vector<gr_endpoint> gr_endpoint_vector_t;
typedef std::vector<gr_endpoint>::iterator gr_endpoint_viter_t;
@@ -75,11 +100,35 @@ private:
gr_endpoint d_dst;
};
+
// Hold vectors of gr_edge objects
typedef std::vector<gr_edge> gr_edge_vector_t;
typedef std::vector<gr_edge>::iterator gr_edge_viter_t;
+/*!
+ *\brief Class representing a msg connection between to graph msg endpoints
+ *
+ */
+class GR_CORE_API gr_msg_edge
+{
+public:
+ gr_msg_edge() : d_src(), d_dst() { };
+ gr_msg_edge(const gr_msg_endpoint &src, const gr_msg_endpoint &dst) : d_src(src), d_dst(dst) { }
+ ~gr_msg_edge() {}
+
+ const gr_msg_endpoint &src() const { return d_src; }
+ const gr_msg_endpoint &dst() const { return d_dst; }
+
+private:
+ gr_msg_endpoint d_src;
+ gr_msg_endpoint d_dst;
+};
+
+// Hold vectors of gr_edge objects
+typedef std::vector<gr_msg_edge> gr_msg_edge_vector_t;
+typedef std::vector<gr_msg_edge>::iterator gr_msg_edge_viter_t;
+
// Create a shared pointer to a heap allocated flowgraph
// (types defined in gr_runtime_types.h)
GR_CORE_API gr_flowgraph_sptr gr_make_flowgraph();
@@ -110,7 +159,11 @@ public:
void disconnect(gr_basic_block_sptr src_block, int src_port,
gr_basic_block_sptr dst_block, int dst_port);
- void add_msg_block(gr_basic_block_sptr blk);
+ // Connect two msg endpoints
+ void connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst);
+
+ // Disconnect two msg endpoints
+ void disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst);
// Validate connectivity, raise exception if invalid
void validate();
@@ -120,6 +173,9 @@ public:
// Return vector of edges
const gr_edge_vector_t &edges() const { return d_edges; }
+
+ // Return vector of msg edges
+ const gr_msg_edge_vector_t &msg_edges() const { return d_msg_edges; }
// Return vector of connected blocks
gr_basic_block_vector_t calc_used_blocks();
@@ -130,11 +186,11 @@ public:
// Return vector of vectors of disjointly connected blocks, topologically
// sorted.
std::vector<gr_basic_block_vector_t> partition();
- gr_basic_block_vector_t d_msgblocks;
protected:
gr_basic_block_vector_t d_blocks;
gr_edge_vector_t d_edges;
+ gr_msg_edge_vector_t d_msg_edges;
gr_flowgraph();
std::vector<int> calc_used_ports(gr_basic_block_sptr block, bool check_inputs);
@@ -146,6 +202,7 @@ protected:
private:
void check_valid_port(gr_io_signature_sptr sig, int port);
+ void check_valid_port(const gr_msg_endpoint &e);
void check_dst_not_used(const gr_endpoint &dst);
void check_type_match(const gr_endpoint &src, const gr_endpoint &dst);
gr_edge_vector_t calc_connections(gr_basic_block_sptr block, bool check_inputs); // false=use outputs
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
index a19bfe195..8c2794c63 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
@@ -44,7 +44,9 @@ gr_hier_block2::gr_hier_block2(const std::string &name,
gr_io_signature_sptr input_signature,
gr_io_signature_sptr output_signature)
: gr_basic_block(name, input_signature, output_signature),
- d_detail(new gr_hier_block2_detail(this))
+ d_detail(new gr_hier_block2_detail(this)),
+ hier_message_ports_in(pmt::PMT_NIL),
+ hier_message_ports_out(pmt::PMT_NIL)
{
// This bit of magic ensures that self() works in the constructors of derived classes.
gnuradio::detail::sptr_magic::create_and_stash_initial_sptr(this);
@@ -141,6 +143,7 @@ gr_hier_block2::unlock()
d_detail->unlock();
}
+
gr_flat_flowgraph_sptr
gr_hier_block2::flatten() const
{
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
index e8364a740..f80dd73e4 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
@@ -166,6 +166,39 @@ public:
gr_flat_flowgraph_sptr flatten() const;
gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion
+
+ bool has_msg_port(pmt::pmt_t which_port){
+ return message_port_is_hier(which_port) || gr_basic_block::has_msg_port(which_port);
+ }
+
+ bool message_port_is_hier(pmt::pmt_t port_id){
+ return message_port_is_hier_in(port_id) || message_port_is_hier_out(port_id);
+ }
+ bool message_port_is_hier_in(pmt::pmt_t port_id){
+ return pmt::pmt_list_has(hier_message_ports_in, port_id);
+ }
+ bool message_port_is_hier_out(pmt::pmt_t port_id){
+ return pmt::pmt_list_has(hier_message_ports_out, port_id);
+ }
+
+ pmt::pmt_t hier_message_ports_in;
+ pmt::pmt_t hier_message_ports_out;
+
+ void message_port_register_hier_in(pmt::pmt_t port_id){
+ if(pmt::pmt_list_has(hier_message_ports_in, port_id))
+ throw std::invalid_argument("hier msg in port by this name already registered");
+ if(msg_queue.find(port_id) != msg_queue.end())
+ throw std::invalid_argument("block already has a primitive input port by this name");
+ hier_message_ports_in = pmt::pmt_list_add(hier_message_ports_in, port_id);
+ }
+ void message_port_register_hier_out(pmt::pmt_t port_id){
+ if(pmt::pmt_list_has(hier_message_ports_out, port_id))
+ throw std::invalid_argument("hier msg out port by this name already registered");
+ if(pmt::pmt_dict_has_key(message_subscribers, port_id))
+ throw std::invalid_argument("block already has a primitive output port by this name");
+ hier_message_ports_out = pmt::pmt_list_add(hier_message_ports_out, port_id);
+ }
+
};
inline gr_hier_block2_sptr cast_to_hier_block2_sptr(gr_basic_block_sptr block) {
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
index 7c0e62f28..a857394ca 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
@@ -40,6 +40,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name,
%rename(primitive_disconnect) gr_hier_block2::disconnect;
%rename(primitive_msg_connect) gr_hier_block2::msg_connect;
%rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect;
+%rename(primitive_message_port_register_hier_in) gr_hier_block2::message_port_register_hier_in;
+%rename(primitive_message_port_register_hier_out) gr_hier_block2::message_port_register_hier_out;
class gr_hier_block2 : public gr_basic_block
{
@@ -78,5 +80,9 @@ public:
void lock();
void unlock();
+ void message_port_register_hier_in(pmt::pmt_t port_id);
+ void message_port_register_hier_out(pmt::pmt_t port_id);
+
+
gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion
};
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index ff2a5db8c..e70553ddc 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -27,6 +27,7 @@
#include <gr_io_signature.h>
#include <stdexcept>
#include <sstream>
+#include <boost/format.hpp>
#define GR_HIER_BLOCK2_DETAIL_DEBUG 0
@@ -53,6 +54,7 @@ gr_hier_block2_detail::gr_hier_block2_detail(gr_hier_block2 *owner) :
d_outputs = gr_endpoint_vector_t(max_outputs);
}
+
gr_hier_block2_detail::~gr_hier_block2_detail()
{
d_owner = 0; // Don't use delete, we didn't allocate
@@ -151,15 +153,39 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
std::cout << "connecting message port..." << std::endl;
// register the subscription
- src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+// this is done later...
+// src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
// add block uniquely to list to internal blocks
if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+ d_blocks.push_back(src);
d_blocks.push_back(dst);
}
- // make sure we instantiate a thread for this block
- d_fg->add_msg_block(dst);
+ bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);;
+ bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport);
+
+ gr_hier_block2_sptr src_block(cast_to_hier_block2_sptr(src));
+ gr_hier_block2_sptr dst_block(cast_to_hier_block2_sptr(dst));
+
+ if (src_block && src.get() != d_owner) {
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "connect: src is hierarchical, setting parent to " << this << std::endl;
+ src_block->d_detail->d_parent_detail = this;
+ }
+
+ if (dst_block && dst.get() != d_owner) {
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "connect: dst is hierarchical, setting parent to " << this << std::endl;
+ dst_block->d_detail->d_parent_detail = this;
+ }
+
+ // add edge for this message connection
+ if(GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format("connect( (%s, %s, %d), (%s, %s, %d) )\n") %
+ src % srcport % hier_out %
+ dst % dstport % hier_in;
+ d_fg->connect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in));
}
void
@@ -169,8 +195,13 @@ gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcpor
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
std::cout << "disconnecting message port..." << std::endl;
- // register the subscription
+ // unregister the subscription - if already subscribed
src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+ // remove edge for this message connection
+ bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);;
+ bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport);
+ d_fg->disconnect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in));
}
void
@@ -435,11 +466,16 @@ void
gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
{
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
- std::cout << "Flattening " << d_owner->name() << std::endl;
+ std::cout << " ** Flattening " << d_owner->name() << std::endl;
// Add my edges to the flow graph, resolving references to actual endpoints
gr_edge_vector_t edges = d_fg->edges();
+ gr_msg_edge_vector_t msg_edges = d_fg->msg_edges();
gr_edge_viter_t p;
+ gr_msg_edge_viter_t q,u;
+
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "Flattening stream connections: " << std::endl;
for (p = edges.begin(); p != edges.end(); p++) {
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
@@ -457,7 +493,46 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
}
}
}
- sfg->d_msgblocks = d_fg->d_msgblocks;
+
+ // loop through flattening hierarchical connections
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << "Flattening msg connections: " << std::endl;
+
+ for(q = msg_edges.begin(); q != msg_edges.end(); q++) {
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format(" flattening edge ( %s, %s, %d) -> ( %s, %s, %d)\n") % q->src().block() % q->src().port() % q->src().is_hier() % q->dst().block() % q->dst().port() % q->dst().is_hier();
+
+ bool normal_connection = true;
+
+ // resolve existing connections to hier ports
+ if(q->dst().is_hier()){
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format(" resolve hier output (%s, %s)") % q->dst().block() % q->dst().port() << std::endl;
+ sfg->replace_endpoint( q->dst(), q->src(), true );
+ normal_connection = false;
+ }
+
+ if(q->src().is_hier()){
+ if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+ std::cout << boost::format(" resolve hier input (%s, %s)") % q->src().block() % q->src().port() << std::endl;
+ sfg->replace_endpoint( q->src(), q->dst(), false );
+ normal_connection = false;
+ }
+
+ // propogate non hier connections through
+ if(normal_connection){
+ sfg->connect( q->src(), q->dst() );
+ }
+ }
+
+/* // connect primitive edges in the new fg
+ for(q = msg_edges.begin(); q != msg_edges.end(); q++) {
+ if( (!q->src().is_hier()) && (!q->dst().is_hier()) ){
+ sfg->connect( q->src(), q->dst() );
+ } else {
+ std::cout << "not connecting hier connection!" << std::endl;
+ }
+ }*/
// Construct unique list of blocks used either in edges, inputs,
// outputs, or by themselves. I still hate STL.
@@ -499,7 +574,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
// Recurse hierarchical children
for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
gr_hier_block2_sptr hier_block2(cast_to_hier_block2_sptr(*p));
- if (hier_block2) {
+ if (hier_block2 && (hier_block2.get() != d_owner)) {
if (GR_HIER_BLOCK2_DETAIL_DEBUG)
std::cout << "flatten_aux: recursing into hierarchical block " << hier_block2 << std::endl;
hier_block2->d_detail->flatten_aux(sfg);
@@ -530,3 +605,4 @@ gr_hier_block2_detail::unlock()
else
d_owner->unlock();
}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
index f2d2b3c4e..b38dae301 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
@@ -61,6 +61,7 @@ private:
gr_endpoint_vector_t d_outputs; // Single internal endpoint per external output
gr_basic_block_vector_t d_blocks;
+
void connect_input(int my_port, int port, gr_basic_block_sptr block);
void connect_output(int my_port, int port, gr_basic_block_sptr block);
void disconnect_input(int my_port, int port, gr_basic_block_sptr block);
diff --git a/gnuradio-core/src/python/gnuradio/gr/hier_block2.py b/gnuradio-core/src/python/gnuradio/gr/hier_block2.py
index 0c45f1691..f5f0c00f5 100644
--- a/gnuradio-core/src/python/gnuradio/gr/hier_block2.py
+++ b/gnuradio-core/src/python/gnuradio/gr/hier_block2.py
@@ -20,6 +20,7 @@
#
from gnuradio_core import hier_block2_swig
+from gruel import pmt
#
# This hack forces a 'has-a' relationship to look like an 'is-a' one.
@@ -111,3 +112,15 @@ class hier_block2(object):
self._hb.primitive_disconnect(src_block.to_basic_block(), src_port,
dst_block.to_basic_block(), dst_port)
+ def msg_connect(self, src, srcport, dst, dstport):
+ self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport);
+
+ def msg_disconnect(self, src, srcport, dst, dstport):
+ self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport);
+
+ def message_port_register_hier_in(self, portname):
+ self.primitive_message_port_register_hier_in(pmt.pmt_intern(portname));
+
+ def message_port_register_hier_out(self, portname):
+ self.primitive_message_port_register_hier_out(pmt.pmt_intern(portname));
+