htrdr

Solving radiative transfer in heterogeneous media
git clone git://git.meso-star.fr/htrdr.git

htrdr_solve_buffer.c (17165B)


/* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique
 * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux
 * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace
 * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris
 * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com)
 * Copyright (C) 2022-2025 Observatoire de Paris
 * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne
 * Copyright (C) 2022-2025 Université de Versailles Saint-Quentin
 * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>. */

#include "core/htrdr.h"
#include "core/htrdr_c.h"
#include "core/htrdr_buffer.h"
#include "core/htrdr_proc_work.h"
#include "core/htrdr_log.h"
#include "core/htrdr_solve_buffer.h"

#include <rsys/clock_time.h>
#include <rsys/cstr.h>
#include <rsys/list.h>
#include <rsys/math.h>
#include <rsys/mutex.h>
#include <rsys/ref_count.h>

#include <star/ssp.h>

#include <mpi.h>
#include <omp.h>

#include <string.h> /* memcpy */

#define CHUNK_SIZE 32 /* Number of items in one chunk */

/* Collection of items */
struct chunk {
  struct list_node node;
  struct mem_allocator* allocator;
  ref_T ref;

  struct chunk_data {
    size_t item_sz; /* Size of an item */
    size_t item_al; /* Item alignment */
    uint64_t index; /* Chunk index in chunk space */
    /* Simulate the flexible array member of the C99 standard */
    char ALIGN(16) items[1/*Dummy*/];
  } data;
};
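
/* Layout note: the `items[1]' member emulates a C99 flexible array member.
 * `chunk_create' below over-allocates the structure so that the items
 * directly follow the header, and the ALIGN(16) attribute keeps the first
 * item 16-byte aligned. For instance, with CHUNK_SIZE = 32 and a
 * hypothetical 48-byte item, a chunk carries 32*48 = 1536 payload bytes. */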

/*******************************************************************************
 * Helper functions
 ******************************************************************************/
static INLINE res_T
check_solve_buffer_args(const struct htrdr_solve_buffer_args* args)
{
  if(!args) return RES_BAD_ARG;

  /* A functor must be defined */
  if(!args->solve_item) return RES_BAD_ARG;

  /* The number of realisations must not be zero */
  if(!args->nrealisations) return RES_BAD_ARG;

  /* The buffer must be one-dimensional */
  if(args->buffer_layout.height != 1) return RES_BAD_ARG;

  /* Check buffer layout consistency */
  return htrdr_buffer_layout_check(&args->buffer_layout);
}
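
/* The solver thus treats the buffer as a flat, 1 x width array in which
 * each element receives one Monte Carlo estimate; a caller with 2-D data
 * would present it as a single row (e.g. width = xres*yres) and reshape
 * the result afterwards. */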

static INLINE void
release_chunk(ref_T* ref)
{
  struct chunk* chunk = CONTAINER_OF(ref, struct chunk, ref);
  ASSERT(ref);
  MEM_RM(chunk->allocator, chunk);
}

static INLINE void
chunk_ref_get(struct chunk* chunk)
{
  ASSERT(chunk);
  ref_get(&chunk->ref);
}

static INLINE void
chunk_ref_put(struct chunk* chunk)
{
  ASSERT(chunk);
  ref_put(&chunk->ref, release_chunk);
}

static FINLINE struct chunk*
chunk_create
  (struct mem_allocator* allocator,
   const size_t item_sz, /* Size in bytes */
   const size_t item_al, /* Alignment in bytes */
   const uint64_t ichunk) /* Chunk index */
{
  struct chunk* chunk = NULL;
  const size_t header_sz = sizeof(*chunk) - 1/*dummy octet in flexible array*/;
  const size_t buf_sz = CHUNK_SIZE*item_sz;

  /* Pre-conditions */
  ASSERT(allocator);
  ASSERT(IS_ALIGNED(item_sz, item_al));
  ASSERT(IS_POW2(item_al));

  chunk = MEM_ALLOC_ALIGNED(allocator, header_sz + buf_sz, 16);
  if(!chunk) goto error;
  ref_init(&chunk->ref);
  list_init(&chunk->node);
  chunk->allocator = allocator;
  chunk->data.item_sz = item_sz;
  chunk->data.item_al = item_al;
  chunk->data.index = ichunk;
  CHK(IS_ALIGNED(chunk->data.items, item_al));

exit:
  return chunk;
error:
  if(chunk) { chunk_ref_put(chunk); chunk = NULL; }
  goto exit;
}
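
/* Minimal usage sketch of the chunk API above. The `struct pixel' item type
 * is an illustrative assumption, not part of this file */
#if 0
struct pixel { double weight; double sqr_weight; }; /* Hypothetical item */

static void
chunk_example(struct mem_allocator* allocator)
{
  struct chunk* chunk = chunk_create
    (allocator, sizeof(struct pixel), 16/*alignment*/, 0/*ichunk*/);
  if(chunk) {
    struct pixel* pix = chunk_at(chunk, 0); /* Fetch the 1st item */
    pix->weight = 0;
    pix->sqr_weight = 0;
    chunk_ref_put(chunk); /* Release the chunk */
  }
}
#endif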

static FINLINE void*
chunk_at(struct chunk* chunk, const size_t i)
{
  ASSERT(chunk && i < CHUNK_SIZE);
  return (void*)(chunk->data.items + i*chunk->data.item_sz);
}

static void
release_chunk_list(struct list_node* chunks)
{
  struct list_node* node = NULL;
  struct list_node* tmp = NULL;
  ASSERT(chunks);

  LIST_FOR_EACH_SAFE(node, tmp, chunks) {
    struct chunk* chunk = CONTAINER_OF(node, struct chunk, node);
    list_del(node);
    chunk_ref_put(chunk);
  }
}

static INLINE uint64_t
get_chunk
  (struct htrdr* htrdr,
   struct ssp_rng* rng,
   struct proc_work* work)
{
  uint64_t ichunk = CHUNK_ID_NULL;

  /* Make the whole function a critical section, since it must be executed
   * atomically: the first thread to query a chunk from the empty queue
   * steals work from the other processes, so that when the function exits
   * the remaining threads find valid chunks */
  #pragma omp critical
  {
    ichunk = proc_work_get_chunk(work);
    if(ichunk == CHUNK_ID_NULL) { /* No more work on this process */
      size_t nthieves = 0;

      proc_work_reset(work);
      nthieves = mpi_steal_work(htrdr, rng, work);
      if(nthieves != 0) {
        ichunk = proc_work_get_chunk(work);
      }
    }
  }

  return ichunk;
}

static INLINE void
status_update(struct htrdr* htrdr, const int32_t progress)
{
  ASSERT(htrdr);

  #pragma omp critical
  if(progress > htrdr->mpi_progress_render[0]) {
    htrdr->mpi_progress_render[0] = progress;

    /* Print update on master process */
    if(htrdr->mpi_rank == 0) {
      update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);

    /* Send the progress percentage to the master process */
    } else {
      send_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING, progress);
    }
  }
}

static struct ssp_rng*
rng_create
  (struct htrdr* htrdr,
   const size_t ithread,
   const size_t nchunks,
   const uint64_t ichunk)
{
  struct ssp_rng_proxy_create2_args args = SSP_RNG_PROXY_CREATE2_ARGS_NULL;
  struct ssp_rng_proxy* proxy = NULL;
  struct ssp_rng* rng = NULL;
  struct mem_allocator* allocator = NULL;
  ASSERT(htrdr);

  allocator = htrdr_get_thread_allocator(htrdr, (size_t)ithread);

  args.type = SSP_RNG_THREEFRY;
  args.sequence_offset = RNG_SEQUENCE_SIZE * (size_t)ichunk;
  args.sequence_size = RNG_SEQUENCE_SIZE;
  args.sequence_pitch = RNG_SEQUENCE_SIZE * nchunks;
  args.nbuckets = 1;
  SSP(rng_proxy_create2(allocator, &args, &proxy));
  SSP(rng_proxy_create_rng(proxy, 0, &rng));
  SSP(rng_proxy_ref_put(proxy));

  return rng;
}
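
/* The proxy above partitions the global Threefry stream per chunk: chunk
 * `ichunk' consumes the windows [offset, offset+size), [offset+pitch,
 * offset+pitch+size), etc., with offset = RNG_SEQUENCE_SIZE*ichunk, size =
 * RNG_SEQUENCE_SIZE and pitch = RNG_SEQUENCE_SIZE*nchunks. For example,
 * assuming nchunks = 4 and RNG_SEQUENCE_SIZE = 1024, chunk 2 draws from
 * [2048, 3072), then [6144, 7168), and so on. Two chunks therefore never
 * share random numbers, whichever process or thread solves them, which
 * keeps results reproducible under work stealing. */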

static void
write_chunk_data
  (struct htrdr* htrdr,
   struct htrdr_buffer* buf,
   const struct chunk_data* chunk_data)
{
  struct htrdr_buffer_layout layout = HTRDR_BUFFER_LAYOUT_NULL;
  char* mem = NULL;
  size_t iitem = 0;
  size_t nitems = 0;

  ASSERT(htrdr && buf && chunk_data);
  ASSERT(chunk_data->index != CHUNK_ID_NULL);
  (void)htrdr;

  htrdr_buffer_get_layout(buf, &layout);
  ASSERT(layout.height == 1);
  ASSERT(layout.elmt_size == chunk_data->item_sz);

  /* Calculate the index of the first item to write */
  iitem = chunk_data->index * CHUNK_SIZE;

  /* Define the number of items to write into the buffer */
  nitems = MMIN(iitem + CHUNK_SIZE, layout.width) - iitem;

  /* Calculate the destination address for writing the chunk data */
  mem = htrdr_buffer_get_data(buf);
  mem = mem + iitem*layout.elmt_size;

  /* Write the chunk items into the buffer */
  memcpy(mem, chunk_data->items, nitems*layout.elmt_size);
}
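
/* Example: for a buffer of width 100 and CHUNK_SIZE = 32, the last chunk
 * (index 3) starts at item 96 and only MMIN(96+32, 100) - 96 = 4 items are
 * copied; the trailing slots of that chunk are simply ignored. */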

static res_T
mpi_gather_chunks
  (struct htrdr* htrdr,
   const struct htrdr_buffer_layout* layout,
   struct htrdr_buffer* buf, /* Can be NULL for non master processes */
   const size_t nchunks,
   struct list_node* chunks)
{
  size_t msg_sz = 0;
  struct list_node* node = NULL;
  struct chunk* chunk = NULL; /* Temporary chunk */
  res_T res = RES_OK;

  /* Pre-conditions */
  ASSERT(htrdr && layout && chunks);
  ASSERT(layout->height == 1);
  ASSERT(htrdr_buffer_layout_check(layout) == RES_OK);

  /* Compute the size of the chunk data */
  msg_sz = sizeof(struct chunk_data)
    + CHUNK_SIZE*layout->elmt_size
    - 1 /* Dummy octet of the flexible array */;
  ASSERT(msg_sz <= INT_MAX);

  /* Non master process: send the computed chunks to the master process */
  if(htrdr->mpi_rank != 0) {
    LIST_FOR_EACH(node, chunks) {
      struct chunk* c = CONTAINER_OF(node, struct chunk, node);
      MPI(Send(&c->data, (int)msg_sz, MPI_CHAR, 0/*Master process*/,
        HTRDR_MPI_CHUNK_DATA, MPI_COMM_WORLD));
    }

  /* Master process */
  } else {
    size_t ichunk = 0;
    ASSERT(buf);

#ifndef NDEBUG
    /* Check that the submitted layout matches the actual buffer layout */
    {
      struct htrdr_buffer_layout buf_layout = HTRDR_BUFFER_LAYOUT_NULL;
      htrdr_buffer_get_layout(buf, &buf_layout);
      ASSERT(htrdr_buffer_layout_eq(&buf_layout, layout));
    }
#endif

    /* Write the data of the chunks solved by the master process */
    LIST_FOR_EACH(node, chunks) {
      struct chunk* c = CONTAINER_OF(node, struct chunk, node);
      write_chunk_data(htrdr, buf, &c->data);
      ++ichunk;
    }

    /* Some chunks were not solved by the master process */
    if(ichunk != nchunks) {
      ASSERT(htrdr->mpi_nprocs > 1);

      /* Create a temporary chunk to receive the chunk data computed by the
       * non master processes */
      chunk = chunk_create
        (htrdr->allocator,
         layout->elmt_size,
         layout->alignment,
         CHUNK_ID_NULL); /* Dummy chunk id that is going to be overwritten */
      if(!chunk) { res = RES_MEM_ERR; goto error; }
    }

    /* Gather the chunks sent by the non master processes, resuming from the
     * number of chunks already written by the master process */
    FOR_EACH(ichunk, ichunk, nchunks) {
      MPI(Recv(&chunk->data, (int)msg_sz, MPI_CHAR, MPI_ANY_SOURCE,
        HTRDR_MPI_CHUNK_DATA, MPI_COMM_WORLD, MPI_STATUS_IGNORE));
      write_chunk_data(htrdr, buf, &chunk->data);
    }
  }

exit:
  if(chunk) chunk_ref_put(chunk);
  return res;
error:
  htrdr_log_err(htrdr, "Error while gathering results -- %s\n",
    res_to_cstr(res));
  goto exit;
}
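
/* Note on the wire format: chunks travel as raw `struct chunk_data' bytes
 * tagged HTRDR_MPI_CHUNK_DATA. The receiver recovers the destination offset
 * from the embedded `index' field, so chunks may arrive in any order and
 * from any rank (MPI_ANY_SOURCE). This assumes all ranks run the same
 * binary and thus agree on the memory layout of `chunk_data'. */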

static res_T
solve_chunk
  (struct htrdr* htrdr,
   const struct htrdr_solve_buffer_args* args,
   const size_t ithread,
   const uint64_t ichunk,
   struct ssp_rng* rng,
   struct chunk* chunk)
{
  struct htrdr_solve_item_args item_args = HTRDR_SOLVE_ITEM_ARGS_NULL;
  size_t i = 0;
  size_t nitems = 0;

  ASSERT(htrdr && args && rng && chunk);
  ASSERT(args->buffer_layout.height == 1);
  ASSERT(ichunk * CHUNK_SIZE < args->buffer_layout.width);

  nitems = args->buffer_layout.width;

  /* Setup the shared item arguments */
  item_args.rng = rng;
  item_args.nrealisations = args->nrealisations;
  item_args.ithread = ithread;
  item_args.context = args->context;

  FOR_EACH(i, 0, CHUNK_SIZE) {
    void* item = NULL;
    size_t item_id = ichunk*CHUNK_SIZE + i;

    if(item_id >= nitems) {
      /* The item is out of range,
       * i.e. we have reached the total number of items to solve */
      break;
    }

    /* Fetch the item */
    item = chunk_at(chunk, i);

    /* Setup the item index */
    item_args.item_id = item_id;

    /* Solve the item */
    args->solve_item(htrdr, &item_args, item);
  }
  return RES_OK;
}
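
/* Sketch of a user supplied `solve_item' functor as invoked by `solve_chunk'
 * above. The accumulator type and the sampled quantity are hypothetical and
 * a void return is assumed; see core/htrdr_solve_buffer.h for the actual
 * prototype. Only the calling convention is taken from this file */
#if 0
static void
my_solve_item
  (struct htrdr* htrdr,
   const struct htrdr_solve_item_args* item_args,
   void* item)
{
  double* accum = item; /* Storage reserved for this item in the chunk */
  size_t irealisation;
  (void)htrdr;

  *accum = 0;
  FOR_EACH(irealisation, 0, item_args->nrealisations) {
    /* One realisation of the (hypothetical) Monte Carlo weight */
    *accum += ssp_rng_canonical(item_args->rng);
  }
  *accum /= (double)item_args->nrealisations;
}
#endif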

static res_T
solve_buffer
  (struct htrdr* htrdr,
   const struct htrdr_solve_buffer_args* args,
   const size_t nchunks, /* Total #chunks distributed between processes */
   struct proc_work* work,
   struct list_node* chunks)
{
  struct ssp_rng* rng_proc = NULL; /* Used to sample a working process */
  size_t nchunks_proc = 0; /* #chunks of the process */
  size_t nthreads = 0; /* #threads to use */
  ATOMIC nchunks_solved = 0; /* #chunks solved by the process */
  ATOMIC res = RES_OK;

  /* Pre-conditions */
  ASSERT(htrdr && args && work && chunks);

  res = ssp_rng_create(htrdr->allocator, SSP_RNG_MT19937_64, &rng_proc);
  if(res != RES_OK) goto error;

  nchunks_proc = proc_work_get_nchunks(work);
  nthreads = MMIN(htrdr->nthreads, nchunks_proc);

  /* The process does not count itself as a working process */
  htrdr->mpi_working_procs[htrdr->mpi_rank] = 0;
  --htrdr->mpi_nworking_procs;

  omp_set_num_threads((int)nthreads);
  #pragma omp parallel
  for(;;) {
    /* Chunk */
    uint64_t ichunk = get_chunk(htrdr, rng_proc, work);
    struct chunk* chunk = NULL;

    /* Miscellaneous */
    const size_t ithread = (size_t)omp_get_thread_num();
    struct ssp_rng* rng = NULL;
    size_t n = 0;
    int32_t pcent = 0;
    res_T res_local = RES_OK;

    if(ichunk == CHUNK_ID_NULL) break; /* No more work */

    chunk = chunk_create
      (htrdr->allocator,
       args->buffer_layout.elmt_size,
       args->buffer_layout.alignment,
       ichunk);
    if(!chunk) {
      ATOMIC_SET(&res, RES_MEM_ERR);
      break;
    }

    #pragma omp critical
    list_add_tail(chunks, &chunk->node); /* Register the chunk */

    rng = rng_create(htrdr, ithread, nchunks, ichunk);
    res_local = solve_chunk(htrdr, args, ithread, ichunk, rng, chunk);

    SSP(rng_ref_put(rng));
    if(res_local != RES_OK) {
      ATOMIC_SET(&res, res_local);
      break;
    }

    /* Status update */
    n = (size_t)ATOMIC_INCR(&nchunks_solved);
    pcent = (int32_t)((double)n * 100.0 / (double)nchunks_proc + 0.5/*round*/);
    status_update(htrdr, pcent);
  }

  if(ATOMIC_GET(&res) != RES_OK) goto error;

  /* Asynchronously wait for the completion of the other processes. Use a
   * non-blocking barrier to avoid a deadlock with the `mpi_probe_thieves'
   * thread, which also requires the `mpi_mutex' */
  {
    MPI_Request req;

    mutex_lock(htrdr->mpi_mutex);
    MPI(Ibarrier(MPI_COMM_WORLD, &req));
    mutex_unlock(htrdr->mpi_mutex);

    mpi_wait_for_request(htrdr, &req);
  }

exit:
  if(rng_proc) SSP(rng_ref_put(rng_proc));
  return (res_T)res;
error:
  htrdr_log_err(htrdr, "Error while solving buffer -- %s\n",
    res_to_cstr((res_T)res));
  goto exit;
}
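
/* Design note: the parallel loop above follows a pull model. Each OpenMP
 * thread pulls chunk indices from the per-process work queue and, once the
 * queue is drained, `get_chunk' tries to steal chunks from the remaining
 * working processes. The non-blocking barrier then lets the process keep
 * answering steal requests through `mpi_probe_thieves' until every rank
 * has finished its own work. */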
    491 
    492 /*******************************************************************************
    493  * Local functions
    494  ******************************************************************************/
    495 res_T
    496 htrdr_solve_buffer
    497   (struct htrdr* htrdr,
    498    const struct htrdr_solve_buffer_args* args,
    499    struct htrdr_buffer* buf)
    500 {
    501   /* Time registration */
    502   char strbuf[128];
    503   struct time t0, t1;
    504 
    505   /* Chunks */
    506   struct list_node chunks; /* List of solved chunks */
    507   size_t nchunks = 0; /* Overall number of chunks */
    508   size_t nchunks_proc = 0; /* #chunks for the current proc*/
    509   size_t nchunks_remain = 0; /* Remaining #chunks to distribute between procs */
    510 
    511   /* Miscellaneous */
    512   struct proc_work work;
    513   size_t nitems = 0; /* Number of Monte Carlo estimations */
    514   size_t i = 0;
    515   ATOMIC probe_thieves = 1; /* Boolean that controls thieves' polling */
    516 
    517   res_T res = RES_OK;
    518   ASSERT(htrdr && check_solve_buffer_args(args) == RES_OK);
    519   ASSERT(htrdr->mpi_rank != 0 || buf);
    520 
    521   list_init(&chunks);
    522   proc_work_init(htrdr->allocator, &work);
    523 
    524   nitems = args->buffer_layout.width;
    525   nchunks = (nitems + (CHUNK_SIZE-1)/*ceil*/) / CHUNK_SIZE;
    526   nchunks_proc = nchunks / (size_t)htrdr->mpi_nprocs;
    527   nchunks_remain = nchunks - nchunks_proc*(size_t)htrdr->mpi_nprocs;
    528 
    529   /* Distribute the remaining chunks among the processes. Each process whose
    530    * rank is lower than the number of remaining chunks takes an additional
    531    * chunk */
    532   if((size_t)htrdr->mpi_rank < nchunks_remain) {
    533     ++nchunks_proc;
    534   }
    535 
    536   /* Register the list of chunks to be processed by the current process */
    537   FOR_EACH(i, 0, nchunks_proc) {
    538     size_t ichunk = i * (size_t)htrdr->mpi_nprocs + (size_t)htrdr->mpi_rank;
    539     proc_work_add_chunk(&work, (uint64_t)ichunk);
    540   }
    541 
    542   /* On the master process, request and print the progress report, since the
    543    * other processes have been able to start the calculation */
    544   if(htrdr->mpi_rank == 0) {
    545     fetch_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);
    546     print_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);
    547   }
    548 
    549   /* Start of calculation time recording */
    550   time_current(&t0);
    551 
    552   /* Enable nested threads to enable parallelization of the solve function */
    553   omp_set_nested(1);
    554 
    555   #pragma omp parallel sections num_threads(2)
    556   {
    557     /* Polling of steal queries */
    558     #pragma omp section
    559     mpi_probe_thieves(htrdr, &work, &probe_thieves);
    560 
    561     #pragma omp section
    562     {
    563       solve_buffer(htrdr, args, nchunks, &work, &chunks);
    564       /* The process have no more work to do. Stop polling for thieves */
    565       ATOMIC_SET(&probe_thieves, 0);
    566     }
    567   }
    568 
    569   if(htrdr->mpi_rank == 0) {
    570     update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);
    571     htrdr_log(htrdr, "\n"); /* Add a new line after the progress statuses */
    572   }
    573 
    574   /* Stop time recording */
    575   time_sub(&t0, time_current(&t1), &t0);
    576   time_dump(&t0, TIME_ALL, NULL, strbuf, sizeof(strbuf));
    577   htrdr_log(htrdr, "Calculation time: %s\n", strbuf);
    578 
    579   /* Gather chunks on master process */
    580   time_current(&t0);
    581   res = mpi_gather_chunks(htrdr, &args->buffer_layout, buf, nchunks, &chunks);
    582   if(res != RES_OK) goto error;
    583   time_sub(&t0, time_current(&t1), &t0);
    584   time_dump(&t0, TIME_ALL, NULL, strbuf, sizeof(strbuf));
    585   htrdr_log(htrdr, "Time to gather results: %s\n", strbuf);
    586 
    587 exit:
    588   release_chunk_list(&chunks);
    589   proc_work_release(&work);
    590   return res;
    591 error:
    592   goto exit;
    593 }