1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 // Authors: Douglas Gregor
8 // Andrew Lumsdaine
9 #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
10 #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
11
12 #ifndef BOOST_GRAPH_USE_MPI
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
14 #endif
15
16 #include <boost/graph/parallel/process_group.hpp>
17 #include <boost/optional.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <vector>
20
21 namespace boost { namespace graph { namespace distributed {
22
/// A unary predicate that always returns "true".
///
/// This is the default argument for the @c UnaryPredicate template
/// parameter of @c distributed_queue.
struct always_push
{
  /// Accepts a value of any type, ignores it, and returns @c true.
  template<typename T>
  bool operator()(const T&) const
  {
    return true;
  }
};
28
29
30
31 /** A distributed queue adaptor.
32 *
33 * Class template @c distributed_queue implements a distributed queue
34 * across a process group. The distributed queue is an adaptor over an
35 * existing (local) queue, which must model the @ref Buffer
36 * concept. Each process stores a distinct copy of the local queue,
37 * from which it draws or removes elements via the @ref pop and @ref
38 * top members.
39 *
40 * The value type of the local queue must be a model of the @ref
41 * GlobalDescriptor concept. The @ref push operation of the
42 * distributed queue passes (via a message) the value to its owning
43 * processor. Thus, the elements within a particular local queue are
44 * guaranteed to have the process owning that local queue as an owner.
45 *
46 * Synchronization of distributed queues occurs in the @ref empty and
47 * @ref size functions, which will only return "empty" values (true or
48 * 0, respectively) when the entire distributed queue is empty. If the
49 * local queue is empty but the distributed queue is not, the
50 * operation will block until either condition changes. When the @ref
51 * size function of a nonempty queue returns, it returns the size of
52 * the local queue. These semantics were selected so that sequential
53 * code that processes elements in the queue via the following idiom
54 * can be parallelized via introduction of a distributed queue:
55 *
56 * distributed_queue<...> Q;
57 * Q.push(x);
58 * while (!Q.empty()) {
59 * // do something, that may push a value onto Q
60 * }
61 *
62 * In the parallel version, the initial @ref push operation will place
63 * the value @c x onto its owner's queue. All processes will
64 * synchronize at the call to empty, and only the process owning @c x
65 * will be allowed to execute the loop (@ref Q.empty() returns
66 * false). This iteration may in turn push values onto other remote
67 * queues, so when that process finishes execution of the loop body
68 * and all processes synchronize again in @ref empty, more processes
69 * may have nonempty local queues to execute. Once all local queues
70 * are empty, @ref Q.empty() returns @c false for all processes.
71 *
72 * The distributed queue can receive messages at two different times:
73 * during synchronization and when polling @ref empty. Messages are
74 * always received during synchronization, to ensure that accurate
75 * local queue sizes can be determines. However, whether @ref empty
76 * should poll for messages is specified as an option to the
77 * constructor. Polling may be desired when the order in which
78 * elements in the queue are processed is not important, because it
79 * permits fewer synchronization steps and less communication
80 * overhead. However, when more strict ordering guarantees are
81 * required, polling may be semantically incorrect. By disabling
82 * polling, one ensures that parallel execution using the idiom above
83 * will not process an element at a later "level" before an earlier
84 * "level".
85 *
86 * The distributed queue nearly models the @ref Buffer
87 * concept. However, the @ref push routine does not necessarily
88 * increase the result of @c size() by one (although the size of the
89 * global queue does increase by one).
90 */
91 template<typename ProcessGroup, typename OwnerMap, typename Buffer,
92 typename UnaryPredicate = always_push>
93 class distributed_queue
94 {
95 typedef distributed_queue self_type;
96
97 enum {
98 /** Message indicating a remote push. The message contains a
99 * single value x of type value_type that is to be pushed on the
100 * receiver's queue.
101 */
102 msg_push,
103 /** Push many elements at once. */
104 msg_multipush
105 };
106
107 public:
108 typedef ProcessGroup process_group_type;
109 typedef Buffer buffer_type;
110 typedef typename buffer_type::value_type value_type;
111 typedef typename buffer_type::size_type size_type;
112
113 /** Construct a new distributed queue.
114 *
115 * Build a new distributed queue that communicates over the given @p
116 * process_group, whose local queue is initialized via @p buffer and
117 * which may or may not poll for messages.
118 */
119 explicit
120 distributed_queue(const ProcessGroup& process_group,
121 const OwnerMap& owner,
122 const Buffer& buffer,
123 bool polling = false);
124
125 /** Construct a new distributed queue.
126 *
127 * Build a new distributed queue that communicates over the given @p
128 * process_group, whose local queue is initialized via @p buffer and
129 * which may or may not poll for messages.
130 */
131 explicit
132 distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
133 const OwnerMap& owner = OwnerMap(),
134 const Buffer& buffer = Buffer(),
135 const UnaryPredicate& pred = UnaryPredicate(),
136 bool polling = false);
137
138 /** Construct a new distributed queue.
139 *
140 * Build a new distributed queue that communicates over the given @p
141 * process_group, whose local queue is default-initalized and which
142 * may or may not poll for messages.
143 */
144 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
145 const UnaryPredicate& pred, bool polling = false);
146
147 /** Virtual destructor required with virtual functions.
148 *
149 */
~distributed_queue()150 virtual ~distributed_queue() {}
151
152 /** Push an element onto the distributed queue.
153 *
154 * The element will be sent to its owner process to be added to that
155 * process's local queue. If polling is enabled for this queue and
156 * the owner process is the current process, the value will be
157 * immediately pushed onto the local queue.
158 *
159 * Complexity: O(1) messages of size O(sizeof(value_type)) will be
160 * transmitted.
161 */
162 void push(const value_type& x);
163
164 /** Pop an element off the local queue.
165 *
166 * @p @c !empty()
167 */
pop()168 void pop() { buffer.pop(); }
169
170 /**
171 * Return the element at the top of the local queue.
172 *
173 * @p @c !empty()
174 */
top()175 value_type& top() { return buffer.top(); }
176
177 /**
178 * \overload
179 */
top() const180 const value_type& top() const { return buffer.top(); }
181
182 /** Determine if the queue is empty.
183 *
184 * When the local queue is nonempty, returns @c true. If the local
185 * queue is empty, synchronizes with all other processes in the
186 * process group until either (1) the local queue is nonempty
187 * (returns @c true) (2) the entire distributed queue is empty
188 * (returns @c false).
189 */
190 bool empty() const;
191
192 /** Determine the size of the local queue.
193 *
194 * The behavior of this routine is equivalent to the behavior of
195 * @ref empty, except that when @ref empty returns true this
196 * function returns the size of the local queue and when @ref empty
197 * returns false this function returns zero.
198 */
199 size_type size() const;
200
201 // private:
202 /** Synchronize the distributed queue and determine if all queues
203 * are empty.
204 *
205 * \returns \c true when all local queues are empty, or false if at least
206 * one of the local queues is nonempty.
207 * Defined as virtual for derived classes like depth_limited_distributed_queue.
208 */
209 virtual bool do_synchronize() const;
210
211 private:
212 // Setup triggers
213 void setup_triggers();
214
215 // Message handlers
216 void
217 handle_push(int source, int tag, const value_type& value,
218 trigger_receive_context);
219
220 void
221 handle_multipush(int source, int tag, const std::vector<value_type>& values,
222 trigger_receive_context);
223
224 mutable ProcessGroup process_group;
225 OwnerMap owner;
226 mutable Buffer buffer;
227 UnaryPredicate pred;
228 bool polling;
229
230 typedef std::vector<value_type> outgoing_buffer_t;
231 typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
232 shared_ptr<outgoing_buffers_t> outgoing_buffers;
233 };
234
/// Helper macro containing the normal names for the template
/// parameters to distributed_queue.
///
/// Both helper macros are only meant for this header and the
/// implementation file it includes; they are #undef'd again at the
/// end of this header.
#define BOOST_DISTRIBUTED_QUEUE_PARMS                           \
  typename ProcessGroup, typename OwnerMap, typename Buffer,    \
  typename UnaryPredicate

/// Helper macro containing the normal template-id for
/// distributed_queue.
#define BOOST_DISTRIBUTED_QUEUE_TYPE                            \
  distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
245
246 /** Synchronize all processes involved with the given distributed queue.
247 *
248 * This function will synchronize all of the local queues for a given
249 * distributed queue, by ensuring that no additional messages are in
250 * transit. It is rarely required by the user, because most
251 * synchronization of distributed queues occurs via the @c empty or @c
252 * size methods.
253 */
254 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
255 inline void
synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE & Q)256 synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
257 { Q.do_synchronize(); }
258
259 /// Construct a new distributed queue.
260 template<typename ProcessGroup, typename OwnerMap, typename Buffer>
261 inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
make_distributed_queue(const ProcessGroup & process_group,const OwnerMap & owner,const Buffer & buffer,bool polling=false)262 make_distributed_queue(const ProcessGroup& process_group,
263 const OwnerMap& owner,
264 const Buffer& buffer,
265 bool polling = false)
266 {
267 typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
268 return result_type(process_group, owner, buffer, polling);
269 }
270
271 } } } // end namespace boost::graph::distributed
272
273 #include <boost/graph/distributed/detail/queue.ipp>
274
275 #undef BOOST_DISTRIBUTED_QUEUE_TYPE
276 #undef BOOST_DISTRIBUTED_QUEUE_PARMS
277
278 #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
279