// Copyright (C) 2004-2006 The Trustees of Indiana University.
// Copyright (C) 2007  Douglas Gregor  <doug.gregor@gmail.com>
// Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
//           Matthias Troyer
#include <boost/assert.hpp>
#include <boost/graph/use_mpi.hpp>
#include <boost/graph/distributed/mpi_process_group.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/mpi/environment.hpp>
#include <boost/lexical_cast.hpp>
#include <memory>
#include <algorithm>

//#define DEBUG 1


//#define MAX_BATCHES 1500
#define PREALLOCATE_BATCHES 250
// 500 is a better setting for PREALLOCATE_BATCHES if you're not using process
// subgroups and are building 64-bit binaries.  250 allows all the CTest
// tests to pass in both 32 and 64-bit modes.  If you create multiple process
// groups with PREALLOCATE_BATCHES at a reasonable level in 32-bit mode you
// _will_ run out of memory and get "malloc failed" errors.

//#define NO_ISEND_BATCHES
//#define NO_IMMEDIATE_PROCESSING
//#define NO_SPLIT_BATCHES
#define IRECV_BATCH

// We cannot keep track of how many batches we received if we do not process
// them immediately, so IRECV_BATCH requires immediate processing.
#ifdef NO_IMMEDIATE_PROCESSING
#undef IRECV_BATCH
#endif
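
// Note (editor's addition): with IRECV_BATCH defined, batch messages are
// received through the irecv-based msg_batch trigger registered in the
// constructors below rather than via probe/recv.  The other switches disable
// asynchronous batch sends (NO_ISEND_BATCHES), immediate processing of
// received batches (NO_IMMEDIATE_PROCESSING), and splitting of over-full
// batches (NO_SPLIT_BATCHES).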

#ifdef DEBUG
#  include <iostream>
#endif // DEBUG

namespace boost { namespace graph { namespace distributed {

struct mpi_process_group::deallocate_block
{
  explicit deallocate_block(blocks_type* blocks) : blocks(blocks) { }

  void operator()(int* block_num)
  {
    block_type* block = (*blocks)[*block_num];

    // Mark this block as inactive
    (*blocks)[*block_num] = 0;

#ifdef DEBUG
    fprintf(stderr, "Processor %i deallocated block #%i\n",
            boost::mpi::communicator().rank(), *block_num);
#endif

    // Delete the block and its block number
    delete block_num;
    delete block;
  }

private:
  blocks_type* blocks;
};

mpi_process_group::impl::incoming_messages::incoming_messages()
{
  next_header.push_back(headers.begin());
}

mpi_process_group::impl::impl(std::size_t num_headers, std::size_t buffer_sz,
                              communicator_type parent_comm)
  : comm(parent_comm, boost::mpi::comm_duplicate),
    oob_reply_comm(parent_comm, boost::mpi::comm_duplicate),
    allocated_tags(boost::mpi::environment::max_tag())
{
  max_sent=0;
  max_received=0;
  int n = comm.size();
  outgoing.resize(n);
  incoming.resize(n);

  // no synchronization stage means -1
  // to keep the convention
  synchronizing_stage.resize(n,-1);
  number_sent_batches.resize(n);
  number_received_batches.resize(n);
  trigger_context = trc_none;
  processing_batches = 0;

  // Allocate a placeholder block "0"
  blocks.push_back(new block_type);

  synchronizing = false;

  set_batch_size(num_headers,buffer_sz);

  for (int i = 0; i < n; ++i) {
    incoming[i].next_header.front() = incoming[i].headers.end();
    outgoing[i].buffer.reserve(batch_message_size);
  }

#ifdef PREALLOCATE_BATCHES
  batch_pool.resize(PREALLOCATE_BATCHES);
  for (std::size_t i = 0 ; i < batch_pool.size(); ++i) {
    batch_pool[i].buffer.reserve(batch_message_size);
    batch_pool[i].request=MPI_REQUEST_NULL;
    free_batches.push(i);
  }
#endif
}

void mpi_process_group::impl::set_batch_size(std::size_t header_num, std::size_t buffer_sz)
{
  batch_header_number = header_num;
  batch_buffer_size = buffer_sz;

  // determine batch message size by serializing the largest possible batch
  outgoing_messages msg;
  msg.headers.resize(batch_header_number);
  msg.buffer.resize(batch_buffer_size);
  boost::mpi::packed_oarchive oa(comm);
  oa << const_cast<const outgoing_messages&>(msg);
  batch_message_size = oa.size();
}
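
// Note (editor's addition): batch_message_size computed above serves two
// purposes in this file: it is the receive-buffer bound passed to the
// msg_batch trigger when IRECV_BATCH is enabled, and the threshold in
// send_batch() beyond which a batch is sent under the msg_large_batch tag.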

mpi_process_group::impl::~impl()
{
  // Delete the placeholder "0" block
  delete blocks.front();
  if (!boost::mpi::environment::finalized())
    for (std::vector<MPI_Request>::iterator it=requests.begin();
         it != requests.end();++it)
      MPI_Cancel(&(*it));
}

namespace detail {
// global batch handlers
void handle_batch (mpi_process_group const& self, int source, int,
    mpi_process_group::outgoing_messages& batch,bool out_of_band)
{
#ifdef DEBUG
  std::cerr << "Processing batch trigger\n";
  std::cerr << "BATCH: " << process_id(self) << " <- " << source << " ("
            << batch.headers.size() << " headers, "
            << batch.buffer.size() << " bytes)"
            << std::endl;
#endif
  // If we are not synchronizing, then this must be an early receive
  trigger_receive_context old_context = self.impl_->trigger_context;
  if (self.impl_->trigger_context != trc_in_synchronization)
    self.impl_->trigger_context = trc_early_receive;

  // Receive the batched messages
  self.receive_batch(source,batch);

  // Restore the previous context
  self.impl_->trigger_context = old_context;
}

// synchronization handler
void handle_sync (mpi_process_group const& self, int source, int tag, int val,
                  bool)
{
  // increment the stage for the source
  std::size_t stage = static_cast<std::size_t>(
    ++self.impl_->synchronizing_stage[source]);

  BOOST_ASSERT(source != process_id(self));

#ifdef DEBUG
  std::ostringstream out;
  out << process_id(self) << ": handle_sync from " << source
      << " (stage = " << self.impl_->synchronizing_stage[source] << ")\n";
  std::cerr << out.str();
#endif

  // record how many still have messages to be sent
  if (self.impl_->synchronizing_unfinished.size()<=stage) {
    BOOST_ASSERT(self.impl_->synchronizing_unfinished.size() == stage);
    self.impl_->synchronizing_unfinished.push_back(val >= 0 ? 1 : 0);
  }
  else
    self.impl_->synchronizing_unfinished[stage]+=(val >= 0 ? 1 : 0);

  // record how many are in that stage
  if (self.impl_->processors_synchronizing_stage.size()<=stage) {
    BOOST_ASSERT(self.impl_->processors_synchronizing_stage.size() == stage);
    self.impl_->processors_synchronizing_stage.push_back(1);
  }
  else
    ++self.impl_->processors_synchronizing_stage[stage];

  // subtract how many batches we were supposed to receive
  if (val>0)
    self.impl_->number_received_batches[source] -= val;
}


}

mpi_process_group::mpi_process_group(communicator_type parent_comm)
{
  // 64K message headers and a 1MB buffer per batch turned out to be a
  // reasonable choice
  impl_.reset(new impl(64*1024,1024*1024,parent_comm));
#ifndef IRECV_BATCH
  global_trigger<outgoing_messages>(msg_batch,&detail::handle_batch);
#else // use irecv version by providing a maximum buffer size
  global_trigger<outgoing_messages>(msg_batch,&detail::handle_batch,
         impl_->batch_message_size);
#endif
  global_trigger<outgoing_messages>(msg_large_batch,&detail::handle_batch);
  global_trigger<int>(msg_synchronizing,&detail::handle_sync);
  rank = impl_->comm.rank();
  size = impl_->comm.size();

#ifdef SEND_OOB_BSEND
  // let us try with a default buffer size of 16 MB
  if (!message_buffer_size())
    set_message_buffer_size(16*1024*1024);
#endif
}
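
// Example (editor's sketch, not part of the original source): typical
// construction of a process group and an explicit synchronization step.
// The default-argument form of the constructor (duplicating MPI_COMM_WORLD)
// is assumed from the accompanying header; adjust if your build differs.
//
//   #include <boost/mpi/environment.hpp>
//   #include <boost/graph/use_mpi.hpp>
//   #include <boost/graph/distributed/mpi_process_group.hpp>
//
//   int main(int argc, char* argv[])
//   {
//     boost::mpi::environment env(argc, argv);
//     boost::graph::distributed::mpi_process_group pg;
//     // ... attach distributed data structures or send messages via pg ...
//     synchronize(pg);  // flush and deliver all outstanding batches
//     return 0;
//   }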

mpi_process_group::mpi_process_group(std::size_t h, std::size_t sz,
                                     communicator_type parent_comm)
{
  impl_.reset(new impl(h,sz,parent_comm));
#ifndef IRECV_BATCH
  global_trigger<outgoing_messages>(msg_batch,&detail::handle_batch);
#else // use irecv version by providing a maximum buffer size
  global_trigger<outgoing_messages>(msg_batch,&detail::handle_batch,
         impl_->batch_message_size);
#endif
  global_trigger<outgoing_messages>(msg_large_batch,&detail::handle_batch);
  global_trigger<int>(msg_synchronizing,&detail::handle_sync);
  rank = impl_->comm.rank();
  size = impl_->comm.size();
#ifdef SEND_OOB_BSEND
  // let us try with a default buffer size of 16 MB
  if (!message_buffer_size())
    set_message_buffer_size(16*1024*1024);
#endif
}

mpi_process_group::mpi_process_group(const mpi_process_group& other,
                                     const receiver_type& handler, bool)
  : impl_(other.impl_)
{
  rank = impl_->comm.rank();
  size = impl_->comm.size();
  replace_handler(handler);
}

mpi_process_group::mpi_process_group(const mpi_process_group& other,
                                     attach_distributed_object, bool)
  : impl_(other.impl_)
{
  rank = impl_->comm.rank();
  size = impl_->comm.size();
  allocate_block();

  for (std::size_t i = 0; i < impl_->incoming.size(); ++i) {
    if (my_block_number() >= (int)impl_->incoming[i].next_header.size()) {
      impl_->incoming[i].next_header
        .push_back(impl_->incoming[i].headers.begin());
    } else {
      impl_->incoming[i].next_header[my_block_number()] =
        impl_->incoming[i].headers.begin();
    }

#ifdef DEBUG
    if (process_id(*this) == 0) {
      std::cerr << "Allocated tag block " << my_block_number() << std::endl;
    }
#endif
  }
}

mpi_process_group::~mpi_process_group() {
  /*
  std::string msg = boost::lexical_cast<std::string>(process_id(*this)) + " " +
            boost::lexical_cast<std::string>(impl_->max_received) + " " +
            boost::lexical_cast<std::string>(impl_->max_sent) + "\n";
     std::cerr << msg << "\n";
     */
}


mpi_process_group::communicator_type communicator(const mpi_process_group& pg)
{ return pg.impl_->comm; }

void
mpi_process_group::replace_handler(const receiver_type& handler,
                                   bool out_of_band_receive)
{
  make_distributed_object();

  // Attach the receive handler
  impl_->blocks[my_block_number()]->on_receive = handler;
}

void
mpi_process_group::make_distributed_object()
{
  if (my_block_number() == 0) {
    allocate_block();

    for (std::size_t i = 0; i < impl_->incoming.size(); ++i) {
      if (my_block_number() >= (int)impl_->incoming[i].next_header.size()) {
        impl_->incoming[i].next_header
          .push_back(impl_->incoming[i].headers.begin());
      } else {
        impl_->incoming[i].next_header[my_block_number()] =
          impl_->incoming[i].headers.begin();
      }

#ifdef DEBUG
      if (process_id(*this) == 0) {
        std::cerr << "Allocated tag block " << my_block_number() << std::endl;
      }
#endif
    }
  } else {
    // Clear out the existing triggers
    std::vector<shared_ptr<trigger_base> >()
      .swap(impl_->blocks[my_block_number()]->triggers);
  }

  // Clear out the receive handler
  impl_->blocks[my_block_number()]->on_receive = 0;
}

void
mpi_process_group::
replace_on_synchronize_handler(const on_synchronize_event_type& handler)
{
  if (my_block_number() > 0)
    impl_->blocks[my_block_number()]->on_synchronize = handler;
}

int mpi_process_group::allocate_block(bool out_of_band_receive)
{
  BOOST_ASSERT(!block_num);
  block_iterator i = impl_->blocks.begin();
  while (i != impl_->blocks.end() && *i) ++i;

  if (i == impl_->blocks.end()) {
    impl_->blocks.push_back(new block_type());
    i = impl_->blocks.end() - 1;
  } else {
    *i = new block_type();
  }

  block_num.reset(new int(i - impl_->blocks.begin()),
                  deallocate_block(&impl_->blocks));

#ifdef DEBUG
  fprintf(stderr,
          "Processor %i allocated block #%i\n", process_id(*this), *block_num);
#endif

  return *block_num;
}

bool mpi_process_group::maybe_emit_receive(int process, int encoded_tag) const
{
  std::pair<int, int> decoded = decode_tag(encoded_tag);

  BOOST_ASSERT (decoded.first < static_cast<int>(impl_->blocks.size()));

  block_type* block = impl_->blocks[decoded.first];
  if (!block) {
    std::cerr << "Received message from process " << process << " with tag "
              << decoded.second << " for non-active block "
              << decoded.first << std::endl;
    std::cerr << "Active blocks are: ";
    for (std::size_t i = 0; i < impl_->blocks.size(); ++i)
      if (impl_->blocks[i])
        std::cerr << i << ' ';
    std::cerr << std::endl;
    BOOST_ASSERT(block);
  }

  if (decoded.second < static_cast<int>(block->triggers.size())
      && block->triggers[decoded.second]) {
    // We have a trigger for this message; use it
    trigger_receive_context old_context = impl_->trigger_context;
    impl_->trigger_context = trc_out_of_band;
    block->triggers[decoded.second]->receive(*this, process, decoded.second,
                                             impl_->trigger_context,
                                             decoded.first);
    impl_->trigger_context = old_context;
  }
  else
    return false;
  // We received the message above
  return true;
}

bool mpi_process_group::emit_receive(int process, int encoded_tag) const
{
  std::pair<int, int> decoded = decode_tag(encoded_tag);

  if (decoded.first >= static_cast<int>(impl_->blocks.size()))
    // This must be an out-of-band message destined for
    // send_oob_with_reply; ignore it.
    return false;

  // Find the block that will receive this message
  block_type* block = impl_->blocks[decoded.first];
  BOOST_ASSERT(block);
  if (decoded.second < static_cast<int>(block->triggers.size())
      && block->triggers[decoded.second])
    // We have a trigger for this message; use it
    block->triggers[decoded.second]->receive(*this,process, decoded.second,
                                             impl_->trigger_context);
  else if (block->on_receive)
    // Fall back to the normal message handler
    block->on_receive(process, decoded.second);
  else
    // There was no handler for this message
    return false;

  // The message was handled above
  return true;
}

void mpi_process_group::emit_on_synchronize() const
{
  for (block_iterator i = impl_->blocks.begin(); i != impl_->blocks.end(); ++i)
    if (*i && (*i)->on_synchronize) (*i)->on_synchronize();
}


optional<std::pair<mpi_process_group::process_id_type, int> >
mpi_process_group::probe() const
{
#ifdef DEBUG
  std::cerr << "PROBE: " << process_id(*this) << ", tag block = "
            << my_block_number() << std::endl;
#endif

  typedef std::pair<process_id_type, int> result_type;

  int tag_block = my_block_number();

  for (std::size_t source = 0; source < impl_->incoming.size(); ++source) {
    impl::incoming_messages& incoming = impl_->incoming[source];
    std::vector<impl::message_header>::iterator& i =
      incoming.next_header[tag_block];
    std::vector<impl::message_header>::iterator end = incoming.headers.end();

    while (i != end) {
      if (i->tag != -1 && decode_tag(i->tag).first == my_block_number()) {
#ifdef DEBUG
        std::cerr << "PROBE: " << process_id(*this) << " <- " << source
                  << ", block = " << my_block_number() << ", tag = "
                  << decode_tag(i->tag).second << ", bytes = " << i->bytes
                  << std::endl;
#endif
        return result_type(source, decode_tag(i->tag).second);
      }
      ++i;
    }
  }
  return optional<result_type>();
}

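// Batch dispatch policy: maybe_send_batch() flushes the per-destination
// outgoing buffer once it reaches the configured header count or buffer size
// (unless NO_SPLIT_BATCHES is defined).  If the buffer limit was exceeded,
// the most recently appended message is carried over into the next batch
// rather than being sent as part of the oversized one.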
void
mpi_process_group::maybe_send_batch(process_id_type dest) const
{
#ifndef NO_SPLIT_BATCHES
  impl::outgoing_messages& outgoing = impl_->outgoing[dest];
  if (outgoing.buffer.size() >= impl_->batch_buffer_size ||
      outgoing.headers.size() >= impl_->batch_header_number) {
    // we are full and need to send
    outgoing_messages batch;
    batch.buffer.reserve(impl_->batch_buffer_size);
    batch.swap(outgoing);
    if (batch.buffer.size() >= impl_->batch_buffer_size
         && batch.headers.size()>1 ) {
      // we are too large, keep the last message in the outgoing buffer
      std::copy(batch.buffer.begin()+batch.headers.back().offset,
                batch.buffer.end(),std::back_inserter(outgoing.buffer));
      batch.buffer.resize(batch.headers.back().offset);
      outgoing.headers.push_back(batch.headers.back());
      batch.headers.pop_back();
      outgoing.headers.front().offset=0;
    }
    send_batch(dest,batch);
  }
#endif
}

void
mpi_process_group::send_batch(process_id_type dest) const
{
  impl::outgoing_messages& outgoing = impl_->outgoing[dest];
  if (outgoing.headers.size()) {
    // need to copy to avoid race conditions
    outgoing_messages batch;
    batch.buffer.reserve(impl_->batch_buffer_size);
    batch.swap(outgoing);
    send_batch(dest,batch);
  }
}


void
mpi_process_group::send_batch(process_id_type dest,
                              outgoing_messages& outgoing) const
{
  impl_->free_sent_batches();
  process_id_type id = process_id(*this);

  // clear the batch
#ifdef DEBUG
  std::cerr << "Sending batch: " << id << " -> "  << dest << std::endl;
#endif
  // we increment the number of batches sent
  ++impl_->number_sent_batches[dest];
  // and send the batch
  BOOST_ASSERT(outgoing.headers.size() <= impl_->batch_header_number);
  if (id != dest) {
#ifdef NO_ISEND_BATCHES
    impl::batch_request req;
#else
#ifdef PREALLOCATE_BATCHES
    while (impl_->free_batches.empty()) {
      impl_->free_sent_batches();
      poll();
    }
    impl::batch_request& req = impl_->batch_pool[impl_->free_batches.top()];
    impl_->free_batches.pop();
#else
    impl_->sent_batches.push_back(impl::batch_request());
    impl::batch_request& req = impl_->sent_batches.back();
#endif
#endif
    boost::mpi::packed_oarchive oa(impl_->comm,req.buffer);
    oa << outgoing;

    int tag = msg_batch;

#ifdef IRECV_BATCH
    if (oa.size() > impl_->batch_message_size)
      tag = msg_large_batch;
#endif

#ifndef NDEBUG // Prevent an uninitialized-variable warning when NDEBUG is on
    int result =
#endif // !NDEBUG
      MPI_Isend(const_cast<void*>(oa.address()), oa.size(),
                MPI_PACKED, dest, tag, impl_->comm,
                &req.request);
    BOOST_ASSERT(result == MPI_SUCCESS);
    impl_->max_sent = (std::max)(impl_->max_sent,impl_->sent_batches.size());
#ifdef NO_ISEND_BATCHES
    int done=0;
    do {
        poll();
        MPI_Test(&req.request,&done,MPI_STATUS_IGNORE);
       } while (!done);
#else
#ifdef MAX_BATCHES
    while (impl_->sent_batches.size() >= MAX_BATCHES-1) {
      impl_->free_sent_batches();
      poll();
    }
#endif
#endif
  }
  else
    receive_batch(id,outgoing);
}

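// Re-entrancy note: receive_batch() appends incoming messages to the
// per-source incoming queue and, unless NO_IMMEDIATE_PROCESSING is defined,
// processes them right away via process_batch().  A batch that arrives while
// another is already being processed (impl_->processing_batches != 0) is
// parked on impl_->new_batches and drained at the end of process_batch().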
void mpi_process_group::process_batch(int source) const
{
  bool processing_from_queue = !impl_->new_batches.empty();
  impl_->processing_batches++;
  typedef std::vector<impl::message_header>::iterator iterator;

  impl::incoming_messages& incoming = impl_->incoming[source];

  // Set up the iterators pointing to the next header in each block
  for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
    incoming.next_header[i] = incoming.headers.begin();

  buffer_type remaining_buffer;
  std::vector<impl::message_header> remaining_headers;

  iterator end = incoming.headers.end();

  for (iterator i = incoming.headers.begin(); i != end; ++i) {
    // If this message has already been received, skip it
    if (i->tag == -1)
      continue;

#ifdef BATCH_DEBUG
    std::cerr << process_id(*this) << ": emit_receive(" << source << ", "
              << decode_tag(i->tag).first << ":" << decode_tag(i->tag).second
              << ")\n";
#endif

    if (!emit_receive(source, i->tag)) {
#ifdef BATCH_DEBUG
      std::cerr << process_id(*this) << ": keeping message # "
            << remaining_headers.size() << " from " << source << " ("
            << decode_tag(i->tag).first << ":"
            << decode_tag(i->tag).second << ", "
            << i->bytes << " bytes)\n";
#endif
      // Hold on to this message until the next stage
      remaining_headers.push_back(*i);
      remaining_headers.back().offset = remaining_buffer.size();
      remaining_buffer.insert(remaining_buffer.end(),
                  &incoming.buffer[i->offset],
                  &incoming.buffer[i->offset] + i->bytes);
    }
  }

  // Swap the remaining messages into the "incoming" set.
  incoming.headers.swap(remaining_headers);
  incoming.buffer.swap(remaining_buffer);

  // Set up the iterators pointing to the next header in each block
  for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
    incoming.next_header[i] = incoming.headers.begin();
  impl_->processing_batches--;

  if (!processing_from_queue)
    while (!impl_->new_batches.empty()) {
      receive_batch(impl_->new_batches.front().first,
                    impl_->new_batches.front().second);
      impl_->new_batches.pop();
    }
}


void mpi_process_group::receive_batch(process_id_type source,
  outgoing_messages& new_messages) const
{
  impl_->free_sent_batches();
  if(!impl_->processing_batches) {
    // increase the number of received batches
    ++impl_->number_received_batches[source];
    // and receive the batch
    impl::incoming_messages& incoming = impl_->incoming[source];
    typedef std::vector<impl::message_header>::iterator iterator;
    iterator end = new_messages.headers.end();
    for (iterator i = new_messages.headers.begin(); i != end; ++i) {
      incoming.headers.push_back(*i);
      incoming.headers.back().offset = incoming.buffer.size();
      incoming.buffer.insert(incoming.buffer.end(),
                  &new_messages.buffer[i->offset],
                  &new_messages.buffer[i->offset] + i->bytes);
    }
    // Set up the iterators pointing to the next header in each block
    for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
      incoming.next_header[i] = incoming.headers.begin();
#ifndef NO_IMMEDIATE_PROCESSING
    process_batch(source);
#endif
  }
  else {
#ifdef DEBUG
    std::cerr << "Pushing incoming message batch onto queue since we are "
                 "already processing a batch.\n";
#endif
    // use swap to avoid copying
    impl_->new_batches.push(std::make_pair(int(source),outgoing_messages()));
    impl_->new_batches.back().second.swap(new_messages);
    impl_->max_received = (std::max)(impl_->max_received,impl_->new_batches.size());
  }
}


void mpi_process_group::pack_headers() const
{
  for (process_id_type other = 0; other < num_processes(*this); ++other) {
    typedef std::vector<impl::message_header>::iterator iterator;

    impl::incoming_messages& incoming = impl_->incoming[other];

    buffer_type remaining_buffer;
    std::vector<impl::message_header> remaining_headers;

    iterator end = incoming.headers.end();
    for (iterator i = incoming.headers.begin(); i != end; ++i) {
      if (i->tag == -1)
        continue;

      // Hold on to this message until the next stage
      remaining_headers.push_back(*i);
      remaining_headers.back().offset = remaining_buffer.size();
      remaining_buffer.insert(remaining_buffer.end(),
                              &incoming.buffer[i->offset],
                              &incoming.buffer[i->offset] + i->bytes);
    }

    // Swap the remaining messages into the "incoming" set.
    incoming.headers.swap(remaining_headers);
    incoming.buffer.swap(remaining_buffer);

    // Set up the iterators pointing to the next header in each block
    for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
      incoming.next_header[i] = incoming.headers.begin();
  }
}

void mpi_process_group::receive_batch(boost::mpi::status& status) const
{
  //std::cerr << "Handling batch\n";
  outgoing_messages batch;
  //impl_->comm.recv(status.source(),status.tag(),batch);

  //receive_oob(*this,status.source(),msg_batch,batch);

  // Determine how big the receive buffer should be
#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  int size;
  MPI_Status mpi_status(status);
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(impl_->comm,size);

  // Receive the message data
  MPI_Recv(in.address(), size, MPI_PACKED,
           status.source(), status.tag(),
           impl_->comm, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> batch;
  receive_batch(status.source(),batch);
}

std::pair<boost::mpi::communicator, int>
mpi_process_group::actual_communicator_and_tag(int tag, int block) const
{
  if (tag >= max_tags * static_cast<int>(impl_->blocks.size()))
    // Use the out-of-band reply communicator
    return std::make_pair(impl_->oob_reply_comm, tag);
  else
    // Use the normal communicator and translate the tag
    return std::make_pair(impl_->comm,
                          encode_tag(block == -1? my_block_number() : block,
                                     tag));
}

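// Synchronization protocol (summary of synchronize() below): processes
// advance in stages.  In each stage every process tells every other, via an
// msg_synchronizing message, how many batches it has sent to it (or -1 if it
// has nothing further to send), then polls until all processes have reached
// the stage and all announced batches have arrived.  Stages repeat until no
// process reports outstanding messages; a barrier and some bookkeeping then
// reset the per-stage counters.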
void mpi_process_group::synchronize() const
{
  // Don't synchronize if we've already finished
  if (boost::mpi::environment::finalized())
    return;

#ifdef DEBUG
  std::cerr << "SYNC: " << process_id(*this) << std::endl;
#endif

  emit_on_synchronize();

  process_id_type id = process_id(*this);     // Our rank
  process_size_type p = num_processes(*this); // The number of processes

  // Pack the remaining incoming messages into the beginning of the
  // buffers, so that we can receive new messages in this
  // synchronization step without losing those messages that have not
  // yet been received.
  pack_headers();

  impl_->synchronizing_stage[id] = -1;
  int stage=-1;
  bool no_new_messages = false;
  while (true) {
      ++stage;
#ifdef DEBUG
      std::cerr << "SYNC: " << id << " starting stage " << (stage+1) << ".\n";
#endif

      // Tell everyone that we are synchronizing. Note: we use MPI_Isend since
      // we absolutely cannot have any of these operations blocking.

      // increment the stage for the source
      ++impl_->synchronizing_stage[id];
      if (impl_->synchronizing_stage[id] != stage)
        std::cerr << "Expected stage " << stage << ", got "
                  << impl_->synchronizing_stage[id] << std::endl;
      BOOST_ASSERT(impl_->synchronizing_stage[id]==stage);
      // record how many still have messages to be sent
      if (static_cast<int>(impl_->synchronizing_unfinished.size())<=stage) {
        BOOST_ASSERT(static_cast<int>(impl_->synchronizing_unfinished.size()) == stage);
        impl_->synchronizing_unfinished.push_back(no_new_messages ? 0 : 1);
      }
      else
        impl_->synchronizing_unfinished[stage]+=(no_new_messages ? 0 : 1);

      // record how many are in that stage
      if (static_cast<int>(impl_->processors_synchronizing_stage.size())<=stage) {
        BOOST_ASSERT(static_cast<int>(impl_->processors_synchronizing_stage.size()) == stage);
        impl_->processors_synchronizing_stage.push_back(1);
      }
      else
        ++impl_->processors_synchronizing_stage[stage];

      impl_->synchronizing = true;

      for (int dest = 0; dest < p; ++dest) {
        int sync_message = no_new_messages ? -1 : impl_->number_sent_batches[dest];
        if (dest != id) {
          impl_->number_sent_batches[dest]=0;
          MPI_Request request;
          MPI_Isend(&sync_message, 1, MPI_INT, dest, msg_synchronizing, impl_->comm,&request);
          int done=0;
          do {
            poll();
            MPI_Test(&request,&done,MPI_STATUS_IGNORE);
          } while (!done);
        }
        else { // need to subtract how many messages I should have received
          impl_->number_received_batches[id] -= impl_->number_sent_batches[id];
          impl_->number_sent_batches[id]=0;
        }
      }

      // Keep handling out-of-band messages until everyone has gotten
      // to this point.
      while (impl_->processors_synchronizing_stage[stage] < p) {
        // with the trigger based solution we cannot easily pass true here
        poll(/*wait=*/false, -1, true);
      }

      // check that everyone is at least here
      for (int source=0; source<p ; ++source)
        BOOST_ASSERT(impl_->synchronizing_stage[source] >= stage);

      // receive any batches sent in the meantime
      // all have to be available already
      while (true) {
        bool done=true;
        for (int source=0; source<p ; ++source)
          if(impl_->number_received_batches[source] < 0)
            done = false;
        if (done)
          break;
        poll(false,-1,true);
      }

#ifndef NO_IMMEDIATE_PROCESSING
      for (int source=0; source<p ; ++source)
        BOOST_ASSERT(impl_->number_received_batches[source] >= 0);
#endif

      impl_->synchronizing = false;

      // Flush out remaining messages
      if (impl_->synchronizing_unfinished[stage]==0)
        break;
#ifdef NO_IMMEDIATE_PROCESSING
      for (process_id_type dest = 0; dest < p; ++dest)
        process_batch(dest);
#endif

      no_new_messages = true;
      for (process_id_type dest = 0; dest < p; ++dest) {
        if (impl_->outgoing[dest].headers.size() ||
            impl_->number_sent_batches[dest]>0)
          no_new_messages = false;
        send_batch(dest);
      }
    }

  impl_->comm.barrier/*nomacro*/();
#if 0
  // set up for next synchronize call
  for (int source=0; source<p; ++source) {
    if (impl_->synchronizing_stage[source] != stage) {
      std::cerr << id << ": expecting stage " << stage << " from source "
                << source << ", got " << impl_->synchronizing_stage[source]
                << std::endl;
    }
    BOOST_ASSERT(impl_->synchronizing_stage[source]==stage);
  }
#endif
  std::fill(impl_->synchronizing_stage.begin(),
            impl_->synchronizing_stage.end(), -1);

  // get rid of the information regarding recorded numbers of processors
  // for the stages we just finished
  impl_->processors_synchronizing_stage.clear();
  impl_->synchronizing_unfinished.clear();

  for (process_id_type dest = 0; dest < p; ++dest)
    BOOST_ASSERT (impl_->outgoing[dest].headers.empty());
#ifndef NO_IMMEDIATE_PROCESSING
  for (int source=0; source<p ; ++source)
    BOOST_ASSERT (impl_->number_received_batches[source] == 0);
#endif

  impl_->free_sent_batches();
#ifdef DEBUG
  std::cerr << "SYNC: " << process_id(*this) << " completed." << std::endl;
#endif
}

optional<std::pair<mpi_process_group::process_id_type, int> >
probe(const mpi_process_group& pg)
{ return pg.probe(); }

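// poll_requests() services the asynchronous receive requests stored in
// impl_->requests (posted by the irecv-based triggers).  Completed requests
// are removed from the list before their triggers run, so that a trigger
// that polls recursively does not see them again.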
void mpi_process_group::poll_requests(int block) const
{
  int size = impl_->requests.size();
  if (size==0)
    return;
  std::vector<MPI_Status> statuses(size);
  std::vector<int> indices(size);

  while (true) {
    MPI_Testsome(impl_->requests.size(),&impl_->requests[0],
       &size,&indices[0],&statuses[0]);
    if (size==0)
      return; // no message waiting

    // remove handled requests before we get the chance to be recursively called
    if (size) {
      std::vector<MPI_Request> active_requests;
      std::size_t i=0;
      int j=0;
      for (;i< impl_->requests.size() && j< size; ++i) {
        if (int(i)==indices[j])
          // release the dealt-with request
          ++j;
        else // copy and keep the request
          active_requests.push_back(impl_->requests[i]);
      }
      while (i < impl_->requests.size())
        active_requests.push_back(impl_->requests[i++]);
      impl_->requests.swap(active_requests);
    }

    optional<std::pair<int, int> > result;
    for (int i=0;i < size; ++i) {
      std::pair<int, int> decoded = decode_tag(statuses[i].MPI_TAG);
      block_type* block = impl_->blocks[decoded.first];

      BOOST_ASSERT (decoded.second < static_cast<int>(block->triggers.size())
                    && block->triggers[decoded.second]);
      // We have a trigger for this message; use it
      trigger_receive_context old_context = impl_->trigger_context;
      impl_->trigger_context = trc_irecv_out_of_band;
      block->triggers[decoded.second]->receive(*this, statuses[i].MPI_SOURCE,
            decoded.second, impl_->trigger_context, decoded.first);
      impl_->trigger_context = old_context;
    }
  }
}


optional<std::pair<int, int> >
mpi_process_group::
poll(bool wait, int block, bool synchronizing) const
{
  // Set the new trigger context for these receive operations
  trigger_receive_context old_context = impl_->trigger_context;
  if (synchronizing)
    impl_->trigger_context = trc_in_synchronization;
  else
    impl_->trigger_context = trc_out_of_band;

  //wait = false;
  optional<boost::mpi::status> status;
  bool finished=false;
  optional<std::pair<int, int> > result;
  do {
    poll_requests(block);
    // Check for any messages not yet received.
#ifdef PBGL_PROCESS_GROUP_NO_IRECV
    if (wait)
      status = impl_->comm.probe();
    else
#endif
      status = impl_->comm.iprobe();

    if (status) { // we have a message
      // Decode the message
      std::pair<int, int> decoded = decode_tag(status.get().tag());
      if (maybe_emit_receive(status.get().source(), status.get().tag()))
        // We received the message out-of-band; poll again
        finished = false;
      else if (decoded.first == (block == -1 ? my_block_number() : block)) {
        // This message is for us, but not through a trigger. Return
        // the decoded message.
        result = std::make_pair(status.get().source(), decoded.second);
        finished = true;
      }
      // otherwise we didn't match any message we know how to deal with, so
      // pretend no message exists.
    }
    else
      // We don't have a message to process; we're done.
      finished=!wait;
  } while (!finished);

  // Restore the prior trigger context
  impl_->trigger_context = old_context;
  poll_requests(block);
  return result;
}

void synchronize(const mpi_process_group& pg) { pg.synchronize(); }

mpi_process_group mpi_process_group::base() const
{
  mpi_process_group copy(*this);
  copy.block_num.reset();
  return copy;
}


void mpi_process_group::impl::free_sent_batches()
{
  typedef std::list<batch_request>::iterator iterator;
  iterator it = sent_batches.begin();
  int flag;
  int result;
  while(it != sent_batches.end()) {
    result = MPI_Test(&it->request,&flag,MPI_STATUS_IGNORE);
    BOOST_ASSERT(result == MPI_SUCCESS);
    iterator next=it;
    ++next;
    if (flag)
      sent_batches.erase(it);
    it=next;
  }
#ifdef PREALLOCATE_BATCHES
  for (std::size_t i=0; i< batch_pool.size();++i) {
    if(batch_pool[i].request != MPI_REQUEST_NULL) {
      result = MPI_Test(&batch_pool[i].request,&flag,MPI_STATUS_IGNORE);
      BOOST_ASSERT(result == MPI_SUCCESS);
      if (flag) {
        free_batches.push(i);
        batch_pool[i].request = MPI_REQUEST_NULL;
        batch_pool[i].buffer.resize(0);
      }
    }
  }
#endif
}

void
mpi_process_group::install_trigger(int tag, int block,
      shared_ptr<trigger_base> const& launcher)
{
  block_type* my_block = impl_->blocks[block];
  BOOST_ASSERT(my_block);

  // Make sure we have enough space in the structure for this trigger.
  if (launcher->tag() >= static_cast<int>(my_block->triggers.size()))
    my_block->triggers.resize(launcher->tag() + 1);

  // If someone already put a trigger in this spot, we have a big
  // problem.
  if (my_block->triggers[launcher->tag()]) {
    std::cerr << "Block " << my_block_number()
              << " already has a trigger for tag " << launcher->tag()
              << std::endl;
  }
  BOOST_ASSERT(!my_block->triggers[launcher->tag()]);

  // Attach a new trigger launcher
  my_block->triggers[launcher->tag()] = launcher;
}

std::size_t mpi_process_group::message_buffer_size()
{
  return message_buffer.size();
}


void mpi_process_group::set_message_buffer_size(std::size_t s)
{
  int sz;
  void* ptr;
  if (!message_buffer.empty()) {
    MPI_Buffer_detach(&ptr,&sz);
    BOOST_ASSERT(ptr == &message_buffer.front());
    BOOST_ASSERT(static_cast<std::size_t>(sz) == message_buffer.size());
  }
  else if (old_buffer != 0)
    MPI_Buffer_detach(&old_buffer,&old_buffer_size);
  message_buffer.resize(s);
  if (s)
    MPI_Buffer_attach(&message_buffer.front(), message_buffer.size());
  else if (old_buffer_size)
    MPI_Buffer_attach(old_buffer, old_buffer_size);
}


std::vector<char> mpi_process_group::message_buffer;
int mpi_process_group::old_buffer_size=0;
void* mpi_process_group::old_buffer=0;

} } } // end namespace boost::graph::distributed