1# Lint as: python3 2# Copyright 2020 The TensorFlow Authors. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# ============================================================================== 16"""Tests for ClusterCoordinator and Keras models.""" 17 18from __future__ import absolute_import 19from __future__ import division 20from __future__ import print_function 21 22import random 23import tempfile 24from absl import logging 25from absl.testing import parameterized 26import numpy as np 27 28from tensorflow.python import keras 29from tensorflow.python.compat import v2_compat 30from tensorflow.python.data.ops import dataset_ops 31from tensorflow.python.distribute import combinations 32from tensorflow.python.distribute import multi_worker_test_base 33from tensorflow.python.distribute import parameter_server_strategy_v2 34from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver 35from tensorflow.python.distribute.coordinator import cluster_coordinator as coordinator_lib 36from tensorflow.python.eager import backprop 37from tensorflow.python.eager import def_function 38from tensorflow.python.framework import constant_op 39from tensorflow.python.framework import dtypes 40from tensorflow.python.framework import tensor_spec 41from tensorflow.python.keras import callbacks as callbacks_lib 42from tensorflow.python.keras.engine import sequential 43from tensorflow.python.keras.layers import core as core_layers 44from tensorflow.python.keras.layers.preprocessing import string_lookup 45from tensorflow.python.keras.optimizer_v2 import gradient_descent 46from tensorflow.python.keras.optimizer_v2 import rmsprop 47from tensorflow.python.keras.utils import dataset_creator 48from tensorflow.python.keras.utils import losses_utils 49from tensorflow.python.ops import array_ops 50from tensorflow.python.ops import math_ops 51from tensorflow.python.ops import nn 52from tensorflow.python.ops import random_ops 53from tensorflow.python.platform import test 54from tensorflow.python.training.server_lib import ClusterSpec 55 56 57# These vocabularies usually come from TFT or a Beam pipeline. 58FEATURE_VOCAB = [ 59 "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", 60 "wonder_woman" 61] 62LABEL_VOCAB = ["yes", "no"] 63 64 65def make_cluster(num_workers, num_ps): 66 cluster_def = multi_worker_test_base.create_in_process_cluster( 67 num_workers=num_workers, num_ps=num_ps, rpc_layer="grpc") 68 cluster_def["chief"] = [ 69 "localhost:%d" % multi_worker_test_base.pick_unused_port() 70 ] 71 return SimpleClusterResolver(ClusterSpec(cluster_def), rpc_layer="grpc") 72 73 74def make_coordinator(num_workers, num_ps): 75 return coordinator_lib.ClusterCoordinator( 76 parameter_server_strategy_v2.ParameterServerStrategyV2( 77 make_cluster(num_workers, num_ps))) 78 79 80# TODO(yuefengz): move this to keras/integration_tests. 81class KPLTest(test.TestCase, parameterized.TestCase): 82 83 @classmethod 84 def setUpClass(cls): 85 super(KPLTest, cls).setUpClass() 86 cls.coordinator = make_coordinator(num_workers=3, num_ps=2) 87 88 def define_kpls_for_training(self, use_adapt): 89 # Define KPLs under strategy's scope. Right now, if they have look up 90 # tables, they will be created on the client. Their variables will be 91 # created on PS. Ideally they should be cached on each worker since they 92 # will not be changed in a training step. 93 if use_adapt: 94 feature_lookup_layer = string_lookup.StringLookup(num_oov_indices=1) 95 feature_lookup_layer.adapt(FEATURE_VOCAB) 96 label_lookup_layer = string_lookup.StringLookup( 97 num_oov_indices=0, mask_token=None) 98 label_lookup_layer.adapt(LABEL_VOCAB) 99 else: 100 # Do vocab shuffling. 101 shuffled_vocab = FEATURE_VOCAB.copy() 102 random.shuffle(shuffled_vocab) 103 feature_lookup_layer = string_lookup.StringLookup( 104 vocabulary=shuffled_vocab, num_oov_indices=1) 105 label_lookup_layer = string_lookup.StringLookup( 106 vocabulary=LABEL_VOCAB, num_oov_indices=0, mask_token=None) 107 108 raw_feature_input = keras.layers.Input( 109 shape=(3,), dtype=dtypes.string, name="feature", ragged=True) 110 feature_id_input = feature_lookup_layer(raw_feature_input) 111 112 # Model creates variables as well. 113 feature_ps = keras.Model({"features": raw_feature_input}, feature_id_input) 114 115 raw_label_input = keras.layers.Input( 116 shape=(1,), dtype=dtypes.string, name="label") 117 label_id_input = label_lookup_layer(raw_label_input) 118 label_ps = keras.Model({"label": raw_label_input}, label_id_input) 119 120 return feature_ps, label_ps 121 122 def define_reverse_lookup_layer(self): 123 # Only needed for serving. 124 label_inverse_lookup_layer = string_lookup.StringLookup( 125 num_oov_indices=1, mask_token=None, vocabulary=LABEL_VOCAB, invert=True) 126 return label_inverse_lookup_layer 127 128 @combinations.generate( 129 combinations.combine(mode=["eager"], use_adapt=[True, False])) 130 def testTrainAndServe(self, use_adapt): 131 132 with self.coordinator.strategy.scope(): 133 134 feature_ps, label_ps = self.define_kpls_for_training(use_adapt) 135 136 def dataset_fn(): 137 138 def feature_and_label_gen(): 139 while True: 140 features = random.sample(FEATURE_VOCAB, 3) 141 label = ["yes"] if "avenger" in features else ["no"] 142 yield {"features": features, "label": label} 143 144 # The dataset will be created on the coordinator. 145 raw_dataset = dataset_ops.Dataset.from_generator( 146 feature_and_label_gen, 147 output_signature={ 148 "features": tensor_spec.TensorSpec([3], dtypes.string), 149 "label": tensor_spec.TensorSpec([1], dtypes.string) 150 }).shuffle(100).batch(32) 151 152 train_dataset = raw_dataset.map(lambda x: ( # pylint: disable=g-long-lambda 153 { 154 "features": feature_ps(x["features"]) 155 }, label_ps(x["label"]))) 156 return train_dataset 157 158 # Create the model. The input needs to be compatible with KPLs. 159 model_input = keras.layers.Input( 160 shape=(3,), dtype=dtypes.int64, name="model_input") 161 162 # input_dim includes a mask token and an oov token. 163 emb_output = keras.layers.Embedding( 164 input_dim=len(FEATURE_VOCAB) + 2, output_dim=20)( 165 model_input) 166 emb_output = math_ops.reduce_mean(emb_output, axis=1) 167 dense_output = keras.layers.Dense( 168 units=1, activation="sigmoid")( 169 emb_output) 170 model = keras.Model({"features": model_input}, dense_output) 171 172 optimizer = rmsprop.RMSprop(learning_rate=0.1) 173 accuracy = keras.metrics.Accuracy() 174 175 @def_function.function 176 def worker_fn(iterator): 177 178 def replica_fn(iterator): 179 batch_data, labels = next(iterator) 180 with backprop.GradientTape() as tape: 181 pred = model(batch_data, training=True) 182 loss = nn.compute_average_loss( 183 keras.losses.BinaryCrossentropy( 184 reduction=losses_utils.ReductionV2.NONE)(labels, pred)) 185 gradients = tape.gradient(loss, model.trainable_variables) 186 187 optimizer.apply_gradients(zip(gradients, model.trainable_variables)) 188 189 actual_pred = math_ops.cast(math_ops.greater(pred, 0.5), dtypes.int64) 190 accuracy.update_state(labels, actual_pred) 191 192 self.coordinator.strategy.run(replica_fn, args=(iterator,)) 193 194 distributed_dataset = self.coordinator.create_per_worker_dataset(dataset_fn) 195 distributed_iterator = iter(distributed_dataset) 196 for _ in range(4): 197 accuracy.reset_states() 198 for _ in range(7): 199 self.coordinator.schedule(worker_fn, args=(distributed_iterator,)) 200 self.coordinator.join() 201 self.assertGreater(accuracy.result().numpy(), 0.5) 202 203 # Create a saved model. 204 model.feature_ps = feature_ps 205 model.label_ps = label_ps 206 model.label_inverse_lookup_layer = self.define_reverse_lookup_layer() 207 208 def create_serving_signature(model): 209 210 @def_function.function 211 def serve_fn(raw_features): 212 raw_features = array_ops.expand_dims(raw_features, axis=0) 213 transformed_features = model.feature_ps(raw_features) 214 outputs = model(transformed_features) 215 outputs = array_ops.squeeze(outputs, axis=0) 216 outputs = math_ops.cast(math_ops.greater(outputs, 0.5), dtypes.int64) 217 decoded_outputs = model.label_inverse_lookup_layer(outputs) 218 return array_ops.squeeze(decoded_outputs, axis=0) 219 220 # serving does NOT have batch dimension 221 return serve_fn.get_concrete_function( 222 tensor_spec.TensorSpec( 223 shape=(3), dtype=dtypes.string, name="example")) 224 225 serving_fn = create_serving_signature(model) 226 227 saved_model_dir = tempfile.mkdtemp(dir=self.get_temp_dir()) 228 model.save(saved_model_dir, signatures={"serving_default": serving_fn}) 229 230 # Test the saved_model. 231 loaded_serving_fn = keras.saving.save.load_model( 232 saved_model_dir).signatures["serving_default"] 233 234 # check the result w/ and w/o avenger. 235 prediction0 = loaded_serving_fn( 236 constant_op.constant(["avenger", "ironman", "avenger"]))["output_0"] 237 self.assertIn(prediction0, ("yes", "no")) 238 239 prediction1 = loaded_serving_fn( 240 constant_op.constant(["ironman", "ironman", "unkonwn"]))["output_0"] 241 self.assertIn(prediction1, ("yes", "no")) 242 243 244class ModelFitTest(test.TestCase, parameterized.TestCase): 245 246 def _model_compile(self, steps_per_execution=1, run_eagerly=False): 247 248 class ResultAssertingCallback(callbacks_lib.Callback): 249 250 def __init__(self): 251 self._prev_epoch = -1 252 253 def on_epoch_end(self, epoch, logs=None): 254 logging.info("testModelFit: epoch=%r, logs=%r", epoch, logs) 255 if epoch <= self._prev_epoch: 256 raise RuntimeError("Epoch is supposed to be larger than previous.") 257 self._prev_epoch = epoch 258 if (logs.get("loss", None) is None or 259 not isinstance(logs["loss"], np.floating)): 260 raise RuntimeError("loss is supposed to be in the logs and float.") 261 262 strategy = parameter_server_strategy_v2.ParameterServerStrategyV2( 263 make_cluster(3, 2)) 264 with strategy.scope(): 265 model = sequential.Sequential([core_layers.Dense(10)]) 266 model.compile( 267 gradient_descent.SGD(), 268 loss="mse", 269 steps_per_execution=steps_per_execution, 270 run_eagerly=run_eagerly) 271 return model, [ResultAssertingCallback()] 272 273 def _model_fit(self, 274 steps_per_execution=1, 275 validation_data=None, 276 x=None, 277 steps_per_epoch=10, 278 run_eagerly=False): 279 model, callbacks = self._model_compile(steps_per_execution, run_eagerly) 280 281 def dataset_fn(input_context): 282 del input_context 283 x = random_ops.random_uniform((10, 10)) 284 y = random_ops.random_uniform((10,)) 285 return dataset_ops.DatasetV2.from_tensor_slices( 286 (x, y)).shuffle(10).repeat().batch(2) 287 288 x = x or dataset_creator.DatasetCreator(dataset_fn) 289 290 model.fit( 291 x, 292 epochs=10, 293 steps_per_epoch=steps_per_epoch, 294 verbose=0, 295 callbacks=callbacks, 296 validation_data=validation_data) 297 return model 298 299 @combinations.generate(combinations.combine(mode=["eager"])) 300 def testModelFit(self): 301 model = self._model_fit() 302 self.assertEqual(model.optimizer.iterations, 100) 303 304 @combinations.generate(combinations.combine(mode=["eager"])) 305 def testModelFitWithStepsPerExecution(self): 306 model = self._model_fit(steps_per_execution=10) 307 self.assertEqual(model.optimizer.iterations, 100) 308 309 @combinations.generate(combinations.combine(mode=["eager"])) 310 def testModelFitWithNoStepsPerEpoch(self): 311 with self.assertRaisesRegex( 312 ValueError, "`steps_per_epoch` must be specified with " 313 "`ParameterServerStrategy`."): 314 self._model_fit(steps_per_epoch=None) 315 316 @combinations.generate(combinations.combine(mode=["eager"])) 317 def testModelFitWithRunEagerly(self): 318 with self.assertRaisesRegex( 319 ValueError, "When using `Model` with `ParameterServerStrategy`, " 320 "`run_eagerly` is not supported."): 321 self._model_fit(run_eagerly=True) 322 323 @combinations.generate(combinations.combine(mode=["eager"])) 324 def testModelFitWithValidationData(self): 325 with self.assertRaisesRegex( 326 NotImplementedError, "Evaluation in `model.fit` with " 327 "`ParameterServerStrategy` is not yet supported."): 328 self._model_fit( 329 validation_data=dataset_ops.DatasetV2.from_tensor_slices([1, 1])) 330 331 @combinations.generate(combinations.combine(mode=["eager"])) 332 def testModelFitWithDatasetInstance(self): 333 with self.assertRaisesRegex( 334 NotImplementedError, "Only `DatasetCreator` input is supported in " 335 "`ParameterServerStrategy` at this time."): 336 self._model_fit(x=dataset_ops.DatasetV2.from_tensor_slices([1, 1])) 337 338 @combinations.generate(combinations.combine(mode=["eager"])) 339 def testModelEvaluate(self): 340 model, _ = self._model_compile() 341 with self.assertRaisesRegex( 342 NotImplementedError, "`model.evaluate` is not yet supported with " 343 "`ParameterServerStrategy`."): 344 model.evaluate(x=dataset_ops.DatasetV2.from_tensor_slices([1, 1])) 345 346 @combinations.generate(combinations.combine(mode=["eager"])) 347 def testModelPredict(self): 348 model, _ = self._model_compile() 349 with self.assertRaisesRegex( 350 NotImplementedError, "`model.predict` is not yet supported with " 351 "`ParameterServerStrategy`."): 352 model.predict(x=dataset_ops.DatasetV2.from_tensor_slices([1, 1])) 353 354 @combinations.generate(combinations.combine(mode=["eager"])) 355 def testClusterCoordinatorSingleInstance(self): 356 model = self._model_fit() 357 strategy = model.distribute_strategy 358 self.assertIs(strategy._cluster_coordinator, 359 coordinator_lib.ClusterCoordinator(strategy)) 360 361 362if __name__ == "__main__": 363 v2_compat.enable_v2_behavior() 364 test.main() 365