commit 28338e1177010cccfa121818b520098202ad39e1
parent 537e23b9d2c5f727871a808ee8cdb2f22c72a6cb
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Wed, 15 Apr 2015 17:39:59 +0200
Add parallelism to the smc_solve function
Major refactoring of the smc_solve_N parallelism
Diffstat:
7 files changed, 222 insertions(+), 69 deletions(-)
diff --git a/src/smc.h b/src/smc.h
@@ -53,6 +53,8 @@
#define SMC(Func) smc_ ## Func
#endif
+#define SMC_NTHREADS_DEFAULT (~0u)
+
/* Forward declaration of external types */
struct logger;
struct mem_allocator;
@@ -87,11 +89,10 @@ struct smc_integrator {
void (*integrand)(void* value, struct ssp_rng* rng, void* ctx);
const struct smc_type* type;
unsigned long max_steps; /* Maximum # of experiments */
- unsigned long max_time; /* Hint on the maximum time in micro seconds */
};
static const struct smc_integrator SMC_INTEGRATOR_NULL =
-{ NULL, NULL, ULONG_MAX, ULONG_MAX };
+{ NULL, NULL, ULONG_MAX };
/* Forward declaration of opaque types */
struct smc_device; /* Entry point of the library */
@@ -114,6 +115,7 @@ SMC_API res_T
smc_device_create
(struct logger* logger, /* May be NULL <=> use default logger */
struct mem_allocator* allocator, /* May be NULL <=> use default allocator */
+ const unsigned nthreads_hint, /* Hint on the number of threads to use */
struct smc_device** dev);
SMC_API res_T
diff --git a/src/smc_device.c b/src/smc_device.c
@@ -37,6 +37,8 @@
#include <rsys/logger.h>
#include <rsys/mem_allocator.h>
+#include <omp.h>
+
/*******************************************************************************
* Helper functions
******************************************************************************/
@@ -44,9 +46,14 @@ static void
device_release(ref_T* ref)
{
struct smc_device* dev;
+ size_t i;
ASSERT(ref);
dev = CONTAINER_OF(ref, struct smc_device, ref);
- if(dev->rng) ssp_rng_ref_put(dev->rng);
+ if(dev->rng_proxy) ssp_rng_proxy_ref_put(dev->rng_proxy);
+ FOR_EACH(i, 0, sa_size(dev->rngs)) {
+ ssp_rng_ref_put(dev->rngs[i]);
+ }
+ sa_release(dev->rngs);
MEM_FREE(dev->allocator, dev);
}
@@ -57,13 +64,15 @@ res_T
smc_device_create
(struct logger* logger,
struct mem_allocator* mem_allocator,
+ const unsigned nthreads_hint,
struct smc_device** out_dev)
{
struct smc_device* dev = NULL;
struct mem_allocator* allocator;
+ unsigned i, nthreads;
res_T res = RES_OK;
- if(!out_dev) {
+ if(!out_dev || !nthreads_hint) {
res = RES_BAD_ARG;
goto exit;
}
@@ -76,10 +85,21 @@ smc_device_create
ref_init(&dev->ref);
dev->allocator = allocator;
dev->logger = logger ? logger : LOGGER_DEFAULT;
- res = ssp_rng_create(allocator, &ssp_rng_mt19937_64, &dev->rng);
+
+ nthreads = MMIN(nthreads_hint, (unsigned)omp_get_num_procs());
+ omp_set_num_threads((int)nthreads);
+
+ res = ssp_rng_proxy_create
+ (allocator, &ssp_rng_mt19937_64, nthreads, &dev->rng_proxy);
if(res != RES_OK)
goto error;
+ dev->rngs = sa_add(dev->rngs, nthreads);
+ FOR_EACH(i, 0, nthreads) {
+ res = ssp_rng_proxy_create_rng(dev->rng_proxy, i, dev->rngs + i);
+ if(res != RES_OK) goto error;
+ }
+
exit:
if(out_dev) *out_dev = dev;
return res;
diff --git a/src/smc_device_c.h b/src/smc_device_c.h
@@ -33,13 +33,22 @@
#define SMC_DEVICE_C_H
#include <rsys/ref_count.h>
+#include <rsys/stretchy_array.h>
struct smc_device {
- struct ssp_rng* rng;
+ struct ssp_rng_proxy* rng_proxy;
+ struct ssp_rng** rngs;
struct logger* logger;
struct mem_allocator* allocator;
ref_T ref;
};
+static FINLINE size_t
+device_get_threads_count(const struct smc_device* dev)
+{
+ ASSERT(dev);
+ return sa_size(dev->rngs);
+}
+
#endif /* SMC_DEVICE_C_H */
diff --git a/src/smc_integrator.c b/src/smc_integrator.c
@@ -37,6 +37,8 @@
#include <rsys/mem_allocator.h>
#include <limits.h>
+#include <omp.h>
+#include <string.h>
struct smc_estimator {
struct smc_type type;
@@ -53,6 +55,53 @@ struct smc_estimator {
/*******************************************************************************
* Helper functions
******************************************************************************/
+static res_T
+estimator_create
+ (struct smc_device* dev,
+ const struct smc_type* type,
+ struct smc_estimator** out_estimator)
+{
+ struct smc_estimator* estimator = NULL;
+ res_T res = RES_OK;
+ ASSERT(out_estimator && dev && type);
+
+ estimator = MEM_CALLOC(dev->allocator, 1, sizeof(struct smc_estimator));
+ if(!estimator) {
+ res = RES_MEM_ERR;
+ goto error;
+ }
+ SMC(device_ref_get(dev));
+ estimator->dev = dev;
+ ref_init(&estimator->ref);
+
+ #define TYPE_CREATE(Dst) { \
+ if(!((Dst) = type->create(dev->allocator))) { \
+ res = RES_MEM_ERR; \
+ goto error; \
+ } \
+ type->zero((Dst)); \
+ } (void)0
+ TYPE_CREATE(estimator->value);
+ TYPE_CREATE(estimator->square_value);
+ TYPE_CREATE(estimator->status.E);
+ TYPE_CREATE(estimator->status.V);
+ TYPE_CREATE(estimator->status.SE);
+ #undef TYPE_CREATE
+ estimator->nsamples = 0;
+ estimator->status.N = 0;
+ estimator->type = *type;
+
+exit:
+ *out_estimator = estimator;
+ return res;
+error:
+ if(estimator) {
+ SMC(estimator_ref_put(estimator));
+ estimator = NULL;
+ }
+ goto exit;
+}
+
static char
check_integrator(struct smc_integrator* integrator)
{
@@ -60,7 +109,6 @@ check_integrator(struct smc_integrator* integrator)
return integrator->integrand
&& integrator->type
&& integrator->max_steps
- && integrator->max_time
&& check_type(integrator->type);
}
@@ -98,66 +146,78 @@ smc_solve
struct smc_estimator** out_estimator)
{
struct smc_estimator* estimator = NULL;
- struct time time_begin;
unsigned long i;
- void* val = NULL;
+ size_t nthreads = 0;
+ char* raw = NULL;
+ void** vals = NULL;
+ void** accums = NULL;
+ void** accums_sqr = NULL;
+ size_t* nsamples = NULL;
res_T res = RES_OK;
if(!dev || !integrator || !out_estimator || !check_integrator(integrator)) {
res = RES_BAD_ARG;
goto error;
}
+ nthreads = device_get_threads_count(dev);
- estimator = MEM_CALLOC(dev->allocator, 1, sizeof(struct smc_estimator));
- if(!estimator) {
+ /* Create per thread temporary variables */
+ raw = MEM_CALLOC(dev->allocator, nthreads, sizeof(void*)*3 + sizeof(size_t));
+ if(!raw) {
res = RES_MEM_ERR;
goto error;
}
- SMC(device_ref_get(dev));
- estimator->dev = dev;
- ref_init(&estimator->ref);
-
+ vals = (void**) (raw + 0 * sizeof(void*) * nthreads);
+ accums = (void**) (raw + 1 * sizeof(void*) * nthreads);
+ accums_sqr = (void**) (raw + 2 * sizeof(void*) * nthreads);
+ nsamples = (size_t*)(raw + 3 * sizeof(void*) * nthreads);
#define TYPE_CREATE(Dst) { \
- (Dst) = integrator->type->create(dev->allocator); \
- if(!(Dst)) { \
+ if(!((Dst) = integrator->type->create(dev->allocator))) { \
res = RES_MEM_ERR; \
goto error; \
} \
integrator->type->zero((Dst)); \
} (void)0
- TYPE_CREATE(estimator->value);
- TYPE_CREATE(estimator->square_value);
- TYPE_CREATE(estimator->status.E);
- TYPE_CREATE(estimator->status.V);
- TYPE_CREATE(estimator->status.SE);
- TYPE_CREATE(val);
+ FOR_EACH(i, 0, nthreads) {
+ TYPE_CREATE(vals[i]);
+ TYPE_CREATE(accums[i]);
+ TYPE_CREATE(accums_sqr[i]);
+ nsamples[i] = 0;
+ }
#undef TYPE_CREATE
- estimator->nsamples = 0;
- estimator->status.N = 0;
- estimator->type = *integrator->type;
- if(integrator->max_time != ULONG_MAX)
- time_current(&time_begin);
+ /* Create the estimator */
+ res = estimator_create(dev, integrator->type, &estimator);
+ if(res != RES_OK) goto error;
- FOR_EACH(i, 0, integrator->max_steps) {
- struct time elapsed_time;
-
- integrator->integrand(val, dev->rng, ctx);
- integrator->type->add(estimator->value, estimator->value, val);
- integrator->type->mul(val, val, val);
- integrator->type->add(estimator->square_value, estimator->square_value, val);
- ++estimator->nsamples;
+ /* Parallel evaluation of the simulation */
+ #pragma omp parallel for schedule(static)
+ for(i = 0; i < integrator->max_steps; ++i) {
+ const int ithread = omp_get_thread_num();
+ integrator->integrand(vals[ithread], dev->rngs[ithread], ctx);
+ integrator->type->add(accums[ithread], accums[ithread], vals[ithread]);
+ integrator->type->mul(vals[ithread], vals[ithread], vals[ithread]);
+ integrator->type->add(accums_sqr[ithread], accums_sqr[ithread], vals[ithread]);
+ ++nsamples[ithread];
+ }
- if(integrator->max_time != ULONG_MAX) {
- time_current(&elapsed_time);
- time_sub(&elapsed_time, &elapsed_time, &time_begin);
- if((unsigned long)time_val(&elapsed_time, TIME_USEC) > integrator->max_time)
- break;
- }
+ /* Merge the parallel estimation into the final estimator */
+ FOR_EACH(i, 0, nthreads) {
+ estimator->nsamples += nsamples[i];
+ integrator->type->add(estimator->value, estimator->value, accums[i]);
+ integrator->type->add
+ (estimator->square_value, estimator->square_value, accums_sqr[i]);
}
- integrator->type->destroy(dev->allocator, val);
exit:
+ if(raw) { /* Release temporary variables */
+ FOR_EACH(i, 0, nthreads) {
+ if(vals[i]) integrator->type->destroy(dev->allocator, vals[i]);
+ if(accums[i]) integrator->type->destroy(dev->allocator, accums[i]);
+ if(accums_sqr[i]) integrator->type->destroy(dev->allocator, accums_sqr[i]);
+ }
+ MEM_FREE(dev->allocator, raw);
+ }
if(out_estimator) *out_estimator = estimator;
return res;
error:
@@ -177,17 +237,74 @@ smc_solve_N
const size_t sizeof_ctx,
struct smc_estimator* estimators[])
{
+ res_T res = RES_OK;
size_t i;
- if(!dev || !integrator || !estimators || !count
- || !check_integrator(integrator)) {
- return RES_BAD_ARG;
+ size_t nthreads;
+ void** vals = NULL;
+
+ if(!estimators) {
+ res = RES_BAD_ARG;
+ goto error;
}
+ memset(estimators, 0, sizeof(struct smc_estimator*) * count);
- #pragma omp parallel for schedule(dynamic)
+ if(!dev || !integrator || !count || !check_integrator(integrator)) {
+ res = RES_BAD_ARG;
+ goto error;
+ }
+
+ /* Create the estimators */
+ FOR_EACH(i, 0, count ) {
+ res = estimator_create(dev, integrator->type, estimators + i);
+ if(res != RES_OK) goto error;
+ }
+
+ /* Create the per thread temporary variables */
+ nthreads = device_get_threads_count(dev);
+ vals = MEM_CALLOC(dev->allocator, nthreads, sizeof(void*));
+ FOR_EACH(i, 0, nthreads) {
+ vals[i] = integrator->type->create(dev->allocator);
+ if(!vals[i]) {
+ res = RES_MEM_ERR;
+ goto error;
+ }
+ }
+
+ /* Parallel estimation of N simulations */
+ #pragma omp parallel for schedule(static)
for(i = 0; i < count; ++i) {
- SMC(solve(dev, integrator, (char*)ctx + i*sizeof_ctx, estimators + i));
+ unsigned long istep;
+ const int ithread = omp_get_thread_num();
+ FOR_EACH(istep, 0, integrator->max_steps) {
+ integrator->integrand
+ (vals[ithread], dev->rngs[ithread], (char*)ctx + i*sizeof_ctx);
+ integrator->type->add
+ (estimators[i]->value, estimators[i]->value, vals[ithread]);
+ integrator->type->mul(vals[ithread], vals[ithread], vals[ithread]);
+ integrator->type->add
+ (estimators[i]->square_value, estimators[i]->square_value, vals[ithread]);
+ ++estimators[i]->nsamples;
+ }
}
- return RES_OK;
+
+exit:
+ if(vals) {
+ FOR_EACH(i, 0, nthreads) {
+ if(vals[i]) integrator->type->destroy(dev->allocator, vals[i]);
+ }
+ MEM_FREE(dev->allocator, vals);
+ }
+ return res;
+error:
+ if(estimators) {
+ FOR_EACH(i, 0, count) {
+ if(estimators[i]) {
+ SMC(estimator_ref_put(estimators[i]));
+ estimators[i] = NULL;
+ }
+ }
+ }
+ goto exit;
}
res_T
diff --git a/src/test_smc_device.c b/src/test_smc_device.c
@@ -50,8 +50,8 @@ main(int argc, char** argv)
struct smc_device* dev;
(void)argc, (void)argv;
- CHECK(smc_device_create(NULL, NULL, NULL), RES_BAD_ARG);
- CHECK(smc_device_create(NULL, NULL, &dev), RES_OK);
+ CHECK(smc_device_create(NULL, NULL, SMC_NTHREADS_DEFAULT, NULL), RES_BAD_ARG);
+ CHECK(smc_device_create(NULL, NULL, SMC_NTHREADS_DEFAULT, &dev), RES_OK);
CHECK(smc_device_ref_get(NULL), RES_BAD_ARG);
CHECK(smc_device_ref_get(dev), RES_OK);
@@ -59,11 +59,13 @@ main(int argc, char** argv)
CHECK(smc_device_ref_put(dev), RES_OK);
CHECK(smc_device_ref_put(dev), RES_OK);
+ CHECK(smc_device_create(NULL, NULL, 0, &dev), RES_BAD_ARG);
+
mem_init_proxy_allocator(&allocator, &mem_default_allocator);
CHECK(MEM_ALLOCATED_SIZE(&allocator), 0);
- CHECK(smc_device_create(NULL, &allocator, NULL), RES_BAD_ARG);
- CHECK(smc_device_create(NULL, &allocator, &dev), RES_OK);
+ CHECK(smc_device_create(NULL, &allocator, 1, NULL), RES_BAD_ARG);
+ CHECK(smc_device_create(NULL, &allocator, 1, &dev), RES_OK);
CHECK(smc_device_ref_put(dev), RES_OK);
CHECK(MEM_ALLOCATED_SIZE(&allocator), 0);
@@ -72,12 +74,12 @@ main(int argc, char** argv)
logger_set_stream(&logger, LOG_ERROR, log_stream, NULL);
logger_set_stream(&logger, LOG_WARNING, log_stream, NULL);
- CHECK(smc_device_create(&logger, NULL, NULL), RES_BAD_ARG);
- CHECK(smc_device_create(&logger, NULL, &dev), RES_OK);
+ CHECK(smc_device_create(&logger, NULL, 1, NULL), RES_BAD_ARG);
+ CHECK(smc_device_create(&logger, NULL, 1, &dev), RES_OK);
CHECK(smc_device_ref_put(dev), RES_OK);
- CHECK(smc_device_create(&logger, &allocator, NULL), RES_BAD_ARG);
- CHECK(smc_device_create(&logger, &allocator, &dev), RES_OK);
+ CHECK(smc_device_create(&logger, &allocator, 4, NULL), RES_BAD_ARG);
+ CHECK(smc_device_create(&logger, &allocator, 4, &dev), RES_OK);
CHECK(smc_device_ref_put(dev), RES_OK);
logger_release(&logger);
diff --git a/src/test_smc_light_path.c b/src/test_smc_light_path.c
@@ -375,11 +375,11 @@ main(int argc, char** argv)
CHECK(s3d_scene_begin_trace(scn), RES_OK);
- CHECK(smc_device_create(NULL, &allocator, &smc), RES_OK);
+ CHECK(smc_device_create(NULL, &allocator, SMC_NTHREADS_DEFAULT, &smc), RES_OK);
integrator.integrand = light_path_integrator;
integrator.type = &smc_float;
- integrator.max_steps = 8;
+ integrator.max_steps = 4;
contexts = MEM_CALLOC
(&allocator, IMG_WIDTH*IMG_HEIGHT, sizeof(struct integrator_context));
diff --git a/src/test_smc_solve.c b/src/test_smc_solve.c
@@ -45,8 +45,8 @@ rcp_x(void* value, struct ssp_rng* rng, void* ctx)
NCHECK(value, NULL);
NCHECK(rng, NULL);
CHECK(ctx, NULL);
- samp = ssp_rng_uniform_double(rng, 1.0, 2.0);
- *result = 1.0 / samp;
+ samp = ssp_rng_uniform_double(rng, 1.0, 4.0);
+ *result = 1.0 / samp * 3;
}
static void
@@ -57,8 +57,8 @@ cos_x(void* value, struct ssp_rng* rng, void* ctx)
NCHECK(value, NULL);
NCHECK(rng, NULL);
CHECK(ctx, (void*)0xC0DE);
- samp = ssp_rng_uniform_double(rng, PI/4.0, (3.0*PI)/4.0);
- *result = (float)cos(samp);
+ samp = ssp_rng_uniform_double(rng, -PI/4.0, PI/4.0);
+ *result = (float)(cos(samp) * PI / 2.0);
}
int
@@ -73,11 +73,11 @@ main(int argc, char** argv)
mem_init_proxy_allocator(&allocator, &mem_default_allocator);
- CHECK(smc_device_create(NULL, &allocator, &dev), RES_OK);
+ CHECK(smc_device_create(NULL, &allocator, SMC_NTHREADS_DEFAULT, &dev), RES_OK);
integrator.integrand = rcp_x;
integrator.type = &smc_double;
- integrator.max_steps = 4096;
+ integrator.max_steps = 10000;
CHECK(smc_solve(NULL, NULL, NULL, NULL), RES_BAD_ARG);
CHECK(smc_solve(dev, NULL, NULL, NULL), RES_BAD_ARG);
@@ -92,8 +92,10 @@ main(int argc, char** argv)
CHECK(smc_estimator_get_status(estimator, NULL), RES_BAD_ARG);
CHECK(smc_estimator_get_status(NULL, &status), RES_BAD_ARG);
CHECK(smc_estimator_get_status(estimator, &status), RES_OK);
- CHECK(eq_eps
- (log(2.0) - log(1.0), SMC_DOUBLE(status.E), SMC_DOUBLE(status.SE)), 1);
+ printf("Integral[1, 4] 1/x dx = %f; E = %f; SE = %f\n",
+ log(4.0) - log(1.0), SMC_DOUBLE(status.E), SMC_DOUBLE(status.SE));
+ CHECK(eq_eps
+ (log(4.0) - log(1.0), SMC_DOUBLE(status.E), 2.0 * SMC_DOUBLE(status.SE)), 1);
CHECK(smc_estimator_ref_put(estimator), RES_OK);
integrator.type = NULL;
@@ -106,9 +108,10 @@ main(int argc, char** argv)
integrator.type = &smc_float;
CHECK(smc_solve(dev, &integrator, (void*)0xC0DE, &estimator), RES_OK);
CHECK(smc_estimator_get_status(estimator, &status), RES_OK);
+ printf("Integral[-PI/4, PI/4] cos(x) dx = %f; E = %f; SE = %f\n",
+ 2*sin(PI/4.0), SMC_FLOAT(status.E), SMC_FLOAT(status.SE));
CHECK(eq_eps
- ((float)(sin(3.0*PI/4.0) - sin(PI/4.0)),
- SMC_FLOAT(status.E), SMC_FLOAT(status.SE)), 1);
+ ((float)2*sin(PI/4.0), SMC_FLOAT(status.E), 2 * SMC_FLOAT(status.SE)), 1);
CHECK(smc_device_ref_put(dev), RES_OK);
CHECK(smc_estimator_ref_put(estimator), RES_OK);