1 // Copyright (C) 2004-2006 The Trustees of Indiana University. 2 3 // Use, modification and distribution is subject to the Boost Software 4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 5 // http://www.boost.org/LICENSE_1_0.txt) 6 7 // Authors: Brian Barrett 8 // Douglas Gregor 9 // Andrew Lumsdaine 10 #ifndef BOOST_GRAPH_PARALLEL_CC_PS_HPP 11 #define BOOST_GRAPH_PARALLEL_CC_PS_HPP 12 13 #ifndef BOOST_GRAPH_USE_MPI 14 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" 15 #endif 16 17 #include <boost/assert.hpp> 18 #include <boost/property_map/property_map.hpp> 19 #include <boost/graph/parallel/algorithm.hpp> 20 #include <boost/pending/indirect_cmp.hpp> 21 #include <boost/graph/graph_traits.hpp> 22 #include <boost/graph/overloading.hpp> 23 #include <boost/graph/distributed/concepts.hpp> 24 #include <boost/graph/parallel/properties.hpp> 25 #include <boost/graph/parallel/process_group.hpp> 26 #include <boost/optional.hpp> 27 #include <algorithm> 28 #include <vector> 29 #include <queue> 30 #include <limits> 31 #include <map> 32 #include <boost/graph/parallel/container_traits.hpp> 33 #include <boost/graph/iteration_macros.hpp> 34 35 36 // Connected components algorithm based on a parallel search. 37 // 38 // Every N nodes starts a parallel search from the first vertex in 39 // their local vertex list during the first superstep (the other nodes 40 // remain idle during the first superstep to reduce the number of 41 // conflicts in numbering the components). At each superstep, all new 42 // component mappings from remote nodes are handled. If there is no 43 // work from remote updates, a new vertex is removed from the local 44 // list and added to the work queue. 45 // 46 // Components are allocated from the component_value_allocator object, 47 // which ensures that a given component number is unique in the 48 // system, currently by using the rank and number of processes to 49 // stride allocations. 50 // 51 // When two components are discovered to actually be the same 52 // component, a mapping is created in the collisions object. The 53 // lower component number is prefered in the resolution, so component 54 // numbering resolution is consistent. After the search has exhausted 55 // all vertices in the graph, the mapping is shared with all 56 // processes, and they independently resolve the comonent mapping (so 57 // O((N * NP) + (V * NP)) work, in O(N + V) time, where N is the 58 // number of mappings and V is the number of local vertices). This 59 // phase can likely be significantly sped up if a clever algorithm for 60 // the reduction can be found. 61 namespace boost { namespace graph { namespace distributed { 62 namespace cc_ps_detail { 63 // Local object for allocating component numbers. There are two 64 // places this happens in the code, and I was getting sick of them 65 // getting out of sync. Components are not tightly packed in 66 // numbering, but are numbered to ensure each rank has its own 67 // independent sets of numberings. 68 template<typename component_value_type> 69 class component_value_allocator { 70 public: component_value_allocator(int num,int size)71 component_value_allocator(int num, int size) : 72 last(0), num(num), size(size) 73 { 74 } 75 allocate(void)76 component_value_type allocate(void) 77 { 78 component_value_type ret = num + (last * size); 79 last++; 80 return ret; 81 } 82 83 private: 84 component_value_type last; 85 int num; 86 int size; 87 }; 88 89 90 // Map of the "collisions" between component names in the global 91 // component mapping. TO make cleanup easier, component numbers 92 // are added, pointing to themselves, when a new component is 93 // found. In order to make the results deterministic, the lower 94 // component number is always taken. The resolver will drill 95 // through the map until it finds a component entry that points to 96 // itself as the next value, allowing some cleanup to happen at 97 // update() time. Attempts are also made to update the mapping 98 // when new entries are created. 99 // 100 // Note that there's an assumption that the entire mapping is 101 // shared during the end of the algorithm, but before component 102 // name resolution. 103 template<typename component_value_type> 104 class collision_map { 105 public: collision_map()106 collision_map() : num_unique(0) 107 { 108 } 109 110 // add new component mapping first time component is used. Own 111 // function only so that we can sanity check there isn't already 112 // a mapping for that component number (which would be bad) add(const component_value_type & a)113 void add(const component_value_type &a) 114 { 115 BOOST_ASSERT(collisions.count(a) == 0); 116 collisions[a] = a; 117 } 118 119 // add a mapping between component values saying they're the 120 // same component add(const component_value_type & a,const component_value_type & b)121 void add(const component_value_type &a, const component_value_type &b) 122 { 123 component_value_type high, low, tmp; 124 if (a > b) { 125 high = a; 126 low = b; 127 } else { 128 high = b; 129 low = a; 130 } 131 132 if (collisions.count(high) != 0 && collisions[high] != low) { 133 tmp = collisions[high]; 134 if (tmp > low) { 135 collisions[tmp] = low; 136 collisions[high] = low; 137 } else { 138 collisions[low] = tmp; 139 collisions[high] = tmp; 140 } 141 } else { 142 collisions[high] = low; 143 } 144 145 } 146 147 // get the "real" component number for the given component. 148 // Used to resolve mapping at end of run. update(component_value_type a)149 component_value_type update(component_value_type a) 150 { 151 BOOST_ASSERT(num_unique > 0); 152 BOOST_ASSERT(collisions.count(a) != 0); 153 return collisions[a]; 154 } 155 156 // collapse the collisions tree, so that update is a one lookup 157 // operation. Count unique components at the same time. uniqify(void)158 void uniqify(void) 159 { 160 typename std::map<component_value_type, component_value_type>::iterator i, end; 161 162 end = collisions.end(); 163 for (i = collisions.begin() ; i != end ; ++i) { 164 if (i->first == i->second) { 165 num_unique++; 166 } else { 167 i->second = collisions[i->second]; 168 } 169 } 170 } 171 172 // get the number of component entries that have an associated 173 // component number of themselves, which are the real components 174 // used in the final mapping. This is the number of unique 175 // components in the graph. unique(void)176 int unique(void) 177 { 178 BOOST_ASSERT(num_unique > 0); 179 return num_unique; 180 } 181 182 // "serialize" into a vector for communication. serialize(void)183 std::vector<component_value_type> serialize(void) 184 { 185 std::vector<component_value_type> ret; 186 typename std::map<component_value_type, component_value_type>::iterator i, end; 187 188 end = collisions.end(); 189 for (i = collisions.begin() ; i != end ; ++i) { 190 ret.push_back(i->first); 191 ret.push_back(i->second); 192 } 193 194 return ret; 195 } 196 197 private: 198 std::map<component_value_type, component_value_type> collisions; 199 int num_unique; 200 }; 201 202 203 // resolver to handle remote updates. The resolver will add 204 // entries into the collisions map if required, and if it is the 205 // first time the vertex has been touched, it will add the vertex 206 // to the remote queue. Note that local updates are handled 207 // differently, in the main loop (below). 208 209 // BWB - FIX ME - don't need graph anymore - can pull from key value of Component Map. 210 template<typename ComponentMap, typename work_queue> 211 struct update_reducer { 212 BOOST_STATIC_CONSTANT(bool, non_default_resolver = false); 213 214 typedef typename property_traits<ComponentMap>::value_type component_value_type; 215 typedef typename property_traits<ComponentMap>::key_type vertex_descriptor; 216 update_reducerboost::graph::distributed::cc_ps_detail::update_reducer217 update_reducer(work_queue *q, 218 cc_ps_detail::collision_map<component_value_type> *collisions, 219 processor_id_type pg_id) : 220 q(q), collisions(collisions), pg_id(pg_id) 221 { 222 } 223 224 // ghost cell initialization routine. This should never be 225 // called in this imlementation. 226 template<typename K> operator ()boost::graph::distributed::cc_ps_detail::update_reducer227 component_value_type operator()(const K&) const 228 { 229 return component_value_type(0); 230 } 231 232 // resolver for remote updates. I'm not entirely sure why, but 233 // I decided to not change the value of the vertex if it's 234 // already non-infinite. It doesn't matter in the end, as we'll 235 // touch every vertex in the cleanup phase anyway. If the 236 // component is currently infinite, set to the new component 237 // number and add the vertex to the work queue. If it's not 238 // infinite, we've touched it already so don't add it to the 239 // work queue. Do add a collision entry so that we know the two 240 // components are the same. operator ()boost::graph::distributed::cc_ps_detail::update_reducer241 component_value_type operator()(const vertex_descriptor &v, 242 const component_value_type& current, 243 const component_value_type& update) const 244 { 245 const component_value_type max = (std::numeric_limits<component_value_type>::max)(); 246 component_value_type ret = current; 247 248 if (max == current) { 249 q->push(v); 250 ret = update; 251 } else if (current != update) { 252 collisions->add(current, update); 253 } 254 255 return ret; 256 } 257 258 // So for whatever reason, the property map can in theory call 259 // the resolver with a local descriptor in addition to the 260 // standard global descriptor. As far as I can tell, this code 261 // path is never taken in this implementation, but I need to 262 // have this code here to make it compile. We just make a 263 // global descriptor and call the "real" operator(). 264 template<typename K> operator ()boost::graph::distributed::cc_ps_detail::update_reducer265 component_value_type operator()(const K& v, 266 const component_value_type& current, 267 const component_value_type& update) const 268 { 269 return (*this)(vertex_descriptor(pg_id, v), current, update); 270 } 271 272 private: 273 work_queue *q; 274 collision_map<component_value_type> *collisions; 275 boost::processor_id_type pg_id; 276 }; 277 278 } // namespace cc_ps_detail 279 280 281 template<typename Graph, typename ComponentMap> 282 typename property_traits<ComponentMap>::value_type connected_components_ps(const Graph & g,ComponentMap c)283 connected_components_ps(const Graph& g, ComponentMap c) 284 { 285 using boost::graph::parallel::process_group; 286 287 typedef typename property_traits<ComponentMap>::value_type component_value_type; 288 typedef typename graph_traits<Graph>::vertex_iterator vertex_iterator; 289 typedef typename graph_traits<Graph>::vertex_descriptor vertex_descriptor; 290 typedef typename boost::graph::parallel::process_group_type<Graph> 291 ::type process_group_type; 292 typedef typename process_group_type::process_id_type process_id_type; 293 typedef std::queue<vertex_descriptor> work_queue; 294 295 static const component_value_type max_component = 296 (std::numeric_limits<component_value_type>::max)(); 297 typename property_map<Graph, vertex_owner_t>::const_type 298 owner = get(vertex_owner, g); 299 300 // standard who am i? stuff 301 process_group_type pg = process_group(g); 302 process_id_type id = process_id(pg); 303 304 // Initialize every vertex to have infinite component number 305 BGL_FORALL_VERTICES_T(v, g, Graph) put(c, v, max_component); 306 307 vertex_iterator current, end; 308 boost::tie(current, end) = vertices(g); 309 310 cc_ps_detail::component_value_allocator<component_value_type> cva(process_id(pg), num_processes(pg)); 311 cc_ps_detail::collision_map<component_value_type> collisions; 312 work_queue q; // this is intentionally a local data structure 313 c.set_reduce(cc_ps_detail::update_reducer<ComponentMap, work_queue>(&q, &collisions, id)); 314 315 // add starting work 316 while (true) { 317 bool useful_found = false; 318 component_value_type val = cva.allocate(); 319 put(c, *current, val); 320 collisions.add(val); 321 q.push(*current); 322 if (0 != out_degree(*current, g)) useful_found = true; 323 ++current; 324 if (useful_found) break; 325 } 326 327 // Run the loop until everyone in the system is done 328 bool global_done = false; 329 while (!global_done) { 330 331 // drain queue of work for this superstep 332 while (!q.empty()) { 333 vertex_descriptor v = q.front(); 334 q.pop(); 335 // iterate through outedges of the vertex currently being 336 // examined, setting their component to our component. There 337 // is no way to end up in the queue without having a component 338 // number already. 339 340 BGL_FORALL_ADJ_T(v, peer, g, Graph) { 341 component_value_type my_component = get(c, v); 342 343 // update other vertex with our component information. 344 // Resolver will handle remote collisions as well as whether 345 // to put the vertex on the work queue or not. We have to 346 // handle local collisions and work queue management 347 if (id == get(owner, peer)) { 348 if (max_component == get(c, peer)) { 349 put(c, peer, my_component); 350 q.push(peer); 351 } else if (my_component != get(c, peer)) { 352 collisions.add(my_component, get(c, peer)); 353 } 354 } else { 355 put(c, peer, my_component); 356 } 357 } 358 } 359 360 // synchronize / start a new superstep. 361 synchronize(pg); 362 global_done = all_reduce(pg, (q.empty() && (current == end)), boost::parallel::minimum<bool>()); 363 364 // If the queue is currently empty, add something to do to start 365 // the current superstep (supersteps start at the sync, not at 366 // the top of the while loop as one might expect). Down at the 367 // bottom of the while loop so that not everyone starts the 368 // algorithm with something to do, to try to reduce component 369 // name conflicts 370 if (q.empty()) { 371 bool useful_found = false; 372 for ( ; current != end && !useful_found ; ++current) { 373 if (max_component == get(c, *current)) { 374 component_value_type val = cva.allocate(); 375 put(c, *current, val); 376 collisions.add(val); 377 q.push(*current); 378 if (0 != out_degree(*current, g)) useful_found = true; 379 } 380 } 381 } 382 } 383 384 // share component mappings 385 std::vector<component_value_type> global; 386 std::vector<component_value_type> mine = collisions.serialize(); 387 all_gather(pg, mine.begin(), mine.end(), global); 388 for (size_t i = 0 ; i < global.size() ; i += 2) { 389 collisions.add(global[i], global[i + 1]); 390 } 391 collisions.uniqify(); 392 393 // update the component mappings 394 BGL_FORALL_VERTICES_T(v, g, Graph) { 395 put(c, v, collisions.update(get(c, v))); 396 } 397 398 return collisions.unique(); 399 } 400 401 } // end namespace distributed 402 403 } // end namespace graph 404 405 } // end namespace boost 406 407 #endif // BOOST_GRAPH_PARALLEL_CC_HPP 408