• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 // An example using Boost.MPI's split() operation on communicators to
8 // create separate data-generating processes and data-collecting
9 // processes.
10 #include <boost/mpi.hpp>
11 #include <iostream>
12 #include <cstdlib>
13 #include <boost/serialization/vector.hpp>
14 namespace mpi = boost::mpi;
15 
16 enum message_tags { msg_data_packet, msg_broadcast_data, msg_finished };
17 
generate_data(mpi::communicator local,mpi::communicator world)18 void generate_data(mpi::communicator local, mpi::communicator world)
19 {
20   using std::srand;
21   using std::rand;
22 
23   // The rank of the collector within the world communicator
24   int master_collector = local.size();
25 
26   srand(time(0) + world.rank());
27 
28   // Send out several blocks of random data to the collectors.
29   int num_data_blocks = rand() % 3 + 1;
30   for (int block = 0; block < num_data_blocks; ++block) {
31     // Generate some random data
32     int num_samples = rand() % 1000;
33     std::vector<int> data;
34     for (int i = 0; i < num_samples; ++i) {
35       data.push_back(rand());
36     }
37 
38     // Send our data to the master collector process.
39     std::cout << "Generator #" << local.rank() << " sends some data..."
40               << std::endl;
41     world.send(master_collector, msg_data_packet, data);
42   }
43 
44   // Wait for all of the generators to complete
45   (local.barrier)();
46 
47   // The first generator will send the message to the master collector
48   // indicating that we're done.
49   if (local.rank() == 0)
50     world.send(master_collector, msg_finished);
51 }
52 
collect_data(mpi::communicator local,mpi::communicator world)53 void collect_data(mpi::communicator local, mpi::communicator world)
54 {
55   // The rank of the collector within the world communicator
56   int master_collector = world.size() - local.size();
57 
58   if (world.rank() == master_collector) {
59     while (true) {
60       // Wait for a message
61       mpi::status msg = world.probe();
62       if (msg.tag() == msg_data_packet) {
63         // Receive the packet of data
64         std::vector<int> data;
65         world.recv(msg.source(), msg.tag(), data);
66 
67         // Tell each of the collectors that we'll be broadcasting some data
68         for (int dest = 1; dest < local.size(); ++dest)
69           local.send(dest, msg_broadcast_data, msg.source());
70 
71         // Broadcast the actual data.
72         broadcast(local, data, 0);
73       } else if (msg.tag() == msg_finished) {
74         // Receive the message
75         world.recv(msg.source(), msg.tag());
76 
77         // Tell each of the collectors that we're finished
78         for (int dest = 1; dest < local.size(); ++dest)
79           local.send(dest, msg_finished);
80 
81         break;
82       }
83     }
84   } else {
85     while (true) {
86       // Wait for a message from the master collector
87       mpi::status msg = local.probe();
88       if (msg.tag() == msg_broadcast_data) {
89         // Receive the broadcast message
90         int originator;
91         local.recv(msg.source(), msg.tag(), originator);
92 
93         // Receive the data broadcasted from the master collector
94         std::vector<int> data;
95         broadcast(local, data, 0);
96 
97         std::cout << "Collector #" << local.rank()
98                   << " is processing data from generator #" << originator
99                   << "." << std::endl;
100       } else if (msg.tag() == msg_finished) {
101         // Receive the message
102         local.recv(msg.source(), msg.tag());
103 
104         break;
105       }
106     }
107   }
108 }
109 
main(int argc,char * argv[])110 int main(int argc, char* argv[])
111 {
112   mpi::environment env(argc, argv);
113   mpi::communicator world;
114 
115   if (world.size() < 3) {
116     if (world.rank() == 0) {
117       std::cerr << "Error: this example requires at least 3 processes."
118                 << std::endl;
119     }
120     env.abort(-1);
121   }
122 
123   bool is_generator = world.rank() < 2 * world.size() / 3;
124   mpi::communicator local = world.split(is_generator? 0 : 1);
125   if (is_generator) generate_data(local, world);
126   else collect_data(local, world);
127 
128   return 0;
129 }
130