# Distribution Strategy

> *NOTE*: This is an experimental feature. The API and performance
> characteristics are subject to change.

## Overview

[`DistributionStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/DistributionStrategy)
API is an easy way to distribute your training
across multiple devices/machines. Our goal is to allow users to use existing
models and training code with minimal changes to enable distributed training.
Moreover, we've designed the API in such a way that it works with both eager and
graph execution.

Currently we support several types of strategies:

* [`MirroredStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/MirroredStrategy):
This does in-graph replication with synchronous training
on many GPUs on one machine. Essentially, we create copies of all variables in
the model's layers on each device. We then use all-reduce to combine gradients
across the devices before applying them to the variables to keep them in sync.
* [`CollectiveAllReduceStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/CollectiveAllReduceStrategy):
This is a version of `MirroredStrategy` for multi-worker training. It uses
a collective op to do all-reduce. This supports between-graph communication and
synchronization, and delegates the specifics of the all-reduce implementation to
the runtime (as opposed to encoding it in the graph). This allows it to perform
optimizations like batching and switch between plugins that support different
hardware or algorithms. In the future, this strategy will implement
fault-tolerance to allow training to continue when there is worker failure.
* [`ParameterServerStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/ParameterServerStrategy):
This strategy supports using parameter servers either for multi-GPU local
training or asynchronous multi-machine training. When used to train locally,
variables are not mirrored; instead, they are placed on the CPU and operations
are replicated across all local GPUs. In a multi-machine setting, some machines
are designated as workers and some as parameter servers. Each variable is placed
on one parameter server. Computation operations are replicated across all GPUs
of the workers.

## Multi-GPU Training

## Example with Keras API

Let's see how to scale to multiple GPUs on one machine using `MirroredStrategy` with [tf.keras](https://www.tensorflow.org/guide/keras).

Let's define a simple input dataset for training a model. Note that currently we require using
[`tf.data.Dataset`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset)
with `DistributionStrategy`.

```python
import tensorflow as tf
from tensorflow import keras

features = tf.data.Dataset.from_tensors([1.]).repeat(10000).batch(10)
labels = tf.data.Dataset.from_tensors([1.]).repeat(10000).batch(10)
train_dataset = tf.data.Dataset.zip((features, labels))
```

To distribute a Keras model on multiple GPUs using `MirroredStrategy` we
first instantiate a `MirroredStrategy` object.

```python
distribution = tf.contrib.distribute.MirroredStrategy()
```

Take a very simple model consisting of a single layer. We need to create and compile
the model under the distribution strategy scope.

```python
with distribution.scope():
  inputs = tf.keras.layers.Input(shape=(1,))
  predictions = tf.keras.layers.Dense(1)(inputs)
  model = tf.keras.models.Model(inputs=inputs, outputs=predictions)

  model.compile(loss='mean_squared_error',
                optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2))
```

To train the model we call the Keras `fit` API using the input dataset that we
created earlier, the same way we would in a non-distributed case.

```python
model.fit(train_dataset, epochs=5, steps_per_epoch=10)
```

Similarly, we can also call `evaluate` and `predict` as before using appropriate
datasets.

```python
# eval_dataset and predict_dataset are placeholders for your own
# tf.data.Dataset objects; they are not defined in this guide.
model.evaluate(eval_dataset, steps=1)
model.predict(predict_dataset, steps=1)
```

That's all you need to train your model with Keras on multiple GPUs with
`MirroredStrategy`. It will take care of splitting up
the input dataset, replicating layers and variables on each device, and
combining and applying gradients.

The model and input code do not have to change because we have changed the
underlying components of TensorFlow (such as
optimizer, batch norm and summaries) to become distribution-aware.
That means those components know how to
combine their state across devices. Further, saving and checkpointing work
seamlessly, so you can save with one or no distribution strategy and resume with
another.


## Example with Estimator API

You can also use the Distribution Strategy API with [`Estimator`](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator). Let's see a simple example of its usage with `MirroredStrategy`.


Consider a very simple model function which tries to learn a simple function.

```python
def model_fn(features, labels, mode):
  layer = tf.layers.Dense(1)
  logits = layer(features)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  loss = tf.losses.mean_squared_error(
      labels=labels, predictions=tf.reshape(logits, []))

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  if mode == tf.estimator.ModeKeys.TRAIN:
    # Pass the global step so that each training step increments it.
    train_op = tf.train.GradientDescentOptimizer(0.2).minimize(
        loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
```

Again, let's define a simple input function to feed data for training this model.


```python
def input_fn():
  features = tf.data.Dataset.from_tensors([[1.]]).repeat(100)
  labels = tf.data.Dataset.from_tensors(1.).repeat(100)
  return tf.data.Dataset.zip((features, labels))
```

Now that we have a model function and input function defined, we can define the
estimator. To use `MirroredStrategy`, all we need to do is:

* Create an instance of the `MirroredStrategy` class.
* Pass it as the `train_distribute` argument of
[`RunConfig`](https://www.tensorflow.org/api_docs/python/tf/estimator/RunConfig),
and pass that config to `Estimator`.


```python
distribution = tf.contrib.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=distribution)
classifier = tf.estimator.Estimator(model_fn=model_fn, config=config)
classifier.train(input_fn=input_fn)
classifier.evaluate(input_fn=input_fn)
```

That's it! This change will now configure the estimator to run on all GPUs on your
machine.


## Customization and Performance Tips

Above, we showed the easiest way to use [`MirroredStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/MirroredStrategy#__init__).
There are a few things you can customize in practice:

* You can specify a list of specific GPUs (using param `devices`) or the number
of GPUs (using param `num_gpus`), in case you don't want auto detection.
* You can specify various parameters for all-reduce with the `cross_tower_ops`
param, such as the all-reduce algorithm to use, and gradient repacking.

We've tried to make it such that you get the best performance for your existing
model. We also recommend you follow the tips from the
[Input Pipeline Performance Guide](https://www.tensorflow.org/performance/datasets_performance).
Specifically, we found using [`map_and_batch`](https://www.tensorflow.org/performance/datasets_performance#map_and_batch)
and [`dataset.prefetch`](https://www.tensorflow.org/performance/datasets_performance#pipelining)
in the input function gives a solid boost in performance. When using
`dataset.prefetch`, use `buffer_size=None` to let it detect the optimal buffer size.

## Multi-worker Training
### Overview

For multi-worker training, no code change is required to the `Estimator` code.
You can run the same model code for all tasks in your cluster including
parameter servers and the evaluator. But you need to use
`tf.estimator.train_and_evaluate`, explicitly specify `num_gpus_per_worker`
for your strategy object, and set the "TF\_CONFIG" environment variable for each
binary running in your cluster. We'll provide a Kubernetes template in the
[tensorflow/ecosystem](https://github.com/tensorflow/ecosystem) repo which sets
"TF\_CONFIG" for your training tasks.

### TF\_CONFIG environment variable

The "TF\_CONFIG" environment variable is a JSON string which specifies what
tasks constitute a cluster, their addresses and each task's role in the cluster.
One example of "TF\_CONFIG" is:

```bash
TF_CONFIG='{
  "cluster": {
    "worker": ["host1:port", "host2:port", "host3:port"],
    "ps": ["host4:port", "host5:port"]
  },
  "task": {"type": "worker", "index": 1}
}'
```

This "TF\_CONFIG" specifies that there are three workers and two ps tasks in the
cluster along with their hosts and ports. The "task" part specifies the
role of the current task in the cluster: worker 1. Valid roles in a cluster are
"chief", "worker", "ps" and "evaluator". There should be no "ps" job for
`CollectiveAllReduceStrategy` and `MirroredStrategy`. The "evaluator" job is
optional and can have at most one task. It does single machine evaluation and if
you don't want to do evaluation, you can pass in a dummy `input_fn` to the
`tf.estimator.EvalSpec` of `tf.estimator.train_and_evaluate`.

### Dataset

The `input_fn` you provide to estimator code is for one worker. So remember to
scale up your batch size if you have multiple GPUs on each worker.

The same `input_fn` will be used for all workers if you use
`CollectiveAllReduceStrategy` or `ParameterServerStrategy`. Therefore it is
important to shuffle your dataset in your `input_fn`.

`MirroredStrategy` will insert a `tf.data.Dataset.shard` call in your
`input_fn` if `auto_shard_dataset` is set to `True`. As a result, each worker
gets a fraction of your input data.

### Performance Tips

We have been actively working on multi-worker performance. Currently, prefer
`CollectiveAllReduceStrategy` for synchronous multi-worker training.

### Example

Let's use the same example for multi-worker. We'll start a cluster with 3
workers doing synchronous all-reduce training.
In the following code snippet, we
start multi-worker training using `tf.estimator.train_and_evaluate`:

```python
def model_main():
  distribution = tf.contrib.distribute.CollectiveAllReduceStrategy(
      num_gpus_per_worker=2)
  config = tf.estimator.RunConfig(train_distribute=distribution)
  estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)
  train_spec = tf.estimator.TrainSpec(input_fn=input_fn)
  # eval_input_fn is a placeholder for your own evaluation input function.
  eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```

**Note**: You don't have to set "TF\_CONFIG" manually if you use our provided
Kubernetes template.

You'll then need 3 machines. Find out their host addresses and one available
port on each machine. Then set "TF\_CONFIG" in each binary and run the above
model code.

In your worker 0, run:

```python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
    "task": {"type": "worker", "index": 0}
})

# Call the model_main function defined above.
model_main()
```

In your worker 1, run:

```python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
    "task": {"type": "worker", "index": 1}
})

# Call the model_main function defined above.
model_main()
```

In your worker 2, run:

```python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
    "task": {"type": "worker", "index": 2}
})

# Call the model_main function defined above.
model_main()
```

Then you'll find your cluster has started training! You can inspect the logs of
the workers or start TensorBoard.

### Standalone client mode

We have a new way to run distributed training.
You can bring up standard
TensorFlow servers in your cluster and run your model code anywhere, such as on
your laptop.

In the above example, instead of calling `model_main`, you can call
`tf.contrib.distribute.run_standard_tensorflow_server().join()`. This will bring
up a cluster running standard TensorFlow servers which wait for your request to
start training.

On your laptop, you can run

```python
distribution = tf.contrib.distribute.CollectiveAllReduceStrategy(
    num_gpus_per_worker=2)
config = tf.estimator.RunConfig(
    experimental_distribute=tf.contrib.distribute.DistributeConfig(
        train_distribute=distribution,
        remote_cluster={"worker": ["host1:port", "host2:port", "host3:port"]}))
estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)
train_spec = tf.estimator.TrainSpec(input_fn=input_fn)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```

Then you will see the training logs on your laptop. You can terminate the
training by terminating your process on your laptop. You can also modify your
code and run a new model against the same cluster.

We've been optimizing the performance of standalone client mode. If you notice
high latency between your laptop and your cluster, you can reduce that latency
by running your model binary in the cluster.

## Caveats

This feature is in early stages and there are a lot of improvements forthcoming:

* Summaries are only computed in the first tower in `MirroredStrategy`.
* Eager support is in the works; performance can be more challenging with eager
execution.
* We currently support the following predefined Keras callbacks:
`ModelCheckpoint`, `TensorBoard`. We will soon be adding support for
some of the other callbacks such as `EarlyStopping`, `ReduceLROnPlateau`, etc.
If you
create your own callback, you will not have access to all model properties and
validation data.
* If you are [`batching`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#batch)
your input data, we will place one batch on each GPU in each step. So your
effective batch size will be `num_gpus * batch_size`. Therefore, consider
adjusting your learning rate or batch size according to the number of GPUs.
We are working on addressing this limitation by splitting each batch across GPUs
instead.
* PartitionedVariables are not supported yet.

## What's next?

Please give distribution strategies a try. This feature is in early stages and
is evolving, so we welcome your feedback via
[issues on GitHub](https://github.com/tensorflow/tensorflow/issues/new).
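
When trying this out, the batching caveat listed under Caveats is the one most likely to affect your results, so here is a minimal sketch of the arithmetic. It is plain Python and does not call TensorFlow. The helper names `effective_batch_size` and `per_gpu_batch_size_for` are hypothetical and only illustrate the `num_gpus * batch_size` relationship stated above.

```python
def effective_batch_size(per_gpu_batch_size, num_gpus):
    """Examples consumed per step when every GPU receives one full batch."""
    return per_gpu_batch_size * num_gpus


def per_gpu_batch_size_for(target_global_batch_size, num_gpus):
    """Batch size to pass to Dataset.batch() so the global batch hits the target.

    Hypothetical helper: raises if the target cannot be split evenly.
    """
    if target_global_batch_size % num_gpus != 0:
        raise ValueError("target batch size must be divisible by num_gpus")
    return target_global_batch_size // num_gpus


# With batch(10) in the input pipeline and 4 GPUs, each step sees 40 examples.
print(effective_batch_size(10, 4))    # 40

# To keep a global batch of 64 on 4 GPUs, batch the dataset by 16.
print(per_gpu_batch_size_for(64, 4))  # 16
```

If you would rather keep the per-GPU batch size fixed, the alternative mentioned in the caveat is to adjust the learning rate for the larger effective batch; how much to adjust it depends on your model.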