1 // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 // An example using Boost.MPI's split() operation on communicators to
8 // create separate data-generating processes and data-collecting
9 // processes using boost::optional for broadcasting.
10 #include <boost/mpi.hpp>
11 #include <iostream>
12 #include <cstdlib>
13 #include <boost/serialization/vector.hpp>
14 #include <boost/serialization/optional.hpp>
15 namespace mpi = boost::mpi;
16
17 enum message_tags { msg_data_packet, msg_finished };
18
generate_data(mpi::communicator local,mpi::communicator world)19 void generate_data(mpi::communicator local, mpi::communicator world)
20 {
21 using std::srand;
22 using std::rand;
23
24 // The rank of the collector within the world communicator
25 int master_collector = local.size();
26
27 srand(time(0) + world.rank());
28
29 // Send out several blocks of random data to the collectors.
30 int num_data_blocks = rand() % 3 + 1;
31 for (int block = 0; block < num_data_blocks; ++block) {
32 // Generate some random dataa
33 int num_samples = rand() % 1000;
34 std::vector<int> data;
35 for (int i = 0; i < num_samples; ++i) {
36 data.push_back(rand());
37 }
38
39 // Send our data to the master collector process.
40 std::cout << "Generator #" << local.rank() << " sends some data..."
41 << std::endl;
42 world.send(master_collector, msg_data_packet, data);
43 }
44
45 // Wait for all of the generators to complete
46 (local.barrier)();
47
48 // The first generator will send the message to the master collector
49 // indicating that we're done.
50 if (local.rank() == 0)
51 world.send(master_collector, msg_finished);
52 }
53
collect_data(mpi::communicator local,mpi::communicator world)54 void collect_data(mpi::communicator local, mpi::communicator world)
55 {
56 // The rank of the collector within the world communicator
57 int master_collector = world.size() - local.size();
58
59 if (world.rank() == master_collector) {
60 while (true) {
61 // Wait for a message
62 mpi::status msg = world.probe();
63 if (msg.tag() == msg_data_packet) {
64 // Receive the packet of data into a boost::optional
65 boost::optional<std::vector<int> > data;
66 data = std::vector<int>();
67 world.recv(msg.source(), msg.source(), *data);
68
69 // Broadcast the actual data.
70 broadcast(local, data, 0);
71 } else if (msg.tag() == msg_finished) {
72 // Receive the message
73 world.recv(msg.source(), msg.tag());
74
75 // Broadcast to each collector to tell them we've finished.
76 boost::optional<std::vector<int> > data;
77 broadcast(local, data, 0);
78 break;
79 }
80 }
81 } else {
82 boost::optional<std::vector<int> > data;
83 do {
84 // Wait for a broadcast from the master collector
85 broadcast(local, data, 0);
86 if (data) {
87 std::cout << "Collector #" << local.rank()
88 << " is processing data." << std::endl;
89 }
90 } while (data);
91 }
92 }
93
main(int argc,char * argv[])94 int main(int argc, char* argv[])
95 {
96 mpi::environment env(argc, argv);
97 mpi::communicator world;
98
99 if (world.size() < 4) {
100 if (world.rank() == 0) {
101 std::cerr << "Error: this example requires at least 4 processes."
102 << std::endl;
103 }
104 env.abort(-1);
105 }
106
107 bool is_generator = world.rank() < 2 * world.size() / 3;
108 mpi::communicator local = world.split(is_generator? 0 : 1);
109 if (is_generator) generate_data(local, world);
110 else collect_data(local, world);
111
112 return 0;
113 }
114