• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2004-2006 The Trustees of Indiana University.
2
3// Use, modification and distribution is subject to the Boost Software
4// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7//  Authors: Douglas Gregor
8//           Andrew Lumsdaine
9#include <boost/optional.hpp>
10#include <cassert>
11#include <boost/graph/parallel/algorithm.hpp>
12#include <boost/graph/parallel/process_group.hpp>
13#include <functional>
14#include <algorithm>
15#include <boost/graph/parallel/simple_trigger.hpp>
16
17#ifndef BOOST_GRAPH_USE_MPI
18#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
19#endif
20
21namespace boost { namespace graph { namespace distributed {
22
23template<BOOST_DISTRIBUTED_QUEUE_PARMS>
24BOOST_DISTRIBUTED_QUEUE_TYPE::
25distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
26                  const Buffer& buffer, bool polling)
27  : process_group(process_group, attach_distributed_object()),
28    owner(owner),
29    buffer(buffer),
30    polling(polling)
31{
32  if (!polling)
33    outgoing_buffers.reset(
34      new outgoing_buffers_t(num_processes(process_group)));
35
36  setup_triggers();
37}
38
39template<BOOST_DISTRIBUTED_QUEUE_PARMS>
40BOOST_DISTRIBUTED_QUEUE_TYPE::
41distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
42                  const Buffer& buffer, const UnaryPredicate& pred,
43                  bool polling)
44  : process_group(process_group, attach_distributed_object()),
45    owner(owner),
46    buffer(buffer),
47    pred(pred),
48    polling(polling)
49{
50  if (!polling)
51    outgoing_buffers.reset(
52      new outgoing_buffers_t(num_processes(process_group)));
53
54  setup_triggers();
55}
56
57template<BOOST_DISTRIBUTED_QUEUE_PARMS>
58BOOST_DISTRIBUTED_QUEUE_TYPE::
59distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
60                  const UnaryPredicate& pred, bool polling)
61  : process_group(process_group, attach_distributed_object()),
62    owner(owner),
63    pred(pred),
64    polling(polling)
65{
66  if (!polling)
67    outgoing_buffers.reset(
68      new outgoing_buffers_t(num_processes(process_group)));
69
70  setup_triggers();
71}
72
73template<BOOST_DISTRIBUTED_QUEUE_PARMS>
74void
75BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
76{
77  typename ProcessGroup::process_id_type dest = get(owner, x);
78  if (outgoing_buffers)
79    outgoing_buffers->at(dest).push_back(x);
80  else if (dest == process_id(process_group))
81    buffer.push(x);
82  else
83    send(process_group, get(owner, x), msg_push, x);
84}
85
86template<BOOST_DISTRIBUTED_QUEUE_PARMS>
87bool
88BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
89{
90  /* Processes will stay here until the buffer is nonempty or
91     synchronization with the other processes indicates that all local
92     buffers are empty (and no messages are in transit).
93   */
94  while (buffer.empty() && !do_synchronize()) ;
95
96  return buffer.empty();
97}
98
99template<BOOST_DISTRIBUTED_QUEUE_PARMS>
100typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
101BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
102{
103  empty();
104  return buffer.size();
105}
106
107template<BOOST_DISTRIBUTED_QUEUE_PARMS>
108void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
109{
110  using boost::graph::parallel::simple_trigger;
111
112  simple_trigger(process_group, msg_push, this,
113                 &distributed_queue::handle_push);
114  simple_trigger(process_group, msg_multipush, this,
115                 &distributed_queue::handle_multipush);
116}
117
118template<BOOST_DISTRIBUTED_QUEUE_PARMS>
119void
120BOOST_DISTRIBUTED_QUEUE_TYPE::
121handle_push(int /*source*/, int /*tag*/, const value_type& value,
122            trigger_receive_context)
123{
124  if (pred(value)) buffer.push(value);
125}
126
127template<BOOST_DISTRIBUTED_QUEUE_PARMS>
128void
129BOOST_DISTRIBUTED_QUEUE_TYPE::
130handle_multipush(int /*source*/, int /*tag*/,
131                 const std::vector<value_type>& values,
132                 trigger_receive_context)
133{
134  for (std::size_t i = 0; i < values.size(); ++i)
135    if (pred(values[i])) buffer.push(values[i]);
136}
137
138template<BOOST_DISTRIBUTED_QUEUE_PARMS>
139bool
140BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
141{
142#ifdef PBGL_ACCOUNTING
143  ++num_synchronizations;
144#endif
145
146  using boost::parallel::all_reduce;
147  using std::swap;
148
149  typedef typename ProcessGroup::process_id_type process_id_type;
150
151  if (outgoing_buffers) {
152    // Transfer all of the push requests
153    process_id_type id = process_id(process_group);
154    process_id_type np = num_processes(process_group);
155    for (process_id_type dest = 0; dest < np; ++dest) {
156      outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
157      std::size_t size = outgoing.size();
158      if (size != 0) {
159        if (dest != id) {
160          send(process_group, dest, msg_multipush, outgoing);
161        } else {
162          for (std::size_t i = 0; i < size; ++i)
163            buffer.push(outgoing[i]);
164        }
165        outgoing.clear();
166      }
167    }
168  }
169  synchronize(process_group);
170
171  unsigned local_size = buffer.size();
172  unsigned global_size =
173    all_reduce(process_group, local_size, std::plus<unsigned>());
174  return global_size == 0;
175}
176
177} } } // end namespace boost::graph::distributed
178