# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""Tests add_loss API correctness."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import numpy as np
22
23from tensorflow.python.eager import backprop
24from tensorflow.python.eager import context
25from tensorflow.python.eager import def_function
26from tensorflow.python.keras import Input
27from tensorflow.python.keras import keras_parameterized
28from tensorflow.python.keras import layers
29from tensorflow.python.keras import losses
30from tensorflow.python.keras import Model
31from tensorflow.python.keras import optimizer_v2
32from tensorflow.python.keras import Sequential
33from tensorflow.python.keras import testing_utils
34from tensorflow.python.ops import array_ops
35from tensorflow.python.ops import math_ops
36from tensorflow.python.platform import test
37from tensorflow.python.platform import tf_logging as logging
38from tensorflow.python.training.rmsprop import RMSPropOptimizer
39
MAE = losses.MeanAbsoluteError
mae = losses.mean_absolute_error


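# Builds a custom-training-loop (CTL) train step that minimizes the sum of the
# model's `add_loss` terms with plain SGD.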
def get_ctl_train_step(model):
  optimizer = optimizer_v2.gradient_descent.SGD(0.05)

  def train_step(x, y, w=None):
    with backprop.GradientTape() as tape:
      if w is not None:
        model([x, y, w])
      else:
        model([x, y])
      loss = math_ops.reduce_sum(model.losses)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    return loss

  return train_step


# TODO(psv): Add test cases where a model is used in the loss function but is
# not part of the training model.


class TestAddLossCorrectness(keras_parameterized.TestCase):

  def setUp(self):
    super(TestAddLossCorrectness, self).setUp()
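    # The Bias layer used below learns y = x + b. With b initialized to zero,
    # the mean absolute error over this data is 1.0, so each MAE-style loss
    # term starts at 1.0 and the tests expect the loss to decay linearly.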
    self.x = np.array([[0.], [1.], [2.]], dtype='float32')
    self.y = np.array([[0.5], [2.], [3.5]], dtype='float32')
    self.w = np.array([[1.25], [0.5], [1.25]], dtype='float32')

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(MAE()(targets, outputs))
    model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_fit(self):
    model = testing_utils.get_model_from_layers([testing_utils.Bias()],
                                                input_shape=(1,))

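    # The loss here is a zero-argument callable that depends on the model's
    # weights; it is re-evaluated each time the losses are collected rather
    # than being a fixed tensor.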
    def callable_loss():
      return math_ops.reduce_sum(model.weights)

    model.add_loss(callable_loss)
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.1),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit(self.x, batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [0., -.1, -.2, -.3, -.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)
      model.add_loss(MAE()(targets, outputs))
      model.add_loss(math_ops.reduce_mean(mae(targets, outputs)))
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_callable_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets], outputs)

      def callable_loss():
        return math_ops.reduce_sum(model.weights)

      model.add_loss(callable_loss)
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y) for _ in range(5)]
    self.assertAllClose(loss, [0., -0.05, -0.1, -0.15, -0.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_on_model_fit(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets, sw], outputs)
    model.add_loss(MAE()(targets, outputs, sw))
    model.add_loss(3 * math_ops.reduce_mean(sw * mae(targets, outputs)))
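    # With these sample weights the weighted MAE term starts at 1.0 and the
    # scaled term at 3.0, so the initial loss is 4.0.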
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.025),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [4., 3.6, 3.2, 2.8, 2.4], 1e-3)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_loss_with_sample_weight_on_model_ctl(self):
    def get_model_and_train_step():
      inputs = Input(shape=(1,))
      targets = Input(shape=(1,))
      sw = Input(shape=(1,))
      outputs = testing_utils.Bias()(inputs)
      model = Model([inputs, targets, sw], outputs)
      model.add_loss(MAE()(targets, outputs, sw))
      model.add_loss(math_ops.reduce_mean(sw * mae(targets, outputs)))
      return get_ctl_train_step(model)

    train_step = get_model_and_train_step()
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    train_step = def_function.function(get_model_and_train_step())
    loss = [train_step(self.x, self.y, self.w) for _ in range(5)]
    self.assertAllClose(loss, [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_model_call(self):

    class MyModel(Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        outputs = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], outputs, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], outputs)))
        return outputs

    model = MyModel()
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertEqual(len(model.losses), 2)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    eval_out = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(eval_out, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_with_sample_weight_in_layer_call(self):

    class MyLayer(layers.Layer):

      def __init__(self):
        super(MyLayer, self).__init__()
        self.bias = testing_utils.Bias()

      def call(self, inputs):
        out = self.bias(inputs[0])
        self.add_loss(MAE()(inputs[1], out, inputs[2]))
        self.add_loss(math_ops.reduce_mean(inputs[2] * mae(inputs[1], out)))
        return out

    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    sw = Input(shape=(1,))

    outputs = MyLayer()([inputs, targets, sw])
    model = Model([inputs, targets, sw], outputs)
    model.predict([self.x, self.y, self.w])
    model.compile(
        optimizer_v2.gradient_descent.SGD(0.05),
        run_eagerly=testing_utils.should_run_eagerly())

    history = model.fit([self.x, self.y, self.w], batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [2., 1.8, 1.6, 1.4, 1.2], 1e-3)

    output = model.evaluate([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

    output = model.test_on_batch([self.x, self.y, self.w])
    self.assertAlmostEqual(output, 1.0, 3)

  @keras_parameterized.run_all_keras_modes
  def test_loss_on_layer(self):

    class MyLayer(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

    inputs = Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = Model(inputs, outputs)
    self.assertEqual(len(model.losses), 1)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(loss, 2 * 3)

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer(self):
    loss = {}
    for reg in [None, 'l2']:
      model_layers = [
          layers.Dense(
              10,
              activation='relu',
              activity_regularizer=reg,
              kernel_initializer='ones',
              use_bias=False),
          layers.Dense(
              1,
              activation='sigmoid',
              kernel_initializer='ones',
              use_bias=False),
      ]

      model = testing_utils.get_model_from_layers(
          model_layers, input_shape=(10,))

      x = np.ones((10, 10), 'float32')
      y = np.zeros((10, 1), 'float32')

      optimizer = RMSPropOptimizer(learning_rate=0.001)
      model.compile(
          optimizer,
          'binary_crossentropy',
          run_eagerly=testing_utils.should_run_eagerly())
      model.fit(x, y, batch_size=2, epochs=5)
      loss[reg] = model.evaluate(x, y)
    self.assertLess(loss[None], loss['l2'])

  @keras_parameterized.run_all_keras_modes
  @keras_parameterized.run_with_all_model_types
  def test_activity_regularizer_loss_value(self):
    layer = layers.Dense(
        1,
        kernel_initializer='zeros',
        bias_initializer='ones',
        activity_regularizer='l2')

    model = testing_utils.get_model_from_layers([layer], input_shape=(10,))

    x = np.ones((10, 10), 'float32')
    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())
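    # With a zero kernel and unit bias every output is 1.0; the default l2
    # factor (0.01) is applied to the activations and the activity loss is
    # averaged over the batch of 10, giving 0.01 * 10 / 10 = 0.01.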
    loss = model.test_on_batch(x)
    self.assertAlmostEqual(0.01, loss, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_activity_regularizer_batch_independent(self):
    inputs = layers.Input(shape=(10,))
    x = layers.Dense(10, activation='relu', activity_regularizer='l2')(inputs)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(
        optimizer,
        run_eagerly=testing_utils.should_run_eagerly())

    loss_small_batch = model.test_on_batch(np.ones((10, 10), 'float32'))
    loss_big_batch = model.test_on_batch(np.ones((20, 10), 'float32'))
    self.assertAlmostEqual(loss_small_batch, loss_big_batch, places=4)

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    shared_layer = LayerWithLoss()

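    # The shared layer is called twice through m2 (once directly and once via
    # m), so it records one loss per call: sum([1, 2, 3]) = 6 and
    # sum(2 * [1, 2, 3]) = 12.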
    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_with_shared_nested_layer(self):

    class LayerWithLoss(layers.Layer):

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs), inputs=inputs)
        return inputs * 2

    class LayerWithNestedLayerWithLoss(layers.Layer):

      def __init__(self):
        super(LayerWithNestedLayerWithLoss, self).__init__()
        self.loss_layer = LayerWithLoss()

      def call(self, inputs):
        return self.loss_layer(inputs)

    shared_layer = LayerWithNestedLayerWithLoss()

    m = Sequential([shared_layer])
    m2 = Sequential([shared_layer, m])
    m2(array_ops.constant([1, 2, 3]))
    self.assertEqual(len(m2.losses), 2)
    self.assertAllClose(m2.losses, [6, 12])

  @keras_parameterized.run_all_keras_modes
  def test_clear_losses(self):

    class LayerWithSharedNestedLossLayer(layers.Layer):

      def __init__(self):
        super(LayerWithSharedNestedLossLayer, self).__init__()
        self.loss_layer = layers.ActivityRegularization(l2=0.001)
        self.add_weight(shape=(1,), regularizer='l2')

      def call(self, x):
        x = self.loss_layer(x)
        return self.loss_layer(x)

    inputs = Input(shape=(1,))
    l = LayerWithSharedNestedLossLayer()  # Weight loss + 2 activity losses.

    x1 = array_ops.ones((1, 1))
    _ = l(x1)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x2 = array_ops.ones((1, 1))
    _ = l(x2)
    if not context.executing_eagerly():
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    outputs = l(inputs)
    model = Model(inputs, outputs)
    if not context.executing_eagerly():
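      # Three symbolic calls x 2 activity losses each + 1 weight regularizer.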
      self.assertEqual(len(model.losses), 7)
      self.assertEqual(len(l.get_losses_for(x1)), 2)
      self.assertEqual(len(l.get_losses_for(x2)), 2)
      self.assertEqual(len(l.get_losses_for(None)), 1)

    x3 = array_ops.ones((1, 1))
    model(x3)
    x4 = array_ops.ones((1, 1))
    model(x4)
    if context.executing_eagerly():
      # Eager losses are cleared every `__call__`.
      self.assertEqual(len(model.losses), 3)
    else:
      self.assertEqual(len(model.losses), 11)
      self.assertEqual(len(model.get_losses_for(x3)), 2)
      self.assertEqual(len(model.get_losses_for(x4)), 2)
      self.assertEqual(len(model.get_losses_for(None)), 1)

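  # The two tests below check that `add_loss` rejects plain constants and
  # variables; only symbolic tensors or zero-argument callables are accepted.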
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_constant_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(1.)

  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_invalid_variable_input(self):
    inputs = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model(inputs, outputs)
    with self.assertRaisesRegex(
        ValueError,
        'Expected a symbolic Tensors or a callable for the loss value'):
      model.add_loss(model.weights[0])

  @keras_parameterized.run_all_keras_modes
  def test_add_entropy_loss_on_functional_model(self):
    inputs = Input(shape=(1,))
    targets = Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = Model([inputs, targets], outputs)
    model.add_loss(losses.binary_crossentropy(targets, outputs))
    model.compile('sgd', run_eagerly=testing_utils.should_run_eagerly())
    with test.mock.patch.object(logging, 'warning') as mock_log:
      model.fit([self.x, self.y], batch_size=3, epochs=5)
      self.assertNotIn('Gradients do not exist for variables',
                       str(mock_log.call_args))


if __name__ == '__main__':
  test.main()