StarPU Internal Handbook
starpu_mpi_private.h
Go to the documentation of this file.
1 /* StarPU --- Runtime system for heterogeneous multicore architectures.
2  *
3  * Copyright (C) 2010-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
4  *
5  * StarPU is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU Lesser General Public License as published by
7  * the Free Software Foundation; either version 2.1 of the License, or (at
8  * your option) any later version.
9  *
10  * StarPU is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13  *
14  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
15  */
16 
17 #ifndef __STARPU_MPI_PRIVATE_H__
18 #define __STARPU_MPI_PRIVATE_H__
19 
20 #include <starpu.h>
21 #include <common/config.h>
22 #include <common/uthash.h>
23 #include <starpu_mpi.h>
24 #include <starpu_mpi_fxt.h>
25 #include <common/list.h>
26 #include <common/prio_list.h>
27 #include <common/starpu_spinlock.h>
28 #include <core/simgrid.h>
29 
32 #ifdef __cplusplus
33 extern "C"
34 {
35 #endif
36 
37 #ifdef STARPU_SIMGRID
38 extern starpu_pthread_wait_t _starpu_mpi_thread_wait;
39 extern starpu_pthread_queue_t _starpu_mpi_thread_dontsleep;
40 
42 {
43  MPI_Request *request;
44  MPI_Status *status;
45  starpu_pthread_queue_t *queue;
46  unsigned *done;
47 };
48 
49 int _starpu_mpi_simgrid_mpi_test(unsigned *done, int *flag);
50 void _starpu_mpi_simgrid_wait_req(MPI_Request *request, MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done);
51 #endif
52 
53 extern int _starpu_debug_rank;
54 char *_starpu_mpi_get_mpi_error_code(int code);
55 extern int _starpu_mpi_comm_debug;
56 
57 #ifdef STARPU_MPI_VERBOSE
58 extern int _starpu_debug_level_min;
59 extern int _starpu_debug_level_max;
60 void _starpu_mpi_set_debug_level_min(int level);
61 void _starpu_mpi_set_debug_level_max(int level);
62 #endif
63 extern int _starpu_mpi_fake_world_size;
64 extern int _starpu_mpi_fake_world_rank;
65 extern int _starpu_mpi_use_prio;
66 extern int _starpu_mpi_nobind;
67 extern int _starpu_mpi_thread_cpuid;
68 extern int _starpu_mpi_use_coop_sends;
69 void _starpu_mpi_env_init(void);
70 
/*
 * STARPU_MPI_ASSERT_MSG(x, msg, ...): assertion with a printf-style message.
 *
 * When x does not hold, the rank is looked up on MPI_COMM_WORLD (only if
 * _starpu_debug_rank is still -1) and a line tagged with that rank and the
 * current function name is printed on stderr.
 *
 * Three variants are selected at compile time:
 *  - STARPU_NO_ASSERT: x is placed under "if (0)", so it is type-checked but
 *    never evaluated, and nothing is printed.
 *  - CUDA compiler on Windows: after printing, the macro writes through a
 *    null pointer instead of calling assert().
 *  - otherwise: after the optional message, assert(x) is executed.
 */
#ifdef STARPU_NO_ASSERT
#  define STARPU_MPI_ASSERT_MSG(x, msg, ...) do { if (0) { (void) (x); }} while(0)
#else
#  if defined(__CUDACC__) && defined(STARPU_HAVE_WINDOWS)
/* NOTE(review): this is a non-extern definition inside a header, next to the
 * extern declaration of the same variable -- confirm it is intentional for
 * this toolchain. */
int _starpu_debug_rank;
#    define STARPU_MPI_ASSERT_MSG(x, msg, ...) \
	do \
	{ \
		if (STARPU_UNLIKELY(!(x))) \
		{ \
			if (_starpu_debug_rank == -1) \
				starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			fprintf(stderr, "\n[%d][starpu_mpi][%s][assert failure] " msg "\n\n", _starpu_debug_rank, __starpu_func__, ## __VA_ARGS__); \
			*(int*)NULL = 0; \
		} \
	} while(0)
#  else
/* NOTE(review): x is evaluated twice on the failure path (once in the test,
 * once more by assert), so it should be free of side effects. */
#    define STARPU_MPI_ASSERT_MSG(x, msg, ...) \
	do \
	{ \
		if (STARPU_UNLIKELY(!(x))) \
		{ \
			if (_starpu_debug_rank == -1) \
				starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			fprintf(stderr, "\n[%d][starpu_mpi][%s][assert failure] " msg "\n\n", _starpu_debug_rank, __starpu_func__, ## __VA_ARGS__); \
		} \
		assert(x); \
	} while(0)

#  endif
#endif
99 
/*
 * Checked allocation helpers. Each one stores the result of the libc call in
 * ptr and reports a failed allocation through STARPU_MPI_ASSERT_MSG, printing
 * the requested byte count.
 *
 * - _STARPU_MPI_MALLOC(ptr, size):         ptr = malloc(size)
 * - _STARPU_MPI_CALLOC(ptr, nmemb, size):  ptr = calloc(nmemb, size)
 * - _STARPU_MPI_REALLOC(ptr, size):        ptr = realloc(ptr, size); ptr is
 *   only overwritten after the new pointer has been checked.
 *
 * The nmemb and size arguments are parenthesized in the reported byte count
 * so that expression arguments such as "n + 1" are multiplied as a whole.
 */
#define _STARPU_MPI_MALLOC(ptr, size) \
	do \
	{ \
		ptr = malloc(size); \
		STARPU_MPI_ASSERT_MSG(ptr != NULL, "Cannot allocate %ld bytes\n", (long) (size)); \
	} while (0)
#define _STARPU_MPI_CALLOC(ptr, nmemb, size) \
	do \
	{ \
		ptr = calloc(nmemb, size); \
		STARPU_MPI_ASSERT_MSG(ptr != NULL, "Cannot allocate %ld bytes\n", (long) ((nmemb)*(size))); \
	} while (0)
#define _STARPU_MPI_REALLOC(ptr, size) \
	do \
	{ \
		void *_new_ptr = realloc(ptr, size); \
		STARPU_MPI_ASSERT_MSG(_new_ptr != NULL, "Cannot reallocate %ld bytes\n", (long) (size)); \
		ptr = _new_ptr; \
	} while (0)
103 
/*
 * Debug tracing macros, active only when STARPU_MPI_VERBOSE is defined;
 * otherwise every macro expands to an empty do/while statement.
 *
 * - _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, node, tag, utag, comm, way):
 *   when _starpu_mpi_comm_debug is non-zero, prints one colon-separated line
 *   on stderr with the rank in comm, the direction string `way`, the peer
 *   node, both tags, the communicator name, the buffer pointer, the count,
 *   the datatype size, and the calling function and line.
 * - _STARPU_MPI_COMM_TO_DEBUG / _STARPU_MPI_COMM_FROM_DEBUG: same, with
 *   `way` fixed to "-->" and "<--" respectively.
 * - _STARPU_MPI_DEBUG(level, fmt, ...): printf-style message on stderr,
 *   printed when _starpu_silent is unset and level lies within
 *   [_starpu_debug_level_min, _starpu_debug_level_max]. The line is indented
 *   by 4 columns per (rank + 1).
 *
 * None of the expansions ends with a semicolon: the caller supplies it, in
 * both the verbose and the non-verbose build, so the macros behave as a single
 * statement inside if/else.
 */
#ifdef STARPU_MPI_VERBOSE
#  define _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, node, tag, utag, comm, way) \
	do \
	{ \
		if (_starpu_mpi_comm_debug) \
		{ \
			int __size; \
			/* MPI_Comm_get_name may write up to MPI_MAX_OBJECT_NAME characters */ \
			char _comm_name[MPI_MAX_OBJECT_NAME]; \
			int _comm_name_len; \
			int _rank; \
			starpu_mpi_comm_rank(comm, &_rank); \
			MPI_Type_size(datatype, &__size); \
			MPI_Comm_get_name(comm, _comm_name, &_comm_name_len); \
			fprintf(stderr, "[%d][starpu_mpi] :%d:%s:%d:%d:%ld:%s:%p:%ld:%d:%s:%d\n", _rank, _rank, way, node, tag, utag, _comm_name, ptr, count, __size, __starpu_func__ , __LINE__); \
			fflush(stderr); \
		} \
	} while(0)
#  define _STARPU_MPI_COMM_TO_DEBUG(ptr, count, datatype, dest, tag, utag, comm) _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, dest, tag, utag, comm, "-->")
#  define _STARPU_MPI_COMM_FROM_DEBUG(ptr, count, datatype, source, tag, utag, comm) _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, source, tag, utag, comm, "<--")
#  define _STARPU_MPI_DEBUG(level, fmt, ...) \
	do \
	{ \
		if (!_starpu_silent && _starpu_debug_level_min <= (level) && (level) <= _starpu_debug_level_max) \
		{ \
			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] " fmt , (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__ , __LINE__,## __VA_ARGS__); \
			fflush(stderr); \
		} \
	} while(0)
#else
#  define _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, node, tag, utag, comm, way) do { } while(0)
#  define _STARPU_MPI_COMM_TO_DEBUG(ptr, count, datatype, dest, tag, utag, comm) do { } while(0)
#  define _STARPU_MPI_COMM_FROM_DEBUG(ptr, count, datatype, source, tag, utag, comm) do { } while(0)
#  define _STARPU_MPI_DEBUG(level, fmt, ...) do { } while(0)
#endif
139 
/*
 * Unconditional (not tied to STARPU_MPI_VERBOSE) message macros. Both resolve
 * the rank on MPI_COMM_WORLD the first time (while _starpu_debug_rank is
 * still -1), print a printf-style message on stderr tagged with the rank,
 * function name and line, then flush stderr.
 *
 * - _STARPU_MPI_DISP(fmt, ...): printed only when _starpu_silent is unset;
 *   the line is indented by 4 columns per (rank + 1).
 * - _STARPU_MPI_MSG(fmt, ...): always printed, without indentation.
 *
 * NOTE(review): both expansions end with a semicolon after while(0), so
 * "if (c) _STARPU_MPI_MSG(...); else ..." does not compile. It is kept here
 * because call sites that omit their own semicolon may exist -- confirm
 * before removing it.
 */
#define _STARPU_MPI_DISP(fmt, ...) \
	do \
	{ \
		if (!_starpu_silent) \
		{ \
			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] " fmt , (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__ , __LINE__ ,## __VA_ARGS__); \
			fflush(stderr); \
		} \
	} while(0);
#define _STARPU_MPI_MSG(fmt, ...) \
	do \
	{ \
		if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
		fprintf(stderr, "[%d][starpu_mpi][%s:%d] " fmt , _starpu_debug_rank, __starpu_func__ , __LINE__ ,## __VA_ARGS__); \
		fflush(stderr); \
	} while(0);
147 
/*
 * Function entry/exit tracing. With STARPU_MPI_EXTRA_VERBOSE defined,
 * _STARPU_MPI_LOG_IN() prints "-->" and _STARPU_MPI_LOG_OUT() prints "<--"
 * on stderr (unless _starpu_silent is set), tagged with the rank, function
 * name and line, and indented by 4 columns per (rank + 1). Without it, both
 * macros expand to nothing.
 */
#ifdef STARPU_MPI_EXTRA_VERBOSE
#  define _STARPU_MPI_LOG_IN() \
	do \
	{ \
		if (!_starpu_silent) \
		{ \
			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] -->\n", (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__ , __LINE__); \
			fflush(stderr); \
		} \
	} while(0)
#  define _STARPU_MPI_LOG_OUT() \
	do \
	{ \
		if (!_starpu_silent) \
		{ \
			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] <--\n", (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__, __LINE__ ); \
			fflush(stderr); \
		} \
	} while(0)
#else
#  define _STARPU_MPI_LOG_IN()
#  define _STARPU_MPI_LOG_OUT()
#endif
161 
/*
 * Kind of operation carried by a request; stored in the request_type field
 * of struct _starpu_mpi_req. The numeric values are spelled out explicitly.
 */
enum _starpu_mpi_request_type
{
	SEND_REQ    = 0,
	RECV_REQ    = 1,
	WAIT_REQ    = 2,
	TEST_REQ    = 3,
	BARRIER_REQ = 4,
	PROBE_REQ   = 5,
	UNKNOWN_REQ = 6,
};
172 
174 {
175  MPI_Comm comm;
176  int rank;
177 };
178 
180 {
181  struct _starpu_mpi_node node;
182  starpu_mpi_tag_t data_tag;
183 };
184 
185 MULTILIST_CREATE_TYPE(_starpu_mpi_req, coop_sends)
186 /* One bag of cooperative sends */
188 {
189  /* List of send requests */
190  struct _starpu_mpi_req_multilist_coop_sends reqs;
191  struct _starpu_mpi_data *mpi_data;
192 
193  /* Array of send requests, after sorting out */
194  struct _starpu_spinlock lock;
195  struct _starpu_mpi_req **reqs_array;
196  unsigned n;
197  unsigned redirects_sent;
198 };
199 
200 /* Initialized in starpu_mpi_data_register_comm */
202 {
203  int magic;
204  struct _starpu_mpi_node_tag node_tag;
205  char *cache_sent;
206  int cache_received;
207 
208  /* Rendez-vous data for opportunistic cooperative sends */
209  struct _starpu_spinlock coop_lock; /* Needed to synchronize between submit thread and workers */
210  struct _starpu_mpi_coop_sends *coop_sends; /* Current cooperative send bag */
211 };
212 
213 struct _starpu_mpi_data *_starpu_mpi_data_get(starpu_data_handle_t data_handle);
214 
216 struct _starpu_mpi_req;
217 LIST_TYPE(_starpu_mpi_req,
218  /* description of the data at StarPU level */
219  starpu_data_handle_t data_handle;
220 
221  int prio;
222 
223  /* description of the data to be sent/received */
224  MPI_Datatype datatype;
225  char *datatype_name;
226  void *ptr;
227  starpu_ssize_t count;
228  int registered_datatype;
229 
230  struct _starpu_mpi_req_backend *backend;
231 
232  /* who are we talking to ? */
233  struct _starpu_mpi_node_tag node_tag;
234  void (*func)(struct _starpu_mpi_req *);
235 
236  MPI_Status *status;
237  struct _starpu_mpi_req_multilist_coop_sends coop_sends;
238  struct _starpu_mpi_coop_sends *coop_sends_head;
239 
240  int *flag;
241  unsigned sync;
242 
243  int ret;
244 
246  enum _starpu_mpi_request_type request_type;
247 
248  unsigned submitted;
249  unsigned completed;
250  unsigned posted;
251 
252  /* in the case of detached requests */
253  int detached;
254  void *callback_arg;
255  void (*callback)(void *);
256 
257  /* in the case of user-defined datatypes, we need to send the size of the data */
258 
259  int sequential_consistency;
260 
261  long pre_sync_jobid;
262  long post_sync_jobid;
263 
264 #ifdef STARPU_SIMGRID
265  MPI_Status status_store;
266  starpu_pthread_queue_t queue;
267  unsigned done;
268 #endif
269 );
270 PRIO_LIST_TYPE(_starpu_mpi_req, prio)
271 
272 MULTILIST_CREATE_INLINES(struct _starpu_mpi_req, _starpu_mpi_req, coop_sends)
273 
274 /* To be called before actually queueing a request, so the communication layer knows it has something to look at */
275 void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req);
276 /* To be called to actually submit the request */
277 void _starpu_mpi_submit_ready_request(void *arg);
278 /* To be called when request is completed */
279 void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req);
280 
281 /* Build a communication tree. Called before _starpu_mpi_coop_send is ever called. coop_sends->lock is held. */
282 void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends);
283 /* Try to merge this send request with other send requests */
284 void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency);
285 
286 /* Actually submit the coop_sends bag to MPI.
287  * At least one of submit_control or submit_data is true.
288  * _starpu_mpi_submit_coop_sends may be called either
289  * - just once with both parameters being true,
290  * - or once with submit_control being true (data is not available yet, but we
291  * can send control messages), and a second time with submit_data being true. Or
292  * the converse, possibly on different threads, etc.
293  */
294 void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_control, int submit_data);
295 
296 void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
297 void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
298 struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_handle,
299  int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
300  unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
301  enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
302  int sequential_consistency,
303  int is_internal_req,
304  starpu_ssize_t count);
305 
306 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
307 void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
308 void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
309 int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
310 int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
311 int _starpu_mpi_barrier(MPI_Comm comm);
312 
314 {
315  int initialize_mpi;
316  int *argc;
317  char ***argv;
318  MPI_Comm comm;
320  int fargc;
322  char **fargv;
323  int rank;
324  int world_size;
325 };
326 
327 void _starpu_mpi_progress_shutdown(void **value);
328 int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
329 #ifdef STARPU_SIMGRID
/* Takes no argument; declared with an explicit (void) prototype so that calls passing arguments are rejected at compile time. */
void _starpu_mpi_wait_for_initialization(void);
331 #endif
332 void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
333 
334 /*
335  * Specific functions to backend implementation
336  */
338 {
339  void (*_starpu_mpi_backend_init)(struct starpu_conf *conf);
340  void (*_starpu_mpi_backend_shutdown)(void);
341  int (*_starpu_mpi_backend_reserve_core)(void);
342  void (*_starpu_mpi_backend_request_init)(struct _starpu_mpi_req *req);
343  void (*_starpu_mpi_backend_request_fill)(struct _starpu_mpi_req *req, MPI_Comm comm, int is_internal_req);
344  void (*_starpu_mpi_backend_request_destroy)(struct _starpu_mpi_req *req);
345  void (*_starpu_mpi_backend_data_clear)(starpu_data_handle_t data_handle);
346  void (*_starpu_mpi_backend_data_register)(starpu_data_handle_t data_handle, starpu_mpi_tag_t data_tag);
347  void (*_starpu_mpi_backend_comm_register)(MPI_Comm comm);
348 };
349 
350 extern struct _starpu_mpi_backend _mpi_backend;
351 #ifdef __cplusplus
352 }
353 #endif
354 
355 #endif // __STARPU_MPI_PRIVATE_H__
#define struct
Definition: list.h:172
Definition: starpu_mpi_mpi_backend.h:52
char ** fargv
Definition: starpu_mpi_private.h:322
int fargc
Definition: starpu_mpi_private.h:320
Definition: starpu_mpi_private.h:314
Definition: starpu_mpi_private.h:188
Definition: starpu_mpi_private.h:202
Definition: starpu_mpi_private.h:174
Definition: starpu_mpi_private.h:180
Definition: starpu_mpi_private.h:42
Definition: starpu_spinlock.h:82
Definition: starpu_mpi_private.h:338
Definition: starpu_mpi_private.h:217
enum _starpu_mpi_request_type request_type
Definition: starpu_mpi_private.h:246