summaryrefslogtreecommitdiff
path: root/gcell/lib/runtime/gc_job_manager_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gcell/lib/runtime/gc_job_manager_impl.cc')
-rw-r--r--gcell/lib/runtime/gc_job_manager_impl.cc1249
1 files changed, 1249 insertions, 0 deletions
diff --git a/gcell/lib/runtime/gc_job_manager_impl.cc b/gcell/lib/runtime/gc_job_manager_impl.cc
new file mode 100644
index 000000000..629019f4d
--- /dev/null
+++ b/gcell/lib/runtime/gc_job_manager_impl.cc
@@ -0,0 +1,1249 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "gc_job_manager_impl.h"
+#include <gcell/gc_mbox.h>
+#include <gcell/gc_aligned_alloc.h>
+#include <gcell/memory_barrier.h>
+#include <gc_proc_def_utils.h>
+#include <atomic_dec_if_positive.h>
+#include <stdio.h>
+#include <stdexcept>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+
+
+static const size_t CACHE_LINE_SIZE = 128;
+
+static const unsigned int DEFAULT_MAX_JOBS = 128;
+static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
+
+// FIXME this really depends on the SPU code...
+static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
+
+
+static bool s_key_initialized = false;
+static pthread_key_t s_client_key;
+
+static int s_worker_debug = 0;
+
+// custom deleter of gang_contexts for use with boost::shared_ptr
+class gang_deleter {
+public:
+ void operator()(spe_gang_context_ptr_t ctx) {
+ if (ctx){
+ int r = spe_gang_context_destroy(ctx);
+ if (r != 0){
+ perror("spe_gang_context_destroy");
+ }
+ }
+ }
+};
+
+
+// custom deleter of anything that can be freed with "free"
+class free_deleter {
+public:
+ void operator()(void *p) {
+ free(p);
+ }
+};
+
+
+/*
+ * Called when client thread is destroyed.
+ * We mark our client info free.
+ */
+static void
+client_key_destructor(void *p)
+{
+ ((gc_client_thread_info *) p)->d_free = 1;
+}
+
+static bool
+is_power_of_2(uint32_t x)
+{
+ return (x != 0) && !(x & (x - 1));
+}
+
+////////////////////////////////////////////////////////////////////////
+
+
+gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
+ : d_debug(0), d_spu_args(0),
+ d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
+ d_shutdown_requested(false),
+ d_client_thread(0), d_ea_args_maxsize(0),
+ d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
+{
+ if (!s_key_initialized){
+ int r = pthread_key_create(&s_client_key, client_key_destructor);
+ if (r != 0)
+ throw std::runtime_error("pthread_key_create");
+ s_key_initialized = true;
+ }
+
+ // ensure it's zero
+ pthread_setspecific(s_client_key, 0);
+
+ if (options != 0)
+ d_options = *options;
+
+ // provide the real default for those indicated with a zero
+ if (d_options.max_jobs == 0)
+ d_options.max_jobs = DEFAULT_MAX_JOBS;
+ if (d_options.max_client_threads == 0)
+ d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
+
+ if (!d_options.program_handle){
+ fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
+ throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
+ }
+
+ int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
+ int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
+
+ if (debug()){
+ printf("cpu_nodes = %d\n", ncpu_nodes);
+ for (int i = 0; i < ncpu_nodes; i++){
+ printf("node[%d].physical_spes = %2d\n", i,
+ spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
+ printf("node[%d].usable_spes = %2d\n", i,
+ spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
+ }
+ }
+
+ // clamp nspes
+ d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
+ nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
+
+ //
+ // sanity check requested number of spes.
+ //
+ if (d_options.nspes == 0) // use all of them
+ d_options.nspes = nusable_spes;
+ else {
+ if (d_options.nspes > (unsigned int) nusable_spes){
+ fprintf(stderr,
+ "gc_job_manager: warning: caller requested %d spes. There are only %d available.\n",
+ d_options.nspes, nusable_spes);
+ if (d_options.gang_schedule){
+ // If we're gang scheduling we'll never get scheduled if we
+ // ask for more than are available.
+ throw std::out_of_range("gang_scheduling: not enough spes available");
+ }
+ else { // FIXME clamp to usable. problem on PS3 when overcommited
+ fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
+ d_options.nspes = nusable_spes;
+ }
+ }
+ }
+
+ if (d_options.use_affinity){
+ printf("gc_job_manager: warning: affinity request was ignored\n");
+ }
+
+ if (d_options.gang_schedule){
+ d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
+ if (!d_gang){
+ perror("gc_job_manager_impl[spe_gang_context_create]");
+ throw std::runtime_error("spe_gang_context_create");
+ }
+ }
+
+ // ----------------------------------------------------------------
+ // initalize the job queue
+
+ d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
+ _d_queue_boost =
+ boost::shared_ptr<void>((void *) d_queue, free_deleter());
+ gc_jd_queue_init(d_queue);
+
+
+ // ----------------------------------------------------------------
+ // create the spe contexts
+
+ // 1 spu_arg struct for each SPE
+ assert(sizeof(gc_spu_args_t) % 16 == 0);
+ d_spu_args =
+ (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
+ _d_spu_args_boost =
+ boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
+
+ // 2 completion info structs for each SPE (we double buffer them)
+ assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
+ d_comp_info =
+ (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
+ CACHE_LINE_SIZE);
+ _d_comp_info_boost =
+ boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
+
+
+ // get a handle to the spe program
+
+ spe_program_handle_t *spe_image = d_options.program_handle.get();
+
+ // fish proc_def table out of SPE ELF file
+
+ if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
+ fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
+ throw std::runtime_error("no gc_proc_defs");
+ }
+ // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
+
+ int spe_flags = (SPE_EVENTS_ENABLE
+ | SPE_CFG_SIGNOTIFY1_OR
+ | SPE_CFG_SIGNOTIFY2_OR);
+
+ for (unsigned int i = 0; i < d_options.nspes; i++){
+ // FIXME affinity stuff goes here
+ d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
+ if (d_worker[i].spe_ctx == 0){
+ perror("spe_context_create");
+ throw std::runtime_error("spe_context_create");
+ }
+ d_worker[i].spe_idx = i;
+ d_worker[i].spu_args = &d_spu_args[i];
+ d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
+ d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
+ d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
+ d_worker[i].spu_args->spu_idx = i;
+ d_worker[i].spu_args->nspus = d_options.nspes;
+ d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
+ d_worker[i].spu_args->nproc_defs = d_nproc_defs;
+ d_worker[i].spu_args->log.base = 0;
+ d_worker[i].spu_args->log.nentries = 0;
+ d_worker[i].state = WS_INIT;
+
+ int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
+ if (r != 0){
+ perror("spe_program_load");
+ throw std::runtime_error("spe_program_load");
+ }
+ }
+
+ setup_logfiles();
+
+ // ----------------------------------------------------------------
+ // initalize the free list of job descriptors
+
+ d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
+ // This ensures that the memory associated with d_free_list is
+ // automatically freed in the destructor or if an exception occurs
+ // here in the constructor.
+ _d_free_list_boost =
+ boost::shared_ptr<void>((void *) d_free_list, free_deleter());
+ gc_jd_stack_init(d_free_list);
+
+ if (debug()){
+ printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
+ printf("max_jobs = %u\n", d_options.max_jobs);
+ }
+
+ // Initialize the array of job descriptors.
+ d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
+ _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
+
+
+ // set unique job_id
+ for (int i = 0; i < (int) d_options.max_jobs; i++)
+ d_jd[i].sys.job_id = i;
+
+ // push them onto the free list
+ for (int i = d_options.max_jobs - 1; i >= 0; i--)
+ free_job_desc(&d_jd[i]);
+
+ // ----------------------------------------------------------------
+ // initialize d_client_thread
+
+ {
+ gc_client_thread_info_sa cti(
+ new gc_client_thread_info[d_options.max_client_threads]);
+
+ d_client_thread.swap(cti);
+
+ for (unsigned int i = 0; i < d_options.max_client_threads; i++)
+ d_client_thread[i].d_client_id = i;
+ }
+
+ // ----------------------------------------------------------------
+ // initialize bitvectors
+
+ // initialize d_bvlen, the number of longs in job related bitvectors.
+ int bits_per_long = sizeof(unsigned long) * 8;
+ d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
+
+ // allocate all bitvectors in a single cache-aligned chunk
+ size_t nlongs = d_bvlen * d_options.max_client_threads;
+ void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
+ _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
+
+ // Now point the gc_client_thread_info bitvectors into this storage
+ unsigned long *v = (unsigned long *) p;
+
+ for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
+ d_client_thread[i].d_jobs_done = v;
+
+
+ // ----------------------------------------------------------------
+ // create the spe event handler & worker (SPE) threads
+
+ create_event_handler();
+
+}
+
+////////////////////////////////////////////////////////////////////////
+
+gc_job_manager_impl::~gc_job_manager_impl()
+{
+ shutdown();
+
+ d_jd = 0; // handled via _d_jd_boost
+ d_free_list = 0; // handled via _d_free_list_boost
+ d_queue = 0; // handled via _d_queue_boost
+
+ // clear cti, since we've deleted the underlying data
+ pthread_setspecific(s_client_key, 0);
+
+ unmap_logfiles();
+}
+
+bool
+gc_job_manager_impl::shutdown()
+{
+ omni_mutex_lock l(d_eh_mutex);
+
+ d_shutdown_requested = true; // set flag for event handler thread
+
+ // should only happens during early QA code
+ if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
+ return false;
+
+ while (d_eh_state != EHS_DEAD) // wait for it to finish
+ d_eh_cond.wait();
+
+ return true;
+}
+
+int
+gc_job_manager_impl::nspes() const
+{
+ return d_options.nspes;
+}
+
+////////////////////////////////////////////////////////////////////////
+
+void
+gc_job_manager_impl::bv_zero(unsigned long *bv)
+{
+ memset(bv, 0, sizeof(unsigned long) * d_bvlen);
+}
+
+inline void
+gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ bv[wi] &= ~(1UL << bi);
+}
+
+inline void
+gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ bv[wi] |= (1UL << bi);
+}
+
+inline bool
+gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ return (bv[wi] & (1UL << bi)) != 0;
+}
+
+inline bool
+gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ return (bv[wi] & (1UL << bi)) == 0;
+}
+
+////////////////////////////////////////////////////////////////////////
+
+gc_job_desc *
+gc_job_manager_impl::alloc_job_desc()
+{
+ // stack is lock free, and safe to call from any thread
+ gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
+ if (jd == 0)
+ throw gc_bad_alloc("alloc_job_desc: none available");
+
+ return jd;
+}
+
+void
+gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
+{
+ // stack is lock free, thus safe to call from any thread
+ if (jd != 0)
+ gc_jd_stack_push(d_free_list, jd);
+}
+
+////////////////////////////////////////////////////////////////////////
+
+/*
+ * We check as much as we can here on the PPE side, so that the SPE
+ * doesn't have to.
+ */
+static bool
+check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
+{
+ if (args->nargs > MAX_ARGS_DIRECT){
+ jd->status = JS_BAD_N_DIRECT;
+ return false;
+ }
+
+ return true;
+}
+
+static bool
+check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
+{
+ if (p->nargs > MAX_ARGS_EA){
+ jd->status = JS_BAD_N_EA;
+ return false;
+ }
+
+ uint32_t dir_union = 0;
+
+ for (unsigned int i = 0; i < p->nargs; i++){
+ dir_union |= p->arg[i].direction;
+ switch(p->arg[i].direction){
+ case GCJD_DMA_GET:
+ case GCJD_DMA_PUT:
+ break;
+
+ default:
+ jd->status = JS_BAD_DIRECTION;
+ return false;
+ }
+ }
+
+ if (p->nargs > 1){
+ unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
+ for (unsigned int i = 1; i < p->nargs; i++){
+ if ((p->arg[i].ea_addr >> 32) != common_eah){
+ jd->status = JS_BAD_EAH;
+ return false;
+ }
+ }
+ }
+
+ jd->sys.direction_union = dir_union;
+ return true;
+}
+
+bool
+gc_job_manager_impl::submit_job(gc_job_desc *jd)
+{
+ if (unlikely(d_shutdown_requested)){
+ jd->status = JS_SHUTTING_DOWN;
+ return false;
+ }
+
+ // Ensure it's one of our job descriptors
+
+ if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
+ jd->status = JS_BAD_JOB_DESC;
+ return false;
+ }
+
+ // Ensure we've got a client_thread_info assigned to this thread.
+
+ gc_client_thread_info *cti =
+ (gc_client_thread_info *) pthread_getspecific(s_client_key);
+ if (unlikely(cti == 0)){
+ if ((cti = alloc_cti()) == 0){
+ fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
+ jd->status = JS_TOO_MANY_CLIENTS;
+ return false;
+ }
+ int r = pthread_setspecific(s_client_key, cti);
+ if (r != 0){
+ jd->status = JS_BAD_JUJU;
+ fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
+ return false;
+ }
+ }
+
+ if (jd->proc_id == GCP_UNKNOWN_PROC){
+ jd->status = JS_UNKNOWN_PROC;
+ return false;
+ }
+
+ if (!check_direct_args(jd, &jd->input))
+ return false;
+
+ if (!check_direct_args(jd, &jd->output))
+ return false;
+
+ if (!check_ea_args(jd, &jd->eaa))
+ return false;
+
+ jd->status = JS_OK;
+ jd->sys.client_id = cti->d_client_id;
+
+ // FIXME keep count of jobs in progress?
+
+ gc_jd_queue_enqueue(d_queue, jd);
+ return true;
+}
+
+bool
+gc_job_manager_impl::wait_job(gc_job_desc *jd)
+{
+ bool done;
+ return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
+}
+
+int
+gc_job_manager_impl::wait_jobs(unsigned int njobs,
+ gc_job_desc *jd[],
+ bool done[],
+ gc_wait_mode mode)
+{
+ unsigned int i;
+
+ gc_client_thread_info *cti =
+ (gc_client_thread_info *) pthread_getspecific(s_client_key);
+ if (unlikely(cti == 0))
+ return -1;
+
+ for (i = 0; i < njobs; i++){
+ done[i] = false;
+ if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
+ fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
+ return -1;
+ }
+ }
+
+ {
+ omni_mutex_lock l(cti->d_mutex);
+
+ // setup info for event handler
+ cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
+ cti->d_njobs_waiting_for = njobs;
+ cti->d_jobs_waiting_for = jd;
+ assert(cti->d_jobs_done != 0);
+
+ unsigned int ndone = 0;
+
+ // wait for jobs to complete
+
+ while (1){
+ ndone = 0;
+ for (i= 0; i < njobs; i++){
+ if (done[i])
+ ndone++;
+ else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
+ bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
+ done[i] = true;
+ ndone++;
+ }
+ }
+
+ if (mode == GC_WAIT_ANY && ndone > 0)
+ break;
+
+ if (mode == GC_WAIT_ALL && ndone == njobs)
+ break;
+
+ // FIXME what happens when somebody calls shutdown?
+
+ cti->d_cond.wait(); // wait for event handler to wake us up
+ }
+
+ cti->d_state = CT_NOT_WAITING;
+ cti->d_njobs_waiting_for = 0; // tidy up (not reqd)
+ cti->d_jobs_waiting_for = 0; // tidy up (not reqd)
+ return ndone;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+
+bool
+gc_job_manager_impl::send_all_spes(uint32_t msg)
+{
+ bool ok = true;
+
+ for (unsigned int i = 0; i < d_options.nspes; i++)
+ ok &= send_spe(i, msg);
+
+ return ok;
+}
+
+bool
+gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
+{
+ if (spe >= d_options.nspes)
+ return false;
+
+ int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
+ SPE_MBOX_ALL_BLOCKING);
+ if (r < 0){
+ perror("spe_in_mbox_write");
+ return false;
+ }
+
+ return r == 1;
+}
+
+////////////////////////////////////////////////////////////////////////
+
+static void
+pthread_create_failure_msg(int r, const char *which)
+{
+ char buf[256];
+ char *s = 0;
+
+ switch (r){
+ case EAGAIN: s = "EAGAIN"; break;
+ case EINVAL: s = "EINVAL"; break;
+ case EPERM: s = "EPERM"; break;
+ default:
+ snprintf(buf, sizeof(buf), "Unknown error %d", r);
+ s = buf;
+ break;
+ }
+ fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
+}
+
+
+static bool
+start_thread(pthread_t *thread,
+ void *(*start_routine)(void *), void *arg,
+ const char *msg)
+{
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+ // FIXME save sigprocmask
+ // FIXME set sigprocmask
+
+ int r = pthread_create(thread, &attr, start_routine, arg);
+
+ // FIXME restore sigprocmask
+
+ if (r != 0){
+ pthread_create_failure_msg(r, msg);
+ return false;
+ }
+ return true;
+}
+
+
+////////////////////////////////////////////////////////////////////////
+
+static void *start_worker(void *arg);
+
+static void *
+start_event_handler(void *arg)
+{
+ gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
+ p->event_handler_loop();
+ return 0;
+}
+
+void
+gc_job_manager_impl::create_event_handler()
+{
+ // create the SPE event handler and register our interest in events
+
+ d_spe_event_handler.ptr = spe_event_handler_create();
+ if (d_spe_event_handler.ptr == 0){
+ perror("spe_event_handler_create");
+ throw std::runtime_error("spe_event_handler_create");
+ }
+
+ for (unsigned int i = 0; i < d_options.nspes; i++){
+ spe_event_unit_t eu;
+ memset(&eu, 0, sizeof(eu));
+ eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
+ eu.spe = d_worker[i].spe_ctx;
+ eu.data.u32 = i; // set in events returned by spe_event_wait
+
+ if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
+ perror("spe_event_handler_register");
+ throw std::runtime_error("spe_event_handler_register");
+ }
+ }
+
+ // create our event handling thread
+
+ if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
+ throw std::runtime_error("pthread_create");
+ }
+
+ // create the SPE worker threads
+
+ bool ok = true;
+ for (unsigned int i = 0; ok && i < d_options.nspes; i++){
+ char name[256];
+ snprintf(name, sizeof(name), "worker[%d]", i);
+ ok &= start_thread(&d_worker[i].thread, start_worker,
+ &d_worker[i], name);
+ }
+
+ if (!ok){
+ //
+ // FIXME Clean up the mess. Need to terminate event handler and all workers.
+ //
+ // this should cause the workers to exit, unless they're seriously broken
+ send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
+
+ shutdown();
+
+ throw std::runtime_error("pthread_create");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+
+void
+gc_job_manager_impl::set_eh_state(evt_handler_state s)
+{
+ omni_mutex_lock l(d_eh_mutex);
+ d_eh_state = s;
+ d_eh_cond.broadcast();
+}
+
+void
+gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
+{
+ omni_mutex_lock l(d_eh_mutex);
+ d_ea_args_maxsize = maxsize;
+ d_eh_cond.broadcast();
+}
+
+void
+gc_job_manager_impl::print_event(spe_event_unit_t *evt)
+{
+ printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
+
+ if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
+ printf(" OUT_INTR_MBOX");
+
+ if (evt->events & SPE_EVENT_IN_MBOX)
+ printf(" IN_MBOX");
+
+ if (evt->events & SPE_EVENT_TAG_GROUP)
+ printf(" TAG_GROUP");
+
+ if (evt->events & SPE_EVENT_SPE_STOPPED)
+ printf(" SPE_STOPPED");
+
+ printf("\n");
+}
+
+struct job_client_info {
+ uint16_t job_id;
+ uint16_t client_id;
+};
+
+static int
+compare_jci_clients(const void *va, const void *vb)
+{
+ const job_client_info *a = (job_client_info *) va;
+ const job_client_info *b = (job_client_info *) vb;
+
+ return a->client_id - b->client_id;
+}
+
+void
+gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
+ unsigned int completion_info_idx)
+{
+ const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
+
+ smp_rmb(); // order reads so we know that data sent from SPE is here
+
+ gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
+
+ if (ci->ncomplete == 0){ // never happens, but ensures code below is correct
+ ci->in_use = 0;
+ return;
+ }
+
+ if (0){
+ static int total_jobs;
+ static int total_msgs;
+ total_msgs++;
+ total_jobs += ci->ncomplete;
+ printf("ppe: tj = %6d tm = %6d\n", total_jobs, total_msgs);
+ }
+
+ job_client_info gci[GC_CI_NJOBS];
+
+ /*
+ * Make one pass through and sanity check everything while filling in gci
+ */
+ for (unsigned int i = 0; i < ci->ncomplete; i++){
+ unsigned int job_id = ci->job_id[i];
+
+ if (job_id >= d_options.max_jobs){
+ // internal error, shouldn't happen
+ fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
+ ci->in_use = 0; // clear flag so SPE knows we're done with it
+ return;
+ }
+ gc_job_desc *jd = &d_jd[job_id];
+
+ if (jd->sys.client_id >= d_options.max_client_threads){
+ // internal error, shouldn't happen
+ fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
+ ci->in_use = 0; // clear flag so SPE knows we're done with it
+ return;
+ }
+
+ gci[i].job_id = job_id;
+ gci[i].client_id = jd->sys.client_id;
+ }
+
+ // sort by client_id so we only have to lock & signal once / client
+
+ if (ci->ncomplete > 1)
+ qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
+
+ // "wind-in"
+
+ gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
+ last_cti->d_mutex.lock();
+ bv_set(last_cti->d_jobs_done, gci[0].job_id); // mark job done
+
+ for (unsigned int i = 1; i < ci->ncomplete; i++){
+
+ gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
+
+ if (cti != last_cti){ // new client?
+
+ // yes. signal old client, unlock old, lock new
+
+ // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
+
+ if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
+ last_cti->d_cond.signal(); // wake client thread up
+
+ last_cti->d_mutex.unlock();
+ cti->d_mutex.lock();
+ last_cti = cti;
+ }
+
+ // mark job done
+ bv_set(cti->d_jobs_done, gci[i].job_id);
+ }
+
+ // "wind-out"
+
+ if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
+ last_cti->d_cond.signal(); // wake client thread up
+ last_cti->d_mutex.unlock();
+
+ ci->in_use = 0; // clear flag so SPE knows we're done with it
+}
+
+void
+gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
+{
+ // print_event(evt);
+
+ int spe_num = evt->data.u32;
+
+ // only a single event type can be signaled at a time
+
+ if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
+ static const int NMSGS = 32;
+ unsigned int msg[NMSGS];
+ int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
+ // printf("spe_out_intr_mbox_read = %d\n", n);
+ if (n < 0){
+ perror("spe_out_intr_mbox_read");
+ }
+ else {
+ for (int i = 0; i < n; i++){
+ switch(MBOX_MSG_OP(msg[i])){
+ case OP_JOBS_DONE:
+ if (debug())
+ printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
+ notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
+ break;
+
+ case OP_SPU_BUFSIZE:
+ set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
+ break;
+
+ case OP_EXIT:
+ default:
+ printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
+ break;
+ }
+ }
+ }
+ }
+ else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
+ spe_stop_info_t si;
+ int r = spe_stop_info_read(evt->spe, &si);
+ if (r < 0){
+ perror("spe_stop_info_read");
+ }
+ else {
+ switch (si.stop_reason){
+ case SPE_EXIT:
+ if (debug()){
+ printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
+ spe_num, si.result.spe_exit_code);
+ }
+ break;
+ case SPE_STOP_AND_SIGNAL:
+ printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
+ spe_num, si.result.spe_signal_code);
+ break;
+ case SPE_RUNTIME_ERROR:
+ printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
+ spe_num, si.result.spe_runtime_error);
+ break;
+ case SPE_RUNTIME_EXCEPTION:
+ printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
+ spe_num, si.result.spe_runtime_exception);
+ break;
+ case SPE_RUNTIME_FATAL:
+ printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
+ spe_num, si.result.spe_runtime_fatal);
+ break;
+ case SPE_CALLBACK_ERROR:
+ printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
+ spe_num, si.result.spe_callback_error);
+ break;
+ case SPE_ISOLATION_ERROR:
+ printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
+ spe_num, si.result.spe_isolation_error);
+ break;
+ default:
+ printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
+ spe_num, si.stop_reason, si.spu_status);
+ break;
+ }
+ }
+ }
+#if 0 // not enabled
+ else if (evt->events == SPE_EVENT_IN_MBOX){ // there's room to write to SPE
+ // spe_in_mbox_write (ignore)
+ }
+ else if (evt->events == SPE_EVENT_TAG_GROUP){ // our DMA completed
+ // spe_mfcio_tag_status_read
+ }
+#endif
+ else {
+ fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
+ return;
+ }
+}
+
+//
+// This is the "main program" of the event handling thread
+//
+void
+gc_job_manager_impl::event_handler_loop()
+{
+ static const int MAX_EVENTS = 16;
+ static const int TIMEOUT = 20; // how long to block in milliseconds
+
+ spe_event_unit_t events[MAX_EVENTS];
+
+ if (d_debug)
+ printf("event_handler_loop: starting\n");
+
+ set_eh_state(EHS_RUNNING);
+
+ // ask the first spe for its max bufsize
+ send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
+
+ while (1){
+ switch(d_eh_state){
+
+ case EHS_RUNNING: // normal stuff
+ if (d_shutdown_requested) {
+ set_eh_state(EHS_SHUTTING_DOWN);
+ }
+ break;
+
+ case EHS_SHUTTING_DOWN:
+
+ // FIXME wait until job queue is empty, then tell them to exit
+
+ send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
+ set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
+ break;
+
+ case EHS_WAITING_FOR_WORKERS_TO_DIE:
+ {
+ bool all_dead = true;
+ for (unsigned int i = 0; i < d_options.nspes; i++)
+ all_dead &= d_worker[i].state == WS_DEAD;
+
+ if (all_dead){
+ set_eh_state(EHS_DEAD);
+ if (d_debug)
+ printf("event_handler_loop: exiting\n");
+ return;
+ }
+ }
+ break;
+
+ default:
+ set_eh_state(EHS_DEAD);
+ printf("event_handler_loop(default): exiting\n");
+ return;
+ }
+
+ // block waiting for events...
+ int nevents = spe_event_wait(d_spe_event_handler.ptr,
+ events, MAX_EVENTS, TIMEOUT);
+ if (nevents < 0){
+ perror("spe_wait_event");
+ // FIXME bail?
+ }
+ for (int i = 0; i < nevents; i++){
+ handle_event(&events[i]);
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+// This is the top of the SPE worker threads
+
+static void *
+start_worker(void *arg)
+{
+ worker_ctx *w = (worker_ctx *) arg;
+ spe_stop_info_t si;
+
+ w->state = WS_RUNNING;
+ if (s_worker_debug)
+ printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
+
+ unsigned int entry = SPE_DEFAULT_ENTRY;
+ int r = spe_context_run(w->spe_ctx, &entry, 0, w->spu_args, 0, &si);
+
+ if (r < 0){ // error
+ char buf[64];
+ snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
+ perror(buf);
+ }
+ else if (r == 0){
+ // spe program called exit.
+ if (s_worker_debug)
+ printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
+ w->spe_idx, si.result.spe_exit_code);
+ }
+ else {
+ // called stop_and_signal
+ //
+ // I'm not sure we'll ever get here. I think the event
+ // handler will catch this...
+ printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
+ w->spe_idx, si.result.spe_signal_code);
+ }
+
+ // in any event, we're committing suicide now ;)
+ if (s_worker_debug)
+ printf("worker[%d]: WS_DEAD\n", w->spe_idx);
+
+ w->state = WS_DEAD;
+ return 0;
+}
+
+////////////////////////////////////////////////////////////////////////
+
+gc_client_thread_info *
+gc_job_manager_impl::alloc_cti()
+{
+ for (unsigned int i = 0; i < d_options.max_client_threads; i++){
+ if (d_client_thread[i].d_free){
+ // try to atomically grab it
+ if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
+ // got it...
+ gc_client_thread_info *cti = &d_client_thread[i];
+ cti->d_state = CT_NOT_WAITING;
+ bv_zero(cti->d_jobs_done);
+ cti->d_njobs_waiting_for = 0;
+ cti->d_jobs_waiting_for = 0;
+
+ return cti;
+ }
+ }
+ }
+ return 0;
+}
+
+void
+gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
+{
+ assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
+ cti->d_free = 1;
+}
+
+int
+gc_job_manager_impl::ea_args_maxsize()
+{
+ omni_mutex_lock l(d_eh_mutex);
+
+ while (d_ea_args_maxsize == 0) // wait for it to be initialized
+ d_eh_cond.wait();
+
+ return d_ea_args_maxsize;
+}
+
+void
+gc_job_manager_impl::set_debug(int debug)
+{
+ d_debug = debug;
+ s_worker_debug = debug;
+}
+
+int
+gc_job_manager_impl::debug()
+{
+ return d_debug;
+}
+
+////////////////////////////////////////////////////////////////////////
+
+void
+gc_job_manager_impl::setup_logfiles()
+{
+ if (!d_options.enable_logging)
+ return;
+
+ if (d_options.log2_nlog_entries == 0)
+ d_options.log2_nlog_entries = 12;
+
+ // must end up a multiple of the page size
+
+ size_t pagesize = getpagesize();
+ size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
+ s = ((s + pagesize - 1) / pagesize) * pagesize;
+ size_t nentries = s / sizeof(gc_log_entry_t);
+ assert(is_power_of_2(nentries));
+
+ for (unsigned int i = 0; i < d_options.nspes; i++){
+ char filename[100];
+ snprintf(filename, sizeof(filename), "spu_log.%02d", i);
+ int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
+ if (fd == -1){
+ perror(filename);
+ return;
+ }
+ lseek(fd, s - 1, SEEK_SET);
+ write(fd, "\0", 1);
+ void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
+ if (p == MAP_FAILED){
+ perror("gc_job_manager_impl::setup_logfiles: mmap");
+ close(fd);
+ return;
+ }
+ close(fd);
+ memset(p, 0, s);
+ d_spu_args[i].log.base = ptr_to_ea(p);
+ d_spu_args[i].log.nentries = nentries;
+ }
+}
+
+void
+gc_job_manager_impl::sync_logfiles()
+{
+ for (unsigned int i = 0; i < d_options.nspes; i++){
+ if (d_spu_args[i].log.base)
+ msync(ea_to_ptr(d_spu_args[i].log.base),
+ d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
+ MS_ASYNC);
+ }
+}
+
+void
+gc_job_manager_impl::unmap_logfiles()
+{
+ for (unsigned int i = 0; i < d_options.nspes; i++){
+ if (d_spu_args[i].log.base)
+ munmap(ea_to_ptr(d_spu_args[i].log.base),
+ d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+//
+// lookup proc names in d_proc_def table
+
+gc_proc_id_t
+gc_job_manager_impl::lookup_proc(const std::string &proc_name)
+{
+ for (int i = 0; i < d_nproc_defs; i++)
+ if (proc_name == d_proc_def[i].name)
+ return i;
+
+ throw gc_unknown_proc(proc_name);
+}
+
+std::vector<std::string>
+gc_job_manager_impl::proc_names()
+{
+ std::vector<std::string> r;
+ for (int i = 0; i < d_nproc_defs; i++)
+ r.push_back(d_proc_def[i].name);
+
+ return r;
+}
+
+////////////////////////////////////////////////////////////////////////
+
+worker_ctx::~worker_ctx()
+{
+ if (spe_ctx){
+ int r = spe_context_destroy(spe_ctx);
+ if (r != 0){
+ perror("spe_context_destroy");
+ }
+ spe_ctx = 0;
+ }
+ state = WS_FREE;
+}