# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# pylint: disable=g-short-docstring-punctuation
"""## Communicating Between Processes with MPI

TensorFlow natively provides inter-device communication through send and
receive ops and inter-node communication through Distributed TensorFlow, based
on the same send and receive abstractions. On HPC clusters where Infiniband or
other high-speed node interconnects are available, these can end up being
insufficient for synchronous data-parallel training (without asynchronous
gradient descent). This module implements a variety of MPI ops which can take
advantage of hardware-specific MPI libraries for efficient communication.

In order to use this module, TensorFlow must be built with an MPI library,
which can be provided to the `./configure` script at build time. As a user of
TensorFlow, you will need to build TensorFlow yourself to select the MPI
library to use; to do so, follow the [instructions for building TensorFlow from
source](https://www.tensorflow.org/get_started/os_setup#installing_from_sources).

### Utility Ops

In addition to reductions and gathers, this module provides utility operations
for detecting the running MPI configuration.

Example:

```python
import tensorflow.contrib.mpi_collectives as mpi

# Use `mpi.Session` instead of `tf.Session`
with mpi.Session() as session:
    rank = session.run(mpi.rank())
    print("My MPI Rank:", rank)

    if rank == 0:
        print("MPI Size:", session.run(mpi.size()))
```

@@init
@@size
@@rank
@@local_rank

### Ring Allreduce and Allgather

When summing or averaging tensors across many processes, communication can
easily become a bottleneck. A naive implementation will send all the tensor
values to the same process, perform the reduction, and then broadcast the
values back to all other processes, effectively creating a synchronous
parameter server in one process. However, the process responsible for
performing the reduction will have to receive and send a massive amount of data
which scales with the number of processes *and* the number of parameters in the
model.

Instead of centralizing the reduction and having one primary reducer, we can
implement a distributed allreduce or allgather. A bandwidth-optimal allreduce
will end up sending 2(N - 1) values for every value in the input tensor,
and can be implemented with a ring allreduce [1]. (Intuitively, a linear reduce
requires at least (N - 1) sends between the different nodes, and a broadcast of
the result also requires (N - 1) sends, for a total of 2(N - 1); these two
steps cannot be combined in a clever way to reduce the number of required
sends.)
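
To make that counting concrete, the following toy single-process sketch
simulates the ring algorithm. It is an illustration only, not this module's
implementation; `N` and `data` are just illustrative names. Each of the `N`
simulated ranks forwards one chunk per step around the ring and ends up
transmitting exactly 2(N - 1) chunks.

```python
# Toy single-process simulation of a ring allreduce (for intuition only;
# not the implementation used by this module). Each of N simulated ranks
# starts with N chunks; after N - 1 reduce-scatter steps and N - 1
# allgather steps, every rank holds the elementwise sum, having sent
# exactly 2 * (N - 1) chunks.
N = 4
data = [[float(r)] * N for r in range(N)]  # data[rank][chunk]

# Reduce-scatter: in step s, rank r adds its chunk (r - s) into rank r + 1.
for s in range(N - 1):
    for r in range(N):
        c = (r - s) % N
        data[(r + 1) % N][c] += data[r][c]

# Allgather: in step s, rank r forwards its fully reduced chunk (r + 1 - s).
for s in range(N - 1):
    for r in range(N):
        c = (r + 1 - s) % N
        data[(r + 1) % N][c] = data[r][c]

# Every rank now holds the sum 0 + 1 + ... + (N - 1) in every chunk.
assert all(chunk == sum(range(N)) for rank in data for chunk in rank)
```

In practice each chunk is 1/N of the tensor, so each rank moves roughly 2K
values for a tensor of K elements, independent of the number of ranks.
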
This module implements bandwidth-optimal ring allreduce and ring
allgather operations using MPI; by choosing a hardware-appropriate MPI
implementation (such as OpenMPI with CUDA-IPC support), you can train large
models with synchronous gradient descent with minimal communication overhead.

In addition to the `allreduce` and `allgather` functions, a convenience
`DistributedOptimizer` wrapper is provided to simplify using these functions
for reducing model gradients.

Example:

```python
import tensorflow as tf
from tensorflow.contrib import mpi_collectives as mpi

# Construct a simple linear regression model to optimize.
W = tf.get_variable("W", shape=[20, 1], dtype=tf.float32)
B = tf.get_variable("B", shape=[1, 1], dtype=tf.float32)
inputs = tf.placeholder(tf.float32, shape=[None, 20], name="Inputs")
outputs = tf.placeholder(tf.float32, shape=[None, 1], name="Outputs")
loss = tf.nn.l2_loss(tf.matmul(inputs, W) + B - outputs)

# Train using MPI allreduce with DistributedOptimizer.
optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer())
train = optimizer.minimize(loss)

# Average loss over all ranks, for printing.
# Do not pass this to an optimizer!
avg_loss = mpi.allreduce(loss)

# On different ranks, feed different input data.
with mpi.Session() as session:
    rank = session.run(mpi.rank())
    batch_inputs, batch_outputs = construct_batch_for_rank(rank)
    feed_dict = {inputs: batch_inputs, outputs: batch_outputs}
    _, l = session.run([train, avg_loss], feed_dict=feed_dict)
    print("Average Loss:", l)
```

[1] Patarasuk, Pitch and Yuan, Xin. "Bandwidth Optimal All-reduce Algorithms
for Clusters of Workstations".

@@Session
@@DistributedOptimizer
@@allreduce
@@allgather
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf

from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import init
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import size
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import rank
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import local_rank
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import allgather
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import _allreduce


def allreduce(tensor, average=True):
  """Perform an MPI allreduce on a tf.Tensor or tf.IndexedSlices.

  This function performs a bandwidth-optimal ring allreduce on the input
  tensor. If the input is a tf.IndexedSlices, the function instead does an
  allgather on the values and the indices, effectively doing an allreduce on
  the represented tensor.

  Args:
    tensor: tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce.
      The shape of the input must be identical across all ranks.
    average: If True, computes the average over all ranks.
      Otherwise, computes the sum over all ranks.
  """
  if isinstance(tensor, tf.IndexedSlices):
    # For IndexedSlices, do two allgathers instead of an allreduce.
    mpi_size = tf.cast(size(), tensor.values.dtype)
    values = allgather(tensor.values)
    indices = allgather(tensor.indices)

    # To make this operation into an average, divide all gathered values by
    # the MPI size.
    new_values = tf.div(values, mpi_size) if average else values
    return tf.IndexedSlices(new_values, indices,
                            dense_shape=tensor.dense_shape)
  else:
    mpi_size = tf.cast(size(), tensor.dtype)
    summed_tensor = _allreduce(tensor)
    new_tensor = (tf.div(summed_tensor, mpi_size)
                  if average else summed_tensor)
    return new_tensor


class DistributedOptimizer(tf.train.Optimizer):
  """An optimizer that wraps another tf.Optimizer, using an MPI allreduce to
  average gradient values before applying gradients to model weights."""

  def __init__(self, optimizer, name=None, use_locking=False):
    """Construct a new DistributedOptimizer, which uses another optimizer
    under the hood for computing single-process gradient values and
    applying gradient updates after the gradient values have been averaged
    across all the MPI ranks.

    Args:
      optimizer: Optimizer to use for computing gradients and applying updates.
      name: Optional name prefix for the operations created when applying
        gradients. Defaults to "Distributed" followed by the provided
        optimizer type.
      use_locking: Whether to use locking when updating variables. See
        Optimizer.__init__ for more info.
    """
    if name is None:
      name = "Distributed{}".format(type(optimizer).__name__)

    self._optimizer = optimizer
    super(DistributedOptimizer, self).__init__(
        name=name, use_locking=use_locking)

  def compute_gradients(self, *args, **kwargs):
    """Compute gradients of all trainable variables.

    See Optimizer.compute_gradients() for more info.

    In DistributedOptimizer, compute_gradients() is overridden to also
    allreduce the gradients before returning them.
    """
    gradients = (super(DistributedOptimizer, self)
                 .compute_gradients(*args, **kwargs))
    return [(allreduce(gradient), var) for (gradient, var) in gradients]

  def _apply_dense(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_dense(*args, **kwargs)

  def _apply_sparse(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse(*args, **kwargs)

  def _apply_sparse_duplicate_indices(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse_duplicate_indices(*args, **kwargs)

  def _prepare(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._prepare(*args, **kwargs)

  def _create_slots(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._create_slots(*args, **kwargs)

  def _valid_dtypes(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._valid_dtypes(*args, **kwargs)

  def _finish(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._finish(*args, **kwargs)


class Session(tf.Session):
  """A class for running TensorFlow operations, with copies of the same graph
  running distributed across different MPI nodes.

  The primary difference between `tf.Session` and
  `tf.contrib.mpi_collectives.Session` is that the MPI `Session` ensures that
  the `Session` options are correct for use with `tf.contrib.mpi`, and
  initializes MPI immediately upon the start of the session.
  """

  def __init__(self, target='', graph=None, config=None):
    """Creates a new TensorFlow MPI session.

    Unlike a normal `tf.Session`, an MPI Session may only use a single GPU,
    which must be specified in advance before the session is initialized.
    In addition, it only uses a single graph evaluation thread, and
    initializes MPI immediately upon starting.

    If no `graph` argument is specified when constructing the session,
    the default graph will be launched in the session. If you are
    using more than one graph (created with `tf.Graph()`) in the same
    process, you will have to use different sessions for each graph,
    but each graph can be used in multiple sessions. In this case, it
    is often clearer to pass the graph to be launched explicitly to
    the session constructor.

    Args:
      target: (Optional.) The execution engine to connect to.
      graph: (Optional.) The `Graph` to be launched (described above).
      config: (Optional.) A `ConfigProto` protocol buffer with configuration
        options for the session.
    """
    super(Session, self).__init__(target, graph, config=config)

    # Initialize MPI on the relevant device.
    # TODO: Move this to library load and eliminate mpi.Session()
    if graph is None:
      graph = tf.get_default_graph()
    with graph.as_default():
      self.run(init())