# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests that show Distribute Coordinator works with Estimator."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import glob
import json
import os
import sys
import tempfile
from absl.testing import parameterized
import numpy as np

from tensorflow.contrib.distribute.python import collective_all_reduce_strategy
from tensorflow.contrib.distribute.python import combinations
from tensorflow.contrib.distribute.python import mirrored_strategy
from tensorflow.contrib.distribute.python import multi_worker_test_base
from tensorflow.contrib.distribute.python import parameter_server_strategy
from tensorflow.contrib.optimizer_v2 import adagrad
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.distribute import distribute_coordinator as dc
from tensorflow.python.distribute import estimator_training as dc_training
from tensorflow.python.distribute.distribute_config import DistributeConfig
from tensorflow.python.eager import context
from tensorflow.python.estimator import exporter as exporter_lib
from tensorflow.python.estimator import run_config as run_config_lib
from tensorflow.python.estimator import training as estimator_training
from tensorflow.python.estimator.canned import dnn_linear_combined
from tensorflow.python.estimator.canned import prediction_keys
from tensorflow.python.estimator.export import export as export_lib
from tensorflow.python.feature_column import feature_column_lib as feature_column
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
from tensorflow.python.summary import summary_iterator
from tensorflow.python.summary.writer import writer_cache
from tensorflow.python.training import session_manager


BATCH_SIZE = 10
LABEL_DIMENSION = 2
DATA = np.linspace(
    0., 2., BATCH_SIZE * LABEL_DIMENSION, dtype=np.float32).reshape(
        BATCH_SIZE, LABEL_DIMENSION)
EVAL_NAME = "foo"
EXPORTER_NAME = "saved_model_exporter"
MAX_STEPS = 10

CHIEF = dc._TaskType.CHIEF
EVALUATOR = dc._TaskType.EVALUATOR
WORKER = dc._TaskType.WORKER
PS = dc._TaskType.PS

original_run_std_server = dc._run_std_server


class DistributeCoordinatorIntegrationTest(
    multi_worker_test_base.IndependentWorkerTestBase, parameterized.TestCase):

  @classmethod
  def setUpClass(cls):
    """Create a local cluster with 3 workers, 2 ps and an evaluator."""
    super(DistributeCoordinatorIntegrationTest, cls).setUpClass()
    cls._cluster_spec = multi_worker_test_base.create_in_process_cluster(
        num_workers=3, num_ps=2, has_eval=True)

  def setUp(self):
    self._model_dir = tempfile.mkdtemp()
    super(DistributeCoordinatorIntegrationTest, self).setUp()
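
  # The helpers below are shared by every test in this class:
  # `dataset_input_fn` builds the tf.data input pipelines, `_get_exporter`
  # creates a `LatestExporter` for SavedModel export, and `_get_estimator`
  # wires a canned DNNLinearCombinedRegressor to the distribution strategies
  # under test through `DistributeConfig`.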
  def dataset_input_fn(self, x, y, batch_size, shuffle):

    def input_fn():
      dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
      if shuffle:
        dataset = dataset.shuffle(batch_size)
      dataset = dataset.repeat(100).batch(batch_size)
      return dataset

    return input_fn

  def _get_exporter(self, name, fc):
    feature_spec = feature_column.make_parse_example_spec(fc)
    serving_input_receiver_fn = (
        export_lib.build_parsing_serving_input_receiver_fn(feature_spec))
    return exporter_lib.LatestExporter(
        name, serving_input_receiver_fn=serving_input_receiver_fn)

  def _extract_loss_and_global_step(self, event_folder):
    """Returns the loss and global step in the last event."""
    event_paths = glob.glob(os.path.join(event_folder, "events*"))
    self.assertNotEmpty(
        event_paths, msg="Event file not found in dir %s" % event_folder)

    loss = None
    global_step_count = None

    for e in summary_iterator.summary_iterator(event_paths[-1]):
      current_loss = None
      for v in e.summary.value:
        if v.tag == "loss":
          current_loss = v.simple_value

      # If loss is not found, global step is meaningless.
      if current_loss is None:
        continue

      current_global_step = e.step
      if global_step_count is None or current_global_step > global_step_count:
        global_step_count = current_global_step
        loss = current_loss

    return (loss, global_step_count)

  def _get_estimator(self,
                     train_distribute,
                     eval_distribute,
                     remote_cluster=None):
    input_dimension = LABEL_DIMENSION
    linear_feature_columns = [
        feature_column.numeric_column("x", shape=(input_dimension,))
    ]
    dnn_feature_columns = [
        feature_column.numeric_column("x", shape=(input_dimension,))
    ]

    return dnn_linear_combined.DNNLinearCombinedRegressor(
        linear_feature_columns=linear_feature_columns,
        dnn_hidden_units=(2, 2),
        dnn_feature_columns=dnn_feature_columns,
        label_dimension=LABEL_DIMENSION,
        model_dir=self._model_dir,
        dnn_optimizer=adagrad.AdagradOptimizer(0.001),
        linear_optimizer=adagrad.AdagradOptimizer(0.001),
        config=run_config_lib.RunConfig(
            experimental_distribute=DistributeConfig(
                train_distribute=train_distribute,
                eval_distribute=eval_distribute,
                remote_cluster=remote_cluster)))

  def _complete_flow(self,
                     train_distribute,
                     eval_distribute,
                     remote_cluster=None,
                     use_train_and_evaluate=True):
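    """Runs a train/evaluate/export flow and returns the estimator.

    With `use_train_and_evaluate` the whole flow goes through
    `estimator_training.train_and_evaluate` (and thus, where configured,
    through distribute coordinator); otherwise `train` and `evaluate` are
    called separately and the eval result is exported by driving the
    internal `_Evaluator` helper directly.
    """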
    estimator = self._get_estimator(train_distribute, eval_distribute,
                                    remote_cluster)

    input_dimension = LABEL_DIMENSION
    train_input_fn = self.dataset_input_fn(
        x={"x": DATA},
        y=DATA,
        batch_size=BATCH_SIZE // train_distribute.num_replicas_in_sync,
        shuffle=True)
    if eval_distribute:
      eval_batch_size = BATCH_SIZE // eval_distribute.num_replicas_in_sync
    else:
      eval_batch_size = BATCH_SIZE
    eval_input_fn = self.dataset_input_fn(
        x={"x": DATA}, y=DATA, batch_size=eval_batch_size, shuffle=False)

    linear_feature_columns = [
        feature_column.numeric_column("x", shape=(input_dimension,))
    ]
    dnn_feature_columns = [
        feature_column.numeric_column("x", shape=(input_dimension,))
    ]
    feature_columns = linear_feature_columns + dnn_feature_columns

    eval_spec = estimator_training.EvalSpec(
        name=EVAL_NAME,
        input_fn=eval_input_fn,
        steps=None,
        exporters=self._get_exporter(EXPORTER_NAME, feature_columns),
        start_delay_secs=0,
        throttle_secs=1)

    if use_train_and_evaluate:
      estimator_training.train_and_evaluate(
          estimator,
          estimator_training.TrainSpec(train_input_fn, max_steps=MAX_STEPS),
          eval_spec)
    else:
      estimator.train(train_input_fn, max_steps=MAX_STEPS)

      latest_ckpt_path = estimator.latest_checkpoint()
      metrics = estimator.evaluate(eval_input_fn,
                                   checkpoint_path=latest_ckpt_path,
                                   name=EVAL_NAME)

      # Export the eval result to files.
      eval_result = estimator_training._EvalResult(
          status=estimator_training._EvalStatus.EVALUATED,
          metrics=metrics,
          checkpoint_path=latest_ckpt_path)
      evaluator = estimator_training._TrainingExecutor._Evaluator(estimator,
                                                                  eval_spec,
                                                                  None)
      evaluator._export_eval_result(eval_result, True)

    return estimator

  def _inspect_train_and_eval_events(self, estimator):
    # Make sure nothing is stuck in limbo.
    writer_cache.FileWriterCache.clear()

    # Examine the training events. Use a range to check global step to avoid
    # flakiness due to a global step race condition.
    training_loss, _ = self._extract_loss_and_global_step(self._model_dir)
    self.assertIsNotNone(training_loss)

    # Examine the eval events. The global step should be accurate.
    eval_dir = os.path.join(self._model_dir, "eval_" + EVAL_NAME)
    eval_loss, eval_global_step = self._extract_loss_and_global_step(
        event_folder=eval_dir)
    self.assertIsNotNone(eval_loss)
    self.assertGreaterEqual(eval_global_step, MAX_STEPS)

    # Examine the export folder.
    export_dir = os.path.join(
        os.path.join(self._model_dir, "export"), EXPORTER_NAME)
    self.assertTrue(gfile.Exists(export_dir))

    # Examine the ckpt for predict.
    def predict_input_fn():
      return dataset_ops.Dataset.from_tensor_slices({
          "x": DATA
      }).batch(BATCH_SIZE)

    predicted_proba = np.array([
        x[prediction_keys.PredictionKeys.PREDICTIONS]
        for x in estimator.predict(predict_input_fn)
    ])
    self.assertAllEqual((BATCH_SIZE, LABEL_DIMENSION), predicted_proba.shape)

  def _get_strategy_object(self, strategy_cls):
    if strategy_cls == mirrored_strategy.CoreMirroredStrategy:
      return strategy_cls(mirrored_strategy.all_local_devices())
    else:
      return strategy_cls(num_gpus_per_worker=context.num_gpus())
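
  # `combinations.generate` expands each decorated test into one parameterized
  # case per element of the cross product of the given arguments, so a single
  # test method covers every train/eval strategy pairing with both 0 and 1
  # required GPUs.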
  @combinations.generate(
      combinations.combine(
          mode=["graph"],
          train_distribute_cls=[
              collective_all_reduce_strategy.CollectiveAllReduceStrategy,
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy,
              parameter_server_strategy.ParameterServerStrategy
          ],
          eval_distribute_cls=[
              None,
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy,
              parameter_server_strategy.ParameterServerStrategy,
          ],
          required_gpus=[0, 1]))
  def test_complete_flow_standalone_client(self, train_distribute_cls,
                                           eval_distribute_cls):
    train_distribute = self._get_strategy_object(train_distribute_cls)

    if eval_distribute_cls:
      eval_distribute = self._get_strategy_object(eval_distribute_cls)
    else:
      eval_distribute = None

    cluster_spec = copy.deepcopy(self._cluster_spec)
    if (train_distribute_cls !=
        parameter_server_strategy.ParameterServerStrategy):
      cluster_spec.pop("ps", None)
    estimator = self._complete_flow(train_distribute, eval_distribute,
                                    cluster_spec)
    self._inspect_train_and_eval_events(estimator)

  @combinations.generate(
      combinations.combine(
          mode=["graph"],
          eval_distribute_class=[
              None,
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy,
              parameter_server_strategy.ParameterServerStrategy,
          ],
          required_gpus=[0, 1]))
  def test_complete_flow_standalone_client_collective_nccl(
      self, eval_distribute_class):
    train_distribute = (
        collective_all_reduce_strategy.CollectiveAllReduceStrategy(
            num_gpus_per_worker=context.num_gpus(),
            communication=cross_device_ops_lib.CollectiveCommunication.NCCL))

    if eval_distribute_class:
      eval_distribute = self._get_strategy_object(eval_distribute_class)
    else:
      eval_distribute = None

    cluster_spec = copy.deepcopy(self._cluster_spec)
    cluster_spec.pop("ps", None)
    estimator = self._complete_flow(train_distribute, eval_distribute,
                                    cluster_spec)
    self._inspect_train_and_eval_events(estimator)

  @combinations.generate(
      combinations.combine(
          mode=["graph"],
          train_distribute_cls=[
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy,
          ],
          eval_distribute_cls=[
              None,
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy,
          ],
          required_gpus=[0, 1]))
  def test_estimator_standalone_client(self, train_distribute_cls,
                                       eval_distribute_cls):
    train_distribute = self._get_strategy_object(train_distribute_cls)

    if eval_distribute_cls:
      eval_distribute = self._get_strategy_object(eval_distribute_cls)
    else:
      eval_distribute = None

    # We use the whole cluster for evaluation.
    cluster = copy.deepcopy(self._cluster_spec)
    cluster.pop("evaluator", None)

    estimator = self._complete_flow(
        train_distribute, eval_distribute, remote_cluster=cluster,
        use_train_and_evaluate=False)
    self._inspect_train_and_eval_events(estimator)

  def _mock_run_std_server(self, *args, **kwargs):
    ret = original_run_std_server(*args, **kwargs)
    # Wait for all std servers to be brought up in order to reduce the chance
    # of remote sessions taking local ports that have been assigned to std
    # servers.
    self._barrier.wait()
    return ret
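
  # The independent-worker tests below run `_independent_worker_fn` in one
  # thread per task, each with its own TF_CONFIG. Patching
  # `dc._run_std_server` with the barrier-synchronized mock above keeps
  # server startup in lockstep across the simulated cluster.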
  def _independent_worker_fn(
      self,
      train_distribute,
      eval_distribute,
  ):
    with test.mock.patch.object(dc, "_run_std_server",
                                self._mock_run_std_server):
      self._complete_flow(train_distribute, eval_distribute)

  @combinations.generate(
      combinations.combine(
          mode=["graph"],
          train_distribute_cls=[
              collective_all_reduce_strategy.CollectiveAllReduceStrategy,
              parameter_server_strategy.ParameterServerStrategy,
          ],
          eval_distribute_cls=[
              None, mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy,
              parameter_server_strategy.ParameterServerStrategy,
          ],
          required_gpus=[0, 1]))
  def test_complete_flow_independent_worker_between_graph(
      self, train_distribute_cls, eval_distribute_cls):
    if (context.num_gpus() < 2 and eval_distribute_cls ==
        collective_all_reduce_strategy.CollectiveAllReduceStrategy):
      self.skipTest("`CollectiveAllReduceStrategy` needs at least two towers.")

    train_distribute = self._get_strategy_object(train_distribute_cls)

    if eval_distribute_cls:
      eval_distribute = self._get_strategy_object(eval_distribute_cls)
    else:
      eval_distribute = None

    if (train_distribute_cls == parameter_server_strategy
        .ParameterServerStrategy):
      cluster_spec = multi_worker_test_base.create_cluster_spec(
          num_workers=3, num_ps=2, has_eval=True)
      # 3 workers, 2 ps and 1 evaluator.
      self._barrier = dc._Barrier(6)
    else:
      cluster_spec = multi_worker_test_base.create_cluster_spec(
          num_workers=3, num_ps=0, has_eval=True)
      # 3 workers and 1 evaluator.
      self._barrier = dc._Barrier(4)

    threads = self.run_multiple_tasks_in_threads(self._independent_worker_fn,
                                                 cluster_spec, train_distribute,
                                                 eval_distribute)
    threads_to_join = []
    for task_type, ts in threads.items():
      if task_type == PS:
        continue
      for t in ts:
        threads_to_join.append(t)
    self.join_independent_workers(threads_to_join)

    estimator = self._get_estimator(train_distribute, eval_distribute)
    self._inspect_train_and_eval_events(estimator)
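
  # "Between-graph" above means every worker runs its own training loop over
  # its own copy of the graph; in the "in-graph" variant below a single client
  # drives one graph spanning all replicas, which is why only the first worker
  # thread and the evaluator thread need to be joined.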
  @combinations.generate(
      combinations.combine(
          mode=["graph"],
          train_distribute_cls=[
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy
          ],
          eval_distribute_cls=[
              None,
              mirrored_strategy.MirroredStrategy,
              mirrored_strategy.CoreMirroredStrategy
          ],
          required_gpus=[0, 1]))
  def test_complete_flow_independent_worker_in_graph(self, train_distribute_cls,
                                                     eval_distribute_cls):
    train_distribute = self._get_strategy_object(train_distribute_cls)

    if eval_distribute_cls:
      eval_distribute = self._get_strategy_object(eval_distribute_cls)
    else:
      eval_distribute = None

    cluster_spec = multi_worker_test_base.create_cluster_spec(
        num_workers=3, num_ps=0, has_eval=True)
    # 3 workers and 1 evaluator.
    self._barrier = dc._Barrier(4)
    threads = self.run_multiple_tasks_in_threads(self._independent_worker_fn,
                                                 cluster_spec, train_distribute,
                                                 eval_distribute)
    self.join_independent_workers([threads[WORKER][0], threads[EVALUATOR][0]])

    estimator = self._get_estimator(train_distribute, eval_distribute)
    self._inspect_train_and_eval_events(estimator)


TF_CONFIG_WITH_CHIEF = {
    "cluster": {
        "chief": ["fake_chief"],
    },
    "task": {
        "type": "chief",
        "index": 0
    }
}

TF_CONFIG_WITH_MASTER = {
    "cluster": {
        "master": ["fake_master"],
    },
    "task": {
        "type": "master",
        "index": 0
    }
}

TF_CONFIG_WITHOUT_TASK = {"cluster": {"chief": ["fake_worker"]}}
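
# These dicts mirror the JSON that a cluster runtime would put in the
# TF_CONFIG environment variable; the tests below install them via
# `test.mock.patch.dict("os.environ", {"TF_CONFIG": json.dumps(...)})`, e.g.
#
#   TF_CONFIG='{"cluster": {"chief": ["fake_chief"]},
#               "task": {"type": "chief", "index": 0}}'
#
# "master" is the legacy task type predating "chief"; when a master is
# present, distribute coordinator is not used.
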

class RunConfigTest(test.TestCase):

  def test_previously_unexpected_cluster_spec(self):
    with test.mock.patch.dict(
        "os.environ", {"TF_CONFIG": json.dumps(TF_CONFIG_WITHOUT_TASK)}):
      run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              train_distribute=mirrored_strategy.CoreMirroredStrategy(
                  ["/device:GPU:0", "/device:GPU:1"])))

  def test_should_run_distribute_coordinator(self):
    """Tests that should_run_distribute_coordinator returns a correct value."""
    # We don't use distribute coordinator for local training.
    self.assertFalse(
        dc_training.should_run_distribute_coordinator(
            run_config_lib.RunConfig()))

    # When `train_distribute` is not specified, don't use distribute
    # coordinator.
    with test.mock.patch.dict("os.environ",
                              {"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
      self.assertFalse(
          dc_training.should_run_distribute_coordinator(
              run_config_lib.RunConfig()))

    # When `train_distribute` is specified and TF_CONFIG is detected, use
    # distribute coordinator.
    with test.mock.patch.dict("os.environ",
                              {"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
      config_with_train_distribute = run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              train_distribute=mirrored_strategy.CoreMirroredStrategy(
                  ["/device:GPU:0", "/device:GPU:1"])))
      config_with_eval_distribute = run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              eval_distribute=mirrored_strategy.CoreMirroredStrategy(
                  ["/device:GPU:0", "/device:GPU:1"])))
    self.assertTrue(
        dc_training.should_run_distribute_coordinator(
            config_with_train_distribute))
    self.assertFalse(
        dc_training.should_run_distribute_coordinator(
            config_with_eval_distribute))

    # With a master in the cluster, don't run distribute coordinator.
    with test.mock.patch.dict("os.environ",
                              {"TF_CONFIG": json.dumps(TF_CONFIG_WITH_MASTER)}):
      config = run_config_lib.RunConfig(
          experimental_distribute=DistributeConfig(
              train_distribute=mirrored_strategy.CoreMirroredStrategy(
                  ["/device:GPU:0", "/device:GPU:1"])))
      self.assertFalse(dc_training.should_run_distribute_coordinator(config))

  def test_init_run_config_duplicate_distribute(self):
    with self.assertRaises(ValueError):
      run_config_lib.RunConfig(
          train_distribute=mirrored_strategy.CoreMirroredStrategy(),
          experimental_distribute=DistributeConfig(
              train_distribute=mirrored_strategy.CoreMirroredStrategy()))

    with self.assertRaises(ValueError):
      run_config_lib.RunConfig(
          eval_distribute=mirrored_strategy.CoreMirroredStrategy(),
          experimental_distribute=DistributeConfig(
              eval_distribute=mirrored_strategy.CoreMirroredStrategy()))

  def test_init_run_config_none_distribute_coordinator_mode(self):
    # We don't use distribute coordinator for local training.
    config = run_config_lib.RunConfig(
        train_distribute=mirrored_strategy.CoreMirroredStrategy())
    dc_training.init_run_config(config, {})
    self.assertIsNone(config._distribute_coordinator_mode)

    # With a master in the cluster, don't run distribute coordinator.
    with test.mock.patch.dict("os.environ",
                              {"TF_CONFIG": json.dumps(TF_CONFIG_WITH_MASTER)}):
      config = run_config_lib.RunConfig(
          train_distribute=mirrored_strategy.CoreMirroredStrategy())
      self.assertIsNone(config._distribute_coordinator_mode)

    # When `train_distribute` is not specified, don't use distribute
    # coordinator.
    with test.mock.patch.dict("os.environ",
                              {"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
      config = run_config_lib.RunConfig()
      self.assertFalse(hasattr(config, "_distribute_coordinator_mode"))

  def test_init_run_config_independent_worker(self):
    # When `train_distribute` is specified and TF_CONFIG is detected, use
    # distribute coordinator with INDEPENDENT_WORKER mode.
    with test.mock.patch.dict("os.environ",
                              {"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
      config = run_config_lib.RunConfig(
          train_distribute=mirrored_strategy.CoreMirroredStrategy())
    self.assertEqual(config._distribute_coordinator_mode,
                     dc.CoordinatorMode.INDEPENDENT_WORKER)

  def test_init_run_config_standalone_client(self):
    # When `train_distribute` is specified, TF_CONFIG is detected and
    # `experimental.remote_cluster` is set, use distribute coordinator with
    # STANDALONE_CLIENT mode.
    config = run_config_lib.RunConfig(
        train_distribute=mirrored_strategy.CoreMirroredStrategy(),
        experimental_distribute=DistributeConfig(
            remote_cluster={"chief": ["fake_worker"]}))
    self.assertEqual(config._distribute_coordinator_mode,
                     dc.CoordinatorMode.STANDALONE_CLIENT)


if __name__ == "__main__":
  # Reduce `recovery_wait_secs` from 30 seconds so the test completes quickly.
  orig_init = session_manager.SessionManager.__init__

  def new_init(*args, **kwargs):
    kwargs.pop("recovery_wait_secs", None)
    kwargs["recovery_wait_secs"] = 0.5
    orig_init(*args, **kwargs)

  session_manager.SessionManager.__init__ = new_init

  with test.mock.patch.object(sys, "exit", os._exit):
    test.main()