/* * Scilab ( http://www.scilab.org/ ) - This file is part of Scilab * Copyright (C) 2010 - DIGITEO - Bernard HUGUENEY * * This file must be used under the terms of the CeCILL. * This source file is licensed as described in the file COPYING, which * you should have received as part of this distribution. The terms * are also available at * http://www.cecill.info/licences/Licence_CeCILL_V2.1-en.txt * */ #ifndef PARALLEL_RUN_HXX #define PARALLEL_RUN_HXX #include #include #include #include #include #include #include #include extern "C" { #include "scilabmode.h" } #ifndef _MSC_VER #ifndef MAP_ANONYMOUS # define MAP_ANONYMOUS MAP_ANON #endif #include #include #include #include #include #include using std::min; using std::max; #else #include extern "C" { #include "forkWindows.h" #include "mmapWindows.h" #include "semWindows.h" }; #endif /* implementation notes: due to alignment issues, we have to use lhs different shared memory buffers we avoid busy-waiting (bad for cpu time) or sleeping (bad for wallclock time) thanks to semaphores in shared memory */ namespace { /* * allocates shared memory for s elements of type T. (anonymous, not mapped to a file) * * @param T type to alloc * @param s nb of elements to alloc mem, defaults to 1. 
*/ template T* allocSharedMem(std::size_t s = 1) { return static_cast(mmap(0, sizeof(T) * s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)); } void freeSharedMem(void* ptr, std::size_t s = 1) { munmap(ptr, s); } struct PortableSemaphore { explicit PortableSemaphore (unsigned int init): ptr(allocSharedMem()) { sem_init(ptr, 1, init); } void post() { sem_post(ptr); } void wait() { sem_wait(ptr); } ~PortableSemaphore() { munmap(ptr, sizeof(sem_t)); } sem_t* ptr; }; struct PortableSignalInhibiter { PortableSignalInhibiter() { #ifndef _MSC_VER struct sigaction reapchildren; std::memset( &reapchildren, 0, sizeof reapchildren ); reapchildren.sa_flags = SA_NOCLDWAIT; sigaction( SIGCHLD, &reapchildren, &backup_sigaction ); #else #endif } ~PortableSignalInhibiter() { #ifndef _MSC_VER sigaction(SIGCHLD, &backup_sigaction, 0 );/* restore of sigaction */ #else #endif } #ifndef _MSC_VER struct sigaction backup_sigaction; #else #endif }; /* * Handles scheduling. Could be done in parallel_wrapper, but it would a very long applyWithProcesses() member function * breaking it would involve adding many member variables in the wrapper, so I chose an utility class with friend access to a parallel_wrapper * taken by ref. * * The trick is to exchange the res[] ptr with ptrs to some shared memory so that callF() from each process fills the same shared result buffer. * When all is done, we copy the result to the original (not shared) result buffer. Apart from the result buffer, we also share a std::size_t *todo * poiting to the next index to compute. * * We use two cross process synch semaphores : * 1°) todo_protect to protect access to the *todo shared value * 2°) out_of_work to count how many workers have finished their work. Master process waits until nb_process have done their work. 
* */ #ifndef _MSC_VER #define __HAVE_FORK__ 1 #endif #ifdef __HAVE_FORK__ template< typename ParallelWrapper> struct scheduler { typedef std::pair workshare_t; /* * constructs the scheduler, allocating the ressources (shared memory) and semaphores * @param wrapper the parallel wrapper launching the computations * @param nb_proc number of processes to use (spawns nb_proc-1 processes) * @param dyn if scheduling is dynamic or static * @param chunk_s chunk size. Only useful for dynamic sheduling as static is always faster with max chunk size for processes. * /!\ changes w.res[] to point to shared memory buffer */ scheduler( ParallelWrapper& wrapper, std::size_t nb_proc, bool dyn, std::size_t chunk_s) : w(wrapper), nb_process(nb_proc), dynamic(dyn), chunk_size(chunk_s), todo(0) , out_of_work(0) , todo_protect(1) , backup_res(wrapper.lhs) { for (std::size_t i(0); i != w.lhs; ++i) { backup_res[i] = w.res[i]; const_cast(w.res)[i] = allocSharedMem(w.n * w.res_size[i]); } todo = allocSharedMem(); *todo = 0; } /* * performs concurrent calls from w. (with w.f()) and copy results to the original w.res[] locations * but does not restore w.res[} (this is done in destructor. 
*/ void operator()() { PortableSignalInhibiter guard; /* disable waiting notification from child death to avoid zombies */ std::vector init_ws(nb_process); for (std::size_t i(0); i != nb_process; ++i) /* we precompute shares so that we don't have to synch */ { init_ws[i] = initialWork(i); } std::size_t p; for (p = 1; p != nb_process; ++p) { if (!fork()) /* child process goes to work at once */ { setScilabMode(SCILAB_NWNI); break; }/* parent process continues to spawn children */ } if (p == nb_process) { p = 0; } w.prologue(p); for (workshare_t ws(init_ws[p]); ws.first != ws.second; ws = getWork()) { for (std::size_t i(ws.first); i != ws.second; ++i) { w.callF(i);/* callF() is performed on our shared mem as res[] */ } } out_of_work.post(); w.epilogue(p); if (p) { exit(EXIT_SUCCESS); // does not call destructor which is good :) } for (std::size_t i(0); i != nb_process; ++i) /* wait for all workers to finish */ { out_of_work.wait(); } for (std::size_t i(0); i != w.lhs; ++i) /* copy results into the original res[] memory */ { std::memcpy(backup_res[i], w.res[i], w.res_size[i]*w.n); } }/* guard destructor restores the signals */ /* destroy/ free semaphores/shared memory, and restores original w.res[] values. */ ~scheduler() { for (std::size_t j(0); j != w.lhs; ++j) { freeSharedMem(w.res[j], w.n * w.res_size[j]); const_cast(w.res)[j] = backup_res[j]; } freeSharedMem(todo, sizeof(std::size_t)); } private: /* compute initial workshares. no need to synch because we did not fork() yet * @param p process id from 0(parent) -> nb_process-1 */ workshare_t initialWork( std::size_t p) const { std::size_t old_todo(*todo); // std::cerr<<"*todo="<<*todo<<" dynamic="<