commit 1ac4a8bd590835683bd6287edf708fcd9589b2d6
parent c7b47ff81cd070a3254159cdc5574528ee77a82f
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Mon, 22 Oct 2018 14:41:26 +0200
Add support of MPI
Diffstat:
7 files changed, 270 insertions(+), 28 deletions(-)
diff --git a/cmake/CMakeLists.txt b/cmake/CMakeLists.txt
@@ -34,11 +34,14 @@ find_package(HTCP REQUIRED)
find_package(HTGOP REQUIRED)
find_package(HTMIE REQUIRED)
find_package(OpenMP 1.2 REQUIRED)
+find_package(MPI REQUIRED)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${RCMAKE_SOURCE_DIR})
include(rcmake)
include(rcmake_runtime)
+set(CMAKE_C_COMPILER ${MPI_C_COMPILER})
+
include_directories(
${RSys_INCLUDE_DIR}
${Star3D_INCLUDE_DIR}
@@ -48,7 +51,8 @@ include_directories(
${StarVX_INCLUDE_DIR}
${HTCP_INCLUDE_DIR}
${HTGOP_INCLUDE_DIR}
- ${HTMIE_INCLUDE_DIR})
+ ${HTMIE_INCLUDE_DIR}
+ ${MPI_INCLUDE_PATH})
################################################################################
# Configure and define targets
diff --git a/src/htrdr.c b/src/htrdr.c
@@ -26,6 +26,7 @@
#include "htrdr_solve.h"
#include <rsys/clock_time.h>
+#include <rsys/cstr.h>
#include <rsys/mem_allocator.h>
#include <rsys/str.h>
@@ -43,6 +44,7 @@
#include <sys/stat.h> /* S_IRUSR & S_IWUSR */
#include <omp.h>
+#include <mpi.h>
/*******************************************************************************
* Helper functions
@@ -91,7 +93,7 @@ log_msg
va_list vargs)
{
ASSERT(htrdr && msg);
- if(htrdr->verbose) {
+ if(htrdr->verbose && htrdr->mpi_rank == 0) {
CHK(logger_vprint(&htrdr->logger, stream, msg, vargs) == RES_OK);
}
}
@@ -243,6 +245,51 @@ spherical_to_cartesian_dir
dir[2] = sin_elevation;
}
+static res_T
+init_mpi(struct htrdr* htrdr)
+{
+ int err;
+ res_T res = RES_OK;
+ ASSERT(htrdr);
+
+ htrdr->mpi_err_str = MEM_CALLOC
+ (htrdr->allocator, htrdr->nthreads, MPI_MAX_ERROR_STRING);
+ if(!htrdr->mpi_err_str) {
+ res = RES_MEM_ERR;
+ htrdr_log_err(htrdr,
+ "could not allocate the MPI error strings -- %s.\n",
+ res_to_cstr(res));
+ goto error;
+ }
+
+ err = MPI_Comm_rank(MPI_COMM_WORLD, &htrdr->mpi_rank);
+ if(err != MPI_SUCCESS) {
+ htrdr_log_err(htrdr,
+ "could not determine the MPI rank of the calling process -- %s.\n",
+ htrdr_mpi_error_string(htrdr, err));
+ res = RES_UNKNOWN_ERR;
+ goto error;
+ }
+
+ err = MPI_Comm_size(MPI_COMM_WORLD, &htrdr->mpi_nprocs);
+ if(err != MPI_SUCCESS) {
+ htrdr_log_err(htrdr,
+ "could retrieve the size of the MPI group -- %s.\n",
+ htrdr_mpi_error_string(htrdr, err));
+ res = RES_UNKNOWN_ERR;
+ goto error;
+ }
+
+exit:
+ return res;
+error:
+ if(htrdr->mpi_err_str) {
+ MEM_RM(htrdr->allocator, htrdr->mpi_err_str);
+ htrdr->mpi_err_str = NULL;
+ }
+ goto exit;
+}
+
/*******************************************************************************
* Local functions
******************************************************************************/
@@ -275,6 +322,20 @@ htrdr_init
htrdr->nthreads = MMIN(args->nthreads, (unsigned)omp_get_num_procs());
htrdr->spp = args->image.spp;
+ res = init_mpi(htrdr);
+ if(res != RES_OK) goto error;
+
+ if((size_t)htrdr->mpi_nprocs > htrdr->spp) {
+ htrdr_log_err(htrdr,
+ "%s: insufficient number samples per pixel `%lu': it must be greater or "
+ "equal to the number of running processes, i.e. `%lu'.\n",
+ FUNC_NAME,
+ (unsigned long)htrdr->spp,
+ (unsigned long)htrdr->mpi_nprocs);
+ res = RES_BAD_ARG;
+ goto error;
+ }
+
if(!args->output) {
htrdr->output = stdout;
output_name = "<stdout>";
@@ -287,14 +348,17 @@ htrdr_init
res = str_set(&htrdr->output_name, output_name);
if(res != RES_OK) {
htrdr_log_err(htrdr,
- "could not store the name of the output stream `%s'.\n", output_name);
+ "%s: could not store the name of the output stream `%s' -- %s.\n",
+ FUNC_NAME, output_name, res_to_cstr(res));
goto error;
}
res = svx_device_create
(&htrdr->logger, htrdr->allocator, args->verbose, &htrdr->svx);
if(res != RES_OK) {
- htrdr_log_err(htrdr, "could not create the Star-VoXel device.\n");
+ htrdr_log_err(htrdr,
+ "%s: could not create the Star-VoXel device -- %s.\n",
+ FUNC_NAME, res_to_cstr(res));
goto error;
}
@@ -304,7 +368,9 @@ htrdr_init
res = s3d_device_create
(&htrdr->logger, htrdr->allocator, 0, &htrdr->s3d);
if(res != RES_OK) {
- htrdr_log_err(htrdr, "could not create the Star-3D device.\n");
+ htrdr_log_err(htrdr,
+ "%s: could not create the Star-3D device -- %s.\n",
+ FUNC_NAME, res_to_cstr(res));
goto error;
}
@@ -342,9 +408,10 @@ htrdr_init
htrdr->lifo_allocators = MEM_CALLOC
(htrdr->allocator, htrdr->nthreads, sizeof(*htrdr->lifo_allocators));
if(!htrdr->lifo_allocators) {
- htrdr_log_err(htrdr,
- "could not allocate the list of per thread LIFO allocator.\n");
res = RES_MEM_ERR;
+ htrdr_log_err(htrdr,
+ "%s: could not allocate the list of per thread LIFO allocator -- %s.\n",
+ FUNC_NAME, res_to_cstr(res));
goto error;
}
@@ -353,13 +420,13 @@ htrdr_init
(&htrdr->lifo_allocators[ithread], htrdr->allocator, 4096);
if(res != RES_OK) {
htrdr_log_err(htrdr,
- "could not initialise the LIFO allocator of the thread %lu.\n",
- (unsigned long)ithread);
+ "%s: could not initialise the LIFO allocator of the thread %lu -- %s.\n",
+ FUNC_NAME, (unsigned long)ithread, res_to_cstr(res));
goto error;
}
}
- exit:
+exit:
return res;
error:
htrdr_release(htrdr);
@@ -377,6 +444,7 @@ htrdr_release(struct htrdr* htrdr)
if(htrdr->sun) htrdr_sun_ref_put(htrdr->sun);
if(htrdr->cam) htrdr_camera_ref_put(htrdr->cam);
if(htrdr->buf) htrdr_buffer_ref_put(htrdr->buf);
+ if(htrdr->mpi_err_str) MEM_RM(htrdr->allocator, htrdr->mpi_err_str);
if(htrdr->lifo_allocators) {
size_t i;
FOR_EACH(i, 0, htrdr->nthreads) {
@@ -395,6 +463,10 @@ htrdr_run(struct htrdr* htrdr)
if(htrdr->dump_vtk) {
const size_t nbands = htrdr_sky_get_sw_spectral_bands_count(htrdr->sky);
size_t i;
+
+ /* Nothing to do */
+ if(htrdr->mpi_rank != 0) goto exit;
+
FOR_EACH(i, 0, nbands) {
const size_t iband = htrdr_sky_get_sw_spectral_band_id(htrdr->sky, i);
const size_t nquads = htrdr_sky_get_sw_spectral_band_quadrature_length
@@ -417,9 +489,11 @@ htrdr_run(struct htrdr* htrdr)
time_dump(&t0, TIME_ALL, NULL, buf, sizeof(buf));
htrdr_log(htrdr, "Rendering time: %s\n", buf);
- res = dump_accum_buffer
- (htrdr, htrdr->buf, str_cget(&htrdr->output_name), htrdr->output);
- if(res != RES_OK) goto error;
+ if(htrdr->mpi_rank == 0) {
+ res = dump_accum_buffer
+ (htrdr, htrdr->buf, str_cget(&htrdr->output_name), htrdr->output);
+ if(res != RES_OK) goto error;
+ }
}
exit:
return res;
@@ -457,6 +531,40 @@ htrdr_log_warn(struct htrdr* htrdr, const char* msg, ...)
va_end(vargs_list);
}
+const char*
+htrdr_mpi_error_string(struct htrdr* htrdr, const int mpi_err)
+{
+ const int ithread = omp_get_thread_num();
+ char* str;
+ int strlen_err;
+ int err;
+ ASSERT(htrdr && (size_t)ithread < htrdr->nthreads);
+ str = htrdr->mpi_err_str + ithread*MPI_MAX_ERROR_STRING;
+ err = MPI_Error_string(mpi_err, str, &strlen_err);
+ return err == MPI_SUCCESS ? str : "Invalid MPI error";
+}
+
+void
+htrdr_fprintf(struct htrdr* htrdr, FILE* stream, const char* msg, ...)
+{
+ ASSERT(htrdr && msg);
+ if(htrdr->mpi_rank == 0) {
+ va_list vargs_list;
+ va_start(vargs_list, msg);
+ vfprintf(stream, msg, vargs_list);
+ va_end(vargs_list);
+ }
+}
+
+void
+htrdr_fflush(struct htrdr* htrdr, FILE* stream)
+{
+ ASSERT(htrdr);
+ if(htrdr->mpi_rank == 0) {
+ fflush(stream);
+ }
+}
+
/*******************************************************************************
* Local functions
******************************************************************************/
diff --git a/src/htrdr.h b/src/htrdr.h
@@ -60,6 +60,10 @@ struct htrdr {
int dump_vtk;
int verbose;
+ int mpi_rank;
+ int mpi_nprocs;
+ char* mpi_err_str;
+
struct logger logger;
struct mem_allocator* allocator;
struct mem_allocator* lifo_allocators; /* Per thread lifo allocator */
@@ -109,5 +113,26 @@ htrdr_log_warn
#endif
;
+extern LOCAL_SYM const char*
+htrdr_mpi_error_string
+ (struct htrdr* htrdr,
+ const int mpi_err);
+
+extern LOCAL_SYM void
+htrdr_fprintf
+ (struct htrdr* htrdr,
+ FILE* stream,
+ const char* msg,
+ ...)
+#ifdef COMPILER_GCC
+ __attribute((format(printf, 3, 4)))
+#endif
+ ;
+
+extern LOCAL_SYM void
+htrdr_fflush
+ (struct htrdr* htrdr,
+ FILE* stream);
+
#endif /* HTRDR_H */
diff --git a/src/htrdr_buffer.c b/src/htrdr_buffer.c
@@ -163,3 +163,4 @@ htrdr_buffer_at(struct htrdr_buffer* buf, const size_t x, const size_t y)
ASSERT(buf && x < buf->width && y < buf->height);
return buf->mem + y*buf->pitch + x*buf->elmtsz;
}
+
diff --git a/src/htrdr_draw_radiance_sw.c b/src/htrdr_draw_radiance_sw.c
@@ -20,12 +20,16 @@
#include "htrdr_sky.h"
#include "htrdr_solve.h"
+#include <rsys/cstr.h>
+#include <rsys/math.h>
#include <star/ssp.h>
#include <omp.h>
-#include <rsys/math.h>
+#include <mpi.h>
-#define TILE_SIZE 32 /* definition in X & Y of a tile */
+#define RNG_SEQUENCE_SIZE 1000000
+
+#define TILE_SIZE 32 /* Definition in X & Y of a tile */
STATIC_ASSERT(IS_POW2(TILE_SIZE), TILE_SIZE_must_be_a_power_of_2);
/*******************************************************************************
@@ -43,6 +47,76 @@ morton2D_decode(const uint32_t u32)
}
static res_T
+gather_buffer
+ (struct htrdr* htrdr,
+ struct htrdr_buffer* buf)
+{
+ struct htrdr_buffer_layout layout;
+ struct htrdr_accum* gathered_accums = NULL;
+ size_t x, y;
+ int iproc;
+ res_T res = RES_OK;
+ ASSERT(htrdr && buf);
+
+ /* Fetch the memory layout of the submitted buffer */
+ htrdr_buffer_get_layout(buf, &layout);
+ ASSERT(layout.elmt_size == sizeof(struct htrdr_accum) * 3/*#channels*/);
+ ASSERT(layout.alignment <= ALIGNOF(struct htrdr_accum));
+
+ /* The process 0 allocates the memory used to store the gathered buffer lines
+ * of the MPI processes */
+ if(htrdr->mpi_rank == 0) {
+ gathered_accums = MEM_ALLOC
+ (htrdr->allocator, layout.pitch * (size_t)htrdr->mpi_nprocs);
+ if(!gathered_accums) {
+ res = RES_MEM_ERR;
+ htrdr_log_err(htrdr,
+ "could not allocate the temporary memory for MPI gathering -- %s.\n",
+ res_to_cstr(res));
+ goto error;
+ }
+ }
+
+ FOR_EACH(y, 0, layout.height) {
+ struct htrdr_accum* buf_row_accums = (struct htrdr_accum*)
+ ((char*)htrdr_buffer_get_data(buf) + y * layout.pitch);
+ int err; /* MPI error */
+
+ /* Gather the buffer lines */
+ err = MPI_Gather(buf_row_accums, (int)layout.pitch, MPI_CHAR, gathered_accums,
+ (int)layout.pitch, MPI_CHAR, 0, MPI_COMM_WORLD);
+ if(err != MPI_SUCCESS) {
+ htrdr_log_err(htrdr,
+"could not gather the buffer line `%lu' from the group of processes -- %s.\n",
+ (unsigned long)y, htrdr_mpi_error_string(htrdr, err));
+ res = RES_UNKNOWN_ERR;
+ goto error;
+ }
+
+ /* Accumulates the gathered lines into the buffer of the process 0 */
+ if(htrdr->mpi_rank == 0) {
+ memset(buf_row_accums, 0, layout.pitch);
+ FOR_EACH(iproc, 0, htrdr->mpi_nprocs) {
+ struct htrdr_accum* proc_accums = (struct htrdr_accum*)
+ ((char*)gathered_accums + (size_t)iproc * layout.pitch);
+ FOR_EACH(x, 0, layout.width * 3/*#channels*/) {
+ buf_row_accums[x].sum_weights += proc_accums[x].sum_weights;
+ buf_row_accums[x].sum_weights_sqr += proc_accums[x].sum_weights_sqr;
+ buf_row_accums[x].nweights += proc_accums[x].nweights;
+ buf_row_accums[x].nfailures += proc_accums[x].nfailures;
+ }
+ }
+ }
+ }
+
+exit:
+ if(gathered_accums) MEM_RM(htrdr->allocator, gathered_accums);
+ return res;
+error:
+ goto exit;
+}
+
+static res_T
draw_tile
(struct htrdr* htrdr,
const size_t ithread,
@@ -140,7 +214,7 @@ res_T
htrdr_draw_radiance_sw
(struct htrdr* htrdr,
const struct htrdr_camera* cam,
- const size_t spp,
+ const size_t total_spp,
struct htrdr_buffer* buf)
{
struct ssp_rng_proxy* rng_proxy = NULL;
@@ -151,6 +225,7 @@ htrdr_draw_radiance_sw
int32_t mcode; /* Morton code of the tile */
struct htrdr_buffer_layout layout;
double pix_sz[2]; /* Pixel size in the normalized image plane */
+ size_t spp;
ATOMIC nsolved_tiles = 0;
ATOMIC res = RES_OK;
ASSERT(htrdr && cam && buf);
@@ -158,6 +233,13 @@ htrdr_draw_radiance_sw
htrdr_buffer_get_layout(buf, &layout);
ASSERT(layout.width || layout.height || layout.elmt_size);
+ spp = total_spp / (size_t)htrdr->mpi_nprocs;
+
+ /* Add the remaining realisations to the 1st process */
+ if(htrdr->mpi_rank == 0) {
+ spp += total_spp - (spp*(size_t)htrdr->mpi_nprocs);
+ }
+
if(layout.elmt_size != sizeof(struct htrdr_accum[3])/*#channels*/
|| layout.alignment < ALIGNOF(struct htrdr_accum[3])) {
htrdr_log_err(htrdr,
@@ -168,8 +250,13 @@ htrdr_draw_radiance_sw
goto error;
}
- res = ssp_rng_proxy_create
- (htrdr->allocator, &ssp_rng_mt19937_64, htrdr->nthreads, &rng_proxy);
+ res = ssp_rng_proxy_create2
+ (htrdr->allocator,
+ &ssp_rng_mt19937_64,
+ RNG_SEQUENCE_SIZE * (size_t)htrdr->mpi_rank, /* Offset */
+ RNG_SEQUENCE_SIZE, /* Size */
+ RNG_SEQUENCE_SIZE * (size_t)htrdr->mpi_nprocs, /* Pitch */
+ htrdr->nthreads, &rng_proxy);
if(res != RES_OK) {
htrdr_log_err(htrdr, "%s: could not create the RNG proxy.\n", FUNC_NAME);
goto error;
@@ -199,8 +286,8 @@ htrdr_draw_radiance_sw
pix_sz[0] = 1.0 / (double)layout.width;
pix_sz[1] = 1.0 / (double)layout.height;
- fprintf(stderr, "Rendering: %3i%%", 0);
- fflush(stderr);
+ htrdr_fprintf(htrdr, stderr, "Rendering: %3i%%", 0);
+ htrdr_fflush(htrdr, stderr);
omp_set_num_threads((int)htrdr->nthreads);
@@ -241,14 +328,18 @@ htrdr_draw_radiance_sw
#pragma omp critical
if(pcent > progress) {
progress = pcent;
- fprintf(stderr, "%c[2K\rRendering: %3lu%%",
+ htrdr_fprintf(htrdr, stderr, "%c[2K\rRendering: %3lu%%",
27, (unsigned long)pcent);
- fflush(stderr);
+ htrdr_fflush(htrdr, stderr);
}
if(ATOMIC_GET(&res) != RES_OK) continue;
}
- fprintf(stderr, "\n");
+ htrdr_fprintf(htrdr, stderr, "\n");
+
+ /* Gather accum buffers from the group of processes */
+ res = gather_buffer(htrdr, buf);
+ if(res != RES_OK) goto error;
exit:
if(rng_proxy) SSP(rng_proxy_ref_put(rng_proxy));
diff --git a/src/htrdr_main.c b/src/htrdr_main.c
@@ -16,6 +16,7 @@
#include "htrdr.h"
#include "htrdr_args.h"
+#include <mpi.h>
#include <rsys/mem_allocator.h>
/*******************************************************************************
@@ -31,6 +32,15 @@ main(int argc, char** argv)
int is_htrdr_init = 0;
res_T res = RES_OK;
+ err = MPI_Init(&argc, &argv);
+ if(err != MPI_SUCCESS) {
+ char str[MPI_MAX_ERROR_STRING];
+ int i;
+ CHK(MPI_Error_string(err, str, &i) == MPI_SUCCESS);
+ fprintf(stderr, "Error initializing MPI -- %s.\n", str);
+ goto error;
+ }
+
res = htrdr_args_init(&args, argc, argv);
if(res != RES_OK) goto error;
if(args.quit) goto exit;
@@ -43,6 +53,7 @@ main(int argc, char** argv)
if(res != RES_OK) goto error;
exit:
+ MPI_Finalize();
if(is_htrdr_init) htrdr_release(&htrdr);
htrdr_args_release(&args);
if((memsz = mem_allocated_size()) != 0) {
diff --git a/src/htrdr_sky.c b/src/htrdr_sky.c
@@ -1009,8 +1009,9 @@ setup_cloud_grid
/* Compute the overall number of voxels in the htrdr_grid */
ncells = definition[0] * definition[1] * definition[2];
- fprintf(stderr, "Generating cloud grid %lu.%lu: %3u%%", iband, iquad, 0);
- fflush(stderr);
+ htrdr_fprintf(sky->htrdr, stderr,
+ "Generating cloud grid %lu.%lu: %3u%%", iband, iquad, 0);
+ htrdr_fflush(sky->htrdr, stderr);
omp_set_num_threads((int)sky->htrdr->nthreads);
#pragma omp parallel for
@@ -1033,12 +1034,13 @@ setup_cloud_grid
#pragma omp critical
if(pcent > progress) {
progress = pcent;
- fprintf(stderr, "%c[2K\rGenerating cloud grid %lu.%lu: %3u%%",
+ htrdr_fprintf(sky->htrdr, stderr,
+ "%c[2K\rGenerating cloud grid %lu.%lu: %3u%%",
27, iband, iquad, (unsigned)pcent);
- fflush(stderr);
+ htrdr_fflush(sky->htrdr, stderr);
}
}
- fprintf(stderr, "\n");
+ htrdr_fprintf(sky->htrdr, stderr, "\n");
exit:
*out_grid = grid;