summaryrefslogtreecommitdiff
path: root/mblock/src/lib/mb_runtime_thread_per_block.cc
diff options
context:
space:
mode:
Diffstat (limited to 'mblock/src/lib/mb_runtime_thread_per_block.cc')
-rw-r--r--mblock/src/lib/mb_runtime_thread_per_block.cc350
1 files changed, 0 insertions, 350 deletions
diff --git a/mblock/src/lib/mb_runtime_thread_per_block.cc b/mblock/src/lib/mb_runtime_thread_per_block.cc
deleted file mode 100644
index b16009436..000000000
--- a/mblock/src/lib/mb_runtime_thread_per_block.cc
+++ /dev/null
@@ -1,350 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-#include <mb_runtime_thread_per_block.h>
-#include <mblock/mblock.h>
-#include <mb_mblock_impl.h>
-#include <mblock/class_registry.h>
-#include <mblock/exception.h>
-#include <mb_worker.h>
-#include <gnuradio/omnithread.h>
-#include <iostream>
-#include <mb_msg_accepter_msgq.h>
-
-using namespace pmt;
-
-static pmt_t s_halt = pmt_intern("%halt");
-static pmt_t s_sys_port = pmt_intern("%sys-port");
-static pmt_t s_shutdown = pmt_intern("%shutdown");
-static pmt_t s_request_shutdown = pmt_intern("%request-shutdown");
-static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
-static pmt_t s_timeout = pmt_intern("%timeout");
-static pmt_t s_request_timeout = pmt_intern("%request-timeout");
-static pmt_t s_cancel_timeout = pmt_intern("%cancel-timeout");
-static pmt_t s_send_halt = pmt_intern("send-halt");
-static pmt_t s_exit_now = pmt_intern("exit-now");
-
-static void
-send_sys_msg(mb_msg_queue &msgq, pmt_t signal,
- pmt_t data = PMT_F, pmt_t metadata = PMT_F,
- mb_pri_t priority = MB_PRI_BEST)
-{
- mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
- msg->set_port_id(s_sys_port);
- msgq.insert(msg);
-}
-
-
-mb_runtime_thread_per_block::mb_runtime_thread_per_block()
- : d_shutdown_in_progress(false),
- d_shutdown_result(PMT_T)
-{
- d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq));
-}
-
-mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
-{
- // FIXME iterate over workers and ensure that they are dead.
-
- if (!d_workers.empty())
- std::cerr << "\nmb_runtime_thread_per_block: dtor (# workers = "
- << d_workers.size() << ")\n";
-}
-
-void
-mb_runtime_thread_per_block::request_shutdown(pmt_t result)
-{
- (*accepter())(s_request_shutdown, result, PMT_F, MB_PRI_BEST);
-}
-
-bool
-mb_runtime_thread_per_block::run(const std::string &instance_name,
- const std::string &class_name,
- pmt_t user_arg, pmt_t *result)
-{
- if (result) // set it to something now, in case we throw
- *result = PMT_F;
-
- // reset the shutdown state
- d_shutdown_in_progress = false;
- d_shutdown_result = PMT_T;
-
- assert(d_workers.empty());
-
- while (!d_timer_queue.empty()) // ensure timer queue is empty
- d_timer_queue.pop();
-
- /*
- * Create the top-level component, and recursively all of its
- * subcomponents.
- */
- d_top = create_component(instance_name, class_name, user_arg);
-
- try {
- run_loop();
- }
- catch (...){
- d_top.reset();
- throw;
- }
-
- if (result)
- *result = d_shutdown_result;
-
- d_top.reset();
- return true;
-}
-
-void
-mb_runtime_thread_per_block::run_loop()
-{
- while (1){
- mb_message_sptr msg;
-
- if (d_timer_queue.empty()) // Any timeouts pending?
- msg = d_msgq.get_highest_pri_msg(); // Nope. Block forever.
-
- else {
- mb_timeout_sptr to = d_timer_queue.top(); // Yep. Get earliest timeout.
-
- // wait for a msg or the timeout...
- msg = d_msgq.get_highest_pri_msg_timedwait(to->d_when);
-
- if (!msg){ // We timed out.
- d_timer_queue.pop(); // Remove timeout from timer queue.
-
- // send the %timeout msg
- (*to->d_accepter)(s_timeout, to->d_user_data, to->handle(), MB_PRI_BEST);
-
- if (to->d_is_periodic){
- to->d_when = to->d_when + to->d_delta; // update time of next firing
- d_timer_queue.push(to); // push it back into the queue
- }
- continue;
- }
- }
-
- pmt_t signal = msg->signal();
-
- if (pmt_eq(signal, s_worker_state_changed)){ // %worker-state-changed
- omni_mutex_lock l1(d_workers_mutex);
- reap_dead_workers();
- if (d_workers.empty()) // no work left to do...
- return;
- }
- else if (pmt_eq(signal, s_request_shutdown)){ // %request-shutdown
- if (!d_shutdown_in_progress){
- d_shutdown_in_progress = true;
- d_shutdown_result = msg->data();
-
- // schedule a timeout for ourselves...
- schedule_one_shot_timeout(mb_time::time(0.100), s_send_halt, d_accepter);
- send_all_sys_msg(s_shutdown);
- }
- }
- else if (pmt_eq(signal, s_request_timeout)){ // %request-timeout
- mb_timeout_sptr to =
- boost::any_cast<mb_timeout_sptr>(pmt_any_ref(msg->data()));
- d_timer_queue.push(to);
- }
- else if (pmt_eq(signal, s_cancel_timeout)){ // %cancel-timeout
- d_timer_queue.cancel(msg->data());
- }
- else if (pmt_eq(signal, s_timeout)
- && pmt_eq(msg->data(), s_send_halt)){ // %timeout, send-halt
-
- // schedule another timeout for ourselves...
- schedule_one_shot_timeout(mb_time::time(0.100), s_exit_now, d_accepter);
- send_all_sys_msg(s_halt);
- }
- else if (pmt_eq(signal, s_timeout)
- && pmt_eq(msg->data(), s_exit_now)){ // %timeout, exit-now
-
- // We only get here if we've sent all workers %shutdown followed
- // by %halt, and one or more of them is still alive. They must
- // be blocked in the kernel. FIXME We could add one more step:
- // pthread_kill(...) but for now, we'll just ignore them...
- return;
- }
- else {
- std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg << std::endl;
- }
- }
-}
-
-void
-mb_runtime_thread_per_block::reap_dead_workers()
-{
- // Already holding mutex
- // omni_mutex_lock l1(d_workers_mutex);
-
- for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ){
- bool is_dead;
-
- // We can't join while holding the worker mutex, since that would
- // attempt to destroy the mutex we're holding (omnithread's join
- // deletes the omni_thread object after the pthread_join
- // completes) Instead, we lock just long enough to figure out if
- // the worker is dead.
- {
- omni_mutex_lock l2((*wi)->d_mutex);
- is_dead = (*wi)->d_state == mb_worker::TS_DEAD;
- }
-
- if (is_dead){
- if (0)
- std::cerr << "\nruntime: "
- << "(" << (*wi)->id() << ") "
- << (*wi)->d_mblock->instance_name() << " is TS_DEAD\n";
- void *ignore;
- (*wi)->join(&ignore);
- wi = d_workers.erase(wi);
- continue;
- }
- ++wi;
- }
-}
-
-//
-// Create the thread, then create the component in the thread.
-// Return a pointer to the created mblock.
-//
-// Can be invoked from any thread
-//
-mb_mblock_sptr
-mb_runtime_thread_per_block::create_component(const std::string &instance_name,
- const std::string &class_name,
- pmt_t user_arg)
-{
- mb_mblock_maker_t maker;
- if (!mb_class_registry::lookup_maker(class_name, &maker))
- throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
-
- // FIXME here's where we'd lookup NUMA placement requests & mblock
- // priorities and communicate them to the worker we're creating...
-
- // Create the worker thread
- mb_worker *w =
- new mb_worker(this, maker, instance_name, user_arg);
-
- w->start_undetached(); // start it
-
- // Wait for it to reach TS_RUNNING or TS_DEAD
-
- bool is_dead;
- mb_worker::cause_of_death_t why_dead;
- {
- omni_mutex_lock l(w->d_mutex);
- while (!(w->d_state == mb_worker::TS_RUNNING
- || w->d_state == mb_worker::TS_DEAD))
- w->d_state_cond.wait();
-
- is_dead = w->d_state == mb_worker::TS_DEAD;
- why_dead = w->d_why_dead;
- }
-
- // If the worker failed to init (constructor or initial_transition
- // raised an exception), reap the worker now and raise an exception.
-
- if (is_dead && why_dead != mb_worker::RIP_EXIT){
-
- void *ignore;
- w->join(&ignore);
-
- // FIXME with some work we ought to be able to propagate the
- // exception from the worker.
- throw mbe_mblock_failed(0, instance_name);
- }
-
- assert(w->d_mblock);
-
- // Add w to the vector of workers, and return the mblock.
- {
- omni_mutex_lock l(d_workers_mutex);
- d_workers.push_back(w);
- }
-
- if (0)
- std::cerr << "\nruntime: created "
- << "(" << w->id() << ") "
- << w->d_mblock->instance_name() << "\n";
-
- return w->d_mblock;
-}
-
-void
-mb_runtime_thread_per_block::send_all_sys_msg(pmt_t signal,
- pmt_t data,
- pmt_t metadata,
- mb_pri_t priority)
-{
- omni_mutex_lock l1(d_workers_mutex);
-
- for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ++wi){
- send_sys_msg((*wi)->d_mblock->impl()->msgq(),
- signal, data, metadata, priority);
- }
-}
-
-//
-// Can be invoked from any thread.
-// Sends a message to the runtime.
-//
-pmt_t
-mb_runtime_thread_per_block::schedule_one_shot_timeout
- (const mb_time &abs_time,
- pmt_t user_data,
- mb_msg_accepter_sptr accepter)
-{
- mb_timeout_sptr to(new mb_timeout(abs_time, user_data, accepter));
- (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST);
- return to->handle();
-}
-
-//
-// Can be invoked from any thread.
-// Sends a message to the runtime.
-//
-pmt_t
-mb_runtime_thread_per_block::schedule_periodic_timeout
- (const mb_time &first_abs_time,
- const mb_time &delta_time,
- pmt_t user_data,
- mb_msg_accepter_sptr accepter)
-{
- mb_timeout_sptr to(new mb_timeout(first_abs_time, delta_time,
- user_data, accepter));
- (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST);
- return to->handle();
-}
-
-//
-// Can be invoked from any thread.
-// Sends a message to the runtime.
-//
-void
-mb_runtime_thread_per_block::cancel_timeout(pmt_t handle)
-{
- (*d_accepter)(s_cancel_timeout, handle, PMT_F, MB_PRI_BEST);
-}