summaryrefslogtreecommitdiff
path: root/mblock
diff options
context:
space:
mode:
Diffstat (limited to 'mblock')
-rw-r--r--mblock/README4
-rw-r--r--mblock/src/Makefile.am2
-rw-r--r--mblock/src/lib/Makefile.am51
-rw-r--r--mblock/src/lib/benchmark_send.cc (renamed from mblock/src/lib/mb_runtime_placeholder.h)39
-rw-r--r--mblock/src/lib/getres.cc32
-rw-r--r--mblock/src/lib/mb_class_registry.cc (renamed from mblock/src/lib/mb_runtime_thread_per_mblock.cc)44
-rw-r--r--mblock/src/lib/mb_class_registry.h51
-rw-r--r--mblock/src/lib/mb_exception.cc19
-rw-r--r--mblock/src/lib/mb_exception.h26
-rw-r--r--mblock/src/lib/mb_gettid.cc53
-rw-r--r--mblock/src/lib/mb_gettid.h (renamed from mblock/src/lib/mb_runtime_thread_per_mblock.h)18
-rw-r--r--mblock/src/lib/mb_mblock.cc104
-rw-r--r--mblock/src/lib/mb_mblock.h136
-rw-r--r--mblock/src/lib/mb_mblock_impl.cc76
-rw-r--r--mblock/src/lib/mb_mblock_impl.h26
-rw-r--r--mblock/src/lib/mb_message.cc41
-rw-r--r--mblock/src/lib/mb_message.h17
-rw-r--r--mblock/src/lib/mb_msg_accepter_msgq.cc46
-rw-r--r--mblock/src/lib/mb_msg_accepter_msgq.h39
-rw-r--r--mblock/src/lib/mb_msg_queue.cc19
-rw-r--r--mblock/src/lib/mb_msg_queue.h14
-rw-r--r--mblock/src/lib/mb_port.cc3
-rw-r--r--mblock/src/lib/mb_port.h12
-rw-r--r--mblock/src/lib/mb_port_simple.cc23
-rw-r--r--mblock/src/lib/mb_port_simple.h10
-rw-r--r--mblock/src/lib/mb_protocol_class.cc23
-rw-r--r--mblock/src/lib/mb_protocol_class.h10
-rw-r--r--mblock/src/lib/mb_runtime.cc5
-rw-r--r--mblock/src/lib/mb_runtime.h34
-rw-r--r--mblock/src/lib/mb_runtime_base.cc (renamed from mblock/src/lib/mb_runtime_placeholder.cc)40
-rw-r--r--mblock/src/lib/mb_runtime_base.h78
-rw-r--r--mblock/src/lib/mb_runtime_nop.cc32
-rw-r--r--mblock/src/lib/mb_runtime_nop.h16
-rw-r--r--mblock/src/lib/mb_runtime_thread_per_block.cc349
-rw-r--r--mblock/src/lib/mb_runtime_thread_per_block.h84
-rw-r--r--mblock/src/lib/mb_time.cc84
-rw-r--r--mblock/src/lib/mb_time.h89
-rw-r--r--mblock/src/lib/mb_timer_queue.cc63
-rw-r--r--mblock/src/lib/mb_timer_queue.h73
-rw-r--r--mblock/src/lib/mb_worker.cc178
-rw-r--r--mblock/src/lib/mb_worker.h106
-rw-r--r--mblock/src/lib/mbi_runtime_lock.h4
-rw-r--r--mblock/src/lib/qa_bitset.cc493
-rw-r--r--mblock/src/lib/qa_bitset.mbh61
-rw-r--r--mblock/src/lib/qa_disconnect.cc238
-rw-r--r--mblock/src/lib/qa_mblock.cc4
-rw-r--r--mblock/src/lib/qa_mblock_prims.cc128
-rw-r--r--mblock/src/lib/qa_mblock_send.cc92
-rw-r--r--mblock/src/lib/qa_mblock_sys.cc271
-rw-r--r--mblock/src/lib/qa_mblock_sys.h45
-rw-r--r--mblock/src/lib/qa_timeouts.cc292
-rw-r--r--mblock/src/lib/qa_timeouts.h43
-rw-r--r--mblock/src/scheme/Makefile.am21
-rw-r--r--mblock/src/scheme/gnuradio/Makefile.am19
-rwxr-xr-xmblock/src/scheme/gnuradio/compile-mbh.scm231
55 files changed, 3840 insertions, 271 deletions
diff --git a/mblock/README b/mblock/README
index ca91edfaa..130073afe 100644
--- a/mblock/README
+++ b/mblock/README
@@ -1,5 +1,5 @@
#
-# Copyright 2006 Free Software Foundation, Inc.
+# Copyright 2006,2007 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -20,3 +20,5 @@
#
The "Message block" implementation, a work in progress...
+
+http://gnuradio.org/trac/wiki/MessageBlocks
diff --git a/mblock/src/Makefile.am b/mblock/src/Makefile.am
index bcbab2455..c6c2fcc75 100644
--- a/mblock/src/Makefile.am
+++ b/mblock/src/Makefile.am
@@ -19,4 +19,4 @@
# Boston, MA 02110-1301, USA.
#
-SUBDIRS = lib
+SUBDIRS = lib scheme
diff --git a/mblock/src/lib/Makefile.am b/mblock/src/lib/Makefile.am
index 6273e16d2..604f97246 100644
--- a/mblock/src/lib/Makefile.am
+++ b/mblock/src/lib/Makefile.am
@@ -28,28 +28,41 @@ TESTS = test_mblock
lib_LTLIBRARIES = libmblock.la libmblock-qa.la
EXTRA_DIST = \
- README.locking
+ README.locking \
+ qa_bitset.mbh
+BUILT_SOURCES = \
+ qa_bitset_mbh.cc
+
+qa_bitset_mbh.cc : qa_bitset.mbh
+ $(COMPILE_MBH) qa_bitset.mbh qa_bitset_mbh.cc
+
# These are the source files that go into the mblock shared library
libmblock_la_SOURCES = \
+ mb_class_registry.cc \
mb_connection.cc \
mb_endpoint.cc \
mb_exception.cc \
+ mb_gettid.cc \
mb_mblock.cc \
mb_mblock_impl.cc \
mb_message.cc \
mb_msg_accepter.cc \
+ mb_msg_accepter_msgq.cc \
mb_msg_accepter_smp.cc \
mb_msg_queue.cc \
mb_port.cc \
mb_port_simple.cc \
mb_protocol_class.cc \
mb_runtime.cc \
+ mb_runtime_base.cc \
mb_runtime_nop.cc \
- mb_runtime_placeholder.cc \
- mb_runtime_thread_per_mblock.cc \
- mb_util.cc
+ mb_runtime_thread_per_block.cc \
+ mb_time.cc \
+ mb_timer_queue.cc \
+ mb_util.cc \
+ mb_worker.cc
# magic flags
@@ -62,19 +75,20 @@ libmblock_la_LIBADD = \
-lstdc++
include_HEADERS = \
+ mb_class_registry.h \
mb_common.h \
mb_exception.h \
+ mb_gettid.h \
mb_mblock.h \
mb_message.h \
mb_msg_accepter.h \
+ mb_msg_accepter_msgq.h \
mb_msg_queue.h \
mb_port.h \
mb_port_simple.h \
mb_protocol_class.h \
mb_runtime.h \
- mb_runtime_nop.h \
- mb_runtime_placeholder.h \
- mb_runtime_thread_per_mblock.h \
+ mb_time.h \
mb_util.h
@@ -83,18 +97,30 @@ noinst_HEADERS = \
mb_endpoint.h \
mb_mblock_impl.h \
mb_msg_accepter_smp.h \
+ mb_runtime_base.h \
+ mb_runtime_nop.h \
+ mb_runtime_thread_per_block.h \
+ mb_timer_queue.h \
+ mb_worker.h \
mbi_runtime_lock.h \
qa_mblock.h \
qa_mblock_prims.h \
- qa_mblock_send.h
+ qa_mblock_send.h \
+ qa_mblock_sys.h \
+ qa_timeouts.h
# Build the qa code into its own library
libmblock_qa_la_SOURCES = \
+ qa_bitset.cc \
+ qa_bitset_mbh.cc \
+ qa_disconnect.cc \
qa_mblock.cc \
qa_mblock_prims.cc \
- qa_mblock_send.cc
+ qa_mblock_send.cc \
+ qa_mblock_sys.cc \
+ qa_timeouts.cc
# magic flags
@@ -107,9 +133,14 @@ libmblock_qa_la_LIBADD = \
-lstdc++
-noinst_PROGRAMS = test_mblock
+noinst_PROGRAMS = \
+ test_mblock \
+ benchmark_send
test_mblock_SOURCES = test_mblock.cc
test_mblock_LDADD = libmblock-qa.la
+benchmark_send_SOURCES = benchmark_send.cc
+benchmark_send_LDADD = libmblock-qa.la
+
CLEANFILES = $(BUILT_SOURCES) *.pyc
diff --git a/mblock/src/lib/mb_runtime_placeholder.h b/mblock/src/lib/benchmark_send.cc
index b55d39f94..c9ebe57c3 100644
--- a/mblock/src/lib/mb_runtime_placeholder.h
+++ b/mblock/src/lib/benchmark_send.cc
@@ -18,33 +18,28 @@
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef INCLUDED_MB_RUNTIME_PLACEHOLDER_H
-#define INCLUDED_MB_RUNTIME_PLACEHOLDER_H
#include <mb_runtime.h>
+#include <iostream>
-/*!
- * \brief Concrete runtime that serves as a placeholder until the real
- * runtime is known.
- *
- * The singleton instance of this class is installed in the d_runtime
- * instance variable of each mb_mblock_impl at construction time.
- * Having a valid instance of runtime removes the "pre runtime::run"
- * corner case, and allows us to lock and unlock the big runtime lock
- * even though there's no "real runtime" yet.
- */
-class mb_runtime_placeholder : public mb_runtime
+int
+main(int argc, char **argv)
{
+ mb_runtime_sptr rt = mb_make_runtime();
+ pmt_t result = PMT_NIL;
+
+ long nmsgs = 1000000;
+ long batch_size = 100;
-public:
- mb_runtime_placeholder();
- ~mb_runtime_placeholder();
+ pmt_t arg = pmt_list2(pmt_from_long(nmsgs), // # of messages to send through pipe
+ pmt_from_long(batch_size));
- //! throws mbe_not_implemented
- bool run(mb_mblock_sptr top);
+ rt->run("top", "qa_bitset_top", arg, &result);
- //! Return the placeholder singleton
- static mb_runtime *singleton();
-};
+ if (!pmt_equal(PMT_T, result)){
+ std::cerr << "benchmark_send: incorrect result";
+ return 1;
+ }
-#endif /* INCLUDED_MB_RUNTIME_PLACEHOLDER_H */
+ return 0;
+}
diff --git a/mblock/src/lib/getres.cc b/mblock/src/lib/getres.cc
new file mode 100644
index 000000000..c05ba792a
--- /dev/null
+++ b/mblock/src/lib/getres.cc
@@ -0,0 +1,32 @@
+#include <time.h>
+#include <stdio.h>
+
+int
+main(int argc, char **argv)
+{
+ bool ok = true;
+ struct timespec ts;
+ int r;
+
+ r = clock_getres(CLOCK_REALTIME, &ts);
+ if (r != 0){
+ perror("clock_getres(CLOCK_REALTIME, ...)");
+ ok = false;
+ }
+ else
+ printf("clock_getres(CLOCK_REALTIME, ...) => %11.9f\n",
+ (double) ts.tv_sec + ts.tv_nsec * 1e-9);
+
+
+ r = clock_getres(CLOCK_MONOTONIC, &ts);
+ if (r != 0){
+ perror("clock_getres(CLOCK_MONOTONIC, ...");
+ ok = false;
+ }
+ else
+ printf("clock_getres(CLOCK_MONOTONIC, ...) => %11.9f\n",
+ (double) ts.tv_sec + ts.tv_nsec * 1e-9);
+
+
+ return ok == true ? 0 : 1;
+}
diff --git a/mblock/src/lib/mb_runtime_thread_per_mblock.cc b/mblock/src/lib/mb_class_registry.cc
index 925dcdc45..9eee93618 100644
--- a/mblock/src/lib/mb_runtime_thread_per_mblock.cc
+++ b/mblock/src/lib/mb_class_registry.cc
@@ -22,44 +22,26 @@
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
-#include <mb_runtime_thread_per_mblock.h>
-#include <mb_mblock.h>
-#include <mb_mblock_impl.h>
+#include <mb_class_registry.h>
+#include <map>
+static std::map<std::string, mb_mblock_maker_t> s_registry;
-mb_runtime_thread_per_mblock::mb_runtime_thread_per_mblock()
-{
- // nop for now
-}
-
-mb_runtime_thread_per_mblock::~mb_runtime_thread_per_mblock()
+bool
+mb_class_registry::register_maker(const std::string &name, mb_mblock_maker_t maker)
{
- // nop for now
+ s_registry[name] = maker;
+ return true;
}
bool
-mb_runtime_thread_per_mblock::run(mb_mblock_sptr top)
+mb_class_registry::lookup_maker(const std::string &name, mb_mblock_maker_t *maker)
{
- class initial_visitor : public mb_visitor
- {
- mb_runtime *d_rt;
-
- public:
- initial_visitor(mb_runtime *rt) : d_rt(rt) {}
- bool operator()(mb_mblock *mblock, const std::string &path)
- {
- mblock->impl()->set_runtime(d_rt);
- mblock->set_instance_name(path);
- mblock->init_fsm();
- return true;
- }
- };
-
- initial_visitor visitor(this);
-
- d_top = top; // remember top of tree
-
- d_top->walk_tree(&visitor);
+ if (s_registry.count(name) == 0){ // not registered
+ *maker = (mb_mblock_maker_t) 0;
+ return false;
+ }
+ *maker = s_registry[name];
return true;
}
diff --git a/mblock/src/lib/mb_class_registry.h b/mblock/src/lib/mb_class_registry.h
new file mode 100644
index 000000000..b6e63d38b
--- /dev/null
+++ b/mblock/src/lib/mb_class_registry.h
@@ -0,0 +1,51 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_MB_CLASS_REGISTRY_H
+#define INCLUDED_MB_CLASS_REGISTRY_H
+
+#include <mb_common.h>
+
+//! conceptually, pointer to constructor
+typedef mb_mblock_sptr (*mb_mblock_maker_t)(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg);
+
+/*
+ * \brief Maintain mapping between mblock class_name and factory (maker)
+ */
+class mb_class_registry : public boost::noncopyable {
+public:
+ static bool register_maker(const std::string &name, mb_mblock_maker_t maker);
+ static bool lookup_maker(const std::string &name, mb_mblock_maker_t *maker);
+};
+
+template<class mblock>
+mb_mblock_sptr mb_mblock_maker(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg)
+{
+ return mb_mblock_sptr(new mblock(runtime, instance_name, user_arg));
+}
+
+#define REGISTER_MBLOCK_CLASS(name) \
+ bool __RBC__ ## name = mb_class_registry::register_maker(#name, &mb_mblock_maker<name>)
+
+#endif /* INCLUDED_MB_CLASS_REGISTRY_H */
diff --git a/mblock/src/lib/mb_exception.cc b/mblock/src/lib/mb_exception.cc
index 4d4ca70b1..23dfbd9fe 100644
--- a/mblock/src/lib/mb_exception.cc
+++ b/mblock/src/lib/mb_exception.cc
@@ -38,6 +38,11 @@ mbe_not_implemented::mbe_not_implemented(mb_mblock *mb, const std::string &msg)
{
}
+mbe_no_such_class::mbe_no_such_class(mb_mblock *mb, const std::string &class_name)
+ : mbe_base(mb, "No such class: " + class_name)
+{
+}
+
mbe_no_such_component::mbe_no_such_component(mb_mblock *mb, const std::string &component_name)
: mbe_base(mb, "No such component: " + component_name)
{
@@ -85,3 +90,17 @@ mbe_invalid_port_type::mbe_invalid_port_type(mb_mblock *mb,
: mbe_base(mb, "Invalid port type for connection: " + mb_util::join_names(comp_name, port_name))
{
}
+
+mbe_mblock_failed::mbe_mblock_failed(mb_mblock *mb,
+ const std::string &msg)
+ : mbe_base(mb, "Message block failed: " + msg)
+{
+}
+
+mbe_terminate::mbe_terminate()
+{
+}
+
+mbe_exit::mbe_exit()
+{
+}
diff --git a/mblock/src/lib/mb_exception.h b/mblock/src/lib/mb_exception.h
index 40abd1c96..183f7089c 100644
--- a/mblock/src/lib/mb_exception.h
+++ b/mblock/src/lib/mb_exception.h
@@ -38,6 +38,11 @@ public:
mbe_not_implemented(mb_mblock *mb, const std::string &msg);
};
+class mbe_no_such_class : public mbe_base
+{
+public:
+ mbe_no_such_class(mb_mblock *, const std::string &class_name);
+};
class mbe_no_such_component : public mbe_base
{
@@ -57,6 +62,7 @@ public:
mbe_no_such_port(mb_mblock *, const std::string &port_name);
};
+
class mbe_duplicate_port : public mbe_base
{
public:
@@ -87,6 +93,26 @@ public:
const std::string &port_name);
};
+class mbe_mblock_failed : public mbe_base
+{
+public:
+ mbe_mblock_failed(mb_mblock *, const std::string &msg);
+};
+
+// not derived from mbe_base to simplify try/catch
+class mbe_terminate
+{
+public:
+ mbe_terminate();
+};
+
+// not derived from mbe_base to simplify try/catch
+class mbe_exit
+{
+public:
+ mbe_exit();
+};
+
#endif /* INCLUDED_MB_EXCEPTION_H */
diff --git a/mblock/src/lib/mb_gettid.cc b/mblock/src/lib/mb_gettid.cc
new file mode 100644
index 000000000..4f6b3a567
--- /dev/null
+++ b/mblock/src/lib/mb_gettid.cc
@@ -0,0 +1,53 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_gettid.h>
+
+#define NEED_STUB
+
+#if defined(HAVE_SYS_SYSCALL_H) && defined(HAVE_UNISTD_H)
+
+#include <sys/syscall.h>
+#include <unistd.h>
+
+#if defined(SYS_gettid)
+#undef NEED_STUB
+
+int mb_gettid()
+{
+ return syscall(SYS_gettid);
+}
+
+#endif
+#endif
+
+#if defined(NEED_STUB)
+
+int
+mb_gettid()
+{
+ return 0;
+}
+
+#endif
diff --git a/mblock/src/lib/mb_runtime_thread_per_mblock.h b/mblock/src/lib/mb_gettid.h
index cef756ecd..d06276f56 100644
--- a/mblock/src/lib/mb_runtime_thread_per_mblock.h
+++ b/mblock/src/lib/mb_gettid.h
@@ -18,23 +18,9 @@
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H
-#define INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H
-
-#include <mb_runtime.h>
/*!
- * \brief Concrete runtime that uses a single thread for all work.
+ * \brief Return Linux taskid, or 0 if not available
*/
-class mb_runtime_thread_per_mblock : public mb_runtime
-{
- mb_mblock_sptr d_top; // top mblock
-
-public:
- mb_runtime_thread_per_mblock();
- ~mb_runtime_thread_per_mblock();
-
- bool run(mb_mblock_sptr top);
-};
+int mb_gettid();
-#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H */
diff --git a/mblock/src/lib/mb_mblock.cc b/mblock/src/lib/mb_mblock.cc
index 9b8658f21..afff1a928 100644
--- a/mblock/src/lib/mb_mblock.cc
+++ b/mblock/src/lib/mb_mblock.cc
@@ -25,29 +25,36 @@
#include <mb_mblock.h>
#include <mb_mblock_impl.h>
+#include <mb_runtime.h>
+#include <mb_exception.h>
+#include <iostream>
+static pmt_t s_sys_port = pmt_intern("%sys-port");
+static pmt_t s_halt = pmt_intern("%halt");
+
mb_visitor::~mb_visitor()
{
// nop base case for virtual destructor.
}
-mb_mblock::mb_mblock()
- : d_impl(mb_mblock_impl_sptr(new mb_mblock_impl(this)))
+mb_mblock::mb_mblock(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg)
+ : d_impl(mb_mblock_impl_sptr(
+ new mb_mblock_impl(dynamic_cast<mb_runtime_base*>(runtime),
+ this, instance_name)))
{
}
mb_mblock::~mb_mblock()
{
- disconnect_all();
-
- // FIXME more?
}
void
-mb_mblock::init_fsm()
+mb_mblock::initial_transition()
{
// default implementation does nothing
}
@@ -58,6 +65,36 @@ mb_mblock::handle_message(mb_message_sptr msg)
// default implementation does nothing
}
+
+void
+mb_mblock::main_loop()
+{
+ while (1){
+ mb_message_sptr msg;
+ try {
+ while (1){
+ msg = impl()->msgq().get_highest_pri_msg();
+
+ // check for %halt from %sys-port
+ if (pmt_eq(msg->port_id(), s_sys_port) && pmt_eq(msg->signal(), s_halt))
+ exit();
+
+ handle_message(msg);
+ }
+ }
+ catch (pmt_exception e){
+ std::cerr << "\nmb_mblock::main_loop: ignored pmt_exception: "
+ << e.what()
+ << "\nin mblock instance \"" << instance_name()
+ << "\" while handling message:"
+ << "\n port_id = " << msg->port_id()
+ << "\n signal = " << msg->signal()
+ << "\n data = " << msg->data()
+ << "\n metatdata = " << msg->metadata() << std::endl;
+ }
+ }
+}
+
////////////////////////////////////////////////////////////////////////
// Forward other methods to implementation class //
////////////////////////////////////////////////////////////////////////
@@ -74,9 +111,11 @@ mb_mblock::define_port(const std::string &port_name_string,
void
mb_mblock::define_component(const std::string &component_name,
- mb_mblock_sptr component)
+ const std::string &class_name,
+ pmt_t user_arg)
+
{
- d_impl->define_component(component_name, component);
+ d_impl->define_component(component_name, class_name, user_arg);
}
void
@@ -97,7 +136,7 @@ mb_mblock::disconnect(const std::string &comp_name1, const std::string &port_nam
}
void
-mb_mblock::disconnect_component(const std::string component_name)
+mb_mblock::disconnect_component(const std::string &component_name)
{
d_impl->disconnect_component(component_name);
}
@@ -115,9 +154,9 @@ mb_mblock::nconnections() const
}
bool
-mb_mblock::walk_tree(mb_visitor *visitor, const std::string &path)
+mb_mblock::walk_tree(mb_visitor *visitor)
{
- return d_impl->walk_tree(visitor, path);
+ return d_impl->walk_tree(visitor);
}
std::string
@@ -127,7 +166,7 @@ mb_mblock::instance_name() const
}
void
-mb_mblock::set_instance_name(const std::string name)
+mb_mblock::set_instance_name(const std::string &name)
{
d_impl->set_instance_name(name);
}
@@ -139,7 +178,7 @@ mb_mblock::class_name() const
}
void
-mb_mblock::set_class_name(const std::string name)
+mb_mblock::set_class_name(const std::string &name)
{
d_impl->set_class_name(name);
}
@@ -149,3 +188,42 @@ mb_mblock::parent() const
{
return d_impl->mblock_parent();
}
+
+void
+mb_mblock::exit()
+{
+ throw mbe_exit(); // adios...
+}
+
+void
+mb_mblock::shutdown_all(pmt_t result)
+{
+ d_impl->runtime()->request_shutdown(result);
+}
+
+pmt_t
+mb_mblock::schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data)
+{
+ mb_msg_accepter_sptr accepter = impl()->make_accepter(s_sys_port);
+ return d_impl->runtime()->schedule_one_shot_timeout(abs_time, user_data,
+ accepter);
+}
+
+pmt_t
+mb_mblock::schedule_periodic_timeout(const mb_time &first_abs_time,
+ const mb_time &delta_time,
+ pmt_t user_data)
+{
+ mb_msg_accepter_sptr accepter = impl()->make_accepter(s_sys_port);
+ return d_impl->runtime()->schedule_periodic_timeout(first_abs_time,
+ delta_time,
+ user_data,
+ accepter);
+}
+
+void
+mb_mblock::cancel_timeout(pmt_t handle)
+{
+ d_impl->runtime()->cancel_timeout(handle);
+}
+
diff --git a/mblock/src/lib/mb_mblock.h b/mblock/src/lib/mb_mblock.h
index 594920f91..6d471c3db 100644
--- a/mblock/src/lib/mb_mblock.h
+++ b/mblock/src/lib/mb_mblock.h
@@ -24,6 +24,7 @@
#include <mb_common.h>
#include <mb_message.h>
#include <mb_port.h>
+#include <mb_time.h>
/*!
@@ -34,7 +35,7 @@ class mb_visitor
{
public:
virtual ~mb_visitor();
- virtual bool operator()(mb_mblock *mblock, const std::string &path) = 0;
+ virtual bool operator()(mb_mblock *mblock) = 0;
};
// ----------------------------------------------------------------------
@@ -52,6 +53,7 @@ private:
friend class mb_runtime;
friend class mb_mblock_impl;
+ friend class mb_worker;
protected:
/*!
@@ -59,22 +61,29 @@ protected:
*
* Initializing all mblocks in the system is a 3 step procedure.
*
- * The top level mblock's constructor is run. That constructor (a)
- * registers all of its ports using define_port, (b) constructs and
- * registers any subcomponents it may have via the define_component
- * method, and then (c) issues connect calls to wire its
- * subcomponents together.
+ * The top level mblock's constructor is run. That constructor
+ * (a) registers all of its ports using define_port, (b) registers any
+ * subcomponents it may have via the define_component method, and
+ * then (c) issues connect calls to wire its subcomponents together.
+ *
+ * \param runtime the runtime associated with this mblock
+ * \param instance_name specify the name of this instance
+ * (for debugging, NUMA mapping, etc)
+ * \param user_arg argument passed by user to constructor
+ * (ignored by the mb_mblock base class)
*/
- mb_mblock();
+ mb_mblock(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
public:
/*!
* \brief Called by the runtime system to execute the initial
* transition of the finite state machine.
*
- * Override this to initialize your finite state machine.
+ * This method is called by the runtime after all blocks are
+ * constructed and before the first message is delivered. Override
+ * this to initialize your finite state machine.
*/
- virtual void init_fsm();
+ virtual void initial_transition();
protected:
/*!
@@ -113,11 +122,13 @@ protected:
* names and identities of our sub-component mblocks.
*
* \param component_name The name of the sub-component (must be unique with this mblock).
- * \param component The sub-component instance.
+ * \param class_name The class of the instance that is to be created.
+ * \param user_arg The argument to pass to the constructor of the component.
*/
void
define_component(const std::string &component_name,
- mb_mblock_sptr component);
+ const std::string &class_name,
+ pmt_t user_arg = PMT_NIL);
/*!
* \brief connect endpoint_1 to endpoint_2
@@ -160,7 +171,7 @@ protected:
* \param component_name component to disconnect
*/
void
- disconnect_component(const std::string component_name);
+ disconnect_component(const std::string &component_name);
/*!
* \brief disconnect all connections to all components
@@ -175,8 +186,47 @@ protected:
nconnections() const;
//! Set the class name
- void set_class_name(const std::string name);
+ void set_class_name(const std::string &name);
+
+ /*!
+ * \brief Tell runtime that we are done.
+ *
+ * This method does not return.
+ */
+ void exit();
+ /*!
+ * \brief Ask runtime to execute the shutdown procedure for all blocks.
+ *
+ * \param result sets value of \p result output argument of runtime->run(...)
+ *
+ * The runtime first sends a maximum priority %shutdown message to
+ * all blocks. All blocks should handle the %shutdown message,
+ * perform whatever clean up is required, and call this->exit();
+ *
+ * After a period of time (~100ms), any blocks which haven't yet
+ * called this->exit() are sent a maximum priority %halt message.
+ * %halt is detected in main_loop, and this->exit() is called.
+ *
+ * After an additional period of time (~100ms), any blocks which
+ * still haven't yet called this->exit() are sent a SIG<FOO> (TBD)
+ * signal, which will blow them out of any blocking system calls and
+ * raise an mbe_terminate exception. The default top-level
+ * runtime-provided exception handler will call this->exit() to
+ * finish the process.
+ *
+ * runtime->run(...) returns when all blocks have called exit.
+ */
+ void shutdown_all(pmt_t result);
+
+ /*!
+ * \brief main event dispatching loop
+ *
+ * Although it is possible to override this, the default implementation
+ * should work for virtually all cases.
+ */
+ virtual void main_loop();
+
public:
virtual ~mb_mblock();
@@ -187,18 +237,74 @@ public:
std::string class_name() const;
//! Set the instance name of this block.
- void set_instance_name(const std::string name);
+ void set_instance_name(const std::string &name);
//! Return the parent of this mblock, or 0 if we're the top-level block.
mb_mblock *parent() const;
/*!
+ * \brief Schedule a "one shot" timeout.
+ *
+ * \param abs_time the absolute time at which the timeout should fire
+ * \param user_data the data passed in the %timeout message.
+ *
+ * When the timeout fires, a message will be sent to the mblock.
+ *
+ * The message will have port_id = %sys-port, signal = %timeout,
+ * data = user_data, metadata = the handle returned from
+ * schedule_one_shot_timeout, pri = MB_PRI_BEST.
+ *
+ * \returns a handle that can be used in cancel_timeout, and is passed
+ * as the metadata field of the generated %timeout message.
+ *
+ * To cancel a pending timeout, call cancel_timeout.
+ */
+ pmt_t
+ schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data);
+
+ /*!
+ * \brief Schedule a periodic timeout.
+ *
+ * \param first_abs_time The absolute time at which the first timeout should fire.
+ * \param delta_time The relative delay between the first and successive timeouts.
+ * \param user_data the data passed in the %timeout message.
+ *
+ * When the timeout fires, a message will be sent to the mblock, and a
+ * new timeout will be scheduled for previous absolute time + delta_time.
+ *
+ * The message will have port_id = %sys-port, signal = %timeout,
+ * data = user_data, metadata = the handle returned from
+ * schedule_one_shot_timeout, pri = MB_PRI_BEST.
+ *
+ * \returns a handle that can be used in cancel_timeout, and is passed
+ * as the metadata field of the generated %timeout message.
+ *
+ * To cancel a pending timeout, call cancel_timeout.
+ */
+ pmt_t
+ schedule_periodic_timeout(const mb_time &first_abs_time,
+ const mb_time &delta_time,
+ pmt_t user_data);
+
+ /*!
+ * \brief Attempt to cancel a pending timeout.
+ *
+ * Note that this only stops a future timeout from firing. It is
+ * possible that a timeout may have already fired and enqueued a
+ * %timeout message, but that that message has not yet been seen by
+ * handle_message.
+ *
+ * \param handle returned from schedule_one_shot_timeout or schedule_periodic_timeout.
+ */
+ void cancel_timeout(pmt_t handle);
+
+ /*!
* \brief Perform a pre-order depth-first traversal of the hierarchy.
*
* The traversal stops and returns false if any call to visitor returns false.
*/
bool
- walk_tree(mb_visitor *visitor, const std::string &path="top");
+ walk_tree(mb_visitor *visitor);
//! \implementation
diff --git a/mblock/src/lib/mb_mblock_impl.cc b/mblock/src/lib/mb_mblock_impl.cc
index 1a9e50146..02cbb548d 100644
--- a/mblock/src/lib/mb_mblock_impl.cc
+++ b/mblock/src/lib/mb_mblock_impl.cc
@@ -30,8 +30,8 @@
#include <mb_exception.h>
#include <mb_util.h>
#include <mb_msg_accepter_smp.h>
-#include <mb_runtime_placeholder.h>
#include <mbi_runtime_lock.h>
+#include <iostream>
static pmt_t s_self = pmt_intern("self");
@@ -52,9 +52,10 @@ mb_mblock_impl::comp_is_defined(const std::string &name)
////////////////////////////////////////////////////////////////////////
-mb_mblock_impl::mb_mblock_impl(mb_mblock *mb)
- : d_mb(mb), d_mb_parent(0), d_runtime(mb_runtime_placeholder::singleton()),
- d_instance_name("<unknown>"), d_class_name("mblock")
+mb_mblock_impl::mb_mblock_impl(mb_runtime_base *runtime, mb_mblock *mb,
+ const std::string &instance_name)
+ : d_runtime(runtime), d_mb(mb), d_mb_parent(0),
+ d_instance_name(instance_name), d_class_name("mblock")
{
}
@@ -85,15 +86,28 @@ mb_mblock_impl::define_port(const std::string &port_name,
void
mb_mblock_impl::define_component(const std::string &name,
- mb_mblock_sptr component)
+ const std::string &class_name,
+ pmt_t user_arg)
{
- mbi_runtime_lock l(this);
+ {
+ mbi_runtime_lock l(this);
+
+ if (comp_is_defined(name)) // check for duplicate name
+ throw mbe_duplicate_component(d_mb, name);
+ }
- if (comp_is_defined(name)) // check for duplicate name
- throw mbe_duplicate_component(d_mb, name);
+ // We ask the runtime to create the component so that it can worry about
+ // mblock placement on a NUMA machine or on a distributed multicomputer
- component->d_impl->d_mb_parent = d_mb; // set component's parent link
- d_comp_map[name] = component;
+ mb_mblock_sptr component =
+ d_runtime->create_component(instance_name() + "/" + name,
+ class_name, user_arg);
+ {
+ mbi_runtime_lock l(this);
+
+ component->d_impl->d_mb_parent = d_mb; // set component's parent link
+ d_comp_map[name] = component;
+ }
}
void
@@ -125,6 +139,7 @@ mb_mblock_impl::disconnect(const std::string &comp_name1,
mbi_runtime_lock l(this);
d_conn_table.disconnect(comp_name1, port_name1, comp_name2, port_name2);
+ invalidate_all_port_caches();
}
void
@@ -133,6 +148,7 @@ mb_mblock_impl::disconnect_component(const std::string component_name)
mbi_runtime_lock l(this);
d_conn_table.disconnect_component(component_name);
+ invalidate_all_port_caches();
}
void
@@ -141,6 +157,7 @@ mb_mblock_impl::disconnect_all()
mbi_runtime_lock l(this);
d_conn_table.disconnect_all();
+ invalidate_all_port_caches();
}
int
@@ -220,26 +237,25 @@ mb_mblock_impl::endpoints_are_compatible(const mb_endpoint &ep0,
}
bool
-mb_mblock_impl::walk_tree(mb_visitor *visitor, const std::string &path)
+mb_mblock_impl::walk_tree(mb_visitor *visitor)
{
- if (!(*visitor)(d_mb, path))
+ if (!(*visitor)(d_mb))
return false;
mb_comp_map_t::iterator it;
for (it = d_comp_map.begin(); it != d_comp_map.end(); ++it)
- if (!(it->second->walk_tree(visitor, path + "/" + it->first)))
+ if (!(it->second->walk_tree(visitor)))
return false;
return true;
}
mb_msg_accepter_sptr
-mb_mblock_impl::make_accepter(const std::string port_name)
+mb_mblock_impl::make_accepter(pmt_t port_name)
{
// FIXME this should probably use some kind of configurable factory
mb_msg_accepter *ma =
- new mb_msg_accepter_smp(d_mb->shared_from_this(),
- pmt_intern(port_name));
+ new mb_msg_accepter_smp(d_mb->shared_from_this(), port_name);
return mb_msg_accepter_sptr(ma);
}
@@ -281,3 +297,31 @@ mb_mblock_impl::set_class_name(const std::string &name)
d_class_name = name;
}
+/*
+ * This is the "Big Hammer" port cache invalidator.
+ * It invalidates _all_ of the port caches in the entire mblock tree.
+ * It's overkill, but was simple to code.
+ */
+void
+mb_mblock_impl::invalidate_all_port_caches()
+{
+ class invalidator : public mb_visitor
+ {
+ public:
+ bool operator()(mb_mblock *mblock)
+ {
+ mb_mblock_impl_sptr impl = mblock->impl();
+ mb_port_map_t::iterator it = impl->d_port_map.begin();
+ mb_port_map_t::iterator end = impl->d_port_map.end();
+ for (; it != end; ++it)
+ it->second->invalidate_cache();
+ return true;
+ }
+ };
+
+ invalidator visitor;
+
+ // Always true, except in early QA code
+ if (runtime()->top())
+ runtime()->top()->walk_tree(&visitor);
+}
diff --git a/mblock/src/lib/mb_mblock_impl.h b/mblock/src/lib/mb_mblock_impl.h
index fc0fa6943..03ad414ea 100644
--- a/mblock/src/lib/mb_mblock_impl.h
+++ b/mblock/src/lib/mb_mblock_impl.h
@@ -22,6 +22,7 @@
#define INCLUDED_MB_MBLOCK_IMPL_H
#include <mb_mblock.h>
+#include <mb_runtime_base.h>
#include <mb_connection.h>
#include <mb_msg_queue.h>
#include <list>
@@ -37,9 +38,9 @@ typedef std::map<std::string, mb_mblock_sptr> mb_comp_map_t;
*/
class mb_mblock_impl : boost::noncopyable
{
+ mb_runtime_base *d_runtime; // pointer to runtime
mb_mblock *d_mb; // pointer to our associated mblock
mb_mblock *d_mb_parent; // pointer to our parent
- mb_runtime *d_runtime; // pointer to runtime
std::string d_instance_name; // hierarchical name
std::string d_class_name; // name of this (derived) class
@@ -51,7 +52,8 @@ class mb_mblock_impl : boost::noncopyable
mb_msg_queue d_msgq; // incoming messages for us
public:
- mb_mblock_impl(mb_mblock *mb);
+ mb_mblock_impl(mb_runtime_base *runtime, mb_mblock *mb,
+ const std::string &instance_name);
~mb_mblock_impl();
/*!
@@ -79,11 +81,13 @@ public:
* names and identities of our sub-component mblocks.
*
* \param component_name The name of the sub-component (must be unique with this mblock).
- * \param component The sub-component instance.
+ * \param class_name The class of the instance that is to be created.
+ * \param user_arg The argument to pass to the constructor of the component.
*/
void
define_component(const std::string &component_name,
- mb_mblock_sptr component);
+ const std::string &class_name,
+ pmt_t user_arg);
/*!
* \brief connect endpoint_1 to endpoint_2
@@ -141,10 +145,10 @@ public:
nconnections();
bool
- walk_tree(mb_visitor *visitor, const std::string &path="");
+ walk_tree(mb_visitor *visitor);
mb_msg_accepter_sptr
- make_accepter(const std::string port_name);
+ make_accepter(pmt_t port_name);
mb_msg_queue &
msgq() { return d_msgq; }
@@ -183,10 +187,10 @@ public:
mb_mblock_sptr component(const std::string &comp_name);
//! Return the runtime instance
- mb_runtime *runtime() { return d_runtime; }
+ mb_runtime_base *runtime() { return d_runtime; }
//! Set the runtime instance
- void set_runtime(mb_runtime *runtime) { d_runtime = runtime; }
+ void set_runtime(mb_runtime_base *runtime) { d_runtime = runtime; }
/*
* Our implementation methods
@@ -210,6 +214,12 @@ private:
endpoints_are_compatible(const mb_endpoint &ep0,
const mb_endpoint &ep1);
+ /*!
+ * \brief walk mblock tree and invalidate all port resolution caches.
+ * \implementation
+ */
+ void
+ invalidate_all_port_caches();
};
diff --git a/mblock/src/lib/mb_message.cc b/mblock/src/lib/mb_message.cc
index 23803726a..14e349924 100644
--- a/mblock/src/lib/mb_message.cc
+++ b/mblock/src/lib/mb_message.cc
@@ -23,6 +23,34 @@
#include <config.h>
#endif
#include <mb_message.h>
+#include <stdio.h>
+#include <pmt_pool.h>
+
+static const int CACHE_LINE_SIZE = 64; // good guess
+
+
+#if MB_MESSAGE_LOCAL_ALLOCATOR
+
+static pmt_pool global_msg_pool(sizeof(mb_message), CACHE_LINE_SIZE);
+
+void *
+mb_message::operator new(size_t size)
+{
+ void *p = global_msg_pool.malloc();
+
+ // fprintf(stderr, "mb_message::new p = %p\n", p);
+ assert((reinterpret_cast<intptr_t>(p) & (CACHE_LINE_SIZE - 1)) == 0);
+ return p;
+}
+
+void
+mb_message::operator delete(void *p, size_t size)
+{
+ global_msg_pool.free(p);
+}
+
+#endif
+
mb_message_sptr
mb_make_message(pmt_t signal, pmt_t data, pmt_t metadata, mb_pri_t priority)
@@ -40,3 +68,16 @@ mb_message::~mb_message()
{
// NOP
}
+
+std::ostream&
+operator<<(std::ostream& os, const mb_message &msg)
+{
+ os << "<msg: signal=" << msg.signal()
+ << " port_id=" << msg.port_id()
+ << " data=" << msg.data()
+ << " metadata=" << msg.metadata()
+ << " pri=" << msg.priority()
+ << ">";
+
+ return os;
+}
diff --git a/mblock/src/lib/mb_message.h b/mblock/src/lib/mb_message.h
index 95440f8b7..6132866e2 100644
--- a/mblock/src/lib/mb_message.h
+++ b/mblock/src/lib/mb_message.h
@@ -22,6 +22,9 @@
#define INCLUDED_MB_MESSAGE_H
#include <mb_common.h>
+#include <iosfwd>
+
+#define MB_MESSAGE_LOCAL_ALLOCATOR 0 // define to 0 or 1
class mb_message;
typedef boost::shared_ptr<mb_message> mb_message_sptr;
@@ -66,6 +69,20 @@ public:
pmt_t port_id() const { return d_port_id; }
void set_port_id(pmt_t port_id){ d_port_id = port_id; }
+
+#if (MB_MESSAGE_LOCAL_ALLOCATOR)
+ void *operator new(size_t);
+ void operator delete(void *, size_t);
+#endif
};
+std::ostream& operator<<(std::ostream& os, const mb_message &msg);
+
+inline
+std::ostream& operator<<(std::ostream& os, const mb_message_sptr msg)
+{
+ os << *(msg.get());
+ return os;
+}
+
#endif /* INCLUDED_MB_MESSAGE_H */
diff --git a/mblock/src/lib/mb_msg_accepter_msgq.cc b/mblock/src/lib/mb_msg_accepter_msgq.cc
new file mode 100644
index 000000000..990491fcc
--- /dev/null
+++ b/mblock/src/lib/mb_msg_accepter_msgq.cc
@@ -0,0 +1,46 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_msg_accepter_msgq.h>
+#include <mb_message.h>
+
+pmt_t s_sys_port = pmt_intern("%sys-port");
+
+mb_msg_accepter_msgq::mb_msg_accepter_msgq(mb_msg_queue *msgq)
+ : d_msgq(msgq)
+{
+}
+
+mb_msg_accepter_msgq::~mb_msg_accepter_msgq()
+{
+}
+
+void
+mb_msg_accepter_msgq::operator()(pmt_t signal, pmt_t data,
+ pmt_t metadata, mb_pri_t priority)
+{
+ mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
+ msg->set_port_id(s_sys_port);
+ d_msgq->insert(msg);
+}
diff --git a/mblock/src/lib/mb_msg_accepter_msgq.h b/mblock/src/lib/mb_msg_accepter_msgq.h
new file mode 100644
index 000000000..f598c7304
--- /dev/null
+++ b/mblock/src/lib/mb_msg_accepter_msgq.h
@@ -0,0 +1,39 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_MB_MSG_ACCEPTER_MSGQ_H
+#define INCLUDED_MB_MSG_ACCEPTER_MSGQ_H
+
+#include <mb_msg_accepter.h>
+#include <mb_msg_queue.h>
+
+/*!
+ * \brief Concrete class that accepts messages and inserts them into a message queue.
+ */
+class mb_msg_accepter_msgq : public mb_msg_accepter {
+ mb_msg_queue *d_msgq;
+
+public:
+ mb_msg_accepter_msgq(mb_msg_queue *msgq);
+ ~mb_msg_accepter_msgq();
+ void operator()(pmt_t signal, pmt_t data, pmt_t metadata, mb_pri_t priority);
+};
+
+#endif /* INCLUDED_MB_MSG_ACCEPTER_MSGQ_H */
diff --git a/mblock/src/lib/mb_msg_queue.cc b/mblock/src/lib/mb_msg_queue.cc
index 79e245ad6..c687e8760 100644
--- a/mblock/src/lib/mb_msg_queue.cc
+++ b/mblock/src/lib/mb_msg_queue.cc
@@ -83,7 +83,7 @@ mb_msg_queue::get_highest_pri_msg_helper()
}
}
- return mb_message_sptr(); // equivalent of a zero pointer
+ return mb_message_sptr(); // eqv to a zero pointer
}
@@ -109,3 +109,20 @@ mb_msg_queue::get_highest_pri_msg()
}
}
+mb_message_sptr
+mb_msg_queue::get_highest_pri_msg_timedwait(const mb_time &abs_time)
+{
+ unsigned long secs = abs_time.d_secs;
+ unsigned long nsecs = abs_time.d_nsecs;
+
+ omni_mutex_lock l(d_mutex);
+
+ while (1){
+ mb_message_sptr msg = get_highest_pri_msg_helper();
+ if (msg) // Got one; return it
+ return msg;
+
+ if (!d_not_empty.timedwait(secs, nsecs)) // timed out
+ return mb_message_sptr(); // eqv to zero pointer
+ }
+}
diff --git a/mblock/src/lib/mb_msg_queue.h b/mblock/src/lib/mb_msg_queue.h
index 7977b7d80..4f805ea8f 100644
--- a/mblock/src/lib/mb_msg_queue.h
+++ b/mblock/src/lib/mb_msg_queue.h
@@ -23,6 +23,7 @@
#include <mb_common.h>
#include <omnithread.h>
+#include <mb_time.h>
/*!
* \brief priority queue for mblock messages
@@ -63,6 +64,19 @@ public:
* If the queue is empty, this call blocks until it can return a message.
*/
mb_message_sptr get_highest_pri_msg();
+
+ /*
+ * \brief Delete highest pri message from the queue and return it.
+ * If the queue is empty, this call blocks until it can return a message
+ * or real-time exceeds the absolute time, abs_time.
+ *
+ * \param abs_time specifies the latest absolute time to wait until.
+ * \sa mb_time::time
+ *
+ * \returns a valid mb_message_sptr, or the equivalent of a zero pointer
+ * if the call timed out while waiting.
+ */
+ mb_message_sptr get_highest_pri_msg_timedwait(const mb_time &abs_time);
};
#endif /* INCLUDED_MB_MSG_QUEUE_H */
diff --git a/mblock/src/lib/mb_port.cc b/mblock/src/lib/mb_port.cc
index b265db2dc..28bb402d8 100644
--- a/mblock/src/lib/mb_port.cc
+++ b/mblock/src/lib/mb_port.cc
@@ -31,7 +31,8 @@ mb_port::mb_port(mb_mblock *mblock,
const std::string &protocol_class_name,
bool conjugated,
mb_port::port_type_t port_type)
- : d_port_name(port_name), d_conjugated(conjugated), d_port_type(port_type),
+ : d_port_name(port_name), d_port_symbol(pmt_intern(port_name)),
+ d_conjugated(conjugated), d_port_type(port_type),
d_mblock(mblock)
{
pmt_t pc = mb_protocol_class_lookup(pmt_intern(protocol_class_name));
diff --git a/mblock/src/lib/mb_port.h b/mblock/src/lib/mb_port.h
index 3c3e96368..3fdd25aff 100644
--- a/mblock/src/lib/mb_port.h
+++ b/mblock/src/lib/mb_port.h
@@ -40,6 +40,7 @@ public:
private:
std::string d_port_name;
+ pmt_t d_port_symbol; // the port_name as a pmt symbol
pmt_t d_protocol_class;
bool d_conjugated;
port_type_t d_port_type;
@@ -58,6 +59,7 @@ protected:
public:
std::string port_name() const { return d_port_name; }
+ pmt_t port_symbol() const { return d_port_symbol; }
pmt_t protocol_class() const { return d_protocol_class; }
bool conjugated() const { return d_conjugated; }
port_type_t port_type() const { return d_port_type; }
@@ -77,9 +79,15 @@ public:
*/
virtual void
send(pmt_t signal,
- pmt_t data = PMT_NIL,
- pmt_t metadata = PMT_NIL,
+ pmt_t data = PMT_F,
+ pmt_t metadata = PMT_F,
mb_pri_t priority = MB_PRI_DEFAULT) = 0;
+
+ /*
+ * \brief Invalidate any cached peer resolutions
+ * \implementation
+ */
+ virtual void invalidate_cache() = 0;
};
#endif /* INCLUDED_MB_PORT_H */
diff --git a/mblock/src/lib/mb_port_simple.cc b/mblock/src/lib/mb_port_simple.cc
index 80cb65efb..cbfb0f27f 100644
--- a/mblock/src/lib/mb_port_simple.cc
+++ b/mblock/src/lib/mb_port_simple.cc
@@ -37,7 +37,8 @@ mb_port_simple::mb_port_simple(mb_mblock *mblock,
const std::string &protocol_class_name,
bool conjugated,
mb_port::port_type_t port_type)
- : mb_port(mblock, port_name, protocol_class_name, conjugated, port_type)
+ : mb_port(mblock, port_name, protocol_class_name, conjugated, port_type),
+ d_cache_valid(false)
{
}
@@ -67,6 +68,9 @@ mb_port_simple::find_accepter(mb_port_simple *start)
mb_endpoint peer_ep;
mb_msg_accepter_sptr r;
+ if (start->d_cache_valid)
+ return start->d_cached_accepter;
+
mbi_runtime_lock l(p->mblock());
// Set up initial context.
@@ -78,6 +82,8 @@ mb_port_simple::find_accepter(mb_port_simple *start)
case mb_port::EXTERNAL: // binding is in parent's name space
context = p->mblock()->parent();
+ if (!context) // can't be bound if there's no parent
+ return mb_msg_accepter_sptr(); // not bound
break;
default:
@@ -97,7 +103,11 @@ mb_port_simple::find_accepter(mb_port_simple *start)
case mb_port::INTERNAL: // Terminate here.
case mb_port::EXTERNAL:
r = pp->make_accepter();
- // FIXME cache the result
+
+ // cache the result
+
+ start->d_cached_accepter = r;
+ start->d_cache_valid = true;
return r;
case mb_port::RELAY: // Traverse to other side of relay port.
@@ -130,5 +140,12 @@ mb_port_simple::find_accepter(mb_port_simple *start)
mb_msg_accepter_sptr
mb_port_simple::make_accepter()
{
- return d_mblock->impl()->make_accepter(port_name());
+ return d_mblock->impl()->make_accepter(port_symbol());
+}
+
+void
+mb_port_simple::invalidate_cache()
+{
+ d_cache_valid = false;
+ d_cached_accepter.reset();
}
diff --git a/mblock/src/lib/mb_port_simple.h b/mblock/src/lib/mb_port_simple.h
index 5cfbd3dc0..9bfe0eaf7 100644
--- a/mblock/src/lib/mb_port_simple.h
+++ b/mblock/src/lib/mb_port_simple.h
@@ -28,6 +28,9 @@
*/
class mb_port_simple : public mb_port
{
+ bool d_cache_valid;
+ mb_msg_accepter_sptr d_cached_accepter;
+
protected:
static mb_msg_accepter_sptr
find_accepter(mb_port_simple *start);
@@ -57,6 +60,13 @@ public:
pmt_t data = PMT_NIL,
pmt_t metadata = PMT_NIL,
mb_pri_t priority = MB_PRI_DEFAULT);
+
+ /*
+ * \brief Invalidate any cached peer resolutions
+ * \implementation
+ */
+ void invalidate_cache();
+
};
#endif /* INCLUDED_MB_PORT_SIMPLE_H */
diff --git a/mblock/src/lib/mb_protocol_class.cc b/mblock/src/lib/mb_protocol_class.cc
index f3eeb6035..a11017fa9 100644
--- a/mblock/src/lib/mb_protocol_class.cc
+++ b/mblock/src/lib/mb_protocol_class.cc
@@ -24,6 +24,7 @@
#endif
#include <mb_protocol_class.h>
+#include <iostream>
static pmt_t s_ALL_PROTOCOL_CLASSES = PMT_NIL;
@@ -80,3 +81,25 @@ mb_protocol_class_lookup(pmt_t name)
return PMT_NIL;
}
+
+mb_protocol_class_init::mb_protocol_class_init(const char *data, size_t len)
+{
+ std::stringbuf sb;
+ sb.str(std::string(data, len));
+
+ while (1){
+ pmt_t obj = pmt_deserialize(sb);
+
+ if (0){
+ pmt_write(obj, std::cout);
+ std::cout << std::endl;
+ }
+
+ if (pmt_is_eof_object(obj))
+ return;
+
+ mb_make_protocol_class(pmt_nth(0, obj), // protocol-class name
+ pmt_nth(1, obj), // list of incoming msg names
+ pmt_nth(2, obj)); // list of outgoing msg names
+ }
+}
diff --git a/mblock/src/lib/mb_protocol_class.h b/mblock/src/lib/mb_protocol_class.h
index f4382ada1..60b87709d 100644
--- a/mblock/src/lib/mb_protocol_class.h
+++ b/mblock/src/lib/mb_protocol_class.h
@@ -39,4 +39,14 @@ pmt_t mb_protocol_class_outgoing(pmt_t pc); //< return outgoing message set
pmt_t mb_protocol_class_lookup(pmt_t name); //< lookup an existing protocol class by name
+
+/*!
+ * \brief Initialize one or more protocol class from a serialized description.
+ * Used by machine generated code.
+ */
+class mb_protocol_class_init {
+public:
+ mb_protocol_class_init(const char *data, size_t len);
+};
+
#endif /* INCLUDED_MB_PROTOCOL_CLASS_H */
diff --git a/mblock/src/lib/mb_runtime.cc b/mblock/src/lib/mb_runtime.cc
index 34a0af358..7f3b78678 100644
--- a/mblock/src/lib/mb_runtime.cc
+++ b/mblock/src/lib/mb_runtime.cc
@@ -24,15 +24,16 @@
#endif
#include <mb_runtime.h>
-#include <mb_runtime_thread_per_mblock.h>
+#include <mb_runtime_thread_per_block.h>
mb_runtime_sptr
mb_make_runtime()
{
- return mb_runtime_sptr(new mb_runtime_thread_per_mblock());
+ return mb_runtime_sptr(new mb_runtime_thread_per_block());
}
mb_runtime::~mb_runtime()
{
// nop
}
+
diff --git a/mblock/src/lib/mb_runtime.h b/mblock/src/lib/mb_runtime.h
index 0929e30dc..a804af06f 100644
--- a/mblock/src/lib/mb_runtime.h
+++ b/mblock/src/lib/mb_runtime.h
@@ -37,40 +37,32 @@ mb_runtime_sptr mb_make_runtime();
class mb_runtime : boost::noncopyable,
public boost::enable_shared_from_this<mb_runtime>
{
- omni_mutex d_brl; // big runtime lock (avoid using this if possible...)
+protected:
+ mb_mblock_sptr d_top;
public:
mb_runtime(){}
virtual ~mb_runtime();
/*!
- * \brief Run the mblock hierarchy rooted at \p top
+ * \brief Construct and run the specified mblock hierarchy.
*
* This routine turns into the m-block scheduler, and
* blocks until the system is shutdown.
*
- * \param top top-level mblock
+ * \param name name of the top-level mblock (conventionally "top")
+ * \param class_name The class of the top-level mblock to create.
+ * \param user_arg The argument to pass to the top-level mblock constructor
+ *
* \returns true if the system ran successfully.
*/
- virtual bool run(mb_mblock_sptr top) = 0;
-
-
- // ----------------------------------------------------------------
- // Stuff from here down is really private to the implementation...
- // ----------------------------------------------------------------
-
- /*!
- * \brief lock the big runtime lock
- * \implementation
- */
- inline void lock() { d_brl.lock(); }
-
- /*!
- * \brief unlock the big runtime lock
- * \implementation
- */
- inline void unlock() { d_brl.unlock(); }
+ virtual bool run(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg,
+ pmt_t *result = 0) = 0;
+ // QA only...
+ mb_mblock_sptr top() { return d_top; }
};
#endif /* INCLUDED_MB_RUNTIME_H */
diff --git a/mblock/src/lib/mb_runtime_placeholder.cc b/mblock/src/lib/mb_runtime_base.cc
index 5ce7cc2ac..104b6afcc 100644
--- a/mblock/src/lib/mb_runtime_placeholder.cc
+++ b/mblock/src/lib/mb_runtime_base.cc
@@ -22,36 +22,36 @@
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
-#include <mb_runtime_placeholder.h>
-#include <mb_mblock.h>
-#include <mb_exception.h>
-
-
-static mb_runtime *s_singleton = 0;
+#include <mb_runtime_base.h>
+/*
+ * Default nop implementations...
+ */
-mb_runtime_placeholder::mb_runtime_placeholder()
+void
+mb_runtime_base::request_shutdown(pmt_t result)
{
- // nop
}
-mb_runtime_placeholder::~mb_runtime_placeholder()
+pmt_t
+mb_runtime_base::schedule_one_shot_timeout(const mb_time &abs_time,
+ pmt_t user_data,
+ mb_msg_accepter_sptr accepter)
{
- // nop
+ return PMT_F;
}
-bool
-mb_runtime_placeholder::run(mb_mblock_sptr top)
+pmt_t
+mb_runtime_base::schedule_periodic_timeout(const mb_time &first_abs_time,
+ const mb_time &delta_time,
+ pmt_t user_data,
+ mb_msg_accepter_sptr accepter)
{
- throw mbe_not_implemented(top.get(), "mb_runtime_placeholder::run");
+ return PMT_F;
}
-mb_runtime *
-mb_runtime_placeholder::singleton()
+void
+mb_runtime_base::cancel_timeout(pmt_t handle)
{
- if (s_singleton)
- return s_singleton;
-
- s_singleton = new mb_runtime_placeholder();
- return s_singleton;
}
+
diff --git a/mblock/src/lib/mb_runtime_base.h b/mblock/src/lib/mb_runtime_base.h
new file mode 100644
index 000000000..cb76e4503
--- /dev/null
+++ b/mblock/src/lib/mb_runtime_base.h
@@ -0,0 +1,78 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_MB_RUNTIME_BASE_H
+#define INCLUDED_MB_RUNTIME_BASE_H
+
+#include <mb_runtime.h>
+#include <omnithread.h>
+#include <mb_time.h>
+
+/*
+ * \brief This is the runtime class used by the implementation.
+ */
+class mb_runtime_base : public mb_runtime
+{
+ omni_mutex d_brl; // big runtime lock (avoid using this if possible...)
+
+protected:
+ mb_msg_accepter_sptr d_accepter;
+
+public:
+
+ /*!
+ * \brief lock the big runtime lock
+ * \implementation
+ */
+ inline void lock() { d_brl.lock(); }
+
+ /*!
+ * \brief unlock the big runtime lock
+ * \implementation
+ */
+ inline void unlock() { d_brl.unlock(); }
+
+ virtual void request_shutdown(pmt_t result);
+
+ virtual mb_mblock_sptr
+ create_component(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg) = 0;
+
+ virtual pmt_t
+ schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data,
+ mb_msg_accepter_sptr accepter);
+
+ virtual pmt_t
+ schedule_periodic_timeout(const mb_time &first_abs_time,
+ const mb_time &delta_time,
+ pmt_t user_data,
+ mb_msg_accepter_sptr accepter);
+ virtual void
+ cancel_timeout(pmt_t handle);
+
+ mb_msg_accepter_sptr
+ accepter() { return d_accepter; }
+
+};
+
+
+#endif /* INCLUDED_MB_RUNTIME_BASE_H */
diff --git a/mblock/src/lib/mb_runtime_nop.cc b/mblock/src/lib/mb_runtime_nop.cc
index a19f0793a..bcae1ebd5 100644
--- a/mblock/src/lib/mb_runtime_nop.cc
+++ b/mblock/src/lib/mb_runtime_nop.cc
@@ -24,6 +24,8 @@
#endif
#include <mb_runtime_nop.h>
#include <mb_mblock.h>
+#include <mb_class_registry.h>
+#include <mb_exception.h>
mb_runtime_sptr
mb_make_runtime_nop()
@@ -42,23 +44,41 @@ mb_runtime_nop::~mb_runtime_nop()
// nop for now
}
+
bool
-mb_runtime_nop::run(mb_mblock_sptr top)
+mb_runtime_nop::run(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg, pmt_t *result)
{
class initial_visitor : public mb_visitor
{
public:
- bool operator()(mb_mblock *mblock, const std::string &path)
+ bool operator()(mb_mblock *mblock)
{
- mblock->set_instance_name(path);
- mblock->init_fsm();
+ mblock->initial_transition();
return true;
}
};
- initial_visitor visitor;
+ initial_visitor visitor;
+
+ if (result)
+ *result = PMT_T;
- top->walk_tree(&visitor);
+ d_top = create_component(instance_name, class_name, user_arg);
+ d_top->walk_tree(&visitor);
return true;
}
+
+mb_mblock_sptr
+mb_runtime_nop::create_component(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg)
+{
+ mb_mblock_maker_t maker;
+ if (!mb_class_registry::lookup_maker(class_name, &maker))
+ throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
+
+ return maker(this, instance_name, user_arg);
+}
diff --git a/mblock/src/lib/mb_runtime_nop.h b/mblock/src/lib/mb_runtime_nop.h
index dc7887991..d7fe105bd 100644
--- a/mblock/src/lib/mb_runtime_nop.h
+++ b/mblock/src/lib/mb_runtime_nop.h
@@ -21,7 +21,7 @@
#ifndef INCLUDED_MB_RUNTIME_NOP_H
#define INCLUDED_MB_RUNTIME_NOP_H
-#include <mb_runtime.h>
+#include <mb_runtime_base.h>
/*!
* \brief Public constructor (factory) for mb_runtime_nop objects.
@@ -31,14 +31,22 @@ mb_runtime_sptr mb_make_runtime_nop();
/*!
* \brief Concrete runtime that does nothing. Used only during early QA tests.
*/
-class mb_runtime_nop : public mb_runtime
+class mb_runtime_nop : public mb_runtime_base
{
-
public:
mb_runtime_nop();
~mb_runtime_nop();
- bool run(mb_mblock_sptr top);
+ bool run(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg,
+ pmt_t *result);
+
+protected:
+ mb_mblock_sptr
+ create_component(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg);
};
#endif /* INCLUDED_MB_RUNTIME_NOP_H */
diff --git a/mblock/src/lib/mb_runtime_thread_per_block.cc b/mblock/src/lib/mb_runtime_thread_per_block.cc
new file mode 100644
index 000000000..7ad87aa23
--- /dev/null
+++ b/mblock/src/lib/mb_runtime_thread_per_block.cc
@@ -0,0 +1,349 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_runtime_thread_per_block.h>
+#include <mb_mblock.h>
+#include <mb_mblock_impl.h>
+#include <mb_class_registry.h>
+#include <mb_exception.h>
+#include <mb_worker.h>
+#include <omnithread.h>
+#include <iostream>
+#include <mb_msg_accepter_msgq.h>
+
+
+static pmt_t s_halt = pmt_intern("%halt");
+static pmt_t s_sys_port = pmt_intern("%sys-port");
+static pmt_t s_shutdown = pmt_intern("%shutdown");
+static pmt_t s_request_shutdown = pmt_intern("%request-shutdown");
+static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
+static pmt_t s_timeout = pmt_intern("%timeout");
+static pmt_t s_request_timeout = pmt_intern("%request-timeout");
+static pmt_t s_cancel_timeout = pmt_intern("%cancel-timeout");
+static pmt_t s_send_halt = pmt_intern("send-halt");
+static pmt_t s_exit_now = pmt_intern("exit-now");
+
+static void
+send_sys_msg(mb_msg_queue &msgq, pmt_t signal,
+ pmt_t data = PMT_F, pmt_t metadata = PMT_F,
+ mb_pri_t priority = MB_PRI_BEST)
+{
+ mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
+ msg->set_port_id(s_sys_port);
+ msgq.insert(msg);
+}
+
+
+mb_runtime_thread_per_block::mb_runtime_thread_per_block()
+ : d_shutdown_in_progress(false),
+ d_shutdown_result(PMT_T)
+{
+ d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq));
+}
+
+mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
+{
+ // FIXME iterate over workers and ensure that they are dead.
+
+ if (!d_workers.empty())
+ std::cerr << "\nmb_runtime_thread_per_block: dtor (# workers = "
+ << d_workers.size() << ")\n";
+}
+
+void
+mb_runtime_thread_per_block::request_shutdown(pmt_t result)
+{
+ (*accepter())(s_request_shutdown, result, PMT_F, MB_PRI_BEST);
+}
+
+bool
+mb_runtime_thread_per_block::run(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg, pmt_t *result)
+{
+ if (result) // set it to something now, in case we throw
+ *result = PMT_F;
+
+ // reset the shutdown state
+ d_shutdown_in_progress = false;
+ d_shutdown_result = PMT_T;
+
+ assert(d_workers.empty());
+
+ while (!d_timer_queue.empty()) // ensure timer queue is empty
+ d_timer_queue.pop();
+
+ /*
+ * Create the top-level component, and recursively all of its
+ * subcomponents.
+ */
+ d_top = create_component(instance_name, class_name, user_arg);
+
+ try {
+ run_loop();
+ }
+ catch (...){
+ d_top.reset();
+ throw;
+ }
+
+ if (result)
+ *result = d_shutdown_result;
+
+ d_top.reset();
+ return true;
+}
+
+void
+mb_runtime_thread_per_block::run_loop()
+{
+ while (1){
+ mb_message_sptr msg;
+
+ if (d_timer_queue.empty()) // Any timeouts pending?
+ msg = d_msgq.get_highest_pri_msg(); // Nope. Block forever.
+
+ else {
+ mb_timeout_sptr to = d_timer_queue.top(); // Yep. Get earliest timeout.
+
+ // wait for a msg or the timeout...
+ msg = d_msgq.get_highest_pri_msg_timedwait(to->d_when);
+
+ if (!msg){ // We timed out.
+ d_timer_queue.pop(); // Remove timeout from timer queue.
+
+ // send the %timeout msg
+ (*to->d_accepter)(s_timeout, to->d_user_data, to->handle(), MB_PRI_BEST);
+
+ if (to->d_is_periodic){
+ to->d_when = to->d_when + to->d_delta; // update time of next firing
+ d_timer_queue.push(to); // push it back into the queue
+ }
+ continue;
+ }
+ }
+
+ pmt_t signal = msg->signal();
+
+ if (pmt_eq(signal, s_worker_state_changed)){ // %worker-state-changed
+ omni_mutex_lock l1(d_workers_mutex);
+ reap_dead_workers();
+ if (d_workers.empty()) // no work left to do...
+ return;
+ }
+ else if (pmt_eq(signal, s_request_shutdown)){ // %request-shutdown
+ if (!d_shutdown_in_progress){
+ d_shutdown_in_progress = true;
+ d_shutdown_result = msg->data();
+
+ // schedule a timeout for ourselves...
+ schedule_one_shot_timeout(mb_time::time(0.100), s_send_halt, d_accepter);
+ send_all_sys_msg(s_shutdown);
+ }
+ }
+ else if (pmt_eq(signal, s_request_timeout)){ // %request-timeout
+ mb_timeout_sptr to =
+ boost::any_cast<mb_timeout_sptr>(pmt_any_ref(msg->data()));
+ d_timer_queue.push(to);
+ }
+ else if (pmt_eq(signal, s_cancel_timeout)){ // %cancel-timeout
+ d_timer_queue.cancel(msg->data());
+ }
+ else if (pmt_eq(signal, s_timeout)
+ && pmt_eq(msg->data(), s_send_halt)){ // %timeout, send-halt
+
+ // schedule another timeout for ourselves...
+ schedule_one_shot_timeout(mb_time::time(0.100), s_exit_now, d_accepter);
+ send_all_sys_msg(s_halt);
+ }
+ else if (pmt_eq(signal, s_timeout)
+ && pmt_eq(msg->data(), s_exit_now)){ // %timeout, exit-now
+
+ // We only get here if we've sent all workers %shutdown followed
+ // by %halt, and one or more of them is still alive. They must
+ // be blocked in the kernel. FIXME We could add one more step:
+ // pthread_kill(...) but for now, we'll just ignore them...
+ return;
+ }
+ else {
+ std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg << std::endl;
+ }
+ }
+}
+
+void
+mb_runtime_thread_per_block::reap_dead_workers()
+{
+ // Already holding mutex
+ // omni_mutex_lock l1(d_workers_mutex);
+
+ for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ){
+ bool is_dead;
+
+ // We can't join while holding the worker mutex, since that would
+ // attempt to destroy the mutex we're holding (omnithread's join
+ // deletes the omni_thread object after the pthread_join
+ // completes) Instead, we lock just long enough to figure out if
+ // the worker is dead.
+ {
+ omni_mutex_lock l2((*wi)->d_mutex);
+ is_dead = (*wi)->d_state == mb_worker::TS_DEAD;
+ }
+
+ if (is_dead){
+ if (0)
+ std::cerr << "\nruntime: "
+ << "(" << (*wi)->id() << ") "
+ << (*wi)->d_mblock->instance_name() << " is TS_DEAD\n";
+ void *ignore;
+ (*wi)->join(&ignore);
+ wi = d_workers.erase(wi);
+ continue;
+ }
+ ++wi;
+ }
+}
+
+//
+// Create the thread, then create the component in the thread.
+// Return a pointer to the created mblock.
+//
+// Can be invoked from any thread
+//
+mb_mblock_sptr
+mb_runtime_thread_per_block::create_component(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg)
+{
+ mb_mblock_maker_t maker;
+ if (!mb_class_registry::lookup_maker(class_name, &maker))
+ throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
+
+ // FIXME here's where we'd lookup NUMA placement requests & mblock
+ // priorities and communicate them to the worker we're creating...
+
+ // Create the worker thread
+ mb_worker *w =
+ new mb_worker(this, maker, instance_name, user_arg);
+
+ w->start_undetached(); // start it
+
+ // Wait for it to reach TS_RUNNING or TS_DEAD
+
+ bool is_dead;
+ mb_worker::cause_of_death_t why_dead;
+ {
+ omni_mutex_lock l(w->d_mutex);
+ while (!(w->d_state == mb_worker::TS_RUNNING
+ || w->d_state == mb_worker::TS_DEAD))
+ w->d_state_cond.wait();
+
+ is_dead = w->d_state == mb_worker::TS_DEAD;
+ why_dead = w->d_why_dead;
+ }
+
+ // If the worker failed to init (constructor or initial_transition
+ // raised an exception), reap the worker now and raise an exception.
+
+ if (is_dead && why_dead != mb_worker::RIP_EXIT){
+
+ void *ignore;
+ w->join(&ignore);
+
+ // FIXME with some work we ought to be able to propagate the
+ // exception from the worker.
+ throw mbe_mblock_failed(0, instance_name);
+ }
+
+ assert(w->d_mblock);
+
+ // Add w to the vector of workers, and return the mblock.
+ {
+ omni_mutex_lock l(d_workers_mutex);
+ d_workers.push_back(w);
+ }
+
+ if (0)
+ std::cerr << "\nruntime: created "
+ << "(" << w->id() << ") "
+ << w->d_mblock->instance_name() << "\n";
+
+ return w->d_mblock;
+}
+
+void
+mb_runtime_thread_per_block::send_all_sys_msg(pmt_t signal,
+ pmt_t data,
+ pmt_t metadata,
+ mb_pri_t priority)
+{
+ omni_mutex_lock l1(d_workers_mutex);
+
+ for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ++wi){
+ send_sys_msg((*wi)->d_mblock->impl()->msgq(),
+ signal, data, metadata, priority);
+ }
+}
+
+//
+// Can be invoked from any thread.
+// Sends a message to the runtime.
+//
+pmt_t
+mb_runtime_thread_per_block::schedule_one_shot_timeout
+ (const mb_time &abs_time,
+ pmt_t user_data,
+ mb_msg_accepter_sptr accepter)
+{
+ mb_timeout_sptr to(new mb_timeout(abs_time, user_data, accepter));
+ (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST);
+ return to->handle();
+}
+
+//
+// Can be invoked from any thread.
+// Sends a message to the runtime.
+//
+pmt_t
+mb_runtime_thread_per_block::schedule_periodic_timeout
+ (const mb_time &first_abs_time,
+ const mb_time &delta_time,
+ pmt_t user_data,
+ mb_msg_accepter_sptr accepter)
+{
+ mb_timeout_sptr to(new mb_timeout(first_abs_time, delta_time,
+ user_data, accepter));
+ (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST);
+ return to->handle();
+}
+
+//
+// Can be invoked from any thread.
+// Sends a message to the runtime.
+//
+void
+mb_runtime_thread_per_block::cancel_timeout(pmt_t handle)
+{
+ (*d_accepter)(s_cancel_timeout, handle, PMT_F, MB_PRI_BEST);
+}
diff --git a/mblock/src/lib/mb_runtime_thread_per_block.h b/mblock/src/lib/mb_runtime_thread_per_block.h
new file mode 100644
index 000000000..773b8b4e9
--- /dev/null
+++ b/mblock/src/lib/mb_runtime_thread_per_block.h
@@ -0,0 +1,84 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H
+#define INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H
+
+#include <mb_runtime_base.h>
+#include <mb_worker.h>
+#include <mb_msg_queue.h>
+#include <mb_timer_queue.h>
+
+/*!
+ * \brief Concrete runtime that uses a thread per mblock
+ * \implementation
+ *
+ * These are all implementation details.
+ */
+class mb_runtime_thread_per_block : public mb_runtime_base
+{
+public:
+ omni_mutex d_workers_mutex; // hold while manipulating d_workers
+ std::vector<mb_worker*> d_workers;
+ bool d_shutdown_in_progress;
+ pmt_t d_shutdown_result;
+ mb_msg_queue d_msgq;
+ mb_timer_queue d_timer_queue;
+
+ typedef std::vector<mb_worker*>::iterator worker_iter_t;
+
+ mb_runtime_thread_per_block();
+ ~mb_runtime_thread_per_block();
+
+ bool run(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg,
+ pmt_t *result);
+
+ void request_shutdown(pmt_t result);
+
+protected:
+ mb_mblock_sptr
+ create_component(const std::string &instance_name,
+ const std::string &class_name,
+ pmt_t user_arg);
+
+ pmt_t
+ schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data,
+ mb_msg_accepter_sptr accepter);
+
+ pmt_t
+ schedule_periodic_timeout(const mb_time &first_abs_time,
+ const mb_time &delta_time,
+ pmt_t user_data,
+ mb_msg_accepter_sptr accepter);
+ void
+ cancel_timeout(pmt_t handle);
+
+private:
+ void reap_dead_workers();
+ void run_loop();
+
+ void send_all_sys_msg(pmt_t signal, pmt_t data = PMT_F,
+ pmt_t metadata = PMT_F,
+ mb_pri_t priority = MB_PRI_BEST);
+};
+
+#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H */
diff --git a/mblock/src/lib/mb_time.cc b/mblock/src/lib/mb_time.cc
new file mode 100644
index 000000000..73c86e4f4
--- /dev/null
+++ b/mblock/src/lib/mb_time.cc
@@ -0,0 +1,84 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_time.h>
+#include <omnithread.h>
+#include <math.h>
+#include <assert.h>
+
+
+mb_time::mb_time(double real_secs)
+{
+ double floor_secs = floor(real_secs);
+ d_secs = (long) floor_secs;
+ d_nsecs = (long) ((real_secs - floor_secs) * 1e9); // always positive
+}
+
+mb_time
+mb_time::time(const mb_time &delta_t)
+{
+ unsigned long abs_sec, abs_nsec;
+ unsigned long rel_sec = delta_t.d_secs;
+ unsigned long rel_nsec = delta_t.d_nsecs;
+
+ omni_thread::get_time(&abs_sec, &abs_nsec, rel_sec, rel_nsec);
+ return mb_time(abs_sec, abs_nsec);
+}
+
+
+mb_time
+operator+(const mb_time &x, const mb_time &y)
+{
+ mb_time r(x.d_secs + y.d_secs, x.d_nsecs + y.d_nsecs);
+ while (r.d_nsecs >= 1000000000){
+ r.d_nsecs -= 1000000000;
+ r.d_secs++;
+ }
+ return r;
+}
+
+mb_time
+operator-(const mb_time &x, const mb_time &y)
+{
+ // assert(!(x < y));
+
+ mb_time r(x.d_secs - y.d_secs, x.d_nsecs - y.d_nsecs);
+ while (r.d_nsecs < 0){
+ r.d_nsecs += 1000000000;
+ r.d_secs--;
+ }
+ return r;
+}
+
+mb_time
+operator+(const mb_time &x, double y)
+{
+ return x + mb_time(y);
+}
+
+mb_time
+operator-(const mb_time &x, double y)
+{
+ return x - mb_time(y);
+}
diff --git a/mblock/src/lib/mb_time.h b/mblock/src/lib/mb_time.h
new file mode 100644
index 000000000..872304caa
--- /dev/null
+++ b/mblock/src/lib/mb_time.h
@@ -0,0 +1,89 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_MB_TIME_H
+#define INCLUDED_MB_TIME_H
+
+struct mb_time {
+ long int d_secs; // seconds.
+ long int d_nsecs; // nanoseconds. Always in [0, 1e9-1]
+
+ mb_time() : d_secs(0), d_nsecs(0) {}
+ mb_time(long secs, long nanosecs=0) : d_secs(secs), d_nsecs(nanosecs) {}
+
+ // N.B., this only makes sense for differences between times.
+ // Double doesn't have enough bits to precisely represent an absolute time.
+ mb_time(double secs);
+
+ // N.B. This only makes sense for differences between times.
+ // Double doesn't have enough bits to precisely represent an absolute time.
+ double double_time() const { return (double)d_secs + d_nsecs * 1e-9; }
+
+ /*!
+ * \brief Return an absolute time suitable for use with
+ * schedule_one_shot_timeout & schedule_periodic_timeout
+ *
+ * The return value is the current time plus the given relative offset.
+ */
+ static mb_time time(const mb_time &relative_offset = mb_time());
+};
+
+
+inline static bool
+operator<(const mb_time &x, const mb_time &y)
+{
+ return ((x.d_secs < y.d_secs)
+ || (x.d_secs == y.d_secs && x.d_nsecs < y.d_nsecs));
+}
+
+inline static bool
+operator>(const mb_time &x, const mb_time &y)
+{
+ return ((x.d_secs > y.d_secs)
+ || (x.d_secs == y.d_secs && x.d_nsecs > y.d_nsecs));
+}
+
+inline static bool
+operator>=(const mb_time &x, const mb_time &y)
+{
+ return ((x.d_secs > y.d_secs)
+ || (x.d_secs == y.d_secs && x.d_nsecs >= y.d_nsecs));
+}
+
+inline static bool
+operator<=(const mb_time &x, const mb_time &y)
+{
+ return ((x.d_secs < y.d_secs)
+ || (x.d_secs == y.d_secs && x.d_nsecs <= y.d_nsecs));
+}
+
+inline static bool
+operator==(const mb_time &x, const mb_time &y)
+{
+ return (x.d_secs == y.d_secs && x.d_nsecs == y.d_nsecs);
+}
+
+
+mb_time operator+(const mb_time &x, const mb_time &y);
+mb_time operator+(const mb_time &x, double y);
+mb_time operator-(const mb_time &x, const mb_time &y);
+mb_time operator-(const mb_time &x, double y);
+
+#endif /* INCLUDED_MB_TIME_H */
diff --git a/mblock/src/lib/mb_timer_queue.cc b/mblock/src/lib/mb_timer_queue.cc
new file mode 100644
index 000000000..026035ec5
--- /dev/null
+++ b/mblock/src/lib/mb_timer_queue.cc
@@ -0,0 +1,63 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_timer_queue.h>
+
+static pmt_t
+make_handle()
+{
+ static long counter = 0;
+ pmt_t n = pmt_from_long(counter++);
+ return pmt_list1(n); // guaranteed to be a unique object
+}
+
+// one-shot constructor
+mb_timeout::mb_timeout(const mb_time &abs_time,
+ pmt_t user_data, mb_msg_accepter_sptr accepter)
+ : d_when(abs_time), d_is_periodic(false),
+ d_user_data(user_data), d_handle(make_handle()), d_accepter(accepter)
+{
+}
+
+// periodic constructor
+mb_timeout::mb_timeout(const mb_time &first_abs_time, const mb_time &delta_time,
+ pmt_t user_data, mb_msg_accepter_sptr accepter)
+ : d_when(first_abs_time), d_delta(delta_time), d_is_periodic(true),
+ d_user_data(user_data), d_handle(make_handle()), d_accepter(accepter)
+{
+}
+
+void
+mb_timer_queue::cancel(pmt_t handle)
+{
+ container_type::iterator it;
+
+ for (it = c.begin(); it != c.end();){
+ if (pmt_equal((*it)->handle(), handle))
+ it = c.erase(it);
+ else
+ ++it;
+ }
+ std::make_heap(c.begin(), c.end(), comp);
+}
diff --git a/mblock/src/lib/mb_timer_queue.h b/mblock/src/lib/mb_timer_queue.h
new file mode 100644
index 000000000..a17688331
--- /dev/null
+++ b/mblock/src/lib/mb_timer_queue.h
@@ -0,0 +1,73 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_MB_TIMER_QUEUE_H
+#define INCLUDED_MB_TIMER_QUEUE_H
+
+#include <mb_time.h>
+#include <vector>
+#include <queue>
+#include <pmt.h>
+#include <mb_msg_accepter.h>
+
+class mb_timeout {
+public:
+ mb_time d_when; // absolute time to fire timeout
+ mb_time d_delta; // if periodic, delta_t to next timeout
+ bool d_is_periodic; // true iff this is a periodic timeout
+ pmt_t d_user_data; // data from %timeout msg
+ pmt_t d_handle; // handle for cancellation
+ mb_msg_accepter_sptr d_accepter; // where to send the message
+
+ // one-shot constructor
+ mb_timeout(const mb_time &abs_time,
+ pmt_t user_data, mb_msg_accepter_sptr accepter);
+
+ // periodic constructor
+ mb_timeout(const mb_time &first_abs_time, const mb_time &delta_time,
+ pmt_t user_data, mb_msg_accepter_sptr accepter);
+
+ pmt_t handle() const { return d_handle; }
+};
+
+typedef boost::shared_ptr<mb_timeout> mb_timeout_sptr;
+
+
+//! Sort criterion for priority_queue
+class timeout_later
+{
+public:
+ bool operator() (const mb_timeout_sptr t1, const mb_timeout_sptr t2)
+ {
+ return t1->d_when > t2->d_when;
+ }
+};
+
+
+class mb_timer_queue : public std::priority_queue<mb_timeout_sptr,
+ std::vector<mb_timeout_sptr>,
+ timeout_later>
+{
+public:
+ void cancel(pmt_t handle);
+};
+
+#endif /* INCLUDED_MB_TIMER_QUEUE_H */
diff --git a/mblock/src/lib/mb_worker.cc b/mblock/src/lib/mb_worker.cc
new file mode 100644
index 000000000..8a9248443
--- /dev/null
+++ b/mblock/src/lib/mb_worker.cc
@@ -0,0 +1,178 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_worker.h>
+#include <mb_runtime_thread_per_block.h>
+#include <mb_exception.h>
+#include <mb_mblock.h>
+#include <mb_gettid.h>
+#include <mb_msg_accepter.h>
+#include <iostream>
+#ifdef HAVE_SCHED_H
+#include <sched.h>
+#endif
+
+#define VERBOSE 0 // define to 0 or 1
+
+
+static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
+
+
+mb_worker::mb_worker(mb_runtime_thread_per_block *runtime,
+ mb_mblock_maker_t maker,
+ const std::string &instance_name,
+ pmt_t user_arg)
+ : omni_thread((void *) 0, PRIORITY_NORMAL),
+ d_runtime(runtime), d_maker(maker),
+ d_instance_name(instance_name), d_user_arg(user_arg),
+ d_state_cond(&d_mutex), d_state(TS_UNINITIALIZED),
+ d_why_dead(RIP_NOT_DEAD_YET)
+{
+}
+
+#if 0
+mb_worker::~mb_worker()
+{
+}
+#endif
+
+#ifdef HAVE_SCHED_SETAFFINITY
+static void
+set_affinity(const std::string &instance_name, const std::string &class_name)
+{
+ //static int counter = 0;
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+
+ if (0){
+
+ //CPU_SET(counter & 0x1, &mask);
+ //counter++;
+ CPU_SET(0, &mask);
+
+ int r = sched_setaffinity(mb_gettid(), sizeof(mask), &mask);
+ if (r == -1)
+ perror("sched_setaffinity");
+ }
+}
+#else
+static void
+set_affinity(const std::string &instance_name, const std::string &class_name)
+{
+}
+#endif
+
+void
+mb_worker::set_state(worker_state_t state)
+{
+ {
+ omni_mutex_lock l2(d_mutex);
+
+ d_state = state; // update our state
+ d_state_cond.broadcast(); // Notify everybody who cares...
+ }
+
+ // send msg to runtime, telling it something changed.
+ (*d_runtime->accepter())(s_worker_state_changed, PMT_F, PMT_F, MB_PRI_BEST);
+}
+
+void *
+mb_worker::run_undetached(void *ignored)
+{
+ // FIXME add pthread_sigmask stuff
+
+ //set_affinity(d_instance_name, d_class_name);
+ set_affinity(d_instance_name, "");
+
+ try {
+ worker_thread_top_level();
+ d_why_dead = RIP_EXIT;
+ }
+ catch (mbe_terminate){
+ d_why_dead = RIP_TERMINATE;
+ }
+ catch (mbe_exit){
+ d_why_dead = RIP_EXIT;
+ }
+ catch (std::logic_error e){
+ if (d_why_dead == RIP_NOT_DEAD_YET)
+ d_why_dead = RIP_UNHANDLED_EXCEPTION;
+
+ std::cerr << "\nmb_worker::run_undetached: unhandled exception:\n";
+ std::cerr << " " << e.what() << std::endl;
+ }
+ catch (...){
+ if (d_why_dead == RIP_NOT_DEAD_YET)
+ d_why_dead = RIP_UNHANDLED_EXCEPTION;
+ }
+
+ if (VERBOSE)
+ std::cerr << "\nrun_undetached: about to return, d_why_dead = "
+ << d_why_dead << std::endl;
+
+ set_state(TS_DEAD);
+ return 0;
+}
+
+void
+mb_worker::worker_thread_top_level()
+{
+ if (VERBOSE)
+ std::cerr << "worker_thread_top_level (enter):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl
+ << " omnithread id: " << id() << std::endl
+ << " gettid: " << mb_gettid() << std::endl
+ << " getpid: " << getpid() << std::endl;
+
+ cause_of_death_t pending_cause_of_death = RIP_NOT_DEAD_YET;
+
+ try {
+ pending_cause_of_death = RIP_CTOR_EXCEPTION;
+ d_mblock = d_maker(d_runtime, d_instance_name, d_user_arg);
+
+ if (VERBOSE)
+ std::cerr << "worker_thread_top_level (post-construction):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
+
+ pending_cause_of_death = RIP_INIT_EXCEPTION;
+ d_mblock->initial_transition();
+
+ if (VERBOSE)
+ std::cerr << "worker_thread_top_level (post-initial-transition):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
+
+ set_state(TS_RUNNING);
+
+ pending_cause_of_death = RIP_UNHANDLED_EXCEPTION;
+ d_mblock->main_loop();
+ }
+ catch (...){
+ d_why_dead = pending_cause_of_death;
+ throw;
+ }
+
+ if (VERBOSE)
+ std::cerr << "worker_thread_top_level (exit):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
+}
diff --git a/mblock/src/lib/mb_worker.h b/mblock/src/lib/mb_worker.h
new file mode 100644
index 000000000..b207f4db5
--- /dev/null
+++ b/mblock/src/lib/mb_worker.h
@@ -0,0 +1,106 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_MB_WORKER_H
+#define INCLUDED_MB_WORKER_H
+
+#include <omnithread.h>
+#include <mb_common.h>
+#include <mb_class_registry.h>
+
+
+class mb_worker;
+//typedef boost::shared_ptr<mb_worker> mb_worker_sptr;
+
+class mb_runtime_thread_per_block;
+
+/*!
+ * \brief Worker thread for thread_per_block runtime
+ * \implementation
+ */
+class mb_worker : public omni_thread
+{
+public:
+ //! worker thread states
+ enum worker_state_t {
+ TS_UNINITIALIZED, // new, uninitialized
+ TS_RUNNING, // normal steady-state condition.
+ TS_DEAD // thread is dead
+ };
+
+ //! why we're dead
+ enum cause_of_death_t {
+ RIP_NOT_DEAD_YET, // not dead
+ RIP_EXIT, // normal exit
+ RIP_TERMINATE, // caught terminate exception
+ RIP_CTOR_EXCEPTION, // constructor raised an exception
+ RIP_INIT_EXCEPTION, // initial_transition rasised an exception
+ RIP_UNHANDLED_EXCEPTION // somebody (most likely handle_message) raised an exception
+ };
+
+ /*
+ * Args used by new thread to create mb_mblock
+ */
+ mb_runtime_thread_per_block *d_runtime;
+ mb_mblock_maker_t d_maker;
+ std::string d_instance_name;
+ pmt_t d_user_arg;
+
+ mb_mblock_sptr d_mblock; //< holds pointer to created mblock
+
+ /*!
+ * \brief General mutex for all these fields.
+ *
+ * They are accessed by both the main runtime thread and the newly
+ * created thread that runs the mblock's main loop.
+ */
+ omni_mutex d_mutex;
+ omni_condition d_state_cond; //< state change notifications
+ worker_state_t d_state;
+ cause_of_death_t d_why_dead;
+
+ mb_worker(mb_runtime_thread_per_block *runtime,
+ mb_mblock_maker_t maker,
+ const std::string &instance_name,
+ pmt_t user_arg);
+
+ // ~mb_worker();
+
+
+ /*!
+ * \brief This code runs as the top-level of the new thread
+ */
+ void worker_thread_top_level();
+
+ /*!
+ * \brief Invokes the top-level of the new thread (name kind of sucks)
+ */
+ void *run_undetached(void *arg);
+
+private:
+ // Neither d_mutex nor runtime->d_mutex may be held while calling this.
+ // It locks and unlocks them itself.
+ void set_state(worker_state_t state);
+};
+
+
+
+#endif /* INCLUDED_MB_WORKER_H */
diff --git a/mblock/src/lib/mbi_runtime_lock.h b/mblock/src/lib/mbi_runtime_lock.h
index 3138a7e1e..4174f6d54 100644
--- a/mblock/src/lib/mbi_runtime_lock.h
+++ b/mblock/src/lib/mbi_runtime_lock.h
@@ -48,9 +48,9 @@
*/
class mbi_runtime_lock : boost::noncopyable {
- mb_runtime *d_rt;
+ mb_runtime_base *d_rt;
public:
- mbi_runtime_lock(mb_runtime *rt) : d_rt(rt) { d_rt->lock(); }
+ mbi_runtime_lock(mb_runtime_base *rt) : d_rt(rt) { d_rt->lock(); }
mbi_runtime_lock(mb_mblock_impl *mi) : d_rt(mi->runtime()) { d_rt->lock(); }
mbi_runtime_lock(mb_mblock *mb) : d_rt(mb->impl()->runtime()) { d_rt->lock(); }
~mbi_runtime_lock(void) { d_rt->unlock(); }
diff --git a/mblock/src/lib/qa_bitset.cc b/mblock/src/lib/qa_bitset.cc
new file mode 100644
index 000000000..0acda8232
--- /dev/null
+++ b/mblock/src/lib/qa_bitset.cc
@@ -0,0 +1,493 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_mblock.h>
+#include <mb_protocol_class.h>
+#include <mb_message.h>
+#include <mb_class_registry.h>
+#include <iostream>
+#include <sstream>
+#include <bitset>
+
+static pmt_t s_in = pmt_intern("in");
+static pmt_t s_out = pmt_intern("out");
+static pmt_t s_data = pmt_intern("data");
+static pmt_t s_start = pmt_intern("start");
+static pmt_t s_send_batch = pmt_intern("send-batch");
+static pmt_t s_long0 = pmt_from_long(0);
+
+static std::string
+str(long x)
+{
+ std::ostringstream s;
+ s << x;
+ return s.str();
+}
+
+/*!
+ * \brief mblock used for QA.
+ *
+ * Messages arriving on "in" consist of a pair containing a (long)
+ * message number in the car, and a (long) bitmap in the cdr. For
+ * each message received on "in", a new message is sent on "out". The
+ * new message is the same format as the input, but the bitmap in
+ * the cdr has a "1" or'd into it that corresponds to the bit number
+ * specified in the constructor.
+ *
+ * The bitmap can be used by the ultimate receiver to confirm
+ * traversal of a set of blocks, if the blocks are assigned unique bit
+ * numbers.
+ */
+class qa_bitset : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+ int d_bitno;
+
+public:
+ qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void handle_message(mb_message_sptr msg);
+};
+
+qa_bitset::qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ d_bitno = pmt_to_long(user_arg); // The bit we are to set
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::EXTERNAL);
+ d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
+}
+
+void
+qa_bitset::handle_message(mb_message_sptr msg)
+{
+ if (pmt_eq(msg->port_id(), s_in) && pmt_eq(msg->signal(), s_data)){
+ d_out->send(s_data,
+ pmt_cons(pmt_car(msg->data()),
+ pmt_from_long((1L << d_bitno) | pmt_to_long(pmt_cdr(msg->data())))));
+ }
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset);
+
+// ------------------------------------------------------------------------
+
+/*!
+ * \brief mblock used for QA. Compose two qa_bitset mblocks.
+ */
+class qa_bitset2 : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+
+public:
+ qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+};
+
+qa_bitset2::qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ long bitno = pmt_to_long(user_arg); // The bit we are to set
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
+ d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
+
+ define_component("bs0", "qa_bitset", pmt_from_long(bitno));
+ define_component("bs1", "qa_bitset", pmt_from_long(bitno + 1));
+ connect("self", "in", "bs0", "in");
+ connect("bs0", "out", "bs1", "in");
+ connect("bs1", "out", "self", "out");
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset2);
+
+// ------------------------------------------------------------------------
+
+/*!
+ * \brief mblock used for QA. Compose two qa_bitset2 mblocks.
+ */
+class qa_bitset4 : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+
+public:
+ qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+};
+
+qa_bitset4::qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ long bitno = pmt_to_long(user_arg); // The bit we are to set
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
+ d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
+
+ define_component("bs0", "qa_bitset2", pmt_from_long(bitno));
+ define_component("bs1", "qa_bitset2", pmt_from_long(bitno + 2));
+ connect("self", "in", "bs0", "in");
+ connect("bs0", "out", "bs1", "in");
+ connect("bs1", "out", "self", "out");
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset4);
+
+// ------------------------------------------------------------------------
+
+/*!
+ * \brief mblock used for QA. Compose two qa_bitset4 mblocks.
+ */
+class qa_bitset8 : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+
+public:
+ qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+};
+
+qa_bitset8::qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ long bitno = pmt_to_long(user_arg); // The bit we are to set
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
+ d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
+
+ define_component("bs0", "qa_bitset4", pmt_from_long(bitno));
+ define_component("bs1", "qa_bitset4", pmt_from_long(bitno + 4));
+ connect("self", "in", "bs0", "in");
+ connect("bs0", "out", "bs1", "in");
+ connect("bs1", "out", "self", "out");
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset8);
+
+// ------------------------------------------------------------------------
+
+/*!
+ * \brief mblock used for QA. Compose two qa_bitset8 mblocks.
+ */
+class qa_bitset16 : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+
+public:
+ qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+};
+
+qa_bitset16::qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ long bitno = pmt_to_long(user_arg); // The bit we are to set
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
+ d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
+
+ define_component("bs0", "qa_bitset8", pmt_from_long(bitno));
+ define_component("bs1", "qa_bitset8", pmt_from_long(bitno + 8));
+ connect("self", "in", "bs0", "in");
+ connect("bs0", "out", "bs1", "in");
+ connect("bs1", "out", "self", "out");
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset16);
+
+// ------------------------------------------------------------------------
+
+/*!
+ * \brief mblock used for QA. Compose two qa_bitset16 mblocks.
+ */
+class qa_bitset32 : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+
+public:
+ qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+};
+
+qa_bitset32::qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ long bitno = pmt_to_long(user_arg); // The bit we are to set
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
+ d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
+
+ define_component("bs0", "qa_bitset16", pmt_from_long(bitno));
+ define_component("bs1", "qa_bitset16", pmt_from_long(bitno + 16));
+ connect("self", "in", "bs0", "in");
+ connect("bs0", "out", "bs1", "in");
+ connect("bs1", "out", "self", "out");
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset32);
+
+// ------------------------------------------------------------------------
+
+class qa_bitset_src : public mb_mblock
+{
+ mb_port_sptr d_cs_top;
+ mb_port_sptr d_cs;
+
+ mb_port_sptr d_out;
+
+ long d_msg_number; // starting message number
+ long d_nmsgs_to_send; // # of messages to send
+ long d_batch_size; // # of messages to send per batch
+
+public:
+ qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void handle_message(mb_message_sptr msg);
+
+protected:
+ void send_one();
+ void send_batch();
+};
+
+qa_bitset_src::qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ d_msg_number = pmt_to_long(pmt_nth(0, user_arg));
+ d_nmsgs_to_send = pmt_to_long(pmt_nth(1, user_arg));
+ d_batch_size = pmt_to_long(pmt_nth(2, user_arg));
+
+ d_cs_top = define_port("cs_top", "qa-bitset-cs", true, mb_port::EXTERNAL);
+ d_cs = define_port("cs", "qa-bitset-cs", true, mb_port::EXTERNAL);
+
+ d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
+}
+
+void
+qa_bitset_src::handle_message(mb_message_sptr msg)
+{
+ if ((pmt_eq(msg->port_id(), d_cs_top->port_symbol())
+ || pmt_eq(msg->port_id(), d_cs->port_symbol()))
+ && pmt_eq(msg->signal(), s_send_batch)){
+ send_batch();
+ }
+}
+
+void
+qa_bitset_src::send_batch()
+{
+ for (int i = 0; i < d_batch_size; i++)
+ send_one();
+}
+
+void
+qa_bitset_src::send_one()
+{
+ if (d_nmsgs_to_send > 0){
+ pmt_t msg_number = pmt_from_long(d_msg_number++);
+ d_out->send(s_data, pmt_cons(msg_number, s_long0));
+ }
+ if (--d_nmsgs_to_send <= 0)
+ exit();
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset_src);
+
+// ------------------------------------------------------------------------
+
+class qa_bitset_sink : public mb_mblock
+{
+ // Maximum number of messages we can track
+ static const size_t MAX_MSGS = 1 * 1024 * 1024;
+
+ mb_port_sptr d_cs0;
+ mb_port_sptr d_cs1;
+ mb_port_sptr d_cs2;
+ mb_port_sptr d_cs3;
+
+ mb_port_sptr d_in0;
+ mb_port_sptr d_in1;
+ mb_port_sptr d_in2;
+ mb_port_sptr d_in3;
+
+ long d_nmsgs_to_recv; // # of messages to receive
+ long d_batch_size; // # of messages to receive per batch
+ uint32_t d_expected_mask;
+
+ std::bitset<MAX_MSGS> d_bitset;
+ long d_nrecvd;
+
+public:
+ qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void handle_message(mb_message_sptr msg);
+
+protected:
+ void receive_one(mb_message_sptr msg);
+};
+
+qa_bitset_sink::qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg),
+ d_nrecvd(0)
+{
+ d_nmsgs_to_recv = pmt_to_long(pmt_nth(0, user_arg));
+ d_batch_size = pmt_to_long(pmt_nth(1, user_arg));
+ d_expected_mask = pmt_to_long(pmt_nth(2, user_arg));
+
+ if (d_nmsgs_to_recv > (long) MAX_MSGS)
+ throw std::out_of_range("qa_bitset_sink: nmsgs_to_recv is too big");
+
+ if (d_batch_size < 1)
+ throw std::out_of_range("qa_bitset_sink: batch_size must be >= 1");
+
+ d_cs0 = define_port("cs0", "qa-bitset-cs", true, mb_port::EXTERNAL);
+ d_cs1 = define_port("cs1", "qa-bitset-cs", true, mb_port::EXTERNAL);
+ d_cs2 = define_port("cs2", "qa-bitset-cs", true, mb_port::EXTERNAL);
+ d_cs3 = define_port("cs3", "qa-bitset-cs", true, mb_port::EXTERNAL);
+
+ d_in0 = define_port("in0", "qa-bitset", false, mb_port::EXTERNAL);
+ d_in1 = define_port("in1", "qa-bitset", false, mb_port::EXTERNAL);
+ d_in2 = define_port("in2", "qa-bitset", false, mb_port::EXTERNAL);
+ d_in3 = define_port("in3", "qa-bitset", false, mb_port::EXTERNAL);
+}
+
+void
+qa_bitset_sink::handle_message(mb_message_sptr msg)
+{
+ if ((pmt_eq(msg->port_id(), d_in0->port_symbol())
+ || pmt_eq(msg->port_id(), d_in1->port_symbol())
+ || pmt_eq(msg->port_id(), d_in2->port_symbol())
+ || pmt_eq(msg->port_id(), d_in3->port_symbol()))
+ && pmt_eq(msg->signal(), s_data)){
+
+ receive_one(msg);
+ }
+}
+
+void
+qa_bitset_sink::receive_one(mb_message_sptr msg)
+{
+ long msg_number = pmt_to_long(pmt_car(msg->data()));
+ uint32_t mask = pmt_to_long(pmt_cdr(msg->data()));
+
+ // std::cout << msg->data() << std::endl;
+
+ d_nrecvd++;
+ if (d_nrecvd % d_batch_size == d_batch_size - 1){
+ d_cs0->send(s_send_batch);
+ d_cs1->send(s_send_batch);
+ d_cs2->send(s_send_batch);
+ d_cs3->send(s_send_batch);
+ }
+
+ if (msg_number >= d_nmsgs_to_recv){
+ std::cerr << "qa_bitset_sink::receive_one: msg_number too big ("
+ << msg_number << ")\n";
+ shutdown_all(PMT_F);
+ return;
+ }
+ if (mask != d_expected_mask){
+ fprintf(stderr,
+ "qa_bitset_sink::receive_one: Wrong mask. Expected 0x%08x, got 0x%08x\n",
+ d_expected_mask, mask);
+ shutdown_all(PMT_F);
+ return;
+ }
+
+ if (d_bitset.test((size_t) msg_number)){
+ std::cerr << "qa_bitset_sink::receive_one: duplicate msg_number ("
+ << msg_number << ")\n";
+ shutdown_all(PMT_F);
+ return;
+ }
+
+ d_bitset.set((size_t) msg_number);
+ if (d_nrecvd == d_nmsgs_to_recv)
+ shutdown_all(PMT_T); // we're done!
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset_sink);
+
+// ------------------------------------------------------------------------
+
+class qa_bitset_top : public mb_mblock
+{
+ static const int NPIPES = 4;
+
+ std::vector<mb_port_sptr> d_cs;
+
+ long d_nmsgs; // # of messages to send
+ long d_batch_size; // # of messages to receive per batch
+
+public:
+ qa_bitset_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void initial_transition();
+};
+
+qa_bitset_top::qa_bitset_top(mb_runtime *runtime,
+ const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ d_nmsgs = pmt_to_long(pmt_nth(0, user_arg));
+ d_nmsgs = (d_nmsgs / NPIPES) * NPIPES;
+ d_batch_size = pmt_to_long(pmt_nth(1, user_arg));
+
+ /*
+ * We build NPIPES sources which feed NPIPES pipelines, each of which
+ * consists of 8-mblocks. All pipelines feed into a single sink
+ * which keeps track the results.
+ */
+ for (int i = 0; i < NPIPES; i++){
+ d_cs.push_back(define_port("cs"+str(i), "qa-bitset-cs", false, mb_port::INTERNAL));
+
+ // sources of test messages
+ define_component("src"+str(i), "qa_bitset_src",
+ pmt_list3(pmt_from_long(i * d_nmsgs/NPIPES),
+ pmt_from_long(d_nmsgs/NPIPES),
+ pmt_from_long(d_batch_size)));
+
+ // 8-mblock processing pipelines
+ define_component("pipeline"+str(i), "qa_bitset8", pmt_from_long(0));
+ }
+
+ // sink for output of pipelines
+ define_component("sink", "qa_bitset_sink",
+ pmt_list3(pmt_from_long(d_nmsgs),
+ pmt_from_long(d_batch_size * NPIPES),
+ pmt_from_long(0x000000ff)));
+
+ for (int i = 0; i < NPIPES; i++){
+ connect("self", "cs"+str(i), "src"+str(i), "cs_top");
+ connect("src"+str(i), "out", "pipeline"+str(i), "in");
+ connect("src"+str(i), "cs", "sink", "cs"+str(i));
+ connect("pipeline"+str(i), "out", "sink", "in"+str(i));
+ }
+}
+
+void
+qa_bitset_top::initial_transition()
+{
+ for (int i = 0; i < NPIPES; i++){
+ d_cs[i]->send(s_send_batch); // prime the pump
+ d_cs[i]->send(s_send_batch);
+ }
+}
+
+REGISTER_MBLOCK_CLASS(qa_bitset_top);
diff --git a/mblock/src/lib/qa_bitset.mbh b/mblock/src/lib/qa_bitset.mbh
new file mode 100644
index 000000000..0b2df003e
--- /dev/null
+++ b/mblock/src/lib/qa_bitset.mbh
@@ -0,0 +1,61 @@
+;; -*- scheme -*- ; not really, but tells emacs how to format this
+;;
+;; Copyright 2007 Free Software Foundation, Inc.
+;;
+;; This file is part of GNU Radio
+;;
+;; GNU Radio is free software; you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation; either version 2, or (at your option)
+;; any later version.
+;;
+;; GNU Radio is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License along
+;; with this program; if not, write to the Free Software Foundation, Inc.,
+;; 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+;;
+
+;; ----------------------------------------------------------------
+;; qa-bitset -- interface to mblock QA code
+;;
+
+(define-protocol-class qa-bitset
+
+ (:incoming
+
+ (data n bitmask)
+
+ )
+ )
+
+(define-protocol-class qa-bitset-cs
+
+ (:outgoing
+
+ (send-batch)
+
+ )
+ )
+
+;; ----------------------------------------------------------------
+;; qa-disconnect -- interface to mblock QA code
+;;
+
+(define-protocol-class qa-disconnect-cs
+
+ (:outgoing
+
+ (select-pipe n)
+
+ )
+
+ (:incoming
+
+ (ack n)
+
+ )
+ )
diff --git a/mblock/src/lib/qa_disconnect.cc b/mblock/src/lib/qa_disconnect.cc
new file mode 100644
index 000000000..cde8d6e5a
--- /dev/null
+++ b/mblock/src/lib/qa_disconnect.cc
@@ -0,0 +1,238 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_mblock.h>
+#include <mb_protocol_class.h>
+#include <mb_message.h>
+#include <mb_class_registry.h>
+#include <iostream>
+#include <sstream>
+#include <bitset>
+
+static pmt_t s_in = pmt_intern("in");
+static pmt_t s_out = pmt_intern("out");
+static pmt_t s_data = pmt_intern("data");
+static pmt_t s_ack = pmt_intern("ack");
+static pmt_t s_select_pipe = pmt_intern("select-pipe");
+static pmt_t s_long0 = pmt_from_long(0);
+static pmt_t s_sys_port = pmt_intern("%sys-port");
+static pmt_t s_shutdown = pmt_intern("%shutdown");
+
+class qa_disconnect_mux : public mb_mblock
+{
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+ mb_port_sptr d_cs;
+
+public:
+ qa_disconnect_mux(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void initial_transition();
+ void handle_message(mb_message_sptr msg);
+};
+
+qa_disconnect_mux::qa_disconnect_mux(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
+ d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
+ d_cs = define_port("cs", "qa-disconnect-cs", true, mb_port::EXTERNAL);
+
+ define_component("pipeline0", "qa_bitset8", pmt_from_long(0));
+ define_component("pipeline1", "qa_bitset8", pmt_from_long(8));
+}
+
+void
+qa_disconnect_mux::initial_transition(){}
+
+void
+qa_disconnect_mux::handle_message(mb_message_sptr msg)
+{
+ if (pmt_eq(msg->port_id(), d_cs->port_symbol()) // select-pipe on cs
+ && pmt_eq(msg->signal(), s_select_pipe)){
+
+ long which_pipe = pmt_to_long(pmt_nth(0, msg->data()));
+
+ disconnect_component("pipeline0");
+ disconnect_component("pipeline1");
+
+ switch(which_pipe){
+
+ case 0:
+ connect("self", "in", "pipeline0", "in");
+ connect("self", "out", "pipeline0", "out");
+ break;
+
+ case 1:
+ connect("self", "in", "pipeline1", "in");
+ connect("self", "out", "pipeline1", "out");
+ break;
+ }
+
+ d_cs->send(s_ack, msg->data());
+ return;
+ }
+}
+
+REGISTER_MBLOCK_CLASS(qa_disconnect_mux);
+
+// ------------------------------------------------------------------------
+
+class qa_disconnect_top : public mb_mblock
+{
+ enum state_t {
+ UNINITIALIZED,
+ WAIT_FOR_ACK,
+ WAIT_FOR_DATA
+ };
+
+ state_t d_state;
+ int d_msg_number;
+ int d_nmsgs_to_send;
+
+ mb_port_sptr d_in;
+ mb_port_sptr d_out;
+ mb_port_sptr d_cs;
+
+ void check_pipe_send_next_msg();
+ void send_next_msg();
+ void select_pipe(int n);
+
+ // alternate pipes every 128 messages
+ static int which_pipe(int msg_number) { return (msg_number >> 7) & 0x1; }
+ bool time_to_switch() { return (d_msg_number & 0x7f) == 0; }
+
+public:
+ qa_disconnect_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void initial_transition();
+ void handle_message(mb_message_sptr msg);
+};
+
+qa_disconnect_top::qa_disconnect_top(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg),
+ d_state(UNINITIALIZED), d_msg_number(0)
+{
+ d_nmsgs_to_send = pmt_to_long(pmt_nth(0, user_arg));
+
+ d_in = define_port("in", "qa-bitset", false, mb_port::INTERNAL);
+ d_out = define_port("out", "qa-bitset", true, mb_port::INTERNAL);
+ d_cs = define_port("cs", "qa-disconnect-cs", false, mb_port::INTERNAL);
+
+ define_component("mux", "qa_disconnect_mux", PMT_F);
+
+ connect("self", "cs", "mux", "cs");
+ connect("self", "out", "mux", "in");
+ connect("self", "in", "mux", "out");
+}
+
+void
+qa_disconnect_top::initial_transition()
+{
+ check_pipe_send_next_msg();
+}
+
+void
+qa_disconnect_top::handle_message(mb_message_sptr msg)
+{
+ if (0)
+ std::cerr << "qa_disconnect_top::handle_msg state = "
+ << d_state << "\n msg = " << msg << std::endl;
+
+ if (pmt_eq(msg->port_id(), d_cs->port_symbol()) // ack on cs
+ && pmt_eq(msg->signal(), s_ack)
+ && d_state == WAIT_FOR_ACK){
+
+ send_next_msg();
+ return;
+ }
+
+ if (pmt_eq(msg->port_id(), d_in->port_symbol()) // data on in
+ && pmt_eq(msg->signal(), s_data)
+ && d_state == WAIT_FOR_DATA){
+
+ /*
+ * Confirm that msg passed through the pipe that we expect...
+ */
+ static const long expected_mask[2] = { 0x000000ff, 0x0000ff00 };
+
+ long msg_number = pmt_to_long(pmt_car(msg->data()));
+ long mask = pmt_to_long(pmt_cdr(msg->data()));
+
+ if (mask != expected_mask[which_pipe(msg_number)]){
+ fprintf(stderr, "\nqa_disconnect_top: wrong mask in msg_number = 0x%08lx\n",
+ msg_number);
+ fprintf(stderr, " expected = 0x%08lx, actual = 0x%08lx\n",
+ expected_mask[which_pipe(msg_number)], mask);
+ shutdown_all(PMT_F);
+ return;
+ }
+
+ if (msg_number == d_nmsgs_to_send - 1){ // we're done (and were successful)
+ shutdown_all(PMT_T);
+ return;
+ }
+
+ check_pipe_send_next_msg();
+ return;
+ }
+
+ if (pmt_eq(msg->port_id(), s_sys_port) // ignore %shutdown on %sys-port
+ && pmt_eq(msg->signal(), s_shutdown))
+ return;
+
+ std::cerr << "qa_disconnect_top: unhandled msg: state = "
+ << d_state << "\n msg = " << msg << std::endl;
+}
+
+void
+qa_disconnect_top::select_pipe(int n)
+{
+ d_cs->send(s_select_pipe, pmt_list1(pmt_from_long(n)));
+ d_state = WAIT_FOR_ACK;
+}
+
+void
+qa_disconnect_top::send_next_msg()
+{
+ d_state = WAIT_FOR_DATA;
+ if (d_msg_number == d_nmsgs_to_send) // we've sent all we're supposed to
+ return;
+
+ d_out->send(s_data, pmt_cons(pmt_from_long(d_msg_number), s_long0));
+ d_msg_number++;
+}
+
+void
+qa_disconnect_top::check_pipe_send_next_msg()
+{
+ if (time_to_switch())
+ select_pipe(which_pipe(d_msg_number));
+ else
+ send_next_msg();
+}
+
+REGISTER_MBLOCK_CLASS(qa_disconnect_top);
diff --git a/mblock/src/lib/qa_mblock.cc b/mblock/src/lib/qa_mblock.cc
index 4be4a23c3..cf4224544 100644
--- a/mblock/src/lib/qa_mblock.cc
+++ b/mblock/src/lib/qa_mblock.cc
@@ -27,6 +27,8 @@
#include <qa_mblock.h>
#include <qa_mblock_prims.h>
#include <qa_mblock_send.h>
+#include <qa_mblock_sys.h>
+#include <qa_timeouts.h>
CppUnit::TestSuite *
qa_mblock::suite()
@@ -35,6 +37,8 @@ qa_mblock::suite()
s->addTest (qa_mblock_prims::suite());
s->addTest (qa_mblock_send::suite());
+ s->addTest (qa_mblock_sys::suite());
+ s->addTest (qa_timeouts::suite());
return s;
}
diff --git a/mblock/src/lib/qa_mblock_prims.cc b/mblock/src/lib/qa_mblock_prims.cc
index 79ed5a21e..2995215fb 100644
--- a/mblock/src/lib/qa_mblock_prims.cc
+++ b/mblock/src/lib/qa_mblock_prims.cc
@@ -34,6 +34,7 @@
#include <mb_message.h>
#include <mb_mblock_impl.h>
#include <mb_msg_accepter.h>
+#include <mb_class_registry.h>
#include <stdio.h>
static pmt_t s_cs = pmt_intern("cs");
@@ -47,42 +48,49 @@ static pmt_t s_out = pmt_intern("out");
class dp_1 : public mb_mblock
{
public:
- dp_1();
+ dp_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~dp_1();
};
-dp_1::dp_1()
+dp_1::dp_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
}
dp_1::~dp_1(){}
+REGISTER_MBLOCK_CLASS(dp_1);
+
// ----------------------------------------------------------------
class dp_2 : public mb_mblock
{
public:
- dp_2();
+ dp_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~dp_2();
};
-dp_2::dp_2()
+dp_2::dp_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
define_port("cs", "cs-protocol", false, mb_port::EXTERNAL);
}
dp_2::~dp_2(){}
+REGISTER_MBLOCK_CLASS(dp_2);
+
// ----------------------------------------------------------------
class dp_3 : public mb_mblock
{
public:
- dp_3();
+ dp_3(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~dp_3();
};
-dp_3::dp_3()
+dp_3::dp_3(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
define_port("cs", "cs-protocol", false, mb_port::EXTERNAL);
define_port("cs", "cs-protocol", false, mb_port::EXTERNAL); // duplicate def
@@ -90,19 +98,23 @@ dp_3::dp_3()
dp_3::~dp_3(){}
+REGISTER_MBLOCK_CLASS(dp_3);
+
// ----------------------------------------------------------------
void
qa_mblock_prims::test_define_ports()
{
- // std::vector<mb_port_sptr> intf;
-
- mb_mblock_sptr mb1 = mb_mblock_sptr(new dp_1());
- // intf = mb1->peer_interface();
- // CPPUNIT_ASSERT_EQUAL(size_t(0), intf.size());
+
+ mb_runtime_sptr rts = mb_make_runtime();
+ mb_runtime *rt = rts.get();
+
+ // Should work
+ mb_mblock_sptr mb1 = mb_mblock_sptr(new dp_1(rt, "top", PMT_F));
// raises runtime_error because of unknown protocol "cs-protocol"
- CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_2()), std::runtime_error);
+ CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_2(rt, "top", PMT_F)),
+ std::runtime_error);
// define the protocol class
pmt_t pc = mb_make_protocol_class(pmt_intern("cs-protocol"),
@@ -112,10 +124,11 @@ qa_mblock_prims::test_define_ports()
// std::cout << "pc = " << pc << '\n';
- mb_mblock_sptr mb2 = mb_mblock_sptr(new dp_2());
+ mb_mblock_sptr mb2 = mb_mblock_sptr(new dp_2(rt, "top", PMT_F));
// raises pmt_exception because of duplicate port definition of "cs"
- CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_3()), mbe_duplicate_port);
+ CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_3(rt, "top", PMT_F)),
+ mbe_duplicate_port);
}
// ================================================================
@@ -123,61 +136,74 @@ qa_mblock_prims::test_define_ports()
class dc_0 : public mb_mblock
{
public:
- dc_0();
+ dc_0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~dc_0();
};
-dc_0::dc_0()
+dc_0::dc_0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
}
dc_0::~dc_0() {}
+REGISTER_MBLOCK_CLASS(dc_0);
+
// ----------------------------------------------------------------
class dc_ok : public mb_mblock
{
public:
- dc_ok();
+ dc_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~dc_ok();
};
-dc_ok::dc_ok()
+dc_ok::dc_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
- define_component("c0", mb_mblock_sptr(new dc_0()));
- define_component("c1", mb_mblock_sptr(new dc_0()));
- define_component("c2", mb_mblock_sptr(new dc_0()));
+ define_component("c0", "dc_0");
+ define_component("c1", "dc_0");
+ define_component("c2", "dc_0");
}
dc_ok::~dc_ok(){}
+REGISTER_MBLOCK_CLASS(dc_ok);
+
// ----------------------------------------------------------------
class dc_not_ok : public mb_mblock
{
public:
- dc_not_ok();
+ dc_not_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~dc_not_ok();
};
-dc_not_ok::dc_not_ok()
- : mb_mblock()
+dc_not_ok::dc_not_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
- define_component("c0", mb_mblock_sptr(new dc_0()));
- define_component("c0", mb_mblock_sptr(new dc_0())); // duplicate name
+ define_component("c0", "dc_0");
+ define_component("c0", "dc_0"); // duplicate name
}
dc_not_ok::~dc_not_ok(){}
+REGISTER_MBLOCK_CLASS(dc_not_ok);
+
// ----------------------------------------------------------------
void
qa_mblock_prims::test_define_components()
{
- mb_mblock_sptr mb1 = mb_mblock_sptr(new dc_ok()); // OK
+ mb_runtime_sptr rts = mb_make_runtime();
+ mb_runtime *rt = rts.get();
+
+ // Should work
+ mb_mblock_sptr mb1 = mb_mblock_sptr(new dc_ok(rt, "top", PMT_F));
// raises pmt_exception because of duplicate component definition of "c0"
- CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dc_not_ok()), mbe_duplicate_component);
+ CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dc_not_ok(rt, "top", PMT_F)),
+ mbe_duplicate_component);
}
// ================================================================
@@ -185,7 +211,9 @@ qa_mblock_prims::test_define_components()
class tc_norm : public mb_mblock
{
public:
- tc_norm(){
+ tc_norm(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+ {
define_port("data", "i/o", false, mb_port::EXTERNAL);
define_port("norm", "i/o", false, mb_port::EXTERNAL);
define_port("conj", "i/o", true, mb_port::EXTERNAL);
@@ -197,22 +225,26 @@ public:
tc_norm::~tc_norm(){}
+REGISTER_MBLOCK_CLASS(tc_norm);
+
////////////////////////////////////////////////////////////////
class tc_0 : public mb_mblock
{
public:
- tc_0(){
+ tc_0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+ {
define_port("norm", "i/o", false, mb_port::EXTERNAL);
define_port("conj", "i/o", true, mb_port::EXTERNAL);
define_port("int", "i/o", false, mb_port::INTERNAL);
- define_component("c0", mb_mblock_sptr(new tc_norm()));
- define_component("c1", mb_mblock_sptr(new tc_norm()));
- define_component("c2", mb_mblock_sptr(new tc_norm()));
- define_component("c3", mb_mblock_sptr(new tc_norm()));
- define_component("c4", mb_mblock_sptr(new tc_norm()));
- define_component("c5", mb_mblock_sptr(new tc_norm()));
+ define_component("c0", "tc_norm");
+ define_component("c1", "tc_norm");
+ define_component("c2", "tc_norm");
+ define_component("c3", "tc_norm");
+ define_component("c4", "tc_norm");
+ define_component("c5", "tc_norm");
// OK
connect("c0", "norm", "c1", "conj");
@@ -284,14 +316,18 @@ public:
tc_0::~tc_0(){}
+REGISTER_MBLOCK_CLASS(tc_0);
+
////////////////////////////////////////////////////////////////
class tc_1 : public mb_mblock
{
public:
- tc_1(){
- define_component("c0", mb_mblock_sptr(new tc_norm()));
- define_component("c1", mb_mblock_sptr(new tc_norm()));
+ tc_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+ {
+ define_component("c0", "tc_norm");
+ define_component("c1", "tc_norm");
connect("c0", "norm", "c1", "conj");
}
@@ -301,6 +337,8 @@ public:
tc_1::~tc_1(){}
+REGISTER_MBLOCK_CLASS(tc_1);
+
////////////////////////////////////////////////////////////////
void
@@ -315,7 +353,10 @@ qa_mblock_prims::test_connect()
pmt_list1(pmt_intern("in")), // in
pmt_list1(pmt_intern("out"))); // out
- mb_mblock_sptr mb0 = mb_mblock_sptr(new tc_0());
+ mb_runtime_sptr rts = mb_make_runtime();
+ mb_runtime *rt = rts.get();
+
+ mb_mblock_sptr mb0 = mb_mblock_sptr(new tc_0(rt, "top", PMT_F));
}
////////////////////////////////////////////////////////////////
@@ -377,11 +418,14 @@ qa_mblock_prims::test_msg_queue()
void
qa_mblock_prims::test_make_accepter()
{
+ mb_runtime_sptr rts = mb_make_runtime();
+ mb_runtime *rt = rts.get();
+
// create a block
- mb_mblock_sptr mb = mb_mblock_sptr(new dp_2());
+ mb_mblock_sptr mb = mb_mblock_sptr(new dp_2(rt, "top", PMT_F));
// use "internal use only" method...
- mb_msg_accepter_sptr accepter = mb->impl()->make_accepter("cs");
+ mb_msg_accepter_sptr accepter = mb->impl()->make_accepter(pmt_intern("cs"));
// Now push a few messages into it...
// signal data metadata pri
diff --git a/mblock/src/lib/qa_mblock_send.cc b/mblock/src/lib/qa_mblock_send.cc
index 46d6b6440..cd81709e2 100644
--- a/mblock/src/lib/qa_mblock_send.cc
+++ b/mblock/src/lib/qa_mblock_send.cc
@@ -35,6 +35,7 @@
#include <mb_message.h>
#include <mb_mblock_impl.h>
#include <mb_msg_accepter.h>
+#include <mb_class_registry.h>
#include <stdio.h>
static pmt_t s_data = pmt_intern("data");
@@ -57,6 +58,12 @@ define_protocol_classes()
}
+mb_mblock_sptr
+get_top(mb_runtime_sptr rts)
+{
+ return dynamic_cast<mb_runtime_nop *>(rts.get())->top();
+}
+
// ================================================================
// test_simple_routing
// ================================================================
@@ -70,12 +77,13 @@ class sr1 : public mb_mblock
mb_port_sptr d_p3;
public:
- sr1();
+ sr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~sr1();
- void init_fsm();
+ void initial_transition();
};
-sr1::sr1()
+sr1::sr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
d_p1 = define_port("p1", "qa-send-cs", true, mb_port::EXTERNAL);
d_p2 = define_port("p2", "qa-send-cs", true, mb_port::EXTERNAL);
@@ -85,9 +93,9 @@ sr1::sr1()
sr1::~sr1(){}
void
-sr1::init_fsm()
+sr1::initial_transition()
{
- // std::cout << instance_name() << "[sr1]: init_fsm\n";
+ // std::cout << instance_name() << "[sr1]: initial_transition\n";
// send two messages to each port
pmt_t our_name = pmt_intern(instance_name());
@@ -98,6 +106,8 @@ sr1::init_fsm()
d_p2->send(s_status, pmt_list3(our_name, s_p2, pmt_from_long(1)));
}
+REGISTER_MBLOCK_CLASS(sr1);
+
// ----------------------------------------------------------------
// top-level container block for test_simple_routing
@@ -106,17 +116,18 @@ class sr0 : public mb_mblock
mb_port_sptr d_p0;
public:
- sr0();
+ sr0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~sr0();
- void init_fsm();
+ void initial_transition();
};
-sr0::sr0()
+sr0::sr0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
d_p0 = define_port("p0", "qa-send-cs", false, mb_port::INTERNAL);
- define_component("mb1", mb_mblock_sptr(new sr1()));
- define_component("mb2", mb_mblock_sptr(new sr1()));
+ define_component("mb1", "sr1");
+ define_component("mb2", "sr1");
connect("self", "p0", "mb1", "p1");
connect("mb1", "p2", "mb2", "p3");
@@ -126,9 +137,9 @@ sr0::sr0()
sr0::~sr0(){}
void
-sr0::init_fsm()
+sr0::initial_transition()
{
- // std::cout << instance_name() << "[sr0]: init_fsm\n";
+ // std::cout << instance_name() << "[sr0]: initial_transition\n";
// send two messages to p0
pmt_t our_name = pmt_intern(instance_name());
@@ -136,6 +147,8 @@ sr0::init_fsm()
d_p0->send(s_control, pmt_list3(our_name, s_p0, pmt_from_long(1)));
}
+REGISTER_MBLOCK_CLASS(sr0);
+
// ----------------------------------------------------------------
/*
@@ -151,9 +164,10 @@ qa_mblock_send::test_simple_routing()
mb_message_sptr msg;
mb_runtime_sptr rt = mb_make_runtime_nop();
- mb_mblock_sptr mb0 = mb_mblock_sptr(new sr0());
- rt->run(mb0);
+ rt->run("top", "sr0", PMT_F);
+ mb_mblock_sptr mb0 = get_top(rt);
+
// Reach into the guts and see if the messages ended up where they should have
// mb0 should have received two messages sent from mb1 via its p1
@@ -238,12 +252,13 @@ class rr2 : public mb_mblock
mb_port_sptr d_p2;
public:
- rr2();
+ rr2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~rr2();
- void init_fsm();
+ void initial_transition();
};
-rr2::rr2()
+rr2::rr2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
d_p1 = define_port("p1", "qa-send-cs", true, mb_port::EXTERNAL);
d_p2 = define_port("p2", "qa-send-cs", false, mb_port::EXTERNAL);
@@ -252,9 +267,9 @@ rr2::rr2()
rr2::~rr2(){}
void
-rr2::init_fsm()
+rr2::initial_transition()
{
- // std::cout << instance_name() << "[rr2]: init_fsm\n";
+ // std::cout << instance_name() << "[rr2]: initial_transition\n";
// send two messages via p1
pmt_t our_name = pmt_intern(instance_name());
@@ -262,6 +277,8 @@ rr2::init_fsm()
d_p1->send(s_status, pmt_list3(our_name, s_p1, pmt_from_long(1)));
}
+REGISTER_MBLOCK_CLASS(rr2);
+
// ----------------------------------------------------------------
// intermediate block for test_relay_routing
@@ -272,16 +289,17 @@ class rr1 : public mb_mblock
mb_port_sptr d_p2;
public:
- rr1();
+ rr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~rr1();
};
-rr1::rr1()
+rr1::rr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
d_p1 = define_port("p1", "qa-send-cs", true, mb_port::RELAY);
d_p2 = define_port("p2", "qa-send-cs", false, mb_port::RELAY);
- define_component("c0", mb_mblock_sptr(new rr2()));
+ define_component("c0", "rr2");
connect("self", "p1", "c0", "p1");
connect("self", "p2", "c0", "p2");
@@ -289,6 +307,8 @@ rr1::rr1()
rr1::~rr1(){}
+REGISTER_MBLOCK_CLASS(rr1);
+
// ----------------------------------------------------------------
// top-level container for test_relay_routing
@@ -296,14 +316,15 @@ rr1::~rr1(){}
class rr0_a : public mb_mblock
{
public:
- rr0_a();
+ rr0_a(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~rr0_a();
};
-rr0_a::rr0_a()
+rr0_a::rr0_a(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
- define_component("c0", mb_mblock_sptr(new rr1()));
- define_component("c1", mb_mblock_sptr(new rr2()));
+ define_component("c0", "rr1");
+ define_component("c1", "rr2");
connect("c0", "p1", "c1", "p2");
connect("c0", "p2", "c1", "p1");
@@ -311,6 +332,7 @@ rr0_a::rr0_a()
rr0_a::~rr0_a(){}
+REGISTER_MBLOCK_CLASS(rr0_a);
/*
* This tests basic message routing using RELAY and EXTERNAL ports.
@@ -323,8 +345,8 @@ qa_mblock_send::test_relay_routing_1()
mb_message_sptr msg;
mb_runtime_sptr rt = mb_make_runtime_nop();
- mb_mblock_sptr top = mb_mblock_sptr(new rr0_a());
- rt->run(top);
+ rt->run("top", "rr0_a", PMT_F);
+ mb_mblock_sptr top = get_top(rt);
// Reach into the guts and see if the messages ended up where they should have
@@ -377,14 +399,15 @@ qa_mblock_send::test_relay_routing_1()
class rr0_b : public mb_mblock
{
public:
- rr0_b();
+ rr0_b(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
~rr0_b();
};
-rr0_b::rr0_b()
+rr0_b::rr0_b(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
{
- define_component("c0", mb_mblock_sptr(new rr1()));
- define_component("c1", mb_mblock_sptr(new rr1()));
+ define_component("c0", "rr1");
+ define_component("c1", "rr1");
connect("c0", "p1", "c1", "p2");
connect("c0", "p2", "c1", "p1");
@@ -392,6 +415,7 @@ rr0_b::rr0_b()
rr0_b::~rr0_b(){}
+REGISTER_MBLOCK_CLASS(rr0_b);
/*
* This tests basic message routing using RELAY and EXTERNAL ports.
@@ -404,8 +428,8 @@ qa_mblock_send::test_relay_routing_2()
mb_message_sptr msg;
mb_runtime_sptr rt = mb_make_runtime_nop();
- mb_mblock_sptr top = mb_mblock_sptr(new rr0_b());
- rt->run(top);
+ rt->run("top", "rr0_b", PMT_F);
+ mb_mblock_sptr top = get_top(rt);
// Reach into the guts and see if the messages ended up where they should have
diff --git a/mblock/src/lib/qa_mblock_sys.cc b/mblock/src/lib/qa_mblock_sys.cc
new file mode 100644
index 000000000..a64f546c7
--- /dev/null
+++ b/mblock/src/lib/qa_mblock_sys.cc
@@ -0,0 +1,271 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2006,2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <qa_mblock_sys.h>
+#include <cppunit/TestAssert.h>
+#include <mb_mblock.h>
+#include <mb_runtime.h>
+#include <mb_runtime_nop.h> // QA only
+#include <mb_protocol_class.h>
+#include <mb_exception.h>
+#include <mb_msg_queue.h>
+#include <mb_message.h>
+#include <mb_mblock_impl.h>
+#include <mb_msg_accepter.h>
+#include <mb_class_registry.h>
+#include <stdio.h>
+#include <string.h>
+#include <iostream>
+
+
+static pmt_t s_data = pmt_intern("data");
+static pmt_t s_status = pmt_intern("status");
+static pmt_t s_control = pmt_intern("control");
+static pmt_t s_p0 = pmt_intern("p0");
+static pmt_t s_p1 = pmt_intern("p1");
+static pmt_t s_p2 = pmt_intern("p2");
+static pmt_t s_p3 = pmt_intern("p3");
+static pmt_t s_e1 = pmt_intern("e1");
+static pmt_t s_r1 = pmt_intern("r1");
+
+static void
+define_protocol_classes()
+{
+ mb_make_protocol_class(s_data, // name
+ pmt_list1(s_data), // incoming
+ pmt_list1(s_data)); // outgoing
+}
+
+
+// ================================================================
+// test_sys_1
+// ================================================================
+
+class sys_1 : public mb_mblock
+{
+ pmt_t d_user_arg;
+ mb_port_sptr d_data;
+
+public:
+ sys_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ ~sys_1();
+ void initial_transition();
+};
+
+sys_1::sys_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg),
+ d_user_arg(user_arg)
+{
+ d_data = define_port("data", "data", true, mb_port::EXTERNAL);
+}
+
+sys_1::~sys_1(){}
+
+void
+sys_1::initial_transition()
+{
+ shutdown_all(d_user_arg);
+}
+
+REGISTER_MBLOCK_CLASS(sys_1);
+
+void
+qa_mblock_sys::test_sys_1()
+{
+ define_protocol_classes();
+
+ pmt_t result;
+ pmt_t n1 = pmt_from_long(1);
+ pmt_t n2 = pmt_from_long(2);
+
+ mb_runtime_sptr rt1 = mb_make_runtime();
+
+#if 0
+ try {
+ rt1->run("top-1", "sys_1", n1, &result);
+ }
+ catch (omni_thread_fatal e){
+ std::cerr << "caught omni_thread_fatal: error = " << e.error
+ << ": " << strerror(e.error) << std::endl;
+ }
+ catch (omni_thread_invalid){
+ std::cerr << "caught omni_thread_invalid\n";
+ }
+#else
+ rt1->run("top-1", "sys_1", n1, &result);
+#endif
+ CPPUNIT_ASSERT(pmt_equal(n1, result));
+
+ // Execute run a second time, with the same rt, to ensure sanity.
+ rt1->run("top-2", "sys_1", n2, &result);
+ CPPUNIT_ASSERT(pmt_equal(n2, result));
+}
+
+// ================================================================
+// test_sys_2
+// ================================================================
+
+class squarer : public mb_mblock
+{
+ mb_port_sptr d_data;
+
+public:
+ squarer(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+
+ void handle_message(mb_message_sptr msg);
+};
+
+squarer::squarer(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ d_data = define_port("data", "data", true, mb_port::EXTERNAL);
+}
+
+void
+squarer::handle_message(mb_message_sptr msg)
+{
+ if (!pmt_eq(msg->signal(), s_data)) // we only handle the "data" message
+ return;
+
+ // long x -> (long x . long (x * x))
+
+ pmt_t x_pmt = msg->data();
+ long x = pmt_to_long(x_pmt);
+ d_data->send(s_data, pmt_cons(x_pmt, pmt_from_long(x * x)));
+}
+
+REGISTER_MBLOCK_CLASS(squarer);
+
+// ----------------------------------------------------------------
+
+class sys_2 : public mb_mblock
+{
+ mb_port_sptr d_data;
+
+public:
+ sys_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
+ void initial_transition();
+ void handle_message(mb_message_sptr msg);
+};
+
+sys_2::sys_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg)
+{
+ d_data = define_port("data", "data", true, mb_port::INTERNAL);
+ define_component("squarer", "squarer");
+ connect("self", "data", "squarer", "data");
+}
+
+void
+sys_2::initial_transition()
+{
+ // FIXME start timer to detect general failure
+
+ d_data->send(s_data, pmt_from_long(0)); // send initial message
+}
+
+void
+sys_2::handle_message(mb_message_sptr msg)
+{
+ if (!pmt_eq(msg->signal(), s_data)) // we only handle the "data" message
+ return;
+
+ // first check correctness of message
+
+ long x = pmt_to_long(pmt_car(msg->data()));
+ long y = pmt_to_long(pmt_cdr(msg->data()));
+
+ // std::cout << msg->data() << std::endl;
+
+ if (y != x * x){
+ std::cerr << "sys_2::handle_message: Expected y == x * x. Got y = "
+ << y << " for x = " << x << std::endl;
+
+ shutdown_all(PMT_F); // failed
+ }
+
+ if (x == 100)
+ shutdown_all(PMT_T); // done, OK
+ else
+ d_data->send(s_data, pmt_from_long(x + 1)); // send next request
+}
+
+REGISTER_MBLOCK_CLASS(sys_2);
+
+// ----------------------------------------------------------------
+
+void
+qa_mblock_sys::test_sys_2()
+{
+ mb_runtime_sptr rt = mb_make_runtime();
+ pmt_t result = PMT_NIL;
+
+ // std::cerr << "qa_mblock_sys::test_sys_2 (enter)\n";
+
+ rt->run("top-sys-2", "sys_2", PMT_F, &result);
+ CPPUNIT_ASSERT(pmt_equal(PMT_T, result));
+}
+
+// ================================================================
+// test_bitset_1
+// ================================================================
+
+void
+qa_mblock_sys::test_bitset_1()
+{
+ mb_runtime_sptr rt = mb_make_runtime();
+ pmt_t result = PMT_NIL;
+
+ long nmsgs = 1000;
+ long batch_size = 8;
+
+ pmt_t arg = pmt_list2(pmt_from_long(nmsgs), // # of messages to send through pipe
+ pmt_from_long(batch_size));
+
+ rt->run("top", "qa_bitset_top", arg, &result);
+
+ CPPUNIT_ASSERT(pmt_equal(PMT_T, result));
+}
+
+// ================================================================
+// test_disconnect
+// ================================================================
+
+void
+qa_mblock_sys::test_disconnect()
+{
+ mb_runtime_sptr rt = mb_make_runtime();
+ pmt_t result = PMT_NIL;
+
+ long nmsgs = 10240;
+
+ pmt_t arg = pmt_list1(pmt_from_long(nmsgs)); // # of messages to send through pipe
+
+
+ rt->run("top", "qa_disconnect_top", arg, &result);
+
+ CPPUNIT_ASSERT(pmt_equal(PMT_T, result));
+}
diff --git a/mblock/src/lib/qa_mblock_sys.h b/mblock/src/lib/qa_mblock_sys.h
new file mode 100644
index 000000000..87333818e
--- /dev/null
+++ b/mblock/src/lib/qa_mblock_sys.h
@@ -0,0 +1,45 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2006,2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#ifndef INCLUDED_QA_MBLOCK_SYS_H
+#define INCLUDED_QA_MBLOCK_SYS_H
+
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/TestCase.h>
+
+class qa_mblock_sys : public CppUnit::TestCase {
+
+ CPPUNIT_TEST_SUITE(qa_mblock_sys);
+ CPPUNIT_TEST(test_sys_1);
+ CPPUNIT_TEST(test_sys_2);
+ CPPUNIT_TEST(test_bitset_1);
+ CPPUNIT_TEST(test_disconnect);
+ CPPUNIT_TEST_SUITE_END();
+
+ private:
+ void test_sys_1();
+ void test_sys_2();
+ void test_bitset_1();
+ void test_disconnect();
+};
+
+#endif /* INCLUDED_QA_MBLOCK_SYS_H */
+
diff --git a/mblock/src/lib/qa_timeouts.cc b/mblock/src/lib/qa_timeouts.cc
new file mode 100644
index 000000000..4439b6e8f
--- /dev/null
+++ b/mblock/src/lib/qa_timeouts.cc
@@ -0,0 +1,292 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <qa_timeouts.h>
+#include <cppunit/TestAssert.h>
+#include <mb_mblock.h>
+#include <mb_runtime.h>
+#include <mb_protocol_class.h>
+#include <mb_exception.h>
+#include <mb_msg_queue.h>
+#include <mb_message.h>
+#include <mb_mblock_impl.h>
+#include <mb_msg_accepter.h>
+#include <mb_class_registry.h>
+#include <mb_timer_queue.h>
+#include <stdio.h>
+#include <string.h>
+#include <iostream>
+
+
+static pmt_t s_timeout = pmt_intern("%timeout");
+static pmt_t s_done = pmt_intern("done");
+
+
+// ------------------------------------------------------------------------
+// Exercise the priority queue used to implement timeouts.
+// ------------------------------------------------------------------------
+void
+qa_timeouts::test_timer_queue()
+{
+ mb_timer_queue tq;
+ mb_msg_accepter_sptr accepter;
+
+ mb_timeout_sptr t1000_000 =
+ mb_timeout_sptr(new mb_timeout(mb_time(1000,0), PMT_F, accepter));
+
+ mb_timeout_sptr t2000_000 =
+ mb_timeout_sptr(new mb_timeout(mb_time(2000,0), PMT_F, accepter));
+
+ mb_timeout_sptr t3000_000 =
+ mb_timeout_sptr(new mb_timeout(mb_time(3000,0), PMT_F, accepter));
+
+ mb_timeout_sptr t3000_125 =
+ mb_timeout_sptr(new mb_timeout(mb_time(3000,125), PMT_F, accepter));
+
+ mb_timeout_sptr t3000_250 =
+ mb_timeout_sptr(new mb_timeout(mb_time(3000,250), PMT_F, accepter));
+
+ mb_timeout_sptr t4000_000 =
+ mb_timeout_sptr(new mb_timeout(mb_time(4000,0), PMT_F, accepter));
+
+ // insert in pseudo-random order
+
+ tq.push(t3000_125);
+ tq.push(t1000_000);
+ tq.push(t4000_000);
+ tq.push(t3000_250);
+ tq.push(t2000_000);
+ tq.push(t3000_000);
+
+ CPPUNIT_ASSERT_EQUAL(t1000_000, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t2000_000, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t3000_000, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t3000_125, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t3000_250, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t4000_000, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT(tq.empty());
+
+ // insert in pseudo-random order
+
+ tq.push(t3000_000);
+ tq.push(t4000_000);
+ tq.push(t3000_125);
+ tq.push(t1000_000);
+ tq.push(t2000_000);
+ tq.push(t3000_250);
+
+ tq.cancel(t1000_000->handle());
+
+ CPPUNIT_ASSERT_EQUAL(t2000_000, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t3000_000, tq.top());
+ tq.pop();
+
+ tq.cancel(t3000_250->handle());
+
+ CPPUNIT_ASSERT_EQUAL(t3000_125, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT_EQUAL(t4000_000, tq.top());
+ tq.pop();
+
+ CPPUNIT_ASSERT(tq.empty());
+}
+
+// ------------------------------------------------------------------------
+// Test one-shot timeouts
+// ------------------------------------------------------------------------
+
+// FWIW, on SuSE 10.1 for x86-64, clock_getres returns 0.004 seconds.
+
+#define TIMING_MARGIN 0.010 // seconds
+
+class qa_timeouts_1_top : public mb_mblock
+{
+ int d_nleft;
+ int d_nerrors;
+ mb_time d_t0;
+
+public:
+ qa_timeouts_1_top(mb_runtime *runtime,
+ const std::string &instance_name, pmt_t user_arg);
+
+ void initial_transition();
+ void handle_message(mb_message_sptr msg);
+};
+
+qa_timeouts_1_top::qa_timeouts_1_top(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg),
+ d_nleft(0), d_nerrors(0)
+{
+}
+
+void
+qa_timeouts_1_top::initial_transition()
+{
+ d_t0 = mb_time::time(); // now
+
+ schedule_one_shot_timeout(d_t0 + 0.200, pmt_from_double(0.200));
+ schedule_one_shot_timeout(d_t0 + 0.125, pmt_from_double(0.125));
+ schedule_one_shot_timeout(d_t0 + 0.075, pmt_from_double(0.075));
+ schedule_one_shot_timeout(d_t0 + 0.175, pmt_from_double(0.175));
+
+ d_nleft = 4;
+}
+
+void
+qa_timeouts_1_top::handle_message(mb_message_sptr msg)
+{
+ if (pmt_eq(msg->signal(), s_timeout)){
+ mb_time t_now = mb_time::time();
+ double expected_delta_t = pmt_to_double(msg->data());
+ double actual_delta_t = (t_now - d_t0).double_time();
+ double delta = expected_delta_t - actual_delta_t;
+
+ if (fabs(delta) > TIMING_MARGIN){
+ std::cerr << "qa_timeouts_1_top: expected_delta_t = " << expected_delta_t
+ << " actual_delta_t = " << actual_delta_t << std::endl;
+ d_nerrors++;
+ }
+
+ if (--d_nleft <= 0)
+ shutdown_all(d_nerrors == 0 ? PMT_T : PMT_F);
+ }
+}
+
+REGISTER_MBLOCK_CLASS(qa_timeouts_1_top);
+
+void
+qa_timeouts::test_timeouts_1()
+{
+ mb_runtime_sptr rt = mb_make_runtime();
+ pmt_t result = PMT_NIL;
+
+ rt->run("top", "qa_timeouts_1_top", PMT_F, &result);
+
+ CPPUNIT_ASSERT(pmt_equal(PMT_T, result));
+}
+
+// ------------------------------------------------------------------------
+// Test periodic timeouts
+// ------------------------------------------------------------------------
+
+class qa_timeouts_2_top : public mb_mblock
+{
+ int d_nhandled;
+ int d_nerrors;
+ double d_delta_t;
+ mb_time d_t0;
+
+public:
+ qa_timeouts_2_top(mb_runtime *runtime,
+ const std::string &instance_name, pmt_t user_arg);
+
+ void initial_transition();
+ void handle_message(mb_message_sptr msg);
+};
+
+qa_timeouts_2_top::qa_timeouts_2_top(mb_runtime *runtime,
+ const std::string &instance_name,
+ pmt_t user_arg)
+ : mb_mblock(runtime, instance_name, user_arg),
+ d_nhandled(0), d_nerrors(0), d_delta_t(0.075)
+{
+}
+
+void
+qa_timeouts_2_top::initial_transition()
+{
+ d_t0 = mb_time::time(); // now
+
+ schedule_periodic_timeout(d_t0 + d_delta_t, mb_time(d_delta_t), PMT_T);
+}
+
+void
+qa_timeouts_2_top::handle_message(mb_message_sptr msg)
+{
+ static const int NMSGS_TO_HANDLE = 5;
+
+ if (pmt_eq(msg->signal(), s_timeout)
+ && !pmt_eq(msg->data(), s_done)){
+
+ mb_time t_now = mb_time::time();
+
+ d_nhandled++;
+
+ double expected_delta_t = d_delta_t * d_nhandled;
+ double actual_delta_t = (t_now - d_t0).double_time();
+ double delta = expected_delta_t - actual_delta_t;
+
+ if (fabs(delta) > TIMING_MARGIN){
+ std::cerr << "qa_timeouts_2_top: expected_delta_t = " << expected_delta_t
+ << " actual_delta_t = " << actual_delta_t << std::endl;
+ d_nerrors++;
+ }
+
+ if (d_nhandled == NMSGS_TO_HANDLE){
+ cancel_timeout(msg->metadata()); // test cancel_timeout...
+ schedule_one_shot_timeout(d_t0 + (d_delta_t * (d_nhandled + 2)), s_done);
+ }
+ }
+
+ if (pmt_eq(msg->signal(), s_timeout)
+ && pmt_eq(msg->data(), s_done)){
+ if (d_nhandled != NMSGS_TO_HANDLE){
+ std::cerr << "qa_timeouts_2_top: d_nhandled = " << d_nhandled
+ << " expected d_nhandled = " << NMSGS_TO_HANDLE
+ << " (cancel_timeout didn't work)\n";
+ d_nerrors++;
+ }
+ shutdown_all(d_nerrors == 0 ? PMT_T : PMT_F);
+ }
+}
+
+REGISTER_MBLOCK_CLASS(qa_timeouts_2_top);
+
+void
+qa_timeouts::test_timeouts_2()
+{
+ mb_runtime_sptr rt = mb_make_runtime();
+ pmt_t result = PMT_NIL;
+
+ rt->run("top", "qa_timeouts_2_top", PMT_F, &result);
+
+ CPPUNIT_ASSERT(pmt_equal(PMT_T, result));
+}
diff --git a/mblock/src/lib/qa_timeouts.h b/mblock/src/lib/qa_timeouts.h
new file mode 100644
index 000000000..736c4c2d9
--- /dev/null
+++ b/mblock/src/lib/qa_timeouts.h
@@ -0,0 +1,43 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2006,2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#ifndef INCLUDED_QA_TIMEOUTS_H
+#define INCLUDED_QA_TIMEOUTS_H
+
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/TestCase.h>
+
+class qa_timeouts : public CppUnit::TestCase {
+
+ CPPUNIT_TEST_SUITE(qa_timeouts);
+ CPPUNIT_TEST(test_timer_queue);
+ CPPUNIT_TEST(test_timeouts_1);
+ CPPUNIT_TEST(test_timeouts_2);
+ CPPUNIT_TEST_SUITE_END();
+
+ private:
+ void test_timer_queue();
+ void test_timeouts_1();
+ void test_timeouts_2();
+};
+
+#endif /* INCLUDED_QA_TIMEOUTS_H */
+
diff --git a/mblock/src/scheme/Makefile.am b/mblock/src/scheme/Makefile.am
new file mode 100644
index 000000000..7700a1fc0
--- /dev/null
+++ b/mblock/src/scheme/Makefile.am
@@ -0,0 +1,21 @@
+#
+# Copyright 2007 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+SUBDIRS = gnuradio
diff --git a/mblock/src/scheme/gnuradio/Makefile.am b/mblock/src/scheme/gnuradio/Makefile.am
new file mode 100644
index 000000000..f217f6852
--- /dev/null
+++ b/mblock/src/scheme/gnuradio/Makefile.am
@@ -0,0 +1,19 @@
+#
+# Copyright 2007 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
diff --git a/mblock/src/scheme/gnuradio/compile-mbh.scm b/mblock/src/scheme/gnuradio/compile-mbh.scm
new file mode 100755
index 000000000..fbad90d7a
--- /dev/null
+++ b/mblock/src/scheme/gnuradio/compile-mbh.scm
@@ -0,0 +1,231 @@
+#!/usr/bin/guile \
+-e main -s
+!#
+;; -*-scheme-*-
+;;
+;; Copyright 2007 Free Software Foundation, Inc.
+;;
+;; This file is part of GNU Radio
+;;
+;; GNU Radio is free software; you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation; either version 2, or (at your option)
+;; any later version.
+;;
+;; GNU Radio is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License along
+;; with this program; if not, write to the Free Software Foundation, Inc.,
+;; 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+;;
+
+;; usage: compile-mbh <input-file> <output-file>
+
+(use-modules (ice-9 getopt-long))
+(use-modules (ice-9 format))
+(use-modules (ice-9 pretty-print))
+;(use-modules (ice-9 slib))
+(use-modules (gnuradio pmt-serialize))
+(use-modules (gnuradio macros-etc))
+
+(debug-enable 'backtrace)
+
+;; ----------------------------------------------------------------
+
+(define (main args)
+
+ (define (usage)
+ (format 0 "usage: ~a input-file output-file~%" (car args)))
+
+ (when (not (= (length args) 3))
+ (usage)
+ (exit 1))
+
+ (let ((input-filename (cadr args))
+ (output-filename (caddr args)))
+ (if (compile-mbh-file input-filename output-filename)
+ (exit 0)
+ (exit 1))))
+
+
+;; ----------------------------------------------------------------
+;; constructor and accessors for protocol-class
+
+(define %protocol-class-tag (string->symbol "[PROTOCOL-CLASS-TAG]"))
+
+(define (make-protocol-class name incoming outgoing)
+ (vector %protocol-class-tag name incoming outgoing))
+
+(define (protocol-class? obj)
+ (and (vector? obj) (eq? %protocol-class-tag (vector-ref obj 0))))
+
+(define (protocol-class-name pc)
+ (vector-ref pc 1))
+
+(define (protocol-class-incoming pc)
+ (vector-ref pc 2))
+
+(define (protocol-class-outgoing pc)
+ (vector-ref pc 3))
+
+
+;; ----------------------------------------------------------------
+
+(define (syntax-error msg e)
+ (throw 'syntax-error msg e))
+
+(define (unrecognized-form form)
+ (syntax-error "Unrecognized form" form))
+
+
+(define (mbh-chk-length= e y n)
+ (cond ((and (null? y)(zero? n))
+ #f)
+ ((null? y)
+ (syntax-error "Expression has too few subexpressions" e))
+ ((atom? y)
+ (syntax-error (if (atom? e)
+ "List expected"
+ "Expression ends with `dotted' atom")
+ e))
+ ((zero? n)
+ (syntax-error "Expression has too many subexpressions" e))
+ (else
+ (mbh-chk-length= e (cdr y) (- n 1)))))
+
+(define (mbh-chk-length>= e y n)
+ (cond ((and (null? y)(< n 1))
+ #f)
+ ((atom? y)
+ (mbh-chk-length= e y -1))
+ (else
+ (mbh-chk-length>= e (cdr y) (- n 1)))))
+
+
+(define (compile-mbh-file input-filename output-filename)
+ (let ((i-port (open-input-file input-filename))
+ (o-port (open-output-file output-filename)))
+
+ (letrec
+ ((protocol-classes '()) ; alist
+
+ (lookup-protocol-class ; returns protocol-class or #f
+ (lambda (name)
+ (cond ((assq name protocol-classes) => cdr)
+ (else #f))))
+
+ (register-protocol-class
+ (lambda (pc)
+ (set! protocol-classes (acons (protocol-class-name pc)
+ pc protocol-classes))
+ pc))
+
+ (parse-top-level-form
+ (lambda (form)
+ (mbh-chk-length>= form form 1)
+ (case (car form)
+ ((define-protocol-class) (parse-define-protocol-class form))
+ (else (syntax-error form)))))
+
+ (parse-define-protocol-class
+ (lambda (form)
+ (mbh-chk-length>= form form 2)
+ ;; form => (define-protocol-class name
+ ;; (:include protocol-class-name)
+ ;; (:incoming list-of-msgs)
+ ;; (:outgoing list-of-msgs))
+ (let ((name (cadr form))
+ (incoming '())
+ (outgoing '()))
+ (if (lookup-protocol-class name)
+ (syntax-error "Duplicate protocol-class name" name))
+ (for-each
+ (lambda (sub-form)
+ (mbh-chk-length>= sub-form sub-form 1)
+ (case (car sub-form)
+ ((:include)
+ (mbh-chk-length>= sub-form sub-form 2)
+ (cond ((lookup-protocol-class (cadr sub-form)) =>
+ (lambda (pc)
+ (set! incoming (append incoming (protocol-class-incoming pc)))
+ (set! outgoing (append outgoing (protocol-class-outgoing pc)))))
+ (else
+ (syntax-error "Unknown protocol-class-name" (cadr sub-form)))))
+ ((:incoming)
+ (set! incoming (append incoming (cdr sub-form))))
+ ((:outgoing)
+ (set! outgoing (append outgoing (cdr sub-form))))
+ (else
+ (unrecognized-form (car sub-form)))))
+ (cddr form))
+
+ (register-protocol-class (make-protocol-class name incoming outgoing)))))
+
+ ) ; end of bindings
+
+ (for-each-in-file i-port parse-top-level-form)
+
+ ;; generate the output here...
+
+ (letrec ((classes (map cdr protocol-classes))
+ (so-stream (make-serial-output-stream))
+ (format-output-for-c++
+ (lambda (output)
+ (format o-port "//~%")
+ (format o-port "// Machine generated by compile-mbh from ~a~%" input-filename)
+ (format o-port "//~%")
+ (format o-port "// protocol-classes: ~{~a ~}~%" (map car protocol-classes))
+ (format o-port "//~%")
+
+ (format o-port "#include <mb_protocol_class.h>~%")
+ (format o-port "#include <unistd.h>~%")
+ (format o-port
+ "static const char~%protocol_class_init_data[~d] = {~% "
+ (length output))
+
+ (do ((lst output (cdr lst))
+ (i 0 (+ i 1)))
+ ((null? lst) #t)
+ (format o-port "~a, " (car lst))
+ (when (= 15 (modulo i 16))
+ (format o-port "~% ")))
+
+ (format o-port "~&};~%")
+ (format o-port "static mb_protocol_class_init _init_(protocol_class_init_data, sizeof(protocol_class_init_data));~%")
+ )))
+
+
+ (map (lambda (pc)
+ (let ((obj-to-dump
+ (list (protocol-class-name pc) ; class name
+ (map car (protocol-class-incoming pc)) ; incoming msg names
+ (map car (protocol-class-outgoing pc)) ; outgoing msg names
+ ;;(protocol-class-incoming pc) ; full incoming msg descriptions
+ ;;(protocol-class-outgoing pc) ; full outgoing msg descriptions
+ )))
+ ;;(pretty-print obj-to-dump)
+ (pmt-serialize obj-to-dump (so-stream 'put-byte))))
+ classes)
+
+ (format-output-for-c++ ((so-stream 'get-output)))
+
+ #t))))
+
+
+(define (make-serial-output-stream)
+ (letrec ((output '())
+ (put-byte
+ (lambda (byte)
+ (set! output (cons byte output))))
+ (get-output
+ (lambda ()
+ (reverse output))))
+ (lambda (key)
+ (case key
+ ((put-byte) put-byte)
+ ((get-output) get-output)
+ (else (error "Unknown key" key))))))
+