sdis.c (29640B)
/* Copyright (C) 2016-2025 |Méso|Star> (contact@meso-star.com)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>. */

#define _POSIX_C_SOURCE 200112L

#include "sdis.h"
#include "sdis_c.h"
#include "sdis_device_c.h"
#include "sdis_estimator_c.h"
#include "sdis_green.h"
#include "sdis_log.h"
#include "sdis_misc.h"
#include "sdis_scene_c.h"
#ifdef SDIS_ENABLE_MPI
#include "sdis_mpi.h"
#endif

#include <star/ssp.h>

#include <rsys/cstr.h>
#include <rsys/clock_time.h>
#include <rsys/mem_allocator.h>

#include <errno.h>

/* Number of random numbers in a sequence, i.e. number of consecutive random
 * numbers that can be used by a thread */
#define RNG_SEQUENCE_SIZE 100000

/*******************************************************************************
 * Helper functions
 ******************************************************************************/
static INLINE int
check_accum_message(const enum mpi_sdis_message msg)
{
  return msg == MPI_SDIS_MSG_ACCUM_TEMP
      || msg == MPI_SDIS_MSG_ACCUM_TIME
      || msg == MPI_SDIS_MSG_ACCUM_FLUX_CONVECTIVE
      || msg == MPI_SDIS_MSG_ACCUM_FLUX_IMPOSED
      || msg == MPI_SDIS_MSG_ACCUM_FLUX_RADIATIVE
      || msg == MPI_SDIS_MSG_ACCUM_FLUX_TOTAL
      || msg == MPI_SDIS_MSG_ACCUM_MEAN_POWER;
}

static res_T
gather_green_functions_no_mpi
  (struct sdis_scene* scn,
   struct ssp_rng_proxy* rng_proxy,
   struct sdis_green_function* per_thread_green[],
   const struct accum* per_thread_acc_time,
   struct sdis_green_function** out_green)
{
  struct sdis_green_function* green = NULL;
  struct accum acc_time = ACCUM_NULL;
  res_T res = RES_OK;
  ASSERT(scn && rng_proxy && per_thread_green && per_thread_acc_time);
  ASSERT(out_green);

  /* Reduce the per thread green functions into the green function of the 1st
   * thread */
  res = green_function_redux_and_clear
    (per_thread_green[0], per_thread_green+1, scn->dev->nthreads-1);
  if(res != RES_OK) goto error;

  /* Return the green function of the 1st thread */
  SDIS(green_function_ref_get(per_thread_green[0]));
  green = per_thread_green[0];

  res = gather_accumulators
    (scn->dev, MPI_SDIS_MSG_ACCUM_TIME, per_thread_acc_time, &acc_time);
  if(res != RES_OK) goto error;

  /* Finalize the estimated green function */
  res = green_function_finalize(green, rng_proxy, &acc_time);
  if(res != RES_OK) goto error;

exit:
  *out_green = green;
  return res;
error:
  if(green) { SDIS(green_function_ref_put(green)); green = NULL; }
  goto exit;
}

#ifdef SDIS_ENABLE_MPI
static void
rewind_progress_printing(struct sdis_device* dev)
{
  size_t i;

  if(!dev->use_mpi
  || dev->no_escape_sequence
  || dev->mpi_nprocs == 1)
    return;

  FOR_EACH(i, 0, (size_t)(dev->mpi_nprocs-1)) {
    log_info(dev, "\033[1A\r"); /* Move up */
  }
}

static res_T
send_green_function_to_master_process
  (struct sdis_device* dev,
   struct sdis_green_function* green)
{
  char buf[128];
  FILE* stream = NULL; /* Temp file that stores the serialized green function */
  void* data = NULL; /* Pointer to serialized green function data */
  long sz = 0; /* Size in bytes of the serialized green function data */
  res_T res = RES_OK;
  ASSERT(dev && green && dev->mpi_rank != 0);

  /* Open a stream to temporarily store the serialized green function */
  stream = tmpfile();
  if(!stream) {
    log_err(dev,
      "Could not open the stream used to temporarily store the green function "
      "before it is sent to the master process.\n");
    res = RES_IO_ERR;
    goto error;
  }

  /* Write the green function into the stream */
  res = sdis_green_function_write(green, stream);
  if(res != RES_OK) goto error;

  /* Fetch the size of the serialized data */
  sz = ftell(stream);
  if(sz == -1) {
    strerror_r(errno, buf, sizeof(buf));
    log_err(dev,
      "Could not query the size of the serialized green function data to send "
      "to the master process -- %s.\n", buf);
    res = RES_IO_ERR;
    goto error;
  }
  if(sz > INT_MAX) {
    log_err(dev,
      "The size of the green function data is too large. It must be less than "
      "%d megabytes while it is %ld megabytes.\n",
      INT_MAX / (1024*1024), sz/(1024*1024));
    res = RES_MEM_ERR;
    goto error;
  }

  data = MEM_CALLOC(dev->allocator, 1, (size_t)sz);
  if(!data) {
    log_err(dev, "Could not allocate the memory to store the serialized green "
      "function before it is sent to the master process.\n");
    res = RES_MEM_ERR;
    goto error;
  }

  /* Load the serialized data in memory */
  rewind(stream);
  if(fread(data, (size_t)sz, 1, stream) != 1) {
    log_err(dev,
      "Could not read the serialized green function data from the temporary "
      "stream before it is sent to the master process.\n");
    res = RES_IO_ERR;
    goto error;
  }

  /* Send the serialized data to the master process */
  mutex_lock(dev->mpi_mutex);
  MPI(Send(data, (int)sz, MPI_CHAR, 0/*Dst*/, MPI_SDIS_MSG_GREEN_FUNCTION,
    MPI_COMM_WORLD));
  mutex_unlock(dev->mpi_mutex);

exit:
  if(stream) CHK(fclose(stream) == 0);
  if(data) MEM_RM(dev->allocator, data);
  return res;
error:
  goto exit;
}

static res_T
gather_green_functions_from_non_master_process
  (struct sdis_scene* scn,
   struct sdis_green_function* greens[])
{
  struct sdis_green_function_create_from_stream_args green_args =
    SDIS_GREEN_FUNCTION_CREATE_FROM_STREAM_ARGS_DEFAULT;

  void* data = NULL; /* Pointer to gathered serialized green function data */
  FILE* stream = NULL; /* Temp file that stores the serialized green function */
  int iproc;
  res_T res = RES_OK;
  ASSERT(scn->dev && greens && scn->dev->mpi_rank == 0);

  FOR_EACH(iproc, 1, scn->dev->mpi_nprocs) {
    MPI_Request req;
    MPI_Status status;
    int count;

    /* Wait for the serialized green function sent by the process `iproc' */
    mpi_waiting_for_message
      (scn->dev, iproc, MPI_SDIS_MSG_GREEN_FUNCTION, &status);

    /* Fetch the size of the green function sent by the process `iproc' */
    mutex_lock(scn->dev->mpi_mutex);
    MPI(Get_count(&status, MPI_CHAR, &count));
    mutex_unlock(scn->dev->mpi_mutex);

    /* Allocate the memory to store the serialized green function sent by the
     * process `iproc' */
    data = MEM_REALLOC(scn->dev->allocator, data, (size_t)count);
    if(!data) {
      log_err(scn->dev,
        "Could not allocate %d bytes to store the serialized green function "
        "sent by the process %d.\n",
        count, iproc);
      res = RES_MEM_ERR;
      goto error;
    }

    /* Asynchronously receive the green function */
    mutex_lock(scn->dev->mpi_mutex);
    MPI(Irecv(data, count, MPI_CHAR, iproc, MPI_SDIS_MSG_GREEN_FUNCTION,
      MPI_COMM_WORLD, &req));
    mutex_unlock(scn->dev->mpi_mutex);
    mpi_waiting_for_request(scn->dev, &req);

    /* Open a stream to temporarily store the serialized green function */
    stream = tmpfile();
    if(!stream) {
      log_err(scn->dev,
        "Could not open the stream used to temporarily store the green "
        "function sent by the process %d.\n", iproc);
      res = RES_IO_ERR;
      goto error;
    }

    if(fwrite(data, (size_t)count, 1, stream) != 1) {
      log_err(scn->dev,
        "Could not write the green function sent by the process %d into the "
        "temporary stream.\n", iproc);
      res = RES_IO_ERR;
      goto error;
    }

    /* Deserialize the green function of the process `iproc'. Note that the
     * number of green functions to output is #procs - 1. Since we iterate
     * over the indices of the non master processes in [1, #procs), the index
     * of the green function to deserialize is iproc - 1 */
    rewind(stream);
    green_args.scene = scn;
    green_args.stream = stream;
    res = sdis_green_function_create_from_stream(&green_args, &greens[iproc-1]);
    if(res != RES_OK) {
      log_err(scn->dev,
        "Error deserializing the green function sent by the process %d -- %s.\n",
        iproc, res_to_cstr(res));
      goto error;
    }

    CHK(fclose(stream) == 0);
    stream = NULL;
  }

exit:
  if(data) MEM_RM(scn->dev->allocator, data);
  if(stream) CHK(fclose(stream) == 0);
  return res;
error:
  goto exit;
}
#endif

/*******************************************************************************
 * Exported function
 ******************************************************************************/
res_T
sdis_get_info(struct sdis_info* info)
{
  if(!info) return RES_BAD_ARG;
  *info = SDIS_INFO_NULL;
#ifdef SDIS_ENABLE_MPI
  info->mpi_enabled = 1;
#else
  info->mpi_enabled = 0;
#endif
  return RES_OK;
}

/*******************************************************************************
 * Local functions
 ******************************************************************************/
res_T
create_per_thread_rng
  (struct sdis_device* dev,
   struct ssp_rng* rng_state,
   const enum ssp_rng_type rng_type,
   struct ssp_rng_proxy** out_proxy,
   struct ssp_rng** out_rngs[])
{
  struct ssp_rng_proxy_create2_args proxy_args = SSP_RNG_PROXY_CREATE2_ARGS_NULL;
  struct ssp_rng_proxy* proxy = NULL;
  struct ssp_rng** rngs = NULL;
  size_t i;
  res_T res = RES_OK;
  ASSERT(dev && out_proxy && out_rngs);

  rngs = MEM_CALLOC(dev->allocator, dev->nthreads, sizeof(*rngs));
  if(!rngs) {
    log_err(dev, "Could not allocate the list of per thread RNGs.\n");
    res = RES_MEM_ERR;
    goto error;
  }

  /* Create the RNG proxy */
  proxy_args.rng = rng_state;
  proxy_args.type = rng_type;
  proxy_args.nbuckets = dev->nthreads;
#ifdef SDIS_ENABLE_MPI
  if(dev->use_mpi) {
    proxy_args.sequence_size = RNG_SEQUENCE_SIZE;
    proxy_args.sequence_offset = RNG_SEQUENCE_SIZE * (size_t)dev->mpi_rank;
    proxy_args.sequence_pitch = RNG_SEQUENCE_SIZE * (size_t)dev->mpi_nprocs;
  } else
#endif
  {
    proxy_args.sequence_size = RNG_SEQUENCE_SIZE;
    proxy_args.sequence_offset = 0;
    proxy_args.sequence_pitch = RNG_SEQUENCE_SIZE;
  }
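
  /* Illustration, assuming the proxy consumes the random stream in blocks of
   * `sequence_size' numbers starting at `sequence_offset' and advancing by
   * `sequence_pitch' from one block to the next: with 3 MPI processes,
   * process 0 draws from [0, 100000), [300000, 400000), ..., process 1 from
   * [100000, 200000), [400000, 500000), ... and process 2 from
   * [200000, 300000), [500000, 600000), ..., i.e. no two processes consume
   * the same portion of the stream. Within a process, the proxy then
   * distributes its sequences among the `nbuckets' per thread RNGs created
   * below. */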
  res = ssp_rng_proxy_create2(dev->allocator, &proxy_args, &proxy);
  if(res != RES_OK) goto error;

  /* Query the RNG proxy to create the per thread RNGs */
  FOR_EACH(i, 0, dev->nthreads) {
    res = ssp_rng_proxy_create_rng(proxy, i, &rngs[i]);
    if(res != RES_OK) goto error;
  }

exit:
  *out_rngs = rngs;
  *out_proxy = proxy;
  return res;
error:
  if(rngs) { release_per_thread_rng(dev, rngs); rngs = NULL; }
  if(proxy) { SSP(rng_proxy_ref_put(proxy)); proxy = NULL; }
  goto exit;
}

void
release_per_thread_rng(struct sdis_device* dev, struct ssp_rng* rngs[])
{
  size_t i;
  ASSERT(dev);
  if(!rngs) return;
  FOR_EACH(i, 0, dev->nthreads) { if(rngs[i]) SSP(rng_ref_put(rngs[i])); }
  MEM_RM(dev->allocator, rngs);
}

res_T
create_per_thread_green_function
  (struct sdis_scene* scn,
   const hash256_T signature,
   struct sdis_green_function** out_greens[])
{
  struct sdis_green_function** greens = NULL;
  size_t i;
  res_T res = RES_OK;
  ASSERT(scn && out_greens);

  greens = MEM_CALLOC(scn->dev->allocator, scn->dev->nthreads, sizeof(*greens));
  if(!greens) {
    log_err(scn->dev,
      "Could not allocate the list of per thread green functions.\n");
    res = RES_MEM_ERR;
    goto error;
  }

  FOR_EACH(i, 0, scn->dev->nthreads) {
    res = green_function_create(scn, signature, &greens[i]);
    if(res != RES_OK) goto error;
  }

exit:
  *out_greens = greens;
  return res;
error:
  if(greens) {
    release_per_thread_green_function(scn, greens);
    greens = NULL;
  }
  goto exit;
}

void
release_per_thread_green_function
  (struct sdis_scene* scn,
   struct sdis_green_function* greens[])
{
  size_t i;
  ASSERT(greens);
  FOR_EACH(i, 0, scn->dev->nthreads) {
    if(greens[i]) SDIS(green_function_ref_put(greens[i]));
  }
  MEM_RM(scn->dev->allocator, greens);
}

res_T
alloc_process_progress(struct sdis_device* dev, int32_t** out_progress)
{
  int32_t* progress = NULL;
  size_t nprocs;
  res_T res = RES_OK;
  ASSERT(dev && out_progress);

#ifdef SDIS_ENABLE_MPI
  if(dev->use_mpi) {
    nprocs = (size_t)dev->mpi_nprocs;
  } else
#endif
  {
    nprocs = 1;
  }
  progress = MEM_CALLOC(dev->allocator, nprocs, sizeof(*progress));
  if(!progress) {
    log_err(dev, "Could not allocate the list of per process progress status.\n");
    res = RES_MEM_ERR;
    goto error;
  }

exit:
  *out_progress = progress;
  return res;
error:
  if(progress) { MEM_RM(dev->allocator, progress); progress = NULL; }
  goto exit;
}

void
free_process_progress(struct sdis_device* dev, int32_t progress[])
{
  ASSERT(dev && progress);
  MEM_RM(dev->allocator, progress);
}

size_t
compute_process_index_range
  (const struct sdis_device* dev,
   const size_t nindices,
   size_t range[2])
{
#ifndef SDIS_ENABLE_MPI
  (void)dev;
  range[0] = 0;
  range[1] = nindices; /* Upper bound is _exclusive_ */
#else
  ASSERT(dev);

  if(!dev->use_mpi) {
    range[0] = 0;
    range[1] = nindices;
  } else {
    size_t per_process_indices = 0;
    size_t remaining_indices = 0;

    /* Compute the minimum number of indices on each process */
    per_process_indices = nindices / (size_t)dev->mpi_nprocs;

    range[0] = per_process_indices * (size_t)dev->mpi_rank;
    range[1] = range[0] + per_process_indices; /* Upper bound is _exclusive_ */
    ASSERT(range[0] <= range[1]);

    /* Compute the number of indices that remain unassigned after the even per
     * process split */
    remaining_indices =
      nindices - per_process_indices * (size_t)dev->mpi_nprocs;

    /* Distribute the remaining indices among the processes. Each process whose
     * rank is lower than the number of remaining indices takes an additional
     * index. To keep the per process indices contiguous, subsequent processes
     * shift their initial range accordingly, i.e. process 1 shifts its indices
     * by 1, process 2 shifts them by 2 and so on until there are no more
     * indices to distribute. From then on, subsequent processes simply shift
     * their index range by the number of remaining indices that have been
     * distributed. */
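    /* Worked example with illustrative values: with nindices = 10 and
     * mpi_nprocs = 4, per_process_indices is 2 and remaining_indices is 2, so
     * the per process ranges are [0,3) and [3,6) for ranks 0 and 1 (one extra
     * index each) and [6,8) and [8,10) for ranks 2 and 3 (both shifted by the
     * 2 distributed indices). */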
    if((size_t)dev->mpi_rank < remaining_indices) {
      range[0] += (size_t)dev->mpi_rank;
      range[1] += (size_t)dev->mpi_rank + 1/* Take one more index */;
    } else {
      range[0] += remaining_indices;
      range[1] += remaining_indices;
    }
  }
#endif
  return range[1] - range[0];
}

#ifndef SDIS_ENABLE_MPI
res_T
gather_accumulators
  (struct sdis_device* dev,
   const enum mpi_sdis_message msg,
   const struct accum* per_thread_acc,
   struct accum* acc)
{
  (void)msg;
  ASSERT(dev);
  /* Gather thread accumulators */
  sum_accums(per_thread_acc, dev->nthreads, acc);
  return RES_OK;
}
#endif

#ifdef SDIS_ENABLE_MPI
res_T
gather_accumulators
  (struct sdis_device* dev,
   const enum mpi_sdis_message msg,
   const struct accum* per_thread_acc,
   struct accum* acc)
{
  struct accum* per_proc_acc = NULL;
  size_t nprocs = 0;
  res_T res = RES_OK;
  ASSERT(dev && per_thread_acc && acc && check_accum_message(msg));

  if(!dev->use_mpi) {
    /* Gather thread accumulators */
    sum_accums(per_thread_acc, dev->nthreads, acc);
    goto exit;
  }

  nprocs = dev->mpi_rank == 0 ? (size_t)dev->mpi_nprocs : 1;
  per_proc_acc = MEM_CALLOC(dev->allocator, nprocs, sizeof(struct accum));
  if(!per_proc_acc) { res = RES_MEM_ERR; goto error; }

  /* Gather thread accumulators */
  sum_accums(per_thread_acc, dev->nthreads, &per_proc_acc[0]);

  /* Non master process */
  if(dev->mpi_rank != 0) {

    /* Send the accumulator to the master process */
    mutex_lock(dev->mpi_mutex);
    MPI(Send(&per_proc_acc[0], sizeof(per_proc_acc[0]), MPI_CHAR, 0/*Dst*/,
      msg, MPI_COMM_WORLD));
    mutex_unlock(dev->mpi_mutex);

    *acc = per_proc_acc[0];

  /* Master process */
  } else {
    int iproc;

    /* Gather process accumulators */
    FOR_EACH(iproc, 1, dev->mpi_nprocs) {
      MPI_Request req;

      /* Asynchronously receive the accumulator of `iproc' */
      mutex_lock(dev->mpi_mutex);
      MPI(Irecv(&per_proc_acc[iproc], sizeof(per_proc_acc[iproc]), MPI_CHAR,
        iproc, msg, MPI_COMM_WORLD, &req));
      mutex_unlock(dev->mpi_mutex);
      mpi_waiting_for_request(dev, &req);
    }

    /* Sum the process accumulators */
    sum_accums(per_proc_acc, (size_t)dev->mpi_nprocs, acc);
  }

exit:
  if(per_proc_acc) MEM_RM(dev->allocator, per_proc_acc);
  return res;
error:
  goto exit;
}
#endif /* SDIS_ENABLE_MPI */

#ifndef SDIS_ENABLE_MPI
res_T
gather_accumulators_list
  (struct sdis_device* dev,
   const enum mpi_sdis_message msg,
   const size_t nprobes, /* Total number of probes */
   const size_t process_probes[2], /* Range of probes managed by the process */
   struct accum* per_probe_acc) /* List of per probe accumulators */
{
  (void)dev, (void)msg, (void)nprobes;
  (void)process_probes, (void)per_probe_acc;
  return RES_OK;
}
#else
res_T
gather_accumulators_list
  (struct sdis_device* dev,
   const enum mpi_sdis_message msg,
   const size_t nprobes, /* Total number of probes */
   const size_t process_probes[2], /* Range of probes managed by the process */
   struct accum* per_probe_acc) /* List of per probe accumulators */
{
  struct accum_list {
    size_t size;
    /* Simulate a C99 flexible array */
    ALIGN(16) struct accum accums[1/*Dummy element*/];
  }* accum_list = NULL;
  size_t max_per_process_nprobes = 0; /* Maximum #probes per process */
  size_t process_nprobes = 0; /* Number of process probes */
  size_t msg_sz = 0; /* Size in bytes of the message to send */
  res_T res = RES_OK;

  /* Check pre-conditions */
  ASSERT(dev);
  ASSERT(process_nprobes == 0 || (process_probes && per_probe_acc));

  /* Without MPI, do nothing since per_probe_acc already stores all the
   * accumulators */
  if(!dev->use_mpi) goto exit;

  /* Define the maximum number of probes managed by a process, i.e. the number
   * of probes divided by the number of processes, plus one to handle the
   * remainder of the integer division: the remaining probes are distributed
   * among the processes */
  max_per_process_nprobes = nprobes/(size_t)dev->mpi_nprocs + 1;
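
  /* Example with illustrative values: for nprobes = 10 and mpi_nprocs = 4 the
   * bound is 10/4 + 1 = 3, which covers the largest per process range that
   * compute_process_index_range would produce for these values (3 probes on
   * ranks 0 and 1, 2 probes on ranks 2 and 3). A message sized for
   * max_per_process_nprobes accumulators is therefore large enough for any
   * process. */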
  /* Number of probes managed by this process */
  process_nprobes = process_probes[1] - process_probes[0];

  /* Allocate the array into which the data to be collected is copied */
  msg_sz =
      sizeof(struct accum_list)
    + sizeof(struct accum)*max_per_process_nprobes
    - 1/*Dummy element*/;
  if(msg_sz > INT_MAX) {
    log_err(dev, "%s: invalid MPI message size %lu.\n",
      FUNC_NAME, (unsigned long)msg_sz);
    res = RES_BAD_ARG;
    goto error;
  }

  accum_list = MEM_CALLOC(dev->allocator, 1, msg_sz);
  if(!accum_list) {
    log_err(dev,
      "%s: unable to allocate the temporary list of accumulators.\n",
      FUNC_NAME);
    res = RES_MEM_ERR;
    goto error;
  }

  /* Non master process */
  if(dev->mpi_rank != 0) {

    /* Setup the message to be sent */
    accum_list->size = process_nprobes;
    memcpy(accum_list->accums, per_probe_acc,
      sizeof(struct accum)*process_nprobes);

    mutex_lock(dev->mpi_mutex);
    MPI(Send(accum_list, (int)msg_sz, MPI_CHAR, 0/*Dst*/, msg, MPI_COMM_WORLD));
    mutex_unlock(dev->mpi_mutex);

  /* Master process */
  } else {
    size_t gathered_nprobes = process_nprobes;
    int iproc;

    FOR_EACH(iproc, 1, dev->mpi_nprocs) {
      MPI_Request req;

      /* Asynchronously receive the accumulators of `iproc' */
      mutex_lock(dev->mpi_mutex);
      MPI(Irecv
        (accum_list, (int)msg_sz, MPI_CHAR, iproc, msg, MPI_COMM_WORLD, &req));
      mutex_unlock(dev->mpi_mutex);

      mpi_waiting_for_request(dev, &req);

      memcpy(per_probe_acc+gathered_nprobes, accum_list->accums,
        sizeof(struct accum)*accum_list->size);

      gathered_nprobes += accum_list->size;
    }
  }

exit:
  if(accum_list) MEM_RM(dev->allocator, accum_list);
  return res;
error:
  goto exit;
}
#endif

#ifndef SDIS_ENABLE_MPI
res_T
gather_green_functions
  (struct sdis_scene* scn,
   struct ssp_rng_proxy* rng_proxy,
   struct sdis_green_function* per_thread_green[],
   const struct accum* per_thread_acc_time,
   struct sdis_green_function** out_green)
{
  return gather_green_functions_no_mpi
    (scn, rng_proxy, per_thread_green, per_thread_acc_time, out_green);
}
#else
res_T
gather_green_functions
  (struct sdis_scene* scn,
   struct ssp_rng_proxy* rng_proxy,
   struct sdis_green_function* per_thread_green[],
   const struct accum* per_thread_acc_time,
   struct sdis_green_function** out_green)
{
  struct accum acc_time = ACCUM_NULL;
  struct sdis_green_function* green = NULL;
  struct sdis_green_function** per_proc_green = NULL;
  unsigned ithread;
  int iproc;
  res_T res = RES_OK;
  ASSERT(scn && per_thread_green && out_green);

  if(!scn->dev->use_mpi) {
    return gather_green_functions_no_mpi
      (scn, rng_proxy, per_thread_green, per_thread_acc_time, out_green);
  }

  /* Reduce the per thread green functions into the green function of the 1st
   * thread */
  res = green_function_redux_and_clear
    (per_thread_green[0], per_thread_green+1, scn->dev->nthreads-1);
  if(res != RES_OK) goto error;

  /* Gather the accumulators. The master process gathers the accumulators of
   * all processes while a non master process only gathers its per thread
   * accumulators and sends the result to the master process */
  res = gather_accumulators
    (scn->dev, MPI_SDIS_MSG_ACCUM_TIME, per_thread_acc_time, &acc_time);
  if(res != RES_OK) goto error;

  /* Non master process */
  if(scn->dev->mpi_rank != 0) {
    /* Return the green function of the 1st thread */
    SDIS(green_function_ref_get(per_thread_green[0]));
    green = per_thread_green[0];

    /* The green function must be finalized before it is sent to the master
     * process; its serialization fails otherwise */
    res = green_function_finalize(green, rng_proxy, &acc_time);
    if(res != RES_OK) goto error;

    res = send_green_function_to_master_process(scn->dev, green);
    if(res != RES_OK) goto error;

  /* Master process */
  } else {
    /* Allocate the list of per process green functions */
    per_proc_green = MEM_CALLOC(scn->dev->allocator,
      (size_t)scn->dev->mpi_nprocs, sizeof(*per_proc_green));
    if(!per_proc_green) {
      log_err(scn->dev,
        "Could not allocate the temporary list of per process "
        "green functions.\n");
      res = RES_MEM_ERR;
      goto error;
    }

    /* Use the gathered per thread green function stored on thread 0 as the
     * green function of the master process */
    SDIS(green_function_ref_get(per_thread_green[0]));
    per_proc_green[0] = per_thread_green[0];

    /* Release the per thread green functions */
    FOR_EACH(ithread, 0, scn->dev->nthreads) {
      SDIS(green_function_ref_put(per_thread_green[ithread]));
      per_thread_green[ithread] = NULL;
    }

    res = gather_green_functions_from_non_master_process(scn, per_proc_green+1);
    if(res != RES_OK) goto error;

    /* Reduce the per process green functions into the green function of the
     * master process */
    res = green_function_redux_and_clear
      (per_proc_green[0], per_proc_green+1, (size_t)scn->dev->mpi_nprocs-1);
    if(res != RES_OK) goto error;

    /* Return the gathered green function of the master process */
    SDIS(green_function_ref_get(per_proc_green[0]));
    green = per_proc_green[0];

    /* Finalize the green function */
    res = green_function_finalize(green, rng_proxy, &acc_time);
    if(res != RES_OK) goto error;
  }

exit:
  if(per_proc_green) {
    FOR_EACH(iproc, 0, scn->dev->mpi_nprocs) {
      if(per_proc_green[iproc]) {
        SDIS(green_function_ref_put(per_proc_green[iproc]));
      }
    }
    MEM_RM(scn->dev->allocator, per_proc_green);
  }
  *out_green = green;
  return res;
error:
  if(green) { SDIS(green_function_ref_put(green)); green = NULL; }
  goto exit;
}

#endif

#ifndef SDIS_ENABLE_MPI
res_T
gather_rng_proxy_sequence_id
  (struct sdis_device* dev,
   struct ssp_rng_proxy* proxy)
{
  ASSERT(dev && proxy);
  (void)dev, (void)proxy;
  return RES_OK;
}
#else

res_T
gather_rng_proxy_sequence_id
  (struct sdis_device* dev,
   struct ssp_rng_proxy* proxy)
{
  unsigned long proc_seq_id = 0;
  size_t seq_id = SSP_SEQUENCE_ID_NONE;
  res_T res = RES_OK;
  ASSERT(dev && proxy);

  if(!dev->use_mpi) goto exit;

  /* Retrieve the sequence id of the process */
  SSP(rng_proxy_get_sequence_id(proxy, &seq_id));
  CHK(seq_id <= ULONG_MAX);
  proc_seq_id = (unsigned long)seq_id;

  /* Non master process */
  if(dev->mpi_rank != 0) {

    /* Send the sequence id to the master process */
    mutex_lock(dev->mpi_mutex);
    MPI(Send(&proc_seq_id, 1, MPI_UNSIGNED_LONG, 0/*Dst*/,
      MPI_SDIS_MSG_RNG_PROXY_SEQUENCE_ID, MPI_COMM_WORLD));
    mutex_unlock(dev->mpi_mutex);

  /* Master process */
  } else {
    size_t nseqs_to_flush = 0;
    unsigned long max_seq_id = 0;
    int iproc;

    max_seq_id = proc_seq_id;

    /* Gather the per process sequence ids and determine the maximum sequence
     * id */
    FOR_EACH(iproc, 1, dev->mpi_nprocs) {
      MPI_Request req;
      unsigned long tmp_seq_id;

      /* Asynchronously receive the sequence id of `iproc' */
      mutex_lock(dev->mpi_mutex);
      MPI(Irecv(&tmp_seq_id, 1, MPI_UNSIGNED_LONG, iproc,
        MPI_SDIS_MSG_RNG_PROXY_SEQUENCE_ID, MPI_COMM_WORLD, &req));
      mutex_unlock(dev->mpi_mutex);
      mpi_waiting_for_request(dev, &req);

      /* Track the maximum sequence id among all processes */
      max_seq_id = MMAX(max_seq_id, tmp_seq_id);
    }

    /* Flush the current sequence, which is already consumed, in addition to
     * the sequences queried by the other processes */
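    /* For instance, with illustrative ids: if the master's proxy is on
     * sequence 5 while the highest id reported by the other processes is 9,
     * then nseqs_to_flush = 1 + 9 - 5 = 5 and the flush below moves the
     * master's proxy past sequence 9, so that no already consumed sequence is
     * handed out again. */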
    nseqs_to_flush = 1/*Current sequence*/ + max_seq_id - proc_seq_id;
    res = ssp_rng_proxy_flush_sequences(proxy, nseqs_to_flush);
    if(res != RES_OK) goto error;
  }

exit:
  return res;
error:
  goto exit;
}
#endif

#ifndef SDIS_ENABLE_MPI
res_T
gather_res_T(struct sdis_device* dev, const res_T res)
{
  (void)dev;
  return res;
}
#else
res_T
gather_res_T(struct sdis_device* dev, const res_T proc_res)
{
  int32_t status;
  res_T res = proc_res;
  int iproc;
  ASSERT(dev);

  if(!dev->use_mpi) return proc_res;

  status = (int32_t)proc_res;

  /* Send the local res status to all the other processes */
  FOR_EACH(iproc, 0, dev->mpi_nprocs) {
    /* Do not send the res status to yourself */
    if(iproc == dev->mpi_rank) continue;

    mutex_lock(dev->mpi_mutex);
    MPI(Send(&status, 1, MPI_INT32_T, iproc, MPI_SDIS_MSG_RES_T,
      MPI_COMM_WORLD));
    mutex_unlock(dev->mpi_mutex);
  }

  /* Receive the res status of all the other processes */
  res = proc_res;
  FOR_EACH(iproc, 0, dev->mpi_nprocs) {
    MPI_Request req;

    /* Do not receive the res status from yourself */
    if(iproc == dev->mpi_rank) continue;

    mutex_lock(dev->mpi_mutex);
    MPI(Irecv(&status, 1, MPI_INT32_T, iproc, MPI_SDIS_MSG_RES_T,
      MPI_COMM_WORLD, &req));
    mutex_unlock(dev->mpi_mutex);
    mpi_waiting_for_request(dev, &req);

    if(res == RES_OK && status != RES_OK) {
      res = (res_T)status;
    }
  }

  return res;
}

#endif

void
print_progress
  (struct sdis_device* dev,
   int32_t progress[],
   const char* label)
{
  ASSERT(dev && label);
#ifndef SDIS_ENABLE_MPI
  log_info(dev, "%s%3d%%%c", label, progress[0],
    dev->no_escape_sequence ? '\n' : '\r');
#else
  if(!dev->use_mpi) {
    log_info(dev, "%s%3d%%%c", label, progress[0],
      dev->no_escape_sequence ? '\n' : '\r');
  } else {
    int i;
    if(dev->mpi_rank != 0) return;
    mpi_fetch_progress(dev, progress);
    FOR_EACH(i, 0, dev->mpi_nprocs) {
      log_info(dev, "Process %d -- %s%3d%%%c", i, label, progress[i],
        i == dev->mpi_nprocs - 1 && !dev->no_escape_sequence ? '\r' : '\n');
    }
  }
#endif
}

void
print_progress_update
  (struct sdis_device* dev,
   int32_t progress[],
   const char* label)
{
  ASSERT(dev);
#ifndef SDIS_ENABLE_MPI
  print_progress(dev, progress, label);
#else
  if(!dev->use_mpi) {
    print_progress(dev, progress, label);
  } else {
    if(dev->mpi_rank != 0) {
      mpi_send_progress(dev, progress[0]);
    } else {
      mpi_fetch_progress(dev, progress);
      rewind_progress_printing(dev);
      print_progress(dev, progress, label);
    }
  }
#endif
}

void
print_progress_completion
  (struct sdis_device* dev,
   int32_t progress[],
   const char* label)
{
  ASSERT(dev);
  (void)dev, (void)progress, (void)label;

  /* The 100% completion is only printed here when MPI is enabled, because at
   * the time of the last regular print the non master processes might still
   * be running. When MPI is disabled, the 100% completion is already printed
   * during the calculation */
#ifdef SDIS_ENABLE_MPI
  if(dev->use_mpi && dev->mpi_rank == 0 && dev->mpi_nprocs > 1) {
    mpi_fetch_progress(dev, progress);
    rewind_progress_printing(dev);
    print_progress(dev, progress, label);

    /* When escape sequences are allowed, the last newline character of the
     * progress message is replaced with a carriage return. Once the
     * calculation is complete, we therefore print an additional newline
     * character after this carriage return. */
    if(!dev->no_escape_sequence) {
      log_info(dev, "\n");
    }
  }
#endif
}

void
process_barrier(struct sdis_device* dev)
{
#ifndef SDIS_ENABLE_MPI
  (void)dev;
  return;
#else
  if(dev->use_mpi) {
    mpi_barrier(dev);
  }
#endif
}
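
/* One plausible call pattern for the progress helpers above (illustrative
 * sketch only; `solve_step', `pct_done' and `nsteps' are hypothetical):
 *
 *   int32_t* progress = NULL;
 *   res = alloc_process_progress(dev, &progress);
 *   print_progress(dev, progress, "Solving: ");
 *   for(istep = 0; istep < nsteps; ++istep) {
 *     solve_step(istep);
 *     progress[0] = pct_done(istep, nsteps);
 *     print_progress_update(dev, progress, "Solving: ");
 *   }
 *   print_progress_completion(dev, progress, "Solving: ");
 *   free_process_progress(dev, progress);
 *   process_barrier(dev);
 */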