/* -*- c++ -*- */
/*
 * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
 * 
 * This file is part of GNU Radio
 * 
 * GNU Radio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 * 
 * GNU Radio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "gc_job_manager_impl.h"
#include <gcell/gc_mbox.h>
#include <gcell/gc_aligned_alloc.h>
#include <gcell/memory_barrier.h>
#include <gc_proc_def_utils.h>
#include <atomic_dec_if_positive.h>
#include <stdio.h>
#include <stdexcept>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <sched.h>

// Lock type used with the boost mutexes / condition variables in this file.
typedef boost::unique_lock<boost::mutex> scoped_lock;

// Single-instruction inline-assembly helpers.  Each one also acts as a
// compiler memory barrier because of the "memory" clobber.
// NOTE(review): these look like the PowerPC/Cell no-op encodings that
// carry hardware-thread priority and delay hints -- confirm against the
// processor documentation before relying on the names.
#define __nop() __asm__ volatile ("ori 0,0,0" : : : "memory")
#define __cctpl() __asm__ volatile ("or 1,1,1" : : : "memory")
#define __cctpm() __asm__ volatile ("or 2,2,2" : : : "memory")
#define __cctph() __asm__ volatile ("or 3,3,3" : : : "memory")
#define __db8cyc() __asm__ volatile ("or 28,28,28" : : : "memory")
#define __db10cyc() __asm__ volatile ("or 29,29,29" : : : "memory")
#define __db12cyc() __asm__ volatile ("or 30,30,30" : : : "memory")
#define __db16cyc() __asm__ volatile ("or 31,31,31" : : : "memory")


// CCTPL()/CCTPM() are used by the job-completion polling code to drop the
// current (h/w) thread priority to low and restore it to medium.
// Change the "#if 1" to "#if 0" to compile them out.
#if 1
#define CCTPL() __cctpl()
#define CCTPM() __cctpm()
#else
#define CCTPL() (void) 0
#define CCTPM() (void) 0
#endif

// Alignment, in bytes, requested for the cache-line aligned allocations below.
static const size_t CACHE_LINE_SIZE = 128;

// Values substituted when the corresponding option is passed as zero.
static const unsigned int DEFAULT_MAX_JOBS = 128;
static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;

// FIXME this really depends on the SPU code...
static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;


// Thread-specific-data key that holds the calling thread's
// gc_client_thread_info pointer.  The key is created once, by the first
// job manager constructed; s_key_initialized records that it exists.
static bool          s_key_initialized = false;
static pthread_key_t s_client_key;

// Non-zero enables printf tracing in the SPE worker threads.
static int s_worker_debug = 0;

// Deleter functor for a boost::shared_ptr that owns an SPE gang context.
// A null context is ignored; a failing destroy is reported with perror().
class gang_deleter {
public:
  void operator()(spe_gang_context_ptr_t ctx) {
    if (!ctx)
      return;

    if (spe_gang_context_destroy(ctx) != 0)
      perror("spe_gang_context_destroy");
  }
};


// Deleter functor for boost::shared_ptr<void> over storage that has to be
// released with free().
class free_deleter {
public:
  void operator()(void *storage) {
    free(storage);
  }
};


/*
 * Called when client thread is destroyed.
 * We mark our client info free.
 */
static void
client_key_destructor(void *p)
{
  ((gc_client_thread_info *) p)->d_free = 1;
}

// Returns true when exactly one bit of x is set (zero is not a power of 2).
static bool
is_power_of_2(uint32_t x)
{
  if (x == 0)
    return false;

  // clearing the lowest set bit leaves nothing iff only one bit was set
  return (x & (x - 1)) == 0;
}

////////////////////////////////////////////////////////////////////////


/*
 * Construct a job manager.
 *
 * options may be null, in which case the default-constructed d_options
 * is used; max_jobs and max_client_threads of zero are replaced by the
 * file-level defaults.  A non-null program_handle is mandatory.
 *
 * Work done, in order: create the client TSD key (first instance only),
 * validate / clamp the requested number of SPEs, allocate the job queue,
 * per-SPE argument and completion-info blocks, create and load one SPE
 * context per SPE, build the free list of job descriptors, allocate the
 * client thread table and its "jobs done" bitvectors, and finally start
 * the event handler, job completer and worker threads.
 *
 * Throws std::runtime_error on any setup failure and std::out_of_range
 * when gang scheduling is requested with more SPEs than are usable.
 * All raw buffers are owned by the _d_*_boost shared_ptr members, so
 * they are released if the constructor throws.
 */
gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
  : d_debug(0), d_spu_args(0),
    d_eh_cond(), d_eh_thread(0), d_eh_state(EHS_INIT),
    d_shutdown_requested(false),
    d_jc_cond(), d_jc_thread(0), d_jc_state(JCS_INIT), d_jc_njobs_active(0),
    d_ntell(0), d_tell_start(0),
    d_client_thread(0), d_ea_args_maxsize(0),
    d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
{
  if (!s_key_initialized){
    int r = pthread_key_create(&s_client_key, client_key_destructor);
    if (r != 0)
      throw std::runtime_error("pthread_key_create");
    s_key_initialized = true;
  }

  // ensure it's zero
  pthread_setspecific(s_client_key, 0);

  if (options != 0)
    d_options = *options;

  // provide the real default for those indicated with a zero
  if (d_options.max_jobs == 0)
    d_options.max_jobs = DEFAULT_MAX_JOBS;
  if (d_options.max_client_threads == 0)
    d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;

  if (!d_options.program_handle){
    fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
    throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
  }

  int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
  int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);

  if (debug()){
    printf("cpu_nodes = %d\n", ncpu_nodes);
    for (int i = 0; i < ncpu_nodes; i++){
      printf("node[%d].physical_spes = %2d\n", i,
             spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
      printf("node[%d].usable_spes   = %2d\n", i,
             spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
    }
  }

  // clamp nspes
  d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
  nusable_spes = std::min(nusable_spes, (int) MAX_SPES);

  //
  // sanity check requested number of spes.
  //
  if (d_options.nspes == 0)     // use all of them
    d_options.nspes = nusable_spes;
  else {
    if (d_options.nspes > (unsigned int) nusable_spes){
      fprintf(stderr,
              "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
              d_options.nspes, nusable_spes);
      if (d_options.gang_schedule){
        // If we're gang scheduling we'll never get scheduled if we
        // ask for more than are available.
        throw std::out_of_range("gang_scheduling: not enough spes available");
      }
      else {    // FIXME clamp to usable.  problem on PS3 when overcommitted
        fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
        d_options.nspes = nusable_spes;
      }
    }
  }

  if (d_options.use_affinity){
    printf("gc_job_manager: warning: affinity request was ignored\n");
  }

  if (d_options.gang_schedule){
    d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
    if (!d_gang){
      perror("gc_job_manager_impl[spe_gang_context_create]");
      throw std::runtime_error("spe_gang_context_create");
    }
  }

  // at most two SPEs are poked per call to tell_spes_to_check_queue
  d_ntell = std::min(d_options.nspes, 2U);

  // ----------------------------------------------------------------
  // initialize the job queue

  d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
  _d_queue_boost =
    boost::shared_ptr<void>((void *) d_queue, free_deleter());
  gc_jd_queue_init(d_queue);


  // ----------------------------------------------------------------
  // create the spe contexts

  // 1 spu_arg struct for each SPE
  assert(sizeof(gc_spu_args_t) % 16 == 0);
  d_spu_args =
    (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
  _d_spu_args_boost =
    boost::shared_ptr<void>((void *) d_spu_args, free_deleter());

  // 2 completion info structs for each SPE (we double buffer them)
  assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
  d_comp_info =
    (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
                                        CACHE_LINE_SIZE);
  _d_comp_info_boost =
    boost::shared_ptr<void>((void *) d_comp_info, free_deleter());


  // get a handle to the spe program

  spe_program_handle_t *spe_image = d_options.program_handle.get();

  // fish proc_def table out of SPE ELF file

  if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
    fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
    throw std::runtime_error("no gc_proc_defs");
  }
  // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);

  int spe_flags = (SPE_EVENTS_ENABLE
                   | SPE_MAP_PS
                   | SPE_CFG_SIGNOTIFY1_OR
                   | SPE_CFG_SIGNOTIFY2_OR);

  for (unsigned int i = 0; i < d_options.nspes; i++){
    // FIXME affinity stuff goes here
    d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());
    if (d_worker[i].spe_ctx == 0){
      perror("spe_context_create");
      throw std::runtime_error("spe_context_create");
    }

    d_worker[i].spe_ctrl =
      (spe_spu_control_area_t *)spe_ps_area_get(d_worker[i].spe_ctx, SPE_CONTROL_AREA);
    if (d_worker[i].spe_ctrl == 0){
      perror("spe_ps_area_get(SPE_CONTROL_AREA)");
      throw std::runtime_error("spe_ps_area_get(SPE_CONTROL_AREA)");
    }

    d_worker[i].spe_idx = i;
    d_worker[i].spu_args = &d_spu_args[i];
    d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
    d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
    d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
    d_worker[i].spu_args->spu_idx = i;
    d_worker[i].spu_args->nspus = d_options.nspes;
    d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
    d_worker[i].spu_args->nproc_defs = d_nproc_defs;
    d_worker[i].spu_args->log.base = 0;
    d_worker[i].spu_args->log.nentries = 0;
    d_worker[i].state = WS_INIT;

    int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
    if (r != 0){
      perror("spe_program_load");
      throw std::runtime_error("spe_program_load");
    }
  }

  setup_logfiles();

  // ----------------------------------------------------------------
  // initialize the free list of job descriptors

  d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
  // This ensures that the memory associated with d_free_list is
  // automatically freed in the destructor or if an exception occurs
  // here in the constructor.
  _d_free_list_boost =
    boost::shared_ptr<void>((void *) d_free_list, free_deleter());
  gc_jd_stack_init(d_free_list);

  if (debug()){
    // sizeof yields size_t; convert explicitly so the arguments match
    // the %u / %x conversions on every ABI.
    printf("sizeof(d_jd[0]) = %u (0x%x)\n",
           (unsigned int) sizeof(d_jd[0]), (unsigned int) sizeof(d_jd[0]));
    printf("max_jobs = %u\n", d_options.max_jobs);
  }

  // Initialize the array of job descriptors.
  d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
  _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());


  // set unique job_id
  for (int i = 0; i < (int) d_options.max_jobs; i++)
    d_jd[i].sys.job_id = i;

  // push them onto the free list
  for (int i = d_options.max_jobs - 1; i >= 0; i--)
    free_job_desc(&d_jd[i]);

  // ----------------------------------------------------------------
  // initialize d_client_thread

  {
    gc_client_thread_info_sa cti(
         new gc_client_thread_info[d_options.max_client_threads]);

    d_client_thread.swap(cti);

    for (unsigned int i = 0; i < d_options.max_client_threads; i++)
      d_client_thread[i].d_client_id = i;
  }

  // ----------------------------------------------------------------
  // initialize bitvectors

  // initialize d_bvlen, the number of longs in job related bitvectors.
  int bits_per_long = sizeof(unsigned long) * 8;
  d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;

  // allocate all bitvectors in a single cache-aligned chunk
  size_t nlongs = d_bvlen * d_options.max_client_threads;
  void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
  _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());

  // Now point the gc_client_thread_info bitvectors into this storage
  unsigned long *v = (unsigned long *) p;

  for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
    d_client_thread[i].d_jobs_done = v;


  // ----------------------------------------------------------------
  // create the spe event handler & worker (SPE) threads

  create_event_handler();
}

////////////////////////////////////////////////////////////////////////

/*
 * Shut the manager down, then drop the raw aliases.  The storage they
 * point at is owned by the matching _d_*_boost shared_ptr members.
 */
gc_job_manager_impl::~gc_job_manager_impl()
{
  shutdown();

  d_queue = 0;          // owned by _d_queue_boost
  d_free_list = 0;      // owned by _d_free_list_boost
  d_jd = 0;             // owned by _d_jd_boost

  // forget this thread's client info; the table it lived in is going away
  pthread_setspecific(s_client_key, 0);

  unmap_logfiles();
}

bool
gc_job_manager_impl::shutdown()
{
  scoped_lock 	l(d_eh_mutex);

  {
    scoped_lock 	l2(d_jc_mutex);
    d_shutdown_requested = true;	// set flag for event handler thread
    d_jc_cond.notify_one();			// wake up job completer
  }

  // should only happens during early QA code
  if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
    return false;

  while (d_eh_state != EHS_DEAD)	// wait for it to finish
    d_eh_cond.wait(l);

  return true;
}

// Number of SPEs in use, after the constructor's clamping.
int
gc_job_manager_impl::nspes() const
{
  return static_cast<int>(d_options.nspes);
}

////////////////////////////////////////////////////////////////////////

// Clear every bit of a job bitvector (d_bvlen unsigned longs).
void
gc_job_manager_impl::bv_zero(unsigned long *bv)
{
  const size_t nbytes = sizeof(unsigned long) * d_bvlen;
  memset(bv, 0, nbytes);
}

// Clear bit number bitno in bitvector bv.
inline void
gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
{
  const unsigned int bits_per_word = sizeof (unsigned long) * 8;
  const unsigned long mask = 1UL << (bitno % bits_per_word);
  bv[bitno / bits_per_word] &= ~mask;
}

// Set bit number bitno in bitvector bv.
inline void
gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
{
  const unsigned int bits_per_word = sizeof (unsigned long) * 8;
  const unsigned long mask = 1UL << (bitno % bits_per_word);
  bv[bitno / bits_per_word] |= mask;
}

// True when bit number bitno of bitvector bv is set.
inline bool
gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
{
  const unsigned int bits_per_word = sizeof (unsigned long) * 8;
  const unsigned long mask = 1UL << (bitno % bits_per_word);
  const unsigned long word = bv[bitno / bits_per_word];
  return (word & mask) != 0;
}

// True when bit number bitno of bitvector bv is clear.
inline bool
gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
{
  const unsigned int bits_per_word = sizeof (unsigned long) * 8;
  const unsigned long mask = 1UL << (bitno % bits_per_word);
  const unsigned long word = bv[bitno / bits_per_word];
  return (word & mask) == 0;
}

////////////////////////////////////////////////////////////////////////

/*
 * Pop a job descriptor off the free list.
 * The stack is lock free, so this may be called from any thread.
 * Throws gc_bad_alloc when no descriptor is available.
 */
gc_job_desc *
gc_job_manager_impl::alloc_job_desc()
{
  gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
  if (jd != 0)
    return jd;

  throw gc_bad_alloc("alloc_job_desc: none available");
}

/*
 * Return a job descriptor to the free list; a null jd is ignored.
 * The stack is lock free, so this may be called from any thread.
 */
void
gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
{
  if (jd == 0)
    return;

  gc_jd_stack_push(d_free_list, jd);
}

////////////////////////////////////////////////////////////////////////


/*
 * Account for one more in-flight job.
 * Returns false, leaving the count untouched, once shutdown has been
 * requested.  Wakes the job completer on the 0 -> 1 transition.
 */
inline bool
gc_job_manager_impl::incr_njobs_active()
{
  scoped_lock guard(d_jc_mutex);

  if (d_shutdown_requested)
    return false;

  const bool was_idle = (d_jc_njobs_active == 0);
  d_jc_njobs_active++;
  if (was_idle)
    d_jc_cond.notify_one();

  return true;
}

// Subtract n completed jobs from the in-flight count, under d_jc_mutex.
inline void
gc_job_manager_impl::decr_njobs_active(int n)
{
  scoped_lock guard(d_jc_mutex);
  d_jc_njobs_active = d_jc_njobs_active - n;
}


/*
 * We check as much as we can here on the PPE side, so that the SPE
 * doesn't have to.
 *
 * Returns true when the direct-argument count is within
 * MAX_ARGS_DIRECT; otherwise sets jd->status to JS_BAD_N_DIRECT and
 * returns false.
 */
static bool
check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
{
  if (args->nargs <= MAX_ARGS_DIRECT)
    return true;

  jd->status = JS_BAD_N_DIRECT;
  return false;
}

/*
 * Validate the effective-address arguments of a job.
 *
 * Checks, in this order: the argument count (JS_BAD_N_EA), that every
 * direction is GCJD_DMA_GET or GCJD_DMA_PUT (JS_BAD_DIRECTION), and
 * that all addresses share the same upper 32 bits (JS_BAD_EAH).  On
 * failure the matching status is stored in jd and false is returned.
 * On success the OR of all directions is stored in
 * jd->sys.direction_union.
 */
static bool
check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
{
  if (p->nargs > MAX_ARGS_EA){
    jd->status = JS_BAD_N_EA;
    return false;
  }

  uint32_t directions = 0;

  for (unsigned int k = 0; k < p->nargs; k++){
    directions |= p->arg[k].direction;

    const bool known_direction =
      p->arg[k].direction == GCJD_DMA_GET || p->arg[k].direction == GCJD_DMA_PUT;
    if (!known_direction){
      jd->status = JS_BAD_DIRECTION;
      return false;
    }
  }

  // every argument after the first must match the first one's high word
  for (unsigned int k = 1; k < p->nargs; k++){
    const unsigned int first_eah = (p->arg[0].ea_addr) >> 32;
    if ((p->arg[k].ea_addr >> 32) != first_eah){
      jd->status = JS_BAD_EAH;
      return false;
    }
  }

  jd->sys.direction_union = directions;
  return true;
}

/*
 * Validate a job descriptor and enqueue it for the SPEs.
 *
 * jd must have been obtained from alloc_job_desc() of this manager.
 * The calling thread is given a client-thread slot on its first
 * submission.
 *
 * Returns true when the job was queued.  Returns false otherwise; for
 * a non-null jd the reason is stored in jd->status (JS_BAD_JOB_DESC,
 * JS_TOO_MANY_CLIENTS, JS_BAD_JUJU, JS_UNKNOWN_PROC, one of the
 * argument-check codes, or JS_SHUTTING_DOWN).  A null jd returns
 * false and touches nothing.
 */
bool
gc_job_manager_impl::submit_job(gc_job_desc *jd)
{
  // There is nowhere to report a status for a null descriptor.
  if (jd == 0)
    return false;

  // Ensure it's one of our job descriptors

  if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
    jd->status = JS_BAD_JOB_DESC;
    return false;
  }

  // Ensure we've got a client_thread_info assigned to this thread.

  gc_client_thread_info *cti =
    (gc_client_thread_info *) pthread_getspecific(s_client_key);
  if (unlikely(cti == 0)){
    if ((cti = alloc_cti()) == 0){
      fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
      jd->status = JS_TOO_MANY_CLIENTS;
      return false;
    }
    int r = pthread_setspecific(s_client_key, cti);
    if (r != 0){
      // The slot was never attached to this thread, so the key
      // destructor will not release it; give it back here.
      free_cti(cti);
      jd->status = JS_BAD_JUJU;
      fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
      return false;
    }
  }

  if (jd->proc_id == GCP_UNKNOWN_PROC){
    jd->status = JS_UNKNOWN_PROC;
    return false;
  }

  if (!check_direct_args(jd, &jd->input))
    return false;

  if (!check_direct_args(jd, &jd->output))
    return false;

  if (!check_ea_args(jd, &jd->eaa))
    return false;

  jd->status = JS_OK;
  jd->sys.client_id = cti->d_client_id;

  // refuses new work once shutdown has been requested
  if (!incr_njobs_active()){
    jd->status = JS_SHUTTING_DOWN;
    return false;
  }

  gc_jd_queue_enqueue(d_queue, jd);
  // tell_spes_to_check_queue();
  return true;
}

/*
 * Wait for a single job.  Returns true only when the wait itself
 * succeeded and the job finished with status JS_OK.
 */
bool
gc_job_manager_impl::wait_job(gc_job_desc *jd)
{
  bool finished;
  if (wait_jobs(1, &jd, &finished, GC_WAIT_ANY) != 1)
    return false;

  return jd->status == JS_OK;
}

/*
 * Block until jobs submitted by the calling thread complete.
 *
 * njobs  number of entries in jd[] and done[]
 * jd     the jobs to wait for; each must have been submitted by this thread
 * done   output: done[i] is set true for every job found complete
 * mode   GC_WAIT_ANY returns as soon as at least one job is done,
 *        GC_WAIT_ALL only when all of them are
 *
 * Returns the number of jobs marked done, or -1 when the calling
 * thread has never submitted a job or when one of the jobs belongs to
 * a different client thread (all done[] entries are false in that
 * case).  With njobs == 0 there is nothing to wait for and 0 is
 * returned immediately in either mode.
 */
int
gc_job_manager_impl::wait_jobs(unsigned int njobs,
                               gc_job_desc *jd[],
                               bool done[],
                               gc_wait_mode mode)
{
  unsigned int i;

  gc_client_thread_info *cti =
    (gc_client_thread_info *) pthread_getspecific(s_client_key);
  if (unlikely(cti == 0))
    return -1;

  // Nothing can ever complete, so GC_WAIT_ANY would block forever.
  if (njobs == 0)
    return 0;

  // Initialize all outputs before validating, so that an early error
  // return never leaves part of done[] unset.
  for (i = 0; i < njobs; i++)
    done[i] = false;

  for (i = 0; i < njobs; i++){
    if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
      fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
      return -1;
    }
  }

  {
    scoped_lock l(cti->d_mutex);

    // setup info for event handler
    cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
    cti->d_njobs_waiting_for = njobs;
    cti->d_jobs_waiting_for = jd;
    assert(cti->d_jobs_done != 0);

    unsigned int ndone = 0;

    // wait for jobs to complete

    while (1){
      // Recount on every pass; consume the "done" bit of each newly
      // finished job so that it is reported exactly once.
      ndone = 0;
      for (i = 0; i < njobs; i++){
        if (done[i])
          ndone++;
        else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
          bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
          done[i] = true;
          ndone++;
        }
      }

      if (mode == GC_WAIT_ANY && ndone > 0)
        break;

      if (mode == GC_WAIT_ALL && ndone == njobs)
        break;

      // FIXME what happens when somebody calls shutdown?

      cti->d_cond.wait(l);      // wait for the job completer to wake us up
    }

    cti->d_state = CT_NOT_WAITING;
    cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
    cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
    return ndone;
  }
}

////////////////////////////////////////////////////////////////////////

bool
gc_job_manager_impl::send_all_spes(uint32_t msg)
{
  bool ok = true;

  for (unsigned int i = 0; i < d_options.nspes; i++)
    ok &= send_spe(i, msg);

  return ok;
}

/*
 * Write one mailbox message to the given SPE (blocking write).
 * Returns false for an out-of-range spe index or a failed write;
 * true when exactly one message was written.
 */
bool
gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
{
  if (spe >= d_options.nspes)
    return false;

  const int nwritten = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
                                         SPE_MBOX_ALL_BLOCKING);
  if (nwritten >= 0)
    return nwritten == 1;

  perror("spe_in_mbox_write");
  return false;
}

void 
gc_job_manager_impl::tell_spes_to_check_queue()
{
  int nspes = d_options.nspes;

  for (int i = 0, ntold = 0; ntold < d_ntell && i < nspes ; ++i){
    volatile spe_spu_control_area_t *spe_ctrl = d_worker[d_tell_start].spe_ctrl;
    int nfree = (spe_ctrl->SPU_Mbox_Stat >> 8) & 0xFF;
    if (nfree == 4){
      spe_ctrl->SPU_In_Mbox = MK_MBOX_MSG(OP_CHECK_QUEUE, 0);
      ntold++;
    }

    unsigned int t = d_tell_start + 1;
    if (t >= d_options.nspes)
      t = 0;
    d_tell_start = t;
  }
}


////////////////////////////////////////////////////////////////////////

/*
 * Report a pthread_create failure on stderr.
 * r is the error number returned by pthread_create; which names the
 * thread that could not be started.
 */
static void
pthread_create_failure_msg(int r, const char *which)
{
  char fallback[256];
  const char *reason;

  if (r == EAGAIN)
    reason = "EAGAIN";
  else if (r == EINVAL)
    reason = "EINVAL";
  else if (r == EPERM)
    reason = "EPERM";
  else {
    snprintf(fallback, sizeof(fallback), "Unknown error %d", r);
    reason = fallback;
  }

  fprintf(stderr, "pthread_create[%s] failed: %s\n", which, reason);
}


/*
 * Start a detached thread running start_routine(arg).
 *
 * thread         receives the id of the new thread
 * start_routine  thread entry point
 * arg            passed through to start_routine
 * msg            name used in the failure message
 *
 * Returns true on success.  On failure a message is written to stderr
 * and false is returned.
 */
static bool
start_thread(pthread_t *thread,
             void *(*start_routine)(void *),  void *arg,
             const char *msg)
{
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  // FIXME save sigprocmask
  // FIXME set sigprocmask

  int r = pthread_create(thread, &attr, start_routine, arg);

  // FIXME restore sigprocmask

  // The attribute object is only needed for the create call; release
  // it on both the success and the failure path.
  pthread_attr_destroy(&attr);

  if (r != 0){
    pthread_create_failure_msg(r, msg);
    return false;
  }
  return true;
}


////////////////////////////////////////////////////////////////////////

static void *start_worker(void *arg);

static void *
start_event_handler(void *arg)
{
  gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
  p->event_handler_loop();
  return 0;
}

static void *
start_job_completer(void *arg)
{
  gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
  p->job_completer_loop();
  return 0;
}

void
gc_job_manager_impl::create_event_handler()
{
  // create the SPE event handler and register our interest in events

  d_spe_event_handler.ptr = spe_event_handler_create();
  if (d_spe_event_handler.ptr == 0){
    perror("spe_event_handler_create");
    throw std::runtime_error("spe_event_handler_create");
  }

  for (unsigned int i = 0; i < d_options.nspes; i++){
    spe_event_unit_t	eu;
    memset(&eu, 0, sizeof(eu));
    eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
    eu.spe = d_worker[i].spe_ctx;
    eu.data.u32 = i;	// set in events returned by spe_event_wait

    if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
      perror("spe_event_handler_register");
      throw std::runtime_error("spe_event_handler_register");
    }
  }

  // create the event handling thread

  if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
    throw std::runtime_error("pthread_create");
  }

  // create the job completion thread

  if (!start_thread(&d_jc_thread, start_job_completer, this, "job_completer")){
    throw std::runtime_error("pthread_create");
  }

  // create the SPE worker threads

  bool ok = true;
  for (unsigned int i = 0; ok && i < d_options.nspes; i++){
    char name[256];
    snprintf(name, sizeof(name), "worker[%d]", i);
    ok &= start_thread(&d_worker[i].thread, start_worker,
		       &d_worker[i], name);
  }

  if (!ok){
    //
    // FIXME Clean up the mess.  Need to terminate event handler and all workers.
    //
    // this should cause the workers to exit, unless they're seriously broken
    send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));

    shutdown();

    throw std::runtime_error("pthread_create");
  }
}

////////////////////////////////////////////////////////////////////////

// Record the event handler's new state under d_eh_mutex and wake
// everyone waiting on d_eh_cond.
void
gc_job_manager_impl::set_eh_state(evt_handler_state s)
{
  scoped_lock guard(d_eh_mutex);

  d_eh_state = s;
  d_eh_cond.notify_all();
}

// Store the buffer size reported by the SPE under d_eh_mutex and wake
// everyone waiting on d_eh_cond.
void
gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
{
  scoped_lock guard(d_eh_mutex);

  d_ea_args_maxsize = maxsize;
  d_eh_cond.notify_all();
}

// Debugging aid: print the SPE index and the decoded event bits of evt
// on one line of stdout.
void
gc_job_manager_impl::print_event(spe_event_unit_t *evt)
{
  static const struct {
    unsigned int mask;
    const char  *label;
  } known_events[] = {
    { SPE_EVENT_OUT_INTR_MBOX, " OUT_INTR_MBOX" },
    { SPE_EVENT_IN_MBOX,       " IN_MBOX" },
    { SPE_EVENT_TAG_GROUP,     " TAG_GROUP" },
    { SPE_EVENT_SPE_STOPPED,   " SPE_STOPPED" },
  };

  printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);

  for (size_t k = 0; k < sizeof(known_events) / sizeof(known_events[0]); k++){
    if (evt->events & known_events[k].mask)
      printf("%s", known_events[k].label);
  }

  printf("\n");
}

// Pairs a completed job with the client thread that submitted it.
struct job_client_info {
  uint16_t	job_id;
  uint16_t	client_id;
};

// qsort comparator: orders job_client_info records by ascending client_id.
static int
compare_jci_clients(const void *va, const void *vb)
{
  const job_client_info *lhs = static_cast<const job_client_info *>(va);
  const job_client_info *rhs = static_cast<const job_client_info *>(vb);

  // 16-bit ids widen to int, so the difference cannot overflow
  const int lhs_client = lhs->client_id;
  const int rhs_client = rhs->client_id;
  return lhs_client - rhs_client;
}

void
gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
						  unsigned int completion_info_idx)
{
  const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";

  smp_rmb();  // order reads so we know that data sent from SPE is here

  gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];

  if (ci->ncomplete == 0){	// never happens, but ensures code below is correct
    ci->in_use = 0;
    return;
  }

  decr_njobs_active(ci->ncomplete);

  if (0){
    static int total_jobs;
    static int total_msgs;
    total_msgs++;
    total_jobs += ci->ncomplete;
    printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
  }

  job_client_info gci[GC_CI_NJOBS];

  /*
   * Make one pass through and sanity check everything while filling in gci
   */
  for (unsigned int i = 0; i < ci->ncomplete; i++){
    unsigned int job_id = ci->job_id[i];

    if (job_id >= d_options.max_jobs){
      // internal error, shouldn't happen
      fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
      ci->in_use = 0;		// clear flag so SPE knows we're done with it
      return;
    }
    gc_job_desc *jd = &d_jd[job_id];

    if (jd->sys.client_id >= d_options.max_client_threads){
      // internal error, shouldn't happen
      fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
      ci->in_use = 0;		// clear flag so SPE knows we're done with it
      return;
    }

    gci[i].job_id = job_id;
    gci[i].client_id = jd->sys.client_id;
  }

  // sort by client_id so we only have to lock & signal once / client

  if (ci->ncomplete > 1)
    qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);

  // "wind-in" 

  gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
  last_cti->d_mutex.lock();
  bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done

  for (unsigned int i = 1; i < ci->ncomplete; i++){

    gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];

    if (cti != last_cti){	// new client?

      // yes.  signal old client, unlock old, lock new

      // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY

      if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
	last_cti->d_cond.notify_one();	// wake client thread up

      last_cti->d_mutex.unlock();
      cti->d_mutex.lock();
      last_cti = cti;
    }

    // mark job done
    bv_set(cti->d_jobs_done, gci[i].job_id);
  }

  // "wind-out"

  if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
    last_cti->d_cond.notify_one();	// wake client thread up
  last_cti->d_mutex.unlock();

  ci->in_use = 0;		// clear flag so SPE knows we're done with it
}

/*
 * Handle one event returned by spe_event_wait.
 *
 * Mailbox events: drain up to NMSGS messages from the SPE's interrupting
 * outbound mailbox and act on each (OP_SPU_BUFSIZE records the SPE's
 * buffer size; anything else is reported as unexpected).
 * Stop events: read the stop info and print the reason.
 * Any other event value is reported on stderr.
 */
void
gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
{
  // print_event(evt);

  const int spe_num = evt->data.u32;

  // only a single event type can be signaled at a time, hence the
  // equality tests below

  if (evt->events == SPE_EVENT_OUT_INTR_MBOX){  // SPE sent us 1 or more msgs
    static const int NMSGS = 32;
    unsigned int msg[NMSGS];

    const int nread = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
    // printf("spe_out_intr_mbox_read = %d\n", nread);
    if (nread < 0){
      perror("spe_out_intr_mbox_read");
      return;
    }

    for (int k = 0; k < nread; k++){
#if 0
      if (MBOX_MSG_OP(msg[k]) == OP_JOBS_DONE){
        if (debug())
          printf("eh: job_done (0x%08x) from spu[%d]\n", msg[k], spe_num);
        notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[k]));
        continue;
      }
#endif
      if (MBOX_MSG_OP(msg[k]) == OP_SPU_BUFSIZE){
        set_ea_args_maxsize(MBOX_MSG_ARG(msg[k]));
        continue;
      }

      // OP_EXIT and everything else
      printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[k], spe_num);
    }
    return;
  }

  if (evt->events == SPE_EVENT_SPE_STOPPED){    // the SPE stopped
    spe_stop_info_t si;
    if (spe_stop_info_read(evt->spe, &si) < 0){
      perror("spe_stop_info_read");
      return;
    }

    switch (si.stop_reason){
    case SPE_EXIT:
      if (debug()){
        printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
               spe_num, si.result.spe_exit_code);
      }
      break;
    case SPE_STOP_AND_SIGNAL:
      printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
             spe_num, si.result.spe_signal_code);
      break;
    case SPE_RUNTIME_ERROR:
      printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
             spe_num, si.result.spe_runtime_error);
      break;
    case SPE_RUNTIME_EXCEPTION:
      printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
             spe_num, si.result.spe_runtime_exception);
      break;
    case SPE_RUNTIME_FATAL:
      printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
             spe_num, si.result.spe_runtime_fatal);
      break;
    case SPE_CALLBACK_ERROR:
      printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
             spe_num, si.result.spe_callback_error);
      break;
    case SPE_ISOLATION_ERROR:
      printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
             spe_num, si.result.spe_isolation_error);
      break;
    default:
      printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
             spe_num, si.stop_reason, si.spu_status);
      break;
    }
    return;
  }

#if 0 // not enabled
  if (evt->events == SPE_EVENT_IN_MBOX){        // there's room to write to SPE
    // spe_in_mbox_write (ignore)
    return;
  }
  if (evt->events == SPE_EVENT_TAG_GROUP){      // our DMA completed
    // spe_mfcio_tag_status_read
    return;
  }
#endif

  fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
}

//
// This is the "main program" of the event handling thread.
//
// It announces EHS_RUNNING, asks SPE 0 for its buffer size, and then
// alternates between advancing the shutdown state machine and waiting
// (up to TIMEOUT ms) for SPE events.  During shutdown it waits for the
// job completer to finish, tells the SPEs to exit, waits for every
// worker to reach WS_DEAD, and finally sets EHS_DEAD and returns.
//
void
gc_job_manager_impl::event_handler_loop()
{
  static const int MAX_EVENTS = 16;
  static const int TIMEOUT = 20;        // how long to block in milliseconds

  spe_event_unit_t events[MAX_EVENTS];

  if (d_debug)
    printf("event_handler_loop: starting\n");

  set_eh_state(EHS_RUNNING);

  // ask the first spe for its max bufsize
  send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));

  while (1){
    switch(d_eh_state){

    case EHS_RUNNING:                   // normal stuff
      if (d_shutdown_requested) {
        set_eh_state(EHS_SHUTTING_DOWN);
      }
      break;

    case EHS_SHUTTING_DOWN:
      // don't stop the SPEs until the job completer has exited
      if (d_jc_state == JCS_DEAD){
        send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
        set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
      }
      break;

    case EHS_WAITING_FOR_WORKERS_TO_DIE:
      {
        bool all_dead = true;
        for (unsigned int i = 0; i < d_options.nspes; i++)
          all_dead &= d_worker[i].state == WS_DEAD;

        if (all_dead){
          set_eh_state(EHS_DEAD);
          if (d_debug)
            printf("event_handler_loop: exiting\n");
          return;
        }
      }
      break;

    default:
      set_eh_state(EHS_DEAD);
      printf("event_handler_loop(default): exiting\n");
      return;
    }

    // block waiting for events...
    int nevents = spe_event_wait(d_spe_event_handler.ptr,
                                 events, MAX_EVENTS, TIMEOUT);
    if (nevents < 0){
      perror("spe_event_wait");         // label matches the call that failed
      // FIXME bail?
    }
    for (int i = 0; i < nevents; i++){
      handle_event(&events[i]);
    }
  }
}

////////////////////////////////////////////////////////////////////////

void
gc_job_manager_impl::poll_for_job_completion()
{
  static const int niter = 10000;

  CCTPL();		// change current (h/w) thread priority to low

  for (int n = 0; n < niter; n++){

    for (unsigned int spe_num = 0; spe_num < d_options.nspes; spe_num++){
      volatile spe_spu_control_area_t *spe_ctrl = d_worker[spe_num].spe_ctrl;
      int nentries = spe_ctrl->SPU_Mbox_Stat & 0xFF;
      while (nentries-- > 0){
	unsigned int msg = spe_ctrl->SPU_Out_Mbox;
	switch(MBOX_MSG_OP(msg)){
	case OP_JOBS_DONE:
	  if (debug())
	    printf("jc: job_done (0x%08x) from spu[%d]\n", msg, spe_num);

	  CCTPM();		// change current thread priority to medium
	  notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg));
	  CCTPL();
	  break;

	default:
	  printf("jc: Unexpected msg (0x%08x) from spu[%d]\n", msg, spe_num);
	  break;
	}
      }
    }
  }
  CCTPM();
}

//
// This is the "main program" of the job completer thread
//
void
gc_job_manager_impl::job_completer_loop()
{
  d_jc_state = JCS_RUNNING;

  while (1){
    {
      scoped_lock l(d_jc_mutex);
      if (d_jc_njobs_active == 0){
	if (d_shutdown_requested){
	  d_jc_state = JCS_DEAD;
	  return;
	}
	d_jc_cond.wait(l);
      }
    }

    poll_for_job_completion();
  }
}

////////////////////////////////////////////////////////////////////////
// this is the top of the SPE worker threads

static void *
start_worker(void *arg)
{
  worker_ctx *w = (worker_ctx *) arg;
  spe_stop_info_t	si;

  w->state = WS_RUNNING;
  if (s_worker_debug)
    printf("worker[%d]: WS_RUNNING\n", w->spe_idx);

  unsigned int entry = SPE_DEFAULT_ENTRY;
  int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);

  if (r < 0){			// error
    char buf[64];
    snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
    perror(buf);
  }
  else if (r == 0){
    // spe program called exit.
    if (s_worker_debug)
      printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
	     w->spe_idx, si.result.spe_exit_code);
  }
  else {
    // called stop_and_signal
    //
    // I'm not sure we'll ever get here.  I think the event
    // handler will catch this...
    printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
	   w->spe_idx, si.result.spe_signal_code);
  }

  // in any event, we're committing suicide now ;)
  if (s_worker_debug)
    printf("worker[%d]: WS_DEAD\n", w->spe_idx);

  w->state = WS_DEAD;
  return 0;
}

////////////////////////////////////////////////////////////////////////

// Claim a free client-thread-info slot.
//
// Scans d_client_thread[0 .. max_client_threads) for an entry whose
// d_free flag is set and tries to take it with an atomic
// decrement-if-positive.  On success the entry is reset (not waiting,
// no jobs done, nothing being waited for) and returned.  Returns 0 if
// no slot could be claimed.
gc_client_thread_info *
gc_job_manager_impl::alloc_cti()
{
  for (unsigned int slot = 0; slot < d_options.max_client_threads; slot++){
    gc_client_thread_info *cti = &d_client_thread[slot];

    if (!cti->d_free)           // cheap pre-check before the atomic op
      continue;

    // Try to grab it atomically; any result other than 0 means we
    // did not get the slot, so keep looking.
    if (_atomic_dec_if_positive(ptr_to_ea(&cti->d_free)) != 0)
      continue;

    // The slot is ours: put it into a known initial state.
    cti->d_state = CT_NOT_WAITING;
    bv_zero(cti->d_jobs_done);
    cti->d_njobs_waiting_for = 0;
    cti->d_jobs_waiting_for = 0;
    return cti;
  }

  return 0;
}

// Return a client-thread-info slot to the pool by setting its d_free
// flag, making it eligible to be handed out by alloc_cti() again.
void
gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
{
  // cti must point into d_client_thread[0 .. max_client_threads).
  // A pointer below the start of the array becomes a huge value after
  // the unsigned cast, so it fails the same comparison.
  assert(static_cast<size_t>(cti - d_client_thread.get())
         < d_options.max_client_threads);

  cti->d_free = 1;
}

int
gc_job_manager_impl::ea_args_maxsize()
{
  scoped_lock	l(d_eh_mutex);

  while (d_ea_args_maxsize == 0)	// wait for it to be initialized
    d_eh_cond.wait(l);

  return d_ea_args_maxsize;
}

// Set the debug level.  The value is stored both in this object
// (reported by debug()) and in s_worker_debug, which start_worker()
// consults before printing its trace messages.
void
gc_job_manager_impl::set_debug(int debug)
{
  s_worker_debug = debug;
  d_debug = debug;
}

// Return the current debug level (the value last stored in d_debug).
int
gc_job_manager_impl::debug()
{
  return d_debug;
}

////////////////////////////////////////////////////////////////////////

// Create and map one log file per SPE.
//
// Does nothing unless d_options.enable_logging is set.  A
// log2_nlog_entries of 0 selects the default of 12 (4096 entries).
// The log size is rounded up to a whole number of pages.
//
// For each SPE i, the file "spu_log.NN" is created/truncated in the
// current directory, extended to the log size, mapped shared
// read/write and zeroed.  The mapping address and entry count are
// recorded in d_spu_args[i].log.
//
// On any failure (open, extending the file, mmap) the error is
// reported with perror() and the function returns immediately; SPEs
// not yet processed keep whatever log settings they already had.
void
gc_job_manager_impl::setup_logfiles()
{
  if (!d_options.enable_logging)
    return;

  if (d_options.log2_nlog_entries == 0)
    d_options.log2_nlog_entries = 12;

  // must end up a multiple of the page size

  size_t pagesize = getpagesize();
  // Do the shift in size_t rather than int so that large exponents
  // don't overflow a signed int.
  size_t s = ((size_t) 1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
  s = ((s + pagesize - 1) / pagesize) * pagesize;
  size_t nentries = s / sizeof(gc_log_entry_t);
  assert(is_power_of_2(nentries));

  for (unsigned int i = 0; i < d_options.nspes; i++){
    char filename[100];
    snprintf(filename, sizeof(filename), "spu_log.%02d", i);
    int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
    if (fd == -1){
      perror(filename);
      return;
    }

    // Extend the file to s bytes by writing a single byte at offset
    // s - 1.  This must succeed before we map and touch the file:
    // accessing mapped pages that lie beyond end-of-file raises
    // SIGBUS, so a failure here (e.g. disk full) can't be ignored.
    if (lseek(fd, s - 1, SEEK_SET) == (off_t) -1
        || write(fd, "\0", 1) != 1){
      perror(filename);
      close(fd);
      return;
    }

    void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (p == MAP_FAILED){
      perror("gc_job_manager_impl::setup_logfiles: mmap");
      close(fd);
      return;
    }
    close(fd);          // the mapping stays valid after the fd is closed
    memset(p, 0, s);
    d_spu_args[i].log.base = ptr_to_ea(p);
    d_spu_args[i].log.nentries = nentries;
  }
}

// Ask the OS to start writing each mapped SPE log back to its file
// (msync with MS_ASYNC).  SPEs whose log.base is zero are skipped.
void
gc_job_manager_impl::sync_logfiles()
{
  for (unsigned int spe = 0; spe < d_options.nspes; spe++){
    if (!d_spu_args[spe].log.base)
      continue;

    void *addr = ea_to_ptr(d_spu_args[spe].log.base);
    size_t nbytes = d_spu_args[spe].log.nentries * sizeof(gc_log_entry_t);
    msync(addr, nbytes, MS_ASYNC);
  }
}

// Remove the memory mapping of every SPE log.  SPEs whose log.base is
// zero are skipped.  The log.base / log.nentries fields themselves are
// left untouched.
void
gc_job_manager_impl::unmap_logfiles()
{
  for (unsigned int spe = 0; spe < d_options.nspes; spe++){
    if (!d_spu_args[spe].log.base)
      continue;

    void *addr = ea_to_ptr(d_spu_args[spe].log.base);
    size_t nbytes = d_spu_args[spe].log.nentries * sizeof(gc_log_entry_t);
    munmap(addr, nbytes);
  }
}

////////////////////////////////////////////////////////////////////////
//
// lookup proc names in d_proc_def table

// Map a procedure name to its id, which is the index of the first
// d_proc_def entry whose name equals proc_name.
//
// Throws gc_unknown_proc(proc_name) if no entry matches.
gc_proc_id_t
gc_job_manager_impl::lookup_proc(const std::string &proc_name)
{
  int idx = 0;

  // Advance to the first matching entry, or run off the end.
  while (idx < d_nproc_defs && !(proc_name == d_proc_def[idx].name))
    idx++;

  if (idx < d_nproc_defs)
    return idx;

  throw gc_unknown_proc(proc_name);
}

// Return the names of all d_nproc_defs entries of d_proc_def, in
// table order.
std::vector<std::string>
gc_job_manager_impl::proc_names()
{
  std::vector<std::string> names;

  int idx = 0;
  while (idx < d_nproc_defs){
    names.push_back(std::string(d_proc_def[idx].name));
    idx++;
  }

  return names;
}

////////////////////////////////////////////////////////////////////////

// Destroy the SPE context, if there is one, and mark the worker
// WS_FREE.  A failing spe_context_destroy() is reported with perror()
// but otherwise ignored; the context pointer is cleared either way.
worker_ctx::~worker_ctx()
{
  if (spe_ctx != 0){
    if (spe_context_destroy(spe_ctx) != 0)
      perror("spe_context_destroy");

    spe_ctx = 0;
  }

  state = WS_FREE;
}