# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""Benchmarks using custom training loop on MNIST dataset."""
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import timeit
21import numpy as np
22
23import tensorflow as tf
24
25from tensorflow.python.keras.benchmarks import benchmark_util
26from tensorflow.python.keras.benchmarks import distribution_util
27
28
class CustomMnistBenchmark(tf.test.Benchmark):
  """Benchmarks for custom training loop using `tf.test.Benchmark`."""

  def __init__(self):
    super(CustomMnistBenchmark, self).__init__()
    self.num_classes = 10
    self.input_shape = (28, 28, 1)
    self.epochs = 15
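    # Load MNIST once; scale pixels to [0, 1] and one-hot encode the labels.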
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train.astype('float32') / 255
    x_train = np.expand_dims(x_train, -1)
    y_train = tf.keras.utils.to_categorical(y_train, self.num_classes)
    self.num_examples = x_train.shape[0]
    # Use `tf.data.Dataset` for custom training loop.
    self.train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

  def _build_model(self):
    """Model from https://keras.io/examples/vision/mnist_convnet/."""
    model = tf.keras.Sequential([
        tf.keras.Input(shape=self.input_shape),
        tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(self.num_classes, activation='softmax'),
    ])

    return model

  def compute_loss(self, targets, predictions, loss_fn, batch_size):
    """Compute average loss."""
    per_example_loss = loss_fn(targets, predictions)
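    # Scale by the global batch size so that per-replica losses sum to the
    # correct average when reduced under a distribution strategy.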
    return tf.nn.compute_average_loss(
        per_example_loss, global_batch_size=batch_size)

  @tf.function(experimental_relax_shapes=True)
  def train_step(self, inputs, model, loss_fn, optimizer, batch_size):
    """Compute the loss and optimize the model with the optimizer.

    Args:
      inputs: A `(features, targets)` batch from the `tf.data` dataset.
      model: See `model` in `train_function()` method.
      loss_fn: See `loss_fn` in `train_function()` method.
      optimizer: See `optimizer` in `train_function()` method.
      batch_size: See `batch_size` in `train_function()` method.

    Returns:
      Loss value.
    """
    train_x, train_y = inputs
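    # Record the forward pass under a GradientTape so the scaled loss can be
    # backpropagated and applied as a single optimizer update.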
    with tf.GradientTape() as tape:
      predictions = model(train_x, training=True)
      loss = self.compute_loss(train_y, predictions, loss_fn, batch_size)
    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss

  @tf.function(experimental_relax_shapes=True)
  def distributed_train_step(self, batch_dataset, model, loss_fn, optimizer,
                             batch_size, distribution_strategy):
    """Train step in distribution strategy setting.

    Args:
      batch_dataset: A `(features, targets)` batch from the `tf.data` dataset.
      model: See `model` in `train_function()` method.
      loss_fn: See `loss_fn` in `train_function()` method.
      optimizer: See `optimizer` in `train_function()` method.
      batch_size: See `batch_size` in `train_function()` method.
      distribution_strategy: See `distribution_strategy` in `train_function()`
        method.

    Returns:
      Sum of per_replica_losses.
    """
    per_replica_losses = distribution_strategy.run(
        self.train_step,
        args=(
            batch_dataset,
            model,
            loss_fn,
            optimizer,
            batch_size,
        ))
    return distribution_strategy.reduce(
        tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

  def train_function(self,
                     model,
                     train_dataset,
                     loss_fn,
                     optimizer,
                     epochs=2,
                     distribution_strategy=None,
                     batch_size=256):
    """Train model in custom training loop and return average train_step_time.

    Args:
      model: The Keras model to be benchmarked.
      train_dataset: `tf.data` dataset. Should return a tuple of either (inputs,
        targets) or (inputs, targets, sample_weights).
      loss_fn: `tf.keras.losses.Loss` instance.
      optimizer: `tf.keras.optimizers` instance.
      epochs: Integer. Number of epochs to train the model. If unspecified,
        `epochs` will default to 2.
      distribution_strategy: `tf.distribute.Strategy` instance (e.g. the
        strategy returned by `distribution_util.get_distribution_strategy` for
        `mirrored`, `one_device` or `multi_worker_mirrored`). If unspecified,
        `distribution_strategy` will default to None and the non-distributed
        train step is used. Note that `TPU` and `parameter_server` are not
        supported yet.
      batch_size: Integer. Number of samples per gradient update. If
        unspecified, `batch_size` will default to 256.

    Returns:
      Average train_step_time.
    """
    train_step_time_list = []
    timer = timeit.default_timer

    total_loss = 0.0
    num_batches = 0
    for _ in range(epochs):
      # Iterate over the batches of the dataset.
      for batch_dataset in train_dataset:

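        # Time each train step individually; fetching the next batch from the
        # dataset is not included in the measurement.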
        start_time = timer()

        if distribution_strategy is not None:
          total_loss += self.distributed_train_step(batch_dataset, model,
                                                    loss_fn, optimizer,
                                                    batch_size,
                                                    distribution_strategy)
        else:
          total_loss += self.train_step(batch_dataset, model, loss_fn,
                                        optimizer, batch_size)
        num_batches += 1

        end_time = timer()
        train_step_time_list.append(end_time - start_time)

    return np.mean(train_step_time_list)

  def measure_performance(self,
                          model,
                          dataset,
                          loss_fn,
                          optimizer,
                          batch_size=32,
                          run_iters=4,
                          epochs=10,
                          distribution_strategy=None):
182    """Run models and measure the performance.
183
184    Args:
185      model_fn: Model function to be benchmarked.
186      dataset: `tf.data` dataset. Should return a tuple of either (inputs,
187        targets) or (inputs, targets, sample_weights).
188      loss_fn: `tf.keras.losses.Loss` instance.
189      optimizer: `tf.keras.optimizers` instance.
190      batch_size: Integer. Number of samples per gradient update. If
191        unspecified, `batch_size` will default to 32.
192      run_iters: Integer. Number of iterations to run the performance
193        measurement. If unspecified, `run_iters` will default to 4.
194      epochs: Integer. Number of epochs to train the model. If unspecified,
195        `epochs` will default to 10.
196      distribution_strategy: Distribution strategies. It could be
197        `multi_worker_mirrored`, `one_device`, `mirrored`. If unspecified,
198        `distribution_strategy` will default to 'off'. Note that, `TPU` and
199        `parameter_server` are not supported yet.
200
201    Returns:
202      Performance summary, which contains build_time, avg_epoch_time,
203      wall_time, exp_per_sec, epochs, warmup_time, train_step_time.
204
205    Raise:
206      ValueError: if `dataset` is None or if `optimizer` instance is
207      not provided or if `loss_fn` instance is not provided.
208    """
    if distribution_strategy is not None and \
      not isinstance(dataset, tf.distribute.DistributedDataset):
      raise ValueError('tf.distribute.DistributedDataset'
                       ' required in distribution strategy.')

    if distribution_strategy is None and \
      not isinstance(dataset, tf.data.Dataset):
      raise ValueError('`tf.data.Dataset` is required.')

    if not isinstance(loss_fn, tf.keras.losses.Loss):
      raise ValueError('`tf.keras.losses.Loss` instance '
                       'for loss_fn is required.')

    if not isinstance(optimizer, tf.keras.optimizers.Optimizer):
      raise ValueError('`tf.keras.optimizers` instance '
                       'for optimizer is required.')

    avg_epoch_time_list, train_step_time_list = [], []
    wall_time_list, exp_per_sec_list, warmup_time_list = [], [], []

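    # Total number of examples processed during the timed run, used to report
    # examples per second.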
    total_num_examples = epochs * self.num_examples

    for _ in range(run_iters):
      timer = timeit.default_timer
      start_time = timer()
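      # Warm-up pass: a single epoch so that `tf.function` tracing and other
      # one-time setup costs are reported separately as warmup_time.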
      t1 = timer()
      self.train_function(model, dataset, loss_fn, optimizer, 1,
                          distribution_strategy, batch_size)
      warmup_time = timer() - t1

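      # Timed run: `epochs` full passes over the dataset.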
      t2 = timer()
      train_step_time = self.train_function(model, dataset, loss_fn, optimizer,
                                            epochs, distribution_strategy,
                                            batch_size)
      end_time = timer()

      train_step_time_list.append(train_step_time)
      warmup_time_list.append(warmup_time)
      wall_time_list.append(end_time - start_time)
      exp_per_sec_list.append(total_num_examples / (end_time - t2))
      avg_epoch_time_list.append((end_time - t2) / epochs)

    metrics = []
    metrics.append({
        'name': 'avg_epoch_time',
        'value': np.mean(avg_epoch_time_list)
    })
    metrics.append({'name': 'exp_per_sec', 'value': np.mean(exp_per_sec_list)})
    metrics.append({'name': 'warmup_time', 'value': np.mean(warmup_time_list)})
    metrics.append({
        'name': 'train_step_time',
        'value': np.mean(train_step_time_list)
    })
    metrics.append({'name': 'epochs', 'value': epochs})

    wall_time = np.mean(wall_time_list)

    return metrics, wall_time

  def benchmark_custom_training_mnist_bs_128(self):
    """Measure performance with batch_size=128 and run_iters=5."""
    batch_size = 128
    run_iters = 5
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    # Instantiate a loss function.
    loss_fn = tf.keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # Instantiate an optimizer to train the model.
    optimizer = tf.keras.optimizers.Adam()
    model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)

  def benchmark_custom_training_mnist_bs_256(self):
    """Measure performance with batch_size=256 and run_iters=5."""
    batch_size = 256
    run_iters = 5
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    # Instantiate a loss function.
    loss_fn = tf.keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # Instantiate an optimizer to train the model.
    optimizer = tf.keras.optimizers.Adam()
    model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)

  def benchmark_custom_training_mnist_bs_512(self):
    """Measure performance with batch_size=512 and run_iters=5."""
    batch_size = 512
    run_iters = 5
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    # Instantiate a loss function.
    loss_fn = tf.keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # Instantiate an optimizer to train the model.
    optimizer = tf.keras.optimizers.Adam()
    model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)

  def benchmark_custom_training_mnist_bs_512_gpu_2(self):
    """Measure performance with batch_size=512, run_iters=10, gpu=2 and
    distribution_strategy='mirrored'.
    """
    batch_size = 512
    run_iters = 10
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    distribution_strategy = 'mirrored'

    strategy = distribution_util.get_distribution_strategy(
        distribution_strategy=distribution_strategy, num_gpus=2)

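    # Split each global batch across the replicas managed by the strategy.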
    if distribution_strategy != 'off':
      train_dataset = strategy.experimental_distribute_dataset(train_dataset)

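    # Create the model and optimizer under the strategy scope so that their
    # variables are mirrored across the participating GPUs.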
    strategy_scope = distribution_util.get_strategy_scope(strategy)

    with strategy_scope:
      # Instantiate a loss function.
      loss_fn = tf.keras.losses.CategoricalCrossentropy(
          reduction=tf.keras.losses.Reduction.NONE)
      # Instantiate an optimizer to train the model.
      optimizer = tf.keras.optimizers.Adam()
      model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs,
                                                  strategy)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)


if __name__ == '__main__':
  tf.test.main()