/* -*- c++ -*- */ /* * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * * GNU Radio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3, or (at your option) * any later version. * * GNU Radio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef INCLUDED_GC_JOB_MANAGER_IMPL_H #define INCLUDED_GC_JOB_MANAGER_IMPL_H #include #include #include #include #include "gc_client_thread_info.h" #include #include #include typedef boost::shared_ptr spe_gang_context_sptr; typedef boost::shared_ptr spe_program_handle_sptr; typedef boost::scoped_array gc_client_thread_info_sa; enum worker_state { WS_FREE, // not in use WS_INIT, // allocated and being initialized WS_RUNNING, // the thread is running WS_DEAD, // the thread is dead }; struct worker_ctx { volatile worker_state state; unsigned int spe_idx; // [0, nspes-1] spe_context_ptr_t spe_ctx; spe_spu_control_area_t *spe_ctrl; pthread_t thread; gc_spu_args_t *spu_args; // pointer to 16-byte aligned struct worker_ctx() : state(WS_FREE), spe_idx(0), spe_ctx(0), spe_ctrl(0), thread(0), spu_args(0) {} ~worker_ctx(); }; enum evt_handler_state { EHS_INIT, // being initialized EHS_RUNNING, // thread is running EHS_SHUTTING_DOWN, // in process of shutting down everything EHS_WAITING_FOR_WORKERS_TO_DIE, EHS_DEAD, // thread is dead }; enum job_completer_state { JCS_INIT, // being initialized JCS_RUNNING, // thread is running JCS_DEAD, // thread is dead }; struct spe_event_handler { spe_event_handler_ptr_t ptr; spe_event_handler() : ptr(0) {} ~spe_event_handler(){ if (ptr){ if (spe_event_handler_destroy(ptr) != 0){ perror("spe_event_handler_destroy"); } } } }; /*! * \brief Concrete class that manages SPE jobs. * * This class contains all the implementation details. */ class gc_job_manager_impl : public gc_job_manager { enum { MAX_SPES = 16 }; int d_debug; gc_jm_options d_options; spe_program_handle_sptr d_spe_image; spe_gang_context_sptr d_gang; // boost::shared_ptr worker_ctx d_worker[MAX_SPES]; // SPE ctx, thread, etc gc_spu_args_t *d_spu_args; // 16-byte aligned structs boost::shared_ptr _d_spu_args_boost; // hack for automatic storage mgmt gc_comp_info_t *d_comp_info; // 128-byte aligned structs boost::shared_ptr _d_comp_info_boost; // hack for automatic storage mgmt // used to coordinate communication w/ the event handling thread boost::mutex d_eh_mutex; boost::condition_variable d_eh_cond; pthread_t d_eh_thread; // the event handler thread volatile evt_handler_state d_eh_state; volatile bool d_shutdown_requested; spe_event_handler d_spe_event_handler; // used to coordinate communication w/ the job completer thread boost::mutex d_jc_mutex; boost::condition_variable d_jc_cond; pthread_t d_jc_thread; // the job completion thread volatile job_completer_state d_jc_state; int d_jc_njobs_active; // # of jobs submitted but not yet reaped // round robin notification of spes int d_ntell; // # of spes to tell unsigned int d_tell_start; // which one to start with // All of the job descriptors are hung off of here. // We allocate them all in a single cache aligned chunk. gc_job_desc_t *d_jd; // [options.max_jobs] boost::shared_ptr _d_jd_boost; // hack for automatic storage mgmt gc_client_thread_info_sa d_client_thread; // [options.max_client_threads] // We use bitvectors to represent the completing state of a job. Each // bitvector is d_bvlen longs in length. int d_bvlen; // bit vector length in longs // This contains the storage for all the bitvectors used by the job // manager. There's 1 for each client thread, in the d_jobs_done // field. We allocate them all in a single cache aligned chunk. boost::shared_ptr _d_all_bitvectors; // hack for automatic storage mgmt // Lock free stack where we keep track of the free job descriptors. gc_jd_stack_t *d_free_list; // stack of free job descriptors boost::shared_ptr _d_free_list_boost; // hack for automatic storage mgmt // The PPE inserts jobs here; SPEs pull jobs from here. gc_jd_queue_t *d_queue; // job queue boost::shared_ptr _d_queue_boost; // hack for automatic storage mgmt int d_ea_args_maxsize; struct gc_proc_def *d_proc_def; // the SPE procedure table uint32_t d_proc_def_ls_addr; // the LS address of the table int d_nproc_defs; // number of proc_defs in table gc_client_thread_info *alloc_cti(); void free_cti(gc_client_thread_info *cti); void create_event_handler(); void set_eh_state(evt_handler_state s); void set_ea_args_maxsize(int maxsize); void notify_clients_jobs_are_done(unsigned int spe_num, unsigned int completion_info_idx); public: void event_handler_loop(); // really private void job_completer_loop(); // really private private: bool send_all_spes(uint32_t msg); bool send_spe(unsigned int spe, uint32_t msg); void print_event(spe_event_unit_t *evt); void handle_event(spe_event_unit_t *evt); bool incr_njobs_active(); void decr_njobs_active(int n); void tell_spes_to_check_queue(); void poll_for_job_completion(); // bitvector ops void bv_zero(unsigned long *bv); void bv_clr(unsigned long *bv, unsigned int bitno); void bv_set(unsigned long *bv, unsigned int bitno); bool bv_isset(unsigned long *bv, unsigned int bitno); bool bv_isclr(unsigned long *bv, unsigned int bitno); void setup_logfiles(); void sync_logfiles(); void unmap_logfiles(); friend gc_job_manager_sptr gc_make_job_manager(const gc_jm_options *options); gc_job_manager_impl(const gc_jm_options *options = 0); public: virtual ~gc_job_manager_impl(); /*! * Stop accepting new jobs. Wait for existing jobs to complete. * Return all managed SPE's to the system. */ virtual bool shutdown(); /*! * \brief Return number of SPE's currently allocated to job manager. */ virtual int nspes() const; /*! * \brief Return a pointer to a properly aligned job descriptor, * or zero if none are available. */ virtual gc_job_desc *alloc_job_desc(); /* *! Return a job descriptor previously allocated with alloc_job_desc() * * \param[in] jd pointer to job descriptor to free. */ virtual void free_job_desc(gc_job_desc *jd); /*! * \brief Submit a job for asynchronous processing on an SPE. * * \param[in] jd pointer to job description * * The caller must not read or write the job description * or any of the memory associated with any indirect arguments * until after calling wait_job. * * \returns true iff the job was successfully enqueued. * If submit_job returns false, check jd->status for additional info. */ virtual bool submit_job(gc_job_desc *jd); /*! * \brief Wait for job to complete. * * A thread may only wait for jobs which it submitted. * * \returns true if sucessful, else false. */ virtual bool wait_job(gc_job_desc *jd); /*! * \brief wait for 1 or more jobs to complete. * * \param[in] njobs is the length of arrays \p jd and \p done. * \param[in] jd are the jobs that are to be waited for. * \param[out] done indicates whether the corresponding job is complete. * \param[in] mode indicates whether to wait for ALL or ANY of the jobs * in \p jd to complete. * * A thread may only wait for jobs which it submitted. * * \returns number of jobs completed, or -1 if error. */ virtual int wait_jobs(unsigned int njobs, gc_job_desc *jd[], bool done[], gc_wait_mode mode); virtual int ea_args_maxsize(); virtual gc_proc_id_t lookup_proc(const std::string &name); virtual std::vector proc_names(); virtual void set_debug(int debug); virtual int debug(); }; #endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */