1// Copyright (C) 2004-2006 The Trustees of Indiana University. 2 3// Use, modification and distribution is subject to the Boost Software 4// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 5// http://www.boost.org/LICENSE_1_0.txt) 6 7// Authors: Douglas Gregor 8// Andrew Lumsdaine 9#include <boost/optional.hpp> 10#include <cassert> 11#include <boost/graph/parallel/algorithm.hpp> 12#include <boost/graph/parallel/process_group.hpp> 13#include <functional> 14#include <algorithm> 15#include <boost/graph/parallel/simple_trigger.hpp> 16 17#ifndef BOOST_GRAPH_USE_MPI 18#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" 19#endif 20 21namespace boost { namespace graph { namespace distributed { 22 23template<BOOST_DISTRIBUTED_QUEUE_PARMS> 24BOOST_DISTRIBUTED_QUEUE_TYPE:: 25distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, 26 const Buffer& buffer, bool polling) 27 : process_group(process_group, attach_distributed_object()), 28 owner(owner), 29 buffer(buffer), 30 polling(polling) 31{ 32 if (!polling) 33 outgoing_buffers.reset( 34 new outgoing_buffers_t(num_processes(process_group))); 35 36 setup_triggers(); 37} 38 39template<BOOST_DISTRIBUTED_QUEUE_PARMS> 40BOOST_DISTRIBUTED_QUEUE_TYPE:: 41distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, 42 const Buffer& buffer, const UnaryPredicate& pred, 43 bool polling) 44 : process_group(process_group, attach_distributed_object()), 45 owner(owner), 46 buffer(buffer), 47 pred(pred), 48 polling(polling) 49{ 50 if (!polling) 51 outgoing_buffers.reset( 52 new outgoing_buffers_t(num_processes(process_group))); 53 54 setup_triggers(); 55} 56 57template<BOOST_DISTRIBUTED_QUEUE_PARMS> 58BOOST_DISTRIBUTED_QUEUE_TYPE:: 59distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, 60 const UnaryPredicate& pred, bool polling) 61 : process_group(process_group, attach_distributed_object()), 62 owner(owner), 63 pred(pred), 64 polling(polling) 65{ 66 if (!polling) 67 outgoing_buffers.reset( 68 new outgoing_buffers_t(num_processes(process_group))); 69 70 setup_triggers(); 71} 72 73template<BOOST_DISTRIBUTED_QUEUE_PARMS> 74void 75BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x) 76{ 77 typename ProcessGroup::process_id_type dest = get(owner, x); 78 if (outgoing_buffers) 79 outgoing_buffers->at(dest).push_back(x); 80 else if (dest == process_id(process_group)) 81 buffer.push(x); 82 else 83 send(process_group, get(owner, x), msg_push, x); 84} 85 86template<BOOST_DISTRIBUTED_QUEUE_PARMS> 87bool 88BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const 89{ 90 /* Processes will stay here until the buffer is nonempty or 91 synchronization with the other processes indicates that all local 92 buffers are empty (and no messages are in transit). 93 */ 94 while (buffer.empty() && !do_synchronize()) ; 95 96 return buffer.empty(); 97} 98 99template<BOOST_DISTRIBUTED_QUEUE_PARMS> 100typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type 101BOOST_DISTRIBUTED_QUEUE_TYPE::size() const 102{ 103 empty(); 104 return buffer.size(); 105} 106 107template<BOOST_DISTRIBUTED_QUEUE_PARMS> 108void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers() 109{ 110 using boost::graph::parallel::simple_trigger; 111 112 simple_trigger(process_group, msg_push, this, 113 &distributed_queue::handle_push); 114 simple_trigger(process_group, msg_multipush, this, 115 &distributed_queue::handle_multipush); 116} 117 118template<BOOST_DISTRIBUTED_QUEUE_PARMS> 119void 120BOOST_DISTRIBUTED_QUEUE_TYPE:: 121handle_push(int /*source*/, int /*tag*/, const value_type& value, 122 trigger_receive_context) 123{ 124 if (pred(value)) buffer.push(value); 125} 126 127template<BOOST_DISTRIBUTED_QUEUE_PARMS> 128void 129BOOST_DISTRIBUTED_QUEUE_TYPE:: 130handle_multipush(int /*source*/, int /*tag*/, 131 const std::vector<value_type>& values, 132 trigger_receive_context) 133{ 134 for (std::size_t i = 0; i < values.size(); ++i) 135 if (pred(values[i])) buffer.push(values[i]); 136} 137 138template<BOOST_DISTRIBUTED_QUEUE_PARMS> 139bool 140BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const 141{ 142#ifdef PBGL_ACCOUNTING 143 ++num_synchronizations; 144#endif 145 146 using boost::parallel::all_reduce; 147 using std::swap; 148 149 typedef typename ProcessGroup::process_id_type process_id_type; 150 151 if (outgoing_buffers) { 152 // Transfer all of the push requests 153 process_id_type id = process_id(process_group); 154 process_id_type np = num_processes(process_group); 155 for (process_id_type dest = 0; dest < np; ++dest) { 156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest); 157 std::size_t size = outgoing.size(); 158 if (size != 0) { 159 if (dest != id) { 160 send(process_group, dest, msg_multipush, outgoing); 161 } else { 162 for (std::size_t i = 0; i < size; ++i) 163 buffer.push(outgoing[i]); 164 } 165 outgoing.clear(); 166 } 167 } 168 } 169 synchronize(process_group); 170 171 unsigned local_size = buffer.size(); 172 unsigned global_size = 173 all_reduce(process_group, local_size, std::plus<unsigned>()); 174 return global_size == 0; 175} 176 177} } } // end namespace boost::graph::distributed 178