# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for TensorFlow 2.0 layer behavior."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import os
import sys
import traceback

import numpy as np

from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import composite_tensor
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors_impl
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import type_spec
from tensorflow.python.keras import backend
from tensorflow.python.keras import combinations
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers
from tensorflow.python.keras import regularizers
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import base_layer
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.engine import training as training_lib
from tensorflow.python.keras.legacy_tf_layers import core as legacy_core
from tensorflow.python.keras.optimizer_v2 import rmsprop
from tensorflow.python.keras.utils import control_flow_util
from tensorflow.python.module import module
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import summary_ops_v2
from tensorflow.python.ops import tensor_array_ops
from tensorflow.python.ops import variables
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
from tensorflow.python.summary import summary_iterator
from tensorflow.python.util import nest


class DynamicLayer(base_layer.Layer):

  def __init__(self, dynamic=False, **kwargs):
    super(DynamicLayer, self).__init__(dynamic=dynamic, **kwargs)

  def call(self, inputs):
    samples = tensor_array_ops.TensorArray(
        dtype=dtypes.float32, size=array_ops.shape(inputs)[0])
    for idx, sample in enumerate(inputs):
      samples = samples.write(idx, math_ops.square(sample))
    return samples.stack()

  def compute_output_shape(self, input_shape):
    return input_shape


class InvalidLayer(base_layer.Layer):

  def call(self, inputs):
    raise ValueError('You did something wrong!')


class BaseLayerTest(keras_parameterized.TestCase):

  @combinations.generate(combinations.keras_mode_combinations())
  def test_layer_instrumentation(self):
    layer = layers.Add()
    self.assertTrue(layer._instrumented_keras_api)
    self.assertTrue(layer._instrumented_keras_layer_class)
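    # A bare layer should count as Keras-layer usage only; it must not flip
    # the Model-class instrumentation flag checked next.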
    self.assertFalse(layer._instrumented_keras_model_class)
    self.assertTrue(base_layer.keras_api_gauge.get_cell('tf.keras.layers.Add'))
    base_layer.keras_api_gauge.get_cell('tf.keras.layers.Add').set(False)

  @combinations.generate(combinations.keras_model_type_combinations())
  def test_dynamic_layer(self):
    model = testing_utils.get_model_from_layers([DynamicLayer(dynamic=True)],
                                                input_shape=(3,))
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))

  @combinations.generate(combinations.keras_model_type_combinations())
  def test_dynamic_layer_error(self):
    # Functional Models hit the `dynamic=True` error during construction.
    # Subclass Models should just throw the original autograph error during
    # execution.
    raised_error = False
    try:
      model = testing_utils.get_model_from_layers([DynamicLayer()],
                                                  input_shape=(3,))
      model.compile(rmsprop.RMSprop(0.001), loss='mse')
      model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
    except errors_impl.OperatorNotAllowedInGraphError as e:
      if 'iterating over `tf.Tensor` is not allowed' in str(e):
        raised_error = True
    except TypeError as e:
      if 'attempting to use Python control flow' in str(e):
        raised_error = True
    self.assertTrue(raised_error)

  @combinations.generate(combinations.keras_model_type_combinations())
  def test_dynamic_layer_error_running_in_graph_mode(self):
    with ops.get_default_graph().as_default():
      model = testing_utils.get_model_from_layers([DynamicLayer(dynamic=True)],
                                                  input_shape=(3,))
      self.assertEqual(model.dynamic, True)
      # But then you cannot run the model since you're in a graph scope.
      with self.assertRaisesRegex(ValueError,
                                  'You must enable eager execution'):
        model.compile(rmsprop.RMSprop(0.001), loss='mse')

  def test_manual_compute_output_shape(self):

    class BuildCounter(base_layer.Layer):

      def __init__(self, *args, **kwargs):  # pylint: disable=redefined-outer-name
        super(BuildCounter, self).__init__(*args, **kwargs)
        self.build_counter = 0

      def build(self, input_shape):
        self.build_counter += 1
        self.build_shape = input_shape

      def call(self, inputs):
        return inputs

    layer = BuildCounter(dtype=dtypes.float64)
    output_shape = layer.compute_output_shape((None, 10))
    self.assertEqual(layer.build_counter, 1)
    self.assertEqual(layer.build_shape.as_list(), [None, 10])
    self.assertEqual(output_shape.as_list(), [None, 10])
    output_signature = layer.compute_output_signature(
        tensor_spec.TensorSpec(dtype=dtypes.float64, shape=[None, 10]))
    self.assertEqual(layer.build_counter, 1)
    self.assertEqual(layer.build_shape.as_list(), [None, 10])
    self.assertEqual(output_signature.dtype, dtypes.float64)
    self.assertEqual(output_signature.shape.as_list(), [None, 10])
    layer(np.ones((5, 10)))
    self.assertEqual(layer.build_counter, 1)
    self.assertEqual(layer.build_shape.as_list(), [None, 10])

  def test_dynamic_layer_with_deferred_sequential_model(self):
    model = sequential.Sequential(
        [DynamicLayer(dynamic=True), layers.Dense(3)])
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))

  def test_nested_dynamic_layers_in_eager_mode(self):
    inputs = input_layer.Input((3,))
    outputs = DynamicLayer(dynamic=True)(inputs)
    inner_model = training_lib.Model(inputs, outputs)
    self.assertEqual(inner_model.dynamic, True)

    inputs = input_layer.Input((3,))
    x = DynamicLayer(dynamic=True)(inputs)
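    # Reusing the dynamic inner model as a layer should mark the outer
    # functional model built below as dynamic as well.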
    outputs = inner_model(x)

    model = training_lib.Model(inputs, outputs)
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))

  def test_dynamic_subclassed_model_no_shape_inference(self):

    class MyModel(training_lib.Model):

      def __init__(self):
        super(MyModel, self).__init__(dynamic=True)
        self.layer1 = layers.Dense(3)
        self.layer2 = layers.Dense(3)

      def call(self, inputs):
        if math_ops.reduce_sum(inputs) > 0:
          return self.layer1(inputs)
        else:
          return self.layer2(inputs)

    model = MyModel()
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    self.assertEqual(model.run_eagerly, True)
    model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
    self.assertEqual(model.outputs, None)

  def test_dynamic_subclassed_model_with_shape_inference(self):

    class MyModel(training_lib.Model):

      def __init__(self):
        super(MyModel, self).__init__(dynamic=True)
        self.layer1 = layers.Dense(3)
        self.layer2 = layers.Dense(3)

      def call(self, inputs):
        if math_ops.reduce_sum(inputs) > 0:
          return self.layer1(inputs)
        else:
          return self.layer2(inputs)

      def compute_output_shape(self, input_shape):
        return tuple(input_shape[:-1].as_list()) + (3,)

    model = MyModel()
    self.assertEqual(model.dynamic, True)
    model.compile(rmsprop.RMSprop(0.001), loss='mse')
    x, y = np.random.random((2, 3)), np.random.random((2, 3))
    model.train_on_batch(x, y)
    outputs = model(x)
    self.assertEqual(outputs.shape.as_list(), [2, 3])

  def test_deepcopy(self):
    bias_reg = lambda x: 1e-3 * math_ops.reduce_sum(x)
    layer = layers.Conv2D(32, (3, 3), bias_regularizer=bias_reg)
    # Call the Layer on data to generate regularization losses.
    layer(array_ops.ones((1, 10, 10, 3)))
    self.assertLen(layer.losses, 1)
    new_layer = copy.deepcopy(layer)
    self.assertEqual(new_layer.bias_regularizer, bias_reg)
    self.assertEqual(layer.get_config(), new_layer.get_config())

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_invalid_forward_pass(self):
    inputs = input_layer.Input((3,))
    with self.assertRaisesRegex(ValueError, 'You did something wrong!'):
      _ = InvalidLayer()(inputs)

  def test_no_legacy_model(self):
    inputs = input_layer.Input((1,))
    legacy_dense_0 = legacy_core.Dense(1, name='legacy_dense_0')
    legacy_dense_1 = legacy_core.Dense(1, name='legacy_dense_1')

    layer = legacy_dense_0(inputs)
    layer = layers.Dense(1)(layer)
    layer = legacy_dense_1(layer)

    expected_regex = (r'The following are legacy tf\.layers\.Layers:\n '
                      '{}\n {}'.format(legacy_dense_0, legacy_dense_1))
    with self.assertRaisesRegex(TypeError, expected_regex):
      _ = training_lib.Model(inputs=[inputs], outputs=[layer])

    model = training_lib.Model(inputs=[inputs], outputs=[inputs])
    with self.assertRaisesRegex(TypeError, expected_regex):
      model._insert_layers([legacy_dense_0, legacy_dense_1])

  def test_no_legacy_sequential(self):
    layer = [layers.Dense(1), legacy_core.Dense(1, name='legacy_dense_0')]

    expected_regex = r'legacy tf\.layers\.Layers:\n {}'.format(layer[1])
    with self.assertRaisesRegex(TypeError, expected_regex):
      _ = sequential.Sequential(layer)

    with self.assertRaisesRegex(TypeError, expected_regex):
      _ = sequential.Sequential([input_layer.Input(shape=(4,))] + layer)

    model = sequential.Sequential()
    with self.assertRaisesRegex(TypeError, expected_regex):
      for l in layer:
        model.add(l)

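  # Simply calling the model on NumPy data (without `fit`/`predict`) should
  # be enough to build it, as checked next.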
  @combinations.generate(
      combinations.times(
          combinations.keras_model_type_combinations(),
          combinations.combine(mode=['graph', 'eager'])))
  def test_build_with_numpy_data(self):
    model_layers = [
        layers.Dense(3, activation='relu', kernel_initializer='ones'),
        layers.Dense(1, activation='sigmoid', kernel_initializer='ones')
    ]
    model = testing_utils.get_model_from_layers(model_layers, input_shape=(4,))
    model(np.zeros((2, 4), dtype='float32'))
    self.assertTrue(model.built)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_default_add_weight(self):

    class TestLayer(base_layer.Layer):

      def __init__(self):
        super(TestLayer, self).__init__()
        self.default_weight = self.add_weight()
        self.weight_without_name = self.add_weight(shape=(3, 4))
        self.regularized_weight_without_name = self.add_weight(
            shape=(3, 4), regularizer='l2')

    layer = TestLayer()
    self.assertEqual(layer.default_weight.shape.as_list(), [])
    self.assertEqual(layer.weight_without_name.shape.as_list(), [3, 4])
    self.assertEqual(layer.default_weight.dtype.name, 'float32')
    self.assertEqual(layer.weight_without_name.dtype.name, 'float32')
    self.assertEqual(len(layer.losses), 1)
    if not context.executing_eagerly():
      # Cannot access tensor.name in eager execution.
      self.assertIn('Variable_2/Regularizer', layer.losses[0].name)

  @combinations.generate(combinations.keras_mode_combinations(mode=['eager']))
  def test_learning_phase_freezing_for_layers(self):

    class LearningPhaseLayer(base_layer.Layer):

      def call(self, inputs):
        return backend.in_train_phase(lambda: array_ops.ones_like(inputs),
                                      lambda: array_ops.zeros_like(inputs))

    def get_learning_phase_value():
      model = sequential.Sequential([LearningPhaseLayer(input_shape=(1,))])
      model._run_eagerly = testing_utils.should_run_eagerly()
      return np.sum(model(np.ones((1, 1))))

    self.assertEqual(get_learning_phase_value(), 0)

    # Test scope.
    with backend.learning_phase_scope(1):
      self.assertEqual(get_learning_phase_value(), 1)

    # The effects of the scope end after exiting it.
    self.assertEqual(get_learning_phase_value(), 0)

    # Test setting.
    backend.set_learning_phase(1)
    self.assertEqual(get_learning_phase_value(), 1)
    backend.set_learning_phase(0)
    self.assertEqual(get_learning_phase_value(), 0)

  # Cannot be enabled with `run_eagerly=True`, see b/123904578
  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_layer_can_return_variable(self):

    class ComputeSum(base_layer.Layer):

      def __init__(self):
        super(ComputeSum, self).__init__()
        self.total = variables.Variable(
            initial_value=array_ops.zeros((1, 1)), trainable=False)
        if not context.executing_eagerly():
          backend.get_session().run(self.total.initializer)

      def call(self, inputs):
        self.total.assign_add(inputs)
        return self.total

    inputs = input_layer.Input(shape=(1,))
    model = training_lib.Model(inputs, ComputeSum()(inputs))
    model.predict(np.ones((1, 1)))

  def _get_layer_with_training_arg(self):

    class TrainingLayer(base_layer.Layer):
      """A layer with a `training` argument in a defuned `call`."""

      @def_function.function
      def call(self, inputs, training=None):
        if training is None:
          training = backend.learning_phase()
        return control_flow_util.smart_cond(
            training,
            lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs))

    return TrainingLayer()

  # b/124459427: can't test with `run_eagerly=True` for now.
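  # When `training` is not passed explicitly, the layer above falls back to
  # the global learning phase: `fit` sees ones (MAE 1 against zero targets)
  # and `evaluate` sees zeros (MAE 0).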
  @combinations.generate(
      combinations.times(combinations.keras_mode_combinations(),
                         combinations.keras_model_type_combinations()))
  def test_training_arg_in_defun(self):
    layer = self._get_layer_with_training_arg()
    model = testing_utils.get_model_from_layers([layer], input_shape=(1,))
    model.compile(rmsprop.RMSprop(0.), loss='mae')
    history = model.fit(np.zeros((1, 1)), np.zeros((1, 1)))
    self.assertEqual(history.history['loss'][0], 1.)
    loss = model.evaluate(np.zeros((1, 1)), np.zeros((1, 1)))
    self.assertEqual(loss, 0.)

    # Test that the argument injection performed in `call` is not active
    # when the argument is passed explicitly.
    layer = self._get_layer_with_training_arg()
    inputs = input_layer.Input(shape=(1,))
    # Pass `training` by name
    outputs = layer(inputs, training=False)
    model = training_lib.Model(inputs, outputs)
    model.compile(rmsprop.RMSprop(0.), loss='mae')
    history = model.fit(np.zeros((1, 1)), np.zeros((1, 1)))
    self.assertEqual(history.history['loss'][0], 0.)

  @combinations.generate(
      combinations.times(combinations.keras_mode_combinations(),
                         combinations.keras_model_type_combinations()))
  def test_raw_variable_assignment(self):

    class RawVariableLayer(base_layer.Layer):

      def __init__(self, **kwargs):
        super(RawVariableLayer, self).__init__(**kwargs)
        # Test variables in nested structure.
        self.var_list = [variables.Variable(1.), {'a': variables.Variable(2.)}]

      def call(self, inputs):
        return inputs * self.var_list[0] * self.var_list[1]['a']

    model = testing_utils.get_model_from_layers([RawVariableLayer()],
                                                input_shape=(10,))
    model.compile(
        'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    x, y = np.ones((10, 10)), np.ones((10, 10))
    # Checks that variables get initialized.
    model.fit(x, y, batch_size=2, epochs=2)

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_composite_variable_assignment(self):

    class Spec(type_spec.TypeSpec):

      value_type = property(lambda self: CompositeVariable)

      def _component_specs(self):
        pass

      def _serialize(self):
        pass

      def _to_components(self, value):
        return value._variables

      def _from_components(self, variable_list):
        return CompositeVariable(variable_list)

    class CompositeVariable(composite_tensor.CompositeTensor):

      def __init__(self, variable_list):
        self._variables = variable_list

      @property
      def _type_spec(self):
        return Spec()

    class CompositeVariableLayer(base_layer.Layer):

      def __init__(self):
        super().__init__()
        self.composite_var = CompositeVariable(
            [variables.Variable(1.), variables.Variable(2.)])

    layer = CompositeVariableLayer()
    self.assertLen(layer.weights, 2)
    self.assertIsInstance(layer.weights[0], variables.Variable)
    self.assertIsInstance(layer.weights[1], variables.Variable)
    self.assertEqual(self.evaluate(layer.weights[0]), 1.)
    self.assertEqual(self.evaluate(layer.weights[1]), 2.)

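  # The next two tests pin down auto-generated op-layer names, which differ
  # between graph mode ('tf_op_layer_*') and eager mode ('tf.__operators__.*').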
  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_layer_names(self):
    inputs = input_layer.Input(shape=[2])
    add1 = inputs + inputs
    add2 = layers.Add()([inputs, inputs])
    add3 = inputs + inputs
    add4 = layers.Add()([inputs, inputs])
    model = training_lib.Model(inputs=[inputs],
                               outputs=[add1, add2, add3, add4])
    actual_names = [l.name for l in model.layers]
    graph_names = [
        'input_1', 'tf_op_layer_add', 'add', 'tf_op_layer_add_2', 'add_1'
    ]
    eager_names = [
        'input_1', 'tf.__operators__.add', 'add', 'tf.__operators__.add_1',
        'add_1'
    ]
    for actual, graph, eager in zip(actual_names, graph_names, eager_names):
      self.assertIn(actual, {graph, eager})

  @combinations.generate(combinations.combine(mode=['eager']))
  def test_layer_names_after_loading(self):
    backend.clear_session()
    # Mimic loading a model that already contained add layers with
    # name = 'add_1' and 'tf.__operators__.add'
    layers.Add(name='add_1')
    layers.Add(name='tf.__operators__.add')

    inputs = input_layer.Input(shape=[2])
    add1 = inputs + inputs
    add2 = layers.Add()([inputs, inputs])
    add3 = inputs + inputs
    add4 = layers.Add()([inputs, inputs])
    model = training_lib.Model(
        inputs=[inputs], outputs=[add1, add2, add3, add4])
    actual_names = [l.name for l in model.layers]
    # The generated op layer names should have avoided layer names seen in
    # the loaded model. (This avoidance should not apply to non-op layers.)
    expected_names = [
        'input_1', 'tf.__operators__.add_1', 'add', 'tf.__operators__.add_2',
        'add_1'
    ]
    self.assertAllEqual(actual_names, expected_names)

  def test_add_trainable_weight_on_frozen_layer(self):

    class TestLayer(base_layer.Layer):

      def build(self, input_shape):
        self.w = self.add_weight(shape=(), trainable=True)

      def call(self, inputs):
        return self.w * inputs

    layer = TestLayer()
    layer.trainable = False
    layer.build(None)
    layer.trainable = True
    self.assertListEqual(layer.trainable_weights, [layer.w])

  @combinations.generate(
      combinations.times(combinations.keras_mode_combinations(),
                         combinations.keras_model_type_combinations()))
  def test_passing_initial_weights_values(self):
    kernel_value = np.random.random((10, 2))
    layer_with_weights = layers.Dense(2, use_bias=False,
                                      weights=[kernel_value])
    model = testing_utils.get_model_from_layers([layer_with_weights],
                                                input_shape=(10,))
    model.compile(
        'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    inputs = np.random.random((3, 10))
    out = model.predict(inputs)
    self.assertAllClose(model.layers[-1].get_weights()[0], kernel_value)
    self.assertAllClose(out, np.dot(inputs, kernel_value))

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_set_weights_and_get_weights(self):
    layer = layers.Dense(2)
    layer.build((None, 10))
    kernel = np.random.random((10, 2))
    bias = np.random.random((2,))
    layer.set_weights([kernel, bias])
    weights = layer.get_weights()
    self.assertEqual(len(weights), 2)
    self.assertAllClose(weights[0], kernel)
    self.assertAllClose(weights[1], bias)
    with self.assertRaisesRegex(ValueError,
                                'but the layer was expecting 2 weights'):
      layer.set_weights([1, 2, 3])
    with self.assertRaisesRegex(ValueError,
                                'not compatible with provided weight shape'):
      layer.set_weights([kernel.T, bias])

  def test_get_config_error(self):

    class MyLayer(base_layer.Layer):

      def __init__(self, my_kwarg='default', **kwargs):
        super(MyLayer, self).__init__(**kwargs)
        self.my_kwarg = my_kwarg

    # `__init__` includes kwargs but `get_config` is not overridden, so
    # an error should be thrown:
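    # (The base `get_config` only knows how to serialize its own constructor
    # arguments, so layers that accept extra kwargs must override it.)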
    with self.assertRaisesRegex(NotImplementedError, 'Layer MyLayer has'):
      MyLayer('custom').get_config()

    class MyLayerNew(base_layer.Layer):

      def __init__(self, my_kwarg='default', **kwargs):
        super(MyLayerNew, self).__init__(**kwargs)
        self.my_kwarg = my_kwarg

      def get_config(self):
        config = super(MyLayerNew, self).get_config()
        config['my_kwarg'] = self.my_kwarg
        return config

    # Test to make sure that error is not raised if the method call is
    # from an overridden `get_config`:
    self.assertEqual(MyLayerNew('custom').get_config()['my_kwarg'], 'custom')

    class MyLayerNew2(base_layer.Layer):

      def __init__(self, name='MyLayerName', dtype=None, **kwargs):  # pylint:disable=redefined-outer-name
        super(MyLayerNew2, self).__init__(name=name, dtype=dtype, **kwargs)

    # Check that if the kwargs in `__init__` are base layer constructor
    # arguments, no error is thrown:
    self.assertEqual(MyLayerNew2(name='New').get_config()['name'], 'New')

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_count_params(self):
    dense = layers.Dense(16)
    dense.build((None, 4))
    self.assertEqual(dense.count_params(), 16 * 4 + 16)

    dense = layers.Dense(16)
    with self.assertRaisesRegex(ValueError, 'call `count_params`'):
      dense.count_params()

    model = sequential.Sequential(layers.Dense(16))
    with self.assertRaisesRegex(ValueError, 'call `count_params`'):
      model.count_params()

    dense = layers.Dense(16, input_dim=4)
    model = sequential.Sequential(dense)
    self.assertEqual(model.count_params(), 16 * 4 + 16)

  def test_super_not_called(self):

    class CustomLayerNotCallingSuper(base_layer.Layer):

      def __init__(self):
        pass

    layer = CustomLayerNotCallingSuper()
    with self.assertRaisesRegex(RuntimeError, 'You must call `super()'):
      layer(np.random.random((10, 2)))

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_first_arg_not_called_inputs(self):
    x, y = array_ops.ones((10, 1)), array_ops.ones((10, 1))

    class ArgLayer(base_layer.Layer):

      def call(self, x, y):
        return x + y

    layer = ArgLayer()
    out = self.evaluate(layer(x=x, y=y))
    self.assertAllClose(out, 2 * np.ones((10, 1)))

    class KwargLayer(base_layer.Layer):

      def call(self, x=None, y=None):
        return x + y

    layer = KwargLayer()
    out = self.evaluate(layer(x=x, y=y))
    self.assertAllClose(out, 2 * np.ones((10, 1)))

    with self.assertRaisesRegex(ValueError, 'must always be passed'):
      layer(y=y)

    class TFFunctionLayer(base_layer.Layer):

      @def_function.function
      def call(self, x, y=None):
        if y is None:
          return x
        return x + y

    layer = TFFunctionLayer()
    out = self.evaluate(layer(x=x, y=y))
    self.assertAllClose(out, 2 * np.ones((10, 1)))

  def test_build_input_shape(self):

    class CustomLayer(base_layer.Layer):

      def build(self, input_shape):
        self.add_weight('w', shape=input_shape[1:])
        super(CustomLayer, self).build(input_shape)

    layer = CustomLayer()
    self.assertFalse(layer.built)

    layer.build([None, 1, 2, 3])
    self.assertTrue(layer.built)
    self.assertEqual([None, 1, 2, 3], layer._build_input_shape)

    layer = CustomLayer()
    layer(input_layer.Input((3,)))
    self.assertTrue(layer.built)
    self.assertEqual([None, 3], layer._build_input_shape.as_list())

  @combinations.generate(combinations.combine(mode=['eager']))
  def custom_layer_training_arg(self):

    class CustomLayerNoTrainingArg(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerNoTrainingArg, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs):
        return self._nested_layer(inputs)

    class CustomLayerDefaultTrainingMissing(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingMissing, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    class CustomLayerDefaultTrainingNone(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingNone, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training=None):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    class CustomLayerDefaultTrainingFalse(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingFalse, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training=False):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    class CustomLayerDefaultTrainingTrue(base_layer.Layer):

      def __init__(self, nested_layer=None):
        super(CustomLayerDefaultTrainingTrue, self).__init__()
        self._nested_layer = nested_layer or array_ops.identity

      def call(self, inputs, training=True):
        if training:
          return self._nested_layer(inputs)
        else:
          return self._nested_layer(inputs) * 0.5

    x = array_ops.ones(shape=(1, 1))

    # If the layer signature doesn't specify a default training arg,
    # run it in inference mode when no training arg is passed
    # to `__call__`.
    layer = CustomLayerDefaultTrainingMissing()
    self.assertAllEqual(layer(x), x * 0.5)
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    # If the layer signature specifies `False` as the default training arg,
    # run it in inference mode when no training arg is passed
    # to `__call__`.
    layer = CustomLayerDefaultTrainingFalse()
    self.assertAllEqual(layer(x), x * 0.5)
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    # If the layer signature specifies `True` as the default training arg,
    # explicitly run it in training mode when no training arg is passed
    # to `__call__`.
    layer = CustomLayerDefaultTrainingTrue()
    self.assertAllEqual(layer(x), x)
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    # Outer layers/models should set the training context implicitly for all
    # nested layers, respecting whatever mode the outer layer was run with.
    layer = CustomLayerDefaultTrainingTrue(CustomLayerDefaultTrainingFalse())
    # No outer value passed: use local defaults
    self.assertAllEqual(layer(x), x * 0.25)  # Use local default False

    # Outer value passed: override local defaults
    self.assertAllEqual(layer(x, training=False), x * 0.25)
    self.assertAllEqual(layer(x, training=True), x)

    layer = CustomLayerDefaultTrainingFalse(CustomLayerDefaultTrainingTrue())
    # No outer value passed: use local defaults
    self.assertAllEqual(layer(x), x)  # Use local default True

    # Outer value passed: override local defaults
    self.assertAllEqual(layer(x, training=False), x * 0.25)
    self.assertAllEqual(layer(x, training=True), x)

    # If the outer layer `call` doesn't take a training argument at all,
    # it'll set the nested scope as None when no training arg is passed in.
    # If a training arg is passed in it won't use it directly in `call`, but
    # it will set the nested training mode.
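    # (The outer `call` has no training branch of its own here; only the
    # nested layer's output reflects the propagated mode.)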
    layer = CustomLayerNoTrainingArg(CustomLayerDefaultTrainingTrue())
    self.assertAllEqual(layer(x), x)  # Use local default True

    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

    layer = CustomLayerDefaultTrainingNone(CustomLayerDefaultTrainingTrue())
    self.assertAllEqual(layer(x), x)  # Use local default True
    self.assertAllEqual(layer(x, training=False), x * 0.5)
    self.assertAllEqual(layer(x, training=True), x)

  def test_activity_regularizer_string(self):

    class MyLayer(base_layer.Layer):
      pass

    layer = MyLayer(activity_regularizer='l2')
    self.assertIsInstance(layer.activity_regularizer, regularizers.L2)

  def test_tf_module_tracking(self):

    class MyModule(module.Module):

      def __init__(self):
        super(MyModule, self).__init__()
        self.v1 = variables.Variable(1., trainable=True, name='v1')
        self.v2 = variables.Variable(2., trainable=False, name='v2')

      def __call__(self, x):
        return x * self.v1 * self.v2

    class MyLayer(base_layer.Layer):

      def __init__(self, **kwargs):
        super(MyLayer, self).__init__(**kwargs)
        self.my_modules = {}
        self.my_modules['a'] = MyModule()

      def call(self, x):
        return self.my_modules['a'](x)

    layer = MyLayer()
    self.assertLen(layer.variables, 2)
    self.assertLen(layer.trainable_variables, 1)
    self.assertLen(layer.non_trainable_variables, 1)

    layer.trainable = False
    self.assertLen(layer.variables, 2)
    self.assertLen(layer.trainable_variables, 0)
    self.assertLen(layer.non_trainable_variables, 2)

    class MyModel(training_lib.Model):

      def __init__(self):
        super(MyModel, self).__init__()
        self.my_modules = []
        self.my_modules.append(MyModule())

      def call(self, x):
        return self.my_modules[0](x)

    model = MyModel()
    self.assertLen(model.variables, 2)
    self.assertLen(model.trainable_variables, 1)
    self.assertLen(model.non_trainable_variables, 1)

    model.trainable = False
    self.assertLen(model.variables, 2)
    self.assertLen(model.trainable_variables, 0)
    self.assertLen(model.non_trainable_variables, 2)


class SymbolicSupportTest(keras_parameterized.TestCase):

  def test_using_symbolic_tensors_with_tf_ops(self):
    # Single-input.
    x = input_layer.Input((3,))
    math_ops.square(x)
    # Multi-inputs.
    x1, x2 = input_layer.Input((3,)), input_layer.Input((3,))
    array_ops.concat([x1, x2], axis=1)
    # Mixing Keras symbolic tensors and graph tensors from the same graph
    # works.
    with backend.get_graph().as_default():
      x1 = input_layer.Input((3,))
    x2 = input_layer.Input((3,))
    math_ops.matmul(x1, x2)
    # Creating same op type (matmul) multiple times in the Keras graph works.
    x1 = input_layer.Input((3,))
    x2 = input_layer.Input((3,))
    math_ops.matmul(x1, x2)

  def test_mixing_eager_and_graph_tensors(self):
    with ops.Graph().as_default():
      x1 = array_ops.ones((3, 3))
    x2 = array_ops.ones((3, 3))
    with self.assertRaisesRegex(TypeError, 'Graph tensors'):
      math_ops.matmul(x1, x2)

  def test_mixing_numpy_arrays_and_graph_tensors(self):
    with ops.Graph().as_default():
      x1 = array_ops.ones((3, 3))
    x2 = np.ones((3, 3), dtype='float32')
    with self.assertRaisesRegex(TypeError, 'Graph tensors'):
      math_ops.matmul(x1, x2)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_mixing_keras_symbolic_tensors_and_eager_tensors(self):
    x1 = input_layer.Input((3,))
    x2 = array_ops.ones((3, 3))
    y = math_ops.matmul(x1, x2)

    fn = backend.function(inputs=[x1], outputs=[y])
    x_val = np.random.random((3, 3))
    y_val = np.ones((3, 3))

    self.assertAllClose(fn([x_val])[0], np.matmul(x_val, y_val), atol=1e-5)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_mixing_keras_symbolic_tensors_and_numpy_arrays(self):
    x1 = input_layer.Input((3,))
    x2 = np.ones((3, 3), dtype='float32')
    y = math_ops.matmul(x1, x2)

    fn = backend.function(inputs=[x1], outputs=[y])
    x_val = np.random.random((3, 3))
    y_val = np.ones((3, 3))

    self.assertAllClose(fn([x_val])[0], np.matmul(x_val, y_val), atol=1e-5)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_reraising_exception(self):
    # When layer is not dynamic, we have some pattern matching during exception
    # handling to detect when the user is trying to use python control flow.
    # When an exception is thrown but the pattern doesn't match, we want to
    # preserve the originating stack trace. An early implementation of this
    # logic lost the stack trace. We test the correct behavior here.

    class TypeErrorLayer(base_layer.Layer):

      def call(self, inputs):

        def easily_identifiable_name():
          raise TypeError('Non-matching TypeError message.')

        easily_identifiable_name()

    inputs = input_layer.Input((3,))

    try:
      _ = TypeErrorLayer()(inputs)
    except TypeError as e:
      if hasattr(e, 'ag_error_metadata'):
        self.assertIn('easily_identifiable_name', str(e))
        # See ErrorMetadataBase in autograph/pyct/errors.py
        function_name = e.ag_error_metadata.translated_stack[-1].function_name
      else:
        tb = traceback.extract_tb(sys.exc_info()[2])
        last_entry = tb[-1]
        function_name = last_entry[2]
      self.assertEqual(function_name, 'easily_identifiable_name')

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_summaries_in_tf_function(self):
    if not context.executing_eagerly():
      return

    class MyLayer(base_layer.Layer):

      def call(self, inputs):
        summary_ops_v2.scalar('mean', math_ops.reduce_mean(inputs))
        return inputs

    tmp_dir = self.get_temp_dir()
    writer = summary_ops_v2.create_file_writer_v2(tmp_dir)
    with writer.as_default(step=1), summary_ops_v2.record_if(True):
      my_layer = MyLayer()
      x = array_ops.ones((10, 10))

      def my_fn(x):
        return my_layer(x)

      _ = my_fn(x)

    event_file = gfile.Glob(os.path.join(tmp_dir, 'events*'))
    self.assertLen(event_file, 1)
    event_file = event_file[0]
    tags = set()
    for e in summary_iterator.summary_iterator(event_file):
      for val in e.summary.value:
        tags.add(val.tag)
    self.assertEqual(set(['my_layer/mean']), tags)

  @combinations.generate(combinations.combine(mode=['graph', 'eager']))
  def test_error_when_passing_non_tensor(self):
    # layers that have an `input_spec` will raise an error when called on
    # non-tensors. This covers all built-in layers.
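    # `Dense` declares an `input_spec`, so the bad input is rejected with a
    # clear TypeError before `call` ever runs.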
    layer = layers.Dense(3)
    x = object()
    with self.assertRaisesRegex(TypeError, r'should be tensors'):
      layer(x)


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class NestedTrackingTest(test.TestCase):

  def test_nested_layer_variable_tracking(self):
    # Test that variables from nested sublayers are
    # being tracked by subclassed layers.

    class MyLayer(base_layer.Layer):

      def __init__(self):
        super(MyLayer, self).__init__()
        self.dense1 = layers.Dense(1)
        self.dense2 = layers.BatchNormalization()

      def build(self, input_shape):
        self.v1 = self.add_weight('v1', shape=input_shape[1:].as_list())
        self.v2 = variables.Variable(
            name='v2',
            initial_value=np.zeros(input_shape[1:].as_list(), dtype='float32'),
            trainable=False)

      def call(self, inputs):
        x = self.dense1(inputs) + self.dense2(inputs)
        return x + self.v1 + self.v2

    layer = MyLayer()
    inputs = input_layer.Input((1,))
    _ = layer(inputs)

    self.assertEqual(len(layer.weights), 8)
    self.assertEqual(len(layer.trainable_weights), 5)
    self.assertEqual(len(layer.non_trainable_weights), 3)

    layer.dense1.trainable = False
    self.assertEqual(len(layer.weights), 8)
    self.assertEqual(len(layer.trainable_weights), 3)
    self.assertEqual(len(layer.non_trainable_weights), 5)

    layer.trainable = False
    self.assertEqual(len(layer.weights), 8)
    self.assertEqual(len(layer.trainable_weights), 0)
    self.assertEqual(len(layer.non_trainable_weights), 8)
    self.assertEqual(
        {id(v) for v in [layer.dense1, layer.dense2, layer.v1, layer.v2]},
        {id(v) for _, v in layer._checkpoint_dependencies})

  def test_nested_layer_updates_losses_tracking(self):
    # Test that updates and losses from nested sublayers are
    # being tracked by subclassed layers.

    class UpdateAndLossLayer(base_layer.Layer):

      def build(self, _):
        self.v1 = self.add_weight('v1', shape=())

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        self.add_update(state_ops.assign_add(self.v1, 1))
        return inputs + 1

    class MyLayer(base_layer.Layer):

      def build(self, _):
        self.v1 = self.add_weight('v1', shape=())

      def __init__(self):
        super(MyLayer, self).__init__()
        self.ul1 = UpdateAndLossLayer()
        self.ul2 = UpdateAndLossLayer()

      def call(self, inputs):
        self.add_loss(math_ops.reduce_sum(inputs))
        self.add_update(state_ops.assign_add(self.v1, 1))
        x = self.ul1(inputs)
        return self.ul2(x)

    layer = MyLayer()

    if context.executing_eagerly():
      inputs = array_ops.ones((3, 1))
      _ = layer(inputs)
      self.assertEqual(len(layer.losses), 3)
      self.assertLen(layer.get_losses_for(None), 3)
    else:
      inputs = input_layer.Input((1,))
      _ = layer(inputs)
      self.assertEqual(len(layer.losses), 3)
      self.assertEqual(len(layer.updates), 3)
      self.assertLen(layer.get_losses_for(None), 3)

  def test_attribute_reassignment(self):
    l = base_layer.Layer()
    l.a = base_layer.Layer()
    l.a = []
    l.a = variables.Variable(1.)
    l.a = base_layer.Layer()
    last_assignment = base_layer.Layer()
    l.a = last_assignment

    l.b = variables.Variable(1.)
    del l.b
    l.c = base_layer.Layer()
    del l.c
    l.d = last_assignment
    del l.d
    sublayers = list(l._flatten_layers(include_self=False, recursive=False))
    self.assertEqual([last_assignment], sublayers)
    self.assertEqual([], l.trainable_weights)
    self.assertEqual([], l.non_trainable_weights)
    self.assertEqual([], l.weights)
    del l.a
    self.assertEqual([], l._self_tracked_trackables)

  def test_layer_class_not_tracked_as_sublayer(self):
    # See https://github.com/tensorflow/tensorflow/issues/27431 for details.
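    # Storing the class object `layers.Dense` (not an instance) as an
    # attribute must not create tracked variables or submodules.
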
    class LayerWithClassAttribute(base_layer.Layer):

      def __init__(self):
        super(LayerWithClassAttribute, self).__init__()
        self.layer_fn = layers.Dense

    layer = LayerWithClassAttribute()
    self.assertEmpty(layer.variables)
    self.assertEmpty(layer.submodules)

  def test_layer_call_fn_args(self):

    class NonDefunLayer(base_layer.Layer):

      def call(self, inputs, a, mask, b=None, training=None):
        return inputs

    class DefunLayer(base_layer.Layer):

      @def_function.function
      def call(self, x, mask, a, training=None, b=None):
        return x

    nondefun_layer = NonDefunLayer()
    self.assertEqual(nondefun_layer._call_fn_args,
                     ['inputs', 'a', 'mask', 'b', 'training'])
    defun_layer = DefunLayer()
    self.assertEqual(defun_layer._call_fn_args,
                     ['x', 'mask', 'a', 'training', 'b'])

  def test_sequential_model(self):
    model = sequential.Sequential(
        [layers.Dense(10, input_shape=(10,)),
         layers.Dense(5)])
    self.assertLen(model.layers, 2)
    self.assertLen(model.weights, 4)

    # Make sure a subclass model also works when it is called 'Sequential'.
    class Sequential(training_lib.Model):

      def __init__(self):
        super(Sequential, self).__init__()
        self.dense_layers = [layers.Dense(10), layers.Dense(5)]

      def call(self, inputs):
        x = inputs
        for d in self.dense_layers:
          x = d(x)
        return x

    s = Sequential()
    self.assertLen(s.layers, 2)
    self.assertLen(s.weights, 0)

    s(input_layer.Input((10,)))
    self.assertLen(s.weights, 4)


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class NameScopingTest(keras_parameterized.TestCase):

  def test_name_scope_layer(self):
    x = backend.placeholder(shape=(10, 10))
    layer = layers.Dense(10, name='MyName')
    layer(x)
    self.assertEqual(layer.bias.name, 'MyName/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName/kernel:0')

  def test_name_scope_functional_api(self):
    inputs = input_layer.Input((3,))
    layer = layers.Dense(10, name='MyName')
    _ = layer(inputs)
    self.assertEqual(layer.bias.name, 'MyName/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName/kernel:0')

  def test_name_scope_functional_api_nested(self):

    class NestedLayer(base_layer.Layer):

      def __init__(self, name='OuterName'):
        super(NestedLayer, self).__init__(name=name)
        self.dense = layers.Dense(10, name='InnerName')

      def call(self, inputs):
        return self.dense(inputs)

    inputs = input_layer.Input((3,))
    layer = NestedLayer()
    _ = layer(inputs)
    self.assertEqual(layer.dense.bias.name, 'OuterName/InnerName/bias:0')
    self.assertEqual(layer.dense.kernel.name, 'OuterName/InnerName/kernel:0')

  def test_name_scope_sublayer(self):

    class NameScopeTracker(base_layer.Layer):

      def call(self, inputs):
        self.active_name_scope = ops.get_name_scope()
        return inputs

    x = backend.placeholder(shape=(10, 10))
    sublayer = NameScopeTracker(name='Sublayer')
    layer = layers.Dense(10, activation=sublayer, name='MyName2')
    layer(x)
    self.assertEqual(layer.bias.name, 'MyName2/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName2/kernel:0')
    self.assertEqual(sublayer.active_name_scope, 'MyName2/Sublayer')

  def test_name_scope_tf_tensor(self):
    x = ops.convert_to_tensor_v2_with_dispatch(np.ones((10, 10)))
    layer = layers.Dense(
        10, activation=layers.ReLU(name='MyAct'), name='MyName3')
    layer(x)
    self.assertEqual(layer.bias.name, 'MyName3/bias:0')
    self.assertEqual(layer.kernel.name, 'MyName3/kernel:0')


@combinations.generate(combinations.keras_mode_combinations(mode=['eager']))
class AutographControlFlowTest(keras_parameterized.TestCase):

  def test_disabling_in_context_is_matched(self):

    test_obj = self

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
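        # With autograph disabled below, branching on a symbolic tensor must
        # raise rather than being rewritten into `tf.cond`.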
        with test_obj.assertRaisesRegex(TypeError, 'Tensor.*as.*bool'):
          if constant_op.constant(False):
            return inputs * 1.
        return inputs * 0.

    @def_function.function(autograph=False)
    def test_fn():
      return MyLayer()(constant_op.constant([[1., 2., 3.]]))

    test_fn()

  def test_if_training_pattern_output(self):

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        if training:
          return inputs * 1.
        return inputs * 0.

    inputs = input_layer.Input((3,))
    outputs = MyLayer()(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    train_loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(train_loss, 0.)
    test_loss = model.test_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(test_loss, 1.)

  def test_if_training_pattern_loss(self):

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        if training:
          loss = math_ops.reduce_sum(inputs)
        else:
          loss = 0.
        self.add_loss(loss)
        return inputs

    inputs = input_layer.Input((3,))
    outputs = MyLayer()(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    train_loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(train_loss, 2 * 3)
    test_loss = model.test_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(test_loss, 0)

  def test_if_training_pattern_metric(self):

    class MyLayer(base_layer.Layer):

      def call(self, inputs, training=None):
        if training:
          metric = math_ops.reduce_sum(inputs)
        else:
          metric = 0.
        self.add_metric(metric, name='my_metric', aggregation='mean')
        return inputs

    inputs = input_layer.Input((3,))
    outputs = MyLayer()(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    for _ in range(3):
      _, train_metric = model.train_on_batch(np.ones((2, 3)),
                                             np.ones((2, 3)))
      self.assertEqual(train_metric, 2 * 3)
      _, test_metric = model.test_on_batch(np.ones((2, 3)),
                                           np.ones((2, 3)))
      self.assertEqual(test_metric, 0)

  def test_if_training_pattern_update(self):

    class MyLayer(base_layer.Layer):

      def build(self, input_shape):
        self.counter = self.add_weight(
            shape=(), trainable=False, initializer='zeros')

      def call(self, inputs, training=None):
        if training:
          increment = 1.
        else:
          increment = 0.
        self.counter.assign_add(increment)
        return inputs

    inputs = input_layer.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile(
        'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(backend.get_value(layer.counter), 1.)

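  # The remaining tests exercise `add_loss`/`add_metric` under data-dependent
  # control flow, which requires either autograph (graph mode) or
  # `dynamic=True` (eager).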
  def test_conditional_losses_in_call(self):

    class MyLayer(base_layer.Layer):

      def __init__(self):
        super(MyLayer,
              self).__init__(dynamic=testing_utils.should_run_eagerly())

      def call(self, inputs, training=None):
        if training:
          self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

      def compute_output_shape(self, input_shape):
        return input_shape

    inputs = input_layer.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(loss, 2 * 3)

  def test_conditional_callable_losses(self):
    model = sequential.Sequential([
        layers.Dense(
            1, kernel_regularizer=regularizers.l2(1e-4), input_shape=(1,))
    ])
    model._run_eagerly = testing_utils.should_run_eagerly()

    def assert_graph(t):
      if not context.executing_eagerly():
        self.assertEqual(t.graph, ops.get_default_graph())

    @def_function.function
    def get_losses(t):
      if t < 0:
        return math_ops.reduce_sum(model.losses) * t
      else:
        return math_ops.reduce_sum(model.losses)

    assert_graph(get_losses(constant_op.constant(2.)))
    assert_graph(get_losses(constant_op.constant(0.5)))

  def test_conditional_metrics_in_call(self):

    class MyLayer(base_layer.Layer):

      def __init__(self):
        super(MyLayer,
              self).__init__(dynamic=testing_utils.should_run_eagerly())

      def call(self, inputs, training=None):
        if training:
          self.add_metric(math_ops.reduce_sum(inputs),
                          name='sum',
                          aggregation='mean')
        return inputs

      def compute_output_shape(self, input_shape):
        return input_shape

    inputs = input_layer.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = training_lib.Model(inputs, outputs)
    model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
    history = model.fit(np.ones((2, 3)), np.ones((2, 3)))
    self.assertEqual(history.history['sum'][-1], 2 * 3)

  def test_conditional_activity_regularizer_in_call(self):

    class TestModel(training_lib.Model):

      def __init__(self):
        super(TestModel, self).__init__(
            name='test_model', dynamic=testing_utils.should_run_eagerly())
        self.layer = layers.Dense(2, activity_regularizer='l2')

      def call(self, x, training=None):
        if math_ops.greater(math_ops.reduce_sum(x), 0.0):
          return self.layer(x)
        else:
          return self.layer(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))

    if testing_utils.should_run_eagerly():
      model.fit(x, y, epochs=2, batch_size=5)
    else:
      with self.assertRaisesRegex(ValueError, 'ActivityRegularizer'):
        model.fit(x, y, epochs=2, batch_size=5)

  def test_conditional_activity_regularizer_with_wrappers_in_call(self):

    class TestModel(training_lib.Model):

      def __init__(self):
        super(TestModel, self).__init__(
            name='test_model', dynamic=testing_utils.should_run_eagerly())
        self.layer = layers.TimeDistributed(
            layers.Dense(2, activity_regularizer='l2'), input_shape=(3, 4))

      def call(self, x, training=None):
        if math_ops.greater(math_ops.reduce_sum(x), 0.0):
          return self.layer(x)
        else:
          return self.layer(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones(shape=(10, 3, 4))
    y = np.ones(shape=(10, 3, 2))

    if testing_utils.should_run_eagerly():
      model.fit(x, y, epochs=2, batch_size=5)
    else:
      with self.assertRaisesRegex(ValueError, 'ActivityRegularizer'):
        model.fit(x, y, epochs=2, batch_size=5)

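# Shared fixtures for the dtype tests below: `AddLayer` carries a variable,
# `IdentityLayer` does not.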
class AddLayer(base_layer.Layer):
  """A layer which adds its input to a variable.

  Useful for testing a layer with a variable.
  """

  def build(self, _):
    self.v = self.add_weight('v', (), initializer='ones')
    self.built = True

  def call(self, inputs):
    return inputs + self.v


class IdentityLayer(base_layer.Layer):
  """A layer that returns its input.

  Useful for testing a layer without a variable.
  """

  def call(self, inputs):
    return inputs


@combinations.generate(combinations.combine(mode=['graph', 'eager']))
class DTypeTest(keras_parameterized.TestCase):

  # This class only has tests relating to layer.dtype. Tests for dtype
  # policies are in mixed_precision/keras_test.py.

  # TODO(reedwm): Maybe have a separate test file for input casting tests.

  def _const(self, dtype):
    return array_ops.constant(1, dtype=dtype)

  @testing_utils.enable_v2_dtype_behavior
  def test_dtype_defaults_to_floatx(self):
    layer = AddLayer()
    self.assertEqual(layer.dtype, 'float32')
    layer(self._const('float64'))
    self.assertEqual(layer.dtype, 'float32')  # dtype should not change

    try:
      backend.set_floatx('float64')
      layer = AddLayer()
      self.assertEqual(layer.dtype, 'float64')
    finally:
      backend.set_floatx('float32')

  @testing_utils.enable_v2_dtype_behavior
  def test_passing_dtype_to_constructor(self):
    layer = IdentityLayer(dtype='float64')
    layer(self._const('float32'))
    self.assertEqual(layer.dtype, 'float64')

    layer = IdentityLayer(dtype='int32')
    layer(self._const('float32'))
    self.assertEqual(layer.dtype, 'int32')

    layer = IdentityLayer(dtype=dtypes.float64)
    layer(self._const('float32'))
    self.assertEqual(layer.dtype, 'float64')

  @testing_utils.enable_v2_dtype_behavior
  def input_cast_to_dtype(self):
    layer = AddLayer()

    # Input should be cast to layer.dtype, so the output should also be
    # layer.dtype.
    self.assertEqual(layer(self._const('float64')).dtype, 'float32')

    layer = AddLayer(dtype='float64')
    self.assertEqual(layer(self._const('float32')).dtype, 'float64')

    # Test inputs are not cast if layer.dtype is not floating-point.
    layer = IdentityLayer(dtype='int32')
    self.assertEqual(layer(self._const('float64')).dtype, 'float64')

    # Test inputs are not cast if the inputs are not floating-point.
    layer = IdentityLayer(dtype='float32')
    self.assertEqual(layer(self._const('int32')).dtype, 'int32')

    # Test NumPy arrays are cast.
    layer = IdentityLayer(dtype='float64')
    self.assertEqual(layer(np.array(1, dtype='float32')).dtype, 'float64')

    # Test Python floats are cast.
    layer = IdentityLayer(dtype='float64')
    self.assertEqual(layer(1.).dtype, 'float64')

  @testing_utils.enable_v2_dtype_behavior
  def multiple_inputs_cast_to_dtype(self):

    class MultiIdentityLayer(base_layer.Layer):

      def call(self, inputs):
        return [array_ops.identity(x) for x in inputs]

    # Testing layer with default dtype of float32.
    layer = MultiIdentityLayer()
    x, y = layer([self._const('float16'), self._const('float32')])
    self.assertEqual(x.dtype, 'float32')
    self.assertEqual(y.dtype, 'float32')

    # Test passing dtype to the constructor.
    layer = MultiIdentityLayer(dtype='float64')
    x, y = layer([self._const('float16'), self._const('float32')])
    self.assertEqual(x.dtype, 'float64')
    self.assertEqual(y.dtype, 'float64')

    # Test several non-floating point types.
    layer = MultiIdentityLayer(dtype='float64')
    x, y, z, w = layer([self._const('float16'), self._const('bool'),
                        self._const('float64'), self._const('complex64')])
    self.assertEqual(x.dtype, 'float64')
    self.assertEqual(y.dtype, 'bool')
    self.assertEqual(z.dtype, 'float64')
    self.assertEqual(w.dtype, 'complex64')

  @testing_utils.enable_v2_dtype_behavior
  def test_extra_args_and_kwargs_not_casted(self):
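    # Only the first positional argument to `call` is autocast; extra
    # positional args and kwargs keep their original dtypes.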
    class IdentityLayerWithArgs(base_layer.Layer):

      def call(self, inputs, *args, **kwargs):
        kwargs.pop('training', None)
        return nest.flatten([inputs, args, kwargs])

    layer = IdentityLayerWithArgs(dtype='float64')
    x, y, z = layer(
        self._const('float16'),
        self._const('float16'),
        kwarg=self._const('float16'))
    self.assertEqual(x.dtype, 'float64')
    self.assertEqual(y.dtype, 'float16')
    self.assertEqual(z.dtype, 'float16')

  @testing_utils.enable_v2_dtype_behavior
  def test_layer_without_autocast(self):

    class IdentityLayerWithoutAutocast(IdentityLayer):

      def __init__(self, *args, **kwargs):
        kwargs['autocast'] = False
        super(IdentityLayerWithoutAutocast, self).__init__(*args, **kwargs)

    layer = IdentityLayerWithoutAutocast(dtype='float64')
    self.assertEqual(layer(self._const('float32')).dtype, 'float32')

  @testing_utils.enable_v2_dtype_behavior
  def test_compute_output_signature(self):

    class IdentityLayerWithOutputShape(IdentityLayer):

      def compute_output_shape(self, input_shape):
        return input_shape

    layer = IdentityLayerWithOutputShape(dtype='float64')
    output_signature = layer.compute_output_signature(
        tensor_spec.TensorSpec(shape=(), dtype='float32'))
    self.assertEqual(output_signature.shape, ())
    self.assertEqual(output_signature.dtype, 'float64')

  @testing_utils.enable_v2_dtype_behavior
  def test_composite_tensors_input_casting(self):
    sparse = sparse_tensor.SparseTensor(
        indices=array_ops.constant([[0, 1], [2, 3]], dtype='int64'),
        values=array_ops.constant([0., 1.], dtype='float32'),
        dense_shape=array_ops.constant([4, 4], dtype='int64'))
    ragged = ragged_tensor.RaggedTensor.from_row_splits(
        values=array_ops.constant([1., 2., 3.], dtype='float32'),
        row_splits=array_ops.constant([0, 2, 2, 3], dtype='int64'))

    layer = IdentityLayer(dtype='float16')

    for x in sparse, ragged:
      self.assertEqual(x.dtype, 'float32')
      y = layer(x)
      self.assertEqual(y.dtype, 'float16')
      self.assertEqual(type(x), type(y))

  @testing_utils.enable_v2_dtype_behavior
  def test_passing_non_tensor(self):
    layer = IdentityLayer()
    x = object()
    y = layer(x)  # Layer should not cast 'x', as it's not a tensor
    self.assertIs(x, y)

  @testing_utils.disable_v2_dtype_behavior
  def test_v1_behavior(self):
    # Test dtype defaults to None and is inferred from the input.
    layer = IdentityLayer()
    self.assertIsNone(layer.dtype)
    layer(self._const('float64'))
    self.assertEqual(layer.dtype, 'float64')

    # Test layer does not cast to dtype.
    self.assertEqual(layer(self._const('float32')).dtype, 'float32')


if __name__ == '__main__':
  ops.enable_eager_execution()
  test.main()