sdis_device.c (13311B)
1 /* Copyright (C) 2016-2025 |Méso|Star> (contact@meso-star.com) 2 * 3 * This program is free software: you can redistribute it and/or modify 4 * it under the terms of the GNU General Public License as published by 5 * the Free Software Foundation, either version 3 of the License, or 6 * (at your option) any later version. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public License 14 * along with this program. If not, see <http://www.gnu.org/licenses/>. */ 15 16 #include "sdis.h" 17 #include "sdis_device_c.h" 18 #include "sdis_log.h" 19 20 #include <rsys/cstr.h> 21 #include <rsys/logger.h> 22 #include <rsys/mem_allocator.h> 23 #include <rsys/mutex.h> 24 25 #include <star/s2d.h> 26 #include <star/s3d.h> 27 #include <star/ssp.h> 28 #include <star/swf.h> 29 30 #include <omp.h> 31 32 #ifdef SDIS_ENABLE_MPI 33 #include <mpi.h> 34 #endif 35 36 /******************************************************************************* 37 * Helper functions 38 ******************************************************************************/ 39 #ifdef SDIS_ENABLE_MPI 40 41 static const char* 42 mpi_error_string(struct sdis_device* dev, const int mpi_err) 43 { 44 int res_mpi = MPI_SUCCESS; 45 int len; 46 ASSERT(dev); 47 48 res_mpi = MPI_Error_string(mpi_err, str_get(&dev->mpi_err_str), &len); 49 return res_mpi == MPI_SUCCESS 50 ? str_get(&dev->mpi_err_str) : "Invalid MPI error"; 51 } 52 53 static const char* 54 mpi_thread_support_string(const int val) 55 { 56 switch(val) { 57 case MPI_THREAD_SINGLE: return "MPI_THREAD_SINGLE"; 58 case MPI_THREAD_FUNNELED: return "MPI_THREAD_FUNNELED"; 59 case MPI_THREAD_SERIALIZED: return "MPI_THREAD_SERIALIZED"; 60 case MPI_THREAD_MULTIPLE: return "MPI_THREAD_MULTIPLE"; 61 default: FATAL("Unreachable code.\n"); break; 62 } 63 } 64 65 static res_T 66 mpi_print_proc_info(struct sdis_device* dev) 67 { 68 char proc_name[MPI_MAX_PROCESSOR_NAME]; 69 int proc_name_len; 70 char* proc_names = NULL; 71 uint32_t* proc_nthreads = NULL; 72 uint32_t nthreads = 0; 73 int iproc; 74 res_T res = RES_OK; 75 ASSERT(dev); 76 77 /* On process 0, allocate the arrays to stored gathered data */ 78 if(dev->mpi_rank == 0) { 79 80 /* Allocate the array to store the per process name */ 81 proc_names = MEM_CALLOC(dev->allocator, (size_t)dev->mpi_nprocs, 82 MPI_MAX_PROCESSOR_NAME*sizeof(*proc_names)); 83 if(!proc_names) { 84 res = RES_MEM_ERR; 85 log_err(dev, 86 "Could not allocate the temporary memory for MPI process names -- " 87 "%s.\n", res_to_cstr(res)); 88 goto error; 89 } 90 91 /* Allocate the array to store the per process #threads */ 92 proc_nthreads = MEM_CALLOC(dev->allocator, (size_t)dev->mpi_nprocs, 93 sizeof(*proc_nthreads)); 94 if(!proc_nthreads) { 95 res = RES_MEM_ERR; 96 log_err(dev, 97 "Could not allocate the temporary memory for the #threads of the MPI " 98 "processes -- %s.\n", res_to_cstr(res)); 99 goto error; 100 } 101 } 102 103 /* Gather the process name to the process 0 */ 104 MPI(Get_processor_name(proc_name, &proc_name_len)); 105 MPI(Gather(proc_name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, proc_names, 106 MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 0, MPI_COMM_WORLD)); 107 108 /* Gather the #threads to process 0*/ 109 nthreads = (uint32_t)dev->nthreads; 110 MPI(Gather(&nthreads, 1, MPI_UINT32_T, proc_nthreads, 1, MPI_UINT32_T, 0, 111 MPI_COMM_WORLD)); 112 113 if(dev->mpi_rank == 0) { 114 FOR_EACH(iproc, 0, dev->mpi_nprocs) { 115 log_info(dev, "Process %d -- %s; #threads: %u\n", 116 iproc, proc_names + iproc*MPI_MAX_PROCESSOR_NAME, proc_nthreads[iproc]); 117 } 118 } 119 120 exit: 121 if(proc_names) MEM_RM(dev->allocator, proc_names); 122 if(proc_nthreads) MEM_RM(dev->allocator, proc_nthreads); 123 return res; 124 error: 125 goto exit; 126 } 127 128 static res_T 129 mpi_init(struct sdis_device* dev) 130 { 131 int res_mpi = MPI_SUCCESS; 132 int is_init = 0; 133 int thread_support = 0; 134 res_T res = RES_OK; 135 ASSERT(dev); 136 137 #define CALL_MPI(Func, ErrMsg) { \ 138 res_mpi = MPI_##Func; \ 139 if(res_mpi != MPI_SUCCESS) { \ 140 log_err(dev, ErrMsg" - %s\n", mpi_error_string(dev, res_mpi)); \ 141 res = RES_UNKNOWN_ERR; \ 142 goto error; \ 143 } \ 144 } (void)0 145 146 CALL_MPI(Initialized(&is_init), 147 "Error querying the MPI init state"); 148 149 if(!is_init) { 150 log_err(dev, 151 "MPI is not initialized. The MPI_Init[_thread] function must be called " 152 "priorly to the creation of the Stardis device.\n"); 153 res = RES_BAD_OP; 154 goto error; 155 } 156 157 CALL_MPI(Query_thread(&thread_support), 158 "Error querying the MPI thread support"); 159 160 if(thread_support < MPI_THREAD_SERIALIZED) { 161 log_err(dev, 162 "The provided MPI implementation does not support serialized API calls " 163 "from multiple threads. The thread support is limited to %s.\n", 164 mpi_thread_support_string(thread_support)); 165 res = RES_BAD_OP; 166 goto error; 167 } 168 169 CALL_MPI(Comm_rank(MPI_COMM_WORLD, &dev->mpi_rank), 170 "Error retrieving the MPI rank"); 171 CALL_MPI(Comm_size(MPI_COMM_WORLD, &dev->mpi_nprocs), 172 "Error retrieving the size of the MPI group"); 173 174 #undef CALL_MPI 175 176 dev->mpi_mutex = mutex_create(); 177 if(!dev->mpi_mutex) { 178 log_err(dev, 179 "Error creating the mutex used to protect the MPI calls.\n"); 180 res = RES_MEM_ERR; 181 goto error; 182 } 183 184 mpi_print_proc_info(dev); 185 186 exit: 187 return res; 188 error: 189 if(dev->mpi_mutex) { 190 mutex_destroy(dev->mpi_mutex); 191 dev->mpi_mutex = NULL; 192 } 193 goto exit; 194 } 195 196 #endif /* SDIS_ENABLE_MPI */ 197 198 static INLINE int 199 check_sdis_device_create_args(const struct sdis_device_create_args* args) 200 { 201 return args && args->nthreads_hint != 0; 202 } 203 204 static INLINE res_T 205 setup_logger 206 (struct sdis_device* dev, 207 const struct sdis_device_create_args* args) 208 { 209 ASSERT(dev && args); 210 if(args->logger) { 211 dev->logger = args->logger; 212 } else { 213 setup_log_default(dev); 214 } 215 return RES_OK; 216 } 217 218 static INLINE res_T 219 setup_star2d(struct sdis_device* dev) 220 { 221 res_T res = RES_OK; 222 ASSERT(dev); 223 res = s2d_device_create(dev->logger, dev->allocator, 0, &dev->s2d_dev); 224 if(res != RES_OK) { 225 log_err(dev, 226 "Could not create the Star-2D device for Stardis-Solver -- %s.\n", 227 res_to_cstr(res)); 228 goto error; 229 } 230 exit: 231 return res; 232 error: 233 goto exit; 234 } 235 236 static INLINE res_T 237 setup_star3d(struct sdis_device* dev) 238 { 239 res_T res = RES_OK; 240 ASSERT(dev); 241 res = s3d_device_create(dev->logger, dev->allocator, 0, &dev->s3d_dev); 242 if(res != RES_OK) { 243 log_err(dev, 244 "Could not create the Star-3D device for Stardis-Solver -- %s.\n", 245 res_to_cstr(res)); 246 goto error; 247 } 248 exit: 249 return res; 250 error: 251 goto exit; 252 } 253 254 static INLINE res_T 255 setup_starwf(struct sdis_device* dev) 256 { 257 struct swf_H_tabulate_args H2d_args = SWF_H2D_TABULATE_ARGS_DEFAULT; 258 struct swf_H_tabulate_args H3d_args = SWF_H3D_TABULATE_ARGS_DEFAULT; 259 res_T res = RES_OK; 260 ASSERT(dev); 261 262 H2d_args.allocator = dev->allocator; 263 H3d_args.allocator = dev->allocator; 264 265 res = swf_H2d_tabulate(&H2d_args, &dev->H_2d); 266 if(res != RES_OK) { 267 log_err(dev, "Unable to tabulate H2d function -- %s.\n", 268 res_to_cstr(res)); 269 goto error; 270 } 271 272 res = swf_H3d_tabulate(&H3d_args, &dev->H_3d); 273 if(res != RES_OK) { 274 log_err(dev, "Unable to tabulate H3d function -- %s.\n", 275 res_to_cstr(res)); 276 goto error; 277 } 278 279 exit: 280 return res; 281 error: 282 goto exit; 283 } 284 285 static INLINE res_T 286 setup_mpi(struct sdis_device* dev, const struct sdis_device_create_args* args) 287 { 288 ASSERT(dev && args); 289 #ifdef SDIS_ENABLE_MPI 290 dev->use_mpi = args->use_mpi; 291 if(args->use_mpi) { 292 const res_T res = mpi_init(dev); 293 if(res != RES_OK) return res; 294 } 295 #else 296 if(args->use_mpi) { 297 log_warn(dev, 298 "Stardis-Solver is built without the support of the Message Passing " 299 "Interface. MPI cannot be used for parallel computations.\n"); 300 } 301 #endif 302 return RES_OK; 303 304 } 305 306 static void 307 device_release(ref_T* ref) 308 { 309 struct sdis_device* dev; 310 ASSERT(ref); 311 dev = CONTAINER_OF(ref, struct sdis_device, ref); 312 if(dev->s2d_dev) S2D(device_ref_put(dev->s2d_dev)); 313 if(dev->s3d_dev) S3D(device_ref_put(dev->s3d_dev)); 314 if(dev->H_2d) SWF(tabulation_ref_put(dev->H_2d)); 315 if(dev->H_3d) SWF(tabulation_ref_put(dev->H_3d)); 316 if(dev->logger == &dev->logger__) logger_release(&dev->logger__); 317 ASSERT(flist_name_is_empty(&dev->interfaces_names)); 318 ASSERT(flist_name_is_empty(&dev->media_names)); 319 ASSERT(flist_name_is_empty(&dev->source_names)); 320 flist_name_release(&dev->interfaces_names); 321 flist_name_release(&dev->media_names); 322 flist_name_release(&dev->source_names); 323 #ifdef SDIS_ENABLE_MPI 324 if(dev->mpi_mutex) mutex_destroy(dev->mpi_mutex); 325 str_release(&dev->mpi_err_str); 326 #endif 327 MEM_RM(dev->allocator, dev); 328 } 329 330 /******************************************************************************* 331 * Exported functions 332 ******************************************************************************/ 333 res_T 334 sdis_device_create 335 (const struct sdis_device_create_args* args, 336 struct sdis_device** out_dev) 337 { 338 struct sdis_device* dev = NULL; 339 struct mem_allocator* allocator = NULL; 340 unsigned nthreads_max = 0; 341 res_T res = RES_OK; 342 343 if(!check_sdis_device_create_args(args) || !out_dev) { 344 res = RES_BAD_ARG; 345 goto error; 346 } 347 348 allocator = args->allocator ? args->allocator : &mem_default_allocator; 349 dev = MEM_CALLOC(allocator, 1, sizeof(struct sdis_device)); 350 if(!dev) { 351 if(args->verbosity) { 352 #define ERR_STR STR(FUNC_NAME)": could not allocate the Stardis device -- %s." 353 if(args->logger) { 354 logger_print(args->logger, LOG_ERROR, ERR_STR, res_to_cstr(res)); 355 } else { 356 fprintf(stderr, MSG_ERROR_PREFIX ERR_STR, res_to_cstr(res)); 357 } 358 #undef ERR_STR 359 } 360 res = RES_MEM_ERR; 361 goto error; 362 } 363 nthreads_max = (unsigned)MMAX(omp_get_max_threads(), omp_get_num_procs()); 364 dev->allocator = allocator; 365 dev->no_escape_sequence = args->no_escape_sequence; 366 dev->verbose = args->verbosity; 367 dev->nthreads = MMIN(args->nthreads_hint, nthreads_max); 368 ref_init(&dev->ref); 369 flist_name_init(allocator, &dev->interfaces_names); 370 flist_name_init(allocator, &dev->media_names); 371 flist_name_init(allocator, &dev->source_names); 372 #ifdef SDIS_ENABLE_MPI 373 str_init(allocator, &dev->mpi_err_str); 374 #endif 375 376 res = setup_logger(dev, args); 377 if(res != RES_OK) goto error; 378 res = setup_star2d(dev); 379 if(res != RES_OK) goto error; 380 res = setup_star3d(dev); 381 if(res != RES_OK) goto error; 382 res = setup_starwf(dev); 383 if(res != RES_OK) goto error; 384 res = setup_mpi(dev, args); 385 if(res != RES_OK) goto error; 386 387 log_info(dev, "Using %lu %s.\n", (unsigned long)dev->nthreads, 388 dev->nthreads == 1 ? "thread" : "threads"); 389 390 exit: 391 if(out_dev) *out_dev = dev; 392 return res; 393 error: 394 if(dev) { SDIS(device_ref_put(dev)); dev = NULL; } 395 goto exit; 396 } 397 398 res_T 399 sdis_device_ref_get(struct sdis_device* dev) 400 { 401 if(!dev) return RES_BAD_ARG; 402 ref_get(&dev->ref); 403 return RES_OK; 404 } 405 406 res_T 407 sdis_device_ref_put(struct sdis_device* dev) 408 { 409 if(!dev) return RES_BAD_ARG; 410 ref_put(&dev->ref, device_release); 411 return RES_OK; 412 } 413 414 res_T 415 sdis_device_is_mpi_used(struct sdis_device* dev, int* is_mpi_used) 416 { 417 if(!dev || !is_mpi_used) return RES_BAD_ARG; 418 #ifndef SDIS_ENABLE_MPI 419 *is_mpi_used = 0; 420 #else 421 *is_mpi_used = dev->use_mpi; 422 #endif 423 return RES_OK; 424 } 425 426 res_T 427 sdis_device_get_mpi_rank(struct sdis_device* dev, int* rank) 428 { 429 #ifndef SDIS_ENABLE_MPI 430 (void)dev, (void)rank; 431 return RES_BAD_OP; 432 #else 433 if(!dev || !rank) return RES_BAD_ARG; 434 if(!dev->use_mpi) return RES_BAD_OP; 435 ASSERT(dev->mpi_rank >= 0); 436 *rank = dev->mpi_rank; 437 return RES_OK; 438 #endif 439 } 440 441 /******************************************************************************* 442 * Local functions 443 ******************************************************************************/ 444 res_T 445 create_rng_from_rng_proxy 446 (struct sdis_device* dev, 447 const struct ssp_rng_proxy* proxy, 448 struct ssp_rng** out_rng) 449 { 450 enum ssp_rng_type rng_type; 451 struct ssp_rng* rng = NULL; 452 FILE* stream = NULL; 453 res_T res = RES_OK; 454 ASSERT(dev && proxy && out_rng); 455 456 stream = tmpfile(); 457 if(!stream) { 458 log_err(dev, 459 "Could not open a temporary stream to store the RNG state.\n"); 460 res = RES_IO_ERR; 461 goto error; 462 } 463 464 SSP(rng_proxy_get_type(proxy, &rng_type)); 465 res = ssp_rng_create(dev->allocator, rng_type, &rng); 466 if(res != RES_OK) { 467 log_err(dev, "Could not create the RNG -- %s\n", res_to_cstr(res)); 468 goto error; 469 } 470 471 res = ssp_rng_proxy_write(proxy, stream); 472 if(res != RES_OK) { 473 log_err(dev, "Could not serialize the RNG state -- %s\n", 474 res_to_cstr(res)); 475 goto error; 476 } 477 478 rewind(stream); 479 res = ssp_rng_read(rng, stream); 480 if(res != RES_OK) { 481 log_err(dev, "Could not read the serialized RNG state -- %s\n", 482 res_to_cstr(res)); 483 goto error; 484 } 485 486 exit: 487 if(out_rng) *out_rng = rng; 488 if(stream) fclose(stream); 489 return res; 490 error: 491 if(rng) { 492 SSP(rng_ref_put(rng)); 493 rng = NULL; 494 } 495 goto exit; 496 }