• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python3
2# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# ==============================================================================
16"""Tests for ClusterCoordinator and Keras models."""
17
18from __future__ import absolute_import
19from __future__ import division
20from __future__ import print_function
21
22import random
23import tempfile
24from absl import logging
25from absl.testing import parameterized
26import numpy as np
27
28from tensorflow.python import keras
29from tensorflow.python.compat import v2_compat
30from tensorflow.python.data.ops import dataset_ops
31from tensorflow.python.distribute import combinations
32from tensorflow.python.distribute import multi_worker_test_base
33from tensorflow.python.distribute import parameter_server_strategy_v2
34from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver
35from tensorflow.python.distribute.coordinator import cluster_coordinator as coordinator_lib
36from tensorflow.python.eager import backprop
37from tensorflow.python.eager import def_function
38from tensorflow.python.framework import constant_op
39from tensorflow.python.framework import dtypes
40from tensorflow.python.framework import tensor_spec
41from tensorflow.python.keras import callbacks as callbacks_lib
42from tensorflow.python.keras.engine import sequential
43from tensorflow.python.keras.layers import core as core_layers
44from tensorflow.python.keras.layers.preprocessing import string_lookup
45from tensorflow.python.keras.optimizer_v2 import gradient_descent
46from tensorflow.python.keras.optimizer_v2 import rmsprop
47from tensorflow.python.keras.utils import dataset_creator
48from tensorflow.python.keras.utils import losses_utils
49from tensorflow.python.ops import array_ops
50from tensorflow.python.ops import math_ops
51from tensorflow.python.ops import nn
52from tensorflow.python.ops import random_ops
53from tensorflow.python.platform import test
54from tensorflow.python.training.server_lib import ClusterSpec
55
56
# These vocabularies usually come from TFT or a Beam pipeline.
FEATURE_VOCAB = [
    "avenger",
    "ironman",
    "batman",
    "hulk",
    "spiderman",
    "kingkong",
    "wonder_woman",
]
LABEL_VOCAB = ["yes", "no"]
63
64
def make_cluster(num_workers, num_ps):
  """Creates an in-process test cluster and returns a resolver for it.

  Args:
    num_workers: Number of workers, forwarded to
      `multi_worker_test_base.create_in_process_cluster`.
    num_ps: Number of parameter servers, forwarded to the same call.

  Returns:
    A `SimpleClusterResolver` built from the cluster definition with an extra
    single-address "chief" job, using the "grpc" RPC layer.
  """
  cluster = multi_worker_test_base.create_in_process_cluster(
      num_workers=num_workers, num_ps=num_ps, rpc_layer="grpc")

  # This function only picks an unused local port for the chief's address.
  chief_port = multi_worker_test_base.pick_unused_port()
  cluster["chief"] = ["localhost:%d" % chief_port]

  spec = ClusterSpec(cluster)
  return SimpleClusterResolver(spec, rpc_layer="grpc")
72
73
def make_coordinator(num_workers, num_ps):
  """Builds a `ClusterCoordinator` backed by a newly created test cluster.

  Args:
    num_workers: Number of workers passed to `make_cluster`.
    num_ps: Number of parameter servers passed to `make_cluster`.

  Returns:
    A `ClusterCoordinator` wrapping a `ParameterServerStrategyV2` that was
    constructed from the resolver returned by `make_cluster`.
  """
  resolver = make_cluster(num_workers, num_ps)
  strategy = parameter_server_strategy_v2.ParameterServerStrategyV2(resolver)
  return coordinator_lib.ClusterCoordinator(strategy)
78
79
80# TODO(yuefengz): move this to keras/integration_tests.
class KPLTest(test.TestCase, parameterized.TestCase):
  """Trains and serves a model whose inputs go through Keras lookup layers.

  One `ClusterCoordinator` (3 workers, 2 parameter servers) is created in
  `setUpClass` and shared by all test cases of this class.
  """

  @classmethod
  def setUpClass(cls):
    super(KPLTest, cls).setUpClass()
    cls.coordinator = make_coordinator(num_workers=3, num_ps=2)

  def define_kpls_for_training(self, use_adapt):
    """Builds the feature and label preprocessing models.

    Args:
      use_adapt: If True, both lookup layers receive their vocabulary through
        `adapt()`. Otherwise the vocabulary is passed to the constructor, and
        the feature vocabulary is shuffled first.

    Returns:
      A `(feature_ps, label_ps)` tuple of `keras.Model`s that map raw string
      inputs to the output of the corresponding `StringLookup` layer.
    """
    # Define KPLs under strategy's scope. Right now, if they have look up
    # tables, they will be created on the client. Their variables will be
    # created on PS. Ideally they should be cached on each worker since they
    # will not be changed in a training step.
    if use_adapt:
      feature_lookup_layer = string_lookup.StringLookup(num_oov_indices=1)
      feature_lookup_layer.adapt(FEATURE_VOCAB)
      label_lookup_layer = string_lookup.StringLookup(
          num_oov_indices=0, mask_token=None)
      label_lookup_layer.adapt(LABEL_VOCAB)
    else:
      # Do vocab shuffling.
      shuffled_vocab = FEATURE_VOCAB.copy()
      random.shuffle(shuffled_vocab)
      feature_lookup_layer = string_lookup.StringLookup(
          vocabulary=shuffled_vocab, num_oov_indices=1)
      label_lookup_layer = string_lookup.StringLookup(
          vocabulary=LABEL_VOCAB, num_oov_indices=0, mask_token=None)

    raw_feature_input = keras.layers.Input(
        shape=(3,), dtype=dtypes.string, name="feature", ragged=True)
    feature_id_input = feature_lookup_layer(raw_feature_input)

    # Model creates variables as well.
    feature_ps = keras.Model({"features": raw_feature_input}, feature_id_input)

    raw_label_input = keras.layers.Input(
        shape=(1,), dtype=dtypes.string, name="label")
    label_id_input = label_lookup_layer(raw_label_input)
    label_ps = keras.Model({"label": raw_label_input}, label_id_input)

    return feature_ps, label_ps

  def define_reverse_lookup_layer(self):
    """Returns an inverted `StringLookup` over `LABEL_VOCAB`."""
    # Only needed for serving.
    # NOTE(review): the training label lookup is built with num_oov_indices=0
    # while this inverse layer uses num_oov_indices=1 -- confirm that the
    # index-to-label mapping lines up as intended.
    label_inverse_lookup_layer = string_lookup.StringLookup(
        num_oov_indices=1, mask_token=None, vocabulary=LABEL_VOCAB, invert=True)
    return label_inverse_lookup_layer

  @combinations.generate(
      combinations.combine(mode=["eager"], use_adapt=[True, False]))
  def testTrainAndServe(self, use_adapt):
    """Trains via the coordinator, then saves, reloads and calls the model."""

    with self.coordinator.strategy.scope():

      feature_ps, label_ps = self.define_kpls_for_training(use_adapt)

      def dataset_fn():

        def feature_and_label_gen():
          # Endless generator: the label is "yes" exactly when "avenger" is
          # among the three sampled features.
          while True:
            features = random.sample(FEATURE_VOCAB, 3)
            label = ["yes"] if "avenger" in features else ["no"]
            yield {"features": features, "label": label}

        # The dataset will be created on the coordinator.
        raw_dataset = dataset_ops.Dataset.from_generator(
            feature_and_label_gen,
            output_signature={
                "features": tensor_spec.TensorSpec([3], dtypes.string),
                "label": tensor_spec.TensorSpec([1], dtypes.string)
            }).shuffle(100).batch(32)

        def apply_kpls(example):
          # Turn raw strings into ids with the preprocessing models.
          transformed = {"features": feature_ps(example["features"])}
          return transformed, label_ps(example["label"])

        return raw_dataset.map(apply_kpls)

      # Create the model. The input needs to be compatible with KPLs.
      model_input = keras.layers.Input(
          shape=(3,), dtype=dtypes.int64, name="model_input")

      # input_dim includes a mask token and an oov token.
      emb_output = keras.layers.Embedding(
          input_dim=len(FEATURE_VOCAB) + 2, output_dim=20)(
              model_input)
      emb_output = math_ops.reduce_mean(emb_output, axis=1)
      dense_output = keras.layers.Dense(
          units=1, activation="sigmoid")(
              emb_output)
      model = keras.Model({"features": model_input}, dense_output)

      optimizer = rmsprop.RMSprop(learning_rate=0.1)
      accuracy = keras.metrics.Accuracy()

    @def_function.function
    def worker_fn(iterator):

      def replica_fn(iterator):
        batch_data, labels = next(iterator)
        with backprop.GradientTape() as tape:
          pred = model(batch_data, training=True)
          loss = nn.compute_average_loss(
              keras.losses.BinaryCrossentropy(
                  reduction=losses_utils.ReductionV2.NONE)(labels, pred))
          gradients = tape.gradient(loss, model.trainable_variables)

        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Threshold the sigmoid output at 0.5 to get a 0/1 class id.
        actual_pred = math_ops.cast(math_ops.greater(pred, 0.5), dtypes.int64)
        accuracy.update_state(labels, actual_pred)

      self.coordinator.strategy.run(replica_fn, args=(iterator,))

    distributed_dataset = self.coordinator.create_per_worker_dataset(dataset_fn)
    distributed_iterator = iter(distributed_dataset)
    # Four rounds of seven scheduled steps. The metric is reset at the start
    # of each round, so the final check only reflects the last round.
    for _ in range(4):
      accuracy.reset_states()
      for _ in range(7):
        self.coordinator.schedule(worker_fn, args=(distributed_iterator,))
      self.coordinator.join()
    self.assertGreater(accuracy.result().numpy(), 0.5)

    # Create a saved model.
    model.feature_ps = feature_ps
    model.label_ps = label_ps
    model.label_inverse_lookup_layer = self.define_reverse_lookup_layer()

    def create_serving_signature(model):

      @def_function.function
      def serve_fn(raw_features):
        # Add a batch dimension for the model, then strip it again afterwards.
        raw_features = array_ops.expand_dims(raw_features, axis=0)
        transformed_features = model.feature_ps(raw_features)
        outputs = model(transformed_features)
        outputs = array_ops.squeeze(outputs, axis=0)
        outputs = math_ops.cast(math_ops.greater(outputs, 0.5), dtypes.int64)
        decoded_outputs = model.label_inverse_lookup_layer(outputs)
        return array_ops.squeeze(decoded_outputs, axis=0)

      # serving does NOT have batch dimension
      # `(3,)` is a real 1-tuple; a bare `(3)` is just the integer 3.
      return serve_fn.get_concrete_function(
          tensor_spec.TensorSpec(
              shape=(3,), dtype=dtypes.string, name="example"))

    serving_fn = create_serving_signature(model)

    saved_model_dir = tempfile.mkdtemp(dir=self.get_temp_dir())
    model.save(saved_model_dir, signatures={"serving_default": serving_fn})

    # Test the saved_model.
    loaded_serving_fn = keras.saving.save.load_model(
        saved_model_dir).signatures["serving_default"]

    # check the result w/ and w/o avenger.
    prediction0 = loaded_serving_fn(
        constant_op.constant(["avenger", "ironman", "avenger"]))["output_0"]
    self.assertIn(prediction0, ("yes", "no"))

    # "unkonwn" is not in FEATURE_VOCAB.
    prediction1 = loaded_serving_fn(
        constant_op.constant(["ironman", "ironman", "unkonwn"]))["output_0"]
    self.assertIn(prediction1, ("yes", "no"))
242
243
class ModelFitTest(test.TestCase, parameterized.TestCase):
  """Exercises `Model.fit` and related APIs under `ParameterServerStrategyV2`."""

  def _model_compile(self, steps_per_execution=1, run_eagerly=False):
    """Creates and compiles a one-layer model under a new strategy.

    Args:
      steps_per_execution: Forwarded to `model.compile`.
      run_eagerly: Forwarded to `model.compile`.

    Returns:
      A `(model, callbacks)` tuple, where `callbacks` is a list holding one
      callback that validates the epoch order and the logged loss.
    """

    class ResultAssertingCallback(callbacks_lib.Callback):
      """Raises if epochs do not strictly increase or loss is not a float."""

      def __init__(self):
        # Run the base initializer so the attributes it defines exist.
        super(ResultAssertingCallback, self).__init__()
        self._prev_epoch = -1

      def on_epoch_end(self, epoch, logs=None):
        logging.info("testModelFit: epoch=%r, logs=%r", epoch, logs)
        if epoch <= self._prev_epoch:
          raise RuntimeError("Epoch is supposed to be larger than previous.")
        self._prev_epoch = epoch
        # `logs` defaults to None; treat that the same as a missing loss
        # instead of failing with an AttributeError on `None.get`.
        loss = (logs or {}).get("loss")
        if not isinstance(loss, np.floating):
          raise RuntimeError("loss is supposed to be in the logs and float.")

    strategy = parameter_server_strategy_v2.ParameterServerStrategyV2(
        make_cluster(3, 2))
    with strategy.scope():
      model = sequential.Sequential([core_layers.Dense(10)])
    model.compile(
        gradient_descent.SGD(),
        loss="mse",
        steps_per_execution=steps_per_execution,
        run_eagerly=run_eagerly)
    return model, [ResultAssertingCallback()]

  def _model_fit(self,
                 steps_per_execution=1,
                 validation_data=None,
                 x=None,
                 steps_per_epoch=10,
                 run_eagerly=False):
    """Compiles a model and runs `fit` on it for 10 epochs.

    Args:
      steps_per_execution: Forwarded to `_model_compile`.
      validation_data: Forwarded to `model.fit`.
      x: Training input for `model.fit`. When None, a `DatasetCreator` over
        random uniform data is used.
      steps_per_epoch: Forwarded to `model.fit`.
      run_eagerly: Forwarded to `_model_compile`.

    Returns:
      The model after `fit` has returned.
    """
    model, callbacks = self._model_compile(steps_per_execution, run_eagerly)

    def dataset_fn(input_context):
      del input_context
      features = random_ops.random_uniform((10, 10))
      targets = random_ops.random_uniform((10,))
      return dataset_ops.DatasetV2.from_tensor_slices(
          (features, targets)).shuffle(10).repeat().batch(2)

    # Compare against None explicitly rather than relying on the truthiness
    # of whatever object the caller passed in.
    if x is None:
      x = dataset_creator.DatasetCreator(dataset_fn)

    model.fit(
        x,
        epochs=10,
        steps_per_epoch=steps_per_epoch,
        verbose=0,
        callbacks=callbacks,
        validation_data=validation_data)
    return model

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelFit(self):
    model = self._model_fit()
    # 10 epochs x 10 steps per epoch.
    self.assertEqual(model.optimizer.iterations, 100)

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelFitWithStepsPerExecution(self):
    model = self._model_fit(steps_per_execution=10)
    self.assertEqual(model.optimizer.iterations, 100)

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelFitWithNoStepsPerEpoch(self):
    with self.assertRaisesRegex(
        ValueError, "`steps_per_epoch` must be specified with "
        "`ParameterServerStrategy`."):
      self._model_fit(steps_per_epoch=None)

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelFitWithRunEagerly(self):
    with self.assertRaisesRegex(
        ValueError, "When using `Model` with `ParameterServerStrategy`, "
        "`run_eagerly` is not supported."):
      self._model_fit(run_eagerly=True)

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelFitWithValidationData(self):
    with self.assertRaisesRegex(
        NotImplementedError, "Evaluation in `model.fit` with "
        "`ParameterServerStrategy` is not yet supported."):
      self._model_fit(
          validation_data=dataset_ops.DatasetV2.from_tensor_slices([1, 1]))

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelFitWithDatasetInstance(self):
    with self.assertRaisesRegex(
        NotImplementedError, "Only `DatasetCreator` input is supported in "
        "`ParameterServerStrategy` at this time."):
      self._model_fit(x=dataset_ops.DatasetV2.from_tensor_slices([1, 1]))

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelEvaluate(self):
    model, _ = self._model_compile()
    with self.assertRaisesRegex(
        NotImplementedError, "`model.evaluate` is not yet supported with "
        "`ParameterServerStrategy`."):
      model.evaluate(x=dataset_ops.DatasetV2.from_tensor_slices([1, 1]))

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testModelPredict(self):
    model, _ = self._model_compile()
    with self.assertRaisesRegex(
        NotImplementedError, "`model.predict` is not yet supported with "
        "`ParameterServerStrategy`."):
      model.predict(x=dataset_ops.DatasetV2.from_tensor_slices([1, 1]))

  @combinations.generate(combinations.combine(mode=["eager"]))
  def testClusterCoordinatorSingleInstance(self):
    model = self._model_fit()
    strategy = model.distribute_strategy
    # Constructing a coordinator for the same strategy again is expected to
    # yield the very object the strategy already holds.
    self.assertIs(strategy._cluster_coordinator,
                  coordinator_lib.ClusterCoordinator(strategy))
360
361
if __name__ == "__main__":
  # Enable TF2 behavior first, then hand control to the test runner.
  v2_compat.enable_v2_behavior()
  test.main()
365