1 // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 // An example using Boost.MPI's split() operation on communicators to
8 // create separate data-generating processes and data-collecting
9 // processes.
#include <cstdlib>
#include <ctime>
#include <iostream>

#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
14 namespace mpi = boost::mpi;
15
// Tags identifying the purpose of each point-to-point message in this example.
enum message_tags {
  // Generator -> master collector (world communicator): a block of samples.
  msg_data_packet = 0,
  // Master collector -> other collectors (local communicator): a broadcast
  // of one data block is about to start; the payload is the sender's rank.
  msg_broadcast_data = 1,
  // Sent with no payload on both communicators: no more data will follow.
  msg_finished = 2
};
17
generate_data(mpi::communicator local,mpi::communicator world)18 void generate_data(mpi::communicator local, mpi::communicator world)
19 {
20 using std::srand;
21 using std::rand;
22
23 // The rank of the collector within the world communicator
24 int master_collector = local.size();
25
26 srand(time(0) + world.rank());
27
28 // Send out several blocks of random data to the collectors.
29 int num_data_blocks = rand() % 3 + 1;
30 for (int block = 0; block < num_data_blocks; ++block) {
31 // Generate some random data
32 int num_samples = rand() % 1000;
33 std::vector<int> data;
34 for (int i = 0; i < num_samples; ++i) {
35 data.push_back(rand());
36 }
37
38 // Send our data to the master collector process.
39 std::cout << "Generator #" << local.rank() << " sends some data..."
40 << std::endl;
41 world.send(master_collector, msg_data_packet, data);
42 }
43
44 // Wait for all of the generators to complete
45 (local.barrier)();
46
47 // The first generator will send the message to the master collector
48 // indicating that we're done.
49 if (local.rank() == 0)
50 world.send(master_collector, msg_finished);
51 }
52
collect_data(mpi::communicator local,mpi::communicator world)53 void collect_data(mpi::communicator local, mpi::communicator world)
54 {
55 // The rank of the collector within the world communicator
56 int master_collector = world.size() - local.size();
57
58 if (world.rank() == master_collector) {
59 while (true) {
60 // Wait for a message
61 mpi::status msg = world.probe();
62 if (msg.tag() == msg_data_packet) {
63 // Receive the packet of data
64 std::vector<int> data;
65 world.recv(msg.source(), msg.tag(), data);
66
67 // Tell each of the collectors that we'll be broadcasting some data
68 for (int dest = 1; dest < local.size(); ++dest)
69 local.send(dest, msg_broadcast_data, msg.source());
70
71 // Broadcast the actual data.
72 broadcast(local, data, 0);
73 } else if (msg.tag() == msg_finished) {
74 // Receive the message
75 world.recv(msg.source(), msg.tag());
76
77 // Tell each of the collectors that we're finished
78 for (int dest = 1; dest < local.size(); ++dest)
79 local.send(dest, msg_finished);
80
81 break;
82 }
83 }
84 } else {
85 while (true) {
86 // Wait for a message from the master collector
87 mpi::status msg = local.probe();
88 if (msg.tag() == msg_broadcast_data) {
89 // Receive the broadcast message
90 int originator;
91 local.recv(msg.source(), msg.tag(), originator);
92
93 // Receive the data broadcasted from the master collector
94 std::vector<int> data;
95 broadcast(local, data, 0);
96
97 std::cout << "Collector #" << local.rank()
98 << " is processing data from generator #" << originator
99 << "." << std::endl;
100 } else if (msg.tag() == msg_finished) {
101 // Receive the message
102 local.recv(msg.source(), msg.tag());
103
104 break;
105 }
106 }
107 }
108 }
109
main(int argc,char * argv[])110 int main(int argc, char* argv[])
111 {
112 mpi::environment env(argc, argv);
113 mpi::communicator world;
114
115 if (world.size() < 3) {
116 if (world.rank() == 0) {
117 std::cerr << "Error: this example requires at least 3 processes."
118 << std::endl;
119 }
120 env.abort(-1);
121 }
122
123 bool is_generator = world.rank() < 2 * world.size() / 3;
124 mpi::communicator local = world.split(is_generator? 0 : 1);
125 if (is_generator) generate_data(local, world);
126 else collect_data(local, world);
127
128 return 0;
129 }
130