• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 // An example using Boost.MPI's split() operation on communicators to
8 // create separate data-generating processes and data-collecting
9 // processes using boost::optional for broadcasting.
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <vector>

#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/serialization/optional.hpp>
15 namespace mpi = boost::mpi;
16 
17 enum message_tags { msg_data_packet, msg_finished };
18 
generate_data(mpi::communicator local,mpi::communicator world)19 void generate_data(mpi::communicator local, mpi::communicator world)
20 {
21   using std::srand;
22   using std::rand;
23 
24   // The rank of the collector within the world communicator
25   int master_collector = local.size();
26 
27   srand(time(0) + world.rank());
28 
29   // Send out several blocks of random data to the collectors.
30   int num_data_blocks = rand() % 3 + 1;
31   for (int block = 0; block < num_data_blocks; ++block) {
32     // Generate some random dataa
33     int num_samples = rand() % 1000;
34     std::vector<int> data;
35     for (int i = 0; i < num_samples; ++i) {
36       data.push_back(rand());
37     }
38 
39     // Send our data to the master collector process.
40     std::cout << "Generator #" << local.rank() << " sends some data..."
41               << std::endl;
42     world.send(master_collector, msg_data_packet, data);
43   }
44 
45   // Wait for all of the generators to complete
46   (local.barrier)();
47 
48   // The first generator will send the message to the master collector
49   // indicating that we're done.
50   if (local.rank() == 0)
51     world.send(master_collector, msg_finished);
52 }
53 
collect_data(mpi::communicator local,mpi::communicator world)54 void collect_data(mpi::communicator local, mpi::communicator world)
55 {
56   // The rank of the collector within the world communicator
57   int master_collector = world.size() - local.size();
58 
59   if (world.rank() == master_collector) {
60     while (true) {
61       // Wait for a message
62       mpi::status msg = world.probe();
63       if (msg.tag() == msg_data_packet) {
64         // Receive the packet of data into a boost::optional
65         boost::optional<std::vector<int> > data;
66         data = std::vector<int>();
67         world.recv(msg.source(), msg.source(), *data);
68 
69         // Broadcast the actual data.
70         broadcast(local, data, 0);
71       } else if (msg.tag() == msg_finished) {
72         // Receive the message
73         world.recv(msg.source(), msg.tag());
74 
75         // Broadcast to each collector to tell them we've finished.
76         boost::optional<std::vector<int> > data;
77         broadcast(local, data, 0);
78         break;
79       }
80     }
81   } else {
82     boost::optional<std::vector<int> > data;
83     do {
84       // Wait for a broadcast from the master collector
85       broadcast(local, data, 0);
86       if (data) {
87         std::cout << "Collector #" << local.rank()
88                   << " is processing data." << std::endl;
89       }
90     } while (data);
91   }
92 }
93 
main(int argc,char * argv[])94 int main(int argc, char* argv[])
95 {
96   mpi::environment env(argc, argv);
97   mpi::communicator world;
98 
99   if (world.size() < 4) {
100     if (world.rank() == 0) {
101       std::cerr << "Error: this example requires at least 4 processes."
102                 << std::endl;
103     }
104     env.abort(-1);
105   }
106 
107   bool is_generator = world.rank() < 2 * world.size() / 3;
108   mpi::communicator local = world.split(is_generator? 0 : 1);
109   if (is_generator) generate_data(local, world);
110   else collect_data(local, world);
111 
112   return 0;
113 }
114