commit 8be0709bea69f04caf980414f6bff38db4dd3e2b
parent da90140e8d47b04aa9aeed9595dac86e97f918ef
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Fri, 21 Mar 2025 19:04:53 +0100
Store RNG state caches in memory
The use of temporary files could lead to write failures when the
temporary directory ran out of space. A situation encountered on
supercomputers where, not only is a large number of caches used
(presumably as many as threads), but also temporary files are stored in
a partition with limited space.
Nowadays, caches are stored in memory buffers accessed as streams.
However, these streams now have a maximum size that cannot be exceeded.
The cache management policy had to be updated accordingly.
Note that the memory cost of caches can be significant (e.g. 4 GB with
128 independent random generators). Some may therefore consider reducing
their capacity. But to do this, more tests need to be carried out to
verify the impact of such a change.
Diffstat:
| M | src/ssp_rng_proxy.c | | | 130 | ++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------- |
1 file changed, 82 insertions(+), 48 deletions(-)
diff --git a/src/ssp_rng_proxy.c b/src/ssp_rng_proxy.c
@@ -13,7 +13,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. */
-#define _POSIX_C_SOURCE 200809L /* getpid */
+#define _POSIX_C_SOURCE 200809L /* fmemopen, getpid */
#include "ssp_rng_c.h"
@@ -28,22 +28,28 @@
#define BUCKET_SIZE_DEFAULT 1000000 /* #RNs per bucket */
-/* Cache size to use. This is just a hint: the effective size of the cache is
- * actually a multiple of the size of the RNG state it should store. */
+/* Cache capacity. Maximum size in bytes that the cache can store */
#if 1
- #define STATE_CACHE_HINT_MAX_SIZE (32*(1024*1024)) /* 32 MB */
+ #define STATE_CACHE_CAPACITY (32*(1024*1024)) /* 32 MB */
#else
- #define STATE_CACHE_HINT_MAX_SIZE 0 /* Disable the cache */
+ #define STATE_CACHE_CAPACITY 0 /* Disable the cache */
#endif
/* Cache of RNG states */
struct rng_state_cache {
struct darray_char state; /* Save the next RNG state with 'no_wstream' */
struct darray_char state_scratch; /* Scracth state buffer */
- FILE* stream; /* Stream into which the RNG states are stored */
+
+ struct mem_allocator* allocator;
+ void* buffer; /* Memory in which RNG states are stored */
+ FILE* stream; /* Stream to buffer storing RNG states */
+
size_t state_pitch; /* #RNs between 2 cached states */
size_t nstates; /* #cached states */
+
long read, write; /* Offset into the stream where to read/write RNG states */
+ long end_of_cache; /* Stream offset that marks the end of cache */
+
int no_wstream; /* Define if the RNG states are no more written to a stream */
int no_rstream; /* Define if the RNG states are no more read from a stream */
};
@@ -170,15 +176,35 @@ rng_state_cache_init
const size_t state_pitch, /* #RNs between cached states */
struct rng_state_cache* cache)
{
+ res_T res = RES_OK;
ASSERT(cache);
+
memset(cache, 0, sizeof(*cache));
darray_char_init(allocator, &cache->state);
darray_char_init(allocator, &cache->state_scratch);
- cache->stream = tmpfile();
- if(!cache->stream) return RES_IO_ERR;
+ cache->allocator = allocator;
+ cache->end_of_cache = STATE_CACHE_CAPACITY;
+
+ cache->buffer = MEM_CALLOC(allocator, 1, STATE_CACHE_CAPACITY);
+ if(!cache->buffer) {
+ res = RES_MEM_ERR;
+ goto error;
+ }
+
+ cache->stream = fmemopen(cache->buffer, STATE_CACHE_CAPACITY, "w+");
+ if(!cache->stream) {
+ res = RES_IO_ERR;
+ goto error;
+ }
cache->read = cache->write = ftell(cache->stream);
cache->state_pitch = state_pitch;
- return RES_OK;
+
+exit:
+ return res;
+error:
+ if(cache->buffer) MEM_RM(allocator, cache->buffer);
+ if(cache->stream) fclose(cache->stream);
+ goto exit;
}
static void
@@ -186,6 +212,7 @@ rng_state_cache_release(struct rng_state_cache* cache)
{
ASSERT(cache);
if(cache->stream) fclose(cache->stream);
+ if(cache->buffer) MEM_RM(cache->allocator, cache->buffer);
darray_char_release(&cache->state);
darray_char_release(&cache->state_scratch);
}
@@ -193,13 +220,9 @@ rng_state_cache_release(struct rng_state_cache* cache)
static res_T
rng_state_cache_clear(struct rng_state_cache* cache)
{
- if(!cache->stream) {
- cache->stream = tmpfile();
- if(!cache->stream) return RES_IO_ERR;
- } else {
- rewind(cache->stream);
- cache->read = cache->write = ftell(cache->stream);
- }
+ ASSERT(cache);
+ rewind(cache->stream);
+ cache->read = cache->write = ftell(cache->stream);
cache->nstates = 0;
cache->no_wstream = 0;
cache->no_rstream = 0;
@@ -291,6 +314,13 @@ rng_state_cache_read
mutex_lock(mutex);
ASSERT(!rng_state_cache_is_empty(cache));
+ /* The read file pointer has reached the end of the cache.
+ * Rewind it to the beginning of the cache */
+ if(cache->read == cache->end_of_cache) {
+ cache->read = 0;
+ cache->end_of_cache = STATE_CACHE_CAPACITY;
+ }
+
if(!cache->no_rstream
&& cache->no_wstream
&& cache->read == cache->write
@@ -298,12 +328,15 @@ rng_state_cache_read
/* There is no more data cached into the stream. Close the stream and do
* not rely anymore on the proxy RNG to generate the RNG states */
fclose(cache->stream);
+ MEM_RM(cache->allocator, cache->buffer);
cache->stream = NULL;
+ cache->buffer = NULL;
cache->no_rstream = 1;
}
/* Read the cached RNG state from the stream */
if(!cache->no_rstream) {
+
fseek(cache->stream, cache->read, SEEK_SET);
res = ssp_rng_read(rng, cache->stream);
if(res != RES_OK) {
@@ -312,11 +345,6 @@ rng_state_cache_read
}
cache->read = ftell(cache->stream);
- /* The fp reaches the end of the cached data */
- if(cache->read >= STATE_CACHE_HINT_MAX_SIZE) {
- cache->read = 0;
- }
-
/* Remove one cached states */
cache->nstates -= 1;
@@ -357,43 +385,49 @@ error:
static res_T
rng_state_cache_write(struct rng_state_cache* cache, struct ssp_rng* rng)
{
+ long remaining_space = 0; /* Remaining cache space */
+ size_t len = 0; /* Length of the cache state in bytes */
res_T res = RES_OK;
+
ASSERT(cache && rng);
if(cache->no_wstream) goto exit; /* Do not cache the submitted state */
+ /* Store in memory the state to be cached */
+ res = rng_write_cstr(rng, &cache->state, &len);
+ if(res != RES_OK) goto error;
+
+ /* There are no spaces left at the end of the stream: rewind the writing */
+ if(len > (size_t)(STATE_CACHE_CAPACITY - cache->write)) {
+ cache->end_of_cache = cache->write; /* Mark the end of cache */
+ cache->write = 0;
+ }
+
fseek(cache->stream, cache->write, SEEK_SET);
- if(STATE_CACHE_HINT_MAX_SIZE > 0
- && (rng_state_cache_is_empty(cache) || cache->write > cache->read)) {
- /* Directly write the RNG state into the cache stream */
- res = ssp_rng_write(rng, cache->stream);
- if(res != RES_OK) goto error;
- cache->write = ftell(cache->stream);
- /* The fp exceed the amount of cached data */
- if(cache->write >= STATE_CACHE_HINT_MAX_SIZE) {
- cache->write = 0;
- }
+ /* Calculate remaining cache space */
+ if(rng_state_cache_is_empty(cache) || cache->write > cache->read) {
+ remaining_space = STATE_CACHE_CAPACITY - cache->write;
+ } else {
+ remaining_space = cache->read - cache->write;
+ }
+
+ /* There is no sufficient space. Cache cannot be used */
+ if(remaining_space < 0 || len > (size_t)remaining_space) {
+ cache->no_wstream = 1;
+ /* There is sufficient space. Write the RNG state into the cached stream */
} else {
- size_t len;
- res = rng_write_cstr(rng, &cache->state, &len);
- if(res != RES_OK) goto error;
+ size_t sz = 0;
- if(len > (size_t)(cache->read - cache->write)) {
- /* No sufficient space into the cache stream to save the RNG state */
- cache->no_wstream = 1;
- } else {
- /* Write the RNG state into the cached stream */
- size_t sz;
- sz = fwrite(darray_char_cdata_get(&cache->state), 1, len, cache->stream);
- if(sz != len) { res = RES_IO_ERR; goto error; }
- cache->write = ftell(cache->stream);
-
- /* The fp exceed the amount of cached data */
- if(cache->write >= STATE_CACHE_HINT_MAX_SIZE) {
- cache->write = 0;
- }
+ sz = fwrite(darray_char_cdata_get(&cache->state), 1, len, cache->stream);
+ if(sz != len) { res = RES_IO_ERR; goto error; }
+ cache->write = ftell(cache->stream);
+
+ /* Flush write state to detect a write error */
+ if(fflush(cache->stream) != 0) {
+ res = RES_IO_ERR;
+ goto error;
}
}