star-sp

Random number generators and distributions
git clone git://git.meso-star.fr/star-sp.git
Log | Files | Refs | README | LICENSE

ssp_rng_proxy.c (30287B)


      1 /* Copyright (C) 2015-2025 |Méso|Star> (contact@meso-star.com)
      2  *
      3  * This program is free software: you can redistribute it and/or modify
      4  * it under the terms of the GNU General Public License as published by
      5  * the Free Software Foundation, either version 3 of the License, or
      6  * (at your option) any later version.
      7  *
      8  * This program is distributed in the hope that it will be useful,
      9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     11  * GNU General Public License for more details.
     12  *
     13  * You should have received a copy of the GNU General Public License
     14  * along with this program. If not, see <http://www.gnu.org/licenses/>. */
     15 
     16 #define _POSIX_C_SOURCE 200809L /* fmemopen, getpid */
     17 
     18 #include "ssp_rng_c.h"
     19 
     20 #include <rsys/dynamic_array_char.h>
     21 #include <rsys/mutex.h>
     22 #include <rsys/ref_count.h>
     23 #include <rsys/signal.h>
     24 #include <rsys/stretchy_array.h>
     25 
     26 #include <limits.h>
     27 #include <unistd.h> /* getpid */
     28 
     29 #define BUCKET_SIZE_DEFAULT 1000000 /* #RNs per bucket */
     30 
     31 /* Cache capacity. Maximum size in bytes that the cache can store */
     32 #if 1
     33   #define STATE_CACHE_CAPACITY (4*1024*1024) /* 4 MB */
     34 #else
     35   #define STATE_CACHE_CAPACITY 0 /* Disable the cache */
     36 #endif
     37 
     38 /* Cache of RNG states */
     39 struct rng_state_cache {
     40   struct darray_char state; /* Save the next RNG state with 'no_wstream' */
     41   struct darray_char state_scratch; /* Scratch state buffer */
     42 
     43   struct mem_allocator* allocator;
     44   void* buffer; /* Memory in which RNG states are stored */
     45   FILE* stream; /* Stream to buffer storing RNG states */
     46 
     47   size_t state_pitch; /* #RNs between 2 cached states */
     48   size_t nstates; /* #cached states */
     49 
     50   long read, write; /* Offset into the stream where to read/write RNG states */
     51   long end_of_cache; /* Stream offset that marks the end of cache */
     52 
     53   int no_wstream; /* Set when the RNG states are no longer written to the stream */
     54   int no_rstream; /* Set when the RNG states are no longer read from the stream */
     55 
     56   int is_init; /* Cache initialisation state */
     57 };
     58 
     59 CLBK(rng_proxy_cb_T, ARG1(const struct ssp_rng_proxy*));
     60 
     61 enum rng_proxy_sig {
     62   RNG_PROXY_SIG_SET_STATE,
     63   RNG_PROXY_SIGS_COUNT__
     64 };
     65 
     66 /* A proxy manages a list of N independent RNGs of the same type, named buckets.
     67  * It ensures that each bucket has an independent, `infinite' supply of random
     68  * numbers by partitioning a unique random sequence into N random pools, each
     69  * containing `bucket_size' random numbers. When a bucket has no more random
     70  * numbers in its assigned pool, it silently retrieves a new pool of
     71  * `bucket_size' random numbers from the proxy.
     72  *
     73  * Mapping of the partitions of the unique random sequence to N buckets
     74  *
     75  * bucket_size
     76  * /         \
     77  * +---------+---------+-   -+---------+---------+---------+-
     78  * | Bucket0 | Bucket1 | ... |BucketN-1| Bucket0 | Bucket1 | ...
     79  * +---------+---------+-   -+---------+---------+---------+-
     80  *                  Unique random sequence */
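        /* Typical usage (illustrative sketch; it relies only on functions defined
         * or referenced in this file, NTHREADS stands for the caller's thread
         * count and SSP_RNG_MT19937_64 for any concrete ssp_rng_type other than
         * SSP_RNG_TYPE_NULL):
         *
         *   struct ssp_rng_proxy* proxy = NULL;
         *   struct ssp_rng* rngs[NTHREADS];
         *   size_t i;
         *   uint64_t r;
         *
         *   SSP(rng_proxy_create(NULL, SSP_RNG_MT19937_64, NTHREADS, &proxy));
         *   FOR_EACH(i, 0, NTHREADS)
         *     SSP(rng_proxy_create_rng(proxy, i, &rngs[i]));
         *
         *   In thread i, draw numbers; the bucket transparently fetches a new
         *   pool from the proxy when its current pool is exhausted:
         *   r = ssp_rng_get(rngs[i]);
         *
         *   FOR_EACH(i, 0, NTHREADS) SSP(rng_ref_put(rngs[i]));
         *   SSP(rng_proxy_ref_put(proxy)); */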
     81 struct ssp_rng_proxy {
     82   enum ssp_rng_type type; /* Type of the RNGs managed by the proxy */
     83 
     84   struct ssp_rng* rng; /* Main `type' RNG */
     85 
     86   /* The following arrays have the same size */
     87   ATOMIC* buckets; /* Flag that defines which bucket RNGs are created */
     88   size_t sequence_size; /* #RNs in a sequence */
     89   size_t sequence_bias; /* #RNs to discard between 2 consecutive sequence */
     90   size_t bucket_size; /* #random numbers per bucket */
     91   struct ssp_rng** pools; /* `type' RNGs wrapped by bucket RNGs */
     92   struct rng_state_cache* states; /* Cache of `type' RNG states */
     93 
     94   /* Index of the last queried sequence. This index is independent of the
     95    * original seed used by the proxy and is designed to identify the status of
     96    * the proxy relative to that original seed. When the proxy is created, the
     97    * sequence index is SSP_SEQUENCE_ID_NONE, i.e. no sequence was queried. At
     98    * the first request for a random number, the first sequence is consumed and
     99    * this sequence index is then 0. It is then incremented by one each time a
    100    * new sequence is required.
    101    *
    102    * Each bucket stores the sequence ID of its local RNG. The proxy sequence ID
    103    * is the maximum of these local sequence indices (see the example after
    104    * this struct). Note that we also keep track of the main RNG's sequence
    105    * index (main_sequence_id); it is equal to the proxy sequence identifier
    106    * only while the caching mechanism is still in use. */
    107   size_t* per_bucket_sequence_id;
    108   size_t main_sequence_id;
    109 
    110   signal_T signals[RNG_PROXY_SIGS_COUNT__];
    111 
    112   struct mutex* mutex;
    113   struct mem_allocator* allocator;
    114   ref_T ref;
    115 };
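        /* Example of the sequence ids above: with two buckets, if bucket 0 has
         * consumed four pools (local sequence ids 0..3) while bucket 1 has only
         * consumed one (local sequence id 0), then per_bucket_sequence_id is
         * {3, 0} and ssp_rng_proxy_get_sequence_id() reports 3, i.e. the maximum
         * of the per-bucket indices. */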
    116 
    117 /* Return an RNG with a pool of `bucket_size' independent random numbers. The
    118  * pools are guaranteed to be independent per `bucket_id' in [0, N) and per
    119  * function call, i.e. calling this function X times with the same bucket_id
    120  * will provide X different random pools.
    121  *
    122  *         bucket_size                 sequence_bias
    123  *         /          \                    /  \
    124  * +------+------------+-   -+------------+----+------------+-
    125  * |######|  Bucket 0  | ... | Bucket N-1 |####|  Bucket 0  | ...
    126  * |######|  1st pool  |     |  1st pool  |####|  2nd pool  |
    127  * +------+------------+-   -+------------+----+------------+-
    128  *  \    / \_________sequence_size_______/    /
    129  * sequence \________sequence_pitch__________/
    130  *  offset
    131  */
    132 static struct ssp_rng*
    133 rng_proxy_next_ran_pool
    134   (struct ssp_rng_proxy* proxy,
    135    const size_t bucket_id);
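        /* Illustrative figures, assuming the defaults set up by ssp_rng_proxy_create
         * with nbuckets = 4: bucket_size is BUCKET_SIZE_DEFAULT = 1000000 and
         * sequence_size = sequence_pitch = 4000000, hence sequence_bias = 0.
         * Submitting sequence_pitch = 4000100 through ssp_rng_proxy_create2 instead
         * would leave a bias of 100 random numbers discarded between two consecutive
         * sequences. */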
    136 
    137 /* Write the RNG state into buf. State data are terminated by a null char */
    138 static res_T
    139 rng_write_cstr
    140   (const struct ssp_rng* rng,
    141    struct darray_char* buf,
    142    size_t* out_len) /* May be NULL. String length without the null char */
    143 {
    144   size_t len;
    145   res_T res = RES_OK;
    146   ASSERT(rng && buf);
    147 
    148   /* Write the RNG state into a temporary buffer */
    149   res = ssp_rng_write_cstr
    150     (rng, darray_char_data_get(buf), darray_char_size_get(buf), &len);
    151   if(res != RES_OK) goto error;
    152 
    153   /* Not enough space to store the state */
    154   if(len >= darray_char_size_get(buf)) {
    155     res = darray_char_resize(buf, len + 1/*null char*/);
    156     if(res != RES_OK) goto error;
    157 
    158     res = ssp_rng_write_cstr
    159       (rng, darray_char_data_get(buf), darray_char_size_get(buf), &len);
    160     if(res != RES_OK) goto error;
    161     ASSERT(len + 1/*null char*/ == darray_char_size_get(buf));
    162   }
    163 
    164   if(out_len) *out_len = len;
    165 
    166 exit:
    167   return res;
    168 error:
    169   goto exit;
    170 }
    171 
    172 /*******************************************************************************
    173  * Cache of RNG states
    174  ******************************************************************************/
    175 static void
    176 rng_state_cache_release(struct rng_state_cache* cache)
    177 {
    178   ASSERT(cache);
    179   if(!cache->is_init) return; /* Nothing to release */
    180 
    181   if(cache->stream) fclose(cache->stream);
    182   if(cache->buffer) MEM_RM(cache->allocator, cache->buffer);
    183   darray_char_release(&cache->state);
    184   darray_char_release(&cache->state_scratch);
    185 }
    186 
    187 static res_T
    188 rng_state_cache_init
    189   (struct mem_allocator* allocator,
    190    const size_t state_pitch, /* #RNs between cached states */
    191    struct rng_state_cache* cache)
    192 {
    193   res_T res = RES_OK;
    194   ASSERT(cache);
    195 
    196   memset(cache, 0, sizeof(*cache));
    197   darray_char_init(allocator, &cache->state);
    198   darray_char_init(allocator, &cache->state_scratch);
    199   cache->allocator = allocator;
    200   cache->end_of_cache = STATE_CACHE_CAPACITY;
    201   cache->state_pitch = state_pitch;
    202 
    203   if(STATE_CACHE_CAPACITY != 0) {
    204     cache->buffer = MEM_CALLOC(allocator, 1, STATE_CACHE_CAPACITY);
    205     if(!cache->buffer) { res = RES_MEM_ERR; goto error; }
    206     cache->stream = fmemopen(cache->buffer, STATE_CACHE_CAPACITY, "w+");
    207     if(!cache->stream) { res = RES_IO_ERR; goto error; }
    208     cache->read = cache->write = ftell(cache->stream);
    209   }
    210 
    211   cache->is_init = 1;
    212 
    213 exit:
    214   return res;
    215 error:
    216   rng_state_cache_release(cache);
    217   goto exit;
    218 }
    219 
    220 static res_T
    221 rng_state_cache_clear(struct rng_state_cache* cache)
    222 {
    223   ASSERT(cache);
    224   if(cache->stream) {
    225     rewind(cache->stream);
    226     cache->read = cache->write = ftell(cache->stream);
    227   } else {
    228     ASSERT(STATE_CACHE_CAPACITY == 0);
    229     cache->read = cache->write = 0;
    230   }
    231   cache->nstates = 0;
    232   cache->no_wstream = 0;
    233   cache->no_rstream = 0;
    234   return RES_OK;
    235 }
    236 
    237 static char
    238 rng_state_cache_is_empty(struct rng_state_cache* cache)
    239 {
    240   ASSERT(cache);
    241   return cache->nstates == 0;
    242 }
    243 
    244 static res_T
    245 rng_state_cache_dump(struct rng_state_cache* cache)
    246 {
    247   /* Output file into which cache stream is dumped */
    248   char name[128] = {0};
    249   pid_t process = 0; /* Process identifier */
    250   FILE* fp = NULL;
    251 
    252   /* Temporary memory to copy cache stream */
    253   void* mem = NULL;
    254 
    255   /* Miscellaneous */
    256   long offset = 0;
    257   int n = 0;
    258   res_T res = RES_OK;
    259 
    260   ASSERT(cache);
    261 
    262   /* No cache to dump */
    263   if(!cache->stream) goto exit;
    264 
    265   process = getpid();
    266 
    267   #define TRY(Cond, Err) { \
    268     if(!(Cond)) { \
    269       fprintf(stderr, "%s:%d: %s\n", FUNC_NAME, __LINE__, strerror(Err)); \
    270       switch(Err) { \
    271         case EIO: res = RES_IO_ERR; break; \
    272         case ENOMEM: res = RES_MEM_ERR; break; \
    273         default: res = RES_UNKNOWN_ERR; break; \
    274       } \
    275       goto error; \
    276     } \
    277   } (void)0
    278 
    279   /* Requests the offset at the end of the file,
    280    * i.e. the size of the cache stream */
    281   TRY(fseek(cache->stream, 0, SEEK_END) == 0, errno);
    282   TRY((offset = ftell(cache->stream)) >= 0, errno);
    283 
    284   /* Define the state cache filename */
    285   n = snprintf(name, sizeof(name), "rng_cache_r%ld_w%ld_s%ld_%ld",
    286     cache->read, cache->write, offset, (long)process);
    287   TRY(n >= 0, errno);
    288   TRY((unsigned long)n < sizeof(name), ENOMEM);
    289 
    290   /* Create the output file */
    291   TRY((fp = fopen(name, "w")) != NULL, errno);
    292 
    293   rewind(cache->stream);
    294 
    295   /* Load the cache stream into mem, then dump it into the output file */
    296   TRY((mem = mem_alloc(offset)) != NULL, ENOMEM);
    297   TRY((fread(mem, offset, 1, cache->stream)) == 1, EIO);
    298   TRY((fwrite(mem, offset, 1, fp)) == 1, EIO);
    299 
    300   #undef TRY
    301 
    302 exit:
    303   if(mem) mem_rm(mem);
    304   if(fp) CHK(fclose(fp) == 0);
    305   return res;
    306 error:
    307   goto exit;
    308 }
    309 
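        /* How the cache stream is consumed (overview of the code below): 'write'
         * marks where the next state is appended and 'read' where the oldest
         * unread state starts. When a write does not fit before
         * STATE_CACHE_CAPACITY, the current 'write' offset is saved in
         * 'end_of_cache' and writing wraps to offset 0; reading wraps in the same
         * way. Once the writer catches up with the reader, 'no_wstream' is raised
         * and, when the stream is finally drained, 'no_rstream' as well: from then
         * on the next state is regenerated locally from 'state' by discarding
         * 'state_pitch' random numbers. */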
    310 static res_T
    311 rng_state_cache_read
    312   (struct rng_state_cache* cache,
    313    struct ssp_rng* rng,
    314    struct mutex* mutex) /* Proxy mutex */
    315 {
    316   res_T res = RES_OK;
    317   ASSERT(cache && rng && mutex);
    318 
    319   mutex_lock(mutex);
    320   ASSERT(!rng_state_cache_is_empty(cache));
    321 
    322   /* The read file pointer has reached the end of the cache.
    323    * Rewind it to the beginning of the cache */
    324   if(cache->read == cache->end_of_cache) {
    325     cache->read = 0;
    326     cache->end_of_cache = STATE_CACHE_CAPACITY;
    327   }
    328 
    329   if(!cache->no_rstream
    330   && cache->no_wstream
    331   && cache->read == cache->write
    332   && cache->nstates == 1/* A state is saved in 'cache->state' */) {
    333     /* There is no more data cached in the stream. From now on, do not rely
    334      * on the proxy RNG to generate the RNG states */
    335     cache->no_rstream = 1;
    336   }
    337 
    338   /* Read the cached RNG state from the stream */
    339   if(!cache->no_rstream) {
    340 
    341     fseek(cache->stream, cache->read, SEEK_SET);
    342     res = ssp_rng_read(rng, cache->stream);
    343     if(res != RES_OK) {
    344       mutex_unlock(mutex);
    345       goto error;
    346     }
    347     cache->read = ftell(cache->stream);
    348 
    349     /* Remove one cached state */
    350     cache->nstates -= 1;
    351 
    352     mutex_unlock(mutex);
    353 
    354   /* Generate the next RNG state and load the cached one */
    355   } else {
    356     /* Everything is done locally on the RNG, so we can unlock the proxy mutex */
    357     mutex_unlock(mutex);
    358 
    359     /* Copy the cached RNG state */
    360     res = darray_char_copy(&cache->state_scratch, &cache->state);
    361     if(res != RES_OK) goto error;
    362 
    363     /* Load the cached RNG state */
    364     res = ssp_rng_read_cstr(rng, darray_char_cdata_get(&cache->state));
    365     if(res != RES_OK) goto error;
    366 
    367     /* Setup the next RNG state */
    368     res = ssp_rng_discard(rng, cache->state_pitch);
    369     if(res != RES_OK) goto error;
    370 
    371     /* Save the next RNG state */
    372     res = rng_write_cstr(rng, &cache->state, NULL);
    373     if(res != RES_OK) goto error;
    374 
    375     /* Setup the current RNG state */
    376     res = ssp_rng_read_cstr(rng, darray_char_cdata_get(&cache->state_scratch));
    377     if(res != RES_OK) goto error;
    378   }
    379 
    380 exit:
    381   return res;
    382 error:
    383   goto exit;
    384 }
    385 
    386 static res_T
    387 rng_state_cache_write(struct rng_state_cache* cache, struct ssp_rng* rng)
    388 {
    389   long remaining_space = 0; /* Remaining cache space */
    390   size_t len = 0; /* Length of the cache state in bytes */
    391   res_T res = RES_OK;
    392 
    393   ASSERT(cache && rng);
    394 
    395   if(cache->no_wstream) goto exit; /* Do not cache the submitted state */
    396 
    397   /* Store in memory the state to be cached */
    398   res = rng_write_cstr(rng, &cache->state, &len);
    399   if(res != RES_OK) goto error;
    400 
    401   /* There is no space left at the end of the stream: rewind the write offset */
    402   if(len > (size_t)(STATE_CACHE_CAPACITY - cache->write)) {
    403     cache->end_of_cache = cache->write; /* Mark the end of cache */
    404     cache->write = 0;
    405   }
    406 
    407   /* Calculate remaining cache space */
    408   if(rng_state_cache_is_empty(cache) || cache->write > cache->read) {
    409     remaining_space = STATE_CACHE_CAPACITY - cache->write;
    410   } else {
    411     remaining_space = cache->read - cache->write;
    412   }
    413 
    414   /* There is not enough space. The cache cannot be used */
    415   if(remaining_space < 0 || len > (size_t)remaining_space) {
    416     cache->no_wstream = 1;
    417 
    418   /* There is enough space. Write the RNG state into the cache stream */
    419   } else {
    420     size_t sz = 0;
    421 
    422     fseek(cache->stream, cache->write, SEEK_SET);
    423     sz = fwrite(darray_char_cdata_get(&cache->state), 1, len, cache->stream);
    424     if(sz != len) { res = RES_IO_ERR; goto error; }
    425     cache->write = ftell(cache->stream);
    426 
    427     /* Flush write state to detect a write error */
    428     if(fflush(cache->stream) != 0) {
    429       res = RES_IO_ERR;
    430       goto error;
    431     }
    432   }
    433 
    434   /* Update the number of cached states */
    435   cache->nstates += 1;
    436 
    437 exit:
    438   return res;
    439 error:
    440   goto exit;
    441 }
    442 
    443 /*******************************************************************************
    444  * RNG that controls the scheduling of random number pools for a given bucket
    445  ******************************************************************************/
    446 struct rng_bucket {
    447   struct ssp_rng* pool; /* Wrapped RNG providing a pool of `bucket_size' RNs */
    448   struct ssp_rng_proxy* proxy; /* The RNG proxy */
    449   size_t name; /* Unique bucket identifier in [0, #buckets) */
    450   size_t count; /* Remaining unique random numbers in `pool' */
    451   rng_proxy_cb_T cb_on_proxy_set_state;
    452 };
    453 
    454 static void
    455 rng_bucket_on_proxy_set_state(const struct ssp_rng_proxy* proxy, void* ctx)
    456 {
    457   struct rng_bucket* rng = (struct rng_bucket*)ctx;
    458   ASSERT(proxy && ctx && rng->proxy == proxy);
    459   (void)proxy;
    460   /* Reset bucket */
    461   rng->count = 0;
    462   rng->pool = NULL;
    463 }
    464 
    465 static INLINE void
    466 rng_bucket_next_ran_pool(struct rng_bucket* rng)
    467 {
    468   ASSERT(rng);
    469   rng->pool = rng_proxy_next_ran_pool(rng->proxy, rng->name);
    470   rng->count = rng->proxy->bucket_size;
    471 }
    472 
    473 static res_T
    474 rng_bucket_set(void* data, const uint64_t seed)
    475 {
    476   (void)data, (void)seed;
    477   return RES_BAD_OP;
    478 }
    479 
    480 static uint64_t
    481 rng_bucket_get(void* data)
    482 {
    483   struct rng_bucket* rng = (struct rng_bucket*)data;
    484   ASSERT(data);
    485   if(!rng->count) rng_bucket_next_ran_pool(rng);
    486   --rng->count;
    487   return ssp_rng_get(rng->pool);
    488 }
    489 
    490 static res_T
    491 rng_bucket_read(void* data, FILE* file)
    492 {
    493   (void)data, (void)file;
    494   return RES_BAD_OP;
    495 }
    496 
    497 static res_T
    498 rng_bucket_read_cstr(void* data, const char* cstr)
    499 {
    500   (void)data, (void)cstr;
    501   return RES_BAD_OP;
    502 }
    503 
    504 static res_T
    505 rng_bucket_write(const void* data, FILE* file)
    506 {
    507   (void)data, (void)file;
    508   return RES_BAD_OP;
    509 }
    510 
    511 static res_T
    512 rng_bucket_write_cstr
    513   (const void* data,
    514    char* buf,
    515    const size_t bufsz,
    516    size_t* len)
    517 {
    518   (void)data, (void)buf, (void)bufsz, (void)len;
    519   return RES_BAD_OP;
    520 }
    521 
    522 static res_T
    523 rng_bucket_init(struct mem_allocator* allocator, void* data)
    524 {
    525   struct rng_bucket* rng = (struct rng_bucket*)data;
    526   ASSERT(data);
    527   (void)allocator;
    528   rng->proxy = NULL;
    529   rng->pool = NULL;
    530   rng->name = SIZE_MAX;
    531   rng->count = 0;
    532   return RES_OK;
    533 }
    534 
    535 static void
    536 rng_bucket_release(void* data)
    537 {
    538   struct rng_bucket* rng = (struct rng_bucket*)data;
    539   ASSERT(data && rng->proxy);
    540   ATOMIC_SET(&rng->proxy->buckets[rng->name], 0);
    541   CLBK_DISCONNECT(&rng->cb_on_proxy_set_state);
    542   SSP(rng_proxy_ref_put(rng->proxy));
    543 }
    544 
    545 static double
    546 rng_bucket_entropy(const void* data)
    547 {
    548   (void)data;
    549   return 0;
    550 }
    551 
    552 static res_T
    553 rng_bucket_discard(void* data, uint64_t n)
    554 {
    555   struct rng_bucket* rng = (struct rng_bucket*)data;
    556   ASSERT(data);
    557   while (rng->count < n) {
    558     n -= rng->count;
    559     rng_bucket_next_ran_pool(rng);
    560   }
    561   rng->count -= n;
    562   return ssp_rng_discard(rng->pool, n);
    563 }
    564 
    565 static const struct rng_desc RNG_BUCKET_NULL = {
    566   rng_bucket_init,
    567   rng_bucket_release,
    568   rng_bucket_set,
    569   rng_bucket_get,
    570   rng_bucket_discard,
    571   rng_bucket_read,
    572   rng_bucket_read_cstr,
    573   rng_bucket_write,
    574   rng_bucket_write_cstr,
    575   rng_bucket_entropy,
    576   INT_MAX, /* Min dummy value */
    577   0, /* Max dummy value */
    578   sizeof(struct rng_bucket),
    579   16
    580 };
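        /* Note on the descriptor above: the min/max fields are placeholders;
         * ssp_rng_proxy_create_rng() overwrites them with the bounds of the RNG
         * on which the proxy relies before creating a bucket RNG. */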
    581 
    582 /*******************************************************************************
    583  * Helper functions
    584  ******************************************************************************/
    585 /* Scheduler of random number pools */
    586 struct ssp_rng*
    587 rng_proxy_next_ran_pool
    588   (struct ssp_rng_proxy* proxy,
    589    const size_t bucket_name)
    590 {
    591   res_T res = RES_OK;
    592   ASSERT(proxy);
    593   ASSERT(bucket_name < sa_size(proxy->buckets));
    594   ASSERT(bucket_name < sa_size(proxy->per_bucket_sequence_id));
    595 
    596   mutex_lock(proxy->mutex);
    597 
    598   if(rng_state_cache_is_empty(proxy->states + bucket_name)) {
    599     size_t ibucket;
    600     /* Register a new state for *all* buckets */
    601     FOR_EACH(ibucket, 0, sa_size(proxy->states)) {
    602       res = rng_state_cache_write(proxy->states + ibucket, proxy->rng);
    603       if(res != RES_OK) {
    604         rng_state_cache_dump(proxy->states + ibucket);
    605         FATAL("RNG proxy: cannot write to state cache\n");
    606       }
    607       ssp_rng_discard(proxy->rng, proxy->bucket_size);
    608     }
    609     /* Discard RNs to reach the next sequence */
    610     ssp_rng_discard(proxy->rng, proxy->sequence_bias);
    611 
    612     /* Increment the sequence id of the main RNG */
    613     proxy->main_sequence_id += 1;
    614   }
    615   mutex_unlock(proxy->mutex);
    616 
    617   /* Read the RNG pool state of `bucket_name' */
    618   res = rng_state_cache_read
    619     (proxy->states + bucket_name,
    620      proxy->pools[bucket_name],
    621      proxy->mutex);
    622   if(res != RES_OK) {
    623     mutex_lock(proxy->mutex);
    624     rng_state_cache_dump(proxy->states + bucket_name);
    625     mutex_unlock(proxy->mutex);
    626     FATAL("RNG proxy: cannot read from state cache\n");
    627   }
    628 
    629   /* Update the sequence of the bucket RNG */
    630   proxy->per_bucket_sequence_id[bucket_name] += 1;
    631 
    632   return proxy->pools[bucket_name];
    633 }
    634 
    635 static void
    636 rng_proxy_clear(struct ssp_rng_proxy* proxy)
    637 {
    638   size_t ibucket;
    639   ASSERT(proxy);
    640   ASSERT(sa_size(proxy->pools) == sa_size(proxy->buckets));
    641   ASSERT(sa_size(proxy->pools) == sa_size(proxy->states));
    642 
    643   FOR_EACH(ibucket, 0, sa_size(proxy->pools)) {
    644     ASSERT(proxy->buckets[ibucket] == 0); /* No bucket RNG should still exist */
    645     if(proxy->pools[ibucket]) SSP(rng_ref_put(proxy->pools[ibucket]));
    646     rng_state_cache_release(proxy->states + ibucket);
    647   }
    648   sa_clear(proxy->buckets);
    649   sa_clear(proxy->pools);
    650   sa_clear(proxy->states);
    651 }
    652 
    653 static res_T
    654 rng_proxy_setup
    655   (struct ssp_rng_proxy* proxy,
    656    const size_t sequence_pitch, /* #RNs between 2 consecutive sequences */
    657    const size_t nbuckets)
    658 {
    659   size_t ibucket;
    660   res_T res = RES_OK;
    661 
    662   ASSERT(proxy && sequence_pitch && nbuckets);
    663   rng_proxy_clear(proxy);
    664 
    665   sa_add(proxy->states, nbuckets);
    666   sa_add(proxy->pools, nbuckets);
    667   sa_add(proxy->buckets, nbuckets);
    668   sa_add(proxy->per_bucket_sequence_id, nbuckets);
    669 
    670   /* Clear the allocated memory. This is necessary for error handling: it
    671    * identifies which data have been initialised and which have not */
    672   memset(proxy->states, 0, sizeof(*proxy->states)*nbuckets);
    673   memset(proxy->pools, 0, sizeof(*proxy->pools)*nbuckets);
    674   memset(proxy->buckets, 0, sizeof(*proxy->buckets)*nbuckets);
    675   memset(proxy->per_bucket_sequence_id, 0,
    676     sizeof(*proxy->per_bucket_sequence_id)*nbuckets);
    677 
    678   FOR_EACH(ibucket, 0, nbuckets) {
    679     res = rng_state_cache_init
    680       (proxy->allocator, sequence_pitch, proxy->states+ibucket);
    681     if(res != RES_OK) goto error;
    682     res = ssp_rng_create(proxy->allocator, proxy->type, proxy->pools+ibucket);
    683     if(res != RES_OK) goto error;
    684     proxy->buckets[ibucket] = 0;
    685 
    686     /* Set the sequence index to SIZE_MAX because no sequence is active until a
    687      * random number query is made. On the first query, the index will be
    688      * incremented to 0 */
    689     proxy->per_bucket_sequence_id[ibucket] = SSP_SEQUENCE_ID_NONE/*<=> SIZE_MAX*/;
    690   }
    691 
    692 exit:
    693   return res;
    694 error:
    695   if(proxy) rng_proxy_clear(proxy);
    696   goto exit;
    697 }
    698 
    699 static res_T
    700 rng_proxy_clear_caches(struct ssp_rng_proxy* proxy)
    701 {
    702   size_t ibucket;
    703   res_T res = RES_OK;
    704   ASSERT(proxy);
    705 
    706   mutex_lock(proxy->mutex);
    707   FOR_EACH(ibucket, 0, sa_size(proxy->pools)) {
    708     res = rng_state_cache_clear(proxy->states+ibucket);
    709     if(res != RES_OK) break;
    710   }
    711   mutex_unlock(proxy->mutex);
    712   return res;
    713 }
    714 
    715 static void
    716 rng_proxy_release(ref_T* ref)
    717 {
    718   struct ssp_rng_proxy* proxy;
    719   ASSERT(ref);
    720   proxy = CONTAINER_OF(ref, struct ssp_rng_proxy, ref);
    721   rng_proxy_clear(proxy);
    722   sa_release(proxy->states);
    723   sa_release(proxy->pools);
    724   sa_release(proxy->buckets);
    725   sa_release(proxy->per_bucket_sequence_id);
    726   if(proxy->rng) SSP(rng_ref_put(proxy->rng));
    727   if(proxy->mutex) mutex_destroy(proxy->mutex);
    728   MEM_RM(proxy->allocator, proxy);
    729 }
    730 
    731 /*******************************************************************************
    732  * Exported functions
    733  ******************************************************************************/
    734 res_T
    735 ssp_rng_proxy_create
    736   (struct mem_allocator* mem_allocator,
    737    const enum ssp_rng_type type,
    738    const size_t nbuckets,
    739    struct ssp_rng_proxy** out_proxy)
    740 {
    741   struct ssp_rng_proxy_create2_args args = SSP_RNG_PROXY_CREATE2_ARGS_NULL;
    742   const size_t sz = BUCKET_SIZE_DEFAULT * nbuckets;
    743   args.type = type;
    744   args.sequence_offset = 0;
    745   args.sequence_size = sz;
    746   args.sequence_pitch = sz;
    747   args.nbuckets = nbuckets;
    748   return ssp_rng_proxy_create2(mem_allocator, &args, out_proxy);
    749 }
    750 
    751 res_T
    752 ssp_rng_proxy_create_from_rng
    753   (struct mem_allocator* mem_allocator,
    754    const struct ssp_rng* rng,
    755    const size_t nbuckets,
    756    struct ssp_rng_proxy** out_proxy)
    757 {
    758   struct ssp_rng_proxy_create2_args args = SSP_RNG_PROXY_CREATE2_ARGS_NULL;
    759   const size_t sz = BUCKET_SIZE_DEFAULT * nbuckets;
    760   args.rng = rng;
    761   args.sequence_offset = 0;
    762   args.sequence_size = sz;
    763   args.sequence_pitch = sz;
    764   args.nbuckets = nbuckets;
    765   return ssp_rng_proxy_create2(mem_allocator, &args, out_proxy);
    766 }
    767 
    768 res_T
    769 ssp_rng_proxy_create2
    770   (struct mem_allocator* mem_allocator,
    771    const struct ssp_rng_proxy_create2_args* args,
    772    struct ssp_rng_proxy** out_proxy)
    773 {
    774   struct darray_char buf;
    775   struct mem_allocator* allocator = NULL;
    776   struct ssp_rng_proxy* proxy = NULL;
    777   size_t i;
    778   res_T res = RES_OK;
    779 
    780   darray_char_init(mem_allocator, &buf);
    781 
    782   if(!args
    783   || !out_proxy
    784   || !args->sequence_size
    785   || !args->nbuckets
    786   || (args->type == SSP_RNG_TYPE_NULL && !args->rng)
    787   || args->sequence_pitch < args->sequence_size
    788   || args->sequence_size < args->nbuckets) {
    789     res = RES_BAD_ARG;
    790     goto error;
    791   }
    792 
    793   allocator = mem_allocator ? mem_allocator : &mem_default_allocator;
    794   proxy = (struct ssp_rng_proxy*)MEM_CALLOC(allocator, 1, sizeof(*proxy));
    795   if(!proxy) {
    796     res = RES_MEM_ERR;
    797     goto error;
    798   }
    799   proxy->allocator = allocator;
    800   ref_init(&proxy->ref);
    801   proxy->bucket_size = args->sequence_size / args->nbuckets;
    802   proxy->sequence_size = args->sequence_size;
    803   proxy->sequence_bias =
    804     args->sequence_pitch - (proxy->bucket_size * args->nbuckets);
    805 
    806   proxy->main_sequence_id = SSP_SEQUENCE_ID_NONE;
    807 
    808   /* Create the proxy RNG in its default state */
    809   if(!args->rng) {
    810     res = ssp_rng_create(allocator, args->type, &proxy->rng);
    811     if(res != RES_OK) goto error;
    812     proxy->type = args->type;
    813 
    814   /* Create the proxy RNG from a submitted RNG state */
    815   } else {
    816     size_t len;
    817 
    818     /* Create the proxy RNG with the type of the submitted RNG; simply ignore
    819      * the submitted RNG type, if any */
    820     res = ssp_rng_get_type(args->rng, &proxy->type);
    821     if(res != RES_OK) goto error;
    822 
    823     /* A bucket RNG cannot be used as the source RNG of a proxy */
    824     if(args->rng->desc.init == rng_bucket_init) {
    825       res = RES_BAD_ARG;
    826       goto error;
    827     }
    828 
    829     res = ssp_rng_create(allocator, proxy->type, &proxy->rng);
    830     if(res != RES_OK) goto error;
    831 
    832     /* Initialise the RNG proxy state from the state of the submitted RNG */
    833     res = ssp_rng_write_cstr(args->rng, NULL, 0, &len);
    834     if(res != RES_OK) goto error;
    835     res = darray_char_resize(&buf, len+1/*Null char*/);
    836     if(res != RES_OK) goto error;
    837     res = ssp_rng_write_cstr(args->rng, darray_char_data_get(&buf), len+1, &len);
    838     if(res != RES_OK) goto error;
    839     res = ssp_rng_read_cstr(proxy->rng, darray_char_cdata_get(&buf));
    840     if(res != RES_OK) goto error;
    841   }
    842 
    843   res = ssp_rng_discard(proxy->rng, args->sequence_offset);
    844   if(res != RES_OK) goto error;
    845 
    846   proxy->mutex = mutex_create();
    847   if(!proxy->mutex) {
    848     res = RES_MEM_ERR;
    849     goto error;
    850   }
    851 
    852   FOR_EACH(i, 0, RNG_PROXY_SIGS_COUNT__) {
    853     SIG_INIT(proxy->signals + i);
    854   }
    855 
    856   res = rng_proxy_setup(proxy, args->sequence_pitch, args->nbuckets);
    857   if(res != RES_OK) goto error;
    858 
    859 exit:
    860   darray_char_release(&buf);
    861   if(out_proxy) *out_proxy = proxy;
    862   return res;
    863 error:
    864   if(proxy) {
    865     SSP(rng_proxy_ref_put(proxy));
    866     proxy = NULL;
    867   }
    868   goto exit;
    869 }
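        /* One possible use of sequence_offset and sequence_pitch (illustrative;
         * nothing in the proxy mandates it): give each of P processes a disjoint
         * slice of the same global random stream. With S random numbers per
         * process and per sequence, process p could submit (other fields set as
         * usual)
         *
         *   args.sequence_offset = p * S;
         *   args.sequence_size   = S;
         *   args.sequence_pitch  = P * S;
         *
         * so that process p consumes [p*S, p*S + S), then [P*S + p*S, ...), and
         * so on, without ever overlapping the other processes. */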
    870 
    871 res_T
    872 ssp_rng_proxy_read(struct ssp_rng_proxy* proxy, FILE* stream)
    873 {
    874   res_T res = RES_OK;
    875   if(!proxy || !stream) return RES_BAD_ARG;
    876 
    877   mutex_lock(proxy->mutex);
    878   res = ssp_rng_read(proxy->rng, stream);
    879   mutex_unlock(proxy->mutex);
    880   if(res != RES_OK) return res;
    881 
    882   /* Discard the cached RNG states */
    883   res = rng_proxy_clear_caches(proxy);
    884   if(res != RES_OK) return res;
    885 
    886   /* Notify the bucket RNGs that the proxy RNG state was updated */
    887   SIG_BROADCAST
    888     (proxy->signals+RNG_PROXY_SIG_SET_STATE, rng_proxy_cb_T, ARG1(proxy));
    889   return RES_OK;
    890 }
    891 
    892 res_T
    893 ssp_rng_proxy_write(const struct ssp_rng_proxy* proxy, FILE* stream)
    894 {
    895   res_T res = RES_OK;
    896 
    897   if(!proxy || !stream) return RES_BAD_ARG;
    898 
    899   mutex_lock(proxy->mutex);
    900   res = ssp_rng_write(proxy->rng, stream);
    901   mutex_unlock(proxy->mutex);
    902   return res;
    903 }
    904 
    905 res_T
    906 ssp_rng_proxy_ref_get(struct ssp_rng_proxy* proxy)
    907 {
    908   if(!proxy) return RES_BAD_ARG;
    909   ref_get(&proxy->ref);
    910   return RES_OK;
    911 }
    912 
    913 res_T
    914 ssp_rng_proxy_ref_put(struct ssp_rng_proxy* proxy)
    915 {
    916   if(!proxy) return RES_BAD_ARG;
    917   ref_put(&proxy->ref, rng_proxy_release);
    918   return RES_OK;
    919 }
    920 
    921 res_T
    922 ssp_rng_proxy_create_rng
    923   (struct ssp_rng_proxy* proxy,
    924    const size_t ibucket,
    925    struct ssp_rng** out_rng)
    926 {
    927   struct ssp_rng* rng = NULL;
    928   struct rng_desc desc = RNG_BUCKET_NULL;
    929   struct rng_bucket* bucket = NULL;
    930   res_T res = RES_OK;
    931 
    932   if(!proxy || ibucket >= sa_size(proxy->buckets) || !out_rng) {
    933     res = RES_BAD_ARG;
    934     goto error;
    935   }
    936 
    937   if(ATOMIC_CAS(&proxy->buckets[ibucket], 1, 0) == 1) {
    938     res = RES_BAD_ARG;
    939     goto error;
    940   }
    941 
    942   /* Update the dummy rng_bucket min/max values with the min/max values of the
    943    * RNG descriptor on which the proxy relies */
    944   desc.min = proxy->rng->desc.min;
    945   desc.max = proxy->rng->desc.max;
    946 
    947   res = rng_create(proxy->allocator, &desc, &rng);
    948   if(res != RES_OK) goto error;
    949   rng->type = proxy->type;
    950   bucket = (struct rng_bucket*)rng->state;
    951   bucket->name = ibucket;
    952   bucket->proxy = proxy;
    953   SSP(rng_proxy_ref_get(proxy));
    954 
    955   /* The bucket RNG listens to the proxy's "set state" signal to reset its
    956    * internal RNs counter whenever the proxy RNG state is updated. */
    957   CLBK_INIT(&bucket->cb_on_proxy_set_state);
    958   CLBK_SETUP
    959     (&bucket->cb_on_proxy_set_state, rng_bucket_on_proxy_set_state, bucket);
    960   SIG_CONNECT_CLBK
    961     (proxy->signals+RNG_PROXY_SIG_SET_STATE, &bucket->cb_on_proxy_set_state);
    962 
    963 exit:
    964   if(out_rng) *out_rng = rng;
    965   return res;
    966 error:
    967   if(rng) {
    968     SSP(rng_ref_put(rng));
    969     rng = NULL;
    970   }
    971   goto exit;
    972 }
    973 
    974 res_T
    975 ssp_rng_proxy_get_type
    976   (const struct ssp_rng_proxy* proxy,
    977    enum ssp_rng_type* type)
    978 {
    979   if(!proxy || !type) return RES_BAD_ARG;
    980   *type = proxy->type;
    981   return RES_OK;
    982 }
    983 
    984 res_T
    985 ssp_rng_proxy_get_sequence_id(const struct ssp_rng_proxy* proxy, size_t* out_id)
    986 {
    987   size_t id = SSP_SEQUENCE_ID_NONE;
    988   size_t i;
    989 
    990   if(!proxy || !out_id) return RES_BAD_ARG;
    991 
    992   mutex_lock(proxy->mutex);
    993   FOR_EACH(i, 0, sa_size(proxy->per_bucket_sequence_id)) {
    994     if(proxy->per_bucket_sequence_id[i] == SSP_SEQUENCE_ID_NONE) continue;
    995     id = id == SSP_SEQUENCE_ID_NONE
    996       ? proxy->per_bucket_sequence_id[i]
    997       : MMAX(id, proxy->per_bucket_sequence_id[i]);
    998   }
    999   mutex_unlock(proxy->mutex);
   1000 
   1001   *out_id = id;
   1002   return RES_OK;
   1003 }
   1004 
   1005 res_T
   1006 ssp_rng_proxy_flush_sequences
   1007   (struct ssp_rng_proxy* proxy,
   1008    const size_t nseqs)
   1009 {
   1010   size_t nseqs_proxy = 0;
   1011   size_t iseq;
   1012   size_t i;
   1013   res_T res = RES_OK;
   1014 
   1015   if(!proxy) {
   1016     res = RES_BAD_ARG;
   1017     goto error;
   1018   }
   1019 
   1020   /* Nothing to discard */
   1021   if(nseqs == 0) goto exit;
   1022 
   1023   res = ssp_rng_proxy_get_sequence_id(proxy, &iseq);
   1024   if(res != RES_OK) goto error;
   1025 
   1026   /* Calculate the number of sequences to flush from the main RNG, i.e. the
   1027    * one used while the cache is in use. We want to discard 'nseqs' sequences.
   1028    * That said, since the state of the RNG is already set to the 1st random
   1029    * number of the next sequence, it is enough to discard 'nseqs-1' sequences
   1030    * and to clear the cache. Also note that the sequence identifier of the main
   1031    * RNG may lag behind the global sequence identifier; this happens when the
   1032    * cache mechanism is no longer used. In that case, the extra sequences must
   1033    * also be discarded from the main RNG so that it matches the current
   1034    * sequence index of the proxy. */
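          /* For instance, if the proxy sequence id (iseq) is 5, main_sequence_id
           * is 3 and nseqs is 2, then nseqs_proxy = (2-1) + (5-3) = 3: one
           * sequence for the requested flush plus two to realign the main RNG
           * with the bucket RNGs. */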
   1035   nseqs_proxy = (nseqs - 1) + (iseq - proxy->main_sequence_id);
   1036 
   1037   mutex_lock(proxy->mutex);
   1038   res = ssp_rng_discard(proxy->rng, proxy->sequence_size * nseqs_proxy);
   1039   mutex_unlock(proxy->mutex);
   1040   if(res != RES_OK) goto error;
   1041 
   1042   proxy->main_sequence_id += nseqs_proxy;
   1043 
   1044   /* Discard the cached RNG states */
   1045   rng_proxy_clear_caches(proxy);
   1046 
   1047   /* Align the bucket RNGs' sequence ids with the main RNG */
   1048   FOR_EACH(i, 0, sa_size(proxy->per_bucket_sequence_id)) {
   1049     proxy->per_bucket_sequence_id[i] = proxy->main_sequence_id;
   1050   }
   1051 
   1052   /* Notify the bucket RNGs that the proxy RNG state was updated */
   1053   SIG_BROADCAST
   1054     (proxy->signals+RNG_PROXY_SIG_SET_STATE, rng_proxy_cb_T, ARG1(proxy));
   1055 
   1056 exit:
   1057   return res;
   1058 error:
   1059   goto exit;
   1060 }