1 // Copyright (C) 2006 Douglas Gregor <doug.gregor -at- gmail.com>.
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 /** @file nonblocking.hpp
8 *
9 * This header defines operations for completing non-blocking
10 * communication requests.
11 */
12 #ifndef BOOST_MPI_NONBLOCKING_HPP
13 #define BOOST_MPI_NONBLOCKING_HPP
14
15 #include <boost/mpi/config.hpp>
16 #include <vector>
17 #include <iterator> // for std::iterator_traits
18 #include <boost/optional.hpp>
19 #include <utility> // for std::pair
20 #include <algorithm> // for iter_swap, reverse
21 #include <boost/static_assert.hpp>
22 #include <boost/mpi/request.hpp>
23 #include <boost/mpi/status.hpp>
24 #include <boost/mpi/exception.hpp>
25
26 namespace boost { namespace mpi {
27
28 /**
29 * @brief Wait until any non-blocking request has completed.
30 *
31 * This routine takes in a set of requests stored in the iterator
32 * range @c [first,last) and waits until any of these requests has
33 * been completed. It provides functionality equivalent to
34 * @c MPI_Waitany.
35 *
36 * @param first The iterator that denotes the beginning of the
37 * sequence of request objects.
38 *
39 * @param last The iterator that denotes the end of the sequence of
40 * request objects. This may not be equal to @c first.
41 *
42 * @returns A pair containing the status object that corresponds to
43 * the completed operation and the iterator referencing the completed
44 * request.
45 */
46 template<typename ForwardIterator>
47 std::pair<status, ForwardIterator>
wait_any(ForwardIterator first,ForwardIterator last)48 wait_any(ForwardIterator first, ForwardIterator last)
49 {
50 using std::advance;
51
52 BOOST_ASSERT(first != last);
53
54 typedef typename std::iterator_traits<ForwardIterator>::difference_type
55 difference_type;
56
57 bool all_trivial_requests = true;
58 difference_type n = 0;
59 ForwardIterator current = first;
60 while (true) {
61 // Check if we have found a completed request. If so, return it.
62 if (current->active()) {
63 optional<status> result = current->test();
64 if (bool(result)) {
65 return std::make_pair(*result, current);
66 }
67 }
68
69 // Check if this request (and all others before it) are "trivial"
70 // requests, e.g., they can be represented with a single
71 // MPI_Request.
72 // We could probably ignore non trivial request that are inactive,
73 // but we can assume that a mix of trivial and non trivial requests
74 // is unlikely enough not to care.
75 all_trivial_requests = all_trivial_requests && current->trivial();
76
77 // Move to the next request.
78 ++n;
79 if (++current == last) {
80 // We have reached the end of the list. If all requests thus far
81 // have been trivial, we can call MPI_Waitany directly, because
82 // it may be more efficient than our busy-wait semantics.
83 if (all_trivial_requests) {
84 std::vector<MPI_Request> requests;
85 requests.reserve(n);
86 for (current = first; current != last; ++current) {
87 requests.push_back(*current->trivial());
88 }
89
90 // Let MPI wait until one of these operations completes.
91 int index;
92 status stat;
93 BOOST_MPI_CHECK_RESULT(MPI_Waitany,
94 (n, detail::c_data(requests), &index, &stat.m_status));
95
96 // We don't have a notion of empty requests or status objects,
97 // so this is an error.
98 if (index == MPI_UNDEFINED)
99 boost::throw_exception(exception("MPI_Waitany", MPI_ERR_REQUEST));
100
101 // Find the iterator corresponding to the completed request.
102 current = first;
103 advance(current, index);
104 *current->trivial() = requests[index];
105 return std::make_pair(stat, current);
106 }
107
108 // There are some nontrivial requests, so we must continue our
109 // busy waiting loop.
110 n = 0;
111 current = first;
112 all_trivial_requests = true;
113 }
114 }
115
116 // We cannot ever get here
117 BOOST_ASSERT(false);
118 }
119
120 /**
121 * @brief Test whether any non-blocking request has completed.
122 *
123 * This routine takes in a set of requests stored in the iterator
124 * range @c [first,last) and tests whether any of these requests has
125 * been completed. This routine is similar to @c wait_any, but will
126 * not block waiting for requests to completed. It provides
127 * functionality equivalent to @c MPI_Testany.
128 *
129 * @param first The iterator that denotes the beginning of the
130 * sequence of request objects.
131 *
132 * @param last The iterator that denotes the end of the sequence of
133 * request objects.
134 *
135 * @returns If any outstanding requests have completed, a pair
136 * containing the status object that corresponds to the completed
137 * operation and the iterator referencing the completed
138 * request. Otherwise, an empty @c optional<>.
139 */
140 template<typename ForwardIterator>
141 optional<std::pair<status, ForwardIterator> >
test_any(ForwardIterator first,ForwardIterator last)142 test_any(ForwardIterator first, ForwardIterator last)
143 {
144 while (first != last) {
145 // Check if we have found a completed request. If so, return it.
146 if (optional<status> result = first->test()) {
147 return std::make_pair(*result, first);
148 }
149 ++first;
150 }
151
152 // We found nothing
153 return optional<std::pair<status, ForwardIterator> >();
154 }
155
156 /**
157 * @brief Wait until all non-blocking requests have completed.
158 *
159 * This routine takes in a set of requests stored in the iterator
160 * range @c [first,last) and waits until all of these requests have
161 * been completed. It provides functionality equivalent to
162 * @c MPI_Waitall.
163 *
164 * @param first The iterator that denotes the beginning of the
165 * sequence of request objects.
166 *
167 * @param last The iterator that denotes the end of the sequence of
168 * request objects.
169 *
170 * @param out If provided, an output iterator through which the
171 * status of each request will be emitted. The @c status objects are
172 * emitted in the same order as the requests are retrieved from
173 * @c [first,last).
174 *
175 * @returns If an @p out parameter was provided, the value @c out
176 * after all of the @c status objects have been emitted.
177 */
178 template<typename ForwardIterator, typename OutputIterator>
179 OutputIterator
wait_all(ForwardIterator first,ForwardIterator last,OutputIterator out)180 wait_all(ForwardIterator first, ForwardIterator last, OutputIterator out)
181 {
182 typedef typename std::iterator_traits<ForwardIterator>::difference_type
183 difference_type;
184
185 using std::distance;
186
187 difference_type num_outstanding_requests = distance(first, last);
188
189 std::vector<status> results(num_outstanding_requests);
190 std::vector<bool> completed(num_outstanding_requests);
191
192 while (num_outstanding_requests > 0) {
193 bool all_trivial_requests = true;
194 difference_type idx = 0;
195 for (ForwardIterator current = first; current != last; ++current, ++idx) {
196 if (!completed[idx]) {
197 if (!current->active()) {
198 completed[idx] = true;
199 --num_outstanding_requests;
200 } else if (optional<status> stat = current->test()) {
201 // This outstanding request has been completed. We're done.
202 results[idx] = *stat;
203 completed[idx] = true;
204 --num_outstanding_requests;
205 all_trivial_requests = false;
206 } else {
207 // Check if this request (and all others before it) are "trivial"
208 // requests, e.g., they can be represented with a single
209 // MPI_Request.
210 all_trivial_requests = all_trivial_requests && current->trivial();
211 }
212 }
213 }
214
215 // If we have yet to fulfill any requests and all of the requests
216 // are trivial (i.e., require only a single MPI_Request to be
217 // fulfilled), call MPI_Waitall directly.
218 if (all_trivial_requests
219 && num_outstanding_requests == (difference_type)results.size()) {
220 std::vector<MPI_Request> requests;
221 requests.reserve(num_outstanding_requests);
222 for (ForwardIterator current = first; current != last; ++current)
223 requests.push_back(*current->trivial());
224
225 // Let MPI wait until all of these operations completes.
226 std::vector<MPI_Status> stats(num_outstanding_requests);
227 BOOST_MPI_CHECK_RESULT(MPI_Waitall,
228 (num_outstanding_requests, detail::c_data(requests),
229 detail::c_data(stats)));
230
231 for (std::vector<MPI_Status>::iterator i = stats.begin();
232 i != stats.end(); ++i, ++out) {
233 status stat;
234 stat.m_status = *i;
235 *out = stat;
236 }
237
238 return out;
239 }
240
241 all_trivial_requests = false;
242 }
243
244 return std::copy(results.begin(), results.end(), out);
245 }
246
247 /**
248 * \overload
249 */
250 template<typename ForwardIterator>
251 void
wait_all(ForwardIterator first,ForwardIterator last)252 wait_all(ForwardIterator first, ForwardIterator last)
253 {
254 typedef typename std::iterator_traits<ForwardIterator>::difference_type
255 difference_type;
256
257 using std::distance;
258
259 difference_type num_outstanding_requests = distance(first, last);
260
261 std::vector<bool> completed(num_outstanding_requests, false);
262
263 while (num_outstanding_requests > 0) {
264 bool all_trivial_requests = true;
265
266 difference_type idx = 0;
267 for (ForwardIterator current = first; current != last; ++current, ++idx) {
268 if (!completed[idx]) {
269 if (!current->active()) {
270 completed[idx] = true;
271 --num_outstanding_requests;
272 } else if (optional<status> stat = current->test()) {
273 // This outstanding request has been completed.
274 completed[idx] = true;
275 --num_outstanding_requests;
276 all_trivial_requests = false;
277 } else {
278 // Check if this request (and all others before it) are "trivial"
279 // requests, e.g., they can be represented with a single
280 // MPI_Request.
281 all_trivial_requests = all_trivial_requests && current->trivial();
282 }
283 }
284 }
285
286 // If we have yet to fulfill any requests and all of the requests
287 // are trivial (i.e., require only a single MPI_Request to be
288 // fulfilled), call MPI_Waitall directly.
289 if (all_trivial_requests
290 && num_outstanding_requests == (difference_type)completed.size()) {
291 std::vector<MPI_Request> requests;
292 requests.reserve(num_outstanding_requests);
293 for (ForwardIterator current = first; current != last; ++current)
294 requests.push_back(*current->trivial());
295
296 // Let MPI wait until all of these operations completes.
297 BOOST_MPI_CHECK_RESULT(MPI_Waitall,
298 (num_outstanding_requests, detail::c_data(requests),
299 MPI_STATUSES_IGNORE));
300
301 // Signal completion
302 num_outstanding_requests = 0;
303 }
304 }
305 }
306
307 /**
308 * @brief Tests whether all non-blocking requests have completed.
309 *
310 * This routine takes in a set of requests stored in the iterator
311 * range @c [first,last) and determines whether all of these requests
312 * have been completed. However, due to limitations of the underlying
313 * MPI implementation, if any of the requests refers to a
314 * non-blocking send or receive of a serialized data type, @c
315 * test_all will always return the equivalent of @c false (i.e., the
316 * requests cannot all be finished at this time). This routine
317 * performs the same functionality as @c wait_all, except that this
318 * routine will not block. This routine provides functionality
319 * equivalent to @c MPI_Testall.
320 *
321 * @param first The iterator that denotes the beginning of the
322 * sequence of request objects.
323 *
324 * @param last The iterator that denotes the end of the sequence of
325 * request objects.
326 *
327 * @param out If provided and all requests hav been completed, an
328 * output iterator through which the status of each request will be
329 * emitted. The @c status objects are emitted in the same order as
330 * the requests are retrieved from @c [first,last).
331 *
332 * @returns If an @p out parameter was provided, the value @c out
333 * after all of the @c status objects have been emitted (if all
334 * requests were completed) or an empty @c optional<>. If no @p out
335 * parameter was provided, returns @c true if all requests have
336 * completed or @c false otherwise.
337 */
338 template<typename ForwardIterator, typename OutputIterator>
339 optional<OutputIterator>
test_all(ForwardIterator first,ForwardIterator last,OutputIterator out)340 test_all(ForwardIterator first, ForwardIterator last, OutputIterator out)
341 {
342 std::vector<MPI_Request> requests;
343 for (; first != last; ++first) {
344 // If we have a non-trivial request, then no requests can be
345 // completed.
346 if (!first->trivial()) {
347 return optional<OutputIterator>();
348 }
349 requests.push_back(*first->trivial());
350 }
351
352 int flag = 0;
353 int n = requests.size();
354 std::vector<MPI_Status> stats(n);
355 BOOST_MPI_CHECK_RESULT(MPI_Testall, (n, detail::c_data(requests), &flag, detail::c_data(stats)));
356 if (flag) {
357 for (int i = 0; i < n; ++i, ++out) {
358 status stat;
359 stat.m_status = stats[i];
360 *out = stat;
361 }
362 return out;
363 } else {
364 return optional<OutputIterator>();
365 }
366 }
367
368 /**
369 * \overload
370 */
371 template<typename ForwardIterator>
372 bool
test_all(ForwardIterator first,ForwardIterator last)373 test_all(ForwardIterator first, ForwardIterator last)
374 {
375 std::vector<MPI_Request> requests;
376 for (; first != last; ++first) {
377 // If we have a non-trivial request, then no requests can be
378 // completed.
379 if (!first->trivial()) {
380 return false;
381 }
382 requests.push_back(*first->trivial());
383 }
384
385 int flag = 0;
386 int n = requests.size();
387 BOOST_MPI_CHECK_RESULT(MPI_Testall,
388 (n, detail::c_data(requests), &flag, MPI_STATUSES_IGNORE));
389 return flag != 0;
390 }
391
392 /**
393 * @brief Wait until some non-blocking requests have completed.
394 *
395 * This routine takes in a set of requests stored in the iterator
396 * range @c [first,last) and waits until at least one of the requests
397 * has completed. It then completes all of the requests it can,
398 * partitioning the input sequence into pending requests followed by
399 * completed requests. If an output iterator is provided, @c status
400 * objects will be emitted for each of the completed requests. This
401 * routine provides functionality equivalent to @c MPI_Waitsome.
402 *
403 * @param first The iterator that denotes the beginning of the
404 * sequence of request objects.
405 *
406 * @param last The iterator that denotes the end of the sequence of
407 * request objects. This may not be equal to @c first.
408 *
409 * @param out If provided, the @c status objects corresponding to
410 * completed requests will be emitted through this output iterator.
411
412 * @returns If the @p out parameter was provided, a pair containing
413 * the output iterator @p out after all of the @c status objects have
414 * been written through it and an iterator referencing the first
415 * completed request. If no @p out parameter was provided, only the
416 * iterator referencing the first completed request will be emitted.
417 */
418 template<typename BidirectionalIterator, typename OutputIterator>
419 std::pair<OutputIterator, BidirectionalIterator>
wait_some(BidirectionalIterator first,BidirectionalIterator last,OutputIterator out)420 wait_some(BidirectionalIterator first, BidirectionalIterator last,
421 OutputIterator out)
422 {
423 using std::advance;
424
425 if (first == last)
426 return std::make_pair(out, first);
427
428 typedef typename std::iterator_traits<BidirectionalIterator>::difference_type
429 difference_type;
430
431 bool all_trivial_requests = true;
432 difference_type n = 0;
433 BidirectionalIterator current = first;
434 BidirectionalIterator start_of_completed = last;
435 while (true) {
436 // Check if we have found a completed request.
437 if (optional<status> result = current->test()) {
438 using std::iter_swap;
439
440 // Emit the resulting status object
441 *out++ = *result;
442
443 // We're expanding the set of completed requests
444 --start_of_completed;
445
446 if (current == start_of_completed) {
447 // If we have hit the end of the list of pending
448 // requests. Finish up by fixing the order of the completed
449 // set to match the order in which we emitted status objects,
450 // then return.
451 std::reverse(start_of_completed, last);
452 return std::make_pair(out, start_of_completed);
453 }
454
455 // Swap the request we just completed with the last request that
456 // has not yet been tested.
457 iter_swap(current, start_of_completed);
458
459 continue;
460 }
461
462 // Check if this request (and all others before it) are "trivial"
463 // requests, e.g., they can be represented with a single
464 // MPI_Request.
465 all_trivial_requests = all_trivial_requests && current->trivial();
466
467 // Move to the next request.
468 ++n;
469 if (++current == start_of_completed) {
470 if (start_of_completed != last) {
471 // We have satisfied some requests. Make the order of the
472 // completed requests match that of the status objects we've
473 // already emitted and we're done.
474 std::reverse(start_of_completed, last);
475 return std::make_pair(out, start_of_completed);
476 }
477
478 // We have reached the end of the list. If all requests thus far
479 // have been trivial, we can call MPI_Waitsome directly, because
480 // it may be more efficient than our busy-wait semantics.
481 if (all_trivial_requests) {
482 std::vector<MPI_Request> requests;
483 std::vector<int> indices(n);
484 std::vector<MPI_Status> stats(n);
485 requests.reserve(n);
486 for (current = first; current != last; ++current)
487 requests.push_back(*current->trivial());
488
489 // Let MPI wait until some of these operations complete.
490 int num_completed;
491 BOOST_MPI_CHECK_RESULT(MPI_Waitsome,
492 (n, detail::c_data(requests), &num_completed, detail::c_data(indices),
493 detail::c_data(stats)));
494
495 // Translate the index-based result of MPI_Waitsome into a
496 // partitioning on the requests.
497 int current_offset = 0;
498 current = first;
499 for (int index = 0; index < num_completed; ++index, ++out) {
500 using std::iter_swap;
501
502 // Move "current" to the request object at this index
503 advance(current, indices[index] - current_offset);
504 current_offset = indices[index];
505
506 // Emit the status object
507 status stat;
508 stat.m_status = stats[index];
509 *out = stat;
510
511 // Finish up the request and swap it into the "completed
512 // requests" partition.
513 *current->trivial() = requests[indices[index]];
514 --start_of_completed;
515 iter_swap(current, start_of_completed);
516 }
517
518 // We have satisfied some requests. Make the order of the
519 // completed requests match that of the status objects we've
520 // already emitted and we're done.
521 std::reverse(start_of_completed, last);
522 return std::make_pair(out, start_of_completed);
523 }
524
525 // There are some nontrivial requests, so we must continue our
526 // busy waiting loop.
527 n = 0;
528 current = first;
529 }
530 }
531
532 // We cannot ever get here
533 BOOST_ASSERT(false);
534 }
535
536 /**
537 * \overload
538 */
539 template<typename BidirectionalIterator>
540 BidirectionalIterator
wait_some(BidirectionalIterator first,BidirectionalIterator last)541 wait_some(BidirectionalIterator first, BidirectionalIterator last)
542 {
543 using std::advance;
544
545 if (first == last)
546 return first;
547
548 typedef typename std::iterator_traits<BidirectionalIterator>::difference_type
549 difference_type;
550
551 bool all_trivial_requests = true;
552 difference_type n = 0;
553 BidirectionalIterator current = first;
554 BidirectionalIterator start_of_completed = last;
555 while (true) {
556 // Check if we have found a completed request.
557 if (optional<status> result = current->test()) {
558 using std::iter_swap;
559
560 // We're expanding the set of completed requests
561 --start_of_completed;
562
563 // If we have hit the end of the list of pending requests, we're
564 // done.
565 if (current == start_of_completed)
566 return start_of_completed;
567
568 // Swap the request we just completed with the last request that
569 // has not yet been tested.
570 iter_swap(current, start_of_completed);
571
572 continue;
573 }
574
575 // Check if this request (and all others before it) are "trivial"
576 // requests, e.g., they can be represented with a single
577 // MPI_Request.
578 all_trivial_requests = all_trivial_requests && current->trivial();
579
580 // Move to the next request.
581 ++n;
582 if (++current == start_of_completed) {
583 // If we have satisfied some requests, we're done.
584 if (start_of_completed != last)
585 return start_of_completed;
586
587 // We have reached the end of the list. If all requests thus far
588 // have been trivial, we can call MPI_Waitsome directly, because
589 // it may be more efficient than our busy-wait semantics.
590 if (all_trivial_requests) {
591 std::vector<MPI_Request> requests;
592 std::vector<int> indices(n);
593 requests.reserve(n);
594 for (current = first; current != last; ++current)
595 requests.push_back(*current->trivial());
596
597 // Let MPI wait until some of these operations complete.
598 int num_completed;
599 BOOST_MPI_CHECK_RESULT(MPI_Waitsome,
600 (n, detail::c_data(requests), &num_completed, detail::c_data(indices),
601 MPI_STATUSES_IGNORE));
602
603 // Translate the index-based result of MPI_Waitsome into a
604 // partitioning on the requests.
605 int current_offset = 0;
606 current = first;
607 for (int index = 0; index < num_completed; ++index) {
608 using std::iter_swap;
609
610 // Move "current" to the request object at this index
611 advance(current, indices[index] - current_offset);
612 current_offset = indices[index];
613
614 // Finish up the request and swap it into the "completed
615 // requests" partition.
616 *current->trivial() = requests[indices[index]];
617 --start_of_completed;
618 iter_swap(current, start_of_completed);
619 }
620
621 // We have satisfied some requests, so we are done.
622 return start_of_completed;
623 }
624
625 // There are some nontrivial requests, so we must continue our
626 // busy waiting loop.
627 n = 0;
628 current = first;
629 }
630 }
631
632 // We cannot ever get here
633 BOOST_ASSERT(false);
634 }
635
636 /**
637 * @brief Test whether some non-blocking requests have completed.
638 *
639 * This routine takes in a set of requests stored in the iterator
640 * range @c [first,last) and tests to see if any of the requests has
641 * completed. It completes all of the requests it can, partitioning
642 * the input sequence into pending requests followed by completed
643 * requests. If an output iterator is provided, @c status objects
644 * will be emitted for each of the completed requests. This routine
645 * is similar to @c wait_some, but does not wait until any requests
646 * have completed. This routine provides functionality equivalent to
647 * @c MPI_Testsome.
648 *
649 * @param first The iterator that denotes the beginning of the
650 * sequence of request objects.
651 *
652 * @param last The iterator that denotes the end of the sequence of
653 * request objects. This may not be equal to @c first.
654 *
655 * @param out If provided, the @c status objects corresponding to
656 * completed requests will be emitted through this output iterator.
657
658 * @returns If the @p out parameter was provided, a pair containing
659 * the output iterator @p out after all of the @c status objects have
660 * been written through it and an iterator referencing the first
661 * completed request. If no @p out parameter was provided, only the
662 * iterator referencing the first completed request will be emitted.
663 */
664 template<typename BidirectionalIterator, typename OutputIterator>
665 std::pair<OutputIterator, BidirectionalIterator>
test_some(BidirectionalIterator first,BidirectionalIterator last,OutputIterator out)666 test_some(BidirectionalIterator first, BidirectionalIterator last,
667 OutputIterator out)
668 {
669 BidirectionalIterator current = first;
670 BidirectionalIterator start_of_completed = last;
671 while (current != start_of_completed) {
672 // Check if we have found a completed request.
673 if (optional<status> result = current->test()) {
674 using std::iter_swap;
675
676 // Emit the resulting status object
677 *out++ = *result;
678
679 // We're expanding the set of completed requests
680 --start_of_completed;
681
682 // Swap the request we just completed with the last request that
683 // has not yet been tested.
684 iter_swap(current, start_of_completed);
685
686 continue;
687 }
688
689 // Move to the next request.
690 ++current;
691 }
692
693 // Finish up by fixing the order of the completed set to match the
694 // order in which we emitted status objects, then return.
695 std::reverse(start_of_completed, last);
696 return std::make_pair(out, start_of_completed);
697 }
698
699 /**
700 * \overload
701 */
702 template<typename BidirectionalIterator>
703 BidirectionalIterator
test_some(BidirectionalIterator first,BidirectionalIterator last)704 test_some(BidirectionalIterator first, BidirectionalIterator last)
705 {
706 BidirectionalIterator current = first;
707 BidirectionalIterator start_of_completed = last;
708 while (current != start_of_completed) {
709 // Check if we have found a completed request.
710 if (optional<status> result = current->test()) {
711 using std::iter_swap;
712
713 // We're expanding the set of completed requests
714 --start_of_completed;
715
716 // Swap the request we just completed with the last request that
717 // has not yet been tested.
718 iter_swap(current, start_of_completed);
719
720 continue;
721 }
722
723 // Move to the next request.
724 ++current;
725 }
726
727 return start_of_completed;
728 }
729
730 } } // end namespace boost::mpi
731
732
733 #endif // BOOST_MPI_NONBLOCKING_HPP
734