diff options
Diffstat (limited to 'mblock')
55 files changed, 3840 insertions, 271 deletions
diff --git a/mblock/README b/mblock/README index ca91edfaa..130073afe 100644 --- a/mblock/README +++ b/mblock/README @@ -1,5 +1,5 @@ # -# Copyright 2006 Free Software Foundation, Inc. +# Copyright 2006,2007 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,3 +20,5 @@ # The "Message block" implementation, a work in progress... + +http://gnuradio.org/trac/wiki/MessageBlocks diff --git a/mblock/src/Makefile.am b/mblock/src/Makefile.am index bcbab2455..c6c2fcc75 100644 --- a/mblock/src/Makefile.am +++ b/mblock/src/Makefile.am @@ -19,4 +19,4 @@ # Boston, MA 02110-1301, USA. # -SUBDIRS = lib +SUBDIRS = lib scheme diff --git a/mblock/src/lib/Makefile.am b/mblock/src/lib/Makefile.am index 6273e16d2..604f97246 100644 --- a/mblock/src/lib/Makefile.am +++ b/mblock/src/lib/Makefile.am @@ -28,28 +28,41 @@ TESTS = test_mblock lib_LTLIBRARIES = libmblock.la libmblock-qa.la EXTRA_DIST = \ - README.locking + README.locking \ + qa_bitset.mbh +BUILT_SOURCES = \ + qa_bitset_mbh.cc + +qa_bitset_mbh.cc : qa_bitset.mbh + $(COMPILE_MBH) qa_bitset.mbh qa_bitset_mbh.cc + # These are the source files that go into the mblock shared library libmblock_la_SOURCES = \ + mb_class_registry.cc \ mb_connection.cc \ mb_endpoint.cc \ mb_exception.cc \ + mb_gettid.cc \ mb_mblock.cc \ mb_mblock_impl.cc \ mb_message.cc \ mb_msg_accepter.cc \ + mb_msg_accepter_msgq.cc \ mb_msg_accepter_smp.cc \ mb_msg_queue.cc \ mb_port.cc \ mb_port_simple.cc \ mb_protocol_class.cc \ mb_runtime.cc \ + mb_runtime_base.cc \ mb_runtime_nop.cc \ - mb_runtime_placeholder.cc \ - mb_runtime_thread_per_mblock.cc \ - mb_util.cc + mb_runtime_thread_per_block.cc \ + mb_time.cc \ + mb_timer_queue.cc \ + mb_util.cc \ + mb_worker.cc # magic flags @@ -62,19 +75,20 @@ libmblock_la_LIBADD = \ -lstdc++ include_HEADERS = \ + mb_class_registry.h \ mb_common.h \ mb_exception.h \ + mb_gettid.h \ mb_mblock.h \ mb_message.h \ mb_msg_accepter.h \ + mb_msg_accepter_msgq.h \ mb_msg_queue.h \ mb_port.h \ mb_port_simple.h \ mb_protocol_class.h \ mb_runtime.h \ - mb_runtime_nop.h \ - mb_runtime_placeholder.h \ - mb_runtime_thread_per_mblock.h \ + mb_time.h \ mb_util.h @@ -83,18 +97,30 @@ noinst_HEADERS = \ mb_endpoint.h \ mb_mblock_impl.h \ mb_msg_accepter_smp.h \ + mb_runtime_base.h \ + mb_runtime_nop.h \ + mb_runtime_thread_per_block.h \ + mb_timer_queue.h \ + mb_worker.h \ mbi_runtime_lock.h \ qa_mblock.h \ qa_mblock_prims.h \ - qa_mblock_send.h + qa_mblock_send.h \ + qa_mblock_sys.h \ + qa_timeouts.h # Build the qa code into its own library libmblock_qa_la_SOURCES = \ + qa_bitset.cc \ + qa_bitset_mbh.cc \ + qa_disconnect.cc \ qa_mblock.cc \ qa_mblock_prims.cc \ - qa_mblock_send.cc + qa_mblock_send.cc \ + qa_mblock_sys.cc \ + qa_timeouts.cc # magic flags @@ -107,9 +133,14 @@ libmblock_qa_la_LIBADD = \ -lstdc++ -noinst_PROGRAMS = test_mblock +noinst_PROGRAMS = \ + test_mblock \ + benchmark_send test_mblock_SOURCES = test_mblock.cc test_mblock_LDADD = libmblock-qa.la +benchmark_send_SOURCES = benchmark_send.cc +benchmark_send_LDADD = libmblock-qa.la + CLEANFILES = $(BUILT_SOURCES) *.pyc diff --git a/mblock/src/lib/mb_runtime_placeholder.h b/mblock/src/lib/benchmark_send.cc index b55d39f94..c9ebe57c3 100644 --- a/mblock/src/lib/mb_runtime_placeholder.h +++ b/mblock/src/lib/benchmark_send.cc @@ -18,33 +18,28 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef INCLUDED_MB_RUNTIME_PLACEHOLDER_H -#define INCLUDED_MB_RUNTIME_PLACEHOLDER_H #include <mb_runtime.h> +#include <iostream> -/*! - * \brief Concrete runtime that serves as a placeholder until the real - * runtime is known. - * - * The singleton instance of this class is installed in the d_runtime - * instance variable of each mb_mblock_impl at construction time. - * Having a valid instance of runtime removes the "pre runtime::run" - * corner case, and allows us to lock and unlock the big runtime lock - * even though there's no "real runtime" yet. - */ -class mb_runtime_placeholder : public mb_runtime +int +main(int argc, char **argv) { + mb_runtime_sptr rt = mb_make_runtime(); + pmt_t result = PMT_NIL; + + long nmsgs = 1000000; + long batch_size = 100; -public: - mb_runtime_placeholder(); - ~mb_runtime_placeholder(); + pmt_t arg = pmt_list2(pmt_from_long(nmsgs), // # of messages to send through pipe + pmt_from_long(batch_size)); - //! throws mbe_not_implemented - bool run(mb_mblock_sptr top); + rt->run("top", "qa_bitset_top", arg, &result); - //! Return the placeholder singleton - static mb_runtime *singleton(); -}; + if (!pmt_equal(PMT_T, result)){ + std::cerr << "benchmark_send: incorrect result"; + return 1; + } -#endif /* INCLUDED_MB_RUNTIME_PLACEHOLDER_H */ + return 0; +} diff --git a/mblock/src/lib/getres.cc b/mblock/src/lib/getres.cc new file mode 100644 index 000000000..c05ba792a --- /dev/null +++ b/mblock/src/lib/getres.cc @@ -0,0 +1,32 @@ +#include <time.h> +#include <stdio.h> + +int +main(int argc, char **argv) +{ + bool ok = true; + struct timespec ts; + int r; + + r = clock_getres(CLOCK_REALTIME, &ts); + if (r != 0){ + perror("clock_getres(CLOCK_REALTIME, ...)"); + ok = false; + } + else + printf("clock_getres(CLOCK_REALTIME, ...) => %11.9f\n", + (double) ts.tv_sec + ts.tv_nsec * 1e-9); + + + r = clock_getres(CLOCK_MONOTONIC, &ts); + if (r != 0){ + perror("clock_getres(CLOCK_MONOTONIC, ..."); + ok = false; + } + else + printf("clock_getres(CLOCK_MONOTONIC, ...) => %11.9f\n", + (double) ts.tv_sec + ts.tv_nsec * 1e-9); + + + return ok == true ? 0 : 1; +} diff --git a/mblock/src/lib/mb_runtime_thread_per_mblock.cc b/mblock/src/lib/mb_class_registry.cc index 925dcdc45..9eee93618 100644 --- a/mblock/src/lib/mb_runtime_thread_per_mblock.cc +++ b/mblock/src/lib/mb_class_registry.cc @@ -22,44 +22,26 @@ #ifdef HAVE_CONFIG_H #include <config.h> #endif -#include <mb_runtime_thread_per_mblock.h> -#include <mb_mblock.h> -#include <mb_mblock_impl.h> +#include <mb_class_registry.h> +#include <map> +static std::map<std::string, mb_mblock_maker_t> s_registry; -mb_runtime_thread_per_mblock::mb_runtime_thread_per_mblock() -{ - // nop for now -} - -mb_runtime_thread_per_mblock::~mb_runtime_thread_per_mblock() +bool +mb_class_registry::register_maker(const std::string &name, mb_mblock_maker_t maker) { - // nop for now + s_registry[name] = maker; + return true; } bool -mb_runtime_thread_per_mblock::run(mb_mblock_sptr top) +mb_class_registry::lookup_maker(const std::string &name, mb_mblock_maker_t *maker) { - class initial_visitor : public mb_visitor - { - mb_runtime *d_rt; - - public: - initial_visitor(mb_runtime *rt) : d_rt(rt) {} - bool operator()(mb_mblock *mblock, const std::string &path) - { - mblock->impl()->set_runtime(d_rt); - mblock->set_instance_name(path); - mblock->init_fsm(); - return true; - } - }; - - initial_visitor visitor(this); - - d_top = top; // remember top of tree - - d_top->walk_tree(&visitor); + if (s_registry.count(name) == 0){ // not registered + *maker = (mb_mblock_maker_t) 0; + return false; + } + *maker = s_registry[name]; return true; } diff --git a/mblock/src/lib/mb_class_registry.h b/mblock/src/lib/mb_class_registry.h new file mode 100644 index 000000000..b6e63d38b --- /dev/null +++ b/mblock/src/lib/mb_class_registry.h @@ -0,0 +1,51 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_MB_CLASS_REGISTRY_H +#define INCLUDED_MB_CLASS_REGISTRY_H + +#include <mb_common.h> + +//! conceptually, pointer to constructor +typedef mb_mblock_sptr (*mb_mblock_maker_t)(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg); + +/* + * \brief Maintain mapping between mblock class_name and factory (maker) + */ +class mb_class_registry : public boost::noncopyable { +public: + static bool register_maker(const std::string &name, mb_mblock_maker_t maker); + static bool lookup_maker(const std::string &name, mb_mblock_maker_t *maker); +}; + +template<class mblock> +mb_mblock_sptr mb_mblock_maker(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg) +{ + return mb_mblock_sptr(new mblock(runtime, instance_name, user_arg)); +} + +#define REGISTER_MBLOCK_CLASS(name) \ + bool __RBC__ ## name = mb_class_registry::register_maker(#name, &mb_mblock_maker<name>) + +#endif /* INCLUDED_MB_CLASS_REGISTRY_H */ diff --git a/mblock/src/lib/mb_exception.cc b/mblock/src/lib/mb_exception.cc index 4d4ca70b1..23dfbd9fe 100644 --- a/mblock/src/lib/mb_exception.cc +++ b/mblock/src/lib/mb_exception.cc @@ -38,6 +38,11 @@ mbe_not_implemented::mbe_not_implemented(mb_mblock *mb, const std::string &msg) { } +mbe_no_such_class::mbe_no_such_class(mb_mblock *mb, const std::string &class_name) + : mbe_base(mb, "No such class: " + class_name) +{ +} + mbe_no_such_component::mbe_no_such_component(mb_mblock *mb, const std::string &component_name) : mbe_base(mb, "No such component: " + component_name) { @@ -85,3 +90,17 @@ mbe_invalid_port_type::mbe_invalid_port_type(mb_mblock *mb, : mbe_base(mb, "Invalid port type for connection: " + mb_util::join_names(comp_name, port_name)) { } + +mbe_mblock_failed::mbe_mblock_failed(mb_mblock *mb, + const std::string &msg) + : mbe_base(mb, "Message block failed: " + msg) +{ +} + +mbe_terminate::mbe_terminate() +{ +} + +mbe_exit::mbe_exit() +{ +} diff --git a/mblock/src/lib/mb_exception.h b/mblock/src/lib/mb_exception.h index 40abd1c96..183f7089c 100644 --- a/mblock/src/lib/mb_exception.h +++ b/mblock/src/lib/mb_exception.h @@ -38,6 +38,11 @@ public: mbe_not_implemented(mb_mblock *mb, const std::string &msg); }; +class mbe_no_such_class : public mbe_base +{ +public: + mbe_no_such_class(mb_mblock *, const std::string &class_name); +}; class mbe_no_such_component : public mbe_base { @@ -57,6 +62,7 @@ public: mbe_no_such_port(mb_mblock *, const std::string &port_name); }; + class mbe_duplicate_port : public mbe_base { public: @@ -87,6 +93,26 @@ public: const std::string &port_name); }; +class mbe_mblock_failed : public mbe_base +{ +public: + mbe_mblock_failed(mb_mblock *, const std::string &msg); +}; + +// not derived from mbe_base to simplify try/catch +class mbe_terminate +{ +public: + mbe_terminate(); +}; + +// not derived from mbe_base to simplify try/catch +class mbe_exit +{ +public: + mbe_exit(); +}; + #endif /* INCLUDED_MB_EXCEPTION_H */ diff --git a/mblock/src/lib/mb_gettid.cc b/mblock/src/lib/mb_gettid.cc new file mode 100644 index 000000000..4f6b3a567 --- /dev/null +++ b/mblock/src/lib/mb_gettid.cc @@ -0,0 +1,53 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_gettid.h> + +#define NEED_STUB + +#if defined(HAVE_SYS_SYSCALL_H) && defined(HAVE_UNISTD_H) + +#include <sys/syscall.h> +#include <unistd.h> + +#if defined(SYS_gettid) +#undef NEED_STUB + +int mb_gettid() +{ + return syscall(SYS_gettid); +} + +#endif +#endif + +#if defined(NEED_STUB) + +int +mb_gettid() +{ + return 0; +} + +#endif diff --git a/mblock/src/lib/mb_runtime_thread_per_mblock.h b/mblock/src/lib/mb_gettid.h index cef756ecd..d06276f56 100644 --- a/mblock/src/lib/mb_runtime_thread_per_mblock.h +++ b/mblock/src/lib/mb_gettid.h @@ -18,23 +18,9 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H -#define INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H - -#include <mb_runtime.h> /*! - * \brief Concrete runtime that uses a single thread for all work. + * \brief Return Linux taskid, or 0 if not available */ -class mb_runtime_thread_per_mblock : public mb_runtime -{ - mb_mblock_sptr d_top; // top mblock - -public: - mb_runtime_thread_per_mblock(); - ~mb_runtime_thread_per_mblock(); - - bool run(mb_mblock_sptr top); -}; +int mb_gettid(); -#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H */ diff --git a/mblock/src/lib/mb_mblock.cc b/mblock/src/lib/mb_mblock.cc index 9b8658f21..afff1a928 100644 --- a/mblock/src/lib/mb_mblock.cc +++ b/mblock/src/lib/mb_mblock.cc @@ -25,29 +25,36 @@ #include <mb_mblock.h> #include <mb_mblock_impl.h> +#include <mb_runtime.h> +#include <mb_exception.h> +#include <iostream> +static pmt_t s_sys_port = pmt_intern("%sys-port"); +static pmt_t s_halt = pmt_intern("%halt"); + mb_visitor::~mb_visitor() { // nop base case for virtual destructor. } -mb_mblock::mb_mblock() - : d_impl(mb_mblock_impl_sptr(new mb_mblock_impl(this))) +mb_mblock::mb_mblock(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg) + : d_impl(mb_mblock_impl_sptr( + new mb_mblock_impl(dynamic_cast<mb_runtime_base*>(runtime), + this, instance_name))) { } mb_mblock::~mb_mblock() { - disconnect_all(); - - // FIXME more? } void -mb_mblock::init_fsm() +mb_mblock::initial_transition() { // default implementation does nothing } @@ -58,6 +65,36 @@ mb_mblock::handle_message(mb_message_sptr msg) // default implementation does nothing } + +void +mb_mblock::main_loop() +{ + while (1){ + mb_message_sptr msg; + try { + while (1){ + msg = impl()->msgq().get_highest_pri_msg(); + + // check for %halt from %sys-port + if (pmt_eq(msg->port_id(), s_sys_port) && pmt_eq(msg->signal(), s_halt)) + exit(); + + handle_message(msg); + } + } + catch (pmt_exception e){ + std::cerr << "\nmb_mblock::main_loop: ignored pmt_exception: " + << e.what() + << "\nin mblock instance \"" << instance_name() + << "\" while handling message:" + << "\n port_id = " << msg->port_id() + << "\n signal = " << msg->signal() + << "\n data = " << msg->data() + << "\n metatdata = " << msg->metadata() << std::endl; + } + } +} + //////////////////////////////////////////////////////////////////////// // Forward other methods to implementation class // //////////////////////////////////////////////////////////////////////// @@ -74,9 +111,11 @@ mb_mblock::define_port(const std::string &port_name_string, void mb_mblock::define_component(const std::string &component_name, - mb_mblock_sptr component) + const std::string &class_name, + pmt_t user_arg) + { - d_impl->define_component(component_name, component); + d_impl->define_component(component_name, class_name, user_arg); } void @@ -97,7 +136,7 @@ mb_mblock::disconnect(const std::string &comp_name1, const std::string &port_nam } void -mb_mblock::disconnect_component(const std::string component_name) +mb_mblock::disconnect_component(const std::string &component_name) { d_impl->disconnect_component(component_name); } @@ -115,9 +154,9 @@ mb_mblock::nconnections() const } bool -mb_mblock::walk_tree(mb_visitor *visitor, const std::string &path) +mb_mblock::walk_tree(mb_visitor *visitor) { - return d_impl->walk_tree(visitor, path); + return d_impl->walk_tree(visitor); } std::string @@ -127,7 +166,7 @@ mb_mblock::instance_name() const } void -mb_mblock::set_instance_name(const std::string name) +mb_mblock::set_instance_name(const std::string &name) { d_impl->set_instance_name(name); } @@ -139,7 +178,7 @@ mb_mblock::class_name() const } void -mb_mblock::set_class_name(const std::string name) +mb_mblock::set_class_name(const std::string &name) { d_impl->set_class_name(name); } @@ -149,3 +188,42 @@ mb_mblock::parent() const { return d_impl->mblock_parent(); } + +void +mb_mblock::exit() +{ + throw mbe_exit(); // adios... +} + +void +mb_mblock::shutdown_all(pmt_t result) +{ + d_impl->runtime()->request_shutdown(result); +} + +pmt_t +mb_mblock::schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data) +{ + mb_msg_accepter_sptr accepter = impl()->make_accepter(s_sys_port); + return d_impl->runtime()->schedule_one_shot_timeout(abs_time, user_data, + accepter); +} + +pmt_t +mb_mblock::schedule_periodic_timeout(const mb_time &first_abs_time, + const mb_time &delta_time, + pmt_t user_data) +{ + mb_msg_accepter_sptr accepter = impl()->make_accepter(s_sys_port); + return d_impl->runtime()->schedule_periodic_timeout(first_abs_time, + delta_time, + user_data, + accepter); +} + +void +mb_mblock::cancel_timeout(pmt_t handle) +{ + d_impl->runtime()->cancel_timeout(handle); +} + diff --git a/mblock/src/lib/mb_mblock.h b/mblock/src/lib/mb_mblock.h index 594920f91..6d471c3db 100644 --- a/mblock/src/lib/mb_mblock.h +++ b/mblock/src/lib/mb_mblock.h @@ -24,6 +24,7 @@ #include <mb_common.h> #include <mb_message.h> #include <mb_port.h> +#include <mb_time.h> /*! @@ -34,7 +35,7 @@ class mb_visitor { public: virtual ~mb_visitor(); - virtual bool operator()(mb_mblock *mblock, const std::string &path) = 0; + virtual bool operator()(mb_mblock *mblock) = 0; }; // ---------------------------------------------------------------------- @@ -52,6 +53,7 @@ private: friend class mb_runtime; friend class mb_mblock_impl; + friend class mb_worker; protected: /*! @@ -59,22 +61,29 @@ protected: * * Initializing all mblocks in the system is a 3 step procedure. * - * The top level mblock's constructor is run. That constructor (a) - * registers all of its ports using define_port, (b) constructs and - * registers any subcomponents it may have via the define_component - * method, and then (c) issues connect calls to wire its - * subcomponents together. + * The top level mblock's constructor is run. That constructor + * (a) registers all of its ports using define_port, (b) registers any + * subcomponents it may have via the define_component method, and + * then (c) issues connect calls to wire its subcomponents together. + * + * \param runtime the runtime associated with this mblock + * \param instance_name specify the name of this instance + * (for debugging, NUMA mapping, etc) + * \param user_arg argument passed by user to constructor + * (ignored by the mb_mblock base class) */ - mb_mblock(); + mb_mblock(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); public: /*! * \brief Called by the runtime system to execute the initial * transition of the finite state machine. * - * Override this to initialize your finite state machine. + * This method is called by the runtime after all blocks are + * constructed and before the first message is delivered. Override + * this to initialize your finite state machine. */ - virtual void init_fsm(); + virtual void initial_transition(); protected: /*! @@ -113,11 +122,13 @@ protected: * names and identities of our sub-component mblocks. * * \param component_name The name of the sub-component (must be unique with this mblock). - * \param component The sub-component instance. + * \param class_name The class of the instance that is to be created. + * \param user_arg The argument to pass to the constructor of the component. */ void define_component(const std::string &component_name, - mb_mblock_sptr component); + const std::string &class_name, + pmt_t user_arg = PMT_NIL); /*! * \brief connect endpoint_1 to endpoint_2 @@ -160,7 +171,7 @@ protected: * \param component_name component to disconnect */ void - disconnect_component(const std::string component_name); + disconnect_component(const std::string &component_name); /*! * \brief disconnect all connections to all components @@ -175,8 +186,47 @@ protected: nconnections() const; //! Set the class name - void set_class_name(const std::string name); + void set_class_name(const std::string &name); + + /*! + * \brief Tell runtime that we are done. + * + * This method does not return. + */ + void exit(); + /*! + * \brief Ask runtime to execute the shutdown procedure for all blocks. + * + * \param result sets value of \p result output argument of runtime->run(...) + * + * The runtime first sends a maximum priority %shutdown message to + * all blocks. All blocks should handle the %shutdown message, + * perform whatever clean up is required, and call this->exit(); + * + * After a period of time (~100ms), any blocks which haven't yet + * called this->exit() are sent a maximum priority %halt message. + * %halt is detected in main_loop, and this->exit() is called. + * + * After an additional period of time (~100ms), any blocks which + * still haven't yet called this->exit() are sent a SIG<FOO> (TBD) + * signal, which will blow them out of any blocking system calls and + * raise an mbe_terminate exception. The default top-level + * runtime-provided exception handler will call this->exit() to + * finish the process. + * + * runtime->run(...) returns when all blocks have called exit. + */ + void shutdown_all(pmt_t result); + + /*! + * \brief main event dispatching loop + * + * Although it is possible to override this, the default implementation + * should work for virtually all cases. + */ + virtual void main_loop(); + public: virtual ~mb_mblock(); @@ -187,18 +237,74 @@ public: std::string class_name() const; //! Set the instance name of this block. - void set_instance_name(const std::string name); + void set_instance_name(const std::string &name); //! Return the parent of this mblock, or 0 if we're the top-level block. mb_mblock *parent() const; /*! + * \brief Schedule a "one shot" timeout. + * + * \param abs_time the absolute time at which the timeout should fire + * \param user_data the data passed in the %timeout message. + * + * When the timeout fires, a message will be sent to the mblock. + * + * The message will have port_id = %sys-port, signal = %timeout, + * data = user_data, metadata = the handle returned from + * schedule_one_shot_timeout, pri = MB_PRI_BEST. + * + * \returns a handle that can be used in cancel_timeout, and is passed + * as the metadata field of the generated %timeout message. + * + * To cancel a pending timeout, call cancel_timeout. + */ + pmt_t + schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data); + + /*! + * \brief Schedule a periodic timeout. + * + * \param first_abs_time The absolute time at which the first timeout should fire. + * \param delta_time The relative delay between the first and successive timeouts. + * \param user_data the data passed in the %timeout message. + * + * When the timeout fires, a message will be sent to the mblock, and a + * new timeout will be scheduled for previous absolute time + delta_time. + * + * The message will have port_id = %sys-port, signal = %timeout, + * data = user_data, metadata = the handle returned from + * schedule_one_shot_timeout, pri = MB_PRI_BEST. + * + * \returns a handle that can be used in cancel_timeout, and is passed + * as the metadata field of the generated %timeout message. + * + * To cancel a pending timeout, call cancel_timeout. + */ + pmt_t + schedule_periodic_timeout(const mb_time &first_abs_time, + const mb_time &delta_time, + pmt_t user_data); + + /*! + * \brief Attempt to cancel a pending timeout. + * + * Note that this only stops a future timeout from firing. It is + * possible that a timeout may have already fired and enqueued a + * %timeout message, but that that message has not yet been seen by + * handle_message. + * + * \param handle returned from schedule_one_shot_timeout or schedule_periodic_timeout. + */ + void cancel_timeout(pmt_t handle); + + /*! * \brief Perform a pre-order depth-first traversal of the hierarchy. * * The traversal stops and returns false if any call to visitor returns false. */ bool - walk_tree(mb_visitor *visitor, const std::string &path="top"); + walk_tree(mb_visitor *visitor); //! \implementation diff --git a/mblock/src/lib/mb_mblock_impl.cc b/mblock/src/lib/mb_mblock_impl.cc index 1a9e50146..02cbb548d 100644 --- a/mblock/src/lib/mb_mblock_impl.cc +++ b/mblock/src/lib/mb_mblock_impl.cc @@ -30,8 +30,8 @@ #include <mb_exception.h> #include <mb_util.h> #include <mb_msg_accepter_smp.h> -#include <mb_runtime_placeholder.h> #include <mbi_runtime_lock.h> +#include <iostream> static pmt_t s_self = pmt_intern("self"); @@ -52,9 +52,10 @@ mb_mblock_impl::comp_is_defined(const std::string &name) //////////////////////////////////////////////////////////////////////// -mb_mblock_impl::mb_mblock_impl(mb_mblock *mb) - : d_mb(mb), d_mb_parent(0), d_runtime(mb_runtime_placeholder::singleton()), - d_instance_name("<unknown>"), d_class_name("mblock") +mb_mblock_impl::mb_mblock_impl(mb_runtime_base *runtime, mb_mblock *mb, + const std::string &instance_name) + : d_runtime(runtime), d_mb(mb), d_mb_parent(0), + d_instance_name(instance_name), d_class_name("mblock") { } @@ -85,15 +86,28 @@ mb_mblock_impl::define_port(const std::string &port_name, void mb_mblock_impl::define_component(const std::string &name, - mb_mblock_sptr component) + const std::string &class_name, + pmt_t user_arg) { - mbi_runtime_lock l(this); + { + mbi_runtime_lock l(this); + + if (comp_is_defined(name)) // check for duplicate name + throw mbe_duplicate_component(d_mb, name); + } - if (comp_is_defined(name)) // check for duplicate name - throw mbe_duplicate_component(d_mb, name); + // We ask the runtime to create the component so that it can worry about + // mblock placement on a NUMA machine or on a distributed multicomputer - component->d_impl->d_mb_parent = d_mb; // set component's parent link - d_comp_map[name] = component; + mb_mblock_sptr component = + d_runtime->create_component(instance_name() + "/" + name, + class_name, user_arg); + { + mbi_runtime_lock l(this); + + component->d_impl->d_mb_parent = d_mb; // set component's parent link + d_comp_map[name] = component; + } } void @@ -125,6 +139,7 @@ mb_mblock_impl::disconnect(const std::string &comp_name1, mbi_runtime_lock l(this); d_conn_table.disconnect(comp_name1, port_name1, comp_name2, port_name2); + invalidate_all_port_caches(); } void @@ -133,6 +148,7 @@ mb_mblock_impl::disconnect_component(const std::string component_name) mbi_runtime_lock l(this); d_conn_table.disconnect_component(component_name); + invalidate_all_port_caches(); } void @@ -141,6 +157,7 @@ mb_mblock_impl::disconnect_all() mbi_runtime_lock l(this); d_conn_table.disconnect_all(); + invalidate_all_port_caches(); } int @@ -220,26 +237,25 @@ mb_mblock_impl::endpoints_are_compatible(const mb_endpoint &ep0, } bool -mb_mblock_impl::walk_tree(mb_visitor *visitor, const std::string &path) +mb_mblock_impl::walk_tree(mb_visitor *visitor) { - if (!(*visitor)(d_mb, path)) + if (!(*visitor)(d_mb)) return false; mb_comp_map_t::iterator it; for (it = d_comp_map.begin(); it != d_comp_map.end(); ++it) - if (!(it->second->walk_tree(visitor, path + "/" + it->first))) + if (!(it->second->walk_tree(visitor))) return false; return true; } mb_msg_accepter_sptr -mb_mblock_impl::make_accepter(const std::string port_name) +mb_mblock_impl::make_accepter(pmt_t port_name) { // FIXME this should probably use some kind of configurable factory mb_msg_accepter *ma = - new mb_msg_accepter_smp(d_mb->shared_from_this(), - pmt_intern(port_name)); + new mb_msg_accepter_smp(d_mb->shared_from_this(), port_name); return mb_msg_accepter_sptr(ma); } @@ -281,3 +297,31 @@ mb_mblock_impl::set_class_name(const std::string &name) d_class_name = name; } +/* + * This is the "Big Hammer" port cache invalidator. + * It invalidates _all_ of the port caches in the entire mblock tree. + * It's overkill, but was simple to code. + */ +void +mb_mblock_impl::invalidate_all_port_caches() +{ + class invalidator : public mb_visitor + { + public: + bool operator()(mb_mblock *mblock) + { + mb_mblock_impl_sptr impl = mblock->impl(); + mb_port_map_t::iterator it = impl->d_port_map.begin(); + mb_port_map_t::iterator end = impl->d_port_map.end(); + for (; it != end; ++it) + it->second->invalidate_cache(); + return true; + } + }; + + invalidator visitor; + + // Always true, except in early QA code + if (runtime()->top()) + runtime()->top()->walk_tree(&visitor); +} diff --git a/mblock/src/lib/mb_mblock_impl.h b/mblock/src/lib/mb_mblock_impl.h index fc0fa6943..03ad414ea 100644 --- a/mblock/src/lib/mb_mblock_impl.h +++ b/mblock/src/lib/mb_mblock_impl.h @@ -22,6 +22,7 @@ #define INCLUDED_MB_MBLOCK_IMPL_H #include <mb_mblock.h> +#include <mb_runtime_base.h> #include <mb_connection.h> #include <mb_msg_queue.h> #include <list> @@ -37,9 +38,9 @@ typedef std::map<std::string, mb_mblock_sptr> mb_comp_map_t; */ class mb_mblock_impl : boost::noncopyable { + mb_runtime_base *d_runtime; // pointer to runtime mb_mblock *d_mb; // pointer to our associated mblock mb_mblock *d_mb_parent; // pointer to our parent - mb_runtime *d_runtime; // pointer to runtime std::string d_instance_name; // hierarchical name std::string d_class_name; // name of this (derived) class @@ -51,7 +52,8 @@ class mb_mblock_impl : boost::noncopyable mb_msg_queue d_msgq; // incoming messages for us public: - mb_mblock_impl(mb_mblock *mb); + mb_mblock_impl(mb_runtime_base *runtime, mb_mblock *mb, + const std::string &instance_name); ~mb_mblock_impl(); /*! @@ -79,11 +81,13 @@ public: * names and identities of our sub-component mblocks. * * \param component_name The name of the sub-component (must be unique with this mblock). - * \param component The sub-component instance. + * \param class_name The class of the instance that is to be created. + * \param user_arg The argument to pass to the constructor of the component. */ void define_component(const std::string &component_name, - mb_mblock_sptr component); + const std::string &class_name, + pmt_t user_arg); /*! * \brief connect endpoint_1 to endpoint_2 @@ -141,10 +145,10 @@ public: nconnections(); bool - walk_tree(mb_visitor *visitor, const std::string &path=""); + walk_tree(mb_visitor *visitor); mb_msg_accepter_sptr - make_accepter(const std::string port_name); + make_accepter(pmt_t port_name); mb_msg_queue & msgq() { return d_msgq; } @@ -183,10 +187,10 @@ public: mb_mblock_sptr component(const std::string &comp_name); //! Return the runtime instance - mb_runtime *runtime() { return d_runtime; } + mb_runtime_base *runtime() { return d_runtime; } //! Set the runtime instance - void set_runtime(mb_runtime *runtime) { d_runtime = runtime; } + void set_runtime(mb_runtime_base *runtime) { d_runtime = runtime; } /* * Our implementation methods @@ -210,6 +214,12 @@ private: endpoints_are_compatible(const mb_endpoint &ep0, const mb_endpoint &ep1); + /*! + * \brief walk mblock tree and invalidate all port resolution caches. + * \implementation + */ + void + invalidate_all_port_caches(); }; diff --git a/mblock/src/lib/mb_message.cc b/mblock/src/lib/mb_message.cc index 23803726a..14e349924 100644 --- a/mblock/src/lib/mb_message.cc +++ b/mblock/src/lib/mb_message.cc @@ -23,6 +23,34 @@ #include <config.h> #endif #include <mb_message.h> +#include <stdio.h> +#include <pmt_pool.h> + +static const int CACHE_LINE_SIZE = 64; // good guess + + +#if MB_MESSAGE_LOCAL_ALLOCATOR + +static pmt_pool global_msg_pool(sizeof(mb_message), CACHE_LINE_SIZE); + +void * +mb_message::operator new(size_t size) +{ + void *p = global_msg_pool.malloc(); + + // fprintf(stderr, "mb_message::new p = %p\n", p); + assert((reinterpret_cast<intptr_t>(p) & (CACHE_LINE_SIZE - 1)) == 0); + return p; +} + +void +mb_message::operator delete(void *p, size_t size) +{ + global_msg_pool.free(p); +} + +#endif + mb_message_sptr mb_make_message(pmt_t signal, pmt_t data, pmt_t metadata, mb_pri_t priority) @@ -40,3 +68,16 @@ mb_message::~mb_message() { // NOP } + +std::ostream& +operator<<(std::ostream& os, const mb_message &msg) +{ + os << "<msg: signal=" << msg.signal() + << " port_id=" << msg.port_id() + << " data=" << msg.data() + << " metadata=" << msg.metadata() + << " pri=" << msg.priority() + << ">"; + + return os; +} diff --git a/mblock/src/lib/mb_message.h b/mblock/src/lib/mb_message.h index 95440f8b7..6132866e2 100644 --- a/mblock/src/lib/mb_message.h +++ b/mblock/src/lib/mb_message.h @@ -22,6 +22,9 @@ #define INCLUDED_MB_MESSAGE_H #include <mb_common.h> +#include <iosfwd> + +#define MB_MESSAGE_LOCAL_ALLOCATOR 0 // define to 0 or 1 class mb_message; typedef boost::shared_ptr<mb_message> mb_message_sptr; @@ -66,6 +69,20 @@ public: pmt_t port_id() const { return d_port_id; } void set_port_id(pmt_t port_id){ d_port_id = port_id; } + +#if (MB_MESSAGE_LOCAL_ALLOCATOR) + void *operator new(size_t); + void operator delete(void *, size_t); +#endif }; +std::ostream& operator<<(std::ostream& os, const mb_message &msg); + +inline +std::ostream& operator<<(std::ostream& os, const mb_message_sptr msg) +{ + os << *(msg.get()); + return os; +} + #endif /* INCLUDED_MB_MESSAGE_H */ diff --git a/mblock/src/lib/mb_msg_accepter_msgq.cc b/mblock/src/lib/mb_msg_accepter_msgq.cc new file mode 100644 index 000000000..990491fcc --- /dev/null +++ b/mblock/src/lib/mb_msg_accepter_msgq.cc @@ -0,0 +1,46 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_msg_accepter_msgq.h> +#include <mb_message.h> + +pmt_t s_sys_port = pmt_intern("%sys-port"); + +mb_msg_accepter_msgq::mb_msg_accepter_msgq(mb_msg_queue *msgq) + : d_msgq(msgq) +{ +} + +mb_msg_accepter_msgq::~mb_msg_accepter_msgq() +{ +} + +void +mb_msg_accepter_msgq::operator()(pmt_t signal, pmt_t data, + pmt_t metadata, mb_pri_t priority) +{ + mb_message_sptr msg = mb_make_message(signal, data, metadata, priority); + msg->set_port_id(s_sys_port); + d_msgq->insert(msg); +} diff --git a/mblock/src/lib/mb_msg_accepter_msgq.h b/mblock/src/lib/mb_msg_accepter_msgq.h new file mode 100644 index 000000000..f598c7304 --- /dev/null +++ b/mblock/src/lib/mb_msg_accepter_msgq.h @@ -0,0 +1,39 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_MB_MSG_ACCEPTER_MSGQ_H +#define INCLUDED_MB_MSG_ACCEPTER_MSGQ_H + +#include <mb_msg_accepter.h> +#include <mb_msg_queue.h> + +/*! + * \brief Concrete class that accepts messages and inserts them into a message queue. + */ +class mb_msg_accepter_msgq : public mb_msg_accepter { + mb_msg_queue *d_msgq; + +public: + mb_msg_accepter_msgq(mb_msg_queue *msgq); + ~mb_msg_accepter_msgq(); + void operator()(pmt_t signal, pmt_t data, pmt_t metadata, mb_pri_t priority); +}; + +#endif /* INCLUDED_MB_MSG_ACCEPTER_MSGQ_H */ diff --git a/mblock/src/lib/mb_msg_queue.cc b/mblock/src/lib/mb_msg_queue.cc index 79e245ad6..c687e8760 100644 --- a/mblock/src/lib/mb_msg_queue.cc +++ b/mblock/src/lib/mb_msg_queue.cc @@ -83,7 +83,7 @@ mb_msg_queue::get_highest_pri_msg_helper() } } - return mb_message_sptr(); // equivalent of a zero pointer + return mb_message_sptr(); // eqv to a zero pointer } @@ -109,3 +109,20 @@ mb_msg_queue::get_highest_pri_msg() } } +mb_message_sptr +mb_msg_queue::get_highest_pri_msg_timedwait(const mb_time &abs_time) +{ + unsigned long secs = abs_time.d_secs; + unsigned long nsecs = abs_time.d_nsecs; + + omni_mutex_lock l(d_mutex); + + while (1){ + mb_message_sptr msg = get_highest_pri_msg_helper(); + if (msg) // Got one; return it + return msg; + + if (!d_not_empty.timedwait(secs, nsecs)) // timed out + return mb_message_sptr(); // eqv to zero pointer + } +} diff --git a/mblock/src/lib/mb_msg_queue.h b/mblock/src/lib/mb_msg_queue.h index 7977b7d80..4f805ea8f 100644 --- a/mblock/src/lib/mb_msg_queue.h +++ b/mblock/src/lib/mb_msg_queue.h @@ -23,6 +23,7 @@ #include <mb_common.h> #include <omnithread.h> +#include <mb_time.h> /*! * \brief priority queue for mblock messages @@ -63,6 +64,19 @@ public: * If the queue is empty, this call blocks until it can return a message. */ mb_message_sptr get_highest_pri_msg(); + + /* + * \brief Delete highest pri message from the queue and return it. + * If the queue is empty, this call blocks until it can return a message + * or real-time exceeds the absolute time, abs_time. + * + * \param abs_time specifies the latest absolute time to wait until. + * \sa mb_time::time + * + * \returns a valid mb_message_sptr, or the equivalent of a zero pointer + * if the call timed out while waiting. + */ + mb_message_sptr get_highest_pri_msg_timedwait(const mb_time &abs_time); }; #endif /* INCLUDED_MB_MSG_QUEUE_H */ diff --git a/mblock/src/lib/mb_port.cc b/mblock/src/lib/mb_port.cc index b265db2dc..28bb402d8 100644 --- a/mblock/src/lib/mb_port.cc +++ b/mblock/src/lib/mb_port.cc @@ -31,7 +31,8 @@ mb_port::mb_port(mb_mblock *mblock, const std::string &protocol_class_name, bool conjugated, mb_port::port_type_t port_type) - : d_port_name(port_name), d_conjugated(conjugated), d_port_type(port_type), + : d_port_name(port_name), d_port_symbol(pmt_intern(port_name)), + d_conjugated(conjugated), d_port_type(port_type), d_mblock(mblock) { pmt_t pc = mb_protocol_class_lookup(pmt_intern(protocol_class_name)); diff --git a/mblock/src/lib/mb_port.h b/mblock/src/lib/mb_port.h index 3c3e96368..3fdd25aff 100644 --- a/mblock/src/lib/mb_port.h +++ b/mblock/src/lib/mb_port.h @@ -40,6 +40,7 @@ public: private: std::string d_port_name; + pmt_t d_port_symbol; // the port_name as a pmt symbol pmt_t d_protocol_class; bool d_conjugated; port_type_t d_port_type; @@ -58,6 +59,7 @@ protected: public: std::string port_name() const { return d_port_name; } + pmt_t port_symbol() const { return d_port_symbol; } pmt_t protocol_class() const { return d_protocol_class; } bool conjugated() const { return d_conjugated; } port_type_t port_type() const { return d_port_type; } @@ -77,9 +79,15 @@ public: */ virtual void send(pmt_t signal, - pmt_t data = PMT_NIL, - pmt_t metadata = PMT_NIL, + pmt_t data = PMT_F, + pmt_t metadata = PMT_F, mb_pri_t priority = MB_PRI_DEFAULT) = 0; + + /* + * \brief Invalidate any cached peer resolutions + * \implementation + */ + virtual void invalidate_cache() = 0; }; #endif /* INCLUDED_MB_PORT_H */ diff --git a/mblock/src/lib/mb_port_simple.cc b/mblock/src/lib/mb_port_simple.cc index 80cb65efb..cbfb0f27f 100644 --- a/mblock/src/lib/mb_port_simple.cc +++ b/mblock/src/lib/mb_port_simple.cc @@ -37,7 +37,8 @@ mb_port_simple::mb_port_simple(mb_mblock *mblock, const std::string &protocol_class_name, bool conjugated, mb_port::port_type_t port_type) - : mb_port(mblock, port_name, protocol_class_name, conjugated, port_type) + : mb_port(mblock, port_name, protocol_class_name, conjugated, port_type), + d_cache_valid(false) { } @@ -67,6 +68,9 @@ mb_port_simple::find_accepter(mb_port_simple *start) mb_endpoint peer_ep; mb_msg_accepter_sptr r; + if (start->d_cache_valid) + return start->d_cached_accepter; + mbi_runtime_lock l(p->mblock()); // Set up initial context. @@ -78,6 +82,8 @@ mb_port_simple::find_accepter(mb_port_simple *start) case mb_port::EXTERNAL: // binding is in parent's name space context = p->mblock()->parent(); + if (!context) // can't be bound if there's no parent + return mb_msg_accepter_sptr(); // not bound break; default: @@ -97,7 +103,11 @@ mb_port_simple::find_accepter(mb_port_simple *start) case mb_port::INTERNAL: // Terminate here. case mb_port::EXTERNAL: r = pp->make_accepter(); - // FIXME cache the result + + // cache the result + + start->d_cached_accepter = r; + start->d_cache_valid = true; return r; case mb_port::RELAY: // Traverse to other side of relay port. @@ -130,5 +140,12 @@ mb_port_simple::find_accepter(mb_port_simple *start) mb_msg_accepter_sptr mb_port_simple::make_accepter() { - return d_mblock->impl()->make_accepter(port_name()); + return d_mblock->impl()->make_accepter(port_symbol()); +} + +void +mb_port_simple::invalidate_cache() +{ + d_cache_valid = false; + d_cached_accepter.reset(); } diff --git a/mblock/src/lib/mb_port_simple.h b/mblock/src/lib/mb_port_simple.h index 5cfbd3dc0..9bfe0eaf7 100644 --- a/mblock/src/lib/mb_port_simple.h +++ b/mblock/src/lib/mb_port_simple.h @@ -28,6 +28,9 @@ */ class mb_port_simple : public mb_port { + bool d_cache_valid; + mb_msg_accepter_sptr d_cached_accepter; + protected: static mb_msg_accepter_sptr find_accepter(mb_port_simple *start); @@ -57,6 +60,13 @@ public: pmt_t data = PMT_NIL, pmt_t metadata = PMT_NIL, mb_pri_t priority = MB_PRI_DEFAULT); + + /* + * \brief Invalidate any cached peer resolutions + * \implementation + */ + void invalidate_cache(); + }; #endif /* INCLUDED_MB_PORT_SIMPLE_H */ diff --git a/mblock/src/lib/mb_protocol_class.cc b/mblock/src/lib/mb_protocol_class.cc index f3eeb6035..a11017fa9 100644 --- a/mblock/src/lib/mb_protocol_class.cc +++ b/mblock/src/lib/mb_protocol_class.cc @@ -24,6 +24,7 @@ #endif #include <mb_protocol_class.h> +#include <iostream> static pmt_t s_ALL_PROTOCOL_CLASSES = PMT_NIL; @@ -80,3 +81,25 @@ mb_protocol_class_lookup(pmt_t name) return PMT_NIL; } + +mb_protocol_class_init::mb_protocol_class_init(const char *data, size_t len) +{ + std::stringbuf sb; + sb.str(std::string(data, len)); + + while (1){ + pmt_t obj = pmt_deserialize(sb); + + if (0){ + pmt_write(obj, std::cout); + std::cout << std::endl; + } + + if (pmt_is_eof_object(obj)) + return; + + mb_make_protocol_class(pmt_nth(0, obj), // protocol-class name + pmt_nth(1, obj), // list of incoming msg names + pmt_nth(2, obj)); // list of outgoing msg names + } +} diff --git a/mblock/src/lib/mb_protocol_class.h b/mblock/src/lib/mb_protocol_class.h index f4382ada1..60b87709d 100644 --- a/mblock/src/lib/mb_protocol_class.h +++ b/mblock/src/lib/mb_protocol_class.h @@ -39,4 +39,14 @@ pmt_t mb_protocol_class_outgoing(pmt_t pc); //< return outgoing message set pmt_t mb_protocol_class_lookup(pmt_t name); //< lookup an existing protocol class by name + +/*! + * \brief Initialize one or more protocol class from a serialized description. + * Used by machine generated code. + */ +class mb_protocol_class_init { +public: + mb_protocol_class_init(const char *data, size_t len); +}; + #endif /* INCLUDED_MB_PROTOCOL_CLASS_H */ diff --git a/mblock/src/lib/mb_runtime.cc b/mblock/src/lib/mb_runtime.cc index 34a0af358..7f3b78678 100644 --- a/mblock/src/lib/mb_runtime.cc +++ b/mblock/src/lib/mb_runtime.cc @@ -24,15 +24,16 @@ #endif #include <mb_runtime.h> -#include <mb_runtime_thread_per_mblock.h> +#include <mb_runtime_thread_per_block.h> mb_runtime_sptr mb_make_runtime() { - return mb_runtime_sptr(new mb_runtime_thread_per_mblock()); + return mb_runtime_sptr(new mb_runtime_thread_per_block()); } mb_runtime::~mb_runtime() { // nop } + diff --git a/mblock/src/lib/mb_runtime.h b/mblock/src/lib/mb_runtime.h index 0929e30dc..a804af06f 100644 --- a/mblock/src/lib/mb_runtime.h +++ b/mblock/src/lib/mb_runtime.h @@ -37,40 +37,32 @@ mb_runtime_sptr mb_make_runtime(); class mb_runtime : boost::noncopyable, public boost::enable_shared_from_this<mb_runtime> { - omni_mutex d_brl; // big runtime lock (avoid using this if possible...) +protected: + mb_mblock_sptr d_top; public: mb_runtime(){} virtual ~mb_runtime(); /*! - * \brief Run the mblock hierarchy rooted at \p top + * \brief Construct and run the specified mblock hierarchy. * * This routine turns into the m-block scheduler, and * blocks until the system is shutdown. * - * \param top top-level mblock + * \param name name of the top-level mblock (conventionally "top") + * \param class_name The class of the top-level mblock to create. + * \param user_arg The argument to pass to the top-level mblock constructor + * * \returns true if the system ran successfully. */ - virtual bool run(mb_mblock_sptr top) = 0; - - - // ---------------------------------------------------------------- - // Stuff from here down is really private to the implementation... - // ---------------------------------------------------------------- - - /*! - * \brief lock the big runtime lock - * \implementation - */ - inline void lock() { d_brl.lock(); } - - /*! - * \brief unlock the big runtime lock - * \implementation - */ - inline void unlock() { d_brl.unlock(); } + virtual bool run(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg, + pmt_t *result = 0) = 0; + // QA only... + mb_mblock_sptr top() { return d_top; } }; #endif /* INCLUDED_MB_RUNTIME_H */ diff --git a/mblock/src/lib/mb_runtime_placeholder.cc b/mblock/src/lib/mb_runtime_base.cc index 5ce7cc2ac..104b6afcc 100644 --- a/mblock/src/lib/mb_runtime_placeholder.cc +++ b/mblock/src/lib/mb_runtime_base.cc @@ -22,36 +22,36 @@ #ifdef HAVE_CONFIG_H #include <config.h> #endif -#include <mb_runtime_placeholder.h> -#include <mb_mblock.h> -#include <mb_exception.h> - - -static mb_runtime *s_singleton = 0; +#include <mb_runtime_base.h> +/* + * Default nop implementations... + */ -mb_runtime_placeholder::mb_runtime_placeholder() +void +mb_runtime_base::request_shutdown(pmt_t result) { - // nop } -mb_runtime_placeholder::~mb_runtime_placeholder() +pmt_t +mb_runtime_base::schedule_one_shot_timeout(const mb_time &abs_time, + pmt_t user_data, + mb_msg_accepter_sptr accepter) { - // nop + return PMT_F; } -bool -mb_runtime_placeholder::run(mb_mblock_sptr top) +pmt_t +mb_runtime_base::schedule_periodic_timeout(const mb_time &first_abs_time, + const mb_time &delta_time, + pmt_t user_data, + mb_msg_accepter_sptr accepter) { - throw mbe_not_implemented(top.get(), "mb_runtime_placeholder::run"); + return PMT_F; } -mb_runtime * -mb_runtime_placeholder::singleton() +void +mb_runtime_base::cancel_timeout(pmt_t handle) { - if (s_singleton) - return s_singleton; - - s_singleton = new mb_runtime_placeholder(); - return s_singleton; } + diff --git a/mblock/src/lib/mb_runtime_base.h b/mblock/src/lib/mb_runtime_base.h new file mode 100644 index 000000000..cb76e4503 --- /dev/null +++ b/mblock/src/lib/mb_runtime_base.h @@ -0,0 +1,78 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef INCLUDED_MB_RUNTIME_BASE_H +#define INCLUDED_MB_RUNTIME_BASE_H + +#include <mb_runtime.h> +#include <omnithread.h> +#include <mb_time.h> + +/* + * \brief This is the runtime class used by the implementation. + */ +class mb_runtime_base : public mb_runtime +{ + omni_mutex d_brl; // big runtime lock (avoid using this if possible...) + +protected: + mb_msg_accepter_sptr d_accepter; + +public: + + /*! + * \brief lock the big runtime lock + * \implementation + */ + inline void lock() { d_brl.lock(); } + + /*! + * \brief unlock the big runtime lock + * \implementation + */ + inline void unlock() { d_brl.unlock(); } + + virtual void request_shutdown(pmt_t result); + + virtual mb_mblock_sptr + create_component(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg) = 0; + + virtual pmt_t + schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data, + mb_msg_accepter_sptr accepter); + + virtual pmt_t + schedule_periodic_timeout(const mb_time &first_abs_time, + const mb_time &delta_time, + pmt_t user_data, + mb_msg_accepter_sptr accepter); + virtual void + cancel_timeout(pmt_t handle); + + mb_msg_accepter_sptr + accepter() { return d_accepter; } + +}; + + +#endif /* INCLUDED_MB_RUNTIME_BASE_H */ diff --git a/mblock/src/lib/mb_runtime_nop.cc b/mblock/src/lib/mb_runtime_nop.cc index a19f0793a..bcae1ebd5 100644 --- a/mblock/src/lib/mb_runtime_nop.cc +++ b/mblock/src/lib/mb_runtime_nop.cc @@ -24,6 +24,8 @@ #endif #include <mb_runtime_nop.h> #include <mb_mblock.h> +#include <mb_class_registry.h> +#include <mb_exception.h> mb_runtime_sptr mb_make_runtime_nop() @@ -42,23 +44,41 @@ mb_runtime_nop::~mb_runtime_nop() // nop for now } + bool -mb_runtime_nop::run(mb_mblock_sptr top) +mb_runtime_nop::run(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg, pmt_t *result) { class initial_visitor : public mb_visitor { public: - bool operator()(mb_mblock *mblock, const std::string &path) + bool operator()(mb_mblock *mblock) { - mblock->set_instance_name(path); - mblock->init_fsm(); + mblock->initial_transition(); return true; } }; - initial_visitor visitor; + initial_visitor visitor; + + if (result) + *result = PMT_T; - top->walk_tree(&visitor); + d_top = create_component(instance_name, class_name, user_arg); + d_top->walk_tree(&visitor); return true; } + +mb_mblock_sptr +mb_runtime_nop::create_component(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg) +{ + mb_mblock_maker_t maker; + if (!mb_class_registry::lookup_maker(class_name, &maker)) + throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")"); + + return maker(this, instance_name, user_arg); +} diff --git a/mblock/src/lib/mb_runtime_nop.h b/mblock/src/lib/mb_runtime_nop.h index dc7887991..d7fe105bd 100644 --- a/mblock/src/lib/mb_runtime_nop.h +++ b/mblock/src/lib/mb_runtime_nop.h @@ -21,7 +21,7 @@ #ifndef INCLUDED_MB_RUNTIME_NOP_H #define INCLUDED_MB_RUNTIME_NOP_H -#include <mb_runtime.h> +#include <mb_runtime_base.h> /*! * \brief Public constructor (factory) for mb_runtime_nop objects. @@ -31,14 +31,22 @@ mb_runtime_sptr mb_make_runtime_nop(); /*! * \brief Concrete runtime that does nothing. Used only during early QA tests. */ -class mb_runtime_nop : public mb_runtime +class mb_runtime_nop : public mb_runtime_base { - public: mb_runtime_nop(); ~mb_runtime_nop(); - bool run(mb_mblock_sptr top); + bool run(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg, + pmt_t *result); + +protected: + mb_mblock_sptr + create_component(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg); }; #endif /* INCLUDED_MB_RUNTIME_NOP_H */ diff --git a/mblock/src/lib/mb_runtime_thread_per_block.cc b/mblock/src/lib/mb_runtime_thread_per_block.cc new file mode 100644 index 000000000..7ad87aa23 --- /dev/null +++ b/mblock/src/lib/mb_runtime_thread_per_block.cc @@ -0,0 +1,349 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_runtime_thread_per_block.h> +#include <mb_mblock.h> +#include <mb_mblock_impl.h> +#include <mb_class_registry.h> +#include <mb_exception.h> +#include <mb_worker.h> +#include <omnithread.h> +#include <iostream> +#include <mb_msg_accepter_msgq.h> + + +static pmt_t s_halt = pmt_intern("%halt"); +static pmt_t s_sys_port = pmt_intern("%sys-port"); +static pmt_t s_shutdown = pmt_intern("%shutdown"); +static pmt_t s_request_shutdown = pmt_intern("%request-shutdown"); +static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed"); +static pmt_t s_timeout = pmt_intern("%timeout"); +static pmt_t s_request_timeout = pmt_intern("%request-timeout"); +static pmt_t s_cancel_timeout = pmt_intern("%cancel-timeout"); +static pmt_t s_send_halt = pmt_intern("send-halt"); +static pmt_t s_exit_now = pmt_intern("exit-now"); + +static void +send_sys_msg(mb_msg_queue &msgq, pmt_t signal, + pmt_t data = PMT_F, pmt_t metadata = PMT_F, + mb_pri_t priority = MB_PRI_BEST) +{ + mb_message_sptr msg = mb_make_message(signal, data, metadata, priority); + msg->set_port_id(s_sys_port); + msgq.insert(msg); +} + + +mb_runtime_thread_per_block::mb_runtime_thread_per_block() + : d_shutdown_in_progress(false), + d_shutdown_result(PMT_T) +{ + d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq)); +} + +mb_runtime_thread_per_block::~mb_runtime_thread_per_block() +{ + // FIXME iterate over workers and ensure that they are dead. + + if (!d_workers.empty()) + std::cerr << "\nmb_runtime_thread_per_block: dtor (# workers = " + << d_workers.size() << ")\n"; +} + +void +mb_runtime_thread_per_block::request_shutdown(pmt_t result) +{ + (*accepter())(s_request_shutdown, result, PMT_F, MB_PRI_BEST); +} + +bool +mb_runtime_thread_per_block::run(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg, pmt_t *result) +{ + if (result) // set it to something now, in case we throw + *result = PMT_F; + + // reset the shutdown state + d_shutdown_in_progress = false; + d_shutdown_result = PMT_T; + + assert(d_workers.empty()); + + while (!d_timer_queue.empty()) // ensure timer queue is empty + d_timer_queue.pop(); + + /* + * Create the top-level component, and recursively all of its + * subcomponents. + */ + d_top = create_component(instance_name, class_name, user_arg); + + try { + run_loop(); + } + catch (...){ + d_top.reset(); + throw; + } + + if (result) + *result = d_shutdown_result; + + d_top.reset(); + return true; +} + +void +mb_runtime_thread_per_block::run_loop() +{ + while (1){ + mb_message_sptr msg; + + if (d_timer_queue.empty()) // Any timeouts pending? + msg = d_msgq.get_highest_pri_msg(); // Nope. Block forever. + + else { + mb_timeout_sptr to = d_timer_queue.top(); // Yep. Get earliest timeout. + + // wait for a msg or the timeout... + msg = d_msgq.get_highest_pri_msg_timedwait(to->d_when); + + if (!msg){ // We timed out. + d_timer_queue.pop(); // Remove timeout from timer queue. + + // send the %timeout msg + (*to->d_accepter)(s_timeout, to->d_user_data, to->handle(), MB_PRI_BEST); + + if (to->d_is_periodic){ + to->d_when = to->d_when + to->d_delta; // update time of next firing + d_timer_queue.push(to); // push it back into the queue + } + continue; + } + } + + pmt_t signal = msg->signal(); + + if (pmt_eq(signal, s_worker_state_changed)){ // %worker-state-changed + omni_mutex_lock l1(d_workers_mutex); + reap_dead_workers(); + if (d_workers.empty()) // no work left to do... + return; + } + else if (pmt_eq(signal, s_request_shutdown)){ // %request-shutdown + if (!d_shutdown_in_progress){ + d_shutdown_in_progress = true; + d_shutdown_result = msg->data(); + + // schedule a timeout for ourselves... + schedule_one_shot_timeout(mb_time::time(0.100), s_send_halt, d_accepter); + send_all_sys_msg(s_shutdown); + } + } + else if (pmt_eq(signal, s_request_timeout)){ // %request-timeout + mb_timeout_sptr to = + boost::any_cast<mb_timeout_sptr>(pmt_any_ref(msg->data())); + d_timer_queue.push(to); + } + else if (pmt_eq(signal, s_cancel_timeout)){ // %cancel-timeout + d_timer_queue.cancel(msg->data()); + } + else if (pmt_eq(signal, s_timeout) + && pmt_eq(msg->data(), s_send_halt)){ // %timeout, send-halt + + // schedule another timeout for ourselves... + schedule_one_shot_timeout(mb_time::time(0.100), s_exit_now, d_accepter); + send_all_sys_msg(s_halt); + } + else if (pmt_eq(signal, s_timeout) + && pmt_eq(msg->data(), s_exit_now)){ // %timeout, exit-now + + // We only get here if we've sent all workers %shutdown followed + // by %halt, and one or more of them is still alive. They must + // be blocked in the kernel. FIXME We could add one more step: + // pthread_kill(...) but for now, we'll just ignore them... + return; + } + else { + std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg << std::endl; + } + } +} + +void +mb_runtime_thread_per_block::reap_dead_workers() +{ + // Already holding mutex + // omni_mutex_lock l1(d_workers_mutex); + + for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ){ + bool is_dead; + + // We can't join while holding the worker mutex, since that would + // attempt to destroy the mutex we're holding (omnithread's join + // deletes the omni_thread object after the pthread_join + // completes) Instead, we lock just long enough to figure out if + // the worker is dead. + { + omni_mutex_lock l2((*wi)->d_mutex); + is_dead = (*wi)->d_state == mb_worker::TS_DEAD; + } + + if (is_dead){ + if (0) + std::cerr << "\nruntime: " + << "(" << (*wi)->id() << ") " + << (*wi)->d_mblock->instance_name() << " is TS_DEAD\n"; + void *ignore; + (*wi)->join(&ignore); + wi = d_workers.erase(wi); + continue; + } + ++wi; + } +} + +// +// Create the thread, then create the component in the thread. +// Return a pointer to the created mblock. +// +// Can be invoked from any thread +// +mb_mblock_sptr +mb_runtime_thread_per_block::create_component(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg) +{ + mb_mblock_maker_t maker; + if (!mb_class_registry::lookup_maker(class_name, &maker)) + throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")"); + + // FIXME here's where we'd lookup NUMA placement requests & mblock + // priorities and communicate them to the worker we're creating... + + // Create the worker thread + mb_worker *w = + new mb_worker(this, maker, instance_name, user_arg); + + w->start_undetached(); // start it + + // Wait for it to reach TS_RUNNING or TS_DEAD + + bool is_dead; + mb_worker::cause_of_death_t why_dead; + { + omni_mutex_lock l(w->d_mutex); + while (!(w->d_state == mb_worker::TS_RUNNING + || w->d_state == mb_worker::TS_DEAD)) + w->d_state_cond.wait(); + + is_dead = w->d_state == mb_worker::TS_DEAD; + why_dead = w->d_why_dead; + } + + // If the worker failed to init (constructor or initial_transition + // raised an exception), reap the worker now and raise an exception. + + if (is_dead && why_dead != mb_worker::RIP_EXIT){ + + void *ignore; + w->join(&ignore); + + // FIXME with some work we ought to be able to propagate the + // exception from the worker. + throw mbe_mblock_failed(0, instance_name); + } + + assert(w->d_mblock); + + // Add w to the vector of workers, and return the mblock. + { + omni_mutex_lock l(d_workers_mutex); + d_workers.push_back(w); + } + + if (0) + std::cerr << "\nruntime: created " + << "(" << w->id() << ") " + << w->d_mblock->instance_name() << "\n"; + + return w->d_mblock; +} + +void +mb_runtime_thread_per_block::send_all_sys_msg(pmt_t signal, + pmt_t data, + pmt_t metadata, + mb_pri_t priority) +{ + omni_mutex_lock l1(d_workers_mutex); + + for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ++wi){ + send_sys_msg((*wi)->d_mblock->impl()->msgq(), + signal, data, metadata, priority); + } +} + +// +// Can be invoked from any thread. +// Sends a message to the runtime. +// +pmt_t +mb_runtime_thread_per_block::schedule_one_shot_timeout + (const mb_time &abs_time, + pmt_t user_data, + mb_msg_accepter_sptr accepter) +{ + mb_timeout_sptr to(new mb_timeout(abs_time, user_data, accepter)); + (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST); + return to->handle(); +} + +// +// Can be invoked from any thread. +// Sends a message to the runtime. +// +pmt_t +mb_runtime_thread_per_block::schedule_periodic_timeout + (const mb_time &first_abs_time, + const mb_time &delta_time, + pmt_t user_data, + mb_msg_accepter_sptr accepter) +{ + mb_timeout_sptr to(new mb_timeout(first_abs_time, delta_time, + user_data, accepter)); + (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST); + return to->handle(); +} + +// +// Can be invoked from any thread. +// Sends a message to the runtime. +// +void +mb_runtime_thread_per_block::cancel_timeout(pmt_t handle) +{ + (*d_accepter)(s_cancel_timeout, handle, PMT_F, MB_PRI_BEST); +} diff --git a/mblock/src/lib/mb_runtime_thread_per_block.h b/mblock/src/lib/mb_runtime_thread_per_block.h new file mode 100644 index 000000000..773b8b4e9 --- /dev/null +++ b/mblock/src/lib/mb_runtime_thread_per_block.h @@ -0,0 +1,84 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H +#define INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H + +#include <mb_runtime_base.h> +#include <mb_worker.h> +#include <mb_msg_queue.h> +#include <mb_timer_queue.h> + +/*! + * \brief Concrete runtime that uses a thread per mblock + * \implementation + * + * These are all implementation details. + */ +class mb_runtime_thread_per_block : public mb_runtime_base +{ +public: + omni_mutex d_workers_mutex; // hold while manipulating d_workers + std::vector<mb_worker*> d_workers; + bool d_shutdown_in_progress; + pmt_t d_shutdown_result; + mb_msg_queue d_msgq; + mb_timer_queue d_timer_queue; + + typedef std::vector<mb_worker*>::iterator worker_iter_t; + + mb_runtime_thread_per_block(); + ~mb_runtime_thread_per_block(); + + bool run(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg, + pmt_t *result); + + void request_shutdown(pmt_t result); + +protected: + mb_mblock_sptr + create_component(const std::string &instance_name, + const std::string &class_name, + pmt_t user_arg); + + pmt_t + schedule_one_shot_timeout(const mb_time &abs_time, pmt_t user_data, + mb_msg_accepter_sptr accepter); + + pmt_t + schedule_periodic_timeout(const mb_time &first_abs_time, + const mb_time &delta_time, + pmt_t user_data, + mb_msg_accepter_sptr accepter); + void + cancel_timeout(pmt_t handle); + +private: + void reap_dead_workers(); + void run_loop(); + + void send_all_sys_msg(pmt_t signal, pmt_t data = PMT_F, + pmt_t metadata = PMT_F, + mb_pri_t priority = MB_PRI_BEST); +}; + +#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H */ diff --git a/mblock/src/lib/mb_time.cc b/mblock/src/lib/mb_time.cc new file mode 100644 index 000000000..73c86e4f4 --- /dev/null +++ b/mblock/src/lib/mb_time.cc @@ -0,0 +1,84 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_time.h> +#include <omnithread.h> +#include <math.h> +#include <assert.h> + + +mb_time::mb_time(double real_secs) +{ + double floor_secs = floor(real_secs); + d_secs = (long) floor_secs; + d_nsecs = (long) ((real_secs - floor_secs) * 1e9); // always positive +} + +mb_time +mb_time::time(const mb_time &delta_t) +{ + unsigned long abs_sec, abs_nsec; + unsigned long rel_sec = delta_t.d_secs; + unsigned long rel_nsec = delta_t.d_nsecs; + + omni_thread::get_time(&abs_sec, &abs_nsec, rel_sec, rel_nsec); + return mb_time(abs_sec, abs_nsec); +} + + +mb_time +operator+(const mb_time &x, const mb_time &y) +{ + mb_time r(x.d_secs + y.d_secs, x.d_nsecs + y.d_nsecs); + while (r.d_nsecs >= 1000000000){ + r.d_nsecs -= 1000000000; + r.d_secs++; + } + return r; +} + +mb_time +operator-(const mb_time &x, const mb_time &y) +{ + // assert(!(x < y)); + + mb_time r(x.d_secs - y.d_secs, x.d_nsecs - y.d_nsecs); + while (r.d_nsecs < 0){ + r.d_nsecs += 1000000000; + r.d_secs--; + } + return r; +} + +mb_time +operator+(const mb_time &x, double y) +{ + return x + mb_time(y); +} + +mb_time +operator-(const mb_time &x, double y) +{ + return x - mb_time(y); +} diff --git a/mblock/src/lib/mb_time.h b/mblock/src/lib/mb_time.h new file mode 100644 index 000000000..872304caa --- /dev/null +++ b/mblock/src/lib/mb_time.h @@ -0,0 +1,89 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_MB_TIME_H +#define INCLUDED_MB_TIME_H + +struct mb_time { + long int d_secs; // seconds. + long int d_nsecs; // nanoseconds. Always in [0, 1e9-1] + + mb_time() : d_secs(0), d_nsecs(0) {} + mb_time(long secs, long nanosecs=0) : d_secs(secs), d_nsecs(nanosecs) {} + + // N.B., this only makes sense for differences between times. + // Double doesn't have enough bits to precisely represent an absolute time. + mb_time(double secs); + + // N.B. This only makes sense for differences between times. + // Double doesn't have enough bits to precisely represent an absolute time. + double double_time() const { return (double)d_secs + d_nsecs * 1e-9; } + + /*! + * \brief Return an absolute time suitable for use with + * schedule_one_shot_timeout & schedule_periodic_timeout + * + * The return value is the current time plus the given relative offset. + */ + static mb_time time(const mb_time &relative_offset = mb_time()); +}; + + +inline static bool +operator<(const mb_time &x, const mb_time &y) +{ + return ((x.d_secs < y.d_secs) + || (x.d_secs == y.d_secs && x.d_nsecs < y.d_nsecs)); +} + +inline static bool +operator>(const mb_time &x, const mb_time &y) +{ + return ((x.d_secs > y.d_secs) + || (x.d_secs == y.d_secs && x.d_nsecs > y.d_nsecs)); +} + +inline static bool +operator>=(const mb_time &x, const mb_time &y) +{ + return ((x.d_secs > y.d_secs) + || (x.d_secs == y.d_secs && x.d_nsecs >= y.d_nsecs)); +} + +inline static bool +operator<=(const mb_time &x, const mb_time &y) +{ + return ((x.d_secs < y.d_secs) + || (x.d_secs == y.d_secs && x.d_nsecs <= y.d_nsecs)); +} + +inline static bool +operator==(const mb_time &x, const mb_time &y) +{ + return (x.d_secs == y.d_secs && x.d_nsecs == y.d_nsecs); +} + + +mb_time operator+(const mb_time &x, const mb_time &y); +mb_time operator+(const mb_time &x, double y); +mb_time operator-(const mb_time &x, const mb_time &y); +mb_time operator-(const mb_time &x, double y); + +#endif /* INCLUDED_MB_TIME_H */ diff --git a/mblock/src/lib/mb_timer_queue.cc b/mblock/src/lib/mb_timer_queue.cc new file mode 100644 index 000000000..026035ec5 --- /dev/null +++ b/mblock/src/lib/mb_timer_queue.cc @@ -0,0 +1,63 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_timer_queue.h> + +static pmt_t +make_handle() +{ + static long counter = 0; + pmt_t n = pmt_from_long(counter++); + return pmt_list1(n); // guaranteed to be a unique object +} + +// one-shot constructor +mb_timeout::mb_timeout(const mb_time &abs_time, + pmt_t user_data, mb_msg_accepter_sptr accepter) + : d_when(abs_time), d_is_periodic(false), + d_user_data(user_data), d_handle(make_handle()), d_accepter(accepter) +{ +} + +// periodic constructor +mb_timeout::mb_timeout(const mb_time &first_abs_time, const mb_time &delta_time, + pmt_t user_data, mb_msg_accepter_sptr accepter) + : d_when(first_abs_time), d_delta(delta_time), d_is_periodic(true), + d_user_data(user_data), d_handle(make_handle()), d_accepter(accepter) +{ +} + +void +mb_timer_queue::cancel(pmt_t handle) +{ + container_type::iterator it; + + for (it = c.begin(); it != c.end();){ + if (pmt_equal((*it)->handle(), handle)) + it = c.erase(it); + else + ++it; + } + std::make_heap(c.begin(), c.end(), comp); +} diff --git a/mblock/src/lib/mb_timer_queue.h b/mblock/src/lib/mb_timer_queue.h new file mode 100644 index 000000000..a17688331 --- /dev/null +++ b/mblock/src/lib/mb_timer_queue.h @@ -0,0 +1,73 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef INCLUDED_MB_TIMER_QUEUE_H +#define INCLUDED_MB_TIMER_QUEUE_H + +#include <mb_time.h> +#include <vector> +#include <queue> +#include <pmt.h> +#include <mb_msg_accepter.h> + +class mb_timeout { +public: + mb_time d_when; // absolute time to fire timeout + mb_time d_delta; // if periodic, delta_t to next timeout + bool d_is_periodic; // true iff this is a periodic timeout + pmt_t d_user_data; // data from %timeout msg + pmt_t d_handle; // handle for cancellation + mb_msg_accepter_sptr d_accepter; // where to send the message + + // one-shot constructor + mb_timeout(const mb_time &abs_time, + pmt_t user_data, mb_msg_accepter_sptr accepter); + + // periodic constructor + mb_timeout(const mb_time &first_abs_time, const mb_time &delta_time, + pmt_t user_data, mb_msg_accepter_sptr accepter); + + pmt_t handle() const { return d_handle; } +}; + +typedef boost::shared_ptr<mb_timeout> mb_timeout_sptr; + + +//! Sort criterion for priority_queue +class timeout_later +{ +public: + bool operator() (const mb_timeout_sptr t1, const mb_timeout_sptr t2) + { + return t1->d_when > t2->d_when; + } +}; + + +class mb_timer_queue : public std::priority_queue<mb_timeout_sptr, + std::vector<mb_timeout_sptr>, + timeout_later> +{ +public: + void cancel(pmt_t handle); +}; + +#endif /* INCLUDED_MB_TIMER_QUEUE_H */ diff --git a/mblock/src/lib/mb_worker.cc b/mblock/src/lib/mb_worker.cc new file mode 100644 index 000000000..8a9248443 --- /dev/null +++ b/mblock/src/lib/mb_worker.cc @@ -0,0 +1,178 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_worker.h> +#include <mb_runtime_thread_per_block.h> +#include <mb_exception.h> +#include <mb_mblock.h> +#include <mb_gettid.h> +#include <mb_msg_accepter.h> +#include <iostream> +#ifdef HAVE_SCHED_H +#include <sched.h> +#endif + +#define VERBOSE 0 // define to 0 or 1 + + +static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed"); + + +mb_worker::mb_worker(mb_runtime_thread_per_block *runtime, + mb_mblock_maker_t maker, + const std::string &instance_name, + pmt_t user_arg) + : omni_thread((void *) 0, PRIORITY_NORMAL), + d_runtime(runtime), d_maker(maker), + d_instance_name(instance_name), d_user_arg(user_arg), + d_state_cond(&d_mutex), d_state(TS_UNINITIALIZED), + d_why_dead(RIP_NOT_DEAD_YET) +{ +} + +#if 0 +mb_worker::~mb_worker() +{ +} +#endif + +#ifdef HAVE_SCHED_SETAFFINITY +static void +set_affinity(const std::string &instance_name, const std::string &class_name) +{ + //static int counter = 0; + cpu_set_t mask; + CPU_ZERO(&mask); + + if (0){ + + //CPU_SET(counter & 0x1, &mask); + //counter++; + CPU_SET(0, &mask); + + int r = sched_setaffinity(mb_gettid(), sizeof(mask), &mask); + if (r == -1) + perror("sched_setaffinity"); + } +} +#else +static void +set_affinity(const std::string &instance_name, const std::string &class_name) +{ +} +#endif + +void +mb_worker::set_state(worker_state_t state) +{ + { + omni_mutex_lock l2(d_mutex); + + d_state = state; // update our state + d_state_cond.broadcast(); // Notify everybody who cares... + } + + // send msg to runtime, telling it something changed. + (*d_runtime->accepter())(s_worker_state_changed, PMT_F, PMT_F, MB_PRI_BEST); +} + +void * +mb_worker::run_undetached(void *ignored) +{ + // FIXME add pthread_sigmask stuff + + //set_affinity(d_instance_name, d_class_name); + set_affinity(d_instance_name, ""); + + try { + worker_thread_top_level(); + d_why_dead = RIP_EXIT; + } + catch (mbe_terminate){ + d_why_dead = RIP_TERMINATE; + } + catch (mbe_exit){ + d_why_dead = RIP_EXIT; + } + catch (std::logic_error e){ + if (d_why_dead == RIP_NOT_DEAD_YET) + d_why_dead = RIP_UNHANDLED_EXCEPTION; + + std::cerr << "\nmb_worker::run_undetached: unhandled exception:\n"; + std::cerr << " " << e.what() << std::endl; + } + catch (...){ + if (d_why_dead == RIP_NOT_DEAD_YET) + d_why_dead = RIP_UNHANDLED_EXCEPTION; + } + + if (VERBOSE) + std::cerr << "\nrun_undetached: about to return, d_why_dead = " + << d_why_dead << std::endl; + + set_state(TS_DEAD); + return 0; +} + +void +mb_worker::worker_thread_top_level() +{ + if (VERBOSE) + std::cerr << "worker_thread_top_level (enter):" << std::endl + << " instance_name: " << d_instance_name << std::endl + << " omnithread id: " << id() << std::endl + << " gettid: " << mb_gettid() << std::endl + << " getpid: " << getpid() << std::endl; + + cause_of_death_t pending_cause_of_death = RIP_NOT_DEAD_YET; + + try { + pending_cause_of_death = RIP_CTOR_EXCEPTION; + d_mblock = d_maker(d_runtime, d_instance_name, d_user_arg); + + if (VERBOSE) + std::cerr << "worker_thread_top_level (post-construction):" << std::endl + << " instance_name: " << d_instance_name << std::endl; + + pending_cause_of_death = RIP_INIT_EXCEPTION; + d_mblock->initial_transition(); + + if (VERBOSE) + std::cerr << "worker_thread_top_level (post-initial-transition):" << std::endl + << " instance_name: " << d_instance_name << std::endl; + + set_state(TS_RUNNING); + + pending_cause_of_death = RIP_UNHANDLED_EXCEPTION; + d_mblock->main_loop(); + } + catch (...){ + d_why_dead = pending_cause_of_death; + throw; + } + + if (VERBOSE) + std::cerr << "worker_thread_top_level (exit):" << std::endl + << " instance_name: " << d_instance_name << std::endl; +} diff --git a/mblock/src/lib/mb_worker.h b/mblock/src/lib/mb_worker.h new file mode 100644 index 000000000..b207f4db5 --- /dev/null +++ b/mblock/src/lib/mb_worker.h @@ -0,0 +1,106 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef INCLUDED_MB_WORKER_H +#define INCLUDED_MB_WORKER_H + +#include <omnithread.h> +#include <mb_common.h> +#include <mb_class_registry.h> + + +class mb_worker; +//typedef boost::shared_ptr<mb_worker> mb_worker_sptr; + +class mb_runtime_thread_per_block; + +/*! + * \brief Worker thread for thread_per_block runtime + * \implementation + */ +class mb_worker : public omni_thread +{ +public: + //! worker thread states + enum worker_state_t { + TS_UNINITIALIZED, // new, uninitialized + TS_RUNNING, // normal steady-state condition. + TS_DEAD // thread is dead + }; + + //! why we're dead + enum cause_of_death_t { + RIP_NOT_DEAD_YET, // not dead + RIP_EXIT, // normal exit + RIP_TERMINATE, // caught terminate exception + RIP_CTOR_EXCEPTION, // constructor raised an exception + RIP_INIT_EXCEPTION, // initial_transition rasised an exception + RIP_UNHANDLED_EXCEPTION // somebody (most likely handle_message) raised an exception + }; + + /* + * Args used by new thread to create mb_mblock + */ + mb_runtime_thread_per_block *d_runtime; + mb_mblock_maker_t d_maker; + std::string d_instance_name; + pmt_t d_user_arg; + + mb_mblock_sptr d_mblock; //< holds pointer to created mblock + + /*! + * \brief General mutex for all these fields. + * + * They are accessed by both the main runtime thread and the newly + * created thread that runs the mblock's main loop. + */ + omni_mutex d_mutex; + omni_condition d_state_cond; //< state change notifications + worker_state_t d_state; + cause_of_death_t d_why_dead; + + mb_worker(mb_runtime_thread_per_block *runtime, + mb_mblock_maker_t maker, + const std::string &instance_name, + pmt_t user_arg); + + // ~mb_worker(); + + + /*! + * \brief This code runs as the top-level of the new thread + */ + void worker_thread_top_level(); + + /*! + * \brief Invokes the top-level of the new thread (name kind of sucks) + */ + void *run_undetached(void *arg); + +private: + // Neither d_mutex nor runtime->d_mutex may be held while calling this. + // It locks and unlocks them itself. + void set_state(worker_state_t state); +}; + + + +#endif /* INCLUDED_MB_WORKER_H */ diff --git a/mblock/src/lib/mbi_runtime_lock.h b/mblock/src/lib/mbi_runtime_lock.h index 3138a7e1e..4174f6d54 100644 --- a/mblock/src/lib/mbi_runtime_lock.h +++ b/mblock/src/lib/mbi_runtime_lock.h @@ -48,9 +48,9 @@ */ class mbi_runtime_lock : boost::noncopyable { - mb_runtime *d_rt; + mb_runtime_base *d_rt; public: - mbi_runtime_lock(mb_runtime *rt) : d_rt(rt) { d_rt->lock(); } + mbi_runtime_lock(mb_runtime_base *rt) : d_rt(rt) { d_rt->lock(); } mbi_runtime_lock(mb_mblock_impl *mi) : d_rt(mi->runtime()) { d_rt->lock(); } mbi_runtime_lock(mb_mblock *mb) : d_rt(mb->impl()->runtime()) { d_rt->lock(); } ~mbi_runtime_lock(void) { d_rt->unlock(); } diff --git a/mblock/src/lib/qa_bitset.cc b/mblock/src/lib/qa_bitset.cc new file mode 100644 index 000000000..0acda8232 --- /dev/null +++ b/mblock/src/lib/qa_bitset.cc @@ -0,0 +1,493 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_mblock.h> +#include <mb_protocol_class.h> +#include <mb_message.h> +#include <mb_class_registry.h> +#include <iostream> +#include <sstream> +#include <bitset> + +static pmt_t s_in = pmt_intern("in"); +static pmt_t s_out = pmt_intern("out"); +static pmt_t s_data = pmt_intern("data"); +static pmt_t s_start = pmt_intern("start"); +static pmt_t s_send_batch = pmt_intern("send-batch"); +static pmt_t s_long0 = pmt_from_long(0); + +static std::string +str(long x) +{ + std::ostringstream s; + s << x; + return s.str(); +} + +/*! + * \brief mblock used for QA. + * + * Messages arriving on "in" consist of a pair containing a (long) + * message number in the car, and a (long) bitmap in the cdr. For + * each message received on "in", a new message is sent on "out". The + * new message is the same format as the input, but the bitmap in + * the cdr has a "1" or'd into it that corresponds to the bit number + * specified in the constructor. + * + * The bitmap can be used by the ultimate receiver to confirm + * traversal of a set of blocks, if the blocks are assigned unique bit + * numbers. + */ +class qa_bitset : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + int d_bitno; + +public: + qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void handle_message(mb_message_sptr msg); +}; + +qa_bitset::qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + d_bitno = pmt_to_long(user_arg); // The bit we are to set + + d_in = define_port("in", "qa-bitset", false, mb_port::EXTERNAL); + d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL); +} + +void +qa_bitset::handle_message(mb_message_sptr msg) +{ + if (pmt_eq(msg->port_id(), s_in) && pmt_eq(msg->signal(), s_data)){ + d_out->send(s_data, + pmt_cons(pmt_car(msg->data()), + pmt_from_long((1L << d_bitno) | pmt_to_long(pmt_cdr(msg->data()))))); + } +} + +REGISTER_MBLOCK_CLASS(qa_bitset); + +// ------------------------------------------------------------------------ + +/*! + * \brief mblock used for QA. Compose two qa_bitset mblocks. + */ +class qa_bitset2 : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + +public: + qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); +}; + +qa_bitset2::qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + long bitno = pmt_to_long(user_arg); // The bit we are to set + + d_in = define_port("in", "qa-bitset", false, mb_port::RELAY); + d_out = define_port("out", "qa-bitset", true, mb_port::RELAY); + + define_component("bs0", "qa_bitset", pmt_from_long(bitno)); + define_component("bs1", "qa_bitset", pmt_from_long(bitno + 1)); + connect("self", "in", "bs0", "in"); + connect("bs0", "out", "bs1", "in"); + connect("bs1", "out", "self", "out"); +} + +REGISTER_MBLOCK_CLASS(qa_bitset2); + +// ------------------------------------------------------------------------ + +/*! + * \brief mblock used for QA. Compose two qa_bitset2 mblocks. + */ +class qa_bitset4 : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + +public: + qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); +}; + +qa_bitset4::qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + long bitno = pmt_to_long(user_arg); // The bit we are to set + + d_in = define_port("in", "qa-bitset", false, mb_port::RELAY); + d_out = define_port("out", "qa-bitset", true, mb_port::RELAY); + + define_component("bs0", "qa_bitset2", pmt_from_long(bitno)); + define_component("bs1", "qa_bitset2", pmt_from_long(bitno + 2)); + connect("self", "in", "bs0", "in"); + connect("bs0", "out", "bs1", "in"); + connect("bs1", "out", "self", "out"); +} + +REGISTER_MBLOCK_CLASS(qa_bitset4); + +// ------------------------------------------------------------------------ + +/*! + * \brief mblock used for QA. Compose two qa_bitset4 mblocks. + */ +class qa_bitset8 : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + +public: + qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); +}; + +qa_bitset8::qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + long bitno = pmt_to_long(user_arg); // The bit we are to set + + d_in = define_port("in", "qa-bitset", false, mb_port::RELAY); + d_out = define_port("out", "qa-bitset", true, mb_port::RELAY); + + define_component("bs0", "qa_bitset4", pmt_from_long(bitno)); + define_component("bs1", "qa_bitset4", pmt_from_long(bitno + 4)); + connect("self", "in", "bs0", "in"); + connect("bs0", "out", "bs1", "in"); + connect("bs1", "out", "self", "out"); +} + +REGISTER_MBLOCK_CLASS(qa_bitset8); + +// ------------------------------------------------------------------------ + +/*! + * \brief mblock used for QA. Compose two qa_bitset8 mblocks. + */ +class qa_bitset16 : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + +public: + qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); +}; + +qa_bitset16::qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + long bitno = pmt_to_long(user_arg); // The bit we are to set + + d_in = define_port("in", "qa-bitset", false, mb_port::RELAY); + d_out = define_port("out", "qa-bitset", true, mb_port::RELAY); + + define_component("bs0", "qa_bitset8", pmt_from_long(bitno)); + define_component("bs1", "qa_bitset8", pmt_from_long(bitno + 8)); + connect("self", "in", "bs0", "in"); + connect("bs0", "out", "bs1", "in"); + connect("bs1", "out", "self", "out"); +} + +REGISTER_MBLOCK_CLASS(qa_bitset16); + +// ------------------------------------------------------------------------ + +/*! + * \brief mblock used for QA. Compose two qa_bitset16 mblocks. + */ +class qa_bitset32 : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + +public: + qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); +}; + +qa_bitset32::qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + long bitno = pmt_to_long(user_arg); // The bit we are to set + + d_in = define_port("in", "qa-bitset", false, mb_port::RELAY); + d_out = define_port("out", "qa-bitset", true, mb_port::RELAY); + + define_component("bs0", "qa_bitset16", pmt_from_long(bitno)); + define_component("bs1", "qa_bitset16", pmt_from_long(bitno + 16)); + connect("self", "in", "bs0", "in"); + connect("bs0", "out", "bs1", "in"); + connect("bs1", "out", "self", "out"); +} + +REGISTER_MBLOCK_CLASS(qa_bitset32); + +// ------------------------------------------------------------------------ + +class qa_bitset_src : public mb_mblock +{ + mb_port_sptr d_cs_top; + mb_port_sptr d_cs; + + mb_port_sptr d_out; + + long d_msg_number; // starting message number + long d_nmsgs_to_send; // # of messages to send + long d_batch_size; // # of messages to send per batch + +public: + qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void handle_message(mb_message_sptr msg); + +protected: + void send_one(); + void send_batch(); +}; + +qa_bitset_src::qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + d_msg_number = pmt_to_long(pmt_nth(0, user_arg)); + d_nmsgs_to_send = pmt_to_long(pmt_nth(1, user_arg)); + d_batch_size = pmt_to_long(pmt_nth(2, user_arg)); + + d_cs_top = define_port("cs_top", "qa-bitset-cs", true, mb_port::EXTERNAL); + d_cs = define_port("cs", "qa-bitset-cs", true, mb_port::EXTERNAL); + + d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL); +} + +void +qa_bitset_src::handle_message(mb_message_sptr msg) +{ + if ((pmt_eq(msg->port_id(), d_cs_top->port_symbol()) + || pmt_eq(msg->port_id(), d_cs->port_symbol())) + && pmt_eq(msg->signal(), s_send_batch)){ + send_batch(); + } +} + +void +qa_bitset_src::send_batch() +{ + for (int i = 0; i < d_batch_size; i++) + send_one(); +} + +void +qa_bitset_src::send_one() +{ + if (d_nmsgs_to_send > 0){ + pmt_t msg_number = pmt_from_long(d_msg_number++); + d_out->send(s_data, pmt_cons(msg_number, s_long0)); + } + if (--d_nmsgs_to_send <= 0) + exit(); +} + +REGISTER_MBLOCK_CLASS(qa_bitset_src); + +// ------------------------------------------------------------------------ + +class qa_bitset_sink : public mb_mblock +{ + // Maximum number of messages we can track + static const size_t MAX_MSGS = 1 * 1024 * 1024; + + mb_port_sptr d_cs0; + mb_port_sptr d_cs1; + mb_port_sptr d_cs2; + mb_port_sptr d_cs3; + + mb_port_sptr d_in0; + mb_port_sptr d_in1; + mb_port_sptr d_in2; + mb_port_sptr d_in3; + + long d_nmsgs_to_recv; // # of messages to receive + long d_batch_size; // # of messages to receive per batch + uint32_t d_expected_mask; + + std::bitset<MAX_MSGS> d_bitset; + long d_nrecvd; + +public: + qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void handle_message(mb_message_sptr msg); + +protected: + void receive_one(mb_message_sptr msg); +}; + +qa_bitset_sink::qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg), + d_nrecvd(0) +{ + d_nmsgs_to_recv = pmt_to_long(pmt_nth(0, user_arg)); + d_batch_size = pmt_to_long(pmt_nth(1, user_arg)); + d_expected_mask = pmt_to_long(pmt_nth(2, user_arg)); + + if (d_nmsgs_to_recv > (long) MAX_MSGS) + throw std::out_of_range("qa_bitset_sink: nmsgs_to_recv is too big"); + + if (d_batch_size < 1) + throw std::out_of_range("qa_bitset_sink: batch_size must be >= 1"); + + d_cs0 = define_port("cs0", "qa-bitset-cs", true, mb_port::EXTERNAL); + d_cs1 = define_port("cs1", "qa-bitset-cs", true, mb_port::EXTERNAL); + d_cs2 = define_port("cs2", "qa-bitset-cs", true, mb_port::EXTERNAL); + d_cs3 = define_port("cs3", "qa-bitset-cs", true, mb_port::EXTERNAL); + + d_in0 = define_port("in0", "qa-bitset", false, mb_port::EXTERNAL); + d_in1 = define_port("in1", "qa-bitset", false, mb_port::EXTERNAL); + d_in2 = define_port("in2", "qa-bitset", false, mb_port::EXTERNAL); + d_in3 = define_port("in3", "qa-bitset", false, mb_port::EXTERNAL); +} + +void +qa_bitset_sink::handle_message(mb_message_sptr msg) +{ + if ((pmt_eq(msg->port_id(), d_in0->port_symbol()) + || pmt_eq(msg->port_id(), d_in1->port_symbol()) + || pmt_eq(msg->port_id(), d_in2->port_symbol()) + || pmt_eq(msg->port_id(), d_in3->port_symbol())) + && pmt_eq(msg->signal(), s_data)){ + + receive_one(msg); + } +} + +void +qa_bitset_sink::receive_one(mb_message_sptr msg) +{ + long msg_number = pmt_to_long(pmt_car(msg->data())); + uint32_t mask = pmt_to_long(pmt_cdr(msg->data())); + + // std::cout << msg->data() << std::endl; + + d_nrecvd++; + if (d_nrecvd % d_batch_size == d_batch_size - 1){ + d_cs0->send(s_send_batch); + d_cs1->send(s_send_batch); + d_cs2->send(s_send_batch); + d_cs3->send(s_send_batch); + } + + if (msg_number >= d_nmsgs_to_recv){ + std::cerr << "qa_bitset_sink::receive_one: msg_number too big (" + << msg_number << ")\n"; + shutdown_all(PMT_F); + return; + } + if (mask != d_expected_mask){ + fprintf(stderr, + "qa_bitset_sink::receive_one: Wrong mask. Expected 0x%08x, got 0x%08x\n", + d_expected_mask, mask); + shutdown_all(PMT_F); + return; + } + + if (d_bitset.test((size_t) msg_number)){ + std::cerr << "qa_bitset_sink::receive_one: duplicate msg_number (" + << msg_number << ")\n"; + shutdown_all(PMT_F); + return; + } + + d_bitset.set((size_t) msg_number); + if (d_nrecvd == d_nmsgs_to_recv) + shutdown_all(PMT_T); // we're done! +} + +REGISTER_MBLOCK_CLASS(qa_bitset_sink); + +// ------------------------------------------------------------------------ + +class qa_bitset_top : public mb_mblock +{ + static const int NPIPES = 4; + + std::vector<mb_port_sptr> d_cs; + + long d_nmsgs; // # of messages to send + long d_batch_size; // # of messages to receive per batch + +public: + qa_bitset_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void initial_transition(); +}; + +qa_bitset_top::qa_bitset_top(mb_runtime *runtime, + const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + d_nmsgs = pmt_to_long(pmt_nth(0, user_arg)); + d_nmsgs = (d_nmsgs / NPIPES) * NPIPES; + d_batch_size = pmt_to_long(pmt_nth(1, user_arg)); + + /* + * We build NPIPES sources which feed NPIPES pipelines, each of which + * consists of 8-mblocks. All pipelines feed into a single sink + * which keeps track the results. + */ + for (int i = 0; i < NPIPES; i++){ + d_cs.push_back(define_port("cs"+str(i), "qa-bitset-cs", false, mb_port::INTERNAL)); + + // sources of test messages + define_component("src"+str(i), "qa_bitset_src", + pmt_list3(pmt_from_long(i * d_nmsgs/NPIPES), + pmt_from_long(d_nmsgs/NPIPES), + pmt_from_long(d_batch_size))); + + // 8-mblock processing pipelines + define_component("pipeline"+str(i), "qa_bitset8", pmt_from_long(0)); + } + + // sink for output of pipelines + define_component("sink", "qa_bitset_sink", + pmt_list3(pmt_from_long(d_nmsgs), + pmt_from_long(d_batch_size * NPIPES), + pmt_from_long(0x000000ff))); + + for (int i = 0; i < NPIPES; i++){ + connect("self", "cs"+str(i), "src"+str(i), "cs_top"); + connect("src"+str(i), "out", "pipeline"+str(i), "in"); + connect("src"+str(i), "cs", "sink", "cs"+str(i)); + connect("pipeline"+str(i), "out", "sink", "in"+str(i)); + } +} + +void +qa_bitset_top::initial_transition() +{ + for (int i = 0; i < NPIPES; i++){ + d_cs[i]->send(s_send_batch); // prime the pump + d_cs[i]->send(s_send_batch); + } +} + +REGISTER_MBLOCK_CLASS(qa_bitset_top); diff --git a/mblock/src/lib/qa_bitset.mbh b/mblock/src/lib/qa_bitset.mbh new file mode 100644 index 000000000..0b2df003e --- /dev/null +++ b/mblock/src/lib/qa_bitset.mbh @@ -0,0 +1,61 @@ +;; -*- scheme -*- ; not really, but tells emacs how to format this +;; +;; Copyright 2007 Free Software Foundation, Inc. +;; +;; This file is part of GNU Radio +;; +;; GNU Radio is free software; you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation; either version 2, or (at your option) +;; any later version. +;; +;; GNU Radio is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License along +;; with this program; if not, write to the Free Software Foundation, Inc., +;; 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +;; + +;; ---------------------------------------------------------------- +;; qa-bitset -- interface to mblock QA code +;; + +(define-protocol-class qa-bitset + + (:incoming + + (data n bitmask) + + ) + ) + +(define-protocol-class qa-bitset-cs + + (:outgoing + + (send-batch) + + ) + ) + +;; ---------------------------------------------------------------- +;; qa-disconnect -- interface to mblock QA code +;; + +(define-protocol-class qa-disconnect-cs + + (:outgoing + + (select-pipe n) + + ) + + (:incoming + + (ack n) + + ) + ) diff --git a/mblock/src/lib/qa_disconnect.cc b/mblock/src/lib/qa_disconnect.cc new file mode 100644 index 000000000..cde8d6e5a --- /dev/null +++ b/mblock/src/lib/qa_disconnect.cc @@ -0,0 +1,238 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <mb_mblock.h> +#include <mb_protocol_class.h> +#include <mb_message.h> +#include <mb_class_registry.h> +#include <iostream> +#include <sstream> +#include <bitset> + +static pmt_t s_in = pmt_intern("in"); +static pmt_t s_out = pmt_intern("out"); +static pmt_t s_data = pmt_intern("data"); +static pmt_t s_ack = pmt_intern("ack"); +static pmt_t s_select_pipe = pmt_intern("select-pipe"); +static pmt_t s_long0 = pmt_from_long(0); +static pmt_t s_sys_port = pmt_intern("%sys-port"); +static pmt_t s_shutdown = pmt_intern("%shutdown"); + +class qa_disconnect_mux : public mb_mblock +{ + mb_port_sptr d_in; + mb_port_sptr d_out; + mb_port_sptr d_cs; + +public: + qa_disconnect_mux(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void initial_transition(); + void handle_message(mb_message_sptr msg); +}; + +qa_disconnect_mux::qa_disconnect_mux(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + d_in = define_port("in", "qa-bitset", false, mb_port::RELAY); + d_out = define_port("out", "qa-bitset", true, mb_port::RELAY); + d_cs = define_port("cs", "qa-disconnect-cs", true, mb_port::EXTERNAL); + + define_component("pipeline0", "qa_bitset8", pmt_from_long(0)); + define_component("pipeline1", "qa_bitset8", pmt_from_long(8)); +} + +void +qa_disconnect_mux::initial_transition(){} + +void +qa_disconnect_mux::handle_message(mb_message_sptr msg) +{ + if (pmt_eq(msg->port_id(), d_cs->port_symbol()) // select-pipe on cs + && pmt_eq(msg->signal(), s_select_pipe)){ + + long which_pipe = pmt_to_long(pmt_nth(0, msg->data())); + + disconnect_component("pipeline0"); + disconnect_component("pipeline1"); + + switch(which_pipe){ + + case 0: + connect("self", "in", "pipeline0", "in"); + connect("self", "out", "pipeline0", "out"); + break; + + case 1: + connect("self", "in", "pipeline1", "in"); + connect("self", "out", "pipeline1", "out"); + break; + } + + d_cs->send(s_ack, msg->data()); + return; + } +} + +REGISTER_MBLOCK_CLASS(qa_disconnect_mux); + +// ------------------------------------------------------------------------ + +class qa_disconnect_top : public mb_mblock +{ + enum state_t { + UNINITIALIZED, + WAIT_FOR_ACK, + WAIT_FOR_DATA + }; + + state_t d_state; + int d_msg_number; + int d_nmsgs_to_send; + + mb_port_sptr d_in; + mb_port_sptr d_out; + mb_port_sptr d_cs; + + void check_pipe_send_next_msg(); + void send_next_msg(); + void select_pipe(int n); + + // alternate pipes every 128 messages + static int which_pipe(int msg_number) { return (msg_number >> 7) & 0x1; } + bool time_to_switch() { return (d_msg_number & 0x7f) == 0; } + +public: + qa_disconnect_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void initial_transition(); + void handle_message(mb_message_sptr msg); +}; + +qa_disconnect_top::qa_disconnect_top(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg), + d_state(UNINITIALIZED), d_msg_number(0) +{ + d_nmsgs_to_send = pmt_to_long(pmt_nth(0, user_arg)); + + d_in = define_port("in", "qa-bitset", false, mb_port::INTERNAL); + d_out = define_port("out", "qa-bitset", true, mb_port::INTERNAL); + d_cs = define_port("cs", "qa-disconnect-cs", false, mb_port::INTERNAL); + + define_component("mux", "qa_disconnect_mux", PMT_F); + + connect("self", "cs", "mux", "cs"); + connect("self", "out", "mux", "in"); + connect("self", "in", "mux", "out"); +} + +void +qa_disconnect_top::initial_transition() +{ + check_pipe_send_next_msg(); +} + +void +qa_disconnect_top::handle_message(mb_message_sptr msg) +{ + if (0) + std::cerr << "qa_disconnect_top::handle_msg state = " + << d_state << "\n msg = " << msg << std::endl; + + if (pmt_eq(msg->port_id(), d_cs->port_symbol()) // ack on cs + && pmt_eq(msg->signal(), s_ack) + && d_state == WAIT_FOR_ACK){ + + send_next_msg(); + return; + } + + if (pmt_eq(msg->port_id(), d_in->port_symbol()) // data on in + && pmt_eq(msg->signal(), s_data) + && d_state == WAIT_FOR_DATA){ + + /* + * Confirm that msg passed through the pipe that we expect... + */ + static const long expected_mask[2] = { 0x000000ff, 0x0000ff00 }; + + long msg_number = pmt_to_long(pmt_car(msg->data())); + long mask = pmt_to_long(pmt_cdr(msg->data())); + + if (mask != expected_mask[which_pipe(msg_number)]){ + fprintf(stderr, "\nqa_disconnect_top: wrong mask in msg_number = 0x%08lx\n", + msg_number); + fprintf(stderr, " expected = 0x%08lx, actual = 0x%08lx\n", + expected_mask[which_pipe(msg_number)], mask); + shutdown_all(PMT_F); + return; + } + + if (msg_number == d_nmsgs_to_send - 1){ // we're done (and were successful) + shutdown_all(PMT_T); + return; + } + + check_pipe_send_next_msg(); + return; + } + + if (pmt_eq(msg->port_id(), s_sys_port) // ignore %shutdown on %sys-port + && pmt_eq(msg->signal(), s_shutdown)) + return; + + std::cerr << "qa_disconnect_top: unhandled msg: state = " + << d_state << "\n msg = " << msg << std::endl; +} + +void +qa_disconnect_top::select_pipe(int n) +{ + d_cs->send(s_select_pipe, pmt_list1(pmt_from_long(n))); + d_state = WAIT_FOR_ACK; +} + +void +qa_disconnect_top::send_next_msg() +{ + d_state = WAIT_FOR_DATA; + if (d_msg_number == d_nmsgs_to_send) // we've sent all we're supposed to + return; + + d_out->send(s_data, pmt_cons(pmt_from_long(d_msg_number), s_long0)); + d_msg_number++; +} + +void +qa_disconnect_top::check_pipe_send_next_msg() +{ + if (time_to_switch()) + select_pipe(which_pipe(d_msg_number)); + else + send_next_msg(); +} + +REGISTER_MBLOCK_CLASS(qa_disconnect_top); diff --git a/mblock/src/lib/qa_mblock.cc b/mblock/src/lib/qa_mblock.cc index 4be4a23c3..cf4224544 100644 --- a/mblock/src/lib/qa_mblock.cc +++ b/mblock/src/lib/qa_mblock.cc @@ -27,6 +27,8 @@ #include <qa_mblock.h> #include <qa_mblock_prims.h> #include <qa_mblock_send.h> +#include <qa_mblock_sys.h> +#include <qa_timeouts.h> CppUnit::TestSuite * qa_mblock::suite() @@ -35,6 +37,8 @@ qa_mblock::suite() s->addTest (qa_mblock_prims::suite()); s->addTest (qa_mblock_send::suite()); + s->addTest (qa_mblock_sys::suite()); + s->addTest (qa_timeouts::suite()); return s; } diff --git a/mblock/src/lib/qa_mblock_prims.cc b/mblock/src/lib/qa_mblock_prims.cc index 79ed5a21e..2995215fb 100644 --- a/mblock/src/lib/qa_mblock_prims.cc +++ b/mblock/src/lib/qa_mblock_prims.cc @@ -34,6 +34,7 @@ #include <mb_message.h> #include <mb_mblock_impl.h> #include <mb_msg_accepter.h> +#include <mb_class_registry.h> #include <stdio.h> static pmt_t s_cs = pmt_intern("cs"); @@ -47,42 +48,49 @@ static pmt_t s_out = pmt_intern("out"); class dp_1 : public mb_mblock { public: - dp_1(); + dp_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~dp_1(); }; -dp_1::dp_1() +dp_1::dp_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { } dp_1::~dp_1(){} +REGISTER_MBLOCK_CLASS(dp_1); + // ---------------------------------------------------------------- class dp_2 : public mb_mblock { public: - dp_2(); + dp_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~dp_2(); }; -dp_2::dp_2() +dp_2::dp_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { define_port("cs", "cs-protocol", false, mb_port::EXTERNAL); } dp_2::~dp_2(){} +REGISTER_MBLOCK_CLASS(dp_2); + // ---------------------------------------------------------------- class dp_3 : public mb_mblock { public: - dp_3(); + dp_3(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~dp_3(); }; -dp_3::dp_3() +dp_3::dp_3(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { define_port("cs", "cs-protocol", false, mb_port::EXTERNAL); define_port("cs", "cs-protocol", false, mb_port::EXTERNAL); // duplicate def @@ -90,19 +98,23 @@ dp_3::dp_3() dp_3::~dp_3(){} +REGISTER_MBLOCK_CLASS(dp_3); + // ---------------------------------------------------------------- void qa_mblock_prims::test_define_ports() { - // std::vector<mb_port_sptr> intf; - - mb_mblock_sptr mb1 = mb_mblock_sptr(new dp_1()); - // intf = mb1->peer_interface(); - // CPPUNIT_ASSERT_EQUAL(size_t(0), intf.size()); + + mb_runtime_sptr rts = mb_make_runtime(); + mb_runtime *rt = rts.get(); + + // Should work + mb_mblock_sptr mb1 = mb_mblock_sptr(new dp_1(rt, "top", PMT_F)); // raises runtime_error because of unknown protocol "cs-protocol" - CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_2()), std::runtime_error); + CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_2(rt, "top", PMT_F)), + std::runtime_error); // define the protocol class pmt_t pc = mb_make_protocol_class(pmt_intern("cs-protocol"), @@ -112,10 +124,11 @@ qa_mblock_prims::test_define_ports() // std::cout << "pc = " << pc << '\n'; - mb_mblock_sptr mb2 = mb_mblock_sptr(new dp_2()); + mb_mblock_sptr mb2 = mb_mblock_sptr(new dp_2(rt, "top", PMT_F)); // raises pmt_exception because of duplicate port definition of "cs" - CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_3()), mbe_duplicate_port); + CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dp_3(rt, "top", PMT_F)), + mbe_duplicate_port); } // ================================================================ @@ -123,61 +136,74 @@ qa_mblock_prims::test_define_ports() class dc_0 : public mb_mblock { public: - dc_0(); + dc_0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~dc_0(); }; -dc_0::dc_0() +dc_0::dc_0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { } dc_0::~dc_0() {} +REGISTER_MBLOCK_CLASS(dc_0); + // ---------------------------------------------------------------- class dc_ok : public mb_mblock { public: - dc_ok(); + dc_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~dc_ok(); }; -dc_ok::dc_ok() +dc_ok::dc_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { - define_component("c0", mb_mblock_sptr(new dc_0())); - define_component("c1", mb_mblock_sptr(new dc_0())); - define_component("c2", mb_mblock_sptr(new dc_0())); + define_component("c0", "dc_0"); + define_component("c1", "dc_0"); + define_component("c2", "dc_0"); } dc_ok::~dc_ok(){} +REGISTER_MBLOCK_CLASS(dc_ok); + // ---------------------------------------------------------------- class dc_not_ok : public mb_mblock { public: - dc_not_ok(); + dc_not_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~dc_not_ok(); }; -dc_not_ok::dc_not_ok() - : mb_mblock() +dc_not_ok::dc_not_ok(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { - define_component("c0", mb_mblock_sptr(new dc_0())); - define_component("c0", mb_mblock_sptr(new dc_0())); // duplicate name + define_component("c0", "dc_0"); + define_component("c0", "dc_0"); // duplicate name } dc_not_ok::~dc_not_ok(){} +REGISTER_MBLOCK_CLASS(dc_not_ok); + // ---------------------------------------------------------------- void qa_mblock_prims::test_define_components() { - mb_mblock_sptr mb1 = mb_mblock_sptr(new dc_ok()); // OK + mb_runtime_sptr rts = mb_make_runtime(); + mb_runtime *rt = rts.get(); + + // Should work + mb_mblock_sptr mb1 = mb_mblock_sptr(new dc_ok(rt, "top", PMT_F)); // raises pmt_exception because of duplicate component definition of "c0" - CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dc_not_ok()), mbe_duplicate_component); + CPPUNIT_ASSERT_THROW(mb_mblock_sptr(new dc_not_ok(rt, "top", PMT_F)), + mbe_duplicate_component); } // ================================================================ @@ -185,7 +211,9 @@ qa_mblock_prims::test_define_components() class tc_norm : public mb_mblock { public: - tc_norm(){ + tc_norm(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) + { define_port("data", "i/o", false, mb_port::EXTERNAL); define_port("norm", "i/o", false, mb_port::EXTERNAL); define_port("conj", "i/o", true, mb_port::EXTERNAL); @@ -197,22 +225,26 @@ public: tc_norm::~tc_norm(){} +REGISTER_MBLOCK_CLASS(tc_norm); + //////////////////////////////////////////////////////////////// class tc_0 : public mb_mblock { public: - tc_0(){ + tc_0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) + { define_port("norm", "i/o", false, mb_port::EXTERNAL); define_port("conj", "i/o", true, mb_port::EXTERNAL); define_port("int", "i/o", false, mb_port::INTERNAL); - define_component("c0", mb_mblock_sptr(new tc_norm())); - define_component("c1", mb_mblock_sptr(new tc_norm())); - define_component("c2", mb_mblock_sptr(new tc_norm())); - define_component("c3", mb_mblock_sptr(new tc_norm())); - define_component("c4", mb_mblock_sptr(new tc_norm())); - define_component("c5", mb_mblock_sptr(new tc_norm())); + define_component("c0", "tc_norm"); + define_component("c1", "tc_norm"); + define_component("c2", "tc_norm"); + define_component("c3", "tc_norm"); + define_component("c4", "tc_norm"); + define_component("c5", "tc_norm"); // OK connect("c0", "norm", "c1", "conj"); @@ -284,14 +316,18 @@ public: tc_0::~tc_0(){} +REGISTER_MBLOCK_CLASS(tc_0); + //////////////////////////////////////////////////////////////// class tc_1 : public mb_mblock { public: - tc_1(){ - define_component("c0", mb_mblock_sptr(new tc_norm())); - define_component("c1", mb_mblock_sptr(new tc_norm())); + tc_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) + { + define_component("c0", "tc_norm"); + define_component("c1", "tc_norm"); connect("c0", "norm", "c1", "conj"); } @@ -301,6 +337,8 @@ public: tc_1::~tc_1(){} +REGISTER_MBLOCK_CLASS(tc_1); + //////////////////////////////////////////////////////////////// void @@ -315,7 +353,10 @@ qa_mblock_prims::test_connect() pmt_list1(pmt_intern("in")), // in pmt_list1(pmt_intern("out"))); // out - mb_mblock_sptr mb0 = mb_mblock_sptr(new tc_0()); + mb_runtime_sptr rts = mb_make_runtime(); + mb_runtime *rt = rts.get(); + + mb_mblock_sptr mb0 = mb_mblock_sptr(new tc_0(rt, "top", PMT_F)); } //////////////////////////////////////////////////////////////// @@ -377,11 +418,14 @@ qa_mblock_prims::test_msg_queue() void qa_mblock_prims::test_make_accepter() { + mb_runtime_sptr rts = mb_make_runtime(); + mb_runtime *rt = rts.get(); + // create a block - mb_mblock_sptr mb = mb_mblock_sptr(new dp_2()); + mb_mblock_sptr mb = mb_mblock_sptr(new dp_2(rt, "top", PMT_F)); // use "internal use only" method... - mb_msg_accepter_sptr accepter = mb->impl()->make_accepter("cs"); + mb_msg_accepter_sptr accepter = mb->impl()->make_accepter(pmt_intern("cs")); // Now push a few messages into it... // signal data metadata pri diff --git a/mblock/src/lib/qa_mblock_send.cc b/mblock/src/lib/qa_mblock_send.cc index 46d6b6440..cd81709e2 100644 --- a/mblock/src/lib/qa_mblock_send.cc +++ b/mblock/src/lib/qa_mblock_send.cc @@ -35,6 +35,7 @@ #include <mb_message.h> #include <mb_mblock_impl.h> #include <mb_msg_accepter.h> +#include <mb_class_registry.h> #include <stdio.h> static pmt_t s_data = pmt_intern("data"); @@ -57,6 +58,12 @@ define_protocol_classes() } +mb_mblock_sptr +get_top(mb_runtime_sptr rts) +{ + return dynamic_cast<mb_runtime_nop *>(rts.get())->top(); +} + // ================================================================ // test_simple_routing // ================================================================ @@ -70,12 +77,13 @@ class sr1 : public mb_mblock mb_port_sptr d_p3; public: - sr1(); + sr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~sr1(); - void init_fsm(); + void initial_transition(); }; -sr1::sr1() +sr1::sr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { d_p1 = define_port("p1", "qa-send-cs", true, mb_port::EXTERNAL); d_p2 = define_port("p2", "qa-send-cs", true, mb_port::EXTERNAL); @@ -85,9 +93,9 @@ sr1::sr1() sr1::~sr1(){} void -sr1::init_fsm() +sr1::initial_transition() { - // std::cout << instance_name() << "[sr1]: init_fsm\n"; + // std::cout << instance_name() << "[sr1]: initial_transition\n"; // send two messages to each port pmt_t our_name = pmt_intern(instance_name()); @@ -98,6 +106,8 @@ sr1::init_fsm() d_p2->send(s_status, pmt_list3(our_name, s_p2, pmt_from_long(1))); } +REGISTER_MBLOCK_CLASS(sr1); + // ---------------------------------------------------------------- // top-level container block for test_simple_routing @@ -106,17 +116,18 @@ class sr0 : public mb_mblock mb_port_sptr d_p0; public: - sr0(); + sr0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~sr0(); - void init_fsm(); + void initial_transition(); }; -sr0::sr0() +sr0::sr0(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { d_p0 = define_port("p0", "qa-send-cs", false, mb_port::INTERNAL); - define_component("mb1", mb_mblock_sptr(new sr1())); - define_component("mb2", mb_mblock_sptr(new sr1())); + define_component("mb1", "sr1"); + define_component("mb2", "sr1"); connect("self", "p0", "mb1", "p1"); connect("mb1", "p2", "mb2", "p3"); @@ -126,9 +137,9 @@ sr0::sr0() sr0::~sr0(){} void -sr0::init_fsm() +sr0::initial_transition() { - // std::cout << instance_name() << "[sr0]: init_fsm\n"; + // std::cout << instance_name() << "[sr0]: initial_transition\n"; // send two messages to p0 pmt_t our_name = pmt_intern(instance_name()); @@ -136,6 +147,8 @@ sr0::init_fsm() d_p0->send(s_control, pmt_list3(our_name, s_p0, pmt_from_long(1))); } +REGISTER_MBLOCK_CLASS(sr0); + // ---------------------------------------------------------------- /* @@ -151,9 +164,10 @@ qa_mblock_send::test_simple_routing() mb_message_sptr msg; mb_runtime_sptr rt = mb_make_runtime_nop(); - mb_mblock_sptr mb0 = mb_mblock_sptr(new sr0()); - rt->run(mb0); + rt->run("top", "sr0", PMT_F); + mb_mblock_sptr mb0 = get_top(rt); + // Reach into the guts and see if the messages ended up where they should have // mb0 should have received two messages sent from mb1 via its p1 @@ -238,12 +252,13 @@ class rr2 : public mb_mblock mb_port_sptr d_p2; public: - rr2(); + rr2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~rr2(); - void init_fsm(); + void initial_transition(); }; -rr2::rr2() +rr2::rr2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { d_p1 = define_port("p1", "qa-send-cs", true, mb_port::EXTERNAL); d_p2 = define_port("p2", "qa-send-cs", false, mb_port::EXTERNAL); @@ -252,9 +267,9 @@ rr2::rr2() rr2::~rr2(){} void -rr2::init_fsm() +rr2::initial_transition() { - // std::cout << instance_name() << "[rr2]: init_fsm\n"; + // std::cout << instance_name() << "[rr2]: initial_transition\n"; // send two messages via p1 pmt_t our_name = pmt_intern(instance_name()); @@ -262,6 +277,8 @@ rr2::init_fsm() d_p1->send(s_status, pmt_list3(our_name, s_p1, pmt_from_long(1))); } +REGISTER_MBLOCK_CLASS(rr2); + // ---------------------------------------------------------------- // intermediate block for test_relay_routing @@ -272,16 +289,17 @@ class rr1 : public mb_mblock mb_port_sptr d_p2; public: - rr1(); + rr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~rr1(); }; -rr1::rr1() +rr1::rr1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { d_p1 = define_port("p1", "qa-send-cs", true, mb_port::RELAY); d_p2 = define_port("p2", "qa-send-cs", false, mb_port::RELAY); - define_component("c0", mb_mblock_sptr(new rr2())); + define_component("c0", "rr2"); connect("self", "p1", "c0", "p1"); connect("self", "p2", "c0", "p2"); @@ -289,6 +307,8 @@ rr1::rr1() rr1::~rr1(){} +REGISTER_MBLOCK_CLASS(rr1); + // ---------------------------------------------------------------- // top-level container for test_relay_routing @@ -296,14 +316,15 @@ rr1::~rr1(){} class rr0_a : public mb_mblock { public: - rr0_a(); + rr0_a(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~rr0_a(); }; -rr0_a::rr0_a() +rr0_a::rr0_a(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { - define_component("c0", mb_mblock_sptr(new rr1())); - define_component("c1", mb_mblock_sptr(new rr2())); + define_component("c0", "rr1"); + define_component("c1", "rr2"); connect("c0", "p1", "c1", "p2"); connect("c0", "p2", "c1", "p1"); @@ -311,6 +332,7 @@ rr0_a::rr0_a() rr0_a::~rr0_a(){} +REGISTER_MBLOCK_CLASS(rr0_a); /* * This tests basic message routing using RELAY and EXTERNAL ports. @@ -323,8 +345,8 @@ qa_mblock_send::test_relay_routing_1() mb_message_sptr msg; mb_runtime_sptr rt = mb_make_runtime_nop(); - mb_mblock_sptr top = mb_mblock_sptr(new rr0_a()); - rt->run(top); + rt->run("top", "rr0_a", PMT_F); + mb_mblock_sptr top = get_top(rt); // Reach into the guts and see if the messages ended up where they should have @@ -377,14 +399,15 @@ qa_mblock_send::test_relay_routing_1() class rr0_b : public mb_mblock { public: - rr0_b(); + rr0_b(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); ~rr0_b(); }; -rr0_b::rr0_b() +rr0_b::rr0_b(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) { - define_component("c0", mb_mblock_sptr(new rr1())); - define_component("c1", mb_mblock_sptr(new rr1())); + define_component("c0", "rr1"); + define_component("c1", "rr1"); connect("c0", "p1", "c1", "p2"); connect("c0", "p2", "c1", "p1"); @@ -392,6 +415,7 @@ rr0_b::rr0_b() rr0_b::~rr0_b(){} +REGISTER_MBLOCK_CLASS(rr0_b); /* * This tests basic message routing using RELAY and EXTERNAL ports. @@ -404,8 +428,8 @@ qa_mblock_send::test_relay_routing_2() mb_message_sptr msg; mb_runtime_sptr rt = mb_make_runtime_nop(); - mb_mblock_sptr top = mb_mblock_sptr(new rr0_b()); - rt->run(top); + rt->run("top", "rr0_b", PMT_F); + mb_mblock_sptr top = get_top(rt); // Reach into the guts and see if the messages ended up where they should have diff --git a/mblock/src/lib/qa_mblock_sys.cc b/mblock/src/lib/qa_mblock_sys.cc new file mode 100644 index 000000000..a64f546c7 --- /dev/null +++ b/mblock/src/lib/qa_mblock_sys.cc @@ -0,0 +1,271 @@ +/* -*- c++ -*- */ +/* + * Copyright 2006,2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <qa_mblock_sys.h> +#include <cppunit/TestAssert.h> +#include <mb_mblock.h> +#include <mb_runtime.h> +#include <mb_runtime_nop.h> // QA only +#include <mb_protocol_class.h> +#include <mb_exception.h> +#include <mb_msg_queue.h> +#include <mb_message.h> +#include <mb_mblock_impl.h> +#include <mb_msg_accepter.h> +#include <mb_class_registry.h> +#include <stdio.h> +#include <string.h> +#include <iostream> + + +static pmt_t s_data = pmt_intern("data"); +static pmt_t s_status = pmt_intern("status"); +static pmt_t s_control = pmt_intern("control"); +static pmt_t s_p0 = pmt_intern("p0"); +static pmt_t s_p1 = pmt_intern("p1"); +static pmt_t s_p2 = pmt_intern("p2"); +static pmt_t s_p3 = pmt_intern("p3"); +static pmt_t s_e1 = pmt_intern("e1"); +static pmt_t s_r1 = pmt_intern("r1"); + +static void +define_protocol_classes() +{ + mb_make_protocol_class(s_data, // name + pmt_list1(s_data), // incoming + pmt_list1(s_data)); // outgoing +} + + +// ================================================================ +// test_sys_1 +// ================================================================ + +class sys_1 : public mb_mblock +{ + pmt_t d_user_arg; + mb_port_sptr d_data; + +public: + sys_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + ~sys_1(); + void initial_transition(); +}; + +sys_1::sys_1(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg), + d_user_arg(user_arg) +{ + d_data = define_port("data", "data", true, mb_port::EXTERNAL); +} + +sys_1::~sys_1(){} + +void +sys_1::initial_transition() +{ + shutdown_all(d_user_arg); +} + +REGISTER_MBLOCK_CLASS(sys_1); + +void +qa_mblock_sys::test_sys_1() +{ + define_protocol_classes(); + + pmt_t result; + pmt_t n1 = pmt_from_long(1); + pmt_t n2 = pmt_from_long(2); + + mb_runtime_sptr rt1 = mb_make_runtime(); + +#if 0 + try { + rt1->run("top-1", "sys_1", n1, &result); + } + catch (omni_thread_fatal e){ + std::cerr << "caught omni_thread_fatal: error = " << e.error + << ": " << strerror(e.error) << std::endl; + } + catch (omni_thread_invalid){ + std::cerr << "caught omni_thread_invalid\n"; + } +#else + rt1->run("top-1", "sys_1", n1, &result); +#endif + CPPUNIT_ASSERT(pmt_equal(n1, result)); + + // Execute run a second time, with the same rt, to ensure sanity. + rt1->run("top-2", "sys_1", n2, &result); + CPPUNIT_ASSERT(pmt_equal(n2, result)); +} + +// ================================================================ +// test_sys_2 +// ================================================================ + +class squarer : public mb_mblock +{ + mb_port_sptr d_data; + +public: + squarer(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + + void handle_message(mb_message_sptr msg); +}; + +squarer::squarer(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + d_data = define_port("data", "data", true, mb_port::EXTERNAL); +} + +void +squarer::handle_message(mb_message_sptr msg) +{ + if (!pmt_eq(msg->signal(), s_data)) // we only handle the "data" message + return; + + // long x -> (long x . long (x * x)) + + pmt_t x_pmt = msg->data(); + long x = pmt_to_long(x_pmt); + d_data->send(s_data, pmt_cons(x_pmt, pmt_from_long(x * x))); +} + +REGISTER_MBLOCK_CLASS(squarer); + +// ---------------------------------------------------------------- + +class sys_2 : public mb_mblock +{ + mb_port_sptr d_data; + +public: + sys_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg); + void initial_transition(); + void handle_message(mb_message_sptr msg); +}; + +sys_2::sys_2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg) +{ + d_data = define_port("data", "data", true, mb_port::INTERNAL); + define_component("squarer", "squarer"); + connect("self", "data", "squarer", "data"); +} + +void +sys_2::initial_transition() +{ + // FIXME start timer to detect general failure + + d_data->send(s_data, pmt_from_long(0)); // send initial message +} + +void +sys_2::handle_message(mb_message_sptr msg) +{ + if (!pmt_eq(msg->signal(), s_data)) // we only handle the "data" message + return; + + // first check correctness of message + + long x = pmt_to_long(pmt_car(msg->data())); + long y = pmt_to_long(pmt_cdr(msg->data())); + + // std::cout << msg->data() << std::endl; + + if (y != x * x){ + std::cerr << "sys_2::handle_message: Expected y == x * x. Got y = " + << y << " for x = " << x << std::endl; + + shutdown_all(PMT_F); // failed + } + + if (x == 100) + shutdown_all(PMT_T); // done, OK + else + d_data->send(s_data, pmt_from_long(x + 1)); // send next request +} + +REGISTER_MBLOCK_CLASS(sys_2); + +// ---------------------------------------------------------------- + +void +qa_mblock_sys::test_sys_2() +{ + mb_runtime_sptr rt = mb_make_runtime(); + pmt_t result = PMT_NIL; + + // std::cerr << "qa_mblock_sys::test_sys_2 (enter)\n"; + + rt->run("top-sys-2", "sys_2", PMT_F, &result); + CPPUNIT_ASSERT(pmt_equal(PMT_T, result)); +} + +// ================================================================ +// test_bitset_1 +// ================================================================ + +void +qa_mblock_sys::test_bitset_1() +{ + mb_runtime_sptr rt = mb_make_runtime(); + pmt_t result = PMT_NIL; + + long nmsgs = 1000; + long batch_size = 8; + + pmt_t arg = pmt_list2(pmt_from_long(nmsgs), // # of messages to send through pipe + pmt_from_long(batch_size)); + + rt->run("top", "qa_bitset_top", arg, &result); + + CPPUNIT_ASSERT(pmt_equal(PMT_T, result)); +} + +// ================================================================ +// test_disconnect +// ================================================================ + +void +qa_mblock_sys::test_disconnect() +{ + mb_runtime_sptr rt = mb_make_runtime(); + pmt_t result = PMT_NIL; + + long nmsgs = 10240; + + pmt_t arg = pmt_list1(pmt_from_long(nmsgs)); // # of messages to send through pipe + + + rt->run("top", "qa_disconnect_top", arg, &result); + + CPPUNIT_ASSERT(pmt_equal(PMT_T, result)); +} diff --git a/mblock/src/lib/qa_mblock_sys.h b/mblock/src/lib/qa_mblock_sys.h new file mode 100644 index 000000000..87333818e --- /dev/null +++ b/mblock/src/lib/qa_mblock_sys.h @@ -0,0 +1,45 @@ +/* -*- c++ -*- */ +/* + * Copyright 2006,2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +#ifndef INCLUDED_QA_MBLOCK_SYS_H +#define INCLUDED_QA_MBLOCK_SYS_H + +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/TestCase.h> + +class qa_mblock_sys : public CppUnit::TestCase { + + CPPUNIT_TEST_SUITE(qa_mblock_sys); + CPPUNIT_TEST(test_sys_1); + CPPUNIT_TEST(test_sys_2); + CPPUNIT_TEST(test_bitset_1); + CPPUNIT_TEST(test_disconnect); + CPPUNIT_TEST_SUITE_END(); + + private: + void test_sys_1(); + void test_sys_2(); + void test_bitset_1(); + void test_disconnect(); +}; + +#endif /* INCLUDED_QA_MBLOCK_SYS_H */ + diff --git a/mblock/src/lib/qa_timeouts.cc b/mblock/src/lib/qa_timeouts.cc new file mode 100644 index 000000000..4439b6e8f --- /dev/null +++ b/mblock/src/lib/qa_timeouts.cc @@ -0,0 +1,292 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <qa_timeouts.h> +#include <cppunit/TestAssert.h> +#include <mb_mblock.h> +#include <mb_runtime.h> +#include <mb_protocol_class.h> +#include <mb_exception.h> +#include <mb_msg_queue.h> +#include <mb_message.h> +#include <mb_mblock_impl.h> +#include <mb_msg_accepter.h> +#include <mb_class_registry.h> +#include <mb_timer_queue.h> +#include <stdio.h> +#include <string.h> +#include <iostream> + + +static pmt_t s_timeout = pmt_intern("%timeout"); +static pmt_t s_done = pmt_intern("done"); + + +// ------------------------------------------------------------------------ +// Exercise the priority queue used to implement timeouts. +// ------------------------------------------------------------------------ +void +qa_timeouts::test_timer_queue() +{ + mb_timer_queue tq; + mb_msg_accepter_sptr accepter; + + mb_timeout_sptr t1000_000 = + mb_timeout_sptr(new mb_timeout(mb_time(1000,0), PMT_F, accepter)); + + mb_timeout_sptr t2000_000 = + mb_timeout_sptr(new mb_timeout(mb_time(2000,0), PMT_F, accepter)); + + mb_timeout_sptr t3000_000 = + mb_timeout_sptr(new mb_timeout(mb_time(3000,0), PMT_F, accepter)); + + mb_timeout_sptr t3000_125 = + mb_timeout_sptr(new mb_timeout(mb_time(3000,125), PMT_F, accepter)); + + mb_timeout_sptr t3000_250 = + mb_timeout_sptr(new mb_timeout(mb_time(3000,250), PMT_F, accepter)); + + mb_timeout_sptr t4000_000 = + mb_timeout_sptr(new mb_timeout(mb_time(4000,0), PMT_F, accepter)); + + // insert in pseudo-random order + + tq.push(t3000_125); + tq.push(t1000_000); + tq.push(t4000_000); + tq.push(t3000_250); + tq.push(t2000_000); + tq.push(t3000_000); + + CPPUNIT_ASSERT_EQUAL(t1000_000, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t2000_000, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t3000_000, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t3000_125, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t3000_250, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t4000_000, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT(tq.empty()); + + // insert in pseudo-random order + + tq.push(t3000_000); + tq.push(t4000_000); + tq.push(t3000_125); + tq.push(t1000_000); + tq.push(t2000_000); + tq.push(t3000_250); + + tq.cancel(t1000_000->handle()); + + CPPUNIT_ASSERT_EQUAL(t2000_000, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t3000_000, tq.top()); + tq.pop(); + + tq.cancel(t3000_250->handle()); + + CPPUNIT_ASSERT_EQUAL(t3000_125, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT_EQUAL(t4000_000, tq.top()); + tq.pop(); + + CPPUNIT_ASSERT(tq.empty()); +} + +// ------------------------------------------------------------------------ +// Test one-shot timeouts +// ------------------------------------------------------------------------ + +// FWIW, on SuSE 10.1 for x86-64, clock_getres returns 0.004 seconds. + +#define TIMING_MARGIN 0.010 // seconds + +class qa_timeouts_1_top : public mb_mblock +{ + int d_nleft; + int d_nerrors; + mb_time d_t0; + +public: + qa_timeouts_1_top(mb_runtime *runtime, + const std::string &instance_name, pmt_t user_arg); + + void initial_transition(); + void handle_message(mb_message_sptr msg); +}; + +qa_timeouts_1_top::qa_timeouts_1_top(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg), + d_nleft(0), d_nerrors(0) +{ +} + +void +qa_timeouts_1_top::initial_transition() +{ + d_t0 = mb_time::time(); // now + + schedule_one_shot_timeout(d_t0 + 0.200, pmt_from_double(0.200)); + schedule_one_shot_timeout(d_t0 + 0.125, pmt_from_double(0.125)); + schedule_one_shot_timeout(d_t0 + 0.075, pmt_from_double(0.075)); + schedule_one_shot_timeout(d_t0 + 0.175, pmt_from_double(0.175)); + + d_nleft = 4; +} + +void +qa_timeouts_1_top::handle_message(mb_message_sptr msg) +{ + if (pmt_eq(msg->signal(), s_timeout)){ + mb_time t_now = mb_time::time(); + double expected_delta_t = pmt_to_double(msg->data()); + double actual_delta_t = (t_now - d_t0).double_time(); + double delta = expected_delta_t - actual_delta_t; + + if (fabs(delta) > TIMING_MARGIN){ + std::cerr << "qa_timeouts_1_top: expected_delta_t = " << expected_delta_t + << " actual_delta_t = " << actual_delta_t << std::endl; + d_nerrors++; + } + + if (--d_nleft <= 0) + shutdown_all(d_nerrors == 0 ? PMT_T : PMT_F); + } +} + +REGISTER_MBLOCK_CLASS(qa_timeouts_1_top); + +void +qa_timeouts::test_timeouts_1() +{ + mb_runtime_sptr rt = mb_make_runtime(); + pmt_t result = PMT_NIL; + + rt->run("top", "qa_timeouts_1_top", PMT_F, &result); + + CPPUNIT_ASSERT(pmt_equal(PMT_T, result)); +} + +// ------------------------------------------------------------------------ +// Test periodic timeouts +// ------------------------------------------------------------------------ + +class qa_timeouts_2_top : public mb_mblock +{ + int d_nhandled; + int d_nerrors; + double d_delta_t; + mb_time d_t0; + +public: + qa_timeouts_2_top(mb_runtime *runtime, + const std::string &instance_name, pmt_t user_arg); + + void initial_transition(); + void handle_message(mb_message_sptr msg); +}; + +qa_timeouts_2_top::qa_timeouts_2_top(mb_runtime *runtime, + const std::string &instance_name, + pmt_t user_arg) + : mb_mblock(runtime, instance_name, user_arg), + d_nhandled(0), d_nerrors(0), d_delta_t(0.075) +{ +} + +void +qa_timeouts_2_top::initial_transition() +{ + d_t0 = mb_time::time(); // now + + schedule_periodic_timeout(d_t0 + d_delta_t, mb_time(d_delta_t), PMT_T); +} + +void +qa_timeouts_2_top::handle_message(mb_message_sptr msg) +{ + static const int NMSGS_TO_HANDLE = 5; + + if (pmt_eq(msg->signal(), s_timeout) + && !pmt_eq(msg->data(), s_done)){ + + mb_time t_now = mb_time::time(); + + d_nhandled++; + + double expected_delta_t = d_delta_t * d_nhandled; + double actual_delta_t = (t_now - d_t0).double_time(); + double delta = expected_delta_t - actual_delta_t; + + if (fabs(delta) > TIMING_MARGIN){ + std::cerr << "qa_timeouts_2_top: expected_delta_t = " << expected_delta_t + << " actual_delta_t = " << actual_delta_t << std::endl; + d_nerrors++; + } + + if (d_nhandled == NMSGS_TO_HANDLE){ + cancel_timeout(msg->metadata()); // test cancel_timeout... + schedule_one_shot_timeout(d_t0 + (d_delta_t * (d_nhandled + 2)), s_done); + } + } + + if (pmt_eq(msg->signal(), s_timeout) + && pmt_eq(msg->data(), s_done)){ + if (d_nhandled != NMSGS_TO_HANDLE){ + std::cerr << "qa_timeouts_2_top: d_nhandled = " << d_nhandled + << " expected d_nhandled = " << NMSGS_TO_HANDLE + << " (cancel_timeout didn't work)\n"; + d_nerrors++; + } + shutdown_all(d_nerrors == 0 ? PMT_T : PMT_F); + } +} + +REGISTER_MBLOCK_CLASS(qa_timeouts_2_top); + +void +qa_timeouts::test_timeouts_2() +{ + mb_runtime_sptr rt = mb_make_runtime(); + pmt_t result = PMT_NIL; + + rt->run("top", "qa_timeouts_2_top", PMT_F, &result); + + CPPUNIT_ASSERT(pmt_equal(PMT_T, result)); +} diff --git a/mblock/src/lib/qa_timeouts.h b/mblock/src/lib/qa_timeouts.h new file mode 100644 index 000000000..736c4c2d9 --- /dev/null +++ b/mblock/src/lib/qa_timeouts.h @@ -0,0 +1,43 @@ +/* -*- c++ -*- */ +/* + * Copyright 2006,2007 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +#ifndef INCLUDED_QA_TIMEOUTS_H +#define INCLUDED_QA_TIMEOUTS_H + +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/TestCase.h> + +class qa_timeouts : public CppUnit::TestCase { + + CPPUNIT_TEST_SUITE(qa_timeouts); + CPPUNIT_TEST(test_timer_queue); + CPPUNIT_TEST(test_timeouts_1); + CPPUNIT_TEST(test_timeouts_2); + CPPUNIT_TEST_SUITE_END(); + + private: + void test_timer_queue(); + void test_timeouts_1(); + void test_timeouts_2(); +}; + +#endif /* INCLUDED_QA_TIMEOUTS_H */ + diff --git a/mblock/src/scheme/Makefile.am b/mblock/src/scheme/Makefile.am new file mode 100644 index 000000000..7700a1fc0 --- /dev/null +++ b/mblock/src/scheme/Makefile.am @@ -0,0 +1,21 @@ +# +# Copyright 2007 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +SUBDIRS = gnuradio diff --git a/mblock/src/scheme/gnuradio/Makefile.am b/mblock/src/scheme/gnuradio/Makefile.am new file mode 100644 index 000000000..f217f6852 --- /dev/null +++ b/mblock/src/scheme/gnuradio/Makefile.am @@ -0,0 +1,19 @@ +# +# Copyright 2007 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# diff --git a/mblock/src/scheme/gnuradio/compile-mbh.scm b/mblock/src/scheme/gnuradio/compile-mbh.scm new file mode 100755 index 000000000..fbad90d7a --- /dev/null +++ b/mblock/src/scheme/gnuradio/compile-mbh.scm @@ -0,0 +1,231 @@ +#!/usr/bin/guile \ +-e main -s +!# +;; -*-scheme-*- +;; +;; Copyright 2007 Free Software Foundation, Inc. +;; +;; This file is part of GNU Radio +;; +;; GNU Radio is free software; you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation; either version 2, or (at your option) +;; any later version. +;; +;; GNU Radio is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License along +;; with this program; if not, write to the Free Software Foundation, Inc., +;; 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +;; + +;; usage: compile-mbh <input-file> <output-file> + +(use-modules (ice-9 getopt-long)) +(use-modules (ice-9 format)) +(use-modules (ice-9 pretty-print)) +;(use-modules (ice-9 slib)) +(use-modules (gnuradio pmt-serialize)) +(use-modules (gnuradio macros-etc)) + +(debug-enable 'backtrace) + +;; ---------------------------------------------------------------- + +(define (main args) + + (define (usage) + (format 0 "usage: ~a input-file output-file~%" (car args))) + + (when (not (= (length args) 3)) + (usage) + (exit 1)) + + (let ((input-filename (cadr args)) + (output-filename (caddr args))) + (if (compile-mbh-file input-filename output-filename) + (exit 0) + (exit 1)))) + + +;; ---------------------------------------------------------------- +;; constructor and accessors for protocol-class + +(define %protocol-class-tag (string->symbol "[PROTOCOL-CLASS-TAG]")) + +(define (make-protocol-class name incoming outgoing) + (vector %protocol-class-tag name incoming outgoing)) + +(define (protocol-class? obj) + (and (vector? obj) (eq? %protocol-class-tag (vector-ref obj 0)))) + +(define (protocol-class-name pc) + (vector-ref pc 1)) + +(define (protocol-class-incoming pc) + (vector-ref pc 2)) + +(define (protocol-class-outgoing pc) + (vector-ref pc 3)) + + +;; ---------------------------------------------------------------- + +(define (syntax-error msg e) + (throw 'syntax-error msg e)) + +(define (unrecognized-form form) + (syntax-error "Unrecognized form" form)) + + +(define (mbh-chk-length= e y n) + (cond ((and (null? y)(zero? n)) + #f) + ((null? y) + (syntax-error "Expression has too few subexpressions" e)) + ((atom? y) + (syntax-error (if (atom? e) + "List expected" + "Expression ends with `dotted' atom") + e)) + ((zero? n) + (syntax-error "Expression has too many subexpressions" e)) + (else + (mbh-chk-length= e (cdr y) (- n 1))))) + +(define (mbh-chk-length>= e y n) + (cond ((and (null? y)(< n 1)) + #f) + ((atom? y) + (mbh-chk-length= e y -1)) + (else + (mbh-chk-length>= e (cdr y) (- n 1))))) + + +(define (compile-mbh-file input-filename output-filename) + (let ((i-port (open-input-file input-filename)) + (o-port (open-output-file output-filename))) + + (letrec + ((protocol-classes '()) ; alist + + (lookup-protocol-class ; returns protocol-class or #f + (lambda (name) + (cond ((assq name protocol-classes) => cdr) + (else #f)))) + + (register-protocol-class + (lambda (pc) + (set! protocol-classes (acons (protocol-class-name pc) + pc protocol-classes)) + pc)) + + (parse-top-level-form + (lambda (form) + (mbh-chk-length>= form form 1) + (case (car form) + ((define-protocol-class) (parse-define-protocol-class form)) + (else (syntax-error form))))) + + (parse-define-protocol-class + (lambda (form) + (mbh-chk-length>= form form 2) + ;; form => (define-protocol-class name + ;; (:include protocol-class-name) + ;; (:incoming list-of-msgs) + ;; (:outgoing list-of-msgs)) + (let ((name (cadr form)) + (incoming '()) + (outgoing '())) + (if (lookup-protocol-class name) + (syntax-error "Duplicate protocol-class name" name)) + (for-each + (lambda (sub-form) + (mbh-chk-length>= sub-form sub-form 1) + (case (car sub-form) + ((:include) + (mbh-chk-length>= sub-form sub-form 2) + (cond ((lookup-protocol-class (cadr sub-form)) => + (lambda (pc) + (set! incoming (append incoming (protocol-class-incoming pc))) + (set! outgoing (append outgoing (protocol-class-outgoing pc))))) + (else + (syntax-error "Unknown protocol-class-name" (cadr sub-form))))) + ((:incoming) + (set! incoming (append incoming (cdr sub-form)))) + ((:outgoing) + (set! outgoing (append outgoing (cdr sub-form)))) + (else + (unrecognized-form (car sub-form))))) + (cddr form)) + + (register-protocol-class (make-protocol-class name incoming outgoing))))) + + ) ; end of bindings + + (for-each-in-file i-port parse-top-level-form) + + ;; generate the output here... + + (letrec ((classes (map cdr protocol-classes)) + (so-stream (make-serial-output-stream)) + (format-output-for-c++ + (lambda (output) + (format o-port "//~%") + (format o-port "// Machine generated by compile-mbh from ~a~%" input-filename) + (format o-port "//~%") + (format o-port "// protocol-classes: ~{~a ~}~%" (map car protocol-classes)) + (format o-port "//~%") + + (format o-port "#include <mb_protocol_class.h>~%") + (format o-port "#include <unistd.h>~%") + (format o-port + "static const char~%protocol_class_init_data[~d] = {~% " + (length output)) + + (do ((lst output (cdr lst)) + (i 0 (+ i 1))) + ((null? lst) #t) + (format o-port "~a, " (car lst)) + (when (= 15 (modulo i 16)) + (format o-port "~% "))) + + (format o-port "~&};~%") + (format o-port "static mb_protocol_class_init _init_(protocol_class_init_data, sizeof(protocol_class_init_data));~%") + ))) + + + (map (lambda (pc) + (let ((obj-to-dump + (list (protocol-class-name pc) ; class name + (map car (protocol-class-incoming pc)) ; incoming msg names + (map car (protocol-class-outgoing pc)) ; outgoing msg names + ;;(protocol-class-incoming pc) ; full incoming msg descriptions + ;;(protocol-class-outgoing pc) ; full outgoing msg descriptions + ))) + ;;(pretty-print obj-to-dump) + (pmt-serialize obj-to-dump (so-stream 'put-byte)))) + classes) + + (format-output-for-c++ ((so-stream 'get-output))) + + #t)))) + + +(define (make-serial-output-stream) + (letrec ((output '()) + (put-byte + (lambda (byte) + (set! output (cons byte output)))) + (get-output + (lambda () + (reverse output)))) + (lambda (key) + (case key + ((put-byte) put-byte) + ((get-output) get-output) + (else (error "Unknown key" key)))))) + |