commit f71c362bd3209a06e5eb6253fa6ac8986fce15c1
parent fefbba23644a740816523eb61626084b7ce6414c
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Wed, 1 Dec 2021 15:31:19 +0100
Add the gather_green_function function
Diffstat:
| M | src/sdis.c | | | 313 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | src/sdis_c.h | | | 17 | ++++++++++++----- |
2 files changed, 325 insertions(+), 5 deletions(-)
diff --git a/src/sdis.c b/src/sdis.c
@@ -13,6 +13,8 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. */
+#define _POSIX_C_SOURCE 200112L
+
#include "sdis.h"
#include "sdis_c.h"
#include "sdis_device_c.h"
@@ -27,9 +29,12 @@
#include <star/ssp.h>
+#include <rsys/cstr.h>
#include <rsys/clock_time.h>
#include <rsys/mem_allocator.h>
+#include <errno.h>
+
/* Number random numbers in a sequence, i.e. number of consecutive random
* numbers that can be used by a thread */
#define RNG_SEQUENCE_SIZE 100000
@@ -47,6 +52,170 @@ rewind_progress_printing(struct sdis_device* dev)
log_info(dev, "\033[1A\r"); /* Move up */
}
}
+
+static res_T
+send_green_function_to_master_process
+ (struct sdis_device* dev,
+ struct sdis_green_function* green)
+{
+ char buf[128];
+ FILE* stream = NULL; /* Temp file that stores the serialized green function */
+ void* data = NULL; /* Pointer to serialized green function data */
+ long sz = 0; /* Size in Bytes of the serialized green function data */
+ res_T res = RES_OK;
+ ASSERT(dev && green && dev->mpi_rank != 0);
+
+ /* Open a stream to store the serialized green function */
+ stream = tmpfile();
+ if(!stream) {
+ log_err(dev,
+ "Could not open the stream used to temporary store the green function "
+ "before it is sent to the master process.\n");
+ res = RES_IO_ERR;
+ goto error;
+ }
+
+ /* Write the green function into the stream */
+ res = sdis_green_function_write(green, stream);
+ if(res != RES_OK) goto error;
+
+ /* Fetch the size of the serialized data */
+ sz = ftell(stream);
+ if(sz == -1) {
+ strerror_r(errno, buf, sizeof(buf));
+ log_err(dev,
+ "Could not query the size of the serialized green function data to sent "
+ "to the master process -- %s.\n", buf);
+ res = RES_IO_ERR;
+ goto error;
+ }
+ if(sz > INT_MAX) {
+ log_err(dev,
+ "The size of the green function data is too large. It must be less than "
+ "%d Mebabytes while it is of %ld MegabBytes.\n",
+ INT_MAX / (1024*1024), sz/(1024*1024));
+ res = RES_MEM_ERR;
+ goto error;
+ }
+
+ data = MEM_CALLOC(dev->allocator, 1, (size_t)sz);
+ if(!data) {
+ log_err(dev, "Could not allocate the memory to store the serialized green "
+ "function before it is sent to the master process.\n");
+ res = RES_MEM_ERR;
+ goto error;
+ }
+
+ /* Load in memory the serialized data */
+ rewind(stream);
+ if(fread(data, (size_t)sz, 1, stream) != 1) {
+ log_err(dev,
+ "Could not read the serialized green function data from the temporary "
+ "stream before it is sent to the master process.\n");
+ res = RES_IO_ERR;
+ goto error;
+ }
+
+ /* Send the serialized data to the master process */
+ mutex_lock(dev->mpi_mutex);
+ MPI(Send(data, (int)sz, MPI_CHAR, 0/*Dst*/, MPI_SDIS_MSG_GREEN_FUNCTION,
+ MPI_COMM_WORLD));
+ mutex_unlock(dev->mpi_mutex);
+
+exit:
+ if(stream) CHK(fclose(stream) == 0);
+ if(data) MEM_RM(dev->allocator, data);
+ return RES_OK;
+error:
+ goto exit;
+}
+
+static res_T
+gather_green_functions_from_non_master_process
+ (struct sdis_scene* scn,
+ struct sdis_green_function* greens[])
+{
+ void* data = NULL; /* Pointer to gathered serialized green function data */
+ FILE* stream = NULL; /* Temp file that stores the serialized green function */
+ int iproc;
+ res_T res = RES_OK;
+ ASSERT(scn->dev && greens && scn->dev->mpi_rank == 0);
+
+ FOR_EACH(iproc, 1, scn->dev->mpi_nprocs) {
+ MPI_Request req;
+ MPI_Status status;
+ int count;
+
+ /* Waiting for the serialized green function sent by the process `iproc'*/
+ mpi_waiting_for_message
+ (scn->dev, iproc, MPI_SDIS_MSG_GREEN_FUNCTION, &status);
+
+ /* Fetch the sizeof the green function sent by the process `iproc' */
+ mutex_lock(scn->dev->mpi_mutex);
+ MPI(Get_count(&status, MPI_CHAR, &count));
+ mutex_unlock(scn->dev->mpi_mutex);
+
+ /* Allocate the memory to store the serialized green function sent by the
+ * process `iproc' */
+ data = MEM_REALLOC(scn->dev->allocator, data, (size_t)count);
+ if(!data) {
+ log_err(scn->dev,
+ "Could not allocate %d bytes to store the serialized green function "
+ "sent by the process %d.\n",
+ count, iproc);
+ res = RES_MEM_ERR;
+ goto error;
+ }
+
+ /* Asynchronously receive the green function */
+ mutex_lock(scn->dev->mpi_mutex);
+ MPI(Irecv(data, count, MPI_CHAR, iproc, MPI_SDIS_MSG_GREEN_FUNCTION,
+ MPI_COMM_WORLD, &req));
+ mutex_unlock(scn->dev->mpi_mutex);
+ mpi_waiting_for_request(scn->dev, &req);
+
+ /* Open a stream to store the serialized green function */
+ stream = tmpfile();
+ if(!stream) {
+ log_err(scn->dev,
+ "Could not open the stream used to temporary store the green function "
+ "sent by the process %d.\n", iproc);
+ res = RES_IO_ERR;
+ goto error;
+ }
+
+ if(fwrite(data, (size_t)count, 1, stream) != 1) {
+ log_err(scn->dev,
+ "Could not write the green function sent by the process %d into the "
+ "temporary stream.\n", iproc);
+ res = RES_IO_ERR;
+ goto error;
+ }
+
+ /* Deserialized the green function of the process `iproc'. Note that the
+ * number of green functions to output is #procs - 1. Since we
+ * iterate over the indices of non master processes in [1, #procs],
+ * the index the green function to deserialized is iproc - 1 */
+ rewind(stream);
+ res = sdis_green_function_create_from_stream(scn, stream, &greens[iproc-1]);
+ if(res != RES_OK) {
+ log_err(scn->dev,
+ "Error deserializing the green function sent by the process %d -- %s.\n",
+ iproc, res_to_cstr(res));
+ goto error;
+ }
+
+ CHK(fclose(stream) == 0);
+ stream = NULL;
+ }
+
+exit:
+ if(data) MEM_RM(scn->dev->allocator, data);
+ if(stream) CHK(fclose(stream) == 0);
+ return res;
+error:
+ goto exit;
+}
#endif
/*******************************************************************************
@@ -369,6 +538,150 @@ error:
goto exit;
}
+#ifndef SDIS_ENABLE_MPI
+res_T
+gather_green_functions
+ (struct sdis_scene* scn,
+ struct ssp_rng_proxy* rng_proxy,
+ struct sdis_green_function* per_thread_green[],
+ const struct accum* per_thread_acc_time,
+ struct sdis_green_function** out_green)
+{
+ struct sdis_green_function* green = NULL;
+ struct accum acc_time = ACCUM_NULL;
+ res_T res = RES_OK;
+ ASSERT(scn && rng_proxy && per_thread_green && per_thread_acc_time);
+ ASSERT(out_green);
+
+ /* Redux the per thread green function into the green function of the 1st
+ * thread */
+ res = green_function_redux_and_clear
+ (per_thread_green[0], per_thread_green+1, scn->dev->nthreads-1);
+ if(res != RES_OK) goto error;
+
+ /* Return the green of the 1st thread */
+ SDIS(green_function_ref_get(per_thread_green[0]));
+ green = per_thread_green[0];
+
+ res = gather_accumulators
+ (scn->dev, MPI_SDIS_MSG_ACCUM_TIME, per_thread_acc_time, &acc_time);
+ if(res != RES_OK) goto error;
+
+ /* Finalize the estimated green */
+ res = green_function_finalize(green, rng_proxy, &acc_time);
+ if(res != RES_OK) goto error;
+
+exit:
+ *out_green = green;
+ return res;
+error:
+ if(green) { SDIS(green_function_ref_put(green)); green = NULL; }
+ goto exit;
+}
+#else
+res_T
+gather_green_functions
+ (struct sdis_scene* scn,
+ struct ssp_rng_proxy* rng_proxy,
+ struct sdis_green_function* per_thread_green[],
+ const struct accum* per_thread_acc_time,
+ struct sdis_green_function** out_green)
+{
+ struct accum acc_time = ACCUM_NULL;
+ struct sdis_green_function* green = NULL;
+ struct sdis_green_function** per_proc_green = NULL;
+ unsigned ithread;
+ int iproc;
+ res_T res = RES_OK;
+ ASSERT(scn && per_thread_green && out_green);
+
+ /* Redux the per thread green function into the green function of the 1st
+ * thread */
+ res = green_function_redux_and_clear
+ (per_thread_green[0], per_thread_green+1, scn->dev->nthreads-1);
+ if(res != RES_OK) goto error;
+
+ /* Gather the accumulators. The master process gathers all accumulators and
+ * non master process gather their per thread accumulators only that is is
+ * sent to the master process */
+ res = gather_accumulators
+ (scn->dev, MPI_SDIS_MSG_ACCUM_TIME, per_thread_acc_time, &acc_time);
+ if(res != RES_OK) goto error;
+
+ /* Non master process */
+ if(scn->dev->mpi_rank != 0) {
+ /* Return the green of the 1st thread */
+ SDIS(green_function_ref_get(per_thread_green[0]));
+ green = per_thread_green[0];
+
+ /* We have to finalize the green function priorly to its sent to the master
+ * process. Its serialization failed without it. */
+ res = green_function_finalize(green, rng_proxy, &acc_time);
+ if(res != RES_OK) goto error;
+
+ res = send_green_function_to_master_process(scn->dev, green);
+ if(res != RES_OK) goto error;
+
+ /* Master process */
+ } else {
+ /* Allocate the list of per process green functions */
+ per_proc_green = MEM_CALLOC(scn->dev->allocator,
+ (size_t)scn->dev->mpi_nprocs, sizeof(*per_proc_green));
+ if(!per_proc_green) {
+ log_err(scn->dev,
+ "Could not allocate the temporary list of per process "
+ "green functions.\n");
+ res = RES_MEM_ERR;
+ goto error;
+ }
+
+ /* Set the gathered per thread green function stores on thread 0 at the
+ * green function for master process */
+ SDIS(green_function_ref_get(per_thread_green[0]));
+ per_proc_green[0] = per_thread_green[0];
+
+ /* Release per thread green functions */
+ FOR_EACH(ithread, 0, scn->dev->nthreads) {
+ SDIS(green_function_ref_put(per_thread_green[ithread]));
+ per_thread_green[ithread] = NULL;
+ }
+
+ res = gather_green_functions_from_non_master_process(scn, per_proc_green+1);
+ if(res != RES_OK) goto error;
+
+ /* Redux the per proc green function into the green function of the master
+ * process */
+ res = green_function_redux_and_clear
+ (per_proc_green[0], per_proc_green+1, (size_t)scn->dev->mpi_nprocs-1);
+ if(res != RES_OK) goto error;
+
+ /* Return the gatherd green function of the master process */
+ SDIS(green_function_ref_get(per_proc_green[0]));
+ green = per_proc_green[0];
+
+ /* Finalize the green function */
+ res = green_function_finalize(green, rng_proxy, &acc_time);
+ if(res != RES_OK) goto error;
+ }
+
+exit:
+ if(per_proc_green) {
+ FOR_EACH(iproc, 0, scn->dev->mpi_nprocs) {
+ if(per_proc_green[iproc]) {
+ SDIS(green_function_ref_put(per_proc_green[iproc]));
+ }
+ }
+ MEM_RM(scn->dev->allocator, per_proc_green);
+ }
+ *out_green = green;
+ return res;
+error:
+ if(green) { SDIS(green_function_ref_put(green)); green = NULL; }
+ goto exit;
+}
+
+#endif
+
void
print_progress
(struct sdis_device* dev,
diff --git a/src/sdis_c.h b/src/sdis_c.h
@@ -20,9 +20,10 @@
/* Id of the messages sent between processes */
enum mpi_sdis_message {
- MPI_SDIS_MSG_PROGRESS, /* Progress status */
MPI_SDIS_MSG_ACCUM_TEMP, /* Temperature accumulator */
MPI_SDIS_MSG_ACCUM_TIME, /* Time accumulator */
+ MPI_SDIS_MSG_GREEN_FUNCTION, /* Serialized green function */
+ MPI_SDIS_MSG_PROGRESS, /* Progress status */
MPI_SDIS_MSG_COUNT__
};
@@ -97,11 +98,17 @@ setup_estimator
const struct accum* acc_time,
const size_t overall_realisations_count);
+/* Gather the green functions. With MPI, non master processes store in green
+ * the gathering of their per thread green functions and sent the result to the
+ * master process. The master process gathers both per thread green functions
+ * and per process ones and finally save the result in green */
extern LOCAL_SYM res_T
-setup_green_function
- (struct sdis_green_function* per_thread_green[],
- const struct ssp_rng_proxy* proxy,
- const struct accum* per_thread_acc_time);
+gather_green_functions
+ (struct sdis_scene* scn,
+ struct ssp_rng_proxy* proxy,
+ struct sdis_green_function* per_thread_green[],
+ const struct accum* acc_time,
+ struct sdis_green_function** green);
/* Print the progress status. With MPI, the master process print the progress
* of all processes stored in the progress list. Non master processes do not