stardis-solver

Solve coupled heat transfers
git clone git://git.meso-star.fr/stardis-solver.git

sdis.c (29640B)


      1 /* Copyright (C) 2016-2025 |Méso|Star> (contact@meso-star.com)
      2  *
      3  * This program is free software: you can redistribute it and/or modify
      4  * it under the terms of the GNU General Public License as published by
      5  * the Free Software Foundation, either version 3 of the License, or
      6  * (at your option) any later version.
      7  *
      8  * This program is distributed in the hope that it will be useful,
      9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     11  * GNU General Public License for more details.
     12  *
     13  * You should have received a copy of the GNU General Public License
     14  * along with this program. If not, see <http://www.gnu.org/licenses/>. */
     15 
     16 #define _POSIX_C_SOURCE 200112L
     17 
     18 #include "sdis.h"
     19 #include "sdis_c.h"
     20 #include "sdis_device_c.h"
     21 #include "sdis_estimator_c.h"
     22 #include "sdis_green.h"
     23 #include "sdis_log.h"
     24 #include "sdis_misc.h"
     25 #include "sdis_scene_c.h"
     26 #ifdef SDIS_ENABLE_MPI
     27   #include "sdis_mpi.h"
     28 #endif
     29 
     30 #include <star/ssp.h>
     31 
     32 #include <rsys/cstr.h>
     33 #include <rsys/clock_time.h>
     34 #include <rsys/mem_allocator.h>
     35 
     36 #include <errno.h>
     37 
     38 /* Number of random numbers in a sequence, i.e. the number of consecutive
     39  * random numbers that can be used by a thread */
     40 #define RNG_SEQUENCE_SIZE 100000
     41 
     42 /*******************************************************************************
     43  * Helper functions
     44  ******************************************************************************/
     45 static INLINE int
     46 check_accum_message(const enum mpi_sdis_message msg)
     47 {
     48   return msg == MPI_SDIS_MSG_ACCUM_TEMP
     49       || msg == MPI_SDIS_MSG_ACCUM_TIME
     50       || msg == MPI_SDIS_MSG_ACCUM_FLUX_CONVECTIVE
     51       || msg == MPI_SDIS_MSG_ACCUM_FLUX_IMPOSED
     52       || msg == MPI_SDIS_MSG_ACCUM_FLUX_RADIATIVE
     53       || msg == MPI_SDIS_MSG_ACCUM_FLUX_TOTAL
     54       || msg == MPI_SDIS_MSG_ACCUM_MEAN_POWER;
     55 }
     56 
     57 static res_T
     58 gather_green_functions_no_mpi
     59   (struct sdis_scene* scn,
     60    struct ssp_rng_proxy* rng_proxy,
     61    struct sdis_green_function* per_thread_green[],
     62    const struct accum* per_thread_acc_time,
     63    struct sdis_green_function** out_green)
     64 {
     65   struct sdis_green_function* green = NULL;
     66   struct accum acc_time = ACCUM_NULL;
     67   res_T res = RES_OK;
     68   ASSERT(scn && rng_proxy && per_thread_green && per_thread_acc_time);
     69   ASSERT(out_green);
     70 
     71   /* Reduce the per thread green functions into the green function of the 1st
     72    * thread */
     73   res = green_function_redux_and_clear
     74     (per_thread_green[0], per_thread_green+1, scn->dev->nthreads-1);
     75   if(res != RES_OK) goto error;
     76 
     77   /* Return the green of the 1st thread */
     78   SDIS(green_function_ref_get(per_thread_green[0]));
     79   green = per_thread_green[0];
     80 
     81   res = gather_accumulators
     82     (scn->dev, MPI_SDIS_MSG_ACCUM_TIME, per_thread_acc_time, &acc_time);
     83   if(res != RES_OK) goto error;
     84 
     85   /* Finalize the estimated green */
     86   res = green_function_finalize(green, rng_proxy, &acc_time);
     87   if(res != RES_OK) goto error;
     88 
     89 exit:
     90   *out_green = green;
     91   return res;
     92 error:
     93   if(green) { SDIS(green_function_ref_put(green)); green = NULL; }
     94   goto exit;
     95 }
     96 
     97 #ifdef SDIS_ENABLE_MPI
     98 static void
     99 rewind_progress_printing(struct sdis_device* dev)
    100 {
    101   size_t i;
    102 
    103   if(!dev->use_mpi
    104   || dev->no_escape_sequence
    105   || dev->mpi_nprocs == 1)
    106     return;
    107 
    108   FOR_EACH(i, 0, (size_t)(dev->mpi_nprocs-1)) {
    109     log_info(dev, "\033[1A\r"); /* Move up */
    110   }
    111 }
    112 
    113 static res_T
    114 send_green_function_to_master_process
    115   (struct sdis_device* dev,
    116    struct sdis_green_function* green)
    117 {
    118   char buf[128];
    119   FILE* stream = NULL; /* Temp file that stores the serialized green function */
    120   void* data = NULL; /* Pointer to serialized green function data */
    121   long sz = 0; /* Size in Bytes of the serialized green function data */
    122   res_T res = RES_OK;
    123   ASSERT(dev && green && dev->mpi_rank != 0);
    124 
    125   /* Open a stream to store the serialized green function */
    126   stream = tmpfile();
    127   if(!stream) {
    128     log_err(dev,
    129       "Could not open the stream used to temporarily store the green function "
    130       "before it is sent to the master process.\n");
    131     res = RES_IO_ERR;
    132     goto error;
    133   }
    134 
    135   /* Write the green function into the stream */
    136   res = sdis_green_function_write(green, stream);
    137   if(res != RES_OK) goto error;
    138 
    139   /* Fetch the size of the serialized data */
    140   sz = ftell(stream);
    141   if(sz == -1) {
    142     strerror_r(errno, buf, sizeof(buf));
    143     log_err(dev,
    144       "Could not query the size of the serialized green function data to send "
    145       "to the master process -- %s.\n", buf);
    146     res = RES_IO_ERR;
    147     goto error;
    148   }
    149   if(sz > INT_MAX) {
    150     log_err(dev,
    151       "The size of the green function data is too large. It must be less than "
    152       "%d Megabytes while it is %ld Megabytes.\n",
    153       INT_MAX / (1024*1024), sz/(1024*1024));
    154     res = RES_MEM_ERR;
    155     goto error;
    156   }
    157 
    158   data = MEM_CALLOC(dev->allocator, 1, (size_t)sz);
    159   if(!data) {
    160     log_err(dev, "Could not allocate the memory to store the serialized green "
    161       "function before it is sent to the master process.\n");
    162     res = RES_MEM_ERR;
    163     goto error;
    164   }
    165 
    166   /* Load in memory the serialized data */
    167   rewind(stream);
    168   if(fread(data, (size_t)sz, 1, stream) != 1) {
    169     log_err(dev,
    170       "Could not read the serialized green function data from the temporary "
    171       "stream before it is sent to the master process.\n");
    172     res = RES_IO_ERR;
    173     goto error;
    174   }
    175 
    176   /* Send the serialized data to the master process */
    177   mutex_lock(dev->mpi_mutex);
    178   MPI(Send(data, (int)sz, MPI_CHAR, 0/*Dst*/, MPI_SDIS_MSG_GREEN_FUNCTION,
    179     MPI_COMM_WORLD));
    180   mutex_unlock(dev->mpi_mutex);
    181 
    182 exit:
    183   if(stream) CHK(fclose(stream) == 0);
    184   if(data) MEM_RM(dev->allocator, data);
    185   return res;
    186 error:
    187   goto exit;
    188 }
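        /* Note: the master-side counterpart of this routine is
         * gather_green_functions_from_non_master_process() below. It waits for the
         * MPI_SDIS_MSG_GREEN_FUNCTION message, queries its byte count with
         * MPI_Get_count, receives the raw data, writes it into a temporary stream
         * and deserializes the green function from that stream. */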
    189 
    190 static res_T
    191 gather_green_functions_from_non_master_process
    192   (struct sdis_scene* scn,
    193    struct sdis_green_function* greens[])
    194 {
    195   struct sdis_green_function_create_from_stream_args green_args =
    196     SDIS_GREEN_FUNCTION_CREATE_FROM_STREAM_ARGS_DEFAULT;
    197 
    198   void* data = NULL; /* Pointer to gathered serialized green function data */
    199   FILE* stream = NULL; /* Temp file that stores the serialized green function */
    200   int iproc;
    201   res_T res = RES_OK;
    202   ASSERT(scn->dev && greens && scn->dev->mpi_rank == 0);
    203 
    204   FOR_EACH(iproc, 1, scn->dev->mpi_nprocs) {
    205     MPI_Request req;
    206     MPI_Status status;
    207     int count;
    208 
    209     /* Wait for the serialized green function sent by the process `iproc' */
    210     mpi_waiting_for_message
    211       (scn->dev, iproc, MPI_SDIS_MSG_GREEN_FUNCTION, &status);
    212 
    213     /* Fetch the size of the green function sent by the process `iproc' */
    214     mutex_lock(scn->dev->mpi_mutex);
    215     MPI(Get_count(&status, MPI_CHAR, &count));
    216     mutex_unlock(scn->dev->mpi_mutex);
    217 
    218     /* Allocate the memory to store the serialized green function sent by the
    219      * process `iproc' */
    220     data = MEM_REALLOC(scn->dev->allocator, data, (size_t)count);
    221     if(!data) {
    222       log_err(scn->dev,
    223         "Could not allocate %d bytes to store the serialized green function "
    224         "sent by the process %d.\n",
    225         count, iproc);
    226       res = RES_MEM_ERR;
    227       goto error;
    228     }
    229 
    230     /* Asynchronously receive the green function */
    231     mutex_lock(scn->dev->mpi_mutex);
    232     MPI(Irecv(data, count, MPI_CHAR, iproc, MPI_SDIS_MSG_GREEN_FUNCTION,
    233       MPI_COMM_WORLD, &req));
    234     mutex_unlock(scn->dev->mpi_mutex);
    235     mpi_waiting_for_request(scn->dev, &req);
    236 
    237     /* Open a stream to store the serialized green function */
    238     stream = tmpfile();
    239     if(!stream) {
    240       log_err(scn->dev,
    241         "Could not open the stream used to temporarily store the green function "
    242         "sent by the process %d.\n", iproc);
    243       res = RES_IO_ERR;
    244       goto error;
    245     }
    246 
    247     if(fwrite(data, (size_t)count, 1, stream) != 1) {
    248       log_err(scn->dev,
    249         "Could not write the green function sent by the process %d into the "
    250         "temporary stream.\n", iproc);
    251       res = RES_IO_ERR;
    252       goto error;
    253     }
    254 
    255     /* Deserialize the green function of the process `iproc'. Note that the
    256      * number of green functions to output is #procs - 1. Since we
    257      * iterate over the indices of the non master processes in [1, #procs),
    258      * the index of the green function to deserialize is iproc - 1 */
    259     rewind(stream);
    260     green_args.scene = scn;
    261     green_args.stream = stream;
    262     res = sdis_green_function_create_from_stream(&green_args, &greens[iproc-1]);
    263     if(res != RES_OK) {
    264       log_err(scn->dev,
    265         "Error deserializing the green function sent by the process %d -- %s.\n",
    266         iproc, res_to_cstr(res));
    267       goto error;
    268     }
    269 
    270     CHK(fclose(stream) == 0);
    271     stream = NULL;
    272   }
    273 
    274 exit:
    275   if(data) MEM_RM(scn->dev->allocator, data);
    276   if(stream) CHK(fclose(stream) == 0);
    277   return res;
    278 error:
    279   goto exit;
    280 }
    281 #endif
    282 
    283 /*******************************************************************************
    284  * Exported function
    285  ******************************************************************************/
    286 res_T
    287 sdis_get_info(struct sdis_info* info)
    288 {
    289   if(!info) return RES_BAD_ARG;
    290   *info = SDIS_INFO_NULL;
    291 #ifdef SDIS_ENABLE_MPI
    292   info->mpi_enabled = 1;
    293 #else
    294   info->mpi_enabled = 0;
    295 #endif
    296   return RES_OK;
    297 }
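        /* Usage sketch (illustrative, not part of the library documentation):
         *   struct sdis_info info;
         *   if(sdis_get_info(&info) == RES_OK && info.mpi_enabled) {
         *     // the library was built with SDIS_ENABLE_MPI
         *   }
         */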
    298 
    299 /*******************************************************************************
    300  * Local functions
    301  ******************************************************************************/
    302 res_T
    303 create_per_thread_rng
    304   (struct sdis_device* dev,
    305    struct ssp_rng* rng_state,
    306    const enum ssp_rng_type rng_type,
    307    struct ssp_rng_proxy** out_proxy,
    308    struct ssp_rng** out_rngs[])
    309 {
    310   struct ssp_rng_proxy_create2_args proxy_args = SSP_RNG_PROXY_CREATE2_ARGS_NULL;
    311   struct ssp_rng_proxy* proxy = NULL;
    312   struct ssp_rng** rngs = NULL;
    313   size_t i;
    314   res_T res = RES_OK;
    315   ASSERT(dev && out_proxy && out_rngs);
    316 
    317   rngs = MEM_CALLOC(dev->allocator, dev->nthreads, sizeof(*rngs));
    318   if(!rngs) {
    319     log_err(dev, "Could not allocate the list of per thread RNGs.\n");
    320     res = RES_MEM_ERR;
    321     goto error;
    322   }
    323 
    324   /* Create the RNG proxy */
    325   proxy_args.rng = rng_state;
    326   proxy_args.type = rng_type;
    327   proxy_args.nbuckets = dev->nthreads;
    328 #ifdef SDIS_ENABLE_MPI
    329   if(dev->use_mpi) {
    330     proxy_args.sequence_size = RNG_SEQUENCE_SIZE;
    331     proxy_args.sequence_offset = RNG_SEQUENCE_SIZE * (size_t)dev->mpi_rank;
    332     proxy_args.sequence_pitch = RNG_SEQUENCE_SIZE * (size_t)dev->mpi_nprocs;
    333   } else
    334 #endif
    335   {
    336     proxy_args.sequence_size = RNG_SEQUENCE_SIZE;
    337     proxy_args.sequence_offset = 0;
    338     proxy_args.sequence_pitch = RNG_SEQUENCE_SIZE;
    339   }
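          /* Illustration (assuming offset/pitch are counted in random numbers, as the
           * arithmetic above suggests): with 4 MPI processes, rank 2 gets
           *   sequence_offset = 2 * RNG_SEQUENCE_SIZE
           *   sequence_pitch  = 4 * RNG_SEQUENCE_SIZE
           * so each rank consumes interleaved, disjoint windows of the global random
           * stream, each window holding RNG_SEQUENCE_SIZE numbers; every rank then
           * derives its `nbuckets' = nthreads per thread RNGs from the proxy via the
           * ssp_rng_proxy_create_rng calls below. */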
    340   res = ssp_rng_proxy_create2(dev->allocator, &proxy_args, &proxy);
    341   if(res != RES_OK) goto error;
    342 
    343   /* Query the RNG proxy to create the per thread RNGs */
    344   FOR_EACH(i, 0, dev->nthreads) {
    345     res = ssp_rng_proxy_create_rng(proxy, i, &rngs[i]);
    346     if(res != RES_OK) goto error;
    347   }
    348 
    349 exit:
    350   *out_rngs = rngs;
    351   *out_proxy = proxy;
    352   return res;
    353 error:
    354   if(rngs) { release_per_thread_rng(dev, rngs); rngs = NULL; }
    355   if(proxy) { SSP(rng_proxy_ref_put(proxy)); proxy = NULL; }
    356   goto exit;
    357 }
    358 
    359 void
    360 release_per_thread_rng(struct sdis_device* dev, struct ssp_rng* rngs[])
    361 {
    362   size_t i;
    363   ASSERT(dev);
    364   if(!rngs) return;
    365   FOR_EACH(i, 0, dev->nthreads) { if(rngs[i]) SSP(rng_ref_put(rngs[i])); }
    366   MEM_RM(dev->allocator, rngs);
    367 }
    368 
    369 res_T
    370 create_per_thread_green_function
    371   (struct sdis_scene* scn,
    372    const hash256_T signature,
    373    struct sdis_green_function** out_greens[])
    374 {
    375   struct sdis_green_function** greens = NULL;
    376   size_t i;
    377   res_T res = RES_OK;
    378   ASSERT(scn && out_greens);
    379 
    380   greens = MEM_CALLOC(scn->dev->allocator, scn->dev->nthreads, sizeof(*greens));
    381   if(!greens) {
    382     log_err(scn->dev,
    383       "Could not allocate the list of per thread green functions.\n");
    384     res = RES_MEM_ERR;
    385     goto error;
    386   }
    387 
    388   FOR_EACH(i, 0, scn->dev->nthreads) {
    389     res = green_function_create(scn, signature, &greens[i]);
    390     if(res != RES_OK) goto error;
    391   }
    392 
    393 exit:
    394   *out_greens = greens;
    395   return res;
    396 error:
    397   if(greens) {
    398     release_per_thread_green_function(scn, greens);
    399     greens = NULL;
    400   }
    401   goto exit;
    402 }
    403 
    404 void
    405 release_per_thread_green_function
    406   (struct sdis_scene* scn,
    407    struct sdis_green_function* greens[])
    408 {
    409   size_t i;
    410   ASSERT(greens);
    411   FOR_EACH(i, 0, scn->dev->nthreads) {
    412     if(greens[i]) SDIS(green_function_ref_put(greens[i]));
    413   }
    414   MEM_RM(scn->dev->allocator, greens);
    415 }
    416 
    417 res_T
    418 alloc_process_progress(struct sdis_device* dev, int32_t** out_progress)
    419 {
    420   int32_t* progress = NULL;
    421   size_t nprocs;
    422   res_T res = RES_OK;
    423   ASSERT(dev && out_progress);
    424 
    425 #ifdef SDIS_ENABLE_MPI
    426   if(dev->use_mpi) {
    427     nprocs = (size_t)dev->mpi_nprocs;
    428   } else
    429 #endif
    430   {
    431     nprocs = 1;
    432   }
    433   progress = MEM_CALLOC(dev->allocator, nprocs, sizeof(*progress));
    434   if(!progress) {
    435     log_err(dev,"Could not allocate the list of per process progress status.\n");
    436     res = RES_MEM_ERR;
    437     goto error;
    438   }
    439 
    440 exit:
    441   *out_progress = progress;
    442   return res;
    443 error:
    444   if(progress) { MEM_RM(dev->allocator, progress); progress = NULL; }
    445   goto exit;
    446 }
    447 
    448 void
    449 free_process_progress(struct sdis_device* dev, int32_t progress[])
    450 {
    451   ASSERT(dev && progress);
    452   MEM_RM(dev->allocator, progress);
    453 }
    454 
    455 size_t
    456 compute_process_index_range
    457   (const struct sdis_device* dev,
    458    const size_t nindices,
    459    size_t range[2])
    460 {
    461 #ifndef SDIS_ENABLE_MPI
    462   (void)dev;
    463   range[0] = 0;
    464   range[1] = nindices; /* Upper bound is _exclusive_ */
    465 #else
    466   ASSERT(dev);
    467 
    468   if(!dev->use_mpi) {
    469     range[0] = 0;
    470     range[1] = nindices;
    471   } else {
    472     size_t per_process_indices = 0;
    473     size_t remaining_indices = 0;
    474 
    475     /* Compute the minimum number of indices on each process */
    476     per_process_indices = nindices / (size_t)dev->mpi_nprocs;
    477 
    478     range[0] = per_process_indices * (size_t)dev->mpi_rank;
    479     range[1] = range[0] + per_process_indices; /* Upper bound is _exclusive_ */
    480     ASSERT(range[0] <= range[1]);
    481 
    482     /* Compute the number of remaining indices not yet assigned to any process */
    483     remaining_indices =
    484       nindices - per_process_indices * (size_t)dev->mpi_nprocs;
    485 
    486     /* Distribute the remaining indices among the processes. Each process whose
    487      * rank is lower than the number of remaining indices takes an additional
    488      * index. To ensure continuity of indices per process, subsequent processes
    489      * shift their index range accordingly, i.e. process 1 shifts its indices
    490      * by 1, process 2 shifts them by 2 and so on until there are no more
    491      * indices to distribute. From then on, subsequent processes simply shift
    492      * their index range by the number of remaining indices that have been
    493      * distributed. */
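            /* Worked example: nindices = 10 with 4 processes gives
             * per_process_indices = 2 and remaining_indices = 2, hence the ranges
             *   rank 0 -> [0, 3)    rank 1 -> [3, 6)
             *   rank 2 -> [6, 8)    rank 3 -> [8, 10)
             * i.e. the first two ranks take one extra index each. */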
    494     if((size_t)dev->mpi_rank < remaining_indices) {
    495       range[0] += (size_t)dev->mpi_rank;
    496       range[1] += (size_t)dev->mpi_rank + 1/* Take one more index */;
    497     } else {
    498       range[0] += remaining_indices;
    499       range[1] += remaining_indices;
    500     }
    501   }
    502 #endif
    503   return range[1] - range[0];
    504 }
    505 
    506 #ifndef SDIS_ENABLE_MPI
    507 res_T
    508 gather_accumulators
    509   (struct sdis_device* dev,
    510    const enum mpi_sdis_message msg,
    511    const struct accum* per_thread_acc,
    512    struct accum* acc)
    513 {
    514   (void)msg;
    515   ASSERT(dev);
    516   /* Gather thread accumulators */
    517   sum_accums(per_thread_acc, dev->nthreads, acc);
    518   return RES_OK;
    519 }
    520 #endif
    521 
    522 #ifdef SDIS_ENABLE_MPI
    523 res_T
    524 gather_accumulators
    525   (struct sdis_device* dev,
    526    const enum mpi_sdis_message msg,
    527    const struct accum* per_thread_acc,
    528    struct accum* acc)
    529 {
    530   struct accum* per_proc_acc = NULL;
    531   size_t nprocs = 0;
    532   res_T res = RES_OK;
    533   ASSERT(dev && per_thread_acc && acc && check_accum_message(msg));
    534 
    535   if(!dev->use_mpi) {
    536     /* Gather thread accumulators */
    537     sum_accums(per_thread_acc, dev->nthreads, acc);
    538     goto exit;
    539   }
    540 
    541   nprocs = dev->mpi_rank == 0 ? (size_t)dev->mpi_nprocs : 1;
    542   per_proc_acc = MEM_CALLOC(dev->allocator, nprocs, sizeof(struct accum));
    543   if(!per_proc_acc) { res = RES_MEM_ERR; goto error; }
    544 
    545   /* Gather thread accumulators */
    546   sum_accums(per_thread_acc, dev->nthreads, &per_proc_acc[0]);
    547 
    548   /* Non master process */
    549   if(dev->mpi_rank != 0) {
    550 
    551     /* Send the accumulator to the master process */
    552     mutex_lock(dev->mpi_mutex);
    553     MPI(Send(&per_proc_acc[0], sizeof(per_proc_acc[0]), MPI_CHAR, 0/*Dst*/,
    554       msg, MPI_COMM_WORLD));
    555     mutex_unlock(dev->mpi_mutex);
    556 
    557     *acc = per_proc_acc[0];
    558 
    559   /* Master process */
    560   } else {
    561     int iproc;
    562 
    563     /* Gather process accumulators */
    564     FOR_EACH(iproc, 1, dev->mpi_nprocs) {
    565       MPI_Request req;
    566 
    567       /* Asynchronously receive the accumulator of `iproc' */
    568       mutex_lock(dev->mpi_mutex);
    569       MPI(Irecv(&per_proc_acc[iproc], sizeof(per_proc_acc[iproc]), MPI_CHAR,
    570         iproc, msg, MPI_COMM_WORLD, &req));
    571       mutex_unlock(dev->mpi_mutex);
    572       mpi_waiting_for_request(dev, &req);
    573     }
    574 
    575     /* Sum the process accumulators */
    576     sum_accums(per_proc_acc, (size_t)dev->mpi_nprocs, acc);
    577   }
    578 
    579 exit:
    580   if(per_proc_acc) MEM_RM(dev->allocator, per_proc_acc);
    581   return res;
    582 error:
    583   goto exit;
    584 }
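        /* Typical call (illustrative; `per_thread_accums' and `acc' are hypothetical
         * caller-side variables of type struct accum):
         *   res = gather_accumulators
         *     (scn->dev, MPI_SDIS_MSG_ACCUM_TEMP, per_thread_accums, &acc);
         * On the master process `acc' then holds the sum over every thread of every
         * MPI process; on the other processes it only holds their local sum. */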
    585 #endif /* SDIS_ENABLE_MPI */
    586 
    587 #ifndef SDIS_ENABLE_MPI
    588 res_T
    589 gather_accumulators_list
    590   (struct sdis_device* dev,
    591    const enum mpi_sdis_message msg,
    592    const size_t nprobes, /* Total number of probes */
    593    const size_t process_probes[2], /* Range of probes managed by the process */
    594    struct accum* per_probe_acc) /* List of per probe accumulators */
    595 {
    596   (void)dev, (void)msg, (void) nprobes;
    597   (void)process_probes, (void)per_probe_acc;
    598   return RES_OK;
    599 }
    600 #else
    601 res_T
    602 gather_accumulators_list
    603   (struct sdis_device* dev,
    604    const enum mpi_sdis_message msg,
    605    const size_t nprobes, /* Total number of probes */
    606    const size_t process_probes[2], /* Range of probes managed by the process */
    607    struct accum* per_probe_acc) /* List of per probe accumulators */
    608 {
    609   struct accum_list {
    610     size_t size;
    611     /* Simulate a C99 flexible array */
    612     ALIGN(16) struct accum accums[1/*Dummy element*/];
    613   }* accum_list = NULL;
    614   size_t max_per_process_nprobes = 0; /* Maximum #probes per process */
    615   size_t process_nprobes = 0; /* Number of process probes */
    616   size_t msg_sz = 0; /* Size in bytes of the message to send */
    617   res_T res = RES_OK;
    618 
    619   /* Check pre-conditions */
    620   ASSERT(dev);
    621   ASSERT(nprobes == 0 || (process_probes && per_probe_acc));
    622 
    623   /* Without MPI, do nothing since per_probe_acc already has all the
    624    * accumulators */
    625   if(!dev->use_mpi) goto exit;
    626 
    627   /* Define the maximum number of probes managed by a process, i.e. the number
    628    * of probes divided by the number of processes, plus one to account for the
    629    * remainder of the integer division: the remaining probes are distributed
    630    * among the processes */
    631   max_per_process_nprobes = nprobes/(size_t)dev->mpi_nprocs + 1;
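          /* e.g. 10 probes over 4 processes: max_per_process_nprobes = 10/4 + 1 = 3,
           * which is also the size of the largest range produced by
           * compute_process_index_range() in that case */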
    632 
    633   /* Number of probes managed by this process */
    634   process_nprobes = process_probes[1] - process_probes[0];
    635 
    636   /* Allocate the array into which the data to be collected is copied */
    637   msg_sz =
    638     sizeof(struct accum_list)
    639   + sizeof(struct accum)*max_per_process_nprobes
    640   - 1/*Dummy element */;
    641   if(msg_sz > INT_MAX) {
    642     log_err(dev, "%s: invalid MPI message size %lu.\n",
    643       FUNC_NAME, (unsigned long)msg_sz);
    644     res = RES_BAD_ARG;
    645     goto error;
    646   }
    647 
    648   accum_list = MEM_CALLOC(dev->allocator, 1, msg_sz);
    649   if(!accum_list) {
    650     log_err(dev,
    651       "%s: unable to allocate the temporary list of accumulators.\n",
    652       FUNC_NAME);
    653     res = RES_MEM_ERR;
    654     goto error;
    655   }
    656 
    657   /* Non master process */
    658   if(dev->mpi_rank != 0) {
    659 
    660     /* Setup the message to be sent */
    661     accum_list->size = process_nprobes;
    662     memcpy(accum_list->accums, per_probe_acc,
    663       sizeof(struct accum)*process_nprobes);
    664 
    665     mutex_lock(dev->mpi_mutex);
    666     MPI(Send(accum_list, (int)msg_sz, MPI_CHAR, 0/*Dst*/, msg, MPI_COMM_WORLD));
    667     mutex_unlock(dev->mpi_mutex);
    668 
    669   /* Master process */
    670   } else {
    671     size_t gathered_nprobes = process_nprobes;
    672     int iproc;
    673 
    674     FOR_EACH(iproc, 1, dev->mpi_nprocs) {
    675       MPI_Request req;
    676 
    677       /* Asynchronously receive the accumulator of `iproc' */
    678       mutex_lock(dev->mpi_mutex);
    679       MPI(Irecv
    680         (accum_list, (int)msg_sz, MPI_CHAR, iproc, msg, MPI_COMM_WORLD, &req));
    681       mutex_unlock(dev->mpi_mutex);
    682 
    683       mpi_waiting_for_request(dev, &req);
    684 
    685       memcpy(per_probe_acc+gathered_nprobes, accum_list->accums,
    686         sizeof(struct accum)*accum_list->size);
    687 
    688       gathered_nprobes += accum_list->size;
    689     }
    690   }
    691 
    692 exit:
    693   if(accum_list) MEM_RM(dev->allocator, accum_list);
    694   return res;
    695 error:
    696   goto exit;
    697 }
    698 #endif
    699 
    700 #ifndef SDIS_ENABLE_MPI
    701 res_T
    702 gather_green_functions
    703   (struct sdis_scene* scn,
    704    struct ssp_rng_proxy* rng_proxy,
    705    struct sdis_green_function* per_thread_green[],
    706    const struct accum* per_thread_acc_time,
    707    struct sdis_green_function** out_green)
    708 {
    709   return gather_green_functions_no_mpi
    710     (scn, rng_proxy, per_thread_green, per_thread_acc_time, out_green);
    711 }
    712 #else
    713 res_T
    714 gather_green_functions
    715   (struct sdis_scene* scn,
    716    struct ssp_rng_proxy* rng_proxy,
    717    struct sdis_green_function* per_thread_green[],
    718    const struct accum* per_thread_acc_time,
    719    struct sdis_green_function** out_green)
    720 {
    721   struct accum acc_time = ACCUM_NULL;
    722   struct sdis_green_function* green = NULL;
    723   struct sdis_green_function** per_proc_green = NULL;
    724   unsigned ithread;
    725   int iproc;
    726   res_T res = RES_OK;
    727   ASSERT(scn && per_thread_green && out_green);
    728 
    729   if(!scn->dev->use_mpi) {
    730     return gather_green_functions_no_mpi
    731       (scn, rng_proxy, per_thread_green, per_thread_acc_time, out_green);
    732   }
    733 
    734 
    735   /* Redux the per thread green function into the green function of the 1st
    736    * thread */
    737   res = green_function_redux_and_clear
    738     (per_thread_green[0], per_thread_green+1, scn->dev->nthreads-1);
    739   if(res != RES_OK) goto error;
    740 
    741   /* Gather the accumulators. The master process gathers all the accumulators,
    742    * while each non master process sums only its per thread accumulators and
    743    * sends the result to the master process */
    744   res = gather_accumulators
    745     (scn->dev, MPI_SDIS_MSG_ACCUM_TIME, per_thread_acc_time, &acc_time);
    746   if(res != RES_OK) goto error;
    747 
    748   /* Non master process */
    749   if(scn->dev->mpi_rank != 0) {
    750     /* Return the green of the 1st thread */
    751     SDIS(green_function_ref_get(per_thread_green[0]));
    752     green = per_thread_green[0];
    753 
    754     /* The green function must be finalized before it is sent to the master
    755      * process; otherwise its serialization fails. */
    756     res = green_function_finalize(green, rng_proxy, &acc_time);
    757     if(res != RES_OK) goto error;
    758 
    759     res  = send_green_function_to_master_process(scn->dev, green);
    760     if(res != RES_OK) goto error;
    761 
    762   /* Master process */
    763   } else {
    764     /* Allocate the list of per process green functions */
    765     per_proc_green = MEM_CALLOC(scn->dev->allocator,
    766       (size_t)scn->dev->mpi_nprocs, sizeof(*per_proc_green));
    767     if(!per_proc_green) {
    768       log_err(scn->dev,
    769         "Could not allocate the temporary list of per process "
    770         "green functions.\n");
    771       res = RES_MEM_ERR;
    772       goto error;
    773     }
    774 
    775     /* Use the gathered per thread green function stored by thread 0 as the
    776      * green function of the master process */
    777     SDIS(green_function_ref_get(per_thread_green[0]));
    778     per_proc_green[0] = per_thread_green[0];
    779 
    780     /* Release per thread green functions */
    781     FOR_EACH(ithread, 0, scn->dev->nthreads) {
    782       SDIS(green_function_ref_put(per_thread_green[ithread]));
    783       per_thread_green[ithread] = NULL;
    784     }
    785 
    786     res = gather_green_functions_from_non_master_process(scn, per_proc_green+1);
    787     if(res != RES_OK) goto error;
    788 
    789     /* Reduce the per process green functions into the green function of the
    790      * master process */
    791     res = green_function_redux_and_clear
    792       (per_proc_green[0], per_proc_green+1, (size_t)scn->dev->mpi_nprocs-1);
    793     if(res != RES_OK) goto error;
    794 
    795     /* Return the gathered green function of the master process */
    796     SDIS(green_function_ref_get(per_proc_green[0]));
    797     green = per_proc_green[0];
    798 
    799     /* Finalize the green function */
    800     res = green_function_finalize(green, rng_proxy, &acc_time);
    801     if(res != RES_OK) goto error;
    802   }
    803 
    804 exit:
    805   if(per_proc_green) {
    806     FOR_EACH(iproc, 0, scn->dev->mpi_nprocs) {
    807       if(per_proc_green[iproc]) {
    808         SDIS(green_function_ref_put(per_proc_green[iproc]));
    809       }
    810     }
    811     MEM_RM(scn->dev->allocator, per_proc_green);
    812   }
    813   *out_green = green;
    814   return res;
    815 error:
    816   if(green) { SDIS(green_function_ref_put(green)); green = NULL; }
    817   goto exit;
    818 }
    819 
    820 #endif
    821 
    822 #ifndef SDIS_ENABLE_MPI
    823 res_T
    824 gather_rng_proxy_sequence_id
    825   (struct sdis_device* dev,
    826    struct ssp_rng_proxy* proxy)
    827 {
    828   ASSERT(dev && proxy);
    829   (void)dev, (void)proxy;
    830   return RES_OK;
    831 }
    832 #else
    833 
    834 res_T
    835 gather_rng_proxy_sequence_id
    836   (struct sdis_device* dev,
    837    struct ssp_rng_proxy* proxy)
    838 {
    839   unsigned long proc_seq_id = 0;
    840   size_t seq_id = SSP_SEQUENCE_ID_NONE;
    841   res_T res = RES_OK;
    842   ASSERT(dev && proxy);
    843 
    844   if(!dev->use_mpi) goto exit;
    845 
    846   /* Retrieve the sequence id of the process */
    847   SSP(rng_proxy_get_sequence_id(proxy, &seq_id));
    848   CHK(seq_id <= ULONG_MAX);
    849   proc_seq_id = (unsigned long)seq_id;
    850 
    851   /* Non master process */
    852   if(dev->mpi_rank != 0) {
    853 
    854     /* Send the sequence id to the master process */
    855     mutex_lock(dev->mpi_mutex);
    856     MPI(Send(&proc_seq_id, 1, MPI_UNSIGNED_LONG, 0/*Dst*/,
    857       MPI_SDIS_MSG_RNG_PROXY_SEQUENCE_ID, MPI_COMM_WORLD));
    858     mutex_unlock(dev->mpi_mutex);
    859 
    860   /* Master process */
    861   } else {
    862     size_t nseqs_to_flush = 0;
    863     unsigned long max_seq_id = 0;
    864     int iproc;
    865 
    866     max_seq_id = proc_seq_id;
    867 
    868     /* Gather the per process sequence ids and find the maximum sequence id */
    869     FOR_EACH(iproc, 1, dev->mpi_nprocs) {
    870       MPI_Request req;
    871       unsigned long tmp_seq_id;
    872 
    873       /* Asynchronously receive the sequence id of `iproc' */
    874       mutex_lock(dev->mpi_mutex);
    875       MPI(Irecv(&tmp_seq_id, 1, MPI_UNSIGNED_LONG, iproc,
    876         MPI_SDIS_MSG_RNG_PROXY_SEQUENCE_ID, MPI_COMM_WORLD, &req));
    877       mutex_unlock(dev->mpi_mutex);
    878       mpi_waiting_for_request(dev, &req);
    879 
    880       /* Track the maximum sequence id across all processes */
    881       max_seq_id = MMAX(max_seq_id, tmp_seq_id);
    882     }
    883 
    884     /* Flush the current sequence that is already consumed in addition to the
    885      * sequences queried by the other processes */
    886     nseqs_to_flush = 1/*Current sequence*/ + max_seq_id - proc_seq_id;
    887     res = ssp_rng_proxy_flush_sequences(proxy, nseqs_to_flush);
    888     if(res != RES_OK) goto error;
    889   }
    890 
    891 exit:
    892   return res;
    893 error:
    894   goto exit;
    895 }
    896 #endif
    897 
    898 #ifndef SDIS_ENABLE_MPI
    899 res_T
    900 gather_res_T(struct sdis_device* dev, const res_T res)
    901 {
    902   (void)dev;
    903   return res;
    904 }
    905 #else
    906 res_T
    907 gather_res_T(struct sdis_device* dev, const res_T proc_res)
    908 {
    909   int32_t status;
    910   res_T res = proc_res;
    911   int iproc;
    912   ASSERT(dev);
    913 
    914   if(!dev->use_mpi) return proc_res;
    915 
    916   status = (int32_t)(proc_res);
    917 
    918   /* Send the local res status to all other processes */
    919   FOR_EACH(iproc, 0, dev->mpi_nprocs) {
    920     /* Do not send the res status to yourself */
    921     if(iproc == dev->mpi_rank) continue;
    922 
    923     mutex_lock(dev->mpi_mutex);
    924     MPI(Send(&status, 1, MPI_INT32_T, iproc, MPI_SDIS_MSG_RES_T,
    925       MPI_COMM_WORLD));
    926     mutex_unlock(dev->mpi_mutex);
    927   }
    928 
    929   /* Receive the res status of all other processes */
    930   res = proc_res;
    931   FOR_EACH(iproc, 0, dev->mpi_nprocs) {
    932     MPI_Request req;
    933 
    934     /* Do not receive the res status from yourself */
    935     if(iproc == dev->mpi_rank) continue;
    936 
    937     mutex_lock(dev->mpi_mutex);
    938     MPI(Irecv(&status, 1, MPI_INT32_T, iproc, MPI_SDIS_MSG_RES_T,
    939       MPI_COMM_WORLD, &req));
    940     mutex_unlock(dev->mpi_mutex);
    941     mpi_waiting_for_request(dev, &req);
    942 
    943     if(res == RES_OK && status != RES_OK) {
    944       res = (res_T)status;
    945     }
    946   }
    947 
    948   return res;
    949 }
    950 
    951 #endif
    952 
    953 void
    954 print_progress
    955   (struct sdis_device* dev,
    956    int32_t progress[],
    957    const char* label)
    958 {
    959   ASSERT(dev && label);
    960 #ifndef SDIS_ENABLE_MPI
    961   log_info(dev, "%s%3d%%%c", label, progress[0],
    962     dev->no_escape_sequence ? '\n' : '\r');
    963 #else
    964   if(!dev->use_mpi) {
    965     log_info(dev, "%s%3d%%%c", label, progress[0],
    966       dev->no_escape_sequence ? '\n' : '\r');
    967   } else {
    968     int i;
    969     if(dev->mpi_rank != 0) return;
    970     mpi_fetch_progress(dev, progress);
    971     FOR_EACH(i, 0, dev->mpi_nprocs) {
    972       log_info(dev, "Process %d -- %s%3d%%%c", i, label, progress[i],
    973         i == dev->mpi_nprocs - 1 && !dev->no_escape_sequence ? '\r' : '\n');
    974     }
    975   }
    976 #endif
    977 }
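        /* Illustration of the MPI layout printed above, assuming a hypothetical
         * label "Solve: " and 2 processes:
         *   Process 0 -- Solve:  42%
         *   Process 1 -- Solve:  40%
         * The last line ends with '\r' instead of '\n' (unless escape sequences are
         * disabled) so that, combined with rewind_progress_printing(), the next
         * update overwrites the whole block in place. */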
    978 
    979 void
    980 print_progress_update
    981   (struct sdis_device* dev,
    982    int32_t progress[],
    983    const char* label)
    984 {
    985   ASSERT(dev);
    986 #ifndef SDIS_ENABLE_MPI
    987   print_progress(dev, progress, label);
    988 #else
    989   if(!dev->use_mpi) {
    990     print_progress(dev, progress, label);
    991   } else {
    992     if(dev->mpi_rank != 0) {
    993       mpi_send_progress(dev, progress[0]);
    994     } else {
    995       mpi_fetch_progress(dev, progress);
    996       rewind_progress_printing(dev);
    997       print_progress(dev, progress, label);
    998     }
    999   }
   1000 #endif
   1001 }
   1002 
   1003 void
   1004 print_progress_completion
   1005   (struct sdis_device* dev,
   1006    int32_t progress[],
   1007    const char* label)
   1008 {
   1009   ASSERT(dev);
   1010   (void)dev, (void)progress, (void)label;
   1011 
   1012   /* Only print the 100% completion when MPI is enabled: at the time of the
   1013    * last in-progress print, non-master processes might still be running. When
   1014    * MPI is disabled, 100% completion is already printed during calculation */
   1015 #ifdef  SDIS_ENABLE_MPI
   1016   if(dev->use_mpi && dev->mpi_rank == 0 && dev->mpi_nprocs > 1) {
   1017     mpi_fetch_progress(dev, progress);
   1018     rewind_progress_printing(dev);
   1019     print_progress(dev, progress, label);
   1020 
   1021     /* When escape sequences are allowed, the last newline character of the
   1022      * progress message is replaced with a carriage return. After the
   1023      * calculation is complete, we therefore print an additional newline
   1024      * character after this carriage return. */
   1025     if(!dev->no_escape_sequence) {
   1026       log_info(dev, "\n");
   1027     }
   1028   }
   1029 #endif
   1030 }
   1031 
   1032 void
   1033 process_barrier(struct sdis_device* dev)
   1034 {
   1035 #ifndef SDIS_ENABLE_MPI
   1036   (void)dev;
   1037   return;
   1038 #else
   1039   if(dev->use_mpi) {
   1040     mpi_barrier(dev);
   1041   }
   1042 #endif
   1043 }