# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests add_loss API correctness."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.keras import Input
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers
from tensorflow.python.keras import losses
from tensorflow.python.keras import Model
from tensorflow.python.keras import optimizer_v2
from tensorflow.python.keras import Sequential
from tensorflow.python.keras import testing_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training.rmsprop import RMSPropOptimizer

MAE = losses.MeanAbsoluteError
mae = losses.mean_absolute_error


def get_ctl_train_step(model):
  optimizer = optimizer_v2.gradient_descent.SGD(0.05)

  def train_step(x, y, w=None):
    with backprop.GradientTape() as tape:
      if w is not None:
        model([x, y, w])
      else:
        model([x, y])
      loss = math_ops.reduce_sum(model.losses)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    return loss

  return train_step


# TODO(psv): Add test cases where a model is used in loss function but is
# not part of the training model.
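
# Note on the expected values asserted below: testing_utils.Bias() is a small
# layer that adds a single trainable, zero-initialized bias to its input, so
# with the x/y data defined in setUp() the initial mean absolute error is 1.0.
# Tests that register two such loss terms therefore start from a total loss of
# 2.0, and plain SGD moves the bias by a fixed amount per epoch, which yields
# linearly decreasing sequences such as [2., 1.8, 1.6, 1.4, 1.2] at learning
# rate 0.05.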


class TestAddLossCorrectness(keras_parameterized.TestCase):

  def setUp(self):
    super(TestAddLossCorrectness, self).setUp()
    self.x = np.array([[0.], [1.], [2.]], dtype='float32')
    self.y = np.array([[0.5], [2.], [3.5]], dtype='float32')
    self.w = np.array([[1.25], [0.5], [1.25]], dtype='float32')

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(MAE()(targets, outputs))
    model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_fit(self):
    model = testing_utils.get_model_from_layers([testing_utils.Bias()],
                                                input_shape=(1,))

    def callable_loss():
      return math_ops.reduce_sum(model.weights)

    model.add_loss(callable_loss)
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.1),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit(self.x, batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [0., -.1, -.2, -.3, -.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)
      model.add_loss(MAE()(targets, outputs))
      model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)

      def callable_loss():
        return math_ops.reduce_sum(model.weights)

      model.add_loss(callable_loss)
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets, sw], outputs)
    model.add_loss(MAE()(targets, outputs, sw))
    model.add_loss(3 * math_ops.reduce_mean(sw * mae(targets, outputs)))
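    # With the bias at zero, the weighted MAE term above evaluates to 1.0 and
    # the 3x-scaled term to 3.0, so the first epoch's loss should be 4.0.
    # SGD(0.025) on the resulting gradient of -4 then moves the bias by 0.1
    # per epoch, reducing the total loss by 0.4 each epoch.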
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.025),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [4., 3.6, 3.2, 2.8, 2.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_with_sample_weight_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      sw = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets, sw], outputs)
      model.add_loss(MAE()(targets, outputs, sw))
      model.add_loss(math_ops.reduce_mean(sw * mae(targets, outputs)))
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_model_call(self):

    class MyModel(Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        outputs = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], outputs, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], outputs)))
        return outputs

    model = MyModel()
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertEqual(len(model.losses), 2)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    eval_out = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(eval_out, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_layer_call(self):

    class MyLayer(layers.Layer):

      def __init__(self):
        super(MyLayer, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        out = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], out, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], out)))
        return out

    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))

    outputs = MyLayer()([inputs, targets, sw])
    model = Model([inputs, targets, sw], outputs)
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    output = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

    output = model.test_on_batch([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_layer(self):

    class MyLayer(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs
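
    # MyLayer registers reduce_sum(inputs) as an input-conditional loss. With
    # an all-ones (2, 3) batch and identical inputs and targets, the compiled
    # MSE is zero, so train_on_batch should report only the added loss,
    # 2 * 3 = 6.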
    inputs = Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = Model(inputs, outputs)
    self.assertEqual(len(model.losses), 1)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(loss, 2 * 3)

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer(self):
    loss = {}
    for reg in [None, 'l2']:
      model_layers = [
          layers.Dense(
              10,
              activation='relu',
              activity_regularizer=reg,
              kernel_initializer='ones',
              use_bias=False),
          layers.Dense(
              1,
              activation='sigmoid',
              kernel_initializer='ones',
              use_bias=False),
      ]

      model = testing_utils.get_model_from_layers(
          model_layers, input_shape=(10,))

      x = np.ones((10, 10), 'float32')
      y = np.zeros((10, 1), 'float32')

      optimizer = RMSPropOptimizer(learning_rate=0.001)
      model.compile(
          optimizer,
          'binary_crossentropy',
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, batch_size=2, epochs=5)
      loss[reg] = model.evaluate(x, y)
    self.assertLess(loss[None], loss['l2'])

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer_loss_value(self):
    layer = layers.Dense(
        1,
        kernel_initializer='zeros',
        bias_initializer='ones',
        activity_regularizer='l2')

    model = testing_utils.get_model_from_layers([layer], input_shape=(10,))

    x = np.ones((10, 10), 'float32')
    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())
    loss = model.test_on_batch(x)
    self.assertAlmostEqual(0.01, loss, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_activity_regularizer_batch_independent(self):
    inputs = layers.Input(shape=(10,))
    x = layers.Dense(10, activation='relu', activity_regularizer='l2')(inputs)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())

    loss_small_batch = model.test_on_batch(np.ones((10, 10), 'float32'))
    loss_big_batch = model.test_on_batch(np.ones((20, 10), 'float32'))
    self.assertAlmostEqual(loss_small_batch, loss_big_batch, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    shared_layer = LayerWithLoss()

    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_nested_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    class LayerWithNestedLayerWithLoss(layers.Layer):

      def __init__(self):
        super(LayerWithNestedLayerWithLoss, self).__init__()
        self.loss_layer = LayerWithLoss()

      def call(self, inputs):
        return self.loss_layer(inputs)

    shared_layer = LayerWithNestedLayerWithLoss()
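
    # The loss-bearing layer is shared between m and m2, so calling m2 invokes
    # it twice: once on the raw input (loss 1 + 2 + 3 = 6) and once, through
    # the nested model m, on the doubled output (loss 2 + 4 + 6 = 12). Both
    # loss tensors should therefore be tracked on m2.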
    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_clear_losses(self):

    class LayerWithSharedNestedLossLayer(layers.Layer):

      def __init__(self):
        super(LayerWithSharedNestedLossLayer, self).__init__()
        self.loss_layer = layers.ActivityRegularization(l2=0.001)
        self.add_weight(shape=(1,), regularizer='l2')

      def call(self, x):
        x = self.loss_layer(x)
        return self.loss_layer(x)

    inputs = Input(shape=(1,))
    l = LayerWithSharedNestedLossLayer()  # Weight loss + 2 activity losses.

    x1 = array_ops.ones((1, 1))
    _ = l(x1)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x2 = array_ops.ones((1, 1))
    _ = l(x2)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    outputs = l(inputs)
    model = Model(inputs, outputs)
    if not context.executing_eagerly():
      self.assertEqual(len(model.losses), 7)
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x3 = array_ops.ones((1, 1))
    model(x3)
    x4 = array_ops.ones((1, 1))
    model(x4)
    if context.executing_eagerly():
      # Eager losses are cleared every `__call__`.
      self.assertEqual(len(model.losses), 3)
    else:
      self.assertEqual(len(model.losses), 11)
      self.assertEqual(len(model.get_losses_for(x3)), 2)
      self.assertEqual(len(model.get_losses_for(x4)), 2)
      self.assertEqual(len(model.get_losses_for(None)), 1)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_constant_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(1.)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_variable_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(model.weights[0])

  @keras_parameterized.run_all_keras_modes
  def test_add_entropy_loss_on_functional_model(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(losses.binary_crossentropy(targets, outputs))
    model.compile('sgd', run_eagerly=testing_utils.should_run_eagerly())
    with test.mock.patch.object(logging, 'warning') as mock_log:
      model.fit([self.x, self.y], batch_size=3, epochs=5)
      self.assertNotIn('Gradients do not exist for variables',
                       str(mock_log.call_args))


if __name__ == '__main__':
  test.main()