• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2004-2008 The Trustees of Indiana University.
2 // Copyright (C) 2007   Douglas Gregor
3 // Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>
4 
5 // Use, modification and distribution is subject to the Boost Software
6 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
7 // http://www.boost.org/LICENSE_1_0.txt)
8 
9 //  Authors: Douglas Gregor
10 //           Matthias Troyer
11 //           Andrew Lumsdaine
12 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
13 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
14 
15 #ifndef BOOST_GRAPH_USE_MPI
16 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
17 #endif
18 
19 //#define NO_SPLIT_BATCHES
20 #define SEND_OOB_BSEND
21 
22 #include <boost/optional.hpp>
23 #include <boost/shared_ptr.hpp>
24 #include <boost/weak_ptr.hpp>
25 #include <utility>
26 #include <memory>
27 #include <boost/function/function1.hpp>
28 #include <boost/function/function2.hpp>
29 #include <boost/function/function0.hpp>
30 #include <boost/mpi.hpp>
31 #include <boost/property_map/parallel/process_group.hpp>
32 #include <boost/serialization/vector.hpp>
33 #include <boost/utility/enable_if.hpp>
34 
35 namespace boost { namespace graph { namespace distributed {
36 
37 // Process group tags
38 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
39 
40 class mpi_process_group
41 {
42   struct impl;
43 
44  public:
45   /// Number of tags available to each data structure.
46   static const int max_tags = 256;
47 
48   /**
49    * The type of a "receive" handler, that will be provided with
50    * (source, tag) pairs when a message is received. Users can provide a
51    * receive handler for a distributed data structure, for example, to
52    * automatically pick up and respond to messages as needed.
53    */
54   typedef function<void(int source, int tag)> receiver_type;
55 
56   /**
57    * The type of a handler for the on-synchronize event, which will be
58    * executed at the beginning of synchronize().
59    */
60   typedef function0<void>      on_synchronize_event_type;
61 
62   /// Used as a tag to help create an "empty" process group.
63   struct create_empty {};
64 
65   /// The type used to buffer message data
66   typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
67 
68   /// The type used to identify a process
69   typedef int process_id_type;
70 
71   /// The type used to count the number of processes
72   typedef int process_size_type;
73 
74   /// The type of communicator used to transmit data via MPI
75   typedef boost::mpi::communicator communicator_type;
76 
77   /// Classification of the capabilities of this process group
78   struct communication_category
79     : virtual boost::parallel::bsp_process_group_tag,
80       virtual mpi_process_group_tag { };
81 
82   // TBD: We can eliminate the "source" field and possibly the
83   // "offset" field.
84   struct message_header {
85     /// The process that sent the message
86     process_id_type source;
87 
88     /// The message tag
89     int tag;
90 
91     /// The offset of the message into the buffer
92     std::size_t offset;
93 
94     /// The length of the message in the buffer, in bytes
95     std::size_t bytes;
96 
97     template <class Archive>
serializeboost::graph::distributed::mpi_process_group::message_header98     void serialize(Archive& ar, int)
99     {
100       ar & source & tag & offset & bytes;
101     }
102   };
103 
104   /**
105    * Stores the outgoing messages for a particular processor.
106    *
107    * @todo Evaluate whether we should use a deque instance, which
108    * would reduce could reduce the cost of "sending" messages but
109    * increases the time spent in the synchronization step.
110    */
111   struct outgoing_messages {
outgoing_messagesboost::graph::distributed::mpi_process_group::outgoing_messages112         outgoing_messages() {}
~outgoing_messagesboost::graph::distributed::mpi_process_group::outgoing_messages113         ~outgoing_messages() {}
114 
115     std::vector<message_header> headers;
116     buffer_type                 buffer;
117 
118     template <class Archive>
serializeboost::graph::distributed::mpi_process_group::outgoing_messages119     void serialize(Archive& ar, int)
120     {
121       ar & headers & buffer;
122     }
123 
swapboost::graph::distributed::mpi_process_group::outgoing_messages124     void swap(outgoing_messages& x)
125     {
126       headers.swap(x.headers);
127       buffer.swap(x.buffer);
128     }
129   };
130 
131 private:
132   /**
133    * Virtual base from which every trigger will be launched. See @c
134    * trigger_launcher for more information.
135    */
136   class trigger_base : boost::noncopyable
137   {
138   public:
trigger_base(int tag)139     explicit trigger_base(int tag) : tag_(tag) { }
140 
141     /// Retrieve the tag associated with this trigger
tag() const142     int tag() const { return tag_; }
143 
~trigger_base()144     virtual ~trigger_base() { }
145 
146     /**
147      * Invoked to receive a message that matches a particular trigger.
148      *
149      * @param source      the source of the message
150      * @param tag         the (local) tag of the message
151      * @param context     the context under which the trigger is being
152      *                    invoked
153      */
154     virtual void
155     receive(mpi_process_group const& pg, int source, int tag,
156             trigger_receive_context context, int block=-1) const = 0;
157 
158   protected:
159     // The message tag associated with this trigger
160     int tag_;
161   };
162 
163   /**
164    * Launches a specific handler in response to a trigger. This
165    * function object wraps up the handler function object and a buffer
166    * for incoming data.
167    */
168   template<typename Type, typename Handler>
169   class trigger_launcher : public trigger_base
170   {
171   public:
trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)172     explicit trigger_launcher(mpi_process_group& self, int tag,
173                               const Handler& handler)
174       : trigger_base(tag), self(self), handler(handler)
175       {}
176 
177     void
178     receive(mpi_process_group const& pg, int source, int tag,
179             trigger_receive_context context, int block=-1) const;
180 
181   private:
182     mpi_process_group& self;
183     mutable Handler handler;
184   };
185 
186   /**
187    * Launches a specific handler with a message reply in response to a
188    * trigger. This function object wraps up the handler function
189    * object and a buffer for incoming data.
190    */
191   template<typename Type, typename Handler>
192   class reply_trigger_launcher : public trigger_base
193   {
194   public:
reply_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)195     explicit reply_trigger_launcher(mpi_process_group& self, int tag,
196                                     const Handler& handler)
197       : trigger_base(tag), self(self), handler(handler)
198       {}
199 
200     void
201     receive(mpi_process_group const& pg, int source, int tag,
202             trigger_receive_context context, int block=-1) const;
203 
204   private:
205     mpi_process_group& self;
206     mutable Handler handler;
207   };
208 
209   template<typename Type, typename Handler>
210   class global_trigger_launcher : public trigger_base
211   {
212   public:
global_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)213     explicit global_trigger_launcher(mpi_process_group& self, int tag,
214                               const Handler& handler)
215       : trigger_base(tag), handler(handler)
216       {
217       }
218 
219     void
220     receive(mpi_process_group const& pg, int source, int tag,
221             trigger_receive_context context, int block=-1) const;
222 
223   private:
224     mutable Handler handler;
225     // TBD: do not forget to cancel any outstanding Irecv when deleted,
226     // if we decide to use Irecv
227   };
228 
229   template<typename Type, typename Handler>
230   class global_irecv_trigger_launcher : public trigger_base
231   {
232   public:
global_irecv_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler,int sz)233     explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
234                               const Handler& handler, int sz)
235       : trigger_base(tag), handler(handler), buffer_size(sz)
236       {
237         prepare_receive(self,tag);
238       }
239 
240     void
241     receive(mpi_process_group const& pg, int source, int tag,
242             trigger_receive_context context, int block=-1) const;
243 
244   private:
245     void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
246     Handler handler;
247     int buffer_size;
248     // TBD: do not forget to cancel any outstanding Irecv when deleted,
249     // if we decide to use Irecv
250   };
251 
252 public:
253   /**
254    * Construct a new BSP process group from an MPI communicator. The
255    * MPI communicator will be duplicated to create a new communicator
256    * for this process group to use.
257    */
258   mpi_process_group(communicator_type parent_comm = communicator_type());
259 
260   /**
261    * Construct a new BSP process group from an MPI communicator. The
262    * MPI communicator will be duplicated to create a new communicator
263    * for this process group to use. This constructor allows tuning the
264    * size of message batches.
265    *
266    *   @param num_headers The maximum number of headers in a message batch
267    *
268    *   @param buffer_size The maximum size of the message buffer in a batch.
269    *
270    */
271   mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
272                      communicator_type parent_comm = communicator_type());
273 
274   /**
275    * Construct a copy of the BSP process group for a new distributed
276    * data structure. This data structure will synchronize with all
277    * other members of the process group's equivalence class (including
278    * @p other), but will have its own set of tags.
279    *
280    *   @param other The process group that this new process group will
281    *   be based on, using a different set of tags within the same
282    *   communication and synchronization space.
283    *
284    *   @param handler A message handler that will be passed (source,
285    *   tag) pairs for each message received by this data
286    *   structure. The handler is expected to receive the messages
287    *   immediately. The handler can be changed after-the-fact by
288    *   calling @c replace_handler.
289    *
290    *   @param out_of_band_receive An anachronism. TODO: remove this.
291    */
292   mpi_process_group(const mpi_process_group& other,
293                     const receiver_type& handler,
294                     bool out_of_band_receive = false);
295 
296   /**
297    * Construct a copy of the BSP process group for a new distributed
298    * data structure. This data structure will synchronize with all
299    * other members of the process group's equivalence class (including
300    * @p other), but will have its own set of tags.
301    */
302   mpi_process_group(const mpi_process_group& other,
303                     attach_distributed_object,
304                     bool out_of_band_receive = false);
305 
306   /**
307    * Create an "empty" process group, with no information. This is an
308    * internal routine that users should never need.
309    */
mpi_process_group(create_empty)310   explicit mpi_process_group(create_empty) {}
311 
312   /**
313    * Destroys this copy of the process group.
314    */
315   ~mpi_process_group();
316 
317   /**
318    * Replace the current message handler with a new message handler.
319    *
320    * @param handler The new message handler.
321    * @param out_of_band_receive An anachronism: remove this
322    */
323   void replace_handler(const receiver_type& handler,
324                        bool out_of_band_receive = false);
325 
326   /**
327    * Turns this process group into the process group for a new
328    * distributed data structure or object, allocating its own tag
329    * block.
330    */
331   void make_distributed_object();
332 
333   /**
334    * Replace the handler to be invoked at the beginning of synchronize.
335    */
336   void
337   replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
338 
339   /**
340    * Return the block number of the current data structure. A value of
341    * 0 indicates that this particular instance of the process group is
342    * not associated with any distributed data structure.
343    */
my_block_number() const344   int my_block_number() const { return block_num? *block_num : 0; }
345 
346   /**
347    * Encode a block number/tag pair into a single encoded tag for
348    * transmission.
349    */
encode_tag(int block_num,int tag) const350   int encode_tag(int block_num, int tag) const
351   { return block_num * max_tags + tag; }
352 
353   /**
354    * Decode an encoded tag into a block number/tag pair.
355    */
decode_tag(int encoded_tag) const356   std::pair<int, int> decode_tag(int encoded_tag) const
357   { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
358 
359   // @todo Actually write up the friend declarations so these could be
360   // private.
361 
362   // private:
363 
364   /** Allocate a block of tags for this instance. The block should not
365    * have been allocated already, e.g., my_block_number() ==
366    * 0. Returns the newly-allocated block number.
367    */
368   int allocate_block(bool out_of_band_receive = false);
369 
370   /** Potentially emit a receive event out of band. Returns true if an event
371    *  was actually sent, false otherwise.
372    */
373   bool maybe_emit_receive(int process, int encoded_tag) const;
374 
375   /** Emit a receive event. Returns true if an event was actually
376    * sent, false otherwise.
377    */
378   bool emit_receive(int process, int encoded_tag) const;
379 
380   /** Emit an on-synchronize event to all block handlers. */
381   void emit_on_synchronize() const;
382 
383   /** Retrieve a reference to the stored receiver in this block.  */
384   template<typename Receiver>
385   Receiver* get_receiver();
386 
387   template<typename T>
388   void
389   send_impl(int dest, int tag, const T& value,
390             mpl::true_ /*is_mpi_datatype*/) const;
391 
392   template<typename T>
393   void
394   send_impl(int dest, int tag, const T& value,
395             mpl::false_ /*is_mpi_datatype*/) const;
396 
397   template<typename T>
398   typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
399   array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
400 
401   template<typename T>
402   bool
403   receive_impl(int source, int tag, T& value,
404                mpl::true_ /*is_mpi_datatype*/) const;
405 
406   template<typename T>
407   bool
408   receive_impl(int source, int tag, T& value,
409                mpl::false_ /*is_mpi_datatype*/) const;
410 
411   // Receive an array of values
412   template<typename T>
413   typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
414   array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
415 
416   optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
417 
418   void synchronize() const;
419 
operator bool()420   operator bool() { return bool(impl_); }
421 
422   mpi_process_group base() const;
423 
424   /**
425    * Create a new trigger for a specific message tag. Triggers handle
426    * out-of-band messaging, and the handler itself will be called
427    * whenever a message is available. The handler itself accepts four
428    * arguments: the source of the message, the message tag (which will
429    * be the same as @p tag), the message data (of type @c Type), and a
430    * boolean flag that states whether the message was received
431    * out-of-band. The last will be @c true for out-of-band receives,
432    * or @c false for receives at the end of a synchronization step.
433    */
434   template<typename Type, typename Handler>
435   void trigger(int tag, const Handler& handler);
436 
437   /**
438    * Create a new trigger for a specific message tag, along with a way
439    * to send a reply with data back to the sender. Triggers handle
440    * out-of-band messaging, and the handler itself will be called
441    * whenever a message is available. The handler itself accepts four
442    * arguments: the source of the message, the message tag (which will
443    * be the same as @p tag), the message data (of type @c Type), and a
444    * boolean flag that states whether the message was received
445    * out-of-band. The last will be @c true for out-of-band receives,
446    * or @c false for receives at the end of a synchronization
447    * step. The handler also returns a value, which will be routed back
448    * to the sender.
449    */
450   template<typename Type, typename Handler>
451   void trigger_with_reply(int tag, const Handler& handler);
452 
453   template<typename Type, typename Handler>
454   void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
455 
456 
457 
458   /**
459    * Poll for any out-of-band messages. This routine will check if any
460    * out-of-band messages are available. Those that are available will
461    * be handled immediately, if possible.
462    *
463    * @returns if an out-of-band message has been received, but we are
464    * unable to actually receive the message, a (source, tag) pair will
465    * be returned. Otherwise, returns an empty optional.
466    *
467    * @param wait When true, we should block until a message comes in.
468    *
469    * @param synchronizing whether we are currently synchronizing the
470    *                      process group
471    */
472   optional<std::pair<int, int> >
473   poll(bool wait = false, int block = -1, bool synchronizing = false) const;
474 
475   /**
476    * Determines the context of the trigger currently executing. If
477    * multiple triggers are executing (recursively), then the context
478    * for the most deeply nested trigger will be returned. If no
479    * triggers are executing, returns @c trc_none. This might be used,
480    * for example, to determine whether a reply to a message should
481    * itself be sent out-of-band or whether it can go via the normal,
482    * slower communication route.
483    */
484   trigger_receive_context trigger_context() const;
485 
486   /// INTERNAL ONLY
487   void receive_batch(process_id_type source, outgoing_messages& batch) const;
488 
489   /// INTERNAL ONLY
490   ///
491   /// Determine the actual communicator and tag that will be used for a
492   /// transmission with the given tag.
493   std::pair<boost::mpi::communicator, int>
494   actual_communicator_and_tag(int tag, int block) const;
495 
496   /// set the size of the message buffer used for buffered oob sends
497 
498   static void set_message_buffer_size(std::size_t s);
499 
500   /// get the size of the message buffer used for buffered oob sends
501 
502   static std::size_t message_buffer_size();
503   static int old_buffer_size;
504   static void* old_buffer;
505 private:
506 
507   void install_trigger(int tag, int block,
508       shared_ptr<trigger_base> const& launcher);
509 
510   void poll_requests(int block=-1) const;
511 
512 
513   // send a batch if the buffer is full now or would get full
514   void maybe_send_batch(process_id_type dest) const;
515 
516   // actually send a batch
517   void send_batch(process_id_type dest, outgoing_messages& batch) const;
518   void send_batch(process_id_type dest) const;
519 
520   void pack_headers() const;
521 
522   /**
523    * Process a batch of incoming messages immediately.
524    *
525    * @param source         the source of these messages
526    */
527   void process_batch(process_id_type source) const;
528   void receive_batch(boost::mpi::status& status) const;
529 
530   //void free_finished_sends() const;
531 
532   /// Status messages used internally by the process group
533   enum status_messages {
534     /// the first of the reserved message tags
535     msg_reserved_first = 126,
536     /// Sent from a processor when sending batched messages
537     msg_batch = 126,
538     /// Sent from a processor when sending large batched messages, larger than
539     /// the maximum buffer size for messages to be received by MPI_Irecv
540     msg_large_batch = 127,
541     /// Sent from a source processor to everyone else when that
542     /// processor has entered the synchronize() function.
543     msg_synchronizing = 128,
544     /// the last of the reserved message tags
545     msg_reserved_last = 128
546   };
547 
548   /**
549    * Description of a block of tags associated to a particular
550    * distributed data structure. This structure will live as long as
551    * the distributed data structure is around, and will be used to
552    * help send messages to the data structure.
553    */
554   struct block_type
555   {
block_typeboost::graph::distributed::mpi_process_group::block_type556     block_type() { }
557 
558     /// Handler for receive events
559     receiver_type     on_receive;
560 
561     /// Handler executed at the start of  synchronization
562     on_synchronize_event_type  on_synchronize;
563 
564     /// Individual message triggers. Note: at present, this vector is
565     /// indexed by the (local) tag of the trigger.  Any tags that
566     /// don't have triggers will have NULL pointers in that spot.
567     std::vector<shared_ptr<trigger_base> > triggers;
568   };
569 
570   /**
571    * Data structure containing all of the blocks for the distributed
572    * data structures attached to a process group.
573    */
574   typedef std::vector<block_type*> blocks_type;
575 
576   /// Iterator into @c blocks_type.
577   typedef blocks_type::iterator block_iterator;
578 
579   /**
580    * Deleter used to deallocate a block when its distributed data
581    * structure is destroyed. This type will be used as the deleter for
582    * @c block_num.
583    */
584   struct deallocate_block;
585 
586   static std::vector<char> message_buffer;
587 
588 public:
589   /**
590    * Data associated with the process group and all of its attached
591    * distributed data structures.
592    */
593   shared_ptr<impl> impl_;
594 
595   /**
596    * When non-null, indicates that this copy of the process group is
597    * associated with a particular distributed data structure. The
598    * integer value contains the block number (a value > 0) associated
599    * with that data structure. The deleter for this @c shared_ptr is a
600    * @c deallocate_block object that will deallocate the associated
601    * block in @c impl_->blocks.
602    */
603   shared_ptr<int>  block_num;
604 
605   /**
606    * Rank of this process, to avoid having to call rank() repeatedly.
607    */
608   int rank;
609 
610   /**
611    * Number of processes in this process group, to avoid having to
612    * call communicator::size() repeatedly.
613    */
614   int size;
615 };
616 
617 
618 
619 inline mpi_process_group::process_id_type
process_id(const mpi_process_group & pg)620 process_id(const mpi_process_group& pg)
621 { return pg.rank; }
622 
623 inline mpi_process_group::process_size_type
num_processes(const mpi_process_group & pg)624 num_processes(const mpi_process_group& pg)
625 { return pg.size; }
626 
627 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
628 
629 template<typename T>
630 void
631 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
632      int tag, const T& value);
633 
634 template<typename InputIterator>
635 void
636 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
637      int tag, InputIterator first, InputIterator last);
638 
639 template<typename T>
640 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,T * first,T * last)641 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
642      int tag, T* first, T* last)
643 { send(pg, dest, tag, first, last - first); }
644 
645 template<typename T>
646 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T * first,const T * last)647 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
648      int tag, const T* first, const T* last)
649 { send(pg, dest, tag, first, last - first); }
650 
651 template<typename T>
652 mpi_process_group::process_id_type
653 receive(const mpi_process_group& pg, int tag, T& value);
654 
655 template<typename T>
656 mpi_process_group::process_id_type
657 receive(const mpi_process_group& pg,
658         mpi_process_group::process_id_type source, int tag, T& value);
659 
660 optional<std::pair<mpi_process_group::process_id_type, int> >
661 probe(const mpi_process_group& pg);
662 
663 void synchronize(const mpi_process_group& pg);
664 
665 template<typename T, typename BinaryOperation>
666 T*
667 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
668            BinaryOperation bin_op);
669 
670 template<typename T, typename BinaryOperation>
671 T*
672 scan(const mpi_process_group& pg, T* first, T* last, T* out,
673            BinaryOperation bin_op);
674 
675 template<typename InputIterator, typename T>
676 void
677 all_gather(const mpi_process_group& pg,
678            InputIterator first, InputIterator last, std::vector<T>& out);
679 
680 template<typename InputIterator>
681 mpi_process_group
682 process_subgroup(const mpi_process_group& pg,
683                  InputIterator first, InputIterator last);
684 
685 template<typename T>
686 void
687 broadcast(const mpi_process_group& pg, T& val,
688           mpi_process_group::process_id_type root);
689 
690 
691 /*******************************************************************
692  * Out-of-band communication                                       *
693  *******************************************************************/
694 
695 template<typename T>
696 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T & value,int block=-1)697 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
698          int tag, const T& value, int block=-1)
699 {
700   using boost::mpi::get_mpi_datatype;
701 
702   // Determine the actual message tag we will use for the send, and which
703   // communicator we will use.
704   std::pair<boost::mpi::communicator, int> actual
705     = pg.actual_communicator_and_tag(tag, block);
706 
707 #ifdef SEND_OOB_BSEND
708   if (mpi_process_group::message_buffer_size()) {
709     MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
710               actual.second, actual.first);
711     return;
712   }
713 #endif
714   MPI_Request request;
715   MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
716             actual.second, actual.first, &request);
717 
718   int done=0;
719   do {
720     pg.poll();
721     MPI_Test(&request,&done,MPI_STATUS_IGNORE);
722   } while (!done);
723 }
724 
725 template<typename T>
726 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T & value,int block=-1)727 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
728          int tag, const T& value, int block=-1)
729 {
730   using boost::mpi::packed_oarchive;
731 
732   // Determine the actual message tag we will use for the send, and which
733   // communicator we will use.
734   std::pair<boost::mpi::communicator, int> actual
735     = pg.actual_communicator_and_tag(tag, block);
736 
737   // Serialize the data into a buffer
738   packed_oarchive out(actual.first);
739   out << value;
740   std::size_t size = out.size();
741 
742   // Send the actual message data
743 #ifdef SEND_OOB_BSEND
744   if (mpi_process_group::message_buffer_size()) {
745     MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
746             dest, actual.second, actual.first);
747    return;
748   }
749 #endif
750   MPI_Request request;
751   MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
752             dest, actual.second, actual.first, &request);
753 
754   int done=0;
755   do {
756     pg.poll();
757     MPI_Test(&request,&done,MPI_STATUS_IGNORE);
758   } while (!done);
759 }
760 
761 template<typename T>
762 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
763 receive_oob(const mpi_process_group& pg,
764             mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
765 
766 template<typename T>
767 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
768 receive_oob(const mpi_process_group& pg,
769             mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
770 
771 template<typename SendT, typename ReplyT>
772 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
773 send_oob_with_reply(const mpi_process_group& pg,
774                     mpi_process_group::process_id_type dest,
775                     int tag, const SendT& send_value, ReplyT& reply_value,
776                     int block = -1);
777 
778 template<typename SendT, typename ReplyT>
779 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
780 send_oob_with_reply(const mpi_process_group& pg,
781                     mpi_process_group::process_id_type dest,
782                     int tag, const SendT& send_value, ReplyT& reply_value,
783                     int block = -1);
784 
785 } } } // end namespace boost::graph::distributed
786 
787 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
788 namespace boost { namespace mpi {
789     template<>
790     struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
791 } } // end namespace boost::mpi
792 
793 namespace std {
794 /// optimized swap for outgoing messages
795 inline void
swap(boost::graph::distributed::mpi_process_group::outgoing_messages & x,boost::graph::distributed::mpi_process_group::outgoing_messages & y)796 swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
797      boost::graph::distributed::mpi_process_group::outgoing_messages& y)
798 {
799   x.swap(y);
800 }
801 
802 
803 }
804 
805 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
806 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
807 
808 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
809 
810 #endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
811