htrdr_proc_work.c (8079B)
/* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique
 * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux
 * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace
 * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris
 * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com)
 * Copyright (C) 2022-2025 Observatoire de Paris
 * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne
 * Copyright (C) 2022-2025 Université de Versailles Saint-Quentin
 * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>. */

#define _POSIX_C_SOURCE 200112L /* nanosleep */

#include "core/htrdr_c.h"
#include "core/htrdr_proc_work.h"

#include <star/ssp.h>

#include <rsys/mutex.h>

#include <time.h>

/*******************************************************************************
 * Helper functions
 ******************************************************************************/
/* Return the rank of a working process */
static int
sample_working_process(struct htrdr* htrdr, struct ssp_rng* rng)
{
  int iproc, i;
  int dst_rank;
  ASSERT(htrdr && rng && htrdr->mpi_nworking_procs);

  /* Sample the index of the 1st active process */
  iproc = (int)(ssp_rng_canonical(rng) * (double)htrdr->mpi_nworking_procs);

  /* Find the rank of the sampled active process. Use a simple linear search
   * since the overall number of processes should be quite low; at most a few
   * dozen. */
  i = 0;
  FOR_EACH(dst_rank, 0, htrdr->mpi_nprocs) {
    if(htrdr->mpi_working_procs[dst_rank] == 0) continue; /* Inactive process */
    if(i == iproc) break; /* The rank of the sampled process is found */
    ++i;
  }
  ASSERT(dst_rank < htrdr->mpi_nprocs);
  return dst_rank;
}

/*******************************************************************************
 * Local functions
 ******************************************************************************/
void
proc_work_init(struct mem_allocator* allocator, struct proc_work* work)
{
  ASSERT(work);
  darray_u64_init(allocator, &work->chunks);
  work->index = 0;
  CHK(work->mutex = mutex_create());
}

void
proc_work_release(struct proc_work* work)
{
  darray_u64_release(&work->chunks);
  mutex_destroy(work->mutex);
}

void
proc_work_reset(struct proc_work* work)
{
  ASSERT(work);
  mutex_lock(work->mutex);
  darray_u64_clear(&work->chunks);
  work->index = 0;
  mutex_unlock(work->mutex);
}

void
proc_work_add_chunk(struct proc_work* work, const uint64_t ichunk)
{
  mutex_lock(work->mutex);
  CHK(darray_u64_push_back(&work->chunks, &ichunk) == RES_OK);
  mutex_unlock(work->mutex);
}

uint64_t
proc_work_get_chunk(struct proc_work* work)
{
  uint64_t ichunk = CHUNK_ID_NULL;
  ASSERT(work);

  mutex_lock(work->mutex);
  if(work->index >= darray_u64_size_get(&work->chunks)) {
    ichunk = CHUNK_ID_NULL;
  } else {
    ichunk = darray_u64_cdata_get(&work->chunks)[work->index];
    ++work->index;
  }
  mutex_unlock(work->mutex);
  return ichunk;
}

size_t
proc_work_get_nchunks(struct proc_work* work)
{
  size_t sz = 0;
  ASSERT(work);

  mutex_lock(work->mutex);
  sz = darray_u64_size_get(&work->chunks);
  mutex_unlock(work->mutex);
  return sz;
}

void
mpi_wait_for_request(struct htrdr* htrdr, MPI_Request* req)
{
  ASSERT(htrdr && req);

  /* Wait for process synchronisation */
  for(;;) {
    struct timespec t;
    int complete;
    t.tv_sec = 0;
    t.tv_nsec = 10000000; /* 10ms */

    mutex_lock(htrdr->mpi_mutex);
    MPI(Test(req, &complete, MPI_STATUS_IGNORE));
    mutex_unlock(htrdr->mpi_mutex);
    if(complete) break;

    nanosleep(&t, NULL);
  }
}

void
mpi_probe_thieves
  (struct htrdr* htrdr,
   struct proc_work* work,
   ATOMIC* probe_thieves)
{
  uint64_t chunks[UINT8_MAX];
  struct timespec t;
  ASSERT(htrdr && work && probe_thieves);

  if(htrdr->mpi_nprocs == 1) /* The process is alone: no theft is possible */
    return;

  t.tv_sec = 0;

  /* Protect MPI calls against concurrent invocations from multiple threads */
  #define P_MPI(Func) { \
    mutex_lock(htrdr->mpi_mutex); \
    MPI(Func); \
    mutex_unlock(htrdr->mpi_mutex); \
  } (void)0

  while(ATOMIC_GET(probe_thieves)) {
    MPI_Status status;
    uint8_t i = 0;
    int msg = 0;

    /* Probe whether a steal request was submitted by any process */
    P_MPI(Iprobe(MPI_ANY_SOURCE, HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &msg,
      &status));

    if(msg) { /* A steal request was posted */
      MPI_Request req;
      uint8_t nchunks_to_steal;

      /* Asynchronously receive the steal request */
      P_MPI(Irecv(&nchunks_to_steal, 1, MPI_UINT8_T, status.MPI_SOURCE,
        HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &req));

      /* Wait for the completion of the steal request */
      mpi_wait_for_request(htrdr, &req);

      /* Hand some chunks over to the thief */
      FOR_EACH(i, 0, nchunks_to_steal) {
        chunks[i] = proc_work_get_chunk(work);
      }
      P_MPI(Send(&chunks, nchunks_to_steal, MPI_UINT64_T, status.MPI_SOURCE,
        HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD));
    }

    /* Don't constantly check for thieves */
    t.tv_nsec = 10000000; /* 10ms */
    nanosleep(&t, NULL);
  }
  #undef P_MPI
}

/* Return the number of stolen chunks */
size_t
mpi_steal_work
  (struct htrdr* htrdr,
   struct ssp_rng* rng,
   struct proc_work* work)
{
  MPI_Request req;
  size_t nthieves = 0;
  uint64_t chunks[UINT8_MAX]; /* Indices of the stolen chunks */
  int proc_to_steal; /* Rank of the process to steal from */

  /* Empirically set the number of chunks to steal */
  const uint8_t nchunks_to_steal = MMIN((uint8_t)(htrdr->nthreads*4), 32);
  uint8_t i = 0;

  ASSERT(htrdr && rng && work && htrdr->nthreads < UINT8_MAX);

  /* Protect MPI calls against concurrent invocations from multiple threads */
  #define P_MPI(Func) { \
    mutex_lock(htrdr->mpi_mutex); \
    MPI(Func); \
    mutex_unlock(htrdr->mpi_mutex); \
  } (void)0

  /* No more working processes => nothing to steal */
  if(!htrdr->mpi_nworking_procs) return 0;

  /* Sample a process to steal from */
  proc_to_steal = sample_working_process(htrdr, rng);

  /* Send a steal request to the sampled process and wait for a response */
  P_MPI(Send(&nchunks_to_steal, 1, MPI_UINT8_T, proc_to_steal,
    HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD));

  /* Receive the stolen chunks from the sampled process */
  P_MPI(Irecv(chunks, nchunks_to_steal, MPI_UINT64_T, proc_to_steal,
    HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD, &req));

  mpi_wait_for_request(htrdr, &req);

  FOR_EACH(i, 0, nchunks_to_steal) {

    if(chunks[i] != CHUNK_ID_NULL) {
      /* Save the stolen chunk in the job list */
      proc_work_add_chunk(work, chunks[i]);
      ++nthieves;

    } else {
      /* The process has returned at least one invalid chunk, i.e. it has
       * nothing further to do. Remove it from the working processes */
      ASSERT(htrdr->mpi_working_procs[proc_to_steal] != 0);
      htrdr->mpi_working_procs[proc_to_steal] = 0;
      htrdr->mpi_nworking_procs--;

      break; /* No more to steal */
    }
  }
  #undef P_MPI
  return nthieves;
}
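
The listing above only defines the shared chunk pool and the MPI work-stealing helpers; the driver loop that consumes the chunks lives elsewhere in htrdr. The sketch below is purely illustrative and is not part of this file: it assumes a hypothetical render_chunk() routine standing in for the per-chunk computation, and an htrdr state and ssp_rng generator initialised elsewhere. It drains the local list with proc_work_get_chunk() and falls back on mpi_steal_work() once the list runs dry.

/* Hypothetical usage sketch: not part of htrdr_proc_work.c. render_chunk()
 * stands for the caller-provided per-chunk computation */
static void
consume_chunks_sketch
  (struct htrdr* htrdr,
   struct ssp_rng* rng,
   struct proc_work* work)
{
  for(;;) {
    /* Fetch the next chunk of the local list, if any */
    const uint64_t ichunk = proc_work_get_chunk(work);

    if(ichunk != CHUNK_ID_NULL) {
      render_chunk(htrdr, ichunk); /* Assumed per-chunk workload */
      continue;
    }

    /* The local list is exhausted: try to refill it by stealing chunks from
     * another working process. mpi_steal_work() unflags a process once it
     * has nothing left, so stop only when no working process remains */
    if(mpi_steal_work(htrdr, rng, work) == 0
    && htrdr->mpi_nworking_procs == 0)
      break;
  }
}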