htrdr

Solving radiative transfer in heterogeneous media
git clone git://git.meso-star.fr/htrdr.git
Log | Files | Refs | README | LICENSE

commit 50bcc00519b822cf6aa8f3264ef00790dd9013c1
parent e950ac33f9696160be7c30e1b4080563a1b446a6
Author: Vincent Forest <vincent.forest@meso-star.com>
Date:   Thu, 13 Feb 2025 12:48:22 +0100

core: refactoring the distribution of MPI work

Prepare to add a solve function that will not distribute rendering but
more generic calculations.

The distribution of MPI work thus becomes an internal API of the core
library. What's more, it no longer manages tiles, but chunks, a more
agnostic term. The chunk is identified by a 64-bit unsigned integer,
whereas previously a tile was encoded on 32 bits. Again, this is to make
the system more generic: tiles were a set of pixels in an image, whereas
chunks can be anything (e.g. a set of realisations), and their number
could therefore exceed a 32-bit encoding.

Diffstat:
MMakefile | 1+
Msrc/core/htrdr_draw_map.c | 246++++---------------------------------------------------------------------------
Asrc/core/htrdr_proc_work.c | 268+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/core/htrdr_proc_work.h | 94+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 375 insertions(+), 234 deletions(-)

diff --git a/Makefile b/Makefile @@ -217,6 +217,7 @@ CORE_SRC =\ src/core/htrdr_geometry.c\ src/core/htrdr_log.c\ src/core/htrdr_materials.c\ + src/core/htrdr_proc_work.c\ src/core/htrdr_ran_wlen_cie_xyz.c\ src/core/htrdr_ran_wlen_discrete.c\ src/core/htrdr_ran_wlen_planck.c\ diff --git a/src/core/htrdr_draw_map.c b/src/core/htrdr_draw_map.c @@ -21,17 +21,15 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#define _POSIX_C_SOURCE 200112L /* nanosleep */ - #include "core/htrdr.h" #include "core/htrdr_c.h" #include "core/htrdr_buffer.h" #include "core/htrdr_draw_map.h" #include "core/htrdr_log.h" +#include "core/htrdr_proc_work.h" #include <rsys/clock_time.h> #include <rsys/cstr.h> -#include <rsys/dynamic_array_u32.h> #include <rsys/list.h> #include <rsys/math.h> #include <rsys/morton.h> @@ -42,12 +40,10 @@ #include <omp.h> #include <mpi.h> -#include <time.h> #include <unistd.h> #define RNG_SEQUENCE_SIZE 10000 -#define TILE_MCODE_NULL UINT32_MAX #define TILE_SIZE 8 /* Definition in X & Y of a tile */ STATIC_ASSERT(IS_POW2(TILE_SIZE), TILE_SIZE_must_be_a_power_of_2); @@ -58,7 +54,7 @@ struct tile { ref_T ref; struct tile_data { - size_t pixsz; /* Sizeof on pixel */ + size_t pixsz; /* Size of a pixel */ size_t pixal; /* Pixel alignment */ uint16_t x, y; /* 2D coordinates of the tile in tile space */ /* Simulate the flexible array member of the C99 standard */ @@ -66,13 +62,6 @@ struct tile { } data; }; -/* List of tile to compute onto the MPI process. */ -struct proc_work { - struct mutex* mutex; - struct darray_u32 tiles; /* #tiles to render */ - size_t itile; /* Next tile to render in the above list of tiles */ -}; - /******************************************************************************* * Helper functions ******************************************************************************/ @@ -190,218 +179,6 @@ write_tile_data } } -static INLINE void -proc_work_init(struct mem_allocator* allocator, struct proc_work* work) -{ - ASSERT(work); - darray_u32_init(allocator, &work->tiles); - work->itile = 0; - CHK(work->mutex = mutex_create()); -} - -static INLINE void -proc_work_release(struct proc_work* work) -{ - darray_u32_release(&work->tiles); - mutex_destroy(work->mutex); -} - -static INLINE void -proc_work_reset(struct proc_work* work) -{ - ASSERT(work); - mutex_lock(work->mutex); - darray_u32_clear(&work->tiles); - work->itile = 0; - mutex_unlock(work->mutex); -} - -static INLINE void -proc_work_add_tile(struct proc_work* work, const uint32_t mcode) -{ - mutex_lock(work->mutex); - CHK(darray_u32_push_back(&work->tiles, &mcode) == RES_OK); - mutex_unlock(work->mutex); -} - -static INLINE uint32_t -proc_work_get_tile(struct proc_work* work) -{ - uint32_t mcode; - ASSERT(work); - mutex_lock(work->mutex); - if(work->itile >= darray_u32_size_get(&work->tiles)) { - mcode = TILE_MCODE_NULL; - } else { - mcode = darray_u32_cdata_get(&work->tiles)[work->itile]; - ++work->itile; - } - mutex_unlock(work->mutex); - return mcode; -} - -static INLINE size_t -proc_work_get_ntiles(struct proc_work* work) -{ - size_t sz = 0; - ASSERT(work); - mutex_lock(work->mutex); - sz = darray_u32_size_get(&work->tiles); - mutex_unlock(work->mutex); - return sz; -} - -static void -mpi_wait_for_request(struct htrdr* htrdr, MPI_Request* req) -{ - ASSERT(htrdr && req); - - /* Wait for process synchronisation */ - for(;;) { - struct timespec t; - int complete; - t.tv_sec = 0; - t.tv_nsec = 10000000; /* 10ms */ - - mutex_lock(htrdr->mpi_mutex); - MPI(Test(req, &complete, MPI_STATUS_IGNORE)); - mutex_unlock(htrdr->mpi_mutex); - if(complete) break; - - nanosleep(&t, NULL); - } -} - -static void -mpi_probe_thieves - (struct htrdr* htrdr, - struct proc_work* work, - ATOMIC* probe_thieves) -{ - uint32_t tiles[UINT8_MAX]; - struct timespec t; - ASSERT(htrdr && work && probe_thieves); - - if(htrdr->mpi_nprocs == 1) /* The process is alone. No thief is possible */ - return; - - t.tv_sec = 0; - - /* Protect MPI calls of multiple invocations from concurrent threads */ - #define P_MPI(Func) { \ - mutex_lock(htrdr->mpi_mutex); \ - MPI(Func); \ - mutex_unlock(htrdr->mpi_mutex); \ - } (void)0 - - while(ATOMIC_GET(probe_thieves)) { - MPI_Status status; - size_t itile; - int msg; - - /* Probe if a steal request was submitted by any processes */ - P_MPI(Iprobe(MPI_ANY_SOURCE, HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &msg, - &status)); - - if(msg) { /* A steal request was posted */ - MPI_Request req; - uint8_t ntiles_to_steal; - - /* Asynchronously receive the steal request */ - P_MPI(Irecv(&ntiles_to_steal, 1, MPI_UINT8_T, status.MPI_SOURCE, - HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &req)); - - /* Wait for the completion of the steal request */ - mpi_wait_for_request(htrdr, &req); - - /* Thief some tiles */ - FOR_EACH(itile, 0, ntiles_to_steal) { - tiles[itile] = proc_work_get_tile(work); - } - P_MPI(Send(&tiles, ntiles_to_steal, MPI_UINT32_T, status.MPI_SOURCE, - HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD)); - } - t.tv_nsec = 500000000; /* 500ms */ - nanosleep(&t, NULL); - } - #undef P_MPI -} - -static int -mpi_sample_working_process(struct htrdr* htrdr, struct ssp_rng* rng) -{ - int iproc, i; - int dst_rank; - ASSERT(htrdr && rng && htrdr->mpi_nworking_procs); - - /* Sample the index of the 1st active process */ - iproc = (int)(ssp_rng_canonical(rng) * (double)htrdr->mpi_nworking_procs); - - /* Find the rank of the sampled active process. Use a simple linear search - * since the overall number of processes should be quite low; at most few - * dozens. */ - i = 0; - FOR_EACH(dst_rank, 0, htrdr->mpi_nprocs) { - if(htrdr->mpi_working_procs[dst_rank] == 0) continue; /* Inactive process */ - if(i == iproc) break; /* The rank of the sampled process is found */ - ++i; - } - ASSERT(dst_rank < htrdr->mpi_nprocs); - return dst_rank; -} - -/* Return the number of stolen tiles */ -static size_t -mpi_steal_work - (struct htrdr* htrdr, - struct ssp_rng* rng, - struct proc_work* work) -{ - MPI_Request req; - size_t itile; - size_t nthieves = 0; - uint32_t tiles[UINT8_MAX]; /* Morton code of the stolen tile */ - int proc_to_steal; /* Process to steal */ - uint8_t ntiles_to_steal = MMIN((uint8_t)(htrdr->nthreads*2), 16); - ASSERT(htrdr && rng && work && htrdr->nthreads < UINT8_MAX); - - /* Protect MPI calls of multiple invocations from concurrent threads */ - #define P_MPI(Func) { \ - mutex_lock(htrdr->mpi_mutex); \ - MPI(Func); \ - mutex_unlock(htrdr->mpi_mutex); \ - } (void)0 - - /* No more working process => nohting to steal */ - if(!htrdr->mpi_nworking_procs) return 0; - - /* Sample a process to steal */ - proc_to_steal = mpi_sample_working_process(htrdr, rng); - - /* Send a steal request to the sampled process and wait for a response */ - P_MPI(Send(&ntiles_to_steal, 1, MPI_UINT8_T, proc_to_steal, - HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD)); - - /* Receive the stolen tile from the sampled process */ - P_MPI(Irecv(tiles, ntiles_to_steal, MPI_UINT32_T, proc_to_steal, - HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD, &req)); - - mpi_wait_for_request(htrdr, &req); - - FOR_EACH(itile, 0, ntiles_to_steal) { - if(tiles[itile] == TILE_MCODE_NULL) { - ASSERT(htrdr->mpi_working_procs[proc_to_steal] != 0); - htrdr->mpi_working_procs[proc_to_steal] = 0; - htrdr->mpi_nworking_procs--; - break; - } - proc_work_add_tile(work, tiles[itile]); - ++nthieves; - } - #undef P_MPI - return nthieves; -} - static res_T mpi_gather_tiles (struct htrdr* htrdr, @@ -567,7 +344,7 @@ draw_map goto error; } - proc_ntiles = proc_work_get_ntiles(work); + proc_ntiles = proc_work_get_nchunks(work); nthreads = MMIN(htrdr->nthreads, proc_ntiles); /* The process is not considered as a working process for himself */ @@ -583,7 +360,7 @@ draw_map struct ssp_rng_proxy* rng_proxy = NULL; struct ssp_rng* rng; struct tile* tile; - uint32_t mcode = TILE_MCODE_NULL; + uint64_t mcode = CHUNK_ID_NULL; uint16_t tile_org[2]; size_t tile_sz[2]; size_t n; @@ -593,20 +370,21 @@ draw_map /* Get a tile to draw */ #pragma omp critical { - mcode = proc_work_get_tile(work); - if(mcode == TILE_MCODE_NULL) { /* No more work on this process */ + mcode = proc_work_get_chunk(work); + if(mcode == CHUNK_ID_NULL) { /* No more work on this process */ /* Try to steal works to concurrent processes */ proc_work_reset(work); nthieves = mpi_steal_work(htrdr, rng_proc, work); if(nthieves != 0) { - mcode = proc_work_get_tile(work); + mcode = proc_work_get_chunk(work); } } } - if(mcode == TILE_MCODE_NULL) break; /* No more work */ + if(mcode == CHUNK_ID_NULL) break; /* No more work */ + ASSERT(mcode <= UINT32_MAX); /* Decode the morton code to retrieve the tile index */ - morton_xy_decode_u16(mcode, tile_org); + morton_xy_decode_u16((uint32_t)mcode, tile_org); ASSERT(tile_org[0] < ntiles_x && tile_org[1] < ntiles_y); /* Create the tile */ @@ -654,7 +432,7 @@ draw_map SSP(rng_proxy_create_rng(rng_proxy, 0, &rng)); /* Launch the tile rendering */ - res_local = draw_tile(htrdr, args, (size_t)ithread, mcode, + res_local = draw_tile(htrdr, args, (size_t)ithread, (uint32_t)mcode, tile_org, tile_sz, pix_sz, rng, tile); SSP(rng_proxy_ref_put(rng_proxy)); @@ -772,7 +550,7 @@ htrdr_draw_map morton_xy_decode_u16(mcode, tile_org); if(tile_org[0] >= ntiles_x || tile_org[1] >= ntiles_y) continue; - proc_work_add_tile(&work, mcode); + proc_work_add_chunk(&work, mcode); } if(htrdr->mpi_rank == 0) { diff --git a/src/core/htrdr_proc_work.c b/src/core/htrdr_proc_work.c @@ -0,0 +1,268 @@ +/* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique + * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux + * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace + * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris + * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com) + * Copyright (C) 2022-2025 Observatoire de Paris + * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne + * Copyright (C) 2022-2025 Université de Versaille Saint-Quentin + * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +#define _POSIX_C_SOURCE 200112L /* nanosleep */ + +#include "core/htrdr_c.h" +#include "core/htrdr_proc_work.h" + +#include <star/ssp.h> + +#include <rsys/mutex.h> + +#include <time.h> + +/******************************************************************************* + * Helper functions + ******************************************************************************/ +/* Return the rank of a working process */ +static int +sample_working_process(struct htrdr* htrdr, struct ssp_rng* rng) +{ + int iproc, i; + int dst_rank; + ASSERT(htrdr && rng && htrdr->mpi_nworking_procs); + + /* Sample the index of the 1st active process */ + iproc = (int)(ssp_rng_canonical(rng) * (double)htrdr->mpi_nworking_procs); + + /* Find the rank of the sampled active process. Use a simple linear search + * since the overall number of processes should be quite low; at most few + * dozens. */ + i = 0; + FOR_EACH(dst_rank, 0, htrdr->mpi_nprocs) { + if(htrdr->mpi_working_procs[dst_rank] == 0) continue; /* Inactive process */ + if(i == iproc) break; /* The rank of the sampled process is found */ + ++i; + } + ASSERT(dst_rank < htrdr->mpi_nprocs); + return dst_rank; +} + + +/******************************************************************************* + * Local functions + ******************************************************************************/ +void +proc_work_init(struct mem_allocator* allocator, struct proc_work* work) +{ + ASSERT(work); + darray_u64_init(allocator, &work->chunks); + work->index = 0; + CHK(work->mutex = mutex_create()); +} + +void +proc_work_release(struct proc_work* work) +{ + darray_u64_release(&work->chunks); + mutex_destroy(work->mutex); +} + +void +proc_work_reset(struct proc_work* work) +{ + ASSERT(work); + mutex_lock(work->mutex); + darray_u64_clear(&work->chunks); + work->index = 0; + mutex_unlock(work->mutex); +} + +void +proc_work_add_chunk(struct proc_work* work, const size_t ichunk) +{ + mutex_lock(work->mutex); + CHK(darray_u64_push_back(&work->chunks, &ichunk) == RES_OK); + mutex_unlock(work->mutex); +} + +uint64_t +proc_work_get_chunk(struct proc_work* work) +{ + uint64_t ichunk = CHUNK_ID_NULL; + ASSERT(work); + + mutex_lock(work->mutex); + if(work->index >= darray_u64_size_get(&work->chunks)) { + ichunk = CHUNK_ID_NULL; + } else { + ichunk = darray_u64_cdata_get(&work->chunks)[work->index]; + ++work->index; + } + mutex_unlock(work->mutex); + return ichunk; +} + +size_t +proc_work_get_nchunks(struct proc_work* work) +{ + size_t sz = 0; + ASSERT(work); + + mutex_lock(work->mutex); + sz = darray_u64_size_get(&work->chunks); + mutex_unlock(work->mutex); + return sz; +} + +void +mpi_wait_for_request(struct htrdr* htrdr, MPI_Request* req) +{ + ASSERT(htrdr && req); + + /* Wait for process synchronisation */ + for(;;) { + struct timespec t; + int complete; + t.tv_sec = 0; + t.tv_nsec = 10000000; /* 10ms */ + + mutex_lock(htrdr->mpi_mutex); + MPI(Test(req, &complete, MPI_STATUS_IGNORE)); + mutex_unlock(htrdr->mpi_mutex); + if(complete) break; + + nanosleep(&t, NULL); + } +} + +void +mpi_probe_thieves + (struct htrdr* htrdr, + struct proc_work* work, + ATOMIC* probe_thieves) +{ + uint64_t chunks[UINT8_MAX]; + struct timespec t; + ASSERT(htrdr && work && probe_thieves); + + if(htrdr->mpi_nprocs == 1) /* The process is alone. No thief is possible */ + return; + + t.tv_sec = 0; + + /* Protect MPI calls of multiple invocations from concurrent threads */ + #define P_MPI(Func) { \ + mutex_lock(htrdr->mpi_mutex); \ + MPI(Func); \ + mutex_unlock(htrdr->mpi_mutex); \ + } (void)0 + + while(ATOMIC_GET(probe_thieves)) { + MPI_Status status; + uint8_t i = 0; + int msg = 0; + + /* Probe if a steal request was submitted by any processes */ + P_MPI(Iprobe(MPI_ANY_SOURCE, HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &msg, + &status)); + + if(msg) { /* A steal request was posted */ + MPI_Request req; + uint8_t nchunks_to_steal; + + /* Asynchronously receive the steal request */ + P_MPI(Irecv(&nchunks_to_steal, 1, MPI_UINT8_T, status.MPI_SOURCE, + HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &req)); + + /* Wait for the completion of the steal request */ + mpi_wait_for_request(htrdr, &req); + + /* Thief some chunks */ + FOR_EACH(i, 0, nchunks_to_steal) { + chunks[i] = proc_work_get_chunk(work); + } + P_MPI(Send(&chunks, nchunks_to_steal, MPI_UINT64_T, status.MPI_SOURCE, + HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD)); + } + + /* Don't constantly check for thieves */ + t.tv_nsec = 500000000; /* 500ms */ + nanosleep(&t, NULL); + } + #undef P_MPI +} + +/* Return the number of stolen tiles */ +size_t +mpi_steal_work + (struct htrdr* htrdr, + struct ssp_rng* rng, + struct proc_work* work) +{ + MPI_Request req; + size_t nthieves = 0; + uint64_t chunks[UINT8_MAX]; /* Index of the stolen chunks */ + int proc_to_steal; /* Rank of the process to steal */ + + /* Empircally set the number of chunks to steal */ + const uint8_t nchunks_to_steal = MMIN((uint8_t)(htrdr->nthreads*2), 16); + uint8_t i = 0; + + ASSERT(htrdr && rng && work && htrdr->nthreads < UINT8_MAX); + + /* Protect MPI calls of multiple invocations from concurrent threads */ + #define P_MPI(Func) { \ + mutex_lock(htrdr->mpi_mutex); \ + MPI(Func); \ + mutex_unlock(htrdr->mpi_mutex); \ + } (void)0 + + /* No more working process => nothing to steal */ + if(!htrdr->mpi_nworking_procs) return 0; + + /* Sample a process to steal */ + proc_to_steal = sample_working_process(htrdr, rng); + + /* Send a steal request to the sampled process and wait for a response */ + P_MPI(Send(&nchunks_to_steal, 1, MPI_UINT8_T, proc_to_steal, + HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD)); + + /* Receive the stolen chunks from the sampled process */ + P_MPI(Irecv(chunks, nchunks_to_steal, MPI_UINT64_T, proc_to_steal, + HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD, &req)); + + mpi_wait_for_request(htrdr, &req); + + FOR_EACH(i, 0, nchunks_to_steal) { + + if(chunks[i] != CHUNK_ID_NULL) { + /* Save stolen chunk in job list */ + proc_work_add_chunk(work, chunks[i]); + ++nthieves; + + } else { + /* The process has returned at least one invalid chunk, + * i.e. it has nothing further to do. + * Remove it from the working process */ + ASSERT(htrdr->mpi_working_procs[proc_to_steal] != 0); + htrdr->mpi_working_procs[proc_to_steal] = 0; + htrdr->mpi_nworking_procs--; + + break; /* No more to steal */ + } + } + #undef P_MPI + return nthieves; +} diff --git a/src/core/htrdr_proc_work.h b/src/core/htrdr_proc_work.h @@ -0,0 +1,94 @@ +/* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique + * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux + * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace + * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris + * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com) + * Copyright (C) 2022-2025 Observatoire de Paris + * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne + * Copyright (C) 2022-2025 Université de Versaille Saint-Quentin + * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +#ifndef HTRDR_PROC_WORK_H +#define HTRDR_PROC_WORK_H + +#include <rsys/dynamic_array_u64.h> + +#include <mpi.h> + +#define CHUNK_ID_NULL UINT64_MAX + +/* Forward declarations */ +struct mutex; +struct ssp_rng; + +/* List of chunks to compute onto the MPI process */ +struct proc_work { + struct mutex* mutex; + struct darray_u64 chunks; /* #chunks to solve */ + uint64_t index; /* Next chunk to solve in the above list of chunks */ +}; + +extern LOCAL_SYM void +proc_work_init + (struct mem_allocator* allocator, + struct proc_work* work); + +extern LOCAL_SYM void +proc_work_release + (struct proc_work* work); + +extern LOCAL_SYM void +proc_work_reset + (struct proc_work* work); + +extern LOCAL_SYM void +proc_work_add_chunk + (struct proc_work* work, + const size_t ichunk); + +/* Return the index of the next chunk to be processed */ +extern LOCAL_SYM uint64_t +proc_work_get_chunk + (struct proc_work* work); + +extern LOCAL_SYM uint64_t +proc_work_get_nchunks + (struct proc_work* work); + +/* Wait for the completion of an MPI request */ +extern LOCAL_SYM void +mpi_wait_for_request + (struct htrdr* htrdr, + MPI_Request* req); + +/* Active polling of the "steal" request submitted by other processes to + * relieve the current process of the chunks assigned to it. + * The function runs until probe_thieves is set to 0 by the caller. */ +extern LOCAL_SYM void +mpi_probe_thieves + (struct htrdr* htrdr, + struct proc_work* work, + ATOMIC* probe_thieves); + +/* Unload a working process by taking chunks from it, + * i.e. submit a steal request and wait for it to be honored */ +extern LOCAL_SYM size_t +mpi_steal_work + (struct htrdr* htrdr, + struct ssp_rng* rng, + struct proc_work* work); + +#endif /* HTRDR_PROC_WORK_H */