// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007  Douglas Gregor
// Copyright (C) 2007  Matthias Troyer <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Matthias Troyer
//           Andrew Lumsdaine
#ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
#define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

//#define NO_SPLIT_BATCHES
#define SEND_OOB_BSEND

#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <utility>
#include <memory>
#include <boost/function/function1.hpp>
#include <boost/function/function2.hpp>
#include <boost/function/function0.hpp>
#include <boost/mpi.hpp>
#include <boost/property_map/parallel/process_group.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/utility/enable_if.hpp>

namespace boost { namespace graph { namespace distributed {

// Process group tags
struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };

class mpi_process_group
{
  struct impl;

 public:
  /// Number of tags available to each data structure.
  static const int max_tags = 256;

  /**
   * The type of a "receive" handler, that will be provided with
   * (source, tag) pairs when a message is received. Users can provide a
   * receive handler for a distributed data structure, for example, to
   * automatically pick up and respond to messages as needed.
   */
  typedef function<void(int source, int tag)> receiver_type;
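
  // A minimal sketch of supplying a receiver (the data structure and its
  // member function below are hypothetical, and <boost/bind.hpp> with the
  // global _1/_2 placeholders is assumed):
  //
  //   struct my_distributed_map {
  //     void handle_message(int source, int tag);  // receives (source, tag)
  //   };
  //
  //   my_distributed_map dm;
  //   receiver_type handler =
  //     boost::bind(&my_distributed_map::handle_message, &dm, _1, _2);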

  /**
   * The type of a handler for the on-synchronize event, which will be
   * executed at the beginning of synchronize().
   */
  typedef function0<void> on_synchronize_event_type;

  /// Used as a tag to help create an "empty" process group.
  struct create_empty {};

  /// The type used to buffer message data
  typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;

  /// The type used to identify a process
  typedef int process_id_type;

  /// The type used to count the number of processes
  typedef int process_size_type;

  /// The type of communicator used to transmit data via MPI
  typedef boost::mpi::communicator communicator_type;

  /// Classification of the capabilities of this process group
  struct communication_category
    : virtual boost::parallel::bsp_process_group_tag,
      virtual mpi_process_group_tag { };

  // TBD: We can eliminate the "source" field and possibly the
  // "offset" field.
  struct message_header {
    /// The process that sent the message
    process_id_type source;

    /// The message tag
    int tag;

    /// The offset of the message into the buffer
    std::size_t offset;

    /// The length of the message in the buffer, in bytes
    std::size_t bytes;

    template <class Archive>
    void serialize(Archive& ar, int)
    {
      ar & source & tag & offset & bytes;
    }
  };

  /**
   * Stores the outgoing messages for a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * could reduce the cost of "sending" messages but increases the
   * time spent in the synchronization step.
   */
  struct outgoing_messages {
    outgoing_messages() {}
    ~outgoing_messages() {}

    std::vector<message_header> headers;
    buffer_type buffer;

    template <class Archive>
    void serialize(Archive& ar, int)
    {
      ar & headers & buffer;
    }

    void swap(outgoing_messages& x)
    {
      headers.swap(x.headers);
      buffer.swap(x.buffer);
    }
  };

private:
  /**
   * Virtual base from which every trigger will be launched. See @c
   * trigger_launcher for more information.
   */
  class trigger_base : boost::noncopyable
  {
  public:
    explicit trigger_base(int tag) : tag_(tag) { }

    /// Retrieve the tag associated with this trigger
    int tag() const { return tag_; }

    virtual ~trigger_base() { }

    /**
     * Invoked to receive a message that matches a particular trigger.
     *
     * @param source the source of the message
     * @param tag the (local) tag of the message
     * @param context the context under which the trigger is being
     * invoked
     */
    virtual void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const = 0;

  protected:
    // The message tag associated with this trigger
    int tag_;
  };

  /**
   * Launches a specific handler in response to a trigger. This
   * function object wraps up the handler function object and a buffer
   * for incoming data.
   */
  template<typename Type, typename Handler>
  class trigger_launcher : public trigger_base
  {
  public:
    explicit trigger_launcher(mpi_process_group& self, int tag,
                              const Handler& handler)
      : trigger_base(tag), self(self), handler(handler)
    {}

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mpi_process_group& self;
    mutable Handler handler;
  };

  /**
   * Launches a specific handler with a message reply in response to a
   * trigger. This function object wraps up the handler function
   * object and a buffer for incoming data.
   */
  template<typename Type, typename Handler>
  class reply_trigger_launcher : public trigger_base
  {
  public:
    explicit reply_trigger_launcher(mpi_process_group& self, int tag,
                                    const Handler& handler)
      : trigger_base(tag), self(self), handler(handler)
    {}

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mpi_process_group& self;
    mutable Handler handler;
  };

  template<typename Type, typename Handler>
  class global_trigger_launcher : public trigger_base
  {
  public:
    explicit global_trigger_launcher(mpi_process_group& self, int tag,
                                     const Handler& handler)
      : trigger_base(tag), handler(handler)
    { }

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mutable Handler handler;
    // TBD: do not forget to cancel any outstanding Irecv when deleted,
    // if we decide to use Irecv
  };

  template<typename Type, typename Handler>
  class global_irecv_trigger_launcher : public trigger_base
  {
  public:
    explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
                                           const Handler& handler, int sz)
      : trigger_base(tag), handler(handler), buffer_size(sz)
    {
      prepare_receive(self, tag);
    }

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    void prepare_receive(mpi_process_group const& pg, int tag,
                         bool force=false) const;
    Handler handler;
    int buffer_size;
    // TBD: do not forget to cancel any outstanding Irecv when deleted,
    // if we decide to use Irecv
  };

public:
  /**
   * Construct a new BSP process group from an MPI communicator. The
   * MPI communicator will be duplicated to create a new communicator
   * for this process group to use.
   */
  mpi_process_group(communicator_type parent_comm = communicator_type());

  /**
   * Construct a new BSP process group from an MPI communicator. The
   * MPI communicator will be duplicated to create a new communicator
   * for this process group to use. This constructor allows tuning of
   * the message batch size.
   *
   * @param num_headers The maximum number of headers in a message batch
   *
   * @param buffer_size The maximum size of the message buffer in a batch.
   */
  mpi_process_group(std::size_t num_headers, std::size_t buffer_size,
                    communicator_type parent_comm = communicator_type());
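
  // A construction sketch, assuming MPI has already been initialized via
  // boost::mpi::environment (the batch parameters below are illustrative,
  // not recommended values):
  //
  //   boost::mpi::environment env(argc, argv);
  //   mpi_process_group pg;                    // duplicates MPI_COMM_WORLD
  //   mpi_process_group batched(16, 1 << 10);  // <= 16 headers, 1 KiB buffer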

  /**
   * Construct a copy of the BSP process group for a new distributed
   * data structure. This data structure will synchronize with all
   * other members of the process group's equivalence class (including
   * @p other), but will have its own set of tags.
   *
   * @param other The process group that this new process group will
   * be based on, using a different set of tags within the same
   * communication and synchronization space.
   *
   * @param handler A message handler that will be passed (source,
   * tag) pairs for each message received by this data
   * structure. The handler is expected to receive the messages
   * immediately. The handler can be changed after-the-fact by
   * calling @c replace_handler.
   *
   * @param out_of_band_receive An anachronism. TODO: remove this.
   */
  mpi_process_group(const mpi_process_group& other,
                    const receiver_type& handler,
                    bool out_of_band_receive = false);

  /**
   * Construct a copy of the BSP process group for a new distributed
   * data structure. This data structure will synchronize with all
   * other members of the process group's equivalence class (including
   * @p other), but will have its own set of tags.
   */
  mpi_process_group(const mpi_process_group& other,
                    attach_distributed_object,
                    bool out_of_band_receive = false);

  /**
   * Create an "empty" process group, with no information. This is an
   * internal routine that users should never need.
   */
  explicit mpi_process_group(create_empty) {}

  /**
   * Destroys this copy of the process group.
   */
  ~mpi_process_group();

  /**
   * Replace the current message handler with a new message handler.
   *
   * @param handler The new message handler.
   * @param out_of_band_receive An anachronism: remove this
   */
  void replace_handler(const receiver_type& handler,
                       bool out_of_band_receive = false);

  /**
   * Turns this process group into the process group for a new
   * distributed data structure or object, allocating its own tag
   * block.
   */
  void make_distributed_object();

  /**
   * Replace the handler to be invoked at the beginning of synchronize.
   */
  void
  replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);

  /**
   * Return the block number of the current data structure. A value of
   * 0 indicates that this particular instance of the process group is
   * not associated with any distributed data structure.
   */
  int my_block_number() const { return block_num? *block_num : 0; }

  /**
   * Encode a block number/tag pair into a single encoded tag for
   * transmission.
   */
  int encode_tag(int block_num, int tag) const
  { return block_num * max_tags + tag; }

  /**
   * Decode an encoded tag into a block number/tag pair.
   */
  std::pair<int, int> decode_tag(int encoded_tag) const
  { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
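
  // Worked example: with max_tags == 256, encode_tag(2, 5) == 2 * 256 + 5
  // == 517, and decode_tag(517) == (517 / 256, 517 % 256) == (2, 5). Local
  // tags must therefore be smaller than max_tags to round-trip correctly.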

  // @todo Actually write up the friend declarations so these could be
  // private.

  // private:

  /** Allocate a block of tags for this instance. The block should not
   * have been allocated already, i.e., my_block_number() should be
   * 0. Returns the newly-allocated block number.
   */
  int allocate_block(bool out_of_band_receive = false);

  /** Potentially emit a receive event out of band. Returns true if an event
   * was actually sent, false otherwise.
   */
  bool maybe_emit_receive(int process, int encoded_tag) const;

  /** Emit a receive event. Returns true if an event was actually
   * sent, false otherwise.
   */
  bool emit_receive(int process, int encoded_tag) const;

  /** Emit an on-synchronize event to all block handlers. */
  void emit_on_synchronize() const;

  /** Retrieve a reference to the stored receiver in this block. */
  template<typename Receiver>
  Receiver* get_receiver();

  template<typename T>
  void
  send_impl(int dest, int tag, const T& value,
            mpl::true_ /*is_mpi_datatype*/) const;

  template<typename T>
  void
  send_impl(int dest, int tag, const T& value,
            mpl::false_ /*is_mpi_datatype*/) const;

  template<typename T>
  typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  array_send_impl(int dest, int tag, const T values[], std::size_t n) const;

  template<typename T>
  bool
  receive_impl(int source, int tag, T& value,
               mpl::true_ /*is_mpi_datatype*/) const;

  template<typename T>
  bool
  receive_impl(int source, int tag, T& value,
               mpl::false_ /*is_mpi_datatype*/) const;

  // Receive an array of values
  template<typename T>
  typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
  array_receive_impl(int source, int tag, T* values, std::size_t& n) const;

  optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;

  void synchronize() const;

  operator bool() { return bool(impl_); }

  mpi_process_group base() const;

  /**
   * Create a new trigger for a specific message tag. Triggers handle
   * out-of-band messaging, and the handler itself will be called
   * whenever a message is available. The handler accepts four
   * arguments: the source of the message, the message tag (which will
   * be the same as @p tag), the message data (of type @c Type), and a
   * boolean flag that states whether the message was received
   * out-of-band. The last will be @c true for out-of-band receives,
   * or @c false for receives at the end of a synchronization step.
   */
  template<typename Type, typename Handler>
  void trigger(int tag, const Handler& handler);
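
  // A trigger sketch (the tag constant and handler type are hypothetical;
  // the handler takes the four arguments described above, with the
  // out-of-band flag expressed as a trigger_receive_context, matching the
  // trigger_base::receive interface):
  //
  //   struct ping_handler {
  //     void operator()(int source, int tag, const int& data,
  //                     trigger_receive_context context) const;
  //   };
  //
  //   pg.trigger<int>(ping_tag, ping_handler());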

  /**
   * Create a new trigger for a specific message tag, along with a way
   * to send a reply with data back to the sender. Triggers handle
   * out-of-band messaging, and the handler itself will be called
   * whenever a message is available. The handler accepts four
   * arguments: the source of the message, the message tag (which will
   * be the same as @p tag), the message data (of type @c Type), and a
   * boolean flag that states whether the message was received
   * out-of-band. The last will be @c true for out-of-band receives,
   * or @c false for receives at the end of a synchronization
   * step. The handler also returns a value, which will be routed back
   * to the sender.
   */
  template<typename Type, typename Handler>
  void trigger_with_reply(int tag, const Handler& handler);
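
  // A reply-trigger sketch (hypothetical names): the handler's return value
  // is routed back to the process that sent the message via
  // send_oob_with_reply, declared later in this header.
  //
  //   struct lookup_handler {
  //     int operator()(int source, int tag, const int& key,
  //                    trigger_receive_context context) const
  //     { return 2 * key; }  // the reply sent back to `source`
  //   };
  //
  //   pg.trigger_with_reply<int>(lookup_tag, lookup_handler());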

  template<typename Type, typename Handler>
  void global_trigger(int tag, const Handler& handler,
                      std::size_t buffer_size = 0);

  /**
   * Poll for any out-of-band messages. This routine will check if any
   * out-of-band messages are available. Those that are available will
   * be handled immediately, if possible.
   *
   * @returns if an out-of-band message has been received, but we are
   * unable to actually receive the message, a (source, tag) pair will
   * be returned. Otherwise, returns an empty optional.
   *
   * @param wait When true, we should block until a message comes in.
   *
   * @param synchronizing whether we are currently synchronizing the
   *                      process group
   */
  optional<std::pair<int, int> >
  poll(bool wait = false, int block = -1, bool synchronizing = false) const;

  /**
   * Determines the context of the trigger currently executing. If
   * multiple triggers are executing (recursively), then the context
   * for the most deeply nested trigger will be returned. If no
   * triggers are executing, returns @c trc_none. This might be used,
   * for example, to determine whether a reply to a message should
   * itself be sent out-of-band or whether it can go via the normal,
   * slower communication route.
   */
  trigger_receive_context trigger_context() const;

  /// INTERNAL ONLY
  void receive_batch(process_id_type source, outgoing_messages& batch) const;

  /// INTERNAL ONLY
  ///
  /// Determine the actual communicator and tag that will be used for a
  /// transmission with the given tag.
  std::pair<boost::mpi::communicator, int>
  actual_communicator_and_tag(int tag, int block) const;

  /// Set the size of the message buffer used for buffered oob sends.
  static void set_message_buffer_size(std::size_t s);

  /// Get the size of the message buffer used for buffered oob sends.
  static std::size_t message_buffer_size();

  static int old_buffer_size;
  static void* old_buffer;

private:

  void install_trigger(int tag, int block,
                       shared_ptr<trigger_base> const& launcher);

  void poll_requests(int block=-1) const;

  // send a batch if the buffer is full now or would get full
  void maybe_send_batch(process_id_type dest) const;

  // actually send a batch
  void send_batch(process_id_type dest, outgoing_messages& batch) const;
  void send_batch(process_id_type dest) const;

  void pack_headers() const;

  /**
   * Process a batch of incoming messages immediately.
   *
   * @param source the source of these messages
   */
  void process_batch(process_id_type source) const;
  void receive_batch(boost::mpi::status& status) const;

  //void free_finished_sends() const;

  /// Status messages used internally by the process group
  enum status_messages {
    /// the first of the reserved message tags
    msg_reserved_first = 126,
    /// Sent from a processor when sending batched messages
    msg_batch = 126,
    /// Sent from a processor when sending large batched messages, larger than
    /// the maximum buffer size for messages to be received by MPI_Irecv
    msg_large_batch = 127,
    /// Sent from a source processor to everyone else when that
    /// processor has entered the synchronize() function.
    msg_synchronizing = 128,
    /// the last of the reserved message tags
    msg_reserved_last = 128
  };

  /**
   * Description of a block of tags associated to a particular
   * distributed data structure. This structure will live as long as
   * the distributed data structure is around, and will be used to
   * help send messages to the data structure.
   */
  struct block_type
  {
    block_type() { }

    /// Handler for receive events
    receiver_type on_receive;

    /// Handler executed at the start of synchronization
    on_synchronize_event_type on_synchronize;

    /// Individual message triggers. Note: at present, this vector is
    /// indexed by the (local) tag of the trigger. Any tags that
    /// don't have triggers will have NULL pointers in that spot.
    std::vector<shared_ptr<trigger_base> > triggers;
  };

  /**
   * Data structure containing all of the blocks for the distributed
   * data structures attached to a process group.
   */
  typedef std::vector<block_type*> blocks_type;

  /// Iterator into @c blocks_type.
  typedef blocks_type::iterator block_iterator;

  /**
   * Deleter used to deallocate a block when its distributed data
   * structure is destroyed. This type will be used as the deleter for
   * @c block_num.
   */
  struct deallocate_block;

  static std::vector<char> message_buffer;

public:
  /**
   * Data associated with the process group and all of its attached
   * distributed data structures.
   */
  shared_ptr<impl> impl_;

  /**
   * When non-null, indicates that this copy of the process group is
   * associated with a particular distributed data structure. The
   * integer value contains the block number (a value > 0) associated
   * with that data structure. The deleter for this @c shared_ptr is a
   * @c deallocate_block object that will deallocate the associated
   * block in @c impl_->blocks.
   */
  shared_ptr<int> block_num;

  /**
   * Rank of this process, to avoid having to call rank() repeatedly.
   */
  int rank;

  /**
   * Number of processes in this process group, to avoid having to
   * call communicator::size() repeatedly.
   */
  int size;
};

inline mpi_process_group::process_id_type
process_id(const mpi_process_group& pg)
{ return pg.rank; }

inline mpi_process_group::process_size_type
num_processes(const mpi_process_group& pg)
{ return pg.size; }

mpi_process_group::communicator_type communicator(const mpi_process_group& pg);

template<typename T>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value);

template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last);

template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, T* first, T* last)
{ send(pg, dest, tag, first, last - first); }

template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T* first, const T* last)
{ send(pg, dest, tag, first, last - first); }

template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value);

template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value);

optional<std::pair<mpi_process_group::process_id_type, int> >
probe(const mpi_process_group& pg);

void synchronize(const mpi_process_group& pg);
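
// A BSP messaging sketch (tag value hypothetical): messages sent with
// send() during one superstep are delivered at the synchronize() call,
// after which they can be retrieved with receive().
//
//   const int tag = 0;
//   mpi_process_group pg;
//   int next = (process_id(pg) + 1) % num_processes(pg);
//   send(pg, next, tag, process_id(pg));   // buffered until synchronization
//   synchronize(pg);                       // superstep boundary
//   int who;
//   receive(pg, tag, who);                 // picks up the delivered message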

template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op);

template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op);

template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg,
           InputIterator first, InputIterator last, std::vector<T>& out);

template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last);

template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root);
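
// A collective sketch: each process contributes one value and every process
// receives the sum (std::plus<int> from <functional> is the reduction).
//
//   int local = process_id(pg);
//   int sum;
//   all_reduce(pg, &local, &local + 1, &sum, std::plus<int>());
//   // afterwards sum == 0 + 1 + ... + (num_processes(pg) - 1) everywhere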

/*******************************************************************
 * Out-of-band communication                                       *
 *******************************************************************/

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
         int tag, const T& value, int block=-1)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message tag we will use for the send, and which
  // communicator we will use.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

#ifdef SEND_OOB_BSEND
  if (mpi_process_group::message_buffer_size()) {
    MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
              actual.second, actual.first);
    return;
  }
#endif
  MPI_Request request;
  MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
            actual.second, actual.first, &request);

  int done = 0;
  do {
    pg.poll();
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
  } while (!done);
}
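
// Note on the loop above: polling between MPI_Test calls keeps servicing
// incoming out-of-band messages while the MPI_Isend completes, so two
// processes that send_oob to each other simultaneously can both make
// progress instead of deadlocking on each other's unposted receives.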

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
         int tag, const T& value, int block=-1)
{
  using boost::mpi::packed_oarchive;

  // Determine the actual message tag we will use for the send, and which
  // communicator we will use.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Serialize the data into a buffer
  packed_oarchive out(actual.first);
  out << value;
  std::size_t size = out.size();

  // Send the actual message data
#ifdef SEND_OOB_BSEND
  if (mpi_process_group::message_buffer_size()) {
    MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
              dest, actual.second, actual.first);
    return;
  }
#endif
  MPI_Request request;
  MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
            dest, actual.second, actual.first, &request);

  int done = 0;
  do {
    pg.poll();
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
  } while (!done);
}

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block=-1);

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block=-1);

template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block = -1);

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block = -1);
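
// An out-of-band round-trip sketch (tag hypothetical): the destination must
// have registered a matching reply trigger (see trigger_with_reply above)
// so the request is handled immediately rather than at the next
// synchronization step.
//
//   int reply;
//   send_oob_with_reply(pg, dest, lookup_tag, 21, reply);
//   // `reply` now holds whatever the remote handler returned (42 for the
//   // lookup_handler sketched earlier)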

} } } // end namespace boost::graph::distributed

BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
namespace boost { namespace mpi {
    template<>
    struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header>
      : mpl::true_ { };
} } // end namespace boost::mpi

namespace std {
/// optimized swap for outgoing messages
inline void
swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
     boost::graph::distributed::mpi_process_group::outgoing_messages& y)
{
  x.swap(y);
}
} // end namespace std

BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)

#include <boost/graph/distributed/detail/mpi_process_group.ipp>

#endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP