# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""Tests add_loss API correctness."""
16
17import numpy as np
18
19from tensorflow.python.eager import backprop
20from tensorflow.python.eager import context
21from tensorflow.python.eager import def_function
22from tensorflow.python.keras import Input
23from tensorflow.python.keras import keras_parameterized
24from tensorflow.python.keras import layers
25from tensorflow.python.keras import losses
26from tensorflow.python.keras import Model
27from tensorflow.python.keras import optimizer_v2
28from tensorflow.python.keras import Sequential
29from tensorflow.python.keras import testing_utils
30from tensorflow.python.ops import array_ops
31from tensorflow.python.ops import math_ops
32from tensorflow.python.platform import test
33from tensorflow.python.platform import tf_logging as logging
34from tensorflow.python.training.rmsprop import RMSPropOptimizer
35
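# Aliases for the class-based and functional forms of mean absolute error.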
MAE = losses.MeanAbsoluteError
mae = losses.mean_absolute_error


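# Builds a custom-training-loop (CTL) step: run the model so it populates
# `model.losses`, sum those losses, and apply one SGD(0.05) update to the
# trainable weights.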
def get_ctl_train_step(model):
  optimizer = optimizer_v2.gradient_descent.SGD(0.05)

  def train_step(x, y, w=None):
    with backprop.GradientTape() as tape:
      if w is not None:
        model([x, y, w])
      else:
        model([x, y])
      loss = math_ops.reduce_sum(model.losses)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    return loss

  return train_step


# TODO(psv): Add test cases where a model is used in the loss function but is
# not part of the training model.


class TestAddLossCorrectness(keras_parameterized.TestCase):

  def setUp(self):
    super(TestAddLossCorrectness, self).setUp()
    self.x = np.array([[0.], [1.], [2.]], dtype='float32')
    self.y = np.array([[0.5], [2.], [3.5]], dtype='float32')
    self.w = np.array([[1.25], [0.5], [1.25]], dtype='float32')

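  # With the Bias layer initialized to zero, y - (x + bias) is
  # [[0.5], [1.], [1.5]], so the unweighted MAE starts at 1.0; the weights in
  # `self.w` average to 1.0, so the weighted MAE starts at 1.0 as well.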
  @keras_parameterized.run_all_keras_modes
  def test_loss_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(MAE()(targets, outputs))
    model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

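    # Each MAE term has gradient -1 w.r.t. the bias, so SGD(0.05) raises the
    # bias by 0.1 per epoch and the summed loss 2 * (1 - bias) drops by 0.2.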
    history = model.fit([self.x, self.y], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_fit(self):
    model = testing_utils.get_model_from_layers([testing_utils.Bias()],
                                                input_shape=(1,))

    def callable_loss():
      return math_ops.reduce_sum(model.weights)

    model.add_loss(callable_loss)
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.1),
        run_eagerly=testing_utils.should_run_eagerly())

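    # The callable returns the bias weight itself (gradient 1), so SGD(0.1)
    # lowers the bias by 0.1 per epoch; the recorded loss goes negative.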
    history = model.fit(self.x, batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [0., -.1, -.2, -.3, -.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)
      model.add_loss(MAE()(targets, outputs))
      model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
      return get_ctl_train_step(model)

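    # Run the step eagerly first, then wrapped in a `tf.function`; both modes
    # should produce the same loss trajectory.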
    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)

      def callable_loss():
        return math_ops.reduce_sum(model.weights)

      model.add_loss(callable_loss)
      return get_ctl_train_step(model)

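    # Same callable loss as in the fit test above, but the CTL helper uses
    # SGD(0.05), so the bias falls by 0.05 per step.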
    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets, sw], outputs)
    model.add_loss(MAE()(targets, outputs, sw))
    model.add_loss(3 * math_ops.reduce_mean(sw * mae(targets, outputs)))
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.025),
        run_eagerly=testing_utils.should_run_eagerly())

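    # The two weighted terms sum to 4 * mean(sw * |y - x - bias|), i.e.
    # 4 * (1 - bias), so SGD(0.025) raises the bias by 0.1 per epoch and the
    # loss drops by 0.4.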
    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [4., 3.6, 3.2, 2.8, 2.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_with_sample_weight_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      sw = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets, sw], outputs)
      model.add_loss(MAE()(targets, outputs, sw))
      model.add_loss(math_ops.reduce_mean(sw * mae(targets, outputs)))
      return get_ctl_train_step(model)

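    # Here the two weighted terms sum to 2 * (1 - bias), so the loss trajectory
    # matches the unweighted CTL test above.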
    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_model_call(self):

    class MyModel(Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        outputs = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], outputs, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], outputs)))
        return outputs

    model = MyModel()
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertEqual(len(model.losses), 2)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

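    # After five epochs the bias has moved from 0.0 to 0.5, leaving a summed
    # loss of 2 * (1 - 0.5) = 1.0.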
    eval_out = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(eval_out, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_layer_call(self):

    class MyLayer(layers.Layer):

      def __init__(self):
        super(MyLayer, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        out = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], out, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], out)))
        return out

    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))

    outputs = MyLayer()([inputs, targets, sw])
    model = Model([inputs, targets, sw], outputs)
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

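    # evaluate() and test_on_batch() should both report the remaining summed
    # loss of 1.0 at the trained bias of 0.5.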
    output = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

    output = model.test_on_batch([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_layer(self):

    class MyLayer(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

    inputs = Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = Model(inputs, outputs)
    self.assertEqual(len(model.losses), 1)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
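    # The MSE term is zero (the identity layer's outputs equal the targets),
    # so the reported loss is just the add_loss sum over np.ones((2, 3)): 6.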
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(loss, 2 * 3)

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer(self):
    loss = {}
    for reg in [None, 'l2']:
      model_layers = [
          layers.Dense(
              10,
              activation='relu',
              activity_regularizer=reg,
              kernel_initializer='ones',
              use_bias=False),
          layers.Dense(
              1,
              activation='sigmoid',
              kernel_initializer='ones',
              use_bias=False),
      ]

      model = testing_utils.get_model_from_layers(
          model_layers, input_shape=(10,))

      x = np.ones((10, 10), 'float32')
      y = np.zeros((10, 1), 'float32')

      optimizer = RMSPropOptimizer(learning_rate=0.001)
      model.compile(
          optimizer,
          'binary_crossentropy',
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, batch_size=2, epochs=5)
      loss[reg] = model.evaluate(x, y)
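    # The regularized model pays an extra penalty on its activations, so its
    # final evaluation loss must be strictly larger.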
    self.assertLess(loss[None], loss['l2'])

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer_loss_value(self):
    layer = layers.Dense(
        1,
        kernel_initializer='zeros',
        bias_initializer='ones',
        activity_regularizer='l2')

    model = testing_utils.get_model_from_layers([layer], input_shape=(10,))

    x = np.ones((10, 10), 'float32')
    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())
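    # With a zero kernel and unit bias every output is 1.0; the l2(0.01)
    # activity penalty is scaled by batch size: 0.01 * 10 / 10 = 0.01.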
    loss = model.test_on_batch(x)
    self.assertAlmostEqual(0.01, loss, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_activity_regularizer_batch_independent(self):
    inputs = layers.Input(shape=(10,))
    x = layers.Dense(10, activation='relu', activity_regularizer='l2')(inputs)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())

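    # Activity-regularizer losses are scaled by batch size, so doubling the
    # batch of identical rows must not change the reported loss.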
    loss_small_batch = model.test_on_batch(np.ones((10, 10), 'float32'))
    loss_big_batch = model.test_on_batch(np.ones((20, 10), 'float32'))
    self.assertAlmostEqual(loss_small_batch, loss_big_batch, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    shared_layer = LayerWithLoss()

    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
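    # The shared layer runs twice in m2: once on the raw inputs
    # (sum([1, 2, 3]) == 6) and once, inside m, on the doubled inputs
    # (sum([2, 4, 6]) == 12).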
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_nested_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    class LayerWithNestedLayerWithLoss(layers.Layer):

      def __init__(self):
        super(LayerWithNestedLayerWithLoss, self).__init__()
        self.loss_layer = LayerWithLoss()

      def call(self, inputs):
        return self.loss_layer(inputs)

    shared_layer = LayerWithNestedLayerWithLoss()

    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
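    # Sharing through a wrapper layer should yield the same two losses as
    # sharing the loss layer directly.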
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_clear_losses(self):

    class LayerWithSharedNestedLossLayer(layers.Layer):

      def __init__(self):
        super(LayerWithSharedNestedLossLayer, self).__init__()
        self.loss_layer = layers.ActivityRegularization(l2=0.001)
        self.add_weight(shape=(1,), regularizer='l2')

      def call(self, x):
        x = self.loss_layer(x)
        return self.loss_layer(x)

    inputs = Input(shape=(1,))
    l = LayerWithSharedNestedLossLayer()  # Weight loss + 2 activity losses.

    x1 = array_ops.ones((1, 1))
    _ = l(x1)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x2 = array_ops.ones((1, 1))
    _ = l(x2)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    outputs = l(inputs)
    model = Model(inputs, outputs)
    if not context.executing_eagerly():
      self.assertEqual(len(model.losses), 7)
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

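    # Graph mode accumulates: each call adds two activity losses on top of the
    # one unconditional weight regularizer, hence 7 above and 11 below.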
    x3 = array_ops.ones((1, 1))
    model(x3)
    x4 = array_ops.ones((1, 1))
    model(x4)
    if context.executing_eagerly():
      # Eager losses are cleared every `__call__`.
      self.assertEqual(len(model.losses), 3)
    else:
      self.assertEqual(len(model.losses), 11)
      self.assertEqual(len(model.get_losses_for(x3)), 2)
      self.assertEqual(len(model.get_losses_for(x4)), 2)
      self.assertEqual(len(model.get_losses_for(None)), 1)

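  # A plain constant or variable carries no dependency on the model's inputs,
  # so add_loss only accepts symbolic tensors or zero-argument callables.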
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_constant_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(1.)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_variable_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(model.weights[0])

  @keras_parameterized.run_all_keras_modes
  def test_add_entropy_loss_on_functional_model(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(losses.binary_crossentropy(targets, outputs))
    model.compile('sgd', run_eagerly=testing_utils.should_run_eagerly())
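    # The crossentropy added via add_loss depends on the trainable bias, so
    # fit() should not warn that gradients are missing for any variable.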
    with test.mock.patch.object(logging, 'warning') as mock_log:
      model.fit([self.x, self.y], batch_size=3, epochs=5)
      self.assertNotIn('Gradients do not exist for variables',
                       str(mock_log.call_args))


if __name__ == '__main__':
  test.main()