# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Benchmarks using custom training loop on MNIST dataset."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import timeit

import numpy as np
import tensorflow as tf

from tensorflow.python.keras.benchmarks import benchmark_util
from tensorflow.python.keras.benchmarks import distribution_util


class CustomMnistBenchmark(tf.test.Benchmark):
  """Benchmarks for custom training loop using `tf.test.Benchmark`."""

  def __init__(self):
    super(CustomMnistBenchmark, self).__init__()
    self.num_classes = 10
    self.input_shape = (28, 28, 1)
    self.epochs = 15
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train.astype('float32') / 255
    x_train = np.expand_dims(x_train, -1)
    y_train = tf.keras.utils.to_categorical(y_train, self.num_classes)
    self.num_examples = x_train.shape[0]
    # Use `tf.data.Dataset` for the custom training loop.
    self.train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

  def _build_model(self):
    """Model from https://keras.io/examples/vision/mnist_convnet/."""
    model = tf.keras.Sequential([
        tf.keras.Input(shape=self.input_shape),
        tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(self.num_classes, activation='softmax'),
    ])

    return model

  def compute_loss(self, targets, predictions, loss_fn, batch_size):
    """Compute the loss averaged over the global batch size.

    `loss_fn` must return per-example losses (i.e. be constructed with
    `Reduction.NONE`); see the loss-scaling note below.
    """
    per_example_loss = loss_fn(targets, predictions)
    return tf.nn.compute_average_loss(
        per_example_loss, global_batch_size=batch_size)
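
  # Loss-scaling note for `compute_loss` above: `tf.nn.compute_average_loss`
  # sums the per-example losses and divides by the *global* batch size, so
  # that summing the per-replica results (as `distributed_train_step` below
  # does) recovers the true batch mean. A minimal sketch, assuming a global
  # batch of 8 examples split evenly across 2 replicas:
  #
  #   per_example_loss = tf.ones([4])  # one replica's 4 per-example losses
  #   tf.nn.compute_average_loss(per_example_loss, global_batch_size=8)
  #   # -> 0.5 on each replica; SUM-reduced over 2 replicas -> 1.0, the mean.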

  @tf.function(experimental_relax_shapes=True)
  def train_step(self, inputs, model, loss_fn, optimizer, batch_size):
    """Compute the loss and optimize the model with the optimizer.

    Args:
      inputs: A (features, targets) batch yielded by the `tf.data` dataset.
      model: See `model` in `train_function()` method.
      loss_fn: See `loss_fn` in `train_function()` method.
      optimizer: See `optimizer` in `train_function()` method.
      batch_size: See `batch_size` in `train_function()` method.

    Returns:
      Loss value.
    """
    train_x, train_y = inputs
    with tf.GradientTape() as tape:
      predictions = model(train_x, training=True)
      loss = self.compute_loss(train_y, predictions, loss_fn, batch_size)
    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss

  @tf.function(experimental_relax_shapes=True)
  def distributed_train_step(self, batch_dataset, model, loss_fn, optimizer,
                             batch_size, distribution_strategy):
    """Train step in a distribution strategy setting.

    Args:
      batch_dataset: A per-replica batch from the distributed `tf.data`
        dataset.
      model: See `model` in `train_function()` method.
      loss_fn: See `loss_fn` in `train_function()` method.
      optimizer: See `optimizer` in `train_function()` method.
      batch_size: See `batch_size` in `train_function()` method.
      distribution_strategy: See `distribution_strategy` in
        `train_function()` method.

    Returns:
      Sum of per_replica_losses.
    """
    per_replica_losses = distribution_strategy.run(
        self.train_step,
        args=(
            batch_dataset,
            model,
            loss_fn,
            optimizer,
            batch_size,
        ))
    return distribution_strategy.reduce(
        tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

  def train_function(self,
                     model,
                     train_dataset,
                     loss_fn,
                     optimizer,
                     epochs=2,
                     distribution_strategy=None,
                     batch_size=256):
    """Train model in custom training loop and return average step time.

    Args:
      model: Model to be benchmarked.
      train_dataset: `tf.data` dataset. Should return a tuple of either
        (inputs, targets) or (inputs, targets, sample_weights).
      loss_fn: `tf.keras.losses.Loss` instance.
      optimizer: `tf.keras.optimizers` instance.
      epochs: Integer. Number of epochs to train the model. If unspecified,
        `epochs` will default to 2.
      distribution_strategy: A `tf.distribute.Strategy` instance, e.g. one
        created from 'mirrored', 'one_device' or 'multi_worker_mirrored' via
        `distribution_util`. Note that `TPU` and `parameter_server` are not
        supported yet. If None (the default), no distribution strategy is
        used.
      batch_size: Integer. Number of samples per gradient update. If
        unspecified, `batch_size` will default to 256.

    Returns:
      Average train_step_time.
    """
    train_step_time_list = []
    timer = timeit.default_timer

    # Loss is accumulated for bookkeeping; only step times are reported.
    total_loss = 0.0
    num_batches = 0
    for _ in range(epochs):
      # Iterate over the batches of the dataset.
      for batch_dataset in train_dataset:

        start_time = timer()

        if distribution_strategy is not None:
          total_loss += self.distributed_train_step(batch_dataset, model,
                                                    loss_fn, optimizer,
                                                    batch_size,
                                                    distribution_strategy)
        else:
          total_loss += self.train_step(batch_dataset, model, loss_fn,
                                        optimizer, batch_size)
        num_batches += 1

        end_time = timer()
        train_step_time_list.append(end_time - start_time)

    return np.mean(train_step_time_list)
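
  # Timing note: the first pass through a `tf.function` includes graph
  # tracing, so `measure_performance` below first runs `train_function` for
  # a single warmup epoch and reports its duration as `warmup_time`. This
  # keeps the one-off tracing cost out of `train_step_time`,
  # `avg_epoch_time` and `exp_per_sec`, which are computed from the timed
  # run that follows.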

  def measure_performance(self,
                          model,
                          dataset,
                          loss_fn,
                          optimizer,
                          batch_size=32,
                          run_iters=4,
                          epochs=10,
                          distribution_strategy=None):
    """Run the model and measure the performance.

    Args:
      model: `tf.keras.Model` instance to be benchmarked.
      dataset: `tf.data` dataset. Should return a tuple of either (inputs,
        targets) or (inputs, targets, sample_weights).
      loss_fn: `tf.keras.losses.Loss` instance.
      optimizer: `tf.keras.optimizers` instance.
      batch_size: Integer. Number of samples per gradient update. If
        unspecified, `batch_size` will default to 32.
      run_iters: Integer. Number of iterations to run the performance
        measurement. If unspecified, `run_iters` will default to 4.
      epochs: Integer. Number of epochs to train the model. If unspecified,
        `epochs` will default to 10.
      distribution_strategy: A `tf.distribute.Strategy` instance, e.g. one
        created from 'mirrored', 'one_device' or 'multi_worker_mirrored' via
        `distribution_util`. Note that `TPU` and `parameter_server` are not
        supported yet. If None (the default), no distribution strategy is
        used.

    Returns:
      Performance summary, which contains avg_epoch_time, wall_time,
      exp_per_sec, epochs, warmup_time and train_step_time.

    Raises:
      ValueError: if `dataset` is None or if `optimizer` instance is
        not provided or if `loss_fn` instance is not provided.
    """
    if (distribution_strategy is not None and
        not isinstance(dataset, tf.distribute.DistributedDataset)):
      raise ValueError('`tf.distribute.DistributedDataset` is required '
                       'when a distribution strategy is used.')

    if (distribution_strategy is None and
        not isinstance(dataset, tf.data.Dataset)):
      raise ValueError('`tf.data.Dataset` is required.')

    if not isinstance(loss_fn, tf.keras.losses.Loss):
      raise ValueError('`tf.keras.losses.Loss` instance '
                       'for loss_fn is required.')

    if not isinstance(optimizer, tf.keras.optimizers.Optimizer):
      raise ValueError('`tf.keras.optimizers` instance '
                       'for optimizer is required.')

    avg_epoch_time_list, train_step_time_list = [], []
    wall_time_list, exp_per_sec_list, warmup_time_list = [], [], []

    total_num_examples = epochs * self.num_examples

    for _ in range(run_iters):
      timer = timeit.default_timer
      start_time = timer()
      t1 = timer()
      # Warmup: a single epoch to trigger `tf.function` tracing.
      self.train_function(model, dataset, loss_fn, optimizer, 1,
                          distribution_strategy, batch_size)
      warmup_time = timer() - t1

      t2 = timer()
      train_step_time = self.train_function(model, dataset, loss_fn, optimizer,
                                            epochs, distribution_strategy,
                                            batch_size)
      end_time = timer()

      train_step_time_list.append(train_step_time)
      warmup_time_list.append(warmup_time)
      wall_time_list.append(end_time - start_time)
      exp_per_sec_list.append(total_num_examples / (end_time - t2))
      avg_epoch_time_list.append((end_time - t2) / epochs)

    metrics = []
    metrics.append({
        'name': 'avg_epoch_time',
        'value': np.mean(avg_epoch_time_list)
    })
    metrics.append({'name': 'exp_per_sec', 'value': np.mean(exp_per_sec_list)})
    metrics.append({'name': 'warmup_time', 'value': np.mean(warmup_time_list)})
    metrics.append({
        'name': 'train_step_time',
        'value': np.mean(train_step_time_list)
    })
    metrics.append({'name': 'epochs', 'value': epochs})

    wall_time = np.mean(wall_time_list)

    return metrics, wall_time
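
  # The benchmark methods below construct the loss with
  # `tf.keras.losses.Reduction.NONE` so that `loss_fn` returns per-example
  # losses; `compute_loss` then performs the global-batch averaging itself.
  # A built-in reduction would reduce twice and mis-scale the loss.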

  def benchmark_custom_training_mnist_bs_128(self):
    """Measure performance with batch_size=128 and run_iters=5."""
    batch_size = 128
    run_iters = 5
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    # Instantiate a loss function.
    loss_fn = tf.keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # Instantiate an optimizer to train the model.
    optimizer = tf.keras.optimizers.Adam()
    model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)

  def benchmark_custom_training_mnist_bs_256(self):
    """Measure performance with batch_size=256 and run_iters=5."""
    batch_size = 256
    run_iters = 5
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    # Instantiate a loss function.
    loss_fn = tf.keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # Instantiate an optimizer to train the model.
    optimizer = tf.keras.optimizers.Adam()
    model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)

  def benchmark_custom_training_mnist_bs_512(self):
    """Measure performance with batch_size=512 and run_iters=5."""
    batch_size = 512
    run_iters = 5
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    # Instantiate a loss function.
    loss_fn = tf.keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # Instantiate an optimizer to train the model.
    optimizer = tf.keras.optimizers.Adam()
    model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)

  def benchmark_custom_training_mnist_bs_512_gpu_2(self):
    """Measure performance with batch_size=512, run_iters=10, gpu=2 and
    distribution_strategy='mirrored'.
    """
    batch_size = 512
    run_iters = 10
    train_dataset = self.train_dataset.shuffle(
        buffer_size=1024).batch(batch_size)

    distribution_strategy = 'mirrored'

    strategy = distribution_util.get_distribution_strategy(
        distribution_strategy=distribution_strategy, num_gpus=2)

    if distribution_strategy != 'off':
      train_dataset = strategy.experimental_distribute_dataset(train_dataset)

    strategy_scope = distribution_util.get_strategy_scope(strategy)

    with strategy_scope:
      # Instantiate a loss function.
      loss_fn = tf.keras.losses.CategoricalCrossentropy(
          reduction=tf.keras.losses.Reduction.NONE)
      # Instantiate an optimizer to train the model.
      optimizer = tf.keras.optimizers.Adam()
      model = self._build_model()

    metrics, wall_time = self.measure_performance(model, train_dataset, loss_fn,
                                                  optimizer, batch_size,
                                                  run_iters, self.epochs,
                                                  strategy)
    extras = benchmark_util.get_keras_examples_metadata('conv', batch_size,
                                                        '.keras.ctl_graph')
    self.report_benchmark(
        iters=run_iters, wall_time=wall_time, metrics=metrics, extras=extras)


if __name__ == '__main__':
  tf.test.main()
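
# A minimal sketch of invoking a single benchmark directly in an interactive
# session (an assumption; these methods are normally discovered and run by
# the benchmark harness through `tf.test.main()` above):
#
#   bench = CustomMnistBenchmark()
#   bench.benchmark_custom_training_mnist_bs_128()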