• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Tests that show Distribute Coordinator works with Estimator."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import copy
22import glob
23import json
24import os
25import sys
26import tempfile
27from absl.testing import parameterized
28import numpy as np
29
30from tensorflow.contrib.distribute.python import collective_all_reduce_strategy
31from tensorflow.contrib.distribute.python import combinations
32from tensorflow.contrib.distribute.python import mirrored_strategy
33from tensorflow.contrib.distribute.python import multi_worker_test_base
34from tensorflow.contrib.distribute.python import parameter_server_strategy
35from tensorflow.contrib.optimizer_v2 import adagrad
36from tensorflow.python.data.ops import dataset_ops
37from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
38from tensorflow.python.distribute import distribute_coordinator as dc
39from tensorflow.python.distribute import estimator_training as dc_training
40from tensorflow.python.distribute.distribute_config import DistributeConfig
41from tensorflow.python.eager import context
42from tensorflow.python.estimator import exporter as exporter_lib
43from tensorflow.python.estimator import run_config as run_config_lib
44from tensorflow.python.estimator import training as estimator_training
45from tensorflow.python.estimator.canned import dnn_linear_combined
46from tensorflow.python.estimator.canned import prediction_keys
47from tensorflow.python.estimator.export import export as export_lib
48from tensorflow.python.feature_column import feature_column_lib as feature_column
49from tensorflow.python.platform import gfile
50from tensorflow.python.platform import test
51from tensorflow.python.summary import summary_iterator
52from tensorflow.python.summary.writer import writer_cache
53from tensorflow.python.training import session_manager
54
55
# Sizes of the synthetic regression problem used by the tests in this file.
BATCH_SIZE = 10
LABEL_DIMENSION = 2
# BATCH_SIZE * LABEL_DIMENSION evenly spaced float32 values from 0 to 2,
# arranged as a (BATCH_SIZE, LABEL_DIMENSION) array.
DATA = np.reshape(
    np.linspace(0., 2., num=BATCH_SIZE * LABEL_DIMENSION, dtype=np.float32),
    (BATCH_SIZE, LABEL_DIMENSION))
# Names used for the evaluation run and for the exporter.
EVAL_NAME = "foo"
EXPORTER_NAME = "saved_model_exporter"
# `max_steps` passed to training.
MAX_STEPS = 10
64
# Short aliases for the task-type constants defined on `dc._TaskType`.
CHIEF = dc._TaskType.CHIEF
EVALUATOR = dc._TaskType.EVALUATOR
WORKER = dc._TaskType.WORKER
PS = dc._TaskType.PS

# Reference to the real `dc._run_std_server`, captured at import time so that
# `_mock_run_std_server` can still call it while the attribute is patched.
original_run_std_server = dc._run_std_server
71
72
73class DistributeCoordinatorIntegrationTest(
74    multi_worker_test_base.IndependentWorkerTestBase, parameterized.TestCase):
75
76  @classmethod
77  def setUpClass(cls):
78    """Create a local cluster with 2 workers."""
79    super(DistributeCoordinatorIntegrationTest, cls).setUpClass()
80    cls._cluster_spec = multi_worker_test_base.create_in_process_cluster(
81        num_workers=3, num_ps=2, has_eval=True)
82
83  def setUp(self):
84    self._model_dir = tempfile.mkdtemp()
85    super(DistributeCoordinatorIntegrationTest, self).setUp()
86
87  def dataset_input_fn(self, x, y, batch_size, shuffle):
88
89    def input_fn():
90      dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
91      if shuffle:
92        dataset = dataset.shuffle(batch_size)
93      dataset = dataset.repeat(100).batch(batch_size)
94      return dataset
95
96    return input_fn
97
98  def _get_exporter(self, name, fc):
99    feature_spec = feature_column.make_parse_example_spec(fc)
100    serving_input_receiver_fn = (
101        export_lib.build_parsing_serving_input_receiver_fn(feature_spec))
102    return exporter_lib.LatestExporter(
103        name, serving_input_receiver_fn=serving_input_receiver_fn)
104
105  def _extract_loss_and_global_step(self, event_folder):
106    """Returns the loss and global step in last event."""
107    event_paths = glob.glob(os.path.join(event_folder, "events*"))
108    self.assertNotEmpty(
109        event_paths, msg="Event file not found in dir %s" % event_folder)
110
111    loss = None
112    global_step_count = None
113
114    for e in summary_iterator.summary_iterator(event_paths[-1]):
115      current_loss = None
116      for v in e.summary.value:
117        if v.tag == "loss":
118          current_loss = v.simple_value
119
120      # If loss is not found, global step is meaningless.
121      if current_loss is None:
122        continue
123
124      current_global_step = e.step
125      if global_step_count is None or current_global_step > global_step_count:
126        global_step_count = current_global_step
127        loss = current_loss
128
129    return (loss, global_step_count)
130
131  def _get_estimator(self,
132                     train_distribute,
133                     eval_distribute,
134                     remote_cluster=None):
135    input_dimension = LABEL_DIMENSION
136    linear_feature_columns = [
137        feature_column.numeric_column("x", shape=(input_dimension,))
138    ]
139    dnn_feature_columns = [
140        feature_column.numeric_column("x", shape=(input_dimension,))
141    ]
142
143    return dnn_linear_combined.DNNLinearCombinedRegressor(
144        linear_feature_columns=linear_feature_columns,
145        dnn_hidden_units=(2, 2),
146        dnn_feature_columns=dnn_feature_columns,
147        label_dimension=LABEL_DIMENSION,
148        model_dir=self._model_dir,
149        dnn_optimizer=adagrad.AdagradOptimizer(0.001),
150        linear_optimizer=adagrad.AdagradOptimizer(0.001),
151        config=run_config_lib.RunConfig(
152            experimental_distribute=DistributeConfig(
153                train_distribute=train_distribute,
154                eval_distribute=eval_distribute,
155                remote_cluster=remote_cluster)))
156
157  def _complete_flow(self,
158                     train_distribute,
159                     eval_distribute,
160                     remote_cluster=None,
161                     use_train_and_evaluate=True):
162    estimator = self._get_estimator(train_distribute, eval_distribute,
163                                    remote_cluster)
164
165    input_dimension = LABEL_DIMENSION
166    train_input_fn = self.dataset_input_fn(
167        x={"x": DATA},
168        y=DATA,
169        batch_size=BATCH_SIZE // train_distribute.num_replicas_in_sync,
170        shuffle=True)
171    if eval_distribute:
172      eval_batch_size = BATCH_SIZE // eval_distribute.num_replicas_in_sync
173    else:
174      eval_batch_size = BATCH_SIZE
175    eval_input_fn = self.dataset_input_fn(
176        x={"x": DATA}, y=DATA, batch_size=eval_batch_size, shuffle=False)
177
178    linear_feature_columns = [
179        feature_column.numeric_column("x", shape=(input_dimension,))
180    ]
181    dnn_feature_columns = [
182        feature_column.numeric_column("x", shape=(input_dimension,))
183    ]
184    feature_columns = linear_feature_columns + dnn_feature_columns
185
186    eval_spec = estimator_training.EvalSpec(
187        name=EVAL_NAME,
188        input_fn=eval_input_fn,
189        steps=None,
190        exporters=self._get_exporter(EXPORTER_NAME, feature_columns),
191        start_delay_secs=0,
192        throttle_secs=1)
193
194    if use_train_and_evaluate:
195      estimator_training.train_and_evaluate(
196          estimator,
197          estimator_training.TrainSpec(train_input_fn, max_steps=MAX_STEPS),
198          eval_spec)
199    else:
200      estimator.train(train_input_fn, max_steps=MAX_STEPS)
201
202      latest_ckpt_path = estimator.latest_checkpoint()
203      metrics = estimator.evaluate(eval_input_fn,
204                                   checkpoint_path=latest_ckpt_path,
205                                   name=EVAL_NAME)
206
207      # Export the eval result to files.
208      eval_result = estimator_training._EvalResult(
209          status=estimator_training._EvalStatus.EVALUATED,
210          metrics=metrics,
211          checkpoint_path=latest_ckpt_path)
212      evaluator = estimator_training._TrainingExecutor._Evaluator(estimator,
213                                                                  eval_spec,
214                                                                  None)
215      evaluator._export_eval_result(eval_result, True)
216
217    return estimator
218
219  def _inspect_train_and_eval_events(self, estimator):
220    # Make sure nothing is stuck in limbo.
221    writer_cache.FileWriterCache.clear()
222
223    # Examine the training events. Use a range to check global step to avoid
224    # flakyness due to global step race condition.
225    training_loss, _ = self._extract_loss_and_global_step(self._model_dir)
226    self.assertIsNotNone(training_loss)
227
228    # Examine the eval events. The global step should be accurate.
229    eval_dir = os.path.join(self._model_dir, "eval_" + EVAL_NAME)
230    eval_loss, eval_global_step = self._extract_loss_and_global_step(
231        event_folder=eval_dir)
232    self.assertIsNotNone(eval_loss)
233    self.assertGreaterEqual(eval_global_step, MAX_STEPS)
234
235    # Examine the export folder.
236    export_dir = os.path.join(
237        os.path.join(self._model_dir, "export"), EXPORTER_NAME)
238    self.assertTrue(gfile.Exists(export_dir))
239
240    # Examine the ckpt for predict.
241    def predict_input_fn():
242      return dataset_ops.Dataset.from_tensor_slices({
243          "x": DATA
244      }).batch(BATCH_SIZE)
245
246    predicted_proba = np.array([
247        x[prediction_keys.PredictionKeys.PREDICTIONS]
248        for x in estimator.predict(predict_input_fn)
249    ])
250    self.assertAllEqual((BATCH_SIZE, LABEL_DIMENSION), predicted_proba.shape)
251
252  def _get_strategy_object(self, strategy_cls):
253    if strategy_cls == mirrored_strategy.CoreMirroredStrategy:
254      return strategy_cls(mirrored_strategy.all_local_devices())
255    else:
256      return strategy_cls(num_gpus_per_worker=context.num_gpus())
257
258  @combinations.generate(
259      combinations.combine(
260          mode=["graph"],
261          train_distribute_cls=[
262              collective_all_reduce_strategy.CollectiveAllReduceStrategy,
263              mirrored_strategy.MirroredStrategy,
264              mirrored_strategy.CoreMirroredStrategy,
265              parameter_server_strategy.ParameterServerStrategy
266          ],
267          eval_distribute_cls=[
268              None,
269              mirrored_strategy.MirroredStrategy,
270              mirrored_strategy.CoreMirroredStrategy,
271              parameter_server_strategy.ParameterServerStrategy,
272          ],
273          required_gpus=[0, 1]))
274  def test_complete_flow_standalone_client(self, train_distribute_cls,
275                                           eval_distribute_cls):
276    train_distribute = self._get_strategy_object(train_distribute_cls)
277
278    if eval_distribute_cls:
279      eval_distribute = self._get_strategy_object(eval_distribute_cls)
280    else:
281      eval_distribute = None
282
283    cluster_spec = copy.deepcopy(self._cluster_spec)
284    if (train_distribute_cls !=
285        parameter_server_strategy.ParameterServerStrategy):
286      cluster_spec.pop("ps", None)
287    estimator = self._complete_flow(train_distribute, eval_distribute,
288                                    cluster_spec)
289    self._inspect_train_and_eval_events(estimator)
290
291  @combinations.generate(
292      combinations.combine(
293          mode=["graph"],
294          eval_distribute_class=[
295              None,
296              mirrored_strategy.MirroredStrategy,
297              mirrored_strategy.CoreMirroredStrategy,
298              parameter_server_strategy.ParameterServerStrategy,
299          ],
300          required_gpus=[0, 1]))
301  def test_complete_flow_standalone_client_collective_nccl(
302      self, eval_distribute_class):
303    train_distribute = (
304        collective_all_reduce_strategy.CollectiveAllReduceStrategy(
305            num_gpus_per_worker=context.num_gpus(),
306            communication=cross_device_ops_lib.CollectiveCommunication.NCCL))
307
308    if eval_distribute_class:
309      eval_distribute = self._get_strategy_object(eval_distribute_class)
310    else:
311      eval_distribute = None
312
313    cluster_spec = copy.deepcopy(self._cluster_spec)
314    cluster_spec.pop("ps", None)
315    estimator = self._complete_flow(train_distribute, eval_distribute,
316                                    cluster_spec)
317    self._inspect_train_and_eval_events(estimator)
318
319  @combinations.generate(
320      combinations.combine(
321          mode=["graph"],
322          train_distribute_cls=[
323              mirrored_strategy.MirroredStrategy,
324              mirrored_strategy.CoreMirroredStrategy,
325          ],
326          eval_distribute_cls=[
327              None,
328              mirrored_strategy.MirroredStrategy,
329              mirrored_strategy.CoreMirroredStrategy,
330          ],
331          required_gpus=[0, 1]))
332  def test_estimator_standalone_client(self, train_distribute_cls,
333                                       eval_distribute_cls):
334    train_distribute = self._get_strategy_object(train_distribute_cls)
335
336    if eval_distribute_cls:
337      eval_distribute = self._get_strategy_object(eval_distribute_cls)
338    else:
339      eval_distribute = None
340
341    # We use the whole cluster for evaluation.
342    cluster = copy.deepcopy(self._cluster_spec)
343    cluster.pop("evaluator", None)
344
345    estimator = self._complete_flow(
346        train_distribute, eval_distribute, remote_cluster=cluster,
347        use_train_and_evaluate=False)
348    self._inspect_train_and_eval_events(estimator)
349
350  def _mock_run_std_server(self, *args, **kwargs):
351    ret = original_run_std_server(*args, **kwargs)
352    # Wait for all std servers to be brought up in order to reduce the chance of
353    # remote sessions taking local ports that have been assigned to std servers.
354    self._barrier.wait()
355    return ret
356
357  def _independent_worker_fn(
358      self,
359      train_distribute,
360      eval_distribute,
361  ):
362    with test.mock.patch.object(dc, "_run_std_server",
363                                self._mock_run_std_server):
364      self._complete_flow(train_distribute, eval_distribute)
365
366  @combinations.generate(
367      combinations.combine(
368          mode=["graph"],
369          train_distribute_cls=[
370              collective_all_reduce_strategy.CollectiveAllReduceStrategy,
371              parameter_server_strategy.ParameterServerStrategy,
372          ],
373          eval_distribute_cls=[
374              None, mirrored_strategy.MirroredStrategy,
375              mirrored_strategy.CoreMirroredStrategy,
376              parameter_server_strategy.ParameterServerStrategy,
377          ],
378          required_gpus=[0, 1]))
379  def test_complete_flow_independent_worker_between_graph(
380      self, train_distribute_cls, eval_distribute_cls):
381    if (context.num_gpus() < 2 and eval_distribute_cls ==
382        collective_all_reduce_strategy.CollectiveAllReduceStrategy):
383      self.skipTest("`CollectiveAllReduceStrategy` needs at least two towers.")
384
385    train_distribute = self._get_strategy_object(train_distribute_cls)
386
387    if eval_distribute_cls:
388      eval_distribute = self._get_strategy_object(eval_distribute_cls)
389    else:
390      eval_distribute = None
391
392    if (train_distribute_cls == parameter_server_strategy
393        .ParameterServerStrategy):
394      cluster_spec = multi_worker_test_base.create_cluster_spec(
395          num_workers=3, num_ps=2, has_eval=True)
396      # 3 workers, 2 ps and 1 evaluator.
397      self._barrier = dc._Barrier(6)
398    else:
399      cluster_spec = multi_worker_test_base.create_cluster_spec(
400          num_workers=3, num_ps=0, has_eval=True)
401      # 3 workers and 1 evaluator.
402      self._barrier = dc._Barrier(4)
403
404    threads = self.run_multiple_tasks_in_threads(self._independent_worker_fn,
405                                                 cluster_spec, train_distribute,
406                                                 eval_distribute)
407    threads_to_join = []
408    for task_type, ts in threads.items():
409      if task_type == PS:
410        continue
411      for t in ts:
412        threads_to_join.append(t)
413    self.join_independent_workers(threads_to_join)
414
415    estimator = self._get_estimator(train_distribute, eval_distribute)
416    self._inspect_train_and_eval_events(estimator)
417
418  @combinations.generate(
419      combinations.combine(
420          mode=["graph"],
421          train_distribute_cls=[
422              mirrored_strategy.MirroredStrategy,
423              mirrored_strategy.CoreMirroredStrategy
424          ],
425          eval_distribute_cls=[
426              None,
427              mirrored_strategy.MirroredStrategy,
428              mirrored_strategy.CoreMirroredStrategy
429          ],
430          required_gpus=[0, 1]))
431  def test_complete_flow_independent_worker_in_graph(self, train_distribute_cls,
432                                                     eval_distribute_cls):
433    train_distribute = self._get_strategy_object(train_distribute_cls)
434
435    if eval_distribute_cls:
436      eval_distribute = self._get_strategy_object(eval_distribute_cls)
437    else:
438      eval_distribute = None
439
440    cluster_spec = multi_worker_test_base.create_cluster_spec(
441        num_workers=3, num_ps=0, has_eval=True)
442    # 3 workers and 1 evaluator.
443    self._barrier = dc._Barrier(4)
444    threads = self.run_multiple_tasks_in_threads(self._independent_worker_fn,
445                                                 cluster_spec, train_distribute,
446                                                 eval_distribute)
447    self.join_independent_workers([threads[WORKER][0], threads[EVALUATOR][0]])
448
449    estimator = self._get_estimator(train_distribute, eval_distribute)
450    self._inspect_train_and_eval_events(estimator)
451
452
# `TF_CONFIG` payload: a cluster with one chief, with the task set to it.
TF_CONFIG_WITH_CHIEF = {
    "cluster": {"chief": ["fake_chief"]},
    "task": {"type": "chief", "index": 0},
}

# `TF_CONFIG` payload: a cluster with one master, with the task set to it.
TF_CONFIG_WITH_MASTER = {
    "cluster": {"master": ["fake_master"]},
    "task": {"type": "master", "index": 0},
}

# `TF_CONFIG` payload that has a cluster but no "task" entry.
TF_CONFIG_WITHOUT_TASK = {
    "cluster": {"chief": ["fake_worker"]},
}
474
475
class RunConfigTest(test.TestCase):
  """Checks how `RunConfig` reacts to `TF_CONFIG` and distribute settings."""

  def _tf_config_env(self, tf_config):
    """Returns a patcher that sets `TF_CONFIG` in `os.environ` to JSON."""
    return test.mock.patch.dict("os.environ",
                                {"TF_CONFIG": json.dumps(tf_config)})

  def _two_gpu_mirrored(self):
    """Returns a `CoreMirroredStrategy` over GPU:0 and GPU:1."""
    return mirrored_strategy.CoreMirroredStrategy(
        ["/device:GPU:0", "/device:GPU:1"])

  def test_previously_unexpected_cluster_spec(self):
    """Building a `RunConfig` with a task-less `TF_CONFIG` must not raise."""
    with self._tf_config_env(TF_CONFIG_WITHOUT_TASK):
      run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              train_distribute=self._two_gpu_mirrored()))

  def test_should_run_distribute_coordinator(self):
    """Tests that should_run_distribute_coordinator return a correct value."""
    should_run = dc_training.should_run_distribute_coordinator

    # We don't use distribute coordinator for local training.
    self.assertFalse(should_run(run_config_lib.RunConfig()))

    # When `train_distribute` is not specified, don't use distribute
    # coordinator.
    with self._tf_config_env(TF_CONFIG_WITH_CHIEF):
      self.assertFalse(should_run(run_config_lib.RunConfig()))

    # When `train_distribute` is specified and TF_CONFIG is detected, use
    # distribute coordinator. Specifying only `eval_distribute` is not enough.
    with self._tf_config_env(TF_CONFIG_WITH_CHIEF):
      config_with_train_distribute = run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              train_distribute=self._two_gpu_mirrored()))
      config_with_eval_distribute = run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              eval_distribute=self._two_gpu_mirrored()))
    self.assertTrue(should_run(config_with_train_distribute))
    self.assertFalse(should_run(config_with_eval_distribute))

    # With a master in the cluster, don't run distribute coordinator.
    with self._tf_config_env(TF_CONFIG_WITH_MASTER):
      config_with_master = run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              train_distribute=self._two_gpu_mirrored()))
    self.assertFalse(should_run(config_with_master))

  def test_init_run_config_duplicate_distribute(self):
    """Passing a strategy both directly and via `DistributeConfig` raises."""
    # `train_distribute` given twice.
    with self.assertRaises(ValueError):
      run_config_lib.RunConfig(
          train_distribute=mirrored_strategy.CoreMirroredStrategy(),
          experimental_distribute=DistributeConfig(
              train_distribute=mirrored_strategy.CoreMirroredStrategy()))

    # `eval_distribute` given twice.
    with self.assertRaises(ValueError):
      run_config_lib.RunConfig(
          eval_distribute=mirrored_strategy.CoreMirroredStrategy(),
          experimental_distribute=DistributeConfig(
              eval_distribute=mirrored_strategy.CoreMirroredStrategy()))

  def test_init_run_config_none_distribute_coordinator_mode(self):
    """Covers the cases in which no coordinator mode is selected."""
    # We don't use distribute coordinator for local training.
    local_config = run_config_lib.RunConfig(
        train_distribute=mirrored_strategy.CoreMirroredStrategy())
    dc_training.init_run_config(local_config, {})
    self.assertIsNone(local_config._distribute_coordinator_mode)

    # With a master in the cluster, don't run distribute coordinator.
    with self._tf_config_env(TF_CONFIG_WITH_MASTER):
      master_config = run_config_lib.RunConfig(
          train_distribute=mirrored_strategy.CoreMirroredStrategy())
      self.assertIsNone(master_config._distribute_coordinator_mode)

    # When `train_distribute` is not specified, don't use distribute
    # coordinator: the attribute is not even set.
    with self._tf_config_env(TF_CONFIG_WITH_CHIEF):
      plain_config = run_config_lib.RunConfig()
      self.assertFalse(hasattr(plain_config, "_distribute_coordinator_mode"))

  def test_init_run_config_independent_worker(self):
    """`train_distribute` plus `TF_CONFIG` selects INDEPENDENT_WORKER mode."""
    # When `train_distribute` is specified and TF_CONFIG is detected, use
    # distribute coordinator with INDEPENDENT_WORKER mode.
    with self._tf_config_env(TF_CONFIG_WITH_CHIEF):
      config = run_config_lib.RunConfig(
          train_distribute=mirrored_strategy.CoreMirroredStrategy())
    expected_mode = dc.CoordinatorMode.INDEPENDENT_WORKER
    self.assertEqual(config._distribute_coordinator_mode, expected_mode)

  def test_init_run_config_standalone_client(self):
    """`train_distribute` plus `remote_cluster` selects STANDALONE_CLIENT mode."""
    # When `train_distribute` is specified and
    # `experimental_distribute.remote_cluster` is set, use distribute
    # coordinator with STANDALONE_CLIENT mode. This test does not set
    # TF_CONFIG.
    distribute_config = DistributeConfig(
        remote_cluster={"chief": ["fake_worker"]})
    config = run_config_lib.RunConfig(
        train_distribute=mirrored_strategy.CoreMirroredStrategy(),
        experimental_distribute=distribute_config)
    expected_mode = dc.CoordinatorMode.STANDALONE_CLIENT
    self.assertEqual(config._distribute_coordinator_mode, expected_mode)
583
584
if __name__ == "__main__":
  # Reduce `recovery_wait_secs` from 30 seconds so the test completes quickly.
  orig_init = session_manager.SessionManager.__init__

  def new_init(*args, **kwargs):
    """Calls the original `__init__` with `recovery_wait_secs` forced to 0.5."""
    # Any caller-supplied `recovery_wait_secs` keyword is overridden.
    forced_kwargs = dict(kwargs, recovery_wait_secs=0.5)
    orig_init(*args, **forced_kwargs)

  session_manager.SessionManager.__init__ = new_init

  # While the tests run, `sys.exit` is replaced by `os._exit`.
  exit_patch = test.mock.patch.object(sys, "exit", os._exit)
  with exit_patch:
    test.main()
598