/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CONTRIB_MPI_H_
#define TENSORFLOW_CONTRIB_MPI_H_

#ifdef TENSORFLOW_USE_MPI

#include <cassert>
#include <vector>

#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/shape_inference.h"

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/tensor_types.h"

#if GOOGLE_CUDA
#include "cuda_runtime.h"
#endif

// Needed to avoid header issues with C++-supporting MPI implementations
#define OMPI_SKIP_MPICXX
#include "third_party/mpi/mpi.h"

#define TAG_TENSOR 12

namespace tensorflow {
namespace contrib {
namespace mpi_collectives {

using CPUDevice = Eigen::ThreadPoolDevice;
using GPUDevice = Eigen::GpuDevice;

// Convert from templated types to values we can pass to MPI.
template <typename T>
MPI_Datatype MPIType();

// Convert from templated types to TensorFlow data types.
template <typename T>
DataType TensorFlowDataType();
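
// Illustrative sketch (not part of the original header): the specializations
// of the two templates above are assumed to live in the accompanying
// implementation file, mapping C++ types to their MPI and TensorFlow
// counterparts roughly like this for float:
//
//   template <>
//   MPI_Datatype MPIType<float>() { return MPI_FLOAT; }
//
//   template <>
//   DataType TensorFlowDataType<float>() { return DT_FLOAT; }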

#define MPI_REQUIRES_OK(MPI_STATUS)                               \
  if ((MPI_STATUS) != MPI_SUCCESS) {                              \
    return errors::Unknown("MPI operation failed unexpectedly."); \
  }

// Copy data from one tensor to another tensor.
// This uses a custom CUDA stream on GPU, which is necessary to overlap the
// backpropagation computations with the allreduce.
template <typename Device>
void CopyTensorData(void* destination, void* source, size_t size);

// Add a tensor into another tensor, accumulating in place.
// This uses a custom CUDA stream on GPU, which is necessary to overlap the
// backpropagation computations with the allreduce.
template <typename Device, typename T>
void AccumulateTensorData(T* destination, T* source, size_t size);
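
// Illustrative sketch (not part of the original header): on CPU these two
// helpers could plausibly be implemented, in the accompanying implementation
// file, as a plain memcpy and an element-wise accumulation. Note that
// CopyTensorData is called below with a size in bytes, while
// AccumulateTensorData is called with a size in elements.
//
//   template <>
//   void CopyTensorData<CPUDevice>(void* destination, void* source,
//                                  size_t size) {
//     std::memcpy(destination, source, size);  // size in bytes
//   }
//
//   template <>
//   void AccumulateTensorData<CPUDevice, float>(float* destination,
//                                               float* source, size_t size) {
//     for (size_t i = 0; i < size; ++i) {  // size in elements
//       destination[i] += source[i];
//     }
//   }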

// We need to get the right stream for doing CUDA memory transfers and
// operations, which is possibly different from the standard TensorFlow stream.
#if GOOGLE_CUDA
cudaStream_t CudaStreamForMPI();
#endif

/* Perform a ring allreduce on the data. Allocate the necessary output tensor
 * and store it in the output parameter.
 *
 * Assumes that all MPI processes are doing an allreduce of the same tensor,
 * with the same dimensions.
 *
 * A ring allreduce is a bandwidth-optimal way to do an allreduce. To do the
 * allreduce, the nodes involved are arranged in a ring:
 *
 *                   .--0--.
 *                  /       \
 *                 3         1
 *                  \       /
 *                   *--2--*
 *
 * Each node always sends to the next clockwise node in the ring, and receives
 * from the previous one.
 *
 * The allreduce is done in two parts: a scatter-reduce and an allgather. In
 * the scatter-reduce, a reduction is done, so that each node ends up with a
 * chunk of the final output tensor which has contributions from every node.
 * In the allgather, those chunks are distributed among all the nodes, so that
 * all nodes have the entire output tensor.
 *
 * Both of these operations are done by dividing the input tensor into N
 * evenly sized chunks (where N is the number of nodes in the ring).
 *
 * The scatter-reduce is done in N-1 steps. In the ith step, node j will send
 * the (j - i)th chunk and receive the (j - i - 1)th chunk, adding it into its
 * existing data for that chunk. For example, in the first iteration with the
 * ring depicted above, you will have the following transfers:
 *
 *      Segment 0:  Node 0 --> Node 1
 *      Segment 1:  Node 1 --> Node 2
 *      Segment 2:  Node 2 --> Node 3
 *      Segment 3:  Node 3 --> Node 0
 *
 * In the second iteration, you'll have the following transfers:
 *
 *      Segment 0:  Node 1 --> Node 2
 *      Segment 1:  Node 2 --> Node 3
 *      Segment 2:  Node 3 --> Node 0
 *      Segment 3:  Node 0 --> Node 1
 *
 * After this iteration, Node 2 has three of the four contributions to
 * Segment 0. The last iteration has the following transfers:
 *
 *      Segment 0:  Node 2 --> Node 3
 *      Segment 1:  Node 3 --> Node 0
 *      Segment 2:  Node 0 --> Node 1
 *      Segment 3:  Node 1 --> Node 2
 *
 * After this iteration, Node 3 has the fully accumulated Segment 0; Node 0
 * has the fully accumulated Segment 1; and so on. The scatter-reduce is
 * complete.
 *
 * Next, the allgather distributes these fully accumulated chunks across all
 * nodes. Communication proceeds in the same ring, once again in N-1 steps. At
 * the ith step, node j will send chunk (j - i + 1) and receive chunk (j - i).
 * For example, at the first iteration, the following transfers will occur:
 *
 *      Segment 0:  Node 3 --> Node 0
 *      Segment 1:  Node 0 --> Node 1
 *      Segment 2:  Node 1 --> Node 2
 *      Segment 3:  Node 2 --> Node 3
 *
 * After the first iteration, Node 0 will have a fully accumulated Segment 0
 * (from Node 3) and Segment 1. In the next iteration, Node 0 will send its
 * just-received Segment 0 onward to Node 1, and receive Segment 3 from Node 3.
 * After this has continued for N - 1 iterations, all nodes will have the
 * fully accumulated tensor.
 *
 * Each node will do (N-1) sends for the scatter-reduce and (N-1) sends for the
 * allgather. Each send will contain K / N bytes, if there are K bytes in the
 * original tensor on every node. Thus, each node sends and receives
 * 2K(N - 1)/N bytes of data, and the performance of the allreduce (assuming
 * no latency in connections) is constrained by the slowest interconnect
 * between the nodes.
 */
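//
// Worked example (illustrative; the numbers are not from the original
// comment): with N = 4 ranks and a tensor of K = 10 elements, the chunking
// below yields
//
//     segment_sizes  = {3, 3, 2, 2}   // 10 / 4 = 2 each; the first
//                                     // 10 % 4 = 2 segments get one extra
//     segment_starts = {0, 3, 6, 8}   // prefix sums of segment_sizes
//
// and each rank performs 2 * (N - 1) = 6 ring steps, sending and receiving
// about 2 * K * (N - 1) / N = 15 elements in total.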
template <typename Device, typename T>
Status RingAllreduce(OpKernelContext* context, const Tensor* input,
                     Tensor* temp, Tensor* output) {
  // Acquire MPI size and rank
  int n, r;
  MPI_REQUIRES_OK(MPI_Comm_size(MPI_COMM_WORLD, &n));
  MPI_REQUIRES_OK(MPI_Comm_rank(MPI_COMM_WORLD, &r));

  T* buffer = (T*)output->tensor_data().data();

  CopyTensorData<Device>((void*)buffer, (void*)input->tensor_data().data(),
                         output->tensor_data().size());

  // Calculate segment sizes and segment start offsets
  const size_t elements_to_reduce = input->NumElements();
  const size_t segment_size = elements_to_reduce / n;
  std::vector<size_t> segment_sizes(n, segment_size);

  const size_t residual = elements_to_reduce % n;
  for (size_t i = 0; i < residual; ++i) {
    segment_sizes[i]++;
  }

  std::vector<size_t> segment_starts(n);
  segment_starts[0] = 0;
  for (size_t i = 1; i < segment_starts.size(); ++i) {
    segment_starts[i] = segment_starts[i - 1] + segment_sizes[i - 1];
  }

  assert(segment_starts[n - 1] + segment_sizes[n - 1] == elements_to_reduce);

  T* segment_recv = (T*)temp->tensor_data().data();

  // Receive from your left neighbor with wrap-around
  const size_t recv_from = ((r - 1) + n) % n;

  // Send to your right neighbor with wrap-around
  const size_t send_to = (r + 1) % n;

  MPI_Status recv_status;
  MPI_Request recv_req;

  // Now start the ring. At every step, for every rank, we iterate through
  // segments with wraparound and send and recv from our neighbors and reduce
  // locally. At the i'th iteration, rank r sends segment (r-i) and receives
  // segment (r-i-1).
  for (int i = 0; i < n - 1; i++) {
    const size_t send_seg_id = ((r - i) + n) % n;
    const size_t recv_seg_id = ((r - i - 1) + n) % n;

    T* segment_send = &(buffer[segment_starts[send_seg_id]]);

    MPI_REQUIRES_OK(MPI_Irecv(segment_recv, segment_sizes[recv_seg_id],
                              MPIType<T>(), recv_from, TAG_TENSOR,
                              MPI_COMM_WORLD, &recv_req));

    MPI_REQUIRES_OK(MPI_Send(segment_send, segment_sizes[send_seg_id],
                             MPIType<T>(), send_to, TAG_TENSOR,
                             MPI_COMM_WORLD));

    T* segment_update = &(buffer[segment_starts[recv_seg_id]]);

    // Wait for recv to complete before reduction
    MPI_REQUIRES_OK(MPI_Wait(&recv_req, &recv_status));

    const size_t recv_seg_size = segment_sizes[recv_seg_id];
    AccumulateTensorData<Device, T>(segment_update, segment_recv,
                                    recv_seg_size);
  }

  // Now start the pipelined ring allgather. At every step, for every rank, we
  // iterate through segments with wraparound and send and recv from our
  // neighbors. At the i'th iteration, rank r sends segment (r-i+1) and
  // receives segment (r-i).
  for (size_t i = 0; i < n - 1; ++i) {
    const size_t send_seg_id = ((r - i + 1) + n) % n;
    const size_t recv_seg_id = ((r - i) + n) % n;

    // Segment to send - at every iteration we send segment (r-i+1)
    T* segment_send = &(buffer[segment_starts[send_seg_id]]);

    // Segment to recv - at every iteration we receive segment (r-i)
    T* segment_recv = &(buffer[segment_starts[recv_seg_id]]);

    MPI_REQUIRES_OK(MPI_Sendrecv(
        segment_send, segment_sizes[send_seg_id], MPIType<T>(), send_to,
        TAG_TENSOR, segment_recv, segment_sizes[recv_seg_id], MPIType<T>(),
        recv_from, TAG_TENSOR, MPI_COMM_WORLD, &recv_status));
  }

  return Status::OK();
}

// Perform a ring allgather on a Tensor. Other ranks may allgather with a
// tensor which differs in the first dimension only; all other dimensions must
// be the same.
//
// For more information on the ring allgather, read the documentation for the
// ring allreduce, which includes a ring allgather.
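//
// Worked example (illustrative; the numbers are not from the original
// comment): with 3 ranks and per-rank inputs of shape [sizes[r], 5], where
// sizes = {2, 4, 3}, the bookkeeping below yields elements_per_row = 5 and
//
//     segment_starts = {0, 10, 30}   // element offset of each rank's rows
//
// so the gathered output holds 2 + 4 + 3 = 9 rows, i.e. 45 elements.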
template <typename Device, typename T>
Status RingAllgather(OpKernelContext* context, const Tensor* input,
                     const std::vector<size_t>& sizes, Tensor* output) {
  // Acquire MPI size and rank
  int n, r;
  MPI_REQUIRES_OK(MPI_Comm_size(MPI_COMM_WORLD, &n));
  MPI_REQUIRES_OK(MPI_Comm_rank(MPI_COMM_WORLD, &r));

  assert(sizes.size() == n);
  assert(input->dim_size(0) == sizes[r]);

  // Compute the number of elements in every "row". We can't compute the
  // number of elements in every chunk, because the chunks have variable
  // length.
  size_t elements_per_row = 1;
  for (int i = 1; i < input->shape().dims(); i++) {
    elements_per_row *= input->dim_size(i);
  }

  // Copy data from input tensor to correct place in output tensor.
  std::vector<size_t> segment_starts(n);
  segment_starts[0] = 0;
  for (int i = 1; i < n; i++) {
    segment_starts[i] = segment_starts[i - 1] + elements_per_row * sizes[i - 1];
  }
  size_t offset = segment_starts[r];

  // Copy data to the right offset for this rank.
  T* buffer = (T*)output->tensor_data().data();
  CopyTensorData<Device>((void*)(buffer + offset),
                         (void*)input->tensor_data().data(),
                         elements_per_row * sizes[r] * sizeof(T));

  // Receive from your left neighbor with wrap-around
  const size_t recv_from = ((r - 1) + n) % n;

  // Send to your right neighbor with wrap-around
  const size_t send_to = (r + 1) % n;

  // Perform a ring allgather. At every step, for every rank, we iterate
  // through segments with wraparound and send and recv from our neighbors.
  // At the i'th iteration, rank r sends segment (r-i) and receives segment
  // (r-1-i).
  MPI_Status recv_status;
  for (size_t i = 0; i < n - 1; ++i) {
    const size_t send_seg_id = ((r - i) + n) % n;
    const size_t recv_seg_id = ((r - i - 1) + n) % n;

    // Segment to send - at every iteration we send segment (r-i)
    size_t offset_send = segment_starts[send_seg_id];
    size_t rows_send = sizes[send_seg_id];
    T* segment_send = &(buffer[offset_send]);

    // Segment to recv - at every iteration we receive segment (r-1-i)
    size_t offset_recv = segment_starts[recv_seg_id];
    size_t rows_recv = sizes[recv_seg_id];
    T* segment_recv = &(buffer[offset_recv]);

    MPI_REQUIRES_OK(MPI_Sendrecv(
        segment_send, elements_per_row * rows_send, MPIType<T>(), send_to,
        TAG_TENSOR, segment_recv, elements_per_row * rows_recv, MPIType<T>(),
        recv_from, TAG_TENSOR, MPI_COMM_WORLD, &recv_status));
  }

  return Status::OK();
}

}  // namespace mpi_collectives
}  // namespace contrib
}  // namespace tensorflow

#endif  // TENSORFLOW_USE_MPI

#endif  // TENSORFLOW_CONTRIB_MPI_H_