• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 //  Authors: Douglas Gregor
8 //           Andrew Lumsdaine
9 #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
10 #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
11 
12 #ifndef BOOST_GRAPH_USE_MPI
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
14 #endif
15 
16 #include <boost/graph/parallel/process_group.hpp>
17 #include <boost/optional.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <vector>
20 
21 namespace boost { namespace graph { namespace distributed {
22 
23 /// A unary predicate that always returns "true".
24 struct always_push
25 {
operator ()boost::graph::distributed::always_push26   template<typename T> bool operator()(const T&) const { return true; }
27 };
28 
29 
30 
31 /** A distributed queue adaptor.
32  *
33  * Class template @c distributed_queue implements a distributed queue
34  * across a process group. The distributed queue is an adaptor over an
35  * existing (local) queue, which must model the @ref Buffer
36  * concept. Each process stores a distinct copy of the local queue,
37  * from which it draws or removes elements via the @ref pop and @ref
38  * top members.
39  *
40  * The value type of the local queue must be a model of the @ref
41  * GlobalDescriptor concept. The @ref push operation of the
42  * distributed queue passes (via a message) the value to its owning
43  * processor. Thus, the elements within a particular local queue are
44  * guaranteed to have the process owning that local queue as an owner.
45  *
46  * Synchronization of distributed queues occurs in the @ref empty and
47  * @ref size functions, which will only return "empty" values (true or
48  * 0, respectively) when the entire distributed queue is empty. If the
49  * local queue is empty but the distributed queue is not, the
50  * operation will block until either condition changes. When the @ref
51  * size function of a nonempty queue returns, it returns the size of
52  * the local queue. These semantics were selected so that sequential
53  * code that processes elements in the queue via the following idiom
54  * can be parallelized via introduction of a distributed queue:
55  *
56  *   distributed_queue<...> Q;
57  *   Q.push(x);
58  *   while (!Q.empty()) {
59  *     // do something, that may push a value onto Q
60  *   }
61  *
62  * In the parallel version, the initial @ref push operation will place
63  * the value @c x onto its owner's queue. All processes will
64  * synchronize at the call to empty, and only the process owning @c x
65  * will be allowed to execute the loop (@ref Q.empty() returns
66  * false). This iteration may in turn push values onto other remote
67  * queues, so when that process finishes execution of the loop body
68  * and all processes synchronize again in @ref empty, more processes
69  * may have nonempty local queues to execute. Once all local queues
70  * are empty, @ref Q.empty() returns @c false for all processes.
71  *
72  * The distributed queue can receive messages at two different times:
73  * during synchronization and when polling @ref empty. Messages are
74  * always received during synchronization, to ensure that accurate
75  * local queue sizes can be determines. However, whether @ref empty
76  * should poll for messages is specified as an option to the
77  * constructor. Polling may be desired when the order in which
78  * elements in the queue are processed is not important, because it
79  * permits fewer synchronization steps and less communication
80  * overhead. However, when more strict ordering guarantees are
81  * required, polling may be semantically incorrect. By disabling
82  * polling, one ensures that parallel execution using the idiom above
83  * will not process an element at a later "level" before an earlier
84  * "level".
85  *
86  * The distributed queue nearly models the @ref Buffer
87  * concept. However, the @ref push routine does not necessarily
88  * increase the result of @c size() by one (although the size of the
89  * global queue does increase by one).
90  */
91 template<typename ProcessGroup, typename OwnerMap, typename Buffer,
92          typename UnaryPredicate = always_push>
93 class distributed_queue
94 {
95   typedef distributed_queue self_type;
96 
97   enum {
98     /** Message indicating a remote push. The message contains a
99      * single value x of type value_type that is to be pushed on the
100      * receiver's queue.
101      */
102     msg_push,
103     /** Push many elements at once. */
104     msg_multipush
105   };
106 
107  public:
108   typedef ProcessGroup                     process_group_type;
109   typedef Buffer                           buffer_type;
110   typedef typename buffer_type::value_type value_type;
111   typedef typename buffer_type::size_type  size_type;
112 
113   /** Construct a new distributed queue.
114    *
115    * Build a new distributed queue that communicates over the given @p
116    * process_group, whose local queue is initialized via @p buffer and
117    * which may or may not poll for messages.
118    */
119   explicit
120   distributed_queue(const ProcessGroup& process_group,
121                     const OwnerMap& owner,
122                     const Buffer& buffer,
123                     bool polling = false);
124 
125   /** Construct a new distributed queue.
126    *
127    * Build a new distributed queue that communicates over the given @p
128    * process_group, whose local queue is initialized via @p buffer and
129    * which may or may not poll for messages.
130    */
131   explicit
132   distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
133                     const OwnerMap& owner = OwnerMap(),
134                     const Buffer& buffer = Buffer(),
135                     const UnaryPredicate& pred = UnaryPredicate(),
136                     bool polling = false);
137 
138   /** Construct a new distributed queue.
139    *
140    * Build a new distributed queue that communicates over the given @p
141    * process_group, whose local queue is default-initalized and which
142    * may or may not poll for messages.
143    */
144   distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
145                     const UnaryPredicate& pred, bool polling = false);
146 
147   /** Virtual destructor required with virtual functions.
148    *
149    */
~distributed_queue()150   virtual ~distributed_queue() {}
151 
152   /** Push an element onto the distributed queue.
153    *
154    * The element will be sent to its owner process to be added to that
155    * process's local queue. If polling is enabled for this queue and
156    * the owner process is the current process, the value will be
157    * immediately pushed onto the local queue.
158    *
159    * Complexity: O(1) messages of size O(sizeof(value_type)) will be
160    * transmitted.
161    */
162   void push(const value_type& x);
163 
164   /** Pop an element off the local queue.
165    *
166    * @p @c !empty()
167    */
pop()168   void pop() { buffer.pop(); }
169 
170   /**
171    * Return the element at the top of the local queue.
172    *
173    * @p @c !empty()
174    */
top()175   value_type& top() { return buffer.top(); }
176 
177   /**
178    * \overload
179    */
top() const180   const value_type& top() const { return buffer.top(); }
181 
182   /** Determine if the queue is empty.
183    *
184    * When the local queue is nonempty, returns @c true. If the local
185    * queue is empty, synchronizes with all other processes in the
186    * process group until either (1) the local queue is nonempty
187    * (returns @c true) (2) the entire distributed queue is empty
188    * (returns @c false).
189    */
190   bool empty() const;
191 
192   /** Determine the size of the local queue.
193    *
194    * The behavior of this routine is equivalent to the behavior of
195    * @ref empty, except that when @ref empty returns true this
196    * function returns the size of the local queue and when @ref empty
197    * returns false this function returns zero.
198    */
199   size_type size() const;
200 
201   // private:
202   /** Synchronize the distributed queue and determine if all queues
203    * are empty.
204    *
205    * \returns \c true when all local queues are empty, or false if at least
206    * one of the local queues is nonempty.
207    * Defined as virtual for derived classes like depth_limited_distributed_queue.
208    */
209   virtual bool do_synchronize() const;
210 
211  private:
212   // Setup triggers
213   void setup_triggers();
214 
215   // Message handlers
216   void
217   handle_push(int source, int tag, const value_type& value,
218               trigger_receive_context);
219 
220   void
221   handle_multipush(int source, int tag, const std::vector<value_type>& values,
222                    trigger_receive_context);
223 
224   mutable ProcessGroup process_group;
225   OwnerMap owner;
226   mutable Buffer buffer;
227   UnaryPredicate pred;
228   bool polling;
229 
230   typedef std::vector<value_type> outgoing_buffer_t;
231   typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
232   shared_ptr<outgoing_buffers_t> outgoing_buffers;
233 };
234 
235 /// Helper macro containing the normal names for the template
236 /// parameters to distributed_queue.
237 #define BOOST_DISTRIBUTED_QUEUE_PARMS                           \
238   typename ProcessGroup, typename OwnerMap, typename Buffer,    \
239   typename UnaryPredicate
240 
241 /// Helper macro containing the normal template-id for
242 /// distributed_queue.
243 #define BOOST_DISTRIBUTED_QUEUE_TYPE                                    \
244   distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
245 
246 /** Synchronize all processes involved with the given distributed queue.
247  *
248  * This function will synchronize all of the local queues for a given
249  * distributed queue, by ensuring that no additional messages are in
250  * transit. It is rarely required by the user, because most
251  * synchronization of distributed queues occurs via the @c empty or @c
252  * size methods.
253  */
254 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
255 inline void
synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE & Q)256 synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
257 { Q.do_synchronize(); }
258 
259 /// Construct a new distributed queue.
260 template<typename ProcessGroup, typename OwnerMap, typename Buffer>
261 inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
make_distributed_queue(const ProcessGroup & process_group,const OwnerMap & owner,const Buffer & buffer,bool polling=false)262 make_distributed_queue(const ProcessGroup& process_group,
263                        const OwnerMap& owner,
264                        const Buffer& buffer,
265                        bool polling = false)
266 {
267   typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
268   return result_type(process_group, owner, buffer, polling);
269 }
270 
271 } } } // end namespace boost::graph::distributed
272 
273 #include <boost/graph/distributed/detail/queue.ipp>
274 
275 #undef BOOST_DISTRIBUTED_QUEUE_TYPE
276 #undef BOOST_DISTRIBUTED_QUEUE_PARMS
277 
278 #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
279