stardis-solver

Solve coupled heat transfers
git clone git://git.meso-star.fr/stardis-solver.git
Log | Files | Refs | README | LICENSE

sdis_device.c (13311B)


      1 /* Copyright (C) 2016-2025 |Méso|Star> (contact@meso-star.com)
      2  *
      3  * This program is free software: you can redistribute it and/or modify
      4  * it under the terms of the GNU General Public License as published by
      5  * the Free Software Foundation, either version 3 of the License, or
      6  * (at your option) any later version.
      7  *
      8  * This program is distributed in the hope that it will be useful,
      9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     11  * GNU General Public License for more details.
     12  *
     13  * You should have received a copy of the GNU General Public License
     14  * along with this program. If not, see <http://www.gnu.org/licenses/>. */
     15 
     16 #include "sdis.h"
     17 #include "sdis_device_c.h"
     18 #include "sdis_log.h"
     19 
     20 #include <rsys/cstr.h>
     21 #include <rsys/logger.h>
     22 #include <rsys/mem_allocator.h>
     23 #include <rsys/mutex.h>
     24 
     25 #include <star/s2d.h>
     26 #include <star/s3d.h>
     27 #include <star/ssp.h>
     28 #include <star/swf.h>
     29 
     30 #include <omp.h>
     31 
     32 #ifdef SDIS_ENABLE_MPI
     33   #include <mpi.h>
     34 #endif
     35 
     36 /*******************************************************************************
     37  * Helper functions
     38  ******************************************************************************/
     39 #ifdef SDIS_ENABLE_MPI
     40 
     41 static const char*
     42 mpi_error_string(struct sdis_device* dev, const int mpi_err)
     43 {
     44   int res_mpi = MPI_SUCCESS;
     45   int len;
     46   ASSERT(dev);
     47 
     48   res_mpi = MPI_Error_string(mpi_err, str_get(&dev->mpi_err_str), &len);
     49   return res_mpi == MPI_SUCCESS
     50     ? str_get(&dev->mpi_err_str) : "Invalid MPI error";
     51 }
     52 
     53 static const char*
     54 mpi_thread_support_string(const int val)
     55 {
     56   switch(val) {
     57     case MPI_THREAD_SINGLE: return "MPI_THREAD_SINGLE";
     58     case MPI_THREAD_FUNNELED: return "MPI_THREAD_FUNNELED";
     59     case MPI_THREAD_SERIALIZED: return "MPI_THREAD_SERIALIZED";
     60     case MPI_THREAD_MULTIPLE: return "MPI_THREAD_MULTIPLE";
     61     default: FATAL("Unreachable code.\n"); break;
     62   }
     63 }
     64 
     65 static res_T
     66 mpi_print_proc_info(struct sdis_device* dev)
     67 {
     68   char proc_name[MPI_MAX_PROCESSOR_NAME];
     69   int proc_name_len;
     70   char* proc_names = NULL;
     71   uint32_t* proc_nthreads = NULL;
     72   uint32_t nthreads = 0;
     73   int iproc;
     74   res_T res = RES_OK;
     75   ASSERT(dev);
     76 
     77   /* On process 0, allocate the arrays to stored gathered data */
     78   if(dev->mpi_rank == 0) {
     79 
     80     /* Allocate the array to store the per process name */
     81     proc_names = MEM_CALLOC(dev->allocator, (size_t)dev->mpi_nprocs,
     82       MPI_MAX_PROCESSOR_NAME*sizeof(*proc_names));
     83     if(!proc_names) {
     84       res = RES_MEM_ERR;
     85       log_err(dev,
     86         "Could not allocate the temporary memory for MPI process names -- "
     87         "%s.\n", res_to_cstr(res));
     88       goto error;
     89     }
     90 
     91     /* Allocate the array to store the per process #threads */
     92     proc_nthreads = MEM_CALLOC(dev->allocator, (size_t)dev->mpi_nprocs,
     93       sizeof(*proc_nthreads));
     94     if(!proc_nthreads) {
     95       res = RES_MEM_ERR;
     96       log_err(dev,
     97         "Could not allocate the temporary memory for the #threads of the MPI "
     98         "processes -- %s.\n", res_to_cstr(res));
     99       goto error;
    100     }
    101   }
    102 
    103   /* Gather the process name to the process 0 */
    104   MPI(Get_processor_name(proc_name, &proc_name_len));
    105   MPI(Gather(proc_name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, proc_names,
    106     MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 0, MPI_COMM_WORLD));
    107 
    108   /* Gather the #threads to process 0*/
    109   nthreads = (uint32_t)dev->nthreads;
    110   MPI(Gather(&nthreads, 1, MPI_UINT32_T, proc_nthreads, 1, MPI_UINT32_T, 0,
    111     MPI_COMM_WORLD));
    112 
    113   if(dev->mpi_rank == 0) {
    114     FOR_EACH(iproc, 0, dev->mpi_nprocs) {
    115       log_info(dev, "Process %d -- %s; #threads: %u\n",
    116         iproc, proc_names + iproc*MPI_MAX_PROCESSOR_NAME, proc_nthreads[iproc]);
    117     }
    118   }
    119 
    120 exit:
    121   if(proc_names) MEM_RM(dev->allocator, proc_names);
    122   if(proc_nthreads) MEM_RM(dev->allocator, proc_nthreads);
    123   return res;
    124 error:
    125   goto exit;
    126 }
    127 
    128 static res_T
    129 mpi_init(struct sdis_device* dev)
    130 {
    131   int res_mpi = MPI_SUCCESS;
    132   int is_init = 0;
    133   int thread_support = 0;
    134   res_T res = RES_OK;
    135   ASSERT(dev);
    136 
    137   #define CALL_MPI(Func, ErrMsg) {                                             \
    138     res_mpi = MPI_##Func;                                                      \
    139     if(res_mpi != MPI_SUCCESS) {                                               \
    140       log_err(dev, ErrMsg" - %s\n", mpi_error_string(dev, res_mpi));           \
    141       res = RES_UNKNOWN_ERR;                                                   \
    142       goto error;                                                              \
    143     }                                                                          \
    144   } (void)0
    145 
    146   CALL_MPI(Initialized(&is_init),
    147     "Error querying the MPI init state");
    148 
    149   if(!is_init) {
    150     log_err(dev,
    151       "MPI is not initialized. The MPI_Init[_thread] function must be called "
    152       "priorly to the creation of the Stardis device.\n");
    153     res = RES_BAD_OP;
    154     goto error;
    155   }
    156 
    157   CALL_MPI(Query_thread(&thread_support),
    158     "Error querying the MPI thread support");
    159 
    160   if(thread_support < MPI_THREAD_SERIALIZED) {
    161     log_err(dev,
    162      "The provided MPI implementation does not support serialized API calls "
    163      "from multiple threads. The thread support is limited to %s.\n",
    164      mpi_thread_support_string(thread_support));
    165     res = RES_BAD_OP;
    166     goto error;
    167   }
    168 
    169   CALL_MPI(Comm_rank(MPI_COMM_WORLD, &dev->mpi_rank),
    170     "Error retrieving the MPI rank");
    171   CALL_MPI(Comm_size(MPI_COMM_WORLD, &dev->mpi_nprocs),
    172     "Error retrieving the size of the MPI group");
    173 
    174   #undef CALL_MPI
    175 
    176   dev->mpi_mutex = mutex_create();
    177   if(!dev->mpi_mutex) {
    178     log_err(dev,
    179       "Error creating the mutex used to protect the MPI calls.\n");
    180     res = RES_MEM_ERR;
    181     goto error;
    182   }
    183 
    184   mpi_print_proc_info(dev);
    185 
    186 exit:
    187   return res;
    188 error:
    189   if(dev->mpi_mutex) {
    190     mutex_destroy(dev->mpi_mutex);
    191     dev->mpi_mutex = NULL;
    192   }
    193   goto exit;
    194 }
    195 
    196 #endif /* SDIS_ENABLE_MPI */
    197 
    198 static INLINE int
    199 check_sdis_device_create_args(const struct sdis_device_create_args* args)
    200 {
    201   return args && args->nthreads_hint != 0;
    202 }
    203 
    204 static INLINE res_T
    205 setup_logger
    206   (struct sdis_device* dev,
    207    const struct sdis_device_create_args* args)
    208 {
    209   ASSERT(dev && args);
    210   if(args->logger) {
    211     dev->logger = args->logger;
    212   } else {
    213     setup_log_default(dev);
    214   }
    215   return RES_OK;
    216 }
    217 
    218 static INLINE res_T
    219 setup_star2d(struct sdis_device* dev)
    220 {
    221   res_T res = RES_OK;
    222   ASSERT(dev);
    223   res = s2d_device_create(dev->logger, dev->allocator, 0, &dev->s2d_dev);
    224   if(res != RES_OK) {
    225     log_err(dev,
    226       "Could not create the Star-2D device for Stardis-Solver -- %s.\n",
    227       res_to_cstr(res));
    228     goto error;
    229   }
    230 exit:
    231   return res;
    232 error:
    233   goto exit;
    234 }
    235 
    236 static INLINE res_T
    237 setup_star3d(struct sdis_device* dev)
    238 {
    239   res_T res = RES_OK;
    240   ASSERT(dev);
    241   res = s3d_device_create(dev->logger, dev->allocator, 0, &dev->s3d_dev);
    242   if(res != RES_OK) {
    243     log_err(dev,
    244       "Could not create the Star-3D device for Stardis-Solver -- %s.\n",
    245       res_to_cstr(res));
    246     goto error;
    247   }
    248 exit:
    249   return res;
    250 error:
    251   goto exit;
    252 }
    253 
    254 static INLINE res_T
    255 setup_starwf(struct sdis_device* dev)
    256 {
    257   struct swf_H_tabulate_args H2d_args = SWF_H2D_TABULATE_ARGS_DEFAULT;
    258   struct swf_H_tabulate_args H3d_args = SWF_H3D_TABULATE_ARGS_DEFAULT;
    259   res_T res = RES_OK;
    260   ASSERT(dev);
    261 
    262   H2d_args.allocator = dev->allocator;
    263   H3d_args.allocator = dev->allocator;
    264 
    265   res = swf_H2d_tabulate(&H2d_args, &dev->H_2d);
    266   if(res != RES_OK) {
    267     log_err(dev, "Unable to tabulate H2d function -- %s.\n",
    268       res_to_cstr(res));
    269     goto error;
    270   }
    271 
    272   res = swf_H3d_tabulate(&H3d_args, &dev->H_3d);
    273   if(res != RES_OK) {
    274     log_err(dev, "Unable to tabulate H3d function -- %s.\n",
    275       res_to_cstr(res));
    276     goto error;
    277   }
    278 
    279 exit:
    280   return res;
    281 error:
    282   goto exit;
    283 }
    284 
    285 static INLINE res_T
    286 setup_mpi(struct sdis_device* dev, const struct sdis_device_create_args* args)
    287 {
    288   ASSERT(dev && args);
    289 #ifdef SDIS_ENABLE_MPI
    290   dev->use_mpi = args->use_mpi;
    291   if(args->use_mpi) {
    292     const res_T res = mpi_init(dev);
    293     if(res != RES_OK) return res;
    294   }
    295 #else
    296   if(args->use_mpi) {
    297     log_warn(dev,
    298       "Stardis-Solver is built without the support of the Message Passing "
    299       "Interface. MPI cannot be used for parallel computations.\n");
    300   }
    301 #endif
    302   return RES_OK;
    303 
    304 }
    305 
    306 static void
    307 device_release(ref_T* ref)
    308 {
    309   struct sdis_device* dev;
    310   ASSERT(ref);
    311   dev = CONTAINER_OF(ref, struct sdis_device, ref);
    312   if(dev->s2d_dev) S2D(device_ref_put(dev->s2d_dev));
    313   if(dev->s3d_dev) S3D(device_ref_put(dev->s3d_dev));
    314   if(dev->H_2d) SWF(tabulation_ref_put(dev->H_2d));
    315   if(dev->H_3d) SWF(tabulation_ref_put(dev->H_3d));
    316   if(dev->logger == &dev->logger__) logger_release(&dev->logger__);
    317   ASSERT(flist_name_is_empty(&dev->interfaces_names));
    318   ASSERT(flist_name_is_empty(&dev->media_names));
    319   ASSERT(flist_name_is_empty(&dev->source_names));
    320   flist_name_release(&dev->interfaces_names);
    321   flist_name_release(&dev->media_names);
    322   flist_name_release(&dev->source_names);
    323 #ifdef SDIS_ENABLE_MPI
    324   if(dev->mpi_mutex) mutex_destroy(dev->mpi_mutex);
    325   str_release(&dev->mpi_err_str);
    326 #endif
    327   MEM_RM(dev->allocator, dev);
    328 }
    329 
    330 /*******************************************************************************
    331  * Exported functions
    332  ******************************************************************************/
    333 res_T
    334 sdis_device_create
    335   (const struct sdis_device_create_args* args,
    336    struct sdis_device** out_dev)
    337 {
    338   struct sdis_device* dev = NULL;
    339   struct mem_allocator* allocator = NULL;
    340   unsigned nthreads_max = 0;
    341   res_T res = RES_OK;
    342 
    343   if(!check_sdis_device_create_args(args) || !out_dev) {
    344     res = RES_BAD_ARG;
    345     goto error;
    346   }
    347 
    348   allocator = args->allocator ? args->allocator : &mem_default_allocator;
    349   dev = MEM_CALLOC(allocator, 1, sizeof(struct sdis_device));
    350   if(!dev) {
    351     if(args->verbosity) {
    352       #define ERR_STR STR(FUNC_NAME)": could not allocate the Stardis device -- %s."
    353       if(args->logger) {
    354         logger_print(args->logger, LOG_ERROR, ERR_STR, res_to_cstr(res));
    355       } else {
    356         fprintf(stderr, MSG_ERROR_PREFIX ERR_STR, res_to_cstr(res));
    357       }
    358       #undef ERR_STR
    359     }
    360     res = RES_MEM_ERR;
    361     goto error;
    362   }
    363   nthreads_max = (unsigned)MMAX(omp_get_max_threads(), omp_get_num_procs());
    364   dev->allocator = allocator;
    365   dev->no_escape_sequence = args->no_escape_sequence;
    366   dev->verbose = args->verbosity;
    367   dev->nthreads = MMIN(args->nthreads_hint, nthreads_max);
    368   ref_init(&dev->ref);
    369   flist_name_init(allocator, &dev->interfaces_names);
    370   flist_name_init(allocator, &dev->media_names);
    371   flist_name_init(allocator, &dev->source_names);
    372 #ifdef SDIS_ENABLE_MPI
    373   str_init(allocator, &dev->mpi_err_str);
    374 #endif
    375 
    376   res = setup_logger(dev, args);
    377   if(res != RES_OK) goto error;
    378   res = setup_star2d(dev);
    379   if(res != RES_OK) goto error;
    380   res = setup_star3d(dev);
    381   if(res != RES_OK) goto error;
    382   res = setup_starwf(dev);
    383   if(res != RES_OK) goto error;
    384   res = setup_mpi(dev, args);
    385   if(res != RES_OK) goto error;
    386 
    387   log_info(dev, "Using %lu %s.\n", (unsigned long)dev->nthreads,
    388     dev->nthreads == 1 ? "thread" : "threads");
    389 
    390 exit:
    391   if(out_dev) *out_dev = dev;
    392   return res;
    393 error:
    394   if(dev) { SDIS(device_ref_put(dev)); dev = NULL; }
    395   goto exit;
    396 }
    397 
    398 res_T
    399 sdis_device_ref_get(struct sdis_device* dev)
    400 {
    401   if(!dev) return RES_BAD_ARG;
    402   ref_get(&dev->ref);
    403   return RES_OK;
    404 }
    405 
    406 res_T
    407 sdis_device_ref_put(struct sdis_device* dev)
    408 {
    409   if(!dev) return RES_BAD_ARG;
    410   ref_put(&dev->ref, device_release);
    411   return RES_OK;
    412 }
    413 
    414 res_T
    415 sdis_device_is_mpi_used(struct sdis_device* dev, int* is_mpi_used)
    416 {
    417   if(!dev || !is_mpi_used) return RES_BAD_ARG;
    418 #ifndef SDIS_ENABLE_MPI
    419   *is_mpi_used = 0;
    420 #else
    421   *is_mpi_used = dev->use_mpi;
    422 #endif
    423   return RES_OK;
    424 }
    425 
    426 res_T
    427 sdis_device_get_mpi_rank(struct sdis_device* dev, int* rank)
    428 {
    429 #ifndef SDIS_ENABLE_MPI
    430   (void)dev, (void)rank;
    431   return RES_BAD_OP;
    432 #else
    433   if(!dev || !rank) return RES_BAD_ARG;
    434   if(!dev->use_mpi) return RES_BAD_OP;
    435   ASSERT(dev->mpi_rank >= 0);
    436   *rank = dev->mpi_rank;
    437   return RES_OK;
    438 #endif
    439 }
    440 
    441 /*******************************************************************************
    442  * Local functions
    443  ******************************************************************************/
    444 res_T
    445 create_rng_from_rng_proxy
    446   (struct sdis_device* dev,
    447    const struct ssp_rng_proxy* proxy,
    448    struct ssp_rng** out_rng)
    449 {
    450   enum ssp_rng_type rng_type;
    451   struct ssp_rng* rng = NULL;
    452   FILE* stream = NULL;
    453   res_T res = RES_OK;
    454   ASSERT(dev && proxy && out_rng);
    455 
    456   stream = tmpfile();
    457   if(!stream) {
    458     log_err(dev,
    459       "Could not open a temporary stream to store the RNG state.\n");
    460     res = RES_IO_ERR;
    461     goto error;
    462   }
    463 
    464   SSP(rng_proxy_get_type(proxy, &rng_type));
    465   res = ssp_rng_create(dev->allocator, rng_type, &rng);
    466   if(res != RES_OK) {
    467     log_err(dev, "Could not create the RNG -- %s\n", res_to_cstr(res));
    468     goto error;
    469   }
    470 
    471   res = ssp_rng_proxy_write(proxy, stream);
    472   if(res != RES_OK) {
    473     log_err(dev, "Could not serialize the RNG state -- %s\n",
    474       res_to_cstr(res));
    475     goto error;
    476   }
    477 
    478   rewind(stream);
    479   res = ssp_rng_read(rng, stream);
    480   if(res != RES_OK) {
    481     log_err(dev, "Could not read the serialized RNG state -- %s\n",
    482       res_to_cstr(res));
    483     goto error;
    484   }
    485 
    486 exit:
    487   if(out_rng) *out_rng = rng;
    488   if(stream) fclose(stream);
    489   return res;
    490 error:
    491   if(rng) {
    492     SSP(rng_ref_put(rng));
    493     rng = NULL;
    494   }
    495   goto exit;
    496 }