# Distribution Strategy

> *NOTE*: This is an experimental feature. The API and performance
> characteristics are subject to change.

## Overview

The [`DistributionStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/DistributionStrategy)
API is an easy way to distribute your training
across multiple devices/machines. Our goal is to allow users to use existing
models and training code with minimal changes to enable distributed training.
Moreover, we've designed the API in such a way that it works with both eager and
graph execution.

Currently we support several types of strategies:

* [`MirroredStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/MirroredStrategy):
This does in-graph replication with synchronous training
on many GPUs on one machine. Essentially, we create copies of all variables in
the model's layers on each device. We then use all-reduce to combine gradients
across the devices before applying them to the variables to keep them in sync.

* [`CollectiveAllReduceStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/CollectiveAllReduceStrategy):
This is a version of `MirroredStrategy` for multi-worker training. It uses
a collective op to do all-reduce. This supports between-graph communication and
synchronization, and delegates the specifics of the all-reduce implementation to
the runtime (as opposed to encoding it in the graph). This allows it to perform
optimizations like batching and switch between plugins that support different
hardware or algorithms. In the future, this strategy will implement
fault-tolerance to allow training to continue when there is worker failure.

* [`ParameterServerStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/ParameterServerStrategy):
This strategy supports using parameter servers either for multi-GPU local
training or asynchronous multi-machine training. When used to train locally,
variables are not mirrored; instead, they are placed on the CPU and operations
are replicated across all local GPUs. In a multi-machine setting, some machines
are designated as workers and some as parameter servers. Each variable is placed
on one parameter server. Computation operations are replicated across all GPUs
of the workers.

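To make the synchronous all-reduce idea behind `MirroredStrategy` concrete, here is a minimal pure-Python sketch. It does not use TensorFlow, and the names `all_reduce_mean`, `local_gradient` and `train` are made up for illustration. It assumes a one-parameter model `y = w * x` with a mean-squared-error loss, and it averages the gradients, which is one possible way to combine them.

```python
# Toy simulation of synchronous data-parallel SGD with all-reduce.
# Pure Python; every name here is illustrative, not a TensorFlow API.

def all_reduce_mean(values):
    """Return the mean of the per-replica values, as every replica sees it."""
    return sum(values) / len(values)


def local_gradient(w, shard):
    """Gradient of the mean squared error of y = w * x on one data shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def train(shards, steps=50, learning_rate=0.1):
    # Each replica starts with an identical copy of the variable.
    replicas = [0.0 for _ in shards]
    for _ in range(steps):
        # 1. Every replica computes a gradient on its own shard of the data.
        grads = [local_gradient(w, shard) for w, shard in zip(replicas, shards)]
        # 2. All-reduce: combine the gradients so each replica gets one value.
        combined = all_reduce_mean(grads)
        # 3. Every replica applies the same update, so the copies stay in sync.
        replicas = [w - learning_rate * combined for w in replicas]
    return replicas


# Two "devices", each holding part of a dataset generated by y = 3 * x.
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (0.5, 1.5)]]
weights = train(shards)
print(weights)
```

Because every replica applies the same combined gradient, the copies of `w` remain identical after each step, which is the property the mirrored variables rely on.
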
## Multi-GPU Training

## Example with Keras API

Let's see how to scale to multiple GPUs on one machine using `MirroredStrategy` with [tf.keras](https://www.tensorflow.org/guide/keras).

Let's define a simple input dataset for training a model. Note that currently we require using
[`tf.data.Dataset`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset)
with `DistributionStrategy`.

```python
import tensorflow as tf
from tensorflow import keras

features = tf.data.Dataset.from_tensors([1.]).repeat(10000).batch(10)
labels = tf.data.Dataset.from_tensors([1.]).repeat(10000).batch(10)
train_dataset = tf.data.Dataset.zip((features, labels))
```

To distribute this Keras model on multiple GPUs using `MirroredStrategy` we
first instantiate a `MirroredStrategy` object.

```python
distribution = tf.contrib.distribute.MirroredStrategy()
```

Take a very simple model consisting of a single layer. We need to create and compile
the model under the distribution strategy scope.

```python
with distribution.scope():
  inputs = tf.keras.layers.Input(shape=(1,))
  predictions = tf.keras.layers.Dense(1)(inputs)
  model = tf.keras.models.Model(inputs=inputs, outputs=predictions)

  model.compile(loss='mean_squared_error',
                optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2))
```

To train the model we call the Keras `fit` API using the input dataset that we
created earlier, just as we would in the non-distributed case.

```python
model.fit(train_dataset, epochs=5, steps_per_epoch=10)
```

Similarly, we can also call `evaluate` and `predict` as before using appropriate
datasets.

```python
# eval_dataset and predict_dataset are tf.data.Dataset objects that you
# build yourself, in the same way as train_dataset above (not shown here).
model.evaluate(eval_dataset, steps=1)
model.predict(predict_dataset, steps=1)
```

That's all you need to train your model with Keras on multiple GPUs with
`MirroredStrategy`. It will take care of splitting up
the input dataset, replicating layers and variables on each device, and
combining and applying gradients.

The model and input code do not have to change because we have changed the
underlying components of TensorFlow (such as
optimizer, batch norm and summaries) to become distribution-aware.
That means those components know how to
combine their state across devices. Further, saving and checkpointing work
seamlessly, so you can save with one or no distribution strategy and resume with
another.

## Example with Estimator API

You can also use the Distribution Strategy API with [`Estimator`](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator). Let's see a simple example of its usage with `MirroredStrategy`.

Consider a very simple model function which tries to learn a simple function.

```python
def model_fn(features, labels, mode):
  layer = tf.layers.Dense(1)
  logits = layer(features)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  loss = tf.losses.mean_squared_error(
      labels=labels, predictions=tf.reshape(logits, []))

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  if mode == tf.estimator.ModeKeys.TRAIN:
    train_op = tf.train.GradientDescentOptimizer(0.2).minimize(loss)
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
```

Again, let's define a simple input function to feed data for training this model.

```python
def input_fn():
  features = tf.data.Dataset.from_tensors([[1.]]).repeat(100)
  labels = tf.data.Dataset.from_tensors(1.).repeat(100)
  return tf.data.Dataset.zip((features, labels))
```

Now that we have a model function and input function defined, we can define the
estimator. To use `MirroredStrategy`, all we need to do is:

* Create an instance of the `MirroredStrategy` class.
* Pass it as the `train_distribute` argument of
[`RunConfig`](https://www.tensorflow.org/api_docs/python/tf/estimator/RunConfig),
and pass that config to `Estimator`.

```python
distribution = tf.contrib.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=distribution)
classifier = tf.estimator.Estimator(model_fn=model_fn, config=config)
classifier.train(input_fn=input_fn)
classifier.evaluate(input_fn=input_fn)
```

That's it! This change configures the estimator to run on all GPUs on your
machine.

## Customization and Performance Tips

Above, we showed the easiest way to use [`MirroredStrategy`](https://www.tensorflow.org/versions/master/api_docs/python/tf/contrib/distribute/MirroredStrategy#__init__).
There are a few things you can customize in practice:

* You can specify a list of specific GPUs (using param `devices`) or the number
of GPUs (using param `num_gpus`), in case you don't want auto detection.
* You can specify various parameters for all-reduce with the `cross_tower_ops`
param, such as the all-reduce algorithm to use, and gradient repacking.

We've tried to make it such that you get the best performance for your existing
model. We also recommend you follow the tips from the
[Input Pipeline Performance Guide](https://www.tensorflow.org/performance/datasets_performance).
Specifically, we found that using [`map_and_batch`](https://www.tensorflow.org/performance/datasets_performance#map_and_batch)
and [`dataset.prefetch`](https://www.tensorflow.org/performance/datasets_performance#pipelining)
in the input function gives a solid boost in performance. When using
`dataset.prefetch`, use `buffer_size=None` to let it detect the optimal buffer size.

## Multi-worker Training
### Overview

For multi-worker training, no code change is required to the `Estimator` code.
You can run the same model code for all tasks in your cluster including
parameter servers and the evaluator. But you need to use
`tf.estimator.train_and_evaluate`, explicitly specify `num_gpus_per_worker`
for your strategy object, and set the "TF\_CONFIG" environment variable for each
binary running in your cluster. We'll provide a Kubernetes template in the
[tensorflow/ecosystem](https://github.com/tensorflow/ecosystem) repo which sets
"TF\_CONFIG" for your training tasks.

### TF\_CONFIG environment variable

The "TF\_CONFIG" environment variable is a JSON string which specifies what
tasks constitute a cluster, their addresses and each task's role in the cluster.
One example of "TF\_CONFIG" is:

```bash
TF_CONFIG='{
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
    "task": {"type": "worker", "index": 1}
}'
```

This "TF\_CONFIG" specifies that there are three workers and two ps tasks in the
cluster along with their hosts and ports. The "task" part specifies the
role of the current task in the cluster, worker 1. Valid roles in a cluster are
"chief", "worker", "ps" and "evaluator". There should be no "ps" job for
`CollectiveAllReduceStrategy` and `MirroredStrategy`. The "evaluator" job is
optional and can have at most one task. It does single machine evaluation. If
you don't want to do evaluation, you can pass in a dummy `input_fn` to the
`tf.estimator.EvalSpec` of `tf.estimator.train_and_evaluate`.

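The rules above can be checked before launching a task. The following is a minimal sketch using only the standard `json` module; `check_tf_config` and its `allow_ps` flag are hypothetical names introduced here, not part of TensorFlow, and the checks cover only what this section states.

```python
import json

VALID_ROLES = {"chief", "worker", "ps", "evaluator"}


def check_tf_config(tf_config_json, allow_ps=True):
    """Parse a TF_CONFIG string and return (role, index) of the current task.

    Hypothetical helper: it enforces only the rules stated in this section.
    """
    config = json.loads(tf_config_json)
    cluster = config["cluster"]
    task = config["task"]

    unknown = set(cluster) - VALID_ROLES
    if unknown:
        raise ValueError("unknown roles: %s" % sorted(unknown))
    # Pass allow_ps=False for strategies that must not have a "ps" job.
    if not allow_ps and "ps" in cluster:
        raise ValueError('this strategy must not have a "ps" job')
    if len(cluster.get("evaluator", [])) > 1:
        raise ValueError('the "evaluator" job can have at most one task')

    role, index = task["type"], task["index"]
    if role not in VALID_ROLES:
        raise ValueError("unknown task type: %r" % role)
    # Only check the index when the role is actually listed in the cluster.
    if role in cluster and not 0 <= index < len(cluster[role]):
        raise ValueError("task index %d out of range for %r" % (index, role))
    return role, index


example = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
    },
    "task": {"type": "worker", "index": 1},
})
print(check_tf_config(example))
```

Calling `check_tf_config(example, allow_ps=False)` raises `ValueError`, which mirrors the restriction on `CollectiveAllReduceStrategy` and `MirroredStrategy`.
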
### Dataset

The `input_fn` you provide to estimator code is for one worker. So remember to
scale up your batch if you have multiple GPUs on each worker.

The same `input_fn` will be used for all workers if you use
`CollectiveAllReduceStrategy` or `ParameterServerStrategy`. Therefore it is
important to shuffle your dataset in your `input_fn`.

`MirroredStrategy` will insert a `tf.data.Dataset.shard` call in your
`input_fn` if `auto_shard_dataset` is set to `True`. As a result, each worker
gets a fraction of your input data.

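The difference between sharded and unsharded input can be illustrated without TensorFlow. This sketch assumes the usual `tf.data.Dataset.shard(num_shards, index)` behaviour of keeping the elements whose position modulo `num_shards` equals `index`; the helpers `shard` and `shuffled_copy` are illustrative stand-ins that operate on plain lists.

```python
import random


def shard(elements, num_shards, index):
    """Keep every num_shards-th element, starting at position `index`."""
    return [x for i, x in enumerate(elements) if i % num_shards == index]


def shuffled_copy(elements, seed):
    """Stand-in for shuffling inside input_fn, with one seed per worker."""
    rng = random.Random(seed)
    result = list(elements)
    rng.shuffle(result)
    return result


data = list(range(12))
num_workers = 3

# Sharded input: the workers see disjoint slices that together cover the data.
shards = [shard(data, num_workers, i) for i in range(num_workers)]
print(shards)

# Unsharded input: every worker reads the full dataset, so shuffling is what
# keeps the workers from processing the same examples in the same order.
views = [shuffled_copy(data, seed=i) for i in range(num_workers)]
print(views)
```
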
### Performance Tips

We have been actively working on multi-worker performance. Currently, prefer
`CollectiveAllReduceStrategy` for synchronous multi-worker training.

### Example

Let's use the same example for multi-worker. We'll start a cluster with 3
workers doing synchronous all-reduce training. In the following code snippet, we
start multi-worker training using `tf.estimator.train_and_evaluate`:

```python
def model_main():
  distribution = tf.contrib.distribute.CollectiveAllReduceStrategy(
      num_gpus_per_worker=2)
  config = tf.estimator.RunConfig(train_distribute=distribution)
  estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)
  train_spec = tf.estimator.TrainSpec(input_fn=input_fn)
  # eval_input_fn is an input function for evaluation data that you
  # define yourself, in the same way as input_fn (not shown here).
  eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```

**Note**: You don't have to set "TF\_CONFIG" manually if you use our provided
Kubernetes template.

You'll then need 3 machines. Find out their host addresses and one available
port on each machine. Then set "TF\_CONFIG" in each binary and run the above
model code.

In your worker 0, run:

```python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
    "task": {"type": "worker", "index": 0}
})

# Call the model_main function defined above.
model_main()
```

In your worker 1, run:

```python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
    "task": {"type": "worker", "index": 1}
})

# Call the model_main function defined above.
model_main()
```

In your worker 2, run:

```python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
    "task": {"type": "worker", "index": 2}
})

# Call the model_main function defined above.
model_main()
```

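Since the three snippets differ only in the task index, the "TF\_CONFIG" string can be generated from a single host list. This is a minimal sketch; `make_tf_config` is a hypothetical helper, and the host strings are the same placeholders used above.

```python
import json


def make_tf_config(worker_hosts, index):
    """Build the TF_CONFIG string for worker `index` (hypothetical helper)."""
    if not 0 <= index < len(worker_hosts):
        raise ValueError("index %d out of range" % index)
    return json.dumps({
        "cluster": {"worker": list(worker_hosts)},
        "task": {"type": "worker", "index": index},
    })


# Placeholder addresses, as in the snippets above.
hosts = ["host1:port", "host2:port", "host3:port"]
configs = [make_tf_config(hosts, i) for i in range(len(hosts))]
for config in configs:
    print(config)

# In a launcher script, each worker would then do something like:
#   os.environ["TF_CONFIG"] = make_tf_config(hosts, my_index)
#   model_main()
```

How each worker learns its own index (a command-line flag, a scheduler-provided variable, and so on) depends on your cluster setup and is left open here.
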
Then you'll find your cluster has started training! You can inspect the logs of
the workers or start TensorBoard.

### Standalone client mode

We have a new way to run distributed training. You can bring up standard
TensorFlow servers in your cluster and run your model code anywhere, such as on
your laptop.

In the above example, instead of calling `model_main`, you can call
`tf.contrib.distribute.run_standard_tensorflow_server().join()`. This will bring
up a cluster running standard TensorFlow servers which wait for your request to
start training.

On your laptop, you can run:

```python
distribution = tf.contrib.distribute.CollectiveAllReduceStrategy(
    num_gpus_per_worker=2)
config = tf.estimator.RunConfig(
    experimental_distribute=tf.contrib.distribute.DistributeConfig(
        train_distribute=distribution,
        remote_cluster={"worker": ["host1:port", "host2:port", "host3:port"]}))
estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)
train_spec = tf.estimator.TrainSpec(input_fn=input_fn)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```

Then you will see the training logs on your laptop. You can terminate the
training by terminating your process on your laptop. You can also modify your
code and run a new model against the same cluster.

We've been optimizing the performance of standalone client mode. If you notice
high latency between your laptop and your cluster, you can reduce that latency
by running your model binary in the cluster.

## Caveats

This feature is in early stages and there are a lot of improvements forthcoming:

* Summaries are only computed in the first tower in `MirroredStrategy`.
* Eager support is in the works; performance can be more challenging with eager
execution.
* We currently support the following predefined Keras callbacks:
`ModelCheckpoint`, `TensorBoard`. We will soon be adding support for
some of the other callbacks such as `EarlyStopping`, `ReduceLROnPlateau`, etc. If you
create your own callback, you will not have access to all model properties and
validation data.
* If you are [`batching`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#batch)
your input data, we will place one batch on each GPU in each step. So your
effective batch size will be `num_gpus * batch_size`. Therefore, consider
adjusting your learning rate or batch size according to the number of GPUs.
We are working on addressing this limitation by splitting each batch across GPUs
instead.
* PartitionedVariables are not supported yet.

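The batch-size caveat comes down to simple arithmetic, sketched below. The helper names are made up for illustration. Scaling the learning rate linearly with the number of GPUs is a common heuristic and an assumption here, not a recommendation made by this document, so treat it as a starting point for tuning.

```python
def effective_batch_size(batch_size, num_gpus):
    """One batch goes to each GPU per step, so a step uses num_gpus batches."""
    return num_gpus * batch_size


def per_gpu_batch_size(target_global_batch, num_gpus):
    """Batch size to pass to `batch()` so the global batch stays fixed."""
    if target_global_batch % num_gpus != 0:
        raise ValueError("global batch must be divisible by num_gpus")
    return target_global_batch // num_gpus


def scaled_learning_rate(base_lr, num_gpus):
    """Linear scaling heuristic (an assumption, not taken from this document)."""
    return base_lr * num_gpus


# With batch(10) and 4 GPUs, each step processes 40 examples.
print(effective_batch_size(10, 4))
# To keep a global batch of 64 on 4 GPUs, batch each GPU's input by 16.
print(per_gpu_batch_size(64, 4))
# Alternatively, keep batch(10) and try a proportionally larger learning rate.
print(scaled_learning_rate(0.2, 4))
```
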
## What's next?

Please give distribution strategies a try. This feature is in early stages and
is evolving, so we welcome your feedback via
[issues on GitHub](https://github.com/tensorflow/tensorflow/issues/new).