1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 // Authors: Douglas Gregor
8 // Andrew Lumsdaine
9
10 //
11 // Implements redistribution of vertices for a distributed adjacency
12 // list. This file should not be included by users. It will be
13 // included by the distributed adjacency list header.
14 //
15
16 #ifndef BOOST_GRAPH_USE_MPI
17 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
18 #endif
19
20 #include <boost/pending/container_traits.hpp>
21
22 namespace boost { namespace detail { namespace parallel {
23
24 /* This structure contains a (vertex or edge) descriptor that is being
25 moved from one processor to another. It contains the properties for
26 that descriptor (if any).
27 */
28 template<typename Descriptor, typename DescriptorProperty>
29 struct redistributed_descriptor : maybe_store_property<DescriptorProperty>
30 {
31 typedef maybe_store_property<DescriptorProperty> inherited;
32
redistributed_descriptorboost::detail::parallel::redistributed_descriptor33 redistributed_descriptor() { }
34
redistributed_descriptorboost::detail::parallel::redistributed_descriptor35 redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p)
36 : inherited(p), descriptor(v) { }
37
38 Descriptor descriptor;
39
40 private:
41 friend class boost::serialization::access;
42
43 template<typename Archiver>
serializeboost::detail::parallel::redistributed_descriptor44 void serialize(Archiver& ar, unsigned int /*version*/)
45 {
46 ar & boost::serialization::base_object<inherited>(*this)
47 & unsafe_serialize(descriptor);
48 }
49 };
50
51 /* Predicate that returns true if the target has migrated. */
52 template<typename VertexProcessorMap, typename Graph>
53 struct target_migrated_t
54 {
55 typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
56 typedef typename graph_traits<Graph>::edge_descriptor Edge;
57
target_migrated_tboost::detail::parallel::target_migrated_t58 target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
59 : vertex_to_processor(vertex_to_processor), g(g) { }
60
operator ()boost::detail::parallel::target_migrated_t61 bool operator()(Edge e) const
62 {
63 typedef global_descriptor<Vertex> DVertex;
64 processor_id_type owner = get(edge_target_processor_id, g, e);
65 return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner;
66 }
67
68 private:
69 VertexProcessorMap vertex_to_processor;
70 const Graph& g;
71 };
72
73 template<typename VertexProcessorMap, typename Graph>
74 inline target_migrated_t<VertexProcessorMap, Graph>
target_migrated(VertexProcessorMap vertex_to_processor,const Graph & g)75 target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
76 { return target_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }
77
78 /* Predicate that returns true if the source of an in-edge has migrated. */
79 template<typename VertexProcessorMap, typename Graph>
80 struct source_migrated_t
81 {
82 typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
83 typedef typename graph_traits<Graph>::edge_descriptor Edge;
84
source_migrated_tboost::detail::parallel::source_migrated_t85 source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
86 : vertex_to_processor(vertex_to_processor), g(g) { }
87
operator ()boost::detail::parallel::source_migrated_t88 bool operator()(stored_in_edge<Edge> e) const
89 {
90 return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g)))
91 != e.source_processor;
92 }
93
94 private:
95 VertexProcessorMap vertex_to_processor;
96 const Graph& g;
97 };
98
99 template<typename VertexProcessorMap, typename Graph>
100 inline source_migrated_t<VertexProcessorMap, Graph>
source_migrated(VertexProcessorMap vertex_to_processor,const Graph & g)101 source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
102 { return source_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }
103
104 /* Predicate that returns true if the target has migrated. */
105 template<typename VertexProcessorMap, typename Graph>
106 struct source_or_target_migrated_t
107 {
108 typedef typename graph_traits<Graph>::edge_descriptor Edge;
109
source_or_target_migrated_tboost::detail::parallel::source_or_target_migrated_t110 source_or_target_migrated_t(VertexProcessorMap vertex_to_processor,
111 const Graph& g)
112 : vertex_to_processor(vertex_to_processor), g(g) { }
113
operator ()boost::detail::parallel::source_or_target_migrated_t114 bool operator()(Edge e) const
115 {
116 return get(vertex_to_processor, source(e, g)) != source(e, g).owner
117 || get(vertex_to_processor, target(e, g)) != target(e, g).owner;
118 }
119
120 private:
121 VertexProcessorMap vertex_to_processor;
122 const Graph& g;
123 };
124
125 template<typename VertexProcessorMap, typename Graph>
126 inline source_or_target_migrated_t<VertexProcessorMap, Graph>
source_or_target_migrated(VertexProcessorMap vertex_to_processor,const Graph & g)127 source_or_target_migrated(VertexProcessorMap vertex_to_processor,
128 const Graph& g)
129 {
130 typedef source_or_target_migrated_t<VertexProcessorMap, Graph> result_type;
131 return result_type(vertex_to_processor, g);
132 }
133
134 } } // end of namespace detail::parallel
135
136 template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
137 template<typename VertexProcessorMap>
138 void
139 PBGL_DISTRIB_ADJLIST_TYPE
request_in_neighbors(vertex_descriptor v,VertexProcessorMap vertex_to_processor,bidirectionalS)140 ::request_in_neighbors(vertex_descriptor v,
141 VertexProcessorMap vertex_to_processor,
142 bidirectionalS)
143 {
144 BGL_FORALL_INEDGES_T(v, e, *this, graph_type)
145 request(vertex_to_processor, source(e, *this));
146 }
147
148 template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
149 template<typename VertexProcessorMap>
150 void
151 PBGL_DISTRIB_ADJLIST_TYPE
remove_migrated_in_edges(vertex_descriptor v,VertexProcessorMap vertex_to_processor,bidirectionalS)152 ::remove_migrated_in_edges(vertex_descriptor v,
153 VertexProcessorMap vertex_to_processor,
154 bidirectionalS)
155 {
156 graph_detail::erase_if(get(vertex_in_edges, base())[v.local],
157 source_migrated(vertex_to_processor, base()));
158 }
159
160 template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
161 template<typename VertexProcessorMap>
162 void
163 PBGL_DISTRIB_ADJLIST_TYPE
redistribute(VertexProcessorMap vertex_to_processor)164 ::redistribute(VertexProcessorMap vertex_to_processor)
165 {
166 using boost::parallel::inplace_all_to_all;
167
168 // When we have stable descriptors, we only move those descriptors
169 // that actually need to be moved. Otherwise, we essentially have to
170 // regenerate the entire graph.
171 const bool has_stable_descriptors =
172 is_same<typename config_type::vertex_list_selector, listS>::value
173 || is_same<typename config_type::vertex_list_selector, setS>::value
174 || is_same<typename config_type::vertex_list_selector, multisetS>::value;
175
176 typedef detail::parallel::redistributed_descriptor<vertex_descriptor,
177 vertex_property_type>
178 redistributed_vertex;
179 typedef detail::parallel::redistributed_descriptor<edge_descriptor,
180 edge_property_type>
181 redistributed_edge;
182
183 vertex_iterator vi, vi_end;
184 edge_iterator ei, ei_end;
185
186 process_group_type pg = process_group();
187
188 // Initial synchronization makes sure that we have all of our ducks
189 // in a row. We don't want any outstanding add/remove messages
190 // coming in mid-redistribution!
191 synchronize(process_group_);
192
193 // We cannot cope with eviction of ghost cells
194 vertex_to_processor.set_max_ghost_cells(0);
195
196 process_id_type p = num_processes(pg);
197
198 // Send vertices and edges to the processor where they will
199 // actually reside. This requires O(|V| + |E|) communication
200 std::vector<std::vector<redistributed_vertex> > redistributed_vertices(p);
201 std::vector<std::vector<redistributed_edge> > redistributed_edges(p);
202
203 // Build the sets of relocated vertices for each process and then do
204 // an all-to-all transfer.
205 for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) {
206 if (!has_stable_descriptors
207 || get(vertex_to_processor, *vi) != vi->owner) {
208 redistributed_vertices[get(vertex_to_processor, *vi)]
209 .push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(),
210 vi->local)));
211 }
212
213 // When our descriptors are stable, we need to determine which
214 // adjacent descriptors are stable to determine which edges will
215 // be removed.
216 if (has_stable_descriptors) {
217 BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type)
218 request(vertex_to_processor, target(e, *this));
219 request_in_neighbors(*vi, vertex_to_processor, directed_selector());
220 }
221 }
222
223 inplace_all_to_all(pg, redistributed_vertices);
224
225 // If we have stable descriptors, we need to know where our neighbor
226 // vertices are moving.
227 if (has_stable_descriptors)
228 synchronize(vertex_to_processor);
229
230 // Build the sets of relocated edges for each process and then do
231 // an all-to-all transfer.
232 for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) {
233 vertex_descriptor src = source(*ei, *this);
234 vertex_descriptor tgt = target(*ei, *this);
235 if (!has_stable_descriptors
236 || get(vertex_to_processor, src) != src.owner
237 || get(vertex_to_processor, tgt) != tgt.owner)
238 redistributed_edges[get(vertex_to_processor, source(*ei, *this))]
239 .push_back(redistributed_edge(*ei, split_edge_property(get(edge_all_t(), base(),
240 ei->local))));
241 }
242 inplace_all_to_all(pg, redistributed_edges);
243
244 // A mapping from old vertex descriptors to new vertex
245 // descriptors. This is an STL map partly because I'm too lazy to
246 // build a real property map (which is hard in the general case) but
247 // also because it won't try to look in the graph itself, because
248 // the keys are all vertex descriptors that have been invalidated.
249 std::map<vertex_descriptor, vertex_descriptor> old_to_new_vertex_map;
250
251 if (has_stable_descriptors) {
252 // Clear out all vertices and edges that will have moved. There
253 // are several stages to this.
254
255 // First, eliminate all outgoing edges from the (local) vertices
256 // that have been moved or whose targets have been moved.
257 BGL_FORALL_VERTICES_T(v, *this, graph_type) {
258 if (get(vertex_to_processor, v) != v.owner) {
259 clear_out_edges(v.local, base());
260 clear_in_edges_local(v, directed_selector());
261 } else {
262 remove_out_edge_if(v.local,
263 target_migrated(vertex_to_processor, base()),
264 base());
265 remove_migrated_in_edges(v, vertex_to_processor, directed_selector());
266 }
267 }
268
269 // Next, eliminate locally-stored edges that have migrated (for
270 // undirected graphs).
271 graph_detail::erase_if(local_edges_,
272 source_or_target_migrated(vertex_to_processor, *this));
273
274 // Eliminate vertices that have migrated
275 for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) {
276 if (get(vertex_to_processor, *vi) != vi->owner)
277 remove_vertex((*vi++).local, base());
278 else {
279 // Add the identity relation for vertices that have not migrated
280 old_to_new_vertex_map[*vi] = *vi;
281 ++vi;
282 }
283 }
284 } else {
285 // Clear out the local graph: the entire graph is in transit
286 clear();
287 }
288
289 // Add the new vertices to the graph. When we do so, update the old
290 // -> new vertex mapping both locally and for the owner of the "old"
291 // vertex.
292 {
293 typedef std::pair<vertex_descriptor, vertex_descriptor> mapping_pair;
294 std::vector<std::vector<mapping_pair> > mappings(p);
295
296 for (process_id_type src = 0; src < p; ++src) {
297 for (typename std::vector<redistributed_vertex>::iterator vi =
298 redistributed_vertices[src].begin();
299 vi != redistributed_vertices[src].end(); ++vi) {
300 vertex_descriptor new_vertex =
301 add_vertex(vi->get_property(), *this);
302 old_to_new_vertex_map[vi->descriptor] = new_vertex;
303 mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor,
304 new_vertex));
305 }
306
307 redistributed_vertices[src].clear();
308 }
309
310 inplace_all_to_all(pg, mappings);
311
312 // Add the mappings we were sent into the old->new map.
313 for (process_id_type src = 0; src < p; ++src)
314 old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end());
315 }
316
317 // Get old->new vertex mappings for all of the vertices we need to
318 // know about.
319
320 // TBD: An optimization here might involve sending the
321 // request-response pairs without an explicit request step (for
322 // bidirectional and undirected graphs). However, it may not matter
323 // all that much given the cost of redistribution.
324 {
325 std::vector<std::vector<vertex_descriptor> > vertex_map_requests(p);
326 std::vector<std::vector<vertex_descriptor> > vertex_map_responses(p);
327
328 // We need to know about all of the vertices incident on edges
329 // that have been relocated to this processor. Tell each processor
330 // what each other processor needs to know.
331 for (process_id_type src = 0; src < p; ++src)
332 for (typename std::vector<redistributed_edge>::iterator ei =
333 redistributed_edges[src].begin();
334 ei != redistributed_edges[src].end(); ++ei) {
335 vertex_descriptor need_vertex = target(ei->descriptor, *this);
336 if (old_to_new_vertex_map.find(need_vertex)
337 == old_to_new_vertex_map.end())
338 {
339 old_to_new_vertex_map[need_vertex] = need_vertex;
340 vertex_map_requests[need_vertex.owner].push_back(need_vertex);
341 }
342 }
343 inplace_all_to_all(pg,
344 vertex_map_requests,
345 vertex_map_responses);
346
347 // Process the requests made for vertices we own. Then perform yet
348 // another all-to-all swap. This one matches the requests we've
349 // made to the responses we were given.
350 for (process_id_type src = 0; src < p; ++src)
351 for (typename std::vector<vertex_descriptor>::iterator vi =
352 vertex_map_responses[src].begin();
353 vi != vertex_map_responses[src].end(); ++vi)
354 *vi = old_to_new_vertex_map[*vi];
355 inplace_all_to_all(pg, vertex_map_responses);
356
357 // Matching the requests to the responses, update the old->new
358 // vertex map for all of the vertices we will need to know.
359 for (process_id_type src = 0; src < p; ++src) {
360 typedef typename std::vector<vertex_descriptor>::size_type size_type;
361 for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) {
362 old_to_new_vertex_map[vertex_map_requests[src][i]] =
363 vertex_map_responses[src][i];
364 }
365 }
366 }
367
368 // Add edges to the graph by mapping the source and target.
369 for (process_id_type src = 0; src < p; ++src) {
370 for (typename std::vector<redistributed_edge>::iterator ei =
371 redistributed_edges[src].begin();
372 ei != redistributed_edges[src].end(); ++ei) {
373 add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)],
374 old_to_new_vertex_map[target(ei->descriptor, *this)],
375 ei->get_property(),
376 *this);
377 }
378
379 redistributed_edges[src].clear();
380 }
381
382 // Be sure that edge-addition messages are received now, completing
383 // the graph.
384 synchronize(process_group_);
385
386 this->distribution().clear();
387
388 detail::parallel::maybe_initialize_vertex_indices(vertices(base()),
389 get(vertex_index, base()));
390 }
391
392 } // end namespace boost
393