ssp_rng_proxy.c (30287B)
1 /* Copyright (C) 2015-2025 |Méso|Star> (contact@meso-star.com) 2 * 3 * This program is free software: you can redistribute it and/or modify 4 * it under the terms of the GNU General Public License as published by 5 * the Free Software Foundation, either version 3 of the License, or 6 * (at your option) any later version. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public License 14 * along with this program. If not, see <http://www.gnu.org/licenses/>. */ 15 16 #define _POSIX_C_SOURCE 200809L /* fmemopen, getpid */ 17 18 #include "ssp_rng_c.h" 19 20 #include <rsys/dynamic_array_char.h> 21 #include <rsys/mutex.h> 22 #include <rsys/ref_count.h> 23 #include <rsys/signal.h> 24 #include <rsys/stretchy_array.h> 25 26 #include <limits.h> 27 #include <unistd.h> /* getpid */ 28 29 #define BUCKET_SIZE_DEFAULT 1000000 /* #RNs per bucket */ 30 31 /* Cache capacity. Maximum size in bytes that the cache can store */ 32 #if 1 33 #define STATE_CACHE_CAPACITY (4*1024*1024) /* 4 MB */ 34 #else 35 #define STATE_CACHE_CAPACITY 0 /* Disable the cache */ 36 #endif 37 38 /* Cache of RNG states */ 39 struct rng_state_cache { 40 struct darray_char state; /* Save the next RNG state with 'no_wstream' */ 41 struct darray_char state_scratch; /* Scracth state buffer */ 42 43 struct mem_allocator* allocator; 44 void* buffer; /* Memory in which RNG states are stored */ 45 FILE* stream; /* Stream to buffer storing RNG states */ 46 47 size_t state_pitch; /* #RNs between 2 cached states */ 48 size_t nstates; /* #cached states */ 49 50 long read, write; /* Offset into the stream where to read/write RNG states */ 51 long end_of_cache; /* Stream offset that marks the end of cache */ 52 53 int no_wstream; /* Define if the RNG states are no more written to a stream */ 54 int no_rstream; /* Define if the RNG states are no more read from a stream */ 55 56 int is_init; /* Cache initialisation state */ 57 }; 58 59 CLBK(rng_proxy_cb_T, ARG1(const struct ssp_rng_proxy*)); 60 61 enum rng_proxy_sig { 62 RNG_PROXY_SIG_SET_STATE, 63 RNG_PROXY_SIGS_COUNT__ 64 }; 65 66 /* A proxy manages a list of N independent RNGs of the same type named buckets. 67 * One ensure that each bucket have independent `infinite' random numbers by 68 * partitioning a unique random sequence in N random pools, each containing 69 * `bucket_size' random numbers. When a bucket has no more random number in its 70 * affected pool, it silently retrieves a new pool of `bucket_size' random 71 * numbers from the proxy. 72 * 73 * Mapping of the partitions of the unique random sequence to N buckets 74 * 75 * bucket_size 76 * / \ 77 * +---------+---------+- -+---------+---------+---------+- 78 * | Bucket0 | Bucket1 | ... |BucketN-1| Bucket0 | Bucket1 | ... 79 * +---------+---------+- -+---------+---------+---------+- 80 * Unique random sequence */ 81 struct ssp_rng_proxy { 82 enum ssp_rng_type type; /* Type of the RNGs managed by the proxy */ 83 84 struct ssp_rng* rng; /* Main `type' RNG */ 85 86 /* The following arrays have the same size */ 87 ATOMIC* buckets; /* Flag that defines which bucket RNGs are created */ 88 size_t sequence_size; /* #RNs in a sequence */ 89 size_t sequence_bias; /* #RNs to discard between 2 consecutive sequence */ 90 size_t bucket_size; /* #random numbers per bucket */ 91 struct ssp_rng** pools; /* `type' RNGs wrapped by bucket RNGs */ 92 struct rng_state_cache* states; /* Cache of `type' RNG states */ 93 94 /* Index of the last queried sequence. This index is independent of the 95 * original seed used by the proxy and is designed to identify the status of 96 * the proxy relative to that original seed. When the proxy is created, the 97 * sequence index is SSP_SEQUENCE_ID_NONE, i.e. no sequence was queried. At 98 * the first request for a random number, the first sequence is consumed and 99 * this sequence index is then 0. It is then incremented by one each time a 100 * new sequence is required. 101 * 102 * Each bucket stores the sequence ID of its local RNG. The proxy sequence ID 103 * is the maximum between these local sequence indices. Note that we also 104 * keep track of the RNG proxy's sequence index (main_sequence_id); it is 105 * equal to the proxy sequence identifier only when the caching mechanism is 106 * still in use. */ 107 size_t* per_bucket_sequence_id; 108 size_t main_sequence_id; 109 110 signal_T signals[RNG_PROXY_SIGS_COUNT__]; 111 112 struct mutex* mutex; 113 struct mem_allocator* allocator; 114 ref_T ref; 115 }; 116 117 /* Return a RNG with a pool of `bucket_size' indenpendant random numbers. Each 118 * pool are ensured to be independant per `bucket_id' in [0, N) and per function 119 * call, i.e. calling this function X times with the same bucket_id will 120 * provide X different random pools. 121 * 122 * bucket_size sequence_bias 123 * / \ / \ 124 * +------+------------+- -+------------+----+------------+- 125 * |######| Bucket 0 | ... | Bucket N-1 |####| Bucket 0 | ... 126 * |######| 1st pool | | 1st pool |####| 2nd pool | 127 * +------+------------+- -+------------+----+------------+- 128 * \ / \_________sequence_size_______/ / 129 * sequence \________sequence_pitch__________/ 130 * offset 131 */ 132 static struct ssp_rng* 133 rng_proxy_next_ran_pool 134 (struct ssp_rng_proxy* proxy, 135 const size_t bucket_id); 136 137 /* Write the RNG state into buf. State data are terminated by a null char */ 138 static res_T 139 rng_write_cstr 140 (const struct ssp_rng* rng, 141 struct darray_char* buf, 142 size_t* out_len) /* May be NULL. String length without the null char */ 143 { 144 size_t len; 145 res_T res = RES_OK; 146 ASSERT(rng && buf); 147 148 /* Write the RNG state into a temporary buffer */ 149 res = ssp_rng_write_cstr 150 (rng, darray_char_data_get(buf), darray_char_size_get(buf), &len); 151 if(res != RES_OK) goto error; 152 153 /* Not sufficient space to store the state */ 154 if(len >= darray_char_size_get(buf)) { 155 res = darray_char_resize(buf, len + 1/*null char*/); 156 if(res != RES_OK) goto error; 157 158 res = ssp_rng_write_cstr 159 (rng, darray_char_data_get(buf), darray_char_size_get(buf), &len); 160 if(res != RES_OK) goto error; 161 ASSERT(len + 1/*null char*/ == darray_char_size_get(buf)); 162 } 163 164 if(out_len) *out_len = len; 165 166 exit: 167 return res; 168 error: 169 goto exit; 170 } 171 172 /******************************************************************************* 173 * Cache of RNG states 174 ******************************************************************************/ 175 static void 176 rng_state_cache_release(struct rng_state_cache* cache) 177 { 178 ASSERT(cache); 179 if(!cache->is_init) return; /* Nothing to release */ 180 181 if(cache->stream) fclose(cache->stream); 182 if(cache->buffer) MEM_RM(cache->allocator, cache->buffer); 183 darray_char_release(&cache->state); 184 darray_char_release(&cache->state_scratch); 185 } 186 187 static res_T 188 rng_state_cache_init 189 (struct mem_allocator* allocator, 190 const size_t state_pitch, /* #RNs between cached states */ 191 struct rng_state_cache* cache) 192 { 193 res_T res = RES_OK; 194 ASSERT(cache); 195 196 memset(cache, 0, sizeof(*cache)); 197 darray_char_init(allocator, &cache->state); 198 darray_char_init(allocator, &cache->state_scratch); 199 cache->allocator = allocator; 200 cache->end_of_cache = STATE_CACHE_CAPACITY; 201 cache->state_pitch = state_pitch; 202 203 if(STATE_CACHE_CAPACITY != 0) { 204 cache->buffer = MEM_CALLOC(allocator, 1, STATE_CACHE_CAPACITY); 205 if(!cache->buffer) { res = RES_MEM_ERR; goto error; } 206 cache->stream = fmemopen(cache->buffer, STATE_CACHE_CAPACITY, "w+"); 207 if(!cache->stream) { res = RES_IO_ERR; goto error; } 208 cache->read = cache->write = ftell(cache->stream); 209 } 210 211 cache->is_init = 1; 212 213 exit: 214 return res; 215 error: 216 rng_state_cache_release(cache); 217 goto exit; 218 } 219 220 static res_T 221 rng_state_cache_clear(struct rng_state_cache* cache) 222 { 223 ASSERT(cache); 224 if(cache->stream) { 225 rewind(cache->stream); 226 cache->read = cache->write = ftell(cache->stream); 227 } else { 228 ASSERT(STATE_CACHE_CAPACITY == 0); 229 cache->read = cache->write = 0; 230 } 231 cache->nstates = 0; 232 cache->no_wstream = 0; 233 cache->no_rstream = 0; 234 return RES_OK; 235 } 236 237 static char 238 rng_state_cache_is_empty(struct rng_state_cache* cache) 239 { 240 ASSERT(cache); 241 return cache->nstates == 0; 242 } 243 244 static res_T 245 rng_state_cache_dump(struct rng_state_cache* cache) 246 { 247 /* Output file into which cache stream is dumped */ 248 char name[128] = {0}; 249 pid_t process = 0; /* Process identifier */ 250 FILE* fp = NULL; 251 252 /* Temporary memory to copy cache stream */ 253 void* mem = NULL; 254 255 /* Miscellaneous */ 256 long offset = 0; 257 int n = 0; 258 res_T res = RES_OK; 259 260 ASSERT(cache); 261 262 /* No cache to dump */ 263 if(!cache->stream) goto exit; 264 265 process = getpid(); 266 267 #define TRY(Cond, Err) { \ 268 if(!(Cond)) { \ 269 fprintf(stderr, "%s:%d: %s\n", FUNC_NAME, __LINE__, strerror(Err)); \ 270 switch(Err) { \ 271 case EIO: res = RES_IO_ERR; break; \ 272 case ENOMEM: res = RES_MEM_ERR; break; \ 273 default: res = RES_UNKNOWN_ERR; break; \ 274 } \ 275 goto error; \ 276 } \ 277 } (void)0 278 279 /* Requests the offset at the end of the file, 280 * i.e. the size of the cache stream */ 281 TRY(fseek(cache->stream, 0, SEEK_END) == 0, errno); 282 TRY((offset = ftell(cache->stream)) >= 0, errno); 283 284 /* Define the state cache filename */ 285 n = snprintf(name, sizeof(name), "rng_cache_r%lu_w%lu_s%ld_%d", 286 cache->read, cache->write, offset, process); 287 TRY(n >= 0, errno); 288 TRY((unsigned long)n < sizeof(name), ENOMEM); 289 290 /* Create the output file */ 291 TRY((fp = fopen(name, "w")) != NULL, errno); 292 293 rewind(cache->stream); 294 295 /* Load cache stream in mem, and then dump it to the state cache */ 296 TRY((mem = mem_alloc(offset)) != NULL, ENOMEM); 297 TRY((fread(mem, offset, 1, cache->stream)) == 1, EIO); 298 TRY((fwrite(mem, offset, 1, fp)) == 1, EIO); 299 300 #undef TRY 301 302 exit: 303 if(mem) mem_rm(mem); 304 if(fp) CHK(fclose(fp) == 0); 305 return res; 306 error: 307 goto exit; 308 } 309 310 static res_T 311 rng_state_cache_read 312 (struct rng_state_cache* cache, 313 struct ssp_rng* rng, 314 struct mutex* mutex) /* Proxy mutex */ 315 { 316 res_T res = RES_OK; 317 ASSERT(cache && rng && mutex); 318 319 mutex_lock(mutex); 320 ASSERT(!rng_state_cache_is_empty(cache)); 321 322 /* The read file pointer has reached the end of the cache. 323 * Rewind it to the beginning of the cache */ 324 if(cache->read == cache->end_of_cache) { 325 cache->read = 0; 326 cache->end_of_cache = STATE_CACHE_CAPACITY; 327 } 328 329 if(!cache->no_rstream 330 && cache->no_wstream 331 && cache->read == cache->write 332 && cache->nstates == 1/* A state is saved in 'cache->state' */) { 333 /* There is no more data cached into the stream. Do not rely anymore on the 334 * proxy RNG to generate the RNG states */ 335 cache->no_rstream = 1; 336 } 337 338 /* Read the cached RNG state from the stream */ 339 if(!cache->no_rstream) { 340 341 fseek(cache->stream, cache->read, SEEK_SET); 342 res = ssp_rng_read(rng, cache->stream); 343 if(res != RES_OK) { 344 mutex_unlock(mutex); 345 goto error; 346 } 347 cache->read = ftell(cache->stream); 348 349 /* Remove one cached states */ 350 cache->nstates -= 1; 351 352 mutex_unlock(mutex); 353 354 /* Generate the next RNG state and load the cached one */ 355 } else { 356 /* All is done locally to the RNG. We can thus unlock the proxy mutex */ 357 mutex_unlock(mutex); 358 359 /* Copy the cached RNG state */ 360 res = darray_char_copy(&cache->state_scratch, &cache->state); 361 if(res != RES_OK) goto error; 362 363 /* Load the cached RNG state */ 364 res = ssp_rng_read_cstr(rng, darray_char_cdata_get(&cache->state)); 365 if(res != RES_OK) goto error; 366 367 /* Setup the next RNG state */ 368 res = ssp_rng_discard(rng, cache->state_pitch); 369 if(res != RES_OK) goto error; 370 371 /* Save the next RNG state */ 372 res = rng_write_cstr(rng, &cache->state, NULL); 373 if(res != RES_OK) goto error; 374 375 /* Setup the current RNG state */ 376 res = ssp_rng_read_cstr(rng, darray_char_cdata_get(&cache->state_scratch)); 377 if(res != RES_OK) goto error; 378 } 379 380 exit: 381 return res; 382 error: 383 goto exit; 384 } 385 386 static res_T 387 rng_state_cache_write(struct rng_state_cache* cache, struct ssp_rng* rng) 388 { 389 long remaining_space = 0; /* Remaining cache space */ 390 size_t len = 0; /* Length of the cache state in bytes */ 391 res_T res = RES_OK; 392 393 ASSERT(cache && rng); 394 395 if(cache->no_wstream) goto exit; /* Do not cache the submitted state */ 396 397 /* Store in memory the state to be cached */ 398 res = rng_write_cstr(rng, &cache->state, &len); 399 if(res != RES_OK) goto error; 400 401 /* There are no spaces left at the end of the stream: rewind the writing */ 402 if(len > (size_t)(STATE_CACHE_CAPACITY - cache->write)) { 403 cache->end_of_cache = cache->write; /* Mark the end of cache */ 404 cache->write = 0; 405 } 406 407 /* Calculate remaining cache space */ 408 if(rng_state_cache_is_empty(cache) || cache->write > cache->read) { 409 remaining_space = STATE_CACHE_CAPACITY - cache->write; 410 } else { 411 remaining_space = cache->read - cache->write; 412 } 413 414 /* There is no sufficient space. Cache cannot be used */ 415 if(remaining_space < 0 || len > (size_t)remaining_space) { 416 cache->no_wstream = 1; 417 418 /* There is sufficient space. Write the RNG state into the cached stream */ 419 } else { 420 size_t sz = 0; 421 422 fseek(cache->stream, cache->write, SEEK_SET); 423 sz = fwrite(darray_char_cdata_get(&cache->state), 1, len, cache->stream); 424 if(sz != len) { res = RES_IO_ERR; goto error; } 425 cache->write = ftell(cache->stream); 426 427 /* Flush write state to detect a write error */ 428 if(fflush(cache->stream) != 0) { 429 res = RES_IO_ERR; 430 goto error; 431 } 432 } 433 434 /* Update the number of cached states */ 435 cache->nstates += 1; 436 437 exit: 438 return res; 439 error: 440 goto exit; 441 } 442 443 /******************************************************************************* 444 * RNG that control the scheduling of random number pools for a given bucket 445 ******************************************************************************/ 446 struct rng_bucket { 447 struct ssp_rng* pool; /* Wrapped RNG providing a pool of `bucket_size' RNs */ 448 struct ssp_rng_proxy* proxy; /* The RNG proxy */ 449 size_t name; /* Unique bucket identifier in [0, #buckets) */ 450 size_t count; /* Remaining unique random numbers in `pool' */ 451 rng_proxy_cb_T cb_on_proxy_set_state; 452 }; 453 454 static void 455 rng_bucket_on_proxy_set_state(const struct ssp_rng_proxy* proxy, void* ctx) 456 { 457 struct rng_bucket* rng = (struct rng_bucket*)ctx; 458 ASSERT(proxy && ctx && rng->proxy == proxy); 459 (void)proxy; 460 /* Reset bucket */ 461 rng->count = 0; 462 rng->pool = NULL; 463 } 464 465 static INLINE void 466 rng_bucket_next_ran_pool(struct rng_bucket* rng) 467 { 468 ASSERT(rng); 469 rng->pool = rng_proxy_next_ran_pool(rng->proxy, rng->name); 470 rng->count = rng->proxy->bucket_size; 471 } 472 473 static res_T 474 rng_bucket_set(void* data, const uint64_t seed) 475 { 476 (void)data, (void)seed; 477 return RES_BAD_OP; 478 } 479 480 static uint64_t 481 rng_bucket_get(void* data) 482 { 483 struct rng_bucket* rng = (struct rng_bucket*)data; 484 ASSERT(data); 485 if(!rng->count) rng_bucket_next_ran_pool(rng); 486 --rng->count; 487 return ssp_rng_get(rng->pool); 488 } 489 490 static res_T 491 rng_bucket_read(void* data, FILE* file) 492 { 493 (void)data, (void)file; 494 return RES_BAD_OP; 495 } 496 497 static res_T 498 rng_bucket_read_cstr(void* data, const char* cstr) 499 { 500 (void)data, (void)cstr; 501 return RES_BAD_OP; 502 } 503 504 static res_T 505 rng_bucket_write(const void* data, FILE* file) 506 { 507 (void)data, (void)file; 508 return RES_BAD_OP; 509 } 510 511 static res_T 512 rng_bucket_write_cstr 513 (const void* data, 514 char* buf, 515 const size_t bufsz, 516 size_t* len) 517 { 518 (void)data, (void)buf, (void)bufsz, (void)len; 519 return RES_BAD_OP; 520 } 521 522 static res_T 523 rng_bucket_init(struct mem_allocator* allocator, void* data) 524 { 525 struct rng_bucket* rng = (struct rng_bucket*)data; 526 ASSERT(data); 527 (void)allocator; 528 rng->proxy = NULL; 529 rng->pool = NULL; 530 rng->name = SIZE_MAX; 531 rng->count = 0; 532 return RES_OK; 533 } 534 535 static void 536 rng_bucket_release(void* data) 537 { 538 struct rng_bucket* rng = (struct rng_bucket*)data; 539 ASSERT(data && rng->proxy); 540 ATOMIC_SET(&rng->proxy->buckets[rng->name], 0); 541 CLBK_DISCONNECT(&rng->cb_on_proxy_set_state); 542 SSP(rng_proxy_ref_put(rng->proxy)); 543 } 544 545 static double 546 rng_bucket_entropy(const void* data) 547 { 548 (void)data; 549 return 0; 550 } 551 552 static res_T 553 rng_bucket_discard(void* data, uint64_t n) 554 { 555 struct rng_bucket* rng = (struct rng_bucket*)data; 556 ASSERT(data); 557 while (rng->count < n) { 558 n -= rng->count; 559 rng_bucket_next_ran_pool(rng); 560 } 561 rng->count -= n; 562 return ssp_rng_discard(rng->pool, n); 563 } 564 565 static const struct rng_desc RNG_BUCKET_NULL = { 566 rng_bucket_init, 567 rng_bucket_release, 568 rng_bucket_set, 569 rng_bucket_get, 570 rng_bucket_discard, 571 rng_bucket_read, 572 rng_bucket_read_cstr, 573 rng_bucket_write, 574 rng_bucket_write_cstr, 575 rng_bucket_entropy, 576 INT_MAX, /* Min dummy value */ 577 0, /* Max dummy value */ 578 sizeof(struct rng_bucket), 579 16 580 }; 581 582 /******************************************************************************* 583 * Helper functions 584 ******************************************************************************/ 585 /* Scheduler of random number pools */ 586 struct ssp_rng* 587 rng_proxy_next_ran_pool 588 (struct ssp_rng_proxy* proxy, 589 const size_t bucket_name) 590 { 591 res_T res = RES_OK; 592 ASSERT(proxy); 593 ASSERT(bucket_name <= sa_size(proxy->buckets)); 594 ASSERT(bucket_name <= sa_size(proxy->per_bucket_sequence_id)); 595 596 mutex_lock(proxy->mutex); 597 598 if(rng_state_cache_is_empty(proxy->states + bucket_name)) { 599 size_t ibucket; 600 /* Register a new state for *all* buckets */ 601 FOR_EACH(ibucket, 0, sa_size(proxy->states)) { 602 res = rng_state_cache_write(proxy->states + ibucket, proxy->rng); 603 if(res != RES_OK) { 604 rng_state_cache_dump(proxy->states + ibucket); 605 FATAL("RNG proxy: cannot write to state cache\n"); 606 } 607 ssp_rng_discard(proxy->rng, proxy->bucket_size); 608 } 609 /* Discard RNs to reach the next sequence */ 610 ssp_rng_discard(proxy->rng, proxy->sequence_bias); 611 612 /* Increment the sequence id of the main RNG */ 613 proxy->main_sequence_id += 1; 614 } 615 mutex_unlock(proxy->mutex); 616 617 /* Read the RNG pool state of `bucket_name' */ 618 res = rng_state_cache_read 619 (proxy->states + bucket_name, 620 proxy->pools[bucket_name], 621 proxy->mutex); 622 if(res != RES_OK) { 623 mutex_lock(proxy->mutex); 624 rng_state_cache_dump(proxy->states + bucket_name); 625 mutex_unlock(proxy->mutex); 626 FATAL("RNG proxy: cannot read from state cache\n"); 627 } 628 629 /* Update the sequence of the bucket RNG */ 630 proxy->per_bucket_sequence_id[bucket_name] += 1; 631 632 return proxy->pools[bucket_name]; 633 } 634 635 static void 636 rng_proxy_clear(struct ssp_rng_proxy* proxy) 637 { 638 size_t ibucket; 639 ASSERT(proxy); 640 ASSERT(sa_size(proxy->pools) == sa_size(proxy->buckets)); 641 ASSERT(sa_size(proxy->pools) == sa_size(proxy->states)); 642 643 FOR_EACH(ibucket, 0, sa_size(proxy->pools)) { 644 ASSERT(proxy->buckets[ibucket] == 0); /* No bucket RNG should be created */ 645 if(proxy->pools[ibucket]) SSP(rng_ref_put(proxy->pools[ibucket])); 646 rng_state_cache_release(proxy->states + ibucket); 647 } 648 sa_clear(proxy->buckets); 649 sa_clear(proxy->pools); 650 sa_clear(proxy->states); 651 } 652 653 static res_T 654 rng_proxy_setup 655 (struct ssp_rng_proxy* proxy, 656 const size_t sequence_pitch, /* #RNs between 2 consecutive sequences */ 657 const size_t nbuckets) 658 { 659 size_t ibucket; 660 res_T res = RES_OK; 661 662 ASSERT(proxy && sequence_pitch && nbuckets); 663 rng_proxy_clear(proxy); 664 665 sa_add(proxy->states, nbuckets); 666 sa_add(proxy->pools, nbuckets); 667 sa_add(proxy->buckets, nbuckets); 668 sa_add(proxy->per_bucket_sequence_id, nbuckets); 669 670 /* Clearing allocated memory. This operation is necessary to manage errors and 671 * identify which data has been initialised and which has not */ 672 memset(proxy->states, 0, sizeof(*proxy->states)*nbuckets); 673 memset(proxy->pools, 0, sizeof(*proxy->pools)*nbuckets); 674 memset(proxy->buckets, 0, sizeof(*proxy->buckets)*nbuckets); 675 memset(proxy->per_bucket_sequence_id, 0, 676 sizeof(*proxy->per_bucket_sequence_id)*nbuckets); 677 678 FOR_EACH(ibucket, 0, nbuckets) { 679 res = rng_state_cache_init 680 (proxy->allocator, sequence_pitch, proxy->states+ibucket); 681 if(res != RES_OK) goto error; 682 res = ssp_rng_create(proxy->allocator, proxy->type, proxy->pools+ibucket); 683 if(res != RES_OK) goto error; 684 proxy->buckets[ibucket] = 0; 685 686 /* Set the sequence index to SIZE_MAX because no sequence is active until a 687 * random number query is made. On the first query, the index will be 688 * incremented to 0 */ 689 proxy->per_bucket_sequence_id[ibucket] = SSP_SEQUENCE_ID_NONE/*<=> SIZE_MAX*/; 690 } 691 692 exit: 693 return res; 694 error: 695 if(proxy) rng_proxy_clear(proxy); 696 goto exit; 697 } 698 699 static res_T 700 rng_proxy_clear_caches(struct ssp_rng_proxy* proxy) 701 { 702 size_t ibucket; 703 res_T res = RES_OK; 704 ASSERT(proxy); 705 706 mutex_lock(proxy->mutex); 707 FOR_EACH(ibucket, 0, sa_size(proxy->pools)) { 708 res = rng_state_cache_clear(proxy->states+ibucket); 709 if(res != RES_OK) break; 710 } 711 mutex_unlock(proxy->mutex); 712 return res; 713 } 714 715 static void 716 rng_proxy_release(ref_T* ref) 717 { 718 struct ssp_rng_proxy* proxy; 719 ASSERT(ref); 720 proxy = CONTAINER_OF(ref, struct ssp_rng_proxy, ref); 721 rng_proxy_clear(proxy); 722 sa_release(proxy->states); 723 sa_release(proxy->pools); 724 sa_release(proxy->buckets); 725 sa_release(proxy->per_bucket_sequence_id); 726 if(proxy->rng) SSP(rng_ref_put(proxy->rng)); 727 if(proxy->mutex) mutex_destroy(proxy->mutex); 728 MEM_RM(proxy->allocator, proxy); 729 } 730 731 /******************************************************************************* 732 * Exported functions 733 ******************************************************************************/ 734 res_T 735 ssp_rng_proxy_create 736 (struct mem_allocator* mem_allocator, 737 const enum ssp_rng_type type, 738 const size_t nbuckets, 739 struct ssp_rng_proxy** out_proxy) 740 { 741 struct ssp_rng_proxy_create2_args args = SSP_RNG_PROXY_CREATE2_ARGS_NULL; 742 const size_t sz = BUCKET_SIZE_DEFAULT * nbuckets; 743 args.type = type; 744 args.sequence_offset = 0; 745 args.sequence_size = sz; 746 args.sequence_pitch = sz; 747 args.nbuckets = nbuckets; 748 return ssp_rng_proxy_create2(mem_allocator, &args, out_proxy); 749 } 750 751 res_T 752 ssp_rng_proxy_create_from_rng 753 (struct mem_allocator* mem_allocator, 754 const struct ssp_rng* rng, 755 const size_t nbuckets, 756 struct ssp_rng_proxy** out_proxy) 757 { 758 struct ssp_rng_proxy_create2_args args = SSP_RNG_PROXY_CREATE2_ARGS_NULL; 759 const size_t sz = BUCKET_SIZE_DEFAULT * nbuckets; 760 args.rng = rng; 761 args.sequence_offset = 0; 762 args.sequence_size = sz; 763 args.sequence_pitch = sz; 764 args.nbuckets = nbuckets; 765 return ssp_rng_proxy_create2(mem_allocator, &args, out_proxy); 766 } 767 768 res_T 769 ssp_rng_proxy_create2 770 (struct mem_allocator* mem_allocator, 771 const struct ssp_rng_proxy_create2_args* args, 772 struct ssp_rng_proxy** out_proxy) 773 { 774 struct darray_char buf; 775 struct mem_allocator* allocator = NULL; 776 struct ssp_rng_proxy* proxy = NULL; 777 size_t i; 778 res_T res = RES_OK; 779 780 darray_char_init(mem_allocator, &buf); 781 782 if(!args 783 || !out_proxy 784 || !args->sequence_size 785 || !args->nbuckets 786 || (args->type == SSP_RNG_TYPE_NULL && !args->rng) 787 || args->sequence_pitch < args->sequence_size 788 || args->sequence_size < args->nbuckets) { 789 res = RES_BAD_ARG; 790 goto error; 791 } 792 793 allocator = mem_allocator ? mem_allocator : &mem_default_allocator; 794 proxy = (struct ssp_rng_proxy*)MEM_CALLOC(allocator, 1, sizeof(*proxy)); 795 if(!proxy) { 796 res = RES_MEM_ERR; 797 goto error; 798 } 799 proxy->allocator = allocator; 800 ref_init(&proxy->ref); 801 proxy->bucket_size = args->sequence_size / args->nbuckets; 802 proxy->sequence_size = args->sequence_size; 803 proxy->sequence_bias = 804 args->sequence_pitch - (proxy->bucket_size * args->nbuckets); 805 806 proxy->main_sequence_id = SSP_SEQUENCE_ID_NONE; 807 808 /* Create the proxy RNG in its default state */ 809 if(!args->rng) { 810 res = ssp_rng_create(allocator, args->type, &proxy->rng); 811 if(res != RES_OK) goto error; 812 proxy->type = args->type; 813 814 /* Create the proxy RNG from a submitted RNG state */ 815 } else { 816 size_t len; 817 818 /* Create the RNG proxy of the type of the submitted RNG. Simply Ignore the 819 * submitted RNG type if any */ 820 res = ssp_rng_get_type(args->rng, &proxy->type); 821 if(res != RES_OK) goto error; 822 823 /* Bucket RNG is not allowed to be a proxy RNG */ 824 if(args->rng->desc.init == rng_bucket_init) { 825 res = RES_BAD_ARG; 826 goto error; 827 } 828 829 res = ssp_rng_create(allocator, proxy->type, &proxy->rng); 830 if(res != RES_OK) goto error; 831 832 /* Initialise the RNG proxy state from the state of the submitted RNG */ 833 res = ssp_rng_write_cstr(args->rng, NULL, 0, &len); 834 if(res != RES_OK) goto error; 835 res = darray_char_resize(&buf, len+1/*Null char*/); 836 if(res != RES_OK) goto error; 837 res = ssp_rng_write_cstr(args->rng, darray_char_data_get(&buf), len+1, &len); 838 if(res != RES_OK) goto error; 839 res = ssp_rng_read_cstr(proxy->rng, darray_char_cdata_get(&buf)); 840 if(res != RES_OK) goto error; 841 } 842 843 res = ssp_rng_discard(proxy->rng, args->sequence_offset); 844 if(res != RES_OK) goto error; 845 846 proxy->mutex = mutex_create(); 847 if(!proxy->mutex) { 848 res = RES_MEM_ERR; 849 goto error; 850 } 851 852 FOR_EACH(i, 0, RNG_PROXY_SIGS_COUNT__) { 853 SIG_INIT(proxy->signals + i); 854 } 855 856 res = rng_proxy_setup(proxy, args->sequence_pitch, args->nbuckets); 857 if(res != RES_OK) goto error; 858 859 exit: 860 darray_char_release(&buf); 861 if(out_proxy) *out_proxy = proxy; 862 return res; 863 error: 864 if(proxy) { 865 SSP(rng_proxy_ref_put(proxy)); 866 proxy = NULL; 867 } 868 goto exit; 869 } 870 871 res_T 872 ssp_rng_proxy_read(struct ssp_rng_proxy* proxy, FILE* stream) 873 { 874 res_T res = RES_OK; 875 if(!proxy || !stream) return RES_BAD_ARG; 876 877 mutex_lock(proxy->mutex); 878 res = ssp_rng_read(proxy->rng, stream); 879 mutex_unlock(proxy->mutex); 880 if(res != RES_OK) return res; 881 882 /* Discard the cached RNG states */ 883 res = rng_proxy_clear_caches(proxy); 884 if(res != RES_OK) return res; 885 886 /* Notify to bucket RNGs that the proxy RNG state was updated */ 887 SIG_BROADCAST 888 (proxy->signals+RNG_PROXY_SIG_SET_STATE, rng_proxy_cb_T, ARG1(proxy)); 889 return RES_OK; 890 } 891 892 res_T 893 ssp_rng_proxy_write(const struct ssp_rng_proxy* proxy, FILE* stream) 894 { 895 res_T res = RES_OK; 896 897 if(!proxy || !stream) return RES_BAD_ARG; 898 899 mutex_lock(proxy->mutex); 900 res = ssp_rng_write(proxy->rng, stream); 901 mutex_unlock(proxy->mutex); 902 return res; 903 } 904 905 res_T 906 ssp_rng_proxy_ref_get(struct ssp_rng_proxy* proxy) 907 { 908 if(!proxy) return RES_BAD_ARG; 909 ref_get(&proxy->ref); 910 return RES_OK; 911 } 912 913 res_T 914 ssp_rng_proxy_ref_put(struct ssp_rng_proxy* proxy) 915 { 916 if(!proxy) return RES_BAD_ARG; 917 ref_put(&proxy->ref, rng_proxy_release); 918 return RES_OK; 919 } 920 921 res_T 922 ssp_rng_proxy_create_rng 923 (struct ssp_rng_proxy* proxy, 924 const size_t ibucket, 925 struct ssp_rng** out_rng) 926 { 927 struct ssp_rng* rng = NULL; 928 struct rng_desc desc = RNG_BUCKET_NULL; 929 struct rng_bucket* bucket = NULL; 930 res_T res = RES_OK; 931 932 if(!proxy || ibucket >= sa_size(proxy->buckets) || !out_rng) { 933 res = RES_BAD_ARG; 934 goto error; 935 } 936 937 if(ATOMIC_CAS(&proxy->buckets[ibucket], 1, 0) == 1) { 938 res = RES_BAD_ARG; 939 goto error; 940 } 941 942 /* Update the dummy rng bucket min/max value with the min/max values of the 943 * RNG desc on which the proxy relies */ 944 desc.min = proxy->rng->desc.min; 945 desc.max = proxy->rng->desc.max; 946 947 res = rng_create(proxy->allocator, &desc, &rng); 948 if(res != RES_OK) goto error; 949 rng->type = proxy->type; 950 bucket = (struct rng_bucket*)rng->state; 951 bucket->name = ibucket; 952 bucket->proxy = proxy; 953 SSP(rng_proxy_ref_get(proxy)); 954 955 /* The bucket RNG listens the "write" signal of the proxy to reset its 956 * internal RNs counter on "write" invocation. */ 957 CLBK_INIT(&bucket->cb_on_proxy_set_state); 958 CLBK_SETUP 959 (&bucket->cb_on_proxy_set_state, rng_bucket_on_proxy_set_state, bucket); 960 SIG_CONNECT_CLBK 961 (proxy->signals+RNG_PROXY_SIG_SET_STATE, &bucket->cb_on_proxy_set_state); 962 963 exit: 964 if(out_rng) *out_rng = rng; 965 return res; 966 error: 967 if(rng) { 968 SSP(rng_ref_put(rng)); 969 rng = NULL; 970 } 971 goto exit; 972 } 973 974 res_T 975 ssp_rng_proxy_get_type 976 (const struct ssp_rng_proxy* proxy, 977 enum ssp_rng_type* type) 978 { 979 if(!proxy || !type) return RES_BAD_ARG; 980 *type = proxy->type; 981 return RES_OK; 982 } 983 984 res_T 985 ssp_rng_proxy_get_sequence_id(const struct ssp_rng_proxy* proxy, size_t* out_id) 986 { 987 size_t id = SSP_SEQUENCE_ID_NONE; 988 size_t i; 989 990 if(!proxy || !out_id) return RES_BAD_ARG; 991 992 mutex_lock(proxy->mutex); 993 FOR_EACH(i, 0, sa_size(proxy->per_bucket_sequence_id)) { 994 if(proxy->per_bucket_sequence_id[i] == SSP_SEQUENCE_ID_NONE) continue; 995 id = id == SSP_SEQUENCE_ID_NONE 996 ? proxy->per_bucket_sequence_id[i] 997 : MMAX(id, proxy->per_bucket_sequence_id[i]); 998 } 999 mutex_unlock(proxy->mutex); 1000 1001 *out_id = id; 1002 return RES_OK; 1003 } 1004 1005 res_T 1006 ssp_rng_proxy_flush_sequences 1007 (struct ssp_rng_proxy* proxy, 1008 const size_t nseqs) 1009 { 1010 size_t nseqs_proxy = 0; 1011 size_t iseq; 1012 size_t i; 1013 res_T res = RES_OK; 1014 1015 if(!proxy) { 1016 res = RES_BAD_ARG; 1017 goto error; 1018 } 1019 1020 /* Nothing to discard */ 1021 if(nseqs == 0) goto exit; 1022 1023 res = ssp_rng_proxy_get_sequence_id(proxy, &iseq); 1024 if(res != RES_OK) goto error; 1025 1026 /* Calculate the number of sequences to flush for the main RNG, i.e. the one 1027 * used when the cache is in use. We want to dump the 'nseqs' sequences. That 1028 * said, since the status of the RNG is set to the status of the 1st random 1029 * number of the next sequence, it is enough to 'nseqs-1' sequences and clear 1030 * the cache. Anyway, note that the sequence identifier of the main RNG may 1031 * be behind the global sequence identifier. This happens when the cache 1032 * mechanism is no longer used. In this case, we need to dump the extra 1033 * sequences from the main RNG to match the current sequence index of the 1034 * proxy. */ 1035 nseqs_proxy = (nseqs - 1) + (iseq - proxy->main_sequence_id); 1036 1037 mutex_lock(proxy->mutex); 1038 res = ssp_rng_discard(proxy->rng, proxy->sequence_size * nseqs_proxy); 1039 mutex_unlock(proxy->mutex); 1040 if(res != RES_OK) goto error; 1041 1042 proxy->main_sequence_id += nseqs_proxy; 1043 1044 /* Discard the cached RNG states */ 1045 rng_proxy_clear_caches(proxy); 1046 1047 /* Reset the RNGs sequence id */ 1048 FOR_EACH(i, 0, sa_size(proxy->per_bucket_sequence_id)) { 1049 proxy->per_bucket_sequence_id[i] = proxy->main_sequence_id; 1050 } 1051 1052 /* Notify to bucket RNGs that the proxy RNG state was updated */ 1053 SIG_BROADCAST 1054 (proxy->signals+RNG_PROXY_SIG_SET_STATE, rng_proxy_cb_T, ARG1(proxy)); 1055 1056 exit: 1057 return res; 1058 error: 1059 goto exit; 1060 }