1 // Copyright Daniel Wallin 2007. Use, modification and distribution is
2 // subject to the Boost Software License, Version 1.0. (See accompanying
3 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
4
5 #ifndef BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
6 #define BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
7
8 #ifndef BOOST_GRAPH_USE_MPI
9 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
10 #endif
11
12 # include <boost/assert.hpp>
13 # include <boost/lexical_cast.hpp>
14 # include <boost/foreach.hpp>
15 # include <boost/filesystem/path.hpp>
16 # include <boost/filesystem/operations.hpp>
17 # include <cctype>
18 # include <fstream>
19
20 namespace boost {
21
22 namespace detail { namespace parallel
23 {
24
25 // Wraps a local descriptor, making it serializable.
26 template <class Local>
27 struct serializable_local_descriptor
28 {
serializable_local_descriptorboost::detail::parallel::serializable_local_descriptor29 serializable_local_descriptor()
30 {}
31
serializable_local_descriptorboost::detail::parallel::serializable_local_descriptor32 serializable_local_descriptor(Local local)
33 : local(local)
34 {}
35
operator Local const&boost::detail::parallel::serializable_local_descriptor36 operator Local const&() const
37 {
38 return local;
39 }
40
operator ==boost::detail::parallel::serializable_local_descriptor41 bool operator==(serializable_local_descriptor const& other) const
42 {
43 return local == other.local;
44 }
45
operator <boost::detail::parallel::serializable_local_descriptor46 bool operator<(serializable_local_descriptor const& other) const
47 {
48 return local < other.local;
49 }
50
51 template <class Archive>
serializeboost::detail::parallel::serializable_local_descriptor52 void serialize(Archive& ar, const unsigned int /*version*/)
53 {
54 ar & unsafe_serialize(local);
55 }
56
57 Local local;
58 };
59
60 template <class Vertex, class Properties>
61 struct pending_edge
62 {
pending_edgeboost::detail::parallel::pending_edge63 pending_edge(
64 Vertex source, Vertex target
65 , Properties properties, void* property_ptr
66 )
67 : source(source)
68 , target(target)
69 , properties(properties)
70 , property_ptr(property_ptr)
71 {}
72
73 Vertex source;
74 Vertex target;
75 Properties properties;
76 void* property_ptr;
77 };
78
is_digit(char c)79 inline bool is_digit(char c)
80 {
81 return std::isdigit(c) != 0;
82 }
83
84 inline std::vector<int>
available_process_files(std::string const & filename)85 available_process_files(std::string const& filename)
86 {
87 if (!filesystem::exists(filename))
88 return std::vector<int>();
89
90 std::vector<int> result;
91
92 for (filesystem::directory_iterator i(filename), end; i != end; ++i)
93 {
94 if (!filesystem::is_regular(*i))
95 boost::throw_exception(std::runtime_error("directory contains non-regular entries"));
96
97 std::string process_name = i->path().filename().string();
98 for (std::string::size_type i = 0; i < process_name.size(); ++i)
99 if (!is_digit(process_name[i]))
100 boost::throw_exception(std::runtime_error("directory contains files with invalid names"));
101
102 result.push_back(boost::lexical_cast<int>(process_name));
103 }
104
105 return result;
106 }
107
108 template <class Archive, class Tag, class T, class Base>
maybe_load_properties(Archive & ar,char const * name,property<Tag,T,Base> & properties)109 void maybe_load_properties(
110 Archive& ar, char const* name, property<Tag, T, Base>& properties)
111 {
112 ar >> serialization::make_nvp(name, get_property_value(properties, Tag()));
113 maybe_load_properties(ar, name, static_cast<Base&>(properties));
114 }
115
116 template <class Archive>
maybe_load_properties(Archive &,char const *,no_property &)117 void maybe_load_properties(
118 Archive&, char const*, no_property&)
119 {}
120
121 template <class Archive, typename Bundle>
maybe_load_properties(Archive & ar,char const * name,Bundle & bundle)122 void maybe_load_properties(
123 Archive& ar, char const* name, Bundle& bundle)
124 {
125 ar >> serialization::make_nvp(name, bundle);
126 no_property prop;
127 maybe_load_properties(ar, name, prop);
128 }
129
130
131
132
133
134
135 template <class Graph, class Archive, class VertexListS>
136 struct graph_loader
137 {
138 typedef typename Graph::vertex_descriptor vertex_descriptor;
139 typedef typename Graph::local_vertex_descriptor local_vertex_descriptor;
140 typedef typename Graph::vertex_property_type vertex_property_type;
141 typedef typename Graph::edge_descriptor edge_descriptor;
142 typedef typename Graph::local_edge_descriptor local_edge_descriptor;
143 typedef typename Graph::edge_property_type edge_property_type;
144 typedef typename Graph::process_group_type process_group_type;
145 typedef typename process_group_type::process_id_type process_id_type;
146 typedef typename Graph::directed_selector directed_selector;
147 typedef typename mpl::if_<
148 is_same<VertexListS, defaultS>, vecS, VertexListS
149 >::type vertex_list_selector;
150 typedef pending_edge<vertex_descriptor, edge_property_type>
151 pending_edge_type;
152 typedef serializable_local_descriptor<local_vertex_descriptor>
153 serializable_vertex_descriptor;
154
graph_loaderboost::detail::parallel::graph_loader155 graph_loader(Graph& g, Archive& ar)
156 : m_g(g)
157 , m_ar(ar)
158 , m_pg(g.process_group())
159 , m_requested_vertices(num_processes(m_pg))
160 , m_remote_vertices(num_processes(m_pg))
161 , m_property_ptrs(num_processes(m_pg))
162 {
163 g.clear();
164 load_prefix();
165 load_vertices();
166 load_edges();
167 ar >> make_nvp("distribution", m_g.distribution());
168 }
169
170 private:
171 struct pending_in_edge
172 {
pending_in_edgeboost::detail::parallel::graph_loader::pending_in_edge173 pending_in_edge(
174 vertex_descriptor u, vertex_descriptor v, void* property_ptr
175 )
176 : u(u)
177 , v(v)
178 , property_ptr(property_ptr)
179 {}
180
181 vertex_descriptor u;
182 vertex_descriptor v;
183 void* property_ptr;
184 };
185
is_rootboost::detail::parallel::graph_loader186 bool is_root() const
187 {
188 return process_id(m_pg) == 0;
189 }
190
191 template <class T>
make_nvpboost::detail::parallel::graph_loader192 serialization::nvp<T> const make_nvp(char const* name, T& value) const
193 {
194 return serialization::nvp<T>(name, value);
195 }
196
197 void load_prefix();
198 void load_vertices();
199
200 template <class Anything>
201 void maybe_load_and_store_local_vertex(Anything);
202 void maybe_load_and_store_local_vertex(vecS);
203
204 void load_edges();
205 void load_in_edges(bidirectionalS);
206 void load_in_edges(directedS);
207 void add_pending_in_edge(
208 vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS);
209 template <class Anything>
210 void add_pending_in_edge(
211 vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything);
212 template <class Anything>
213 void add_edge(
214 vertex_descriptor u, vertex_descriptor v
215 , edge_property_type const& property, void* property_ptr, Anything);
216 void add_edge(
217 vertex_descriptor u, vertex_descriptor v
218 , edge_property_type const& property, void* property_ptr, vecS);
219 void add_remote_vertex_request(
220 vertex_descriptor u, vertex_descriptor v, directedS);
221 void add_remote_vertex_request(
222 vertex_descriptor u, vertex_descriptor v, bidirectionalS);
223 void add_in_edge(
224 edge_descriptor const&, void*, directedS);
225 void add_in_edge(
226 edge_descriptor const& edge, void* old_property_ptr, bidirectionalS);
227
228 void resolve_remote_vertices(directedS);
229 void resolve_remote_vertices(bidirectionalS);
230 vertex_descriptor resolve_remote_vertex(vertex_descriptor u) const;
231 vertex_descriptor resolve_remote_vertex(vertex_descriptor u, vecS) const;
232 template <class Anything>
233 vertex_descriptor resolve_remote_vertex(vertex_descriptor u, Anything) const;
234
235 void resolve_property_ptrs();
236
237 void commit_pending_edges(vecS);
238 template <class Anything>
239 void commit_pending_edges(Anything);
240 void commit_pending_in_edges(directedS);
241 void commit_pending_in_edges(bidirectionalS);
242
maybe_load_property_ptrboost::detail::parallel::graph_loader243 void* maybe_load_property_ptr(directedS) { return 0; }
244 void* maybe_load_property_ptr(bidirectionalS);
245
246 Graph& m_g;
247 Archive& m_ar;
248 process_group_type m_pg;
249
250 std::vector<process_id_type> m_id_mapping;
251
252 // Maps local vertices as loaded from the archive to
253 // the ones actually added to the graph. Only used
254 // when !vecS.
255 std::map<local_vertex_descriptor, local_vertex_descriptor> m_local_vertices;
256
257 // This is the list of remote vertex descriptors that we
258 // are going to receive from other processes. This is
259 // kept sorted so that we can determine the position of
260 // the matching vertex descriptor in m_remote_vertices.
261 std::vector<std::vector<serializable_vertex_descriptor> > m_requested_vertices;
262
263 // This is the list of remote vertex descriptors that
264 // we send and receive from other processes.
265 std::vector<std::vector<serializable_vertex_descriptor> > m_remote_vertices;
266
267 // ...
268 std::vector<pending_edge_type> m_pending_edges;
269
270 // The pending in-edges that will be added in the commit step, after
271 // the remote vertex descriptors has been resolved. Only used
272 // when bidirectionalS and !vecS.
273 std::vector<pending_in_edge> m_pending_in_edges;
274
275 std::vector<std::vector<unsafe_pair<void*,void*> > > m_property_ptrs;
276 };
277
278 template <class Graph, class Archive, class VertexListS>
load_prefix()279 void graph_loader<Graph, Archive, VertexListS>::load_prefix()
280 {
281 typename process_group_type::process_size_type num_processes_;
282 m_ar >> make_nvp("num_processes", num_processes_);
283
284 if (num_processes_ != num_processes(m_pg))
285 boost::throw_exception(std::runtime_error("number of processes mismatch"));
286
287 process_id_type old_id;
288 m_ar >> make_nvp("id", old_id);
289
290 std::vector<typename Graph::distribution_type::size_type> mapping;
291 m_ar >> make_nvp("mapping", mapping);
292
293 // Fetch all the old id's from the other processes.
294 std::vector<process_id_type> old_ids;
295 all_gather(m_pg, &old_id, &old_id+1, old_ids);
296
297 m_id_mapping.resize(num_processes(m_pg), -1);
298
299 for (process_id_type i = 0; i < num_processes(m_pg); ++i)
300 {
301 # ifdef PBGL_SERIALIZE_DEBUG
302 if (is_root())
303 std::cout << i << " used to be " << old_ids[i] << "\n";
304 # endif
305 BOOST_ASSERT(m_id_mapping[old_ids[i]] == -1);
306 m_id_mapping[old_ids[i]] = i;
307 }
308
309 std::vector<typename Graph::distribution_type::size_type> new_mapping(
310 mapping.size());
311
312 for (int i = 0; i < num_processes(m_pg); ++i)
313 {
314 new_mapping[mapping[old_ids[i]]] = i;
315 }
316
317 m_g.distribution().assign_mapping(
318 new_mapping.begin(), new_mapping.end());
319 }
320
321 template <class Graph, class Archive, class VertexListS>
load_vertices()322 void graph_loader<Graph, Archive, VertexListS>::load_vertices()
323 {
324 int V;
325 m_ar >> BOOST_SERIALIZATION_NVP(V);
326
327 # ifdef PBGL_SERIALIZE_DEBUG
328 if (is_root())
329 std::cout << "Loading vertices\n";
330 # endif
331
332 for (int i = 0; i < V; ++i)
333 {
334 maybe_load_and_store_local_vertex(vertex_list_selector());
335 }
336 }
337
338 template <class Graph, class Archive, class VertexListS>
339 template <class Anything>
maybe_load_and_store_local_vertex(Anything)340 void graph_loader<Graph, Archive, VertexListS>::maybe_load_and_store_local_vertex(Anything)
341 {
342 // Load the original vertex descriptor
343 local_vertex_descriptor local;
344 m_ar >> make_nvp("local", unsafe_serialize(local));
345
346 // Load the properties
347 vertex_property_type property;
348 detail::parallel::maybe_load_properties(m_ar, "vertex_property",
349 property);
350
351 // Add the vertex
352 vertex_descriptor v(process_id(m_pg), add_vertex(property, m_g.base()));
353
354 if (m_g.on_add_vertex)
355 m_g.on_add_vertex(v, m_g);
356
357 // Create the mapping from the "old" local descriptor to the new
358 // local descriptor.
359 m_local_vertices[local] = v.local;
360 }
361
362 template <class Graph, class Archive, class VertexListS>
maybe_load_and_store_local_vertex(vecS)363 void graph_loader<Graph, Archive, VertexListS>::maybe_load_and_store_local_vertex(vecS)
364 {
365 // Load the properties
366 vertex_property_type property;
367 detail::parallel::maybe_load_properties(m_ar, "vertex_property",
368 property);
369
370 // Add the vertex
371 vertex_descriptor v(process_id(m_pg),
372 add_vertex(m_g.build_vertex_property(property),
373 m_g.base()));
374
375 if (m_g.on_add_vertex)
376 m_g.on_add_vertex(v, m_g);
377 }
378
379 template <class Graph, class Archive, class VertexListS>
load_edges()380 void graph_loader<Graph, Archive, VertexListS>::load_edges()
381 {
382 int E;
383 m_ar >> BOOST_SERIALIZATION_NVP(E);
384
385 # ifdef PBGL_SERIALIZE_DEBUG
386 if (is_root())
387 std::cout << "Loading edges\n";
388 # endif
389
390 for (int i = 0; i < E; ++i)
391 {
392 local_vertex_descriptor local_src;
393 process_id_type target_owner;
394 local_vertex_descriptor local_tgt;
395
396 m_ar >> make_nvp("source", unsafe_serialize(local_src));
397 m_ar >> make_nvp("target_owner", target_owner);
398 m_ar >> make_nvp("target", unsafe_serialize(local_tgt));
399
400 process_id_type new_src_owner = process_id(m_pg);
401 process_id_type new_tgt_owner = m_id_mapping[target_owner];
402
403 vertex_descriptor source(new_src_owner, local_src);
404 vertex_descriptor target(new_tgt_owner, local_tgt);
405
406 edge_property_type properties;
407 detail::parallel::maybe_load_properties(m_ar, "edge_property", properties);
408
409 void* property_ptr = maybe_load_property_ptr(directed_selector());
410 add_edge(source, target, properties, property_ptr, vertex_list_selector());
411 }
412
413 load_in_edges(directed_selector());
414 commit_pending_edges(vertex_list_selector());
415 }
416
417 template <class Graph, class Archive, class VertexListS>
load_in_edges(bidirectionalS)418 void graph_loader<Graph, Archive, VertexListS>::load_in_edges(bidirectionalS)
419 {
420 std::size_t I;
421 m_ar >> BOOST_SERIALIZATION_NVP(I);
422
423 # ifdef PBGL_SERIALIZE_DEBUG
424 if (is_root())
425 std::cout << "Loading in-edges\n";
426 # endif
427
428 for (int i = 0; i < I; ++i)
429 {
430 process_id_type src_owner;
431 local_vertex_descriptor local_src;
432 local_vertex_descriptor local_target;
433 void* property_ptr;
434
435 m_ar >> make_nvp("src_owner", src_owner);
436 m_ar >> make_nvp("source", unsafe_serialize(local_src));
437 m_ar >> make_nvp("target", unsafe_serialize(local_target));
438 m_ar >> make_nvp("property_ptr", unsafe_serialize(property_ptr));
439
440 src_owner = m_id_mapping[src_owner];
441
442 vertex_descriptor u(src_owner, local_src);
443 vertex_descriptor v(process_id(m_pg), local_target);
444
445 add_pending_in_edge(u, v, property_ptr, vertex_list_selector());
446 }
447 }
448
449 template <class Graph, class Archive, class VertexListS>
load_in_edges(directedS)450 void graph_loader<Graph, Archive, VertexListS>::load_in_edges(directedS)
451 {}
452
453 template <class Graph, class Archive, class VertexListS>
add_pending_in_edge(vertex_descriptor u,vertex_descriptor v,void * property_ptr,vecS)454 void graph_loader<Graph, Archive, VertexListS>::add_pending_in_edge(
455 vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS)
456 {
457 m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr));
458 }
459
460 template <class Graph, class Archive, class VertexListS>
461 template <class Anything>
add_pending_in_edge(vertex_descriptor u,vertex_descriptor v,void * property_ptr,Anything)462 void graph_loader<Graph, Archive, VertexListS>::add_pending_in_edge(
463 vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything)
464 {
465 // u and v represent the out-edge here, meaning v is local
466 // to us, and u is always remote.
467 m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr));
468 add_remote_vertex_request(v, u, bidirectionalS());
469 }
470
471 template <class Graph, class Archive, class VertexListS>
472 template <class Anything>
add_edge(vertex_descriptor u,vertex_descriptor v,edge_property_type const & property,void * property_ptr,Anything)473 void graph_loader<Graph, Archive, VertexListS>::add_edge(
474 vertex_descriptor u, vertex_descriptor v
475 , edge_property_type const& property, void* property_ptr, Anything)
476 {
477 m_pending_edges.push_back(pending_edge_type(u, v, property, property_ptr));
478 add_remote_vertex_request(u, v, directed_selector());
479 }
480
481 template <class Graph, class Archive, class VertexListS>
add_remote_vertex_request(vertex_descriptor u,vertex_descriptor v,directedS)482 void graph_loader<Graph, Archive, VertexListS>::add_remote_vertex_request(
483 vertex_descriptor u, vertex_descriptor v, directedS)
484 {
485 // We have to request the remote vertex.
486 m_requested_vertices[owner(v)].push_back(local(v));
487 }
488
489 template <class Graph, class Archive, class VertexListS>
add_remote_vertex_request(vertex_descriptor u,vertex_descriptor v,bidirectionalS)490 void graph_loader<Graph, Archive, VertexListS>::add_remote_vertex_request(
491 vertex_descriptor u, vertex_descriptor v, bidirectionalS)
492 {
493 // If the edge spans to another process, we know
494 // that that process has a matching in-edge, so
495 // we can just send our vertex. No requests
496 // necessary.
497 if (owner(v) != m_g.processor())
498 {
499 m_remote_vertices[owner(v)].push_back(local(u));
500 m_requested_vertices[owner(v)].push_back(local(v));
501 }
502 }
503
504 template <class Graph, class Archive, class VertexListS>
add_edge(vertex_descriptor u,vertex_descriptor v,edge_property_type const & property,void * property_ptr,vecS)505 void graph_loader<Graph, Archive, VertexListS>::add_edge(
506 vertex_descriptor u, vertex_descriptor v
507 , edge_property_type const& property, void* property_ptr, vecS)
508 {
509 std::pair<local_edge_descriptor, bool> inserted =
510 detail::parallel::add_local_edge(
511 local(u), local(v)
512 , m_g.build_edge_property(property), m_g.base());
513 BOOST_ASSERT(inserted.second);
514 put(edge_target_processor_id, m_g.base(), inserted.first, owner(v));
515
516 edge_descriptor e(owner(u), owner(v), true, inserted.first);
517
518 if (inserted.second && m_g.on_add_edge)
519 m_g.on_add_edge(e, m_g);
520
521 add_in_edge(e, property_ptr, directed_selector());
522 }
523
524 template <class Graph, class Archive, class VertexListS>
add_in_edge(edge_descriptor const &,void *,directedS)525 void graph_loader<Graph, Archive, VertexListS>::add_in_edge(
526 edge_descriptor const&, void*, directedS)
527 {}
528
529 template <class Graph, class Archive, class VertexListS>
add_in_edge(edge_descriptor const & edge,void * old_property_ptr,bidirectionalS)530 void graph_loader<Graph, Archive, VertexListS>::add_in_edge(
531 edge_descriptor const& edge, void* old_property_ptr, bidirectionalS)
532 {
533 if (owner(target(edge, m_g)) == m_g.processor())
534 {
535 detail::parallel::stored_in_edge<local_edge_descriptor>
536 e(m_g.processor(), local(edge));
537 boost::graph_detail::push(get(
538 vertex_in_edges, m_g.base())[local(target(edge, m_g))], e);
539 }
540 else
541 {
542 // We send the (old,new) property pointer pair to
543 // the remote process. This could be optimized to
544 // only send the new one -- the ordering can be
545 // made implicit because the old pointer value is
546 // stored on the remote process.
547 //
548 // Doing that is a little bit more complicated, but
549 // in case it turns out it's important we can do it.
550 void* property_ptr = local(edge).get_property();
551 m_property_ptrs[owner(target(edge, m_g))].push_back(
552 unsafe_pair<void*,void*>(old_property_ptr, property_ptr));
553 }
554 }
555
556 template <class Graph, class Archive, class VertexListS>
resolve_property_ptrs()557 void graph_loader<Graph, Archive, VertexListS>::resolve_property_ptrs()
558 {
559 # ifdef PBGL_SERIALIZE_DEBUG
560 if (is_root())
561 std::cout << "Resolving property pointers\n";
562 # endif
563
564 for (int i = 0; i < num_processes(m_pg); ++i)
565 {
566 std::sort(
567 m_property_ptrs[i].begin(), m_property_ptrs[i].end());
568 }
569
570 boost::parallel::inplace_all_to_all(m_pg, m_property_ptrs);
571 }
572
573 template <class Graph, class Archive, class VertexListS>
resolve_remote_vertices(directedS)574 void graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertices(directedS)
575 {
576 for (int i = 0; i < num_processes(m_pg); ++i)
577 {
578 std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end());
579 }
580
581 boost::parallel::inplace_all_to_all(
582 m_pg, m_requested_vertices, m_remote_vertices);
583
584 for (int i = 0; i < num_processes(m_pg); ++i)
585 {
586 BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i])
587 {
588 u = m_local_vertices[u];
589 }
590 }
591
592 boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices);
593 }
594
595 template <class Graph, class Archive, class VertexListS>
resolve_remote_vertices(bidirectionalS)596 void graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertices(bidirectionalS)
597 {
598 # ifdef PBGL_SERIALIZE_DEBUG
599 if (is_root())
600 std::cout << "Resolving remote vertices\n";
601 # endif
602
603 for (int i = 0; i < num_processes(m_pg); ++i)
604 {
605 std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end());
606 std::sort(m_remote_vertices[i].begin(), m_remote_vertices[i].end());
607
608 BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i])
609 {
610 u = m_local_vertices[u];
611 }
612 }
613
614 boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices);
615
616 for (int i = 0; i < num_processes(m_pg); ++i)
617 BOOST_ASSERT(m_remote_vertices[i].size() == m_requested_vertices[i].size());
618 }
619
620 template <class Graph, class Archive, class VertexListS>
commit_pending_edges(vecS)621 void graph_loader<Graph, Archive, VertexListS>::commit_pending_edges(vecS)
622 {
623 commit_pending_in_edges(directed_selector());
624 }
625
626 template <class Graph, class Archive, class VertexListS>
627 template <class Anything>
commit_pending_edges(Anything)628 void graph_loader<Graph, Archive, VertexListS>::commit_pending_edges(Anything)
629 {
630 resolve_remote_vertices(directed_selector());
631
632 BOOST_FOREACH(pending_edge_type const& e, m_pending_edges)
633 {
634 vertex_descriptor u = resolve_remote_vertex(e.source);
635 vertex_descriptor v = resolve_remote_vertex(e.target);
636 add_edge(u, v, e.properties, e.property_ptr, vecS());
637 }
638
639 commit_pending_in_edges(directed_selector());
640 }
641
642 template <class Graph, class Archive, class VertexListS>
commit_pending_in_edges(directedS)643 void graph_loader<Graph, Archive, VertexListS>::commit_pending_in_edges(directedS)
644 {}
645
646 template <class Graph, class Archive, class VertexListS>
commit_pending_in_edges(bidirectionalS)647 void graph_loader<Graph, Archive, VertexListS>::commit_pending_in_edges(bidirectionalS)
648 {
649 resolve_property_ptrs();
650
651 BOOST_FOREACH(pending_in_edge const& e, m_pending_in_edges)
652 {
653 vertex_descriptor u = resolve_remote_vertex(e.u, vertex_list_selector());
654 vertex_descriptor v = resolve_remote_vertex(e.v, vertex_list_selector());
655
656 typedef detail::parallel::stored_in_edge<local_edge_descriptor> stored_edge;
657
658 std::vector<unsafe_pair<void*,void*> >::iterator i = std::lower_bound(
659 m_property_ptrs[owner(u)].begin()
660 , m_property_ptrs[owner(u)].end()
661 , unsafe_pair<void*,void*>(e.property_ptr, 0)
662 );
663
664 if (i == m_property_ptrs[owner(u)].end()
665 || i->first != e.property_ptr)
666 {
667 BOOST_ASSERT(false);
668 }
669
670 local_edge_descriptor local_edge(local(u), local(v), i->second);
671 stored_edge edge(owner(u), local_edge);
672 boost::graph_detail::push(
673 get(vertex_in_edges, m_g.base())[local(v)], edge);
674 }
675 }
676
677 template <class Graph, class Archive, class VertexListS>
678 typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
resolve_remote_vertex(vertex_descriptor u) const679 graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
680 vertex_descriptor u) const
681 {
682 if (owner(u) == process_id(m_pg))
683 {
684 return vertex_descriptor(
685 process_id(m_pg), m_local_vertices.find(local(u))->second);
686 }
687
688 typename std::vector<serializable_vertex_descriptor>::const_iterator
689 i = std::lower_bound(
690 m_requested_vertices[owner(u)].begin()
691 , m_requested_vertices[owner(u)].end()
692 , serializable_vertex_descriptor(local(u))
693 );
694
695 if (i == m_requested_vertices[owner(u)].end()
696 || *i != local(u))
697 {
698 BOOST_ASSERT(false);
699 }
700
701 local_vertex_descriptor local =
702 m_remote_vertices[owner(u)][m_requested_vertices[owner(u)].end() - i];
703 return vertex_descriptor(owner(u), local);
704 }
705
706 template <class Graph, class Archive, class VertexListS>
707 typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
resolve_remote_vertex(vertex_descriptor u,vecS) const708 graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
709 vertex_descriptor u, vecS) const
710 {
711 return u;
712 }
713
714 template <class Graph, class Archive, class VertexListS>
715 template <class Anything>
716 typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
resolve_remote_vertex(vertex_descriptor u,Anything) const717 graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
718 vertex_descriptor u, Anything) const
719 {
720 return resolve_remote_vertex(u);
721 }
722
723 template <class Graph, class Archive, class VertexListS>
724 void*
maybe_load_property_ptr(bidirectionalS)725 graph_loader<Graph, Archive, VertexListS>::maybe_load_property_ptr(bidirectionalS)
726 {
727 void* ptr;
728 m_ar >> make_nvp("property_ptr", unsafe_serialize(ptr));
729 return ptr;
730 }
731
732 template <class Archive, class D>
maybe_save_local_descriptor(Archive & ar,D const &,vecS)733 void maybe_save_local_descriptor(Archive& ar, D const&, vecS)
734 {}
735
736 template <class Archive, class D, class NotVecS>
maybe_save_local_descriptor(Archive & ar,D const & d,NotVecS)737 void maybe_save_local_descriptor(Archive& ar, D const& d, NotVecS)
738 {
739 ar << serialization::make_nvp(
740 "local", unsafe_serialize(const_cast<D&>(d)));
741 }
742
743 template <class Archive>
maybe_save_properties(Archive &,char const *,no_property const &)744 void maybe_save_properties(
745 Archive&, char const*, no_property const&)
746 {}
747
748 template <class Archive, class Tag, class T, class Base>
maybe_save_properties(Archive & ar,char const * name,property<Tag,T,Base> const & properties)749 void maybe_save_properties(
750 Archive& ar, char const* name, property<Tag, T, Base> const& properties)
751 {
752 ar & serialization::make_nvp(name, get_property_value(properties, Tag()));
753 maybe_save_properties(ar, name, static_cast<Base const&>(properties));
754 }
755
756 template <class Archive, class Graph>
save_in_edges(Archive & ar,Graph const & g,directedS)757 void save_in_edges(Archive& ar, Graph const& g, directedS)
758 {}
759
760 // We need to save the edges in the base edge
761 // list, and the in_edges that are stored in the
762 // vertex_in_edges vertex property.
763 template <class Archive, class Graph>
save_in_edges(Archive & ar,Graph const & g,bidirectionalS)764 void save_in_edges(Archive& ar, Graph const& g, bidirectionalS)
765 {
766 typedef typename Graph::process_group_type
767 process_group_type;
768 typedef typename process_group_type::process_id_type
769 process_id_type;
770 typedef typename graph_traits<
771 Graph>::vertex_descriptor vertex_descriptor;
772 typedef typename vertex_descriptor::local_descriptor_type
773 local_vertex_descriptor;
774 typedef typename graph_traits<
775 Graph>::edge_descriptor edge_descriptor;
776
777 process_id_type id = g.processor();
778
779 std::vector<edge_descriptor> saved_in_edges;
780
781 BGL_FORALL_VERTICES_T(v, g, Graph)
782 {
783 BOOST_FOREACH(edge_descriptor const& e, in_edges(v, g))
784 {
785 // Only save the in_edges that isn't owned by this process.
786 if (owner(e) == id)
787 continue;
788
789 saved_in_edges.push_back(e);
790 }
791 }
792
793 std::size_t I = saved_in_edges.size();
794 ar << BOOST_SERIALIZATION_NVP(I);
795
796 BOOST_FOREACH(edge_descriptor const& e, saved_in_edges)
797 {
798 process_id_type src_owner = owner(source(e,g));
799 local_vertex_descriptor local_src = local(source(e,g));
800 local_vertex_descriptor local_target = local(target(e,g));
801 void* property_ptr = local(e).get_property();
802
803 using serialization::make_nvp;
804
805 ar << make_nvp("src_owner", src_owner);
806 ar << make_nvp("source", unsafe_serialize(local_src));
807 ar << make_nvp("target", unsafe_serialize(local_target));
808 ar << make_nvp("property_ptr", unsafe_serialize(property_ptr));
809 }
810 }
811
812 template <class Archive, class Edge>
maybe_save_property_ptr(Archive &,Edge const &,directedS)813 void maybe_save_property_ptr(Archive&, Edge const&, directedS)
814 {}
815
816 template <class Archive, class Edge>
maybe_save_property_ptr(Archive & ar,Edge const & e,bidirectionalS)817 void maybe_save_property_ptr(Archive& ar, Edge const& e, bidirectionalS)
818 {
819 void* ptr = local(e).get_property();
820 ar << serialization::make_nvp("property_ptr", unsafe_serialize(ptr));
821 }
822
823 template <class Archive, class Graph, class DirectedS>
save_edges(Archive & ar,Graph const & g,DirectedS)824 void save_edges(Archive& ar, Graph const& g, DirectedS)
825 {
826 typedef typename Graph::process_group_type
827 process_group_type;
828 typedef typename process_group_type::process_id_type
829 process_id_type;
830 typedef typename graph_traits<
831 Graph>::vertex_descriptor vertex_descriptor;
832
833 typedef typename Graph::edge_property_type edge_property_type;
834
835 int E = num_edges(g);
836 ar << BOOST_SERIALIZATION_NVP(E);
837
838 // For *directed* graphs, we can just save
839 // the edge list and be done.
840 //
841 // For *bidirectional* graphs, we need to also
842 // save the "vertex_in_edges" property map,
843 // because it might contain in-edges that
844 // are not locally owned.
845 BGL_FORALL_EDGES_T(e, g, Graph)
846 {
847 vertex_descriptor src(source(e, g));
848 vertex_descriptor tgt(target(e, g));
849
850 typename vertex_descriptor::local_descriptor_type
851 local_u(local(src));
852 typename vertex_descriptor::local_descriptor_type
853 local_v(local(tgt));
854
855 process_id_type target_owner = owner(tgt);
856
857 using serialization::make_nvp;
858
859 ar << make_nvp("source", unsafe_serialize(local_u));
860 ar << make_nvp("target_owner", target_owner);
861 ar << make_nvp("target", unsafe_serialize(local_v));
862
863 maybe_save_properties(
864 ar, "edge_property"
865 , static_cast<edge_property_type const&>(get(edge_all_t(), g, e))
866 );
867
868 maybe_save_property_ptr(ar, e, DirectedS());
869 }
870
871 save_in_edges(ar, g, DirectedS());
872 }
873
874 }} // namespace detail::parallel
875
876 template <PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
877 template <class IStreamConstructibleArchive>
load(std::string const & filename)878 void PBGL_DISTRIB_ADJLIST_TYPE::load(std::string const& filename)
879 {
880 process_group_type pg = process_group();
881 process_id_type id = process_id(pg);
882
883 synchronize(pg);
884
885 std::vector<int> disk_files = detail::parallel::available_process_files(filename);
886 std::sort(disk_files.begin(), disk_files.end());
887
888 // Negotiate which process gets which file. Serialized.
889 std::vector<int> consumed_files;
890 int picked_file = -1;
891
892 if (id > 0)
893 receive_oob(pg, id-1, 0, consumed_files);
894
895 std::sort(consumed_files.begin(), consumed_files.end());
896 std::vector<int> available_files;
897 std::set_difference(
898 disk_files.begin(), disk_files.end()
899 , consumed_files.begin(), consumed_files.end()
900 , std::back_inserter(available_files)
901 );
902
903 if (available_files.empty())
904 boost::throw_exception(std::runtime_error("no file available"));
905
906 // back() used for debug purposes. Making sure the
907 // ranks are shuffled.
908 picked_file = available_files.back();
909
910 # ifdef PBGL_SERIALIZE_DEBUG
911 std::cout << id << " picked " << picked_file << "\n";
912 # endif
913
914 consumed_files.push_back(picked_file);
915
916 if (id < num_processes(pg) - 1)
917 send_oob(pg, id+1, 0, consumed_files);
918
919 std::string local_filename = filename + "/" +
920 lexical_cast<std::string>(picked_file);
921
922 std::ifstream in(local_filename.c_str(), std::ios_base::binary);
923 IStreamConstructibleArchive ar(in);
924
925 detail::parallel::graph_loader<
926 graph_type, IStreamConstructibleArchive, InVertexListS
927 > loader(*this, ar);
928
929 # ifdef PBGL_SERIALIZE_DEBUG
930 std::cout << "Process " << id << " done loading.\n";
931 # endif
932
933 synchronize(pg);
934 }
935
936 template <PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
937 template <class OStreamConstructibleArchive>
save(std::string const & filename) const938 void PBGL_DISTRIB_ADJLIST_TYPE::save(std::string const& filename) const
939 {
940 typedef typename config_type::VertexListS vertex_list_selector;
941
942 process_group_type pg = process_group();
943 process_id_type id = process_id(pg);
944
945 if (filesystem::exists(filename) && !filesystem::is_directory(filename))
946 boost::throw_exception(std::runtime_error("entry exists, but is not a directory"));
947
948 filesystem::remove_all(filename);
949 filesystem::create_directory(filename);
950
951 synchronize(pg);
952
953 std::string local_filename = filename + "/" +
954 lexical_cast<std::string>(id);
955
956 std::ofstream out(local_filename.c_str(), std::ios_base::binary);
957 OStreamConstructibleArchive ar(out);
958
959 using serialization::make_nvp;
960
961 typename process_group_type::process_size_type num_processes_ = num_processes(pg);
962 ar << make_nvp("num_processes", num_processes_);
963 ar << BOOST_SERIALIZATION_NVP(id);
964 ar << make_nvp("mapping", this->distribution().mapping());
965
966 int V = num_vertices(*this);
967 ar << BOOST_SERIALIZATION_NVP(V);
968
969 BGL_FORALL_VERTICES_T(v, *this, graph_type)
970 {
971 local_vertex_descriptor local_descriptor(local(v));
972 detail::parallel::maybe_save_local_descriptor(
973 ar, local_descriptor, vertex_list_selector());
974 detail::parallel::maybe_save_properties(
975 ar, "vertex_property"
976 , static_cast<vertex_property_type const&>(get(vertex_all_t(), *this, v))
977 );
978 }
979
980 detail::parallel::save_edges(ar, *this, directed_selector());
981
982 ar << make_nvp("distribution", this->distribution());
983 }
984
985 } // namespace boost
986
987 #endif // BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
988
989