htrdr

Solving radiative transfer in heterogeneous media
git clone git://git.meso-star.fr/htrdr.git

commit a945a02670bbb791ba61430b8aec6043efa8eddf
parent f319647ac65b2c79f20c06726a422e3dc2ca9306
Author: Vincent Forest <vincent.forest@meso-star.com>
Date:   Tue, 30 Oct 2018 12:26:30 +0100

Update the work scheduling policy

Use dynamic scheduling rather than static scheduling, and implement a
work-stealing strategy among the MPI processes.
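
A simplified sketch of the thief side of that protocol, using blocking MPI
calls for brevity (the actual implementation in htrdr_draw_radiance_sw.c
serializes non-blocking calls behind a mutex, since MPI is now initialized
with MPI_THREAD_SERIALIZED); `victim' stands for the rank returned by
mpi_sample_working_process():

    /* Ask process `victim' for one tile of work. Returns the Morton code of
     * the stolen tile, or TILE_MCODE_NULL when the victim has none left. */
    static int64_t steal_one_tile(int victim)
    {
      const int8_t dummy = 1;
      int64_t tile = TILE_MCODE_NULL;
      MPI_Send(&dummy, 1, MPI_INT8_T, victim, HTRDR_MPI_STEAL_REQUEST,
        MPI_COMM_WORLD);
      MPI_Recv(&tile, 1, MPI_INT64_T, victim, HTRDR_MPI_WORK_STEALING,
        MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      return tile;
    }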

Even though work scheduling is now fully dynamic, the results remain
reproducible: the random sequence is split into pools of random numbers
that are guaranteed to be unique for each image tile rather than for each
running process/thread. Note that this solution is only practical for
random number generators with an efficient discard function, such as the
Threefry generator.
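
The per-tile pools boil down to giving each tile, identified by its Morton
code `mcode', its own slice of a single Threefry stream, whichever process
or thread ends up rendering it. The snippet below is adapted from
htrdr_draw_radiance_sw.c in this commit:

    /* Per-tile RNG: tile `mcode' draws its numbers from the slice of the
     * global Threefry sequence starting at mcode*RNG_SEQUENCE_SIZE; no other
     * tile touches that slice, whichever process/thread renders the tile. */
    struct ssp_rng_proxy* rng_proxy = NULL;
    struct ssp_rng* rng = NULL;
    SSP(rng_proxy_create2
      (&htrdr->lifo_allocators[ithread],
       &ssp_rng_threefry,
       RNG_SEQUENCE_SIZE * (size_t)mcode,           /* Offset */
       RNG_SEQUENCE_SIZE,                           /* Size */
       RNG_SEQUENCE_SIZE * (size_t)ntiles_adjusted, /* Pitch */
       1, &rng_proxy));
    SSP(rng_proxy_create_rng(rng_proxy, 0, &rng));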

Diffstat:
Msrc/htrdr.c | 171+++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------
Msrc/htrdr.h | 9+++++++--
Msrc/htrdr_c.h | 30++++++++++++++++++++++--------
Msrc/htrdr_draw_radiance_sw.c | 554+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Msrc/htrdr_main.c | 22+++++++++++++++++++++-
Msrc/htrdr_sky.c | 11+++++------
6 files changed, 568 insertions(+), 229 deletions(-)

diff --git a/src/htrdr.c b/src/htrdr.c @@ -40,6 +40,7 @@ #include <stdarg.h> #include <stdio.h> #include <unistd.h> +#include <time.h> #include <sys/time.h> /* timespec */ #include <sys/stat.h> /* S_IRUSR & S_IWUSR */ @@ -93,7 +94,7 @@ log_msg va_list vargs) { ASSERT(htrdr && msg); - if(htrdr->verbose && htrdr->mpi_rank == 0) { + if(htrdr->verbose) { CHK(logger_vprint(&htrdr->logger, stream, msg, vargs) == RES_OK); } } @@ -245,6 +246,32 @@ spherical_to_cartesian_dir dir[2] = sin_elevation; } +static void +release_mpi(struct htrdr* htrdr) +{ + ASSERT(htrdr); + if(htrdr->mpi_working_procs) { + MEM_RM(htrdr->allocator, htrdr->mpi_working_procs); + htrdr->mpi_working_procs = NULL; + } + if(htrdr->mpi_progress_octree) { + MEM_RM(htrdr->allocator, htrdr->mpi_progress_octree); + htrdr->mpi_progress_octree = NULL; + } + if(htrdr->mpi_progress_render) { + MEM_RM(htrdr->allocator, htrdr->mpi_progress_render); + htrdr->mpi_progress_render = NULL; + } + if(htrdr->mpi_err_str) { + MEM_RM(htrdr->allocator, htrdr->mpi_err_str); + htrdr->mpi_err_str = NULL; + } + if(htrdr->mpi_mutex) { + mutex_destroy(htrdr->mpi_mutex); + htrdr->mpi_mutex = NULL; + } +} + static res_T init_mpi(struct htrdr* htrdr) { @@ -281,6 +308,20 @@ init_mpi(struct htrdr* htrdr) goto error; } + htrdr->mpi_working_procs = MEM_CALLOC(htrdr->allocator, + (size_t)htrdr->mpi_nprocs, sizeof(*htrdr->mpi_working_procs)); + if(!htrdr->mpi_working_procs) { + htrdr_log_err(htrdr, + "could not allocate the list of working processes.\n"); + res = RES_MEM_ERR; + goto error; + } + + /* Initialy, all the processes are working */ + htrdr->mpi_nworking_procs = (size_t)htrdr->mpi_nprocs; + memset(htrdr->mpi_working_procs, 0xFF, + htrdr->mpi_nworking_procs*sizeof(*htrdr->mpi_working_procs)); + /* Allocate #processes progress statuses on the master process and only 1 * progress status on the other ones: the master process will gather the * status of the other processes to report their progression. 
*/ @@ -289,28 +330,36 @@ init_mpi(struct htrdr* htrdr) htrdr->mpi_progress_octree = MEM_CALLOC (htrdr->allocator, n, sizeof(*htrdr->mpi_progress_octree)); if(!htrdr->mpi_progress_octree) { - htrdr_log_err(htrdr, - "could not allocate the progress state of the octree building.\n"); res = RES_MEM_ERR; + htrdr_log_err(htrdr, + "could not allocate the progress state of the octree building -- %s.\n", + res_to_cstr(res)); goto error; } htrdr->mpi_progress_render = MEM_CALLOC (htrdr->allocator, n, sizeof(*htrdr->mpi_progress_render)); if(!htrdr->mpi_progress_render) { + res = RES_MEM_ERR; htrdr_log_err(htrdr, - "could not allocate the progress state of the scene rendering.\n"); + "could not allocate the progress state of the scene rendering -- %s.\n", + res_to_cstr(res)); + goto error; + } + + htrdr->mpi_mutex = mutex_create(); + if(!htrdr->mpi_mutex) { res = RES_MEM_ERR; + htrdr_log_err(htrdr, + "could not create the mutex to protect MPI calls from concurrent " + "threads -- %s.\n", res_to_cstr(res)); goto error; } exit: return res; error: - if(htrdr->mpi_err_str) { - MEM_RM(htrdr->allocator, htrdr->mpi_err_str); - htrdr->mpi_err_str = NULL; - } + release_mpi(htrdr); goto exit; } @@ -436,7 +485,7 @@ htrdr_init FOR_EACH(ithread, 0, htrdr->nthreads) { res = mem_init_lifo_allocator - (&htrdr->lifo_allocators[ithread], htrdr->allocator, 4096); + (&htrdr->lifo_allocators[ithread], htrdr->allocator, 16384); if(res != RES_OK) { htrdr_log_err(htrdr, "%s: could not initialise the LIFO allocator of the thread %lu -- %s.\n", @@ -456,6 +505,7 @@ void htrdr_release(struct htrdr* htrdr) { ASSERT(htrdr); + release_mpi(htrdr); if(htrdr->s3d) S3D(device_ref_put(htrdr->s3d)); if(htrdr->svx) SVX(device_ref_put(htrdr->svx)); if(htrdr->ground) htrdr_ground_ref_put(htrdr->ground); @@ -463,13 +513,6 @@ htrdr_release(struct htrdr* htrdr) if(htrdr->sun) htrdr_sun_ref_put(htrdr->sun); if(htrdr->cam) htrdr_camera_ref_put(htrdr->cam); if(htrdr->buf) htrdr_buffer_ref_put(htrdr->buf); - if(htrdr->mpi_err_str) MEM_RM(htrdr->allocator, htrdr->mpi_err_str); - if(htrdr->mpi_progress_octree) { - MEM_RM(htrdr->allocator, htrdr->mpi_progress_octree); - } - if(htrdr->mpi_progress_render) { - MEM_RM(htrdr->allocator, htrdr->mpi_progress_render); - } if(htrdr->lifo_allocators) { size_t i; FOR_EACH(i, 0, htrdr->nthreads) { @@ -529,11 +572,14 @@ error: void htrdr_log(struct htrdr* htrdr, const char* msg, ...) { - va_list vargs_list; ASSERT(htrdr && msg); - va_start(vargs_list, msg); - log_msg(htrdr, LOG_OUTPUT, msg, vargs_list); - va_end(vargs_list); + /* Log standard message only on master process */ + if(htrdr->mpi_rank == 0) { + va_list vargs_list; + va_start(vargs_list, msg); + log_msg(htrdr, LOG_OUTPUT, msg, vargs_list); + va_end(vargs_list); + } } void @@ -541,6 +587,7 @@ htrdr_log_err(struct htrdr* htrdr, const char* msg, ...) { va_list vargs_list; ASSERT(htrdr && msg); + /* Log errors on all processes */ va_start(vargs_list, msg); log_msg(htrdr, LOG_ERROR, msg, vargs_list); va_end(vargs_list); @@ -549,11 +596,14 @@ htrdr_log_err(struct htrdr* htrdr, const char* msg, ...) void htrdr_log_warn(struct htrdr* htrdr, const char* msg, ...) 
{ - va_list vargs_list; ASSERT(htrdr && msg); - va_start(vargs_list, msg); - log_msg(htrdr, LOG_WARNING, msg, vargs_list); - va_end(vargs_list); + /* Log warnings only on master process */ + if(htrdr->mpi_rank == 0) { + va_list vargs_list; + va_start(vargs_list, msg); + log_msg(htrdr, LOG_WARNING, msg, vargs_list); + va_end(vargs_list); + } } const char* @@ -777,45 +827,73 @@ error: } void -fetch_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_progress tag) +send_mpi_progress + (struct htrdr* htrdr, const enum htrdr_mpi_message msg, const int32_t percent) { - int8_t* progress = NULL; + ASSERT(htrdr); + ASSERT(msg == HTRDR_MPI_PROGRESS_RENDERING + || msg == HTRDR_MPI_PROGRESS_BUILD_OCTREE); + (void)htrdr; + mutex_lock(htrdr->mpi_mutex); + MPI(Send(&percent, 1, MPI_INT32_T, 0, msg, MPI_COMM_WORLD)); + mutex_unlock(htrdr->mpi_mutex); +} + +void +fetch_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message msg) +{ + struct timespec t; + int32_t* progress = NULL; int iproc; ASSERT(htrdr && htrdr->mpi_rank == 0); - switch(tag) { - case HTRDR_MPI_PROGRESS_BUILD_OCTREE: + t.tv_sec = 0; + t.tv_nsec = 10000000; /* 10ms */ + + switch(msg) { + case HTRDR_MPI_PROGRESS_BUILD_OCTREE: progress = htrdr->mpi_progress_octree; break; case HTRDR_MPI_PROGRESS_RENDERING: progress = htrdr->mpi_progress_render; - break; + break; default: FATAL("Unreachable code.\n"); break; } FOR_EACH(iproc, 1, htrdr->mpi_nprocs) { - /* Flush the last sent percentage of the process `iproc' */ for(;;) { + MPI_Request req; int flag; + int complete; + + mutex_lock(htrdr->mpi_mutex); + MPI(Iprobe(iproc, msg, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE)); + mutex_unlock(htrdr->mpi_mutex); - CHK(MPI_Iprobe - (iproc, tag, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS); if(flag == 0) break; /* No more message */ - CHK(MPI_Recv(&progress[iproc], sizeof(size_t), MPI_CHAR, iproc, tag, - MPI_COMM_WORLD, MPI_STATUS_IGNORE) == MPI_SUCCESS); + mutex_lock(htrdr->mpi_mutex); + MPI(Irecv(&progress[iproc], 1, MPI_INT32_T, iproc, msg, MPI_COMM_WORLD, &req)); + mutex_unlock(htrdr->mpi_mutex); + for(;;) { + mutex_lock(htrdr->mpi_mutex); + MPI(Test(&req, &complete, MPI_STATUS_IGNORE)); + mutex_unlock(htrdr->mpi_mutex); + if(complete) break; + nanosleep(&t, NULL); + } } } } void -print_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_progress tag) +print_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message msg) { ASSERT(htrdr && htrdr->mpi_rank == 0); if(htrdr->mpi_nprocs == 1) { - switch(tag) { + switch(msg) { case HTRDR_MPI_PROGRESS_BUILD_OCTREE: htrdr_fprintf(htrdr, stderr, "\033[2K\rBuilding octree: %3d%%", htrdr->mpi_progress_octree[0]); @@ -830,7 +908,7 @@ print_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_progress tag) } else { int iproc; FOR_EACH(iproc, 0, htrdr->mpi_nprocs) { - switch(tag) { + switch(msg) { case HTRDR_MPI_PROGRESS_BUILD_OCTREE: htrdr_fprintf(htrdr, stderr, "\033[2K\rProcess %d -- building octree: %3d%%%c", @@ -850,25 +928,25 @@ print_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_progress tag) } void -clear_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_progress tag) +clear_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message msg) { ASSERT(htrdr); - (void)tag; + (void)msg; if(htrdr->mpi_nprocs > 1) { htrdr_fprintf(htrdr, stderr, "\033[%dA", htrdr->mpi_nprocs-1); } } -int8_t -total_mpi_progress(const struct htrdr* htrdr, const enum htrdr_mpi_progress tag) +int +total_mpi_progress(const struct htrdr* htrdr, const enum htrdr_mpi_message msg) { - const 
int8_t* progress = NULL; + const int* progress = NULL; int total = 0; int iproc; ASSERT(htrdr && htrdr->mpi_rank == 0); - switch(tag) { - case HTRDR_MPI_PROGRESS_BUILD_OCTREE: + switch(msg) { + case HTRDR_MPI_PROGRESS_BUILD_OCTREE: progress = htrdr->mpi_progress_octree; break; case HTRDR_MPI_PROGRESS_RENDERING: @@ -878,10 +956,9 @@ total_mpi_progress(const struct htrdr* htrdr, const enum htrdr_mpi_progress tag) } FOR_EACH(iproc, 0, htrdr->mpi_nprocs) { - total += progress[iproc]; + total += progress[iproc]; } total = total / htrdr->mpi_nprocs; - ASSERT(total <= 100); - return (int8_t)total; + return total; } diff --git a/src/htrdr.h b/src/htrdr.h @@ -35,6 +35,7 @@ struct htrdr_buffer; struct htrdr_sky; struct htrdr_rectangle; struct mem_allocator; +struct mutext; struct s3d_device; struct s3d_scene; struct ssf_bsdf; @@ -64,10 +65,14 @@ struct htrdr { int mpi_rank; /* Rank of the process in the MPI group */ int mpi_nprocs; /* Overall #processes in the MPI group */ char* mpi_err_str; /* Temp buffer used to store MPI error string */ + int8_t* mpi_working_procs; /* Define the rank of active processes */ + size_t mpi_nworking_procs; /* Process progress percentage */ - int8_t* mpi_progress_octree; - int8_t* mpi_progress_render; + int32_t* mpi_progress_octree; + int32_t* mpi_progress_render; + + struct mutex* mpi_mutex; /* Protect MPI calls from concurrent threads */ struct logger logger; struct mem_allocator* allocator; diff --git a/src/htrdr_c.h b/src/htrdr_c.h @@ -18,9 +18,17 @@ #include <rsys/rsys.h> -enum htrdr_mpi_progress { +#ifndef NDEBUG + #define MPI(Func) ASSERT(MPI_##Func == MPI_SUCCESS) +#else + #define MPI(Func) MPI_##Func +#endif + +enum htrdr_mpi_message { HTRDR_MPI_PROGRESS_BUILD_OCTREE, - HTRDR_MPI_PROGRESS_RENDERING + HTRDR_MPI_PROGRESS_RENDERING, + HTRDR_MPI_STEAL_REQUEST, + HTRDR_MPI_WORK_STEALING }; struct htrdr; @@ -110,27 +118,33 @@ create_directory const char* path); extern LOCAL_SYM void +send_mpi_progress + (struct htrdr* htrdr, + const enum htrdr_mpi_message progress, + const int32_t percent); + +extern LOCAL_SYM void fetch_mpi_progress (struct htrdr* htrdr, - const enum htrdr_mpi_progress progress); + const enum htrdr_mpi_message progress); extern LOCAL_SYM void print_mpi_progress (struct htrdr* htrdr, - const enum htrdr_mpi_progress progress); + const enum htrdr_mpi_message progress); extern LOCAL_SYM void clear_mpi_progress (struct htrdr* htrdr, - const enum htrdr_mpi_progress progress); + const enum htrdr_mpi_message progress); -extern int8_t +extern int32_t total_mpi_progress (const struct htrdr* htrdr, - const enum htrdr_mpi_progress progress); + const enum htrdr_mpi_message progress); static INLINE void -update_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_progress progress) +update_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message progress) { ASSERT(htrdr); fetch_mpi_progress(htrdr, progress); diff --git a/src/htrdr_draw_radiance_sw.c b/src/htrdr_draw_radiance_sw.c @@ -21,18 +21,28 @@ #include "htrdr_solve.h" #include <rsys/cstr.h> +#include <rsys/dynamic_array_size_t.h> #include <rsys/math.h> #include <star/ssp.h> #include <omp.h> #include <mpi.h> +#include <time.h> #include <unistd.h> -#define RNG_SEQUENCE_SIZE 1000000 +#define RNG_SEQUENCE_SIZE 100000 #define TILE_SIZE 32 /* Definition in X & Y of a tile */ STATIC_ASSERT(IS_POW2(TILE_SIZE), TILE_SIZE_must_be_a_power_of_2); +/* Overall work of a process */ +struct proc_work { + struct darray_size_t tiles; /* #tiles to render */ + size_t itile; /* Next tile to render in the above list of 
tiles */ +}; + +#define TILE_MCODE_NULL INT64_MAX + /******************************************************************************* * Helper functions ******************************************************************************/ @@ -47,74 +57,54 @@ morton2D_decode(const uint32_t u32) return (uint16_t)x; } -static res_T -gather_buffer - (struct htrdr* htrdr, - struct htrdr_buffer* buf) +static INLINE void +proc_work_init(struct mem_allocator* allocator, struct proc_work* work) { - struct htrdr_buffer_layout layout; - struct htrdr_accum* gathered_accums = NULL; - size_t x, y; - int iproc; - res_T res = RES_OK; - ASSERT(htrdr && buf); + ASSERT(work); + darray_size_t_init(allocator, &work->tiles); + work->itile = 0; +} - /* Fetch the memory layout of the submitted buffer */ - htrdr_buffer_get_layout(buf, &layout); - ASSERT(layout.elmt_size == sizeof(struct htrdr_accum) * 3/*#channels*/); - ASSERT(layout.alignment <= ALIGNOF(struct htrdr_accum)); +static INLINE void +proc_work_release(struct proc_work* work) +{ + darray_size_t_release(&work->tiles); +} - /* The process 0 allocates the memory used to store the gathered buffer lines - * of the MPI processes */ - if(htrdr->mpi_rank == 0) { - gathered_accums = MEM_ALLOC - (htrdr->allocator, layout.pitch * (size_t)htrdr->mpi_nprocs); - if(!gathered_accums) { - res = RES_MEM_ERR; - htrdr_log_err(htrdr, - "could not allocate the temporary memory for MPI gathering -- %s.\n", - res_to_cstr(res)); - goto error; - } +static INLINE void +proc_work_reset(struct proc_work* work) +{ + ASSERT(work); + #pragma omp critical + { + darray_size_t_clear(&work->tiles); + work->itile = 0; } +} - FOR_EACH(y, 0, layout.height) { - struct htrdr_accum* buf_row_accums = (struct htrdr_accum*) - ((char*)htrdr_buffer_get_data(buf) + y * layout.pitch); - int err; /* MPI error */ - - /* Gather the buffer lines */ - err = MPI_Gather(buf_row_accums, (int)layout.pitch, MPI_CHAR, gathered_accums, - (int)layout.pitch, MPI_CHAR, 0, MPI_COMM_WORLD); - if(err != MPI_SUCCESS) { - htrdr_log_err(htrdr, -"could not gather the buffer line `%lu' from the group of processes -- %s.\n", - (unsigned long)y, htrdr_mpi_error_string(htrdr, err)); - res = RES_UNKNOWN_ERR; - goto error; - } +static INLINE void +proc_work_add_tile(struct proc_work* work, const int64_t mcode) +{ + size_t st = (size_t)mcode; + ASSERT(mcode >= 0); + #pragma omp critical + CHK(darray_size_t_push_back(&work->tiles, &st) == RES_OK); +} - /* Accumulates the gathered lines into the buffer of the process 0 */ - if(htrdr->mpi_rank == 0) { - memset(buf_row_accums, 0, layout.pitch); - FOR_EACH(iproc, 0, htrdr->mpi_nprocs) { - struct htrdr_accum* proc_accums = (struct htrdr_accum*) - ((char*)gathered_accums + (size_t)iproc * layout.pitch); - FOR_EACH(x, 0, layout.width * 3/*#channels*/) { - buf_row_accums[x].sum_weights += proc_accums[x].sum_weights; - buf_row_accums[x].sum_weights_sqr += proc_accums[x].sum_weights_sqr; - buf_row_accums[x].nweights += proc_accums[x].nweights; - buf_row_accums[x].nfailures += proc_accums[x].nfailures; - } - } +static INLINE int64_t +proc_work_get_tile(struct proc_work* work) +{ + int64_t mcode; + #pragma omp critical + { + if(work->itile >= darray_size_t_size_get(&work->tiles)) { + mcode = TILE_MCODE_NULL; + } else { + mcode = (int64_t)darray_size_t_cdata_get(&work->tiles)[work->itile]; + ++work->itile; } } - -exit: - if(gathered_accums) MEM_RM(htrdr->allocator, gathered_accums); - return res; -error: - goto exit; + return mcode; } static res_T @@ -208,6 +198,225 @@ draw_tile return RES_OK; } 
+static void +mpi_wait_for_request(struct htrdr* htrdr, MPI_Request* req) +{ + ASSERT(htrdr && req); + + /* Wait for process synchronisation */ + for(;;) { + struct timespec t; + int complete; + t.tv_sec = 0; + t.tv_nsec = 10000000; /* 10ms */ + + mutex_lock(htrdr->mpi_mutex); + MPI(Test(req, &complete, MPI_STATUS_IGNORE)); + mutex_unlock(htrdr->mpi_mutex); + if(complete) break; + + nanosleep(&t, NULL); + } +} + +static void +mpi_probe_thieves + (struct htrdr* htrdr, + struct proc_work* work, + ATOMIC* probe_thieves) +{ + struct timespec t; + ASSERT(htrdr && work && probe_thieves); + + if(htrdr->mpi_nprocs == 1) /* The process is alone. No thief is possible */ + return; + + t.tv_sec = 0; + + /* Protect MPI calls of multiple invocations from concurrent threads */ + #define P_MPI(Func) { \ + mutex_lock(htrdr->mpi_mutex); \ + MPI(Func); \ + mutex_unlock(htrdr->mpi_mutex); \ + } (void)0 + + while(ATOMIC_GET(probe_thieves)) { + MPI_Status status; + int msg; + int64_t tile; + + /* Probe if a steal request was submitted by any processes */ + P_MPI(Iprobe(MPI_ANY_SOURCE, HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &msg, + &status)); + + if(msg) { /* A steal request was posted */ + MPI_Request req; + int8_t dummy; + + /* Asynchronously receive the steal request */ + P_MPI(Irecv(&dummy, 1, MPI_INT8_T, status.MPI_SOURCE, + HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &req)); + + /* Wait for the completion of the steal request */ + mpi_wait_for_request(htrdr, &req); + + /* Thief a tile */ + tile = proc_work_get_tile(work); + P_MPI(Send(&tile, 1, MPI_INT64_T, status.MPI_SOURCE, + HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD)); + } + t.tv_nsec = 500000000; /* 500ms */ + nanosleep(&t, NULL); + } + #undef P_MPI +} + +static int +mpi_sample_working_process(struct htrdr* htrdr, struct ssp_rng* rng) +{ + int iproc, i; + int dst_rank; + ASSERT(htrdr && rng && htrdr->mpi_nworking_procs); + + /* Sample the index of the 1st active process */ + iproc = (int)(ssp_rng_canonical(rng) * (double)htrdr->mpi_nworking_procs); + + /* Find the rank of the sampled active process. Use a simple linear search + * since the overall number of process should be quite low; at most few + * dozens. 
*/ + i = 0; + FOR_EACH(dst_rank, 0, htrdr->mpi_nprocs) { + if(htrdr->mpi_working_procs[dst_rank] == 0) continue; /* Inactive process */ + if(i == iproc) break; /* The rank of the sampled process is found */ + ++i; + } + ASSERT(dst_rank < htrdr->mpi_nprocs); + return dst_rank; +} + +/* Return the number of stolen tiles */ +static size_t +mpi_steal_work + (struct htrdr* htrdr, + struct ssp_rng* rng, + struct proc_work* work) +{ + size_t ntiles_to_steal = (size_t)htrdr->nthreads; + size_t nthieves = 0; + ASSERT(htrdr && rng && work); + + /* Protect MPI calls of multiple invocations from concurrent threads */ + #define P_MPI(Func) { \ + mutex_lock(htrdr->mpi_mutex); \ + MPI(Func); \ + mutex_unlock(htrdr->mpi_mutex); \ + } (void)0 + + while(nthieves < ntiles_to_steal && htrdr->mpi_nworking_procs) { + MPI_Request req; + const int8_t dummy = 1; + int64_t tile; /* Morton code of the stolen tile */ + int proc_to_steal; /* Process to steal */ + + /* Sample a process to steal */ + proc_to_steal = mpi_sample_working_process(htrdr, rng); + + /* Send a steal request to the sampled process and wait for a response */ + P_MPI(Send(&dummy, 1, MPI_INT8_T, proc_to_steal, HTRDR_MPI_STEAL_REQUEST, + MPI_COMM_WORLD)); + + /* Receive the stolen tile from the sampled process */ + P_MPI(Irecv(&tile, 1, MPI_INT64_T, proc_to_steal, + HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD, &req)); + + mpi_wait_for_request(htrdr, &req); + + if(tile != TILE_MCODE_NULL) { + proc_work_add_tile(work, tile); + ++nthieves; + } else { + ASSERT(htrdr->mpi_working_procs[proc_to_steal] != 0); + htrdr->mpi_working_procs[proc_to_steal] = 0; + htrdr->mpi_nworking_procs--; + } + } + #undef P_MPI + return nthieves; +} + +static res_T +mpi_gather_buffer + (struct htrdr* htrdr, + struct htrdr_buffer* buf) +{ + struct htrdr_buffer_layout layout; + struct htrdr_accum* gathered_accums = NULL; + size_t x, y; + int iproc; + res_T res = RES_OK; + ASSERT(htrdr && buf); + + /* Fetch the memory layout of the submitted buffer */ + htrdr_buffer_get_layout(buf, &layout); + ASSERT(layout.elmt_size == sizeof(struct htrdr_accum) * 3/*#channels*/); + ASSERT(layout.alignment <= ALIGNOF(struct htrdr_accum)); + + /* The process 0 allocates the memory used to store the gathered buffer lines + * of the MPI processes */ + if(htrdr->mpi_rank == 0) { + gathered_accums = MEM_ALLOC + (htrdr->allocator, layout.pitch * (size_t)htrdr->mpi_nprocs); + if(!gathered_accums) { + res = RES_MEM_ERR; + htrdr_log_err(htrdr, + "could not allocate the temporary memory for MPI gathering -- %s.\n", + res_to_cstr(res)); + goto error; + } + } + + FOR_EACH(y, 0, layout.height) { + struct htrdr_accum* buf_row_accums = (struct htrdr_accum*) + ((char*)htrdr_buffer_get_data(buf) + y * layout.pitch); + int err; /* MPI error */ + + /* Gather the buffer lines */ + mutex_lock(htrdr->mpi_mutex); + err = MPI_Gather(buf_row_accums, (int)layout.pitch, MPI_CHAR, gathered_accums, + (int)layout.pitch, MPI_CHAR, 0, MPI_COMM_WORLD); + mutex_unlock(htrdr->mpi_mutex); + if(err != MPI_SUCCESS) { + htrdr_log_err(htrdr, + "could not gather the buffer line `%lu' from the group of processes -- " + "%s.\n", + (unsigned long)y, htrdr_mpi_error_string(htrdr, err)); + res = RES_UNKNOWN_ERR; + goto error; + } + + /* Accumulates the gathered lines into the buffer of the process 0 */ + if(htrdr->mpi_rank == 0) { + memset(buf_row_accums, 0, layout.pitch); + FOR_EACH(iproc, 0, htrdr->mpi_nprocs) { + struct htrdr_accum* proc_accums = (struct htrdr_accum*) + ((char*)gathered_accums + (size_t)iproc * layout.pitch); + FOR_EACH(x, 
0, layout.width * 3/*#channels*/) { + buf_row_accums[x].sum_weights += proc_accums[x].sum_weights; + buf_row_accums[x].sum_weights_sqr += proc_accums[x].sum_weights_sqr; + buf_row_accums[x].nweights += proc_accums[x].nweights; + buf_row_accums[x].nfailures += proc_accums[x].nfailures; + } + } + } + } + +exit: + if(gathered_accums) MEM_RM(htrdr->allocator, gathered_accums); + return res; +error: + goto exit; +} + /******************************************************************************* * Local functions ******************************************************************************/ @@ -218,20 +427,21 @@ htrdr_draw_radiance_sw const size_t spp, struct htrdr_buffer* buf) { - struct ssp_rng_proxy* rng_proxy = NULL; - struct ssp_rng** rngs = NULL; - size_t ntiles_x, ntiles_y, ntiles_adjusted; - size_t i; - int64_t* proc_tiles = NULL; + struct ssp_rng* rng_main = NULL; + size_t ntiles_x, ntiles_y, ntiles, ntiles_adjusted; + struct proc_work work; int64_t itile; struct htrdr_buffer_layout layout; double pix_sz[2]; /* Pixel size in the normalized image plane */ size_t proc_ntiles; size_t proc_ntiles_adjusted; ATOMIC nsolved_tiles = 0; + ATOMIC probe_thieves = 1; ATOMIC res = RES_OK; ASSERT(htrdr && cam && buf); + proc_work_init(htrdr->allocator, &work); + htrdr_buffer_get_layout(buf, &layout); ASSERT(layout.width || layout.height || layout.elmt_size); @@ -245,63 +455,37 @@ htrdr_draw_radiance_sw goto error; } - res = ssp_rng_proxy_create2 - (htrdr->allocator, - &ssp_rng_mt19937_64, - RNG_SEQUENCE_SIZE * (size_t)htrdr->mpi_rank, /* Offset */ - RNG_SEQUENCE_SIZE, /* Size */ - RNG_SEQUENCE_SIZE * (size_t)htrdr->mpi_nprocs, /* Pitch */ - htrdr->nthreads, &rng_proxy); + res = ssp_rng_create(htrdr->allocator, &ssp_rng_mt19937_64, &rng_main); if(res != RES_OK) { - htrdr_log_err(htrdr, "%s: could not create the RNG proxy -- %s.\n", + htrdr_log_err(htrdr, "%s: could not create the main RNG -- %s.\n", FUNC_NAME, res_to_cstr((res_T)res)); goto error; } - rngs = MEM_CALLOC(htrdr->allocator, htrdr->nthreads, sizeof(*rngs)); - if(!rngs) { - htrdr_log_err(htrdr, "%s: could not allocate the RNGs list.\n", FUNC_NAME); - res = RES_MEM_ERR; - goto error; - } - - FOR_EACH(i, 0, htrdr->nthreads) { - res = ssp_rng_proxy_create_rng(rng_proxy, i, &rngs[i]); - if(res != RES_OK) { - htrdr_log_err(htrdr,"%s: could not create the per thread RNG -- %s.\n", - FUNC_NAME, res_to_cstr((res_T)res)); - goto error; - } - } - + /* Compute the overall number of tiles */ ntiles_x = (layout.width + (TILE_SIZE-1)/*ceil*/)/TILE_SIZE; ntiles_y = (layout.height+ (TILE_SIZE-1)/*ceil*/)/TILE_SIZE; + ntiles = ntiles_x * ntiles_y; + + /* Adjust the #tiles for the morton-encoding procedure */ ntiles_adjusted = round_up_pow2(MMAX(ntiles_x, ntiles_y)); ntiles_adjusted *= ntiles_adjusted; + /* Compute the size of a pixel in the normalized image plane */ pix_sz[0] = 1.0 / (double)layout.width; pix_sz[1] = 1.0 / (double)layout.height; /* Define the initial number of tiles of the current process */ - proc_ntiles = ntiles_adjusted / (size_t)htrdr->mpi_nprocs; + proc_ntiles = ntiles / (size_t)htrdr->mpi_nprocs; + proc_ntiles_adjusted = ntiles_adjusted / (size_t)htrdr->mpi_nprocs; if(htrdr->mpi_rank == 0) {/* Affect the remaining tiles to the master proc */ - ASSERT(ntiles_adjusted >= proc_ntiles * (size_t)htrdr->mpi_nprocs); - proc_ntiles += ntiles_adjusted - proc_ntiles*(size_t)htrdr->mpi_nprocs; - } - - /* Allocate the per process list of tiles */ - proc_tiles = MEM_CALLOC(htrdr->allocator, proc_ntiles, sizeof(*proc_tiles)); - if(!proc_tiles) { 
- res = RES_MEM_ERR; - htrdr_log_err(htrdr, - "%s: could not allocate the per process list of tiles -- %s.\n", - FUNC_NAME, res_to_cstr((res_T)res)); - goto error; + proc_ntiles += ntiles - proc_ntiles*(size_t)htrdr->mpi_nprocs; + proc_ntiles_adjusted += + ntiles_adjusted - proc_ntiles*(size_t)htrdr->mpi_nprocs; } - /* Define the initial list of tiles of the process */ - proc_ntiles_adjusted = 0; - FOR_EACH(itile, 0, proc_ntiles) { + /* Define the initial list of tiles of the process */ + FOR_EACH(itile, 0, proc_ntiles_adjusted) { size_t tile_org[2]; int64_t mcode = htrdr->mpi_rank + (itile*htrdr->mpi_nprocs); @@ -311,8 +495,7 @@ htrdr_draw_radiance_sw tile_org[1] = morton2D_decode((uint32_t)(mcode>>1)); if(tile_org[1] >= ntiles_y) continue; /* Skip border tile */ - proc_tiles[proc_ntiles_adjusted] = mcode; - proc_ntiles_adjusted++; + proc_work_add_tile(&work, mcode); } if(htrdr->mpi_rank == 0) { @@ -320,77 +503,118 @@ htrdr_draw_radiance_sw print_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING); } - omp_set_num_threads((int)htrdr->nthreads); - #pragma omp parallel for schedule(static, 1/*chunck size*/) - for(itile=0; itile<(int64_t)proc_ntiles_adjusted; ++itile) { - const int ithread = omp_get_thread_num(); - struct ssp_rng* rng = rngs[ithread]; - int64_t mcode = proc_tiles[itile]; - size_t tile_org[2]; - size_t tile_sz[2]; - size_t n; - res_T res_local = RES_OK; - int8_t pcent; - - /* Decode the morton code to retrieve the tile index */ - tile_org[0] = morton2D_decode((uint32_t)(mcode>>0)); - tile_org[1] = morton2D_decode((uint32_t)(mcode>>1)); - ASSERT(tile_org[0] < ntiles_x && tile_org[1] < ntiles_y); + omp_set_nested(1); + #pragma omp parallel sections num_threads(2) + { + #pragma omp section + mpi_probe_thieves(htrdr, &work, &probe_thieves); + + #pragma omp section + { + MPI_Request req; + size_t nthreads = htrdr->nthreads; + size_t nthieves = 0; + + /* The process is not considered as a working process for himself */ + htrdr->mpi_working_procs[htrdr->mpi_rank] = 0; + --htrdr->mpi_nworking_procs; + + do { + omp_set_num_threads((int)nthreads); + #pragma omp parallel + for(;;) { + const int ithread = omp_get_thread_num(); + struct ssp_rng_proxy* rng_proxy = NULL; + struct ssp_rng* rng; + int64_t mcode; + size_t tile_org[2]; + size_t tile_sz[2]; + size_t n; + res_T res_local = RES_OK; + int32_t pcent; + + if(ATOMIC_GET(&res) != RES_OK) continue; + + mcode = proc_work_get_tile(&work); + if(mcode == TILE_MCODE_NULL) break; + + /* Decode the morton code to retrieve the tile index */ + tile_org[0] = morton2D_decode((uint32_t)(mcode>>0)); + tile_org[1] = morton2D_decode((uint32_t)(mcode>>1)); + ASSERT(tile_org[0] < ntiles_x && tile_org[1] < ntiles_y); + + /* Define the tile origin in pixel space */ + tile_org[0] *= TILE_SIZE; + tile_org[1] *= TILE_SIZE; + + /* Compute the size of the tile clamped by the borders of the buffer */ + tile_sz[0] = MMIN(TILE_SIZE, layout.width - tile_org[0]); + tile_sz[1] = MMIN(TILE_SIZE, layout.height - tile_org[1]); + + SSP(rng_proxy_create2 + (&htrdr->lifo_allocators[ithread], + &ssp_rng_threefry, + RNG_SEQUENCE_SIZE * (size_t)mcode, /* Offset */ + RNG_SEQUENCE_SIZE, /* Size */ + RNG_SEQUENCE_SIZE * (size_t)ntiles_adjusted, /* Pitch */ + 1, &rng_proxy)); + SSP(rng_proxy_create_rng(rng_proxy, 0, &rng)); + + res_local = draw_tile(htrdr, (size_t)ithread, mcode, tile_org, tile_sz, + pix_sz, cam, spp, rng, buf); + + SSP(rng_proxy_ref_put(rng_proxy)); + SSP(rng_ref_put(rng)); + + if(res_local != RES_OK) { + ATOMIC_SET(&res, res_local); + continue; + } + + n = 
(size_t)ATOMIC_INCR(&nsolved_tiles); + pcent = (int32_t)((double)n * 100.0 / (double)proc_ntiles + 0.5/*round*/); + + #pragma omp critical + if(pcent > htrdr->mpi_progress_render[0]) { + htrdr->mpi_progress_render[0] = pcent; + if(htrdr->mpi_rank == 0) { + update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING); + } else { /* Send the progress percentage to the master process */ + send_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING, pcent); + } + } + if(ATOMIC_GET(&res) != RES_OK) continue; + } - /* Define the tile origin in pixel space */ - tile_org[0] *= TILE_SIZE; - tile_org[1] *= TILE_SIZE; + proc_work_reset(&work); + nthieves = mpi_steal_work(htrdr, rng_main, &work); + } while(nthieves); - /* Compute the size of the tile clamped by the borders of the buffer */ - tile_sz[0] = MMIN(TILE_SIZE, layout.width - tile_org[0]); - tile_sz[1] = MMIN(TILE_SIZE, layout.height - tile_org[1]); + /* Synchronize the process */ + mutex_lock(htrdr->mpi_mutex); + MPI(Ibarrier(MPI_COMM_WORLD, &req)); + mutex_unlock(htrdr->mpi_mutex); - res_local = draw_tile(htrdr, (size_t)ithread, mcode, tile_org, tile_sz, - pix_sz, cam, spp, rng, buf); - if(res_local != RES_OK) { - ATOMIC_SET(&res, res_local); - continue; - } + /* Wait for synchronization */ + mpi_wait_for_request(htrdr, &req); - n = (size_t)ATOMIC_INCR(&nsolved_tiles); - pcent = (int8_t)(n * 100 / proc_ntiles_adjusted); - - #pragma omp critical - if(pcent > htrdr->mpi_progress_render[0]) { - htrdr->mpi_progress_render[0] = pcent; - if(htrdr->mpi_rank == 0) { - update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING); - } else { - /* Send the progress percentage of the process to the master process */ - CHK(MPI_Send(&pcent, sizeof(pcent), MPI_CHAR, 0/*dst*/, - HTRDR_MPI_PROGRESS_RENDERING/*tag*/, MPI_COMM_WORLD) == MPI_SUCCESS); - } + /* The processes have no more work to do. 
Stop probing for thieves */ + ATOMIC_SET(&probe_thieves, 0); } - - if(ATOMIC_GET(&res) != RES_OK) continue; } if(htrdr->mpi_rank == 0) { - while(total_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING) != 100) { - update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING); - sleep(1); - } - fprintf(stderr, "\n"); + update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING); + fprintf(stderr, "\n"); /* Add a new line after the progress statuses */ } /* Gather accum buffers from the group of processes */ - res = gather_buffer(htrdr, buf); + res = mpi_gather_buffer(htrdr, buf); if(res != RES_OK) goto error; exit: - if(rng_proxy) SSP(rng_proxy_ref_put(rng_proxy)); - if(proc_tiles) MEM_RM(htrdr->allocator, proc_tiles); - if(rngs) { - FOR_EACH(i, 0, htrdr->nthreads) { - if(rngs[i]) SSP(rng_ref_put(rngs[i])); - } - MEM_RM(htrdr->allocator, rngs); - } + proc_work_release(&work); + if(rng_main) SSP(rng_ref_put(rng_main)); return (res_T)res; error: goto exit; diff --git a/src/htrdr_main.c b/src/htrdr_main.c @@ -19,6 +19,18 @@ #include <mpi.h> #include <rsys/mem_allocator.h> +static const char* +thread_support_string(const int val) +{ + switch(val) { + case MPI_THREAD_SINGLE: return "MPI_THREAD_SINGLE"; + case MPI_THREAD_FUNNELED: return "MPI_THREAD_FUNNELED"; + case MPI_THREAD_SERIALIZED: return "MPI_THREAD_SERIALIZED"; + case MPI_THREAD_MULTIPLE: return "MPI_THREAD_MULTIPLE"; + default: FATAL("Unreachable code.\n"); break; + } +} + /******************************************************************************* * Program ******************************************************************************/ @@ -30,14 +42,22 @@ main(int argc, char** argv) size_t memsz = 0; int err = 0; int is_htrdr_init = 0; + int thread_support = 0; res_T res = RES_OK; - err = MPI_Init(&argc, &argv); + err = MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thread_support); if(err != MPI_SUCCESS) { fprintf(stderr, "Error initializing MPI.\n"); goto error; } + if(thread_support != MPI_THREAD_SERIALIZED) { + fprintf(stderr, "The provided MPI implementation does not support " + "serialized API calls from multiple threads. 
Provided thread support: " + "%s.\n", thread_support_string(thread_support)); + goto error; + } + res = htrdr_args_init(&args, argc, argv); if(res != RES_OK) goto error; if(args.quit) goto exit; diff --git a/src/htrdr_sky.c b/src/htrdr_sky.c @@ -1219,7 +1219,7 @@ setup_clouds const size_t iband = darray_specdata_data_get(&specdata)[ispecdata].iband; const size_t iquad = darray_specdata_data_get(&specdata)[ispecdata].iquad; const size_t id = iband - sky->sw_bands_range[0]; - int8_t pcent; + int32_t pcent; size_t n; res_T res_local = RES_OK; @@ -1268,17 +1268,15 @@ setup_clouds if(!sky->htrdr->cache_grids) { /* Update the progress message */ n = (size_t)ATOMIC_INCR(&nbuilt_octrees); - pcent = (int8_t)(n * 100 / darray_specdata_size_get(&specdata)); + pcent = (int32_t)(n * 100 / darray_specdata_size_get(&specdata)); #pragma omp critical if(pcent > sky->htrdr->mpi_progress_octree[0]) { sky->htrdr->mpi_progress_octree[0] = pcent; if(sky->htrdr->mpi_rank == 0) { update_mpi_progress(sky->htrdr, HTRDR_MPI_PROGRESS_BUILD_OCTREE); - } else { - /* Send the progress percentage of the process to the master process */ - CHK(MPI_Send(&pcent, sizeof(pcent), MPI_CHAR, 0/*dst*/, - HTRDR_MPI_PROGRESS_BUILD_OCTREE, MPI_COMM_WORLD) == MPI_SUCCESS); + } else { /* Send the progress percentage to the master process */ + send_mpi_progress(sky->htrdr, HTRDR_MPI_PROGRESS_BUILD_OCTREE, pcent); } } } @@ -1289,6 +1287,7 @@ setup_clouds update_mpi_progress(sky->htrdr, HTRDR_MPI_PROGRESS_BUILD_OCTREE); sleep(1); } + fprintf(stderr, "\n"); /* Add a new line after the progress statuses */ } exit: