# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests add_loss API correctness."""

import numpy as np

from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.keras import Input
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers
from tensorflow.python.keras import losses
from tensorflow.python.keras import Model
from tensorflow.python.keras import optimizer_v2
from tensorflow.python.keras import Sequential
from tensorflow.python.keras import testing_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training.rmsprop import RMSPropOptimizer

MAE = losses.MeanAbsoluteError
mae = losses.mean_absolute_error


def get_ctl_train_step(model):
  """Returns a train step that minimizes the sum of `model.losses` via SGD."""
  optimizer = optimizer_v2.gradient_descent.SGD(0.05)

  def train_step(x, y, w=None):
    with backprop.GradientTape() as tape:
      if w is not None:
        model([x, y, w])
      else:
        model([x, y])
      loss = math_ops.reduce_sum(model.losses)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    return loss

  return train_step


# TODO(psv): Add test cases where a model is used in loss function but is
# not part of the training model.
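
# One possible shape for such a test, sketched here as a hypothetical helper
# (the `_external_model_loss_sketch` name and the frozen auxiliary Dense model
# are assumptions, not an existing test in this suite): an auxiliary model
# participates in the added loss but contributes no trainable weights to the
# model being trained.
def _external_model_loss_sketch():
  aux_model = Sequential([layers.Dense(1, kernel_initializer='ones')])
  aux_model.trainable = False  # Used only inside the loss term, never trained.

  inputs = Input(shape=(1,))
  targets = Input(shape=(1,))
  outputs = testing_utils.Bias()(inputs)
  model = Model([inputs, targets], outputs)
  # The added loss flows through `aux_model`, but a train step built with
  # `get_ctl_train_step(model)` should only update `model.trainable_weights`
  # (the Bias layer), since `aux_model` is frozen.
  model.add_loss(math_ops.reduce_mean(mae(targets, aux_model(outputs))))
  return model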


class TestAddLossCorrectness(keras_parameterized.TestCase):

  def setUp(self):
    super(TestAddLossCorrectness, self).setUp()
    self.x = np.array([[0.], [1.], [2.]], dtype='float32')
    self.y = np.array([[0.5], [2.], [3.5]], dtype='float32')
    self.w = np.array([[1.25], [0.5], [1.25]], dtype='float32')

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(MAE()(targets, outputs))
    model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_fit(self):
    model = testing_utils.get_model_from_layers([testing_utils.Bias()],
                                                input_shape=(1,))

    def callable_loss():
      return math_ops.reduce_sum(model.weights)

    model.add_loss(callable_loss)
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.1),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit(self.x, batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [0., -.1, -.2, -.3, -.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)
      model.add_loss(MAE()(targets, outputs))
      model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)

      def callable_loss():
        return math_ops.reduce_sum(model.weights)

      model.add_loss(callable_loss)
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets, sw], outputs)
    model.add_loss(MAE()(targets, outputs, sw))
    model.add_loss(3 * math_ops.reduce_mean(sw * mae(targets, outputs)))
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.025),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [4., 3.6, 3.2, 2.8, 2.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_with_sample_weight_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      sw = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets, sw], outputs)
      model.add_loss(MAE()(targets, outputs, sw))
      model.add_loss(math_ops.reduce_mean(sw * mae(targets, outputs)))
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_model_call(self):

    class MyModel(Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        outputs = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], outputs, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], outputs)))
        return outputs

    model = MyModel()
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertEqual(len(model.losses), 2)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    eval_out = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(eval_out, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_layer_call(self):

    class MyLayer(layers.Layer):

      def __init__(self):
        super(MyLayer, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        out = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], out, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], out)))
        return out

    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))

    outputs = MyLayer()([inputs, targets, sw])
    model = Model([inputs, targets, sw], outputs)
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    output = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

    output = model.test_on_batch([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_layer(self):

    class MyLayer(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

    inputs = Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = Model(inputs, outputs)
    self.assertEqual(len(model.losses), 1)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(loss, 2 * 3)

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer(self):
    loss = {}
    for reg in [None, 'l2']:
      model_layers = [
          layers.Dense(
              10,
              activation='relu',
              activity_regularizer=reg,
              kernel_initializer='ones',
              use_bias=False),
          layers.Dense(
              1,
              activation='sigmoid',
              kernel_initializer='ones',
              use_bias=False),
      ]

      model = testing_utils.get_model_from_layers(
          model_layers, input_shape=(10,))

      x = np.ones((10, 10), 'float32')
      y = np.zeros((10, 1), 'float32')

      optimizer = RMSPropOptimizer(learning_rate=0.001)
      model.compile(
          optimizer,
          'binary_crossentropy',
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, batch_size=2, epochs=5)
      loss[reg] = model.evaluate(x, y)
    self.assertLess(loss[None], loss['l2'])

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer_loss_value(self):
    layer = layers.Dense(
        1,
        kernel_initializer='zeros',
        bias_initializer='ones',
        activity_regularizer='l2')

    model = testing_utils.get_model_from_layers([layer], input_shape=(10,))

    x = np.ones((10, 10), 'float32')
    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())
    loss = model.test_on_batch(x)
    self.assertAlmostEqual(0.01, loss, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_activity_regularizer_batch_independent(self):
    inputs = layers.Input(shape=(10,))
    x = layers.Dense(10, activation='relu', activity_regularizer='l2')(inputs)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())

    loss_small_batch = model.test_on_batch(np.ones((10, 10), 'float32'))
    loss_big_batch = model.test_on_batch(np.ones((20, 10), 'float32'))
    self.assertAlmostEqual(loss_small_batch, loss_big_batch, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    shared_layer = LayerWithLoss()

    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_nested_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    class LayerWithNestedLayerWithLoss(layers.Layer):

      def __init__(self):
        super(LayerWithNestedLayerWithLoss, self).__init__()
        self.loss_layer = LayerWithLoss()

      def call(self, inputs):
        return self.loss_layer(inputs)

    shared_layer = LayerWithNestedLayerWithLoss()

    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_clear_losses(self):

    class LayerWithSharedNestedLossLayer(layers.Layer):

      def __init__(self):
        super(LayerWithSharedNestedLossLayer, self).__init__()
        self.loss_layer = layers.ActivityRegularization(l2=0.001)
        self.add_weight(shape=(1,), regularizer='l2')

      def call(self, x):
        x = self.loss_layer(x)
        return self.loss_layer(x)

    inputs = Input(shape=(1,))
    l = LayerWithSharedNestedLossLayer()  # Weight loss + 2 activity losses.

    x1 = array_ops.ones((1, 1))
    _ = l(x1)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x2 = array_ops.ones((1, 1))
    _ = l(x2)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    outputs = l(inputs)
    model = Model(inputs, outputs)
    if not context.executing_eagerly():
      self.assertEqual(len(model.losses), 7)
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x3 = array_ops.ones((1, 1))
    model(x3)
    x4 = array_ops.ones((1, 1))
    model(x4)
    if context.executing_eagerly():
      # Eager losses are cleared every `__call__`.
      self.assertEqual(len(model.losses), 3)
    else:
      self.assertEqual(len(model.losses), 11)
      self.assertEqual(len(model.get_losses_for(x3)), 2)
      self.assertEqual(len(model.get_losses_for(x4)), 2)
      self.assertEqual(len(model.get_losses_for(None)), 1)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_constant_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(1.)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_variable_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(model.weights[0])

  @keras_parameterized.run_all_keras_modes
  def test_add_entropy_loss_on_functional_model(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(losses.binary_crossentropy(targets, outputs))
    model.compile('sgd', run_eagerly=testing_utils.should_run_eagerly())
    with test.mock.patch.object(logging, 'warning') as mock_log:
      model.fit([self.x, self.y], batch_size=3, epochs=5)
      self.assertNotIn('Gradients do not exist for variables',
                       str(mock_log.call_args))


if __name__ == '__main__':
  test.main()