1# Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Cluster Resolvers are used for dynamic cluster IP/hostname resolution.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import abc 22 23import collections 24 25import six 26 27from tensorflow.python.client import session 28from tensorflow.python.eager import context 29from tensorflow.python.framework import config 30from tensorflow.python.framework import ops 31from tensorflow.python.training.server_lib import ClusterSpec 32from tensorflow.python.util.tf_export import tf_export 33 34 35def format_master_url(master, rpc_layer=None): 36 if rpc_layer: 37 return '%s://%s' % (rpc_layer, master) 38 else: 39 return master 40 41 42def get_accelerator_devices(master, config_proto): 43 """Returns accelerator devices given a master and a configuration.""" 44 if context.executing_eagerly(): 45 logical_devices = config.list_logical_devices() 46 devices = [] 47 for d in logical_devices: 48 if d.device_type == 'CPU' or d.device_type == 'XLA_CPU': # Filter CPUs 49 continue 50 devices.append(session._DeviceAttributes(d.name, d.device_type, 0, 0)) # pylint: disable=protected-access 51 return devices 52 else: 53 with ops.Graph().as_default(): 54 with session.Session(master, config=config_proto) as s: 55 devices = s.list_devices() 56 return devices 57 58 59@tf_export('distribute.cluster_resolver.ClusterResolver') 60@six.add_metaclass(abc.ABCMeta) 61class ClusterResolver(object): 62 """Abstract class for all implementations of ClusterResolvers. 63 64 This defines the skeleton for all implementations of ClusterResolvers. 65 ClusterResolvers are a way for TensorFlow to communicate with various cluster 66 management systems (e.g. GCE, AWS, etc...) and gives TensorFlow necessary 67 information to set up distributed training. 68 69 By letting TensorFlow communicate with these systems, we will be able to 70 automatically discover and resolve IP addresses for various TensorFlow 71 workers. This will eventually allow us to automatically recover from 72 underlying machine failures and scale TensorFlow worker clusters up and down. 73 74 Note to Implementors of `tf.distribute.cluster_resolver.ClusterResolver` 75 subclass: In addition to these abstract methods, when task_type, task_id, and 76 rpc_layer attributes are applicable, you should also implement them either as 77 properties with getters or setters, or directly set the attributes 78 `self._task_type`, `self._task_id`, or `self._rpc_layer` so the base class' 79 getters and setters are used. See 80 `tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an 81 example. 82 83 In general, multi-client tf.distribute strategies such as 84 `tf.distribute.experimental.MultiWorkerMirroredStrategy` require task_type and 85 task_id properties to be available in the `ClusterResolver` they are using. On 86 the other hand, these concepts are not applicable in single-client strategies, 87 such as `tf.distribute.experimental.TPUStrategy`, because the program is only 88 expected to be run on one task, so there should not be a need to have code 89 branches according to task type and task id. 90 91 - task_type is the name of the server's current named job (e.g. 'worker', 92 'ps' in a distributed parameterized training job). 93 - task_id is the ordinal index of the server within the task type. 94 - rpc_layer is the protocol used by TensorFlow to communicate with other 95 TensorFlow servers in a distributed environment. 96 """ 97 98 @abc.abstractmethod 99 def cluster_spec(self): 100 """Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`. 101 102 Returns: 103 A `tf.train.ClusterSpec` representing the state of the cluster at the 104 moment this function is called. 105 106 Implementors of this function must take care in ensuring that the 107 ClusterSpec returned is up-to-date at the time of calling this function. 108 This usually means retrieving the information from the underlying cluster 109 management system every time this function is invoked and reconstructing 110 a cluster_spec, rather than attempting to cache anything. 111 """ 112 raise NotImplementedError() 113 114 @abc.abstractmethod 115 def master(self, task_type=None, task_id=None, rpc_layer=None): 116 """Retrieves the name or URL of the session master. 117 118 Note: this is only useful for TensorFlow 1.x. 119 120 Args: 121 task_type: (Optional) The type of the TensorFlow task of the master. 122 task_id: (Optional) The index of the TensorFlow task of the master. 123 rpc_layer: (Optional) The RPC protocol for the given cluster. 124 125 Returns: 126 The name or URL of the session master. 127 128 Implementors of this function must take care in ensuring that the master 129 returned is up-to-date at the time to calling this function. This usually 130 means retrieving the master every time this function is invoked. 131 """ 132 raise NotImplementedError() 133 134 def num_accelerators(self, 135 task_type=None, 136 task_id=None, 137 config_proto=None): 138 """Returns the number of accelerator cores per worker. 139 140 This returns the number of accelerator cores (such as GPUs and TPUs) 141 available per worker. 142 143 Optionally, we allow callers to specify the task_type, and task_id, for 144 if they want to target a specific TensorFlow task to query 145 the number of accelerators. This is to support heterogenous environments, 146 where the number of accelerators cores per host is different. 147 148 Args: 149 task_type: (Optional) The type of the TensorFlow task of the machine we 150 want to query. 151 task_id: (Optional) The index of the TensorFlow task of the machine we 152 want to query. 153 config_proto: (Optional) Configuration for starting a new session to 154 query how many accelerator cores it has. 155 156 Returns: 157 A map of accelerator types to number of cores. 158 """ 159 master = self.master(task_type, task_id) 160 # TODO(b/126786766): in eager mode, we should check whether 161 # `tf.config.experimental_connect_to_cluster` is called or not. 162 devices = get_accelerator_devices(master, config_proto) 163 mapping = collections.defaultdict(int) 164 for device in devices: 165 if task_type is not None and task_id is not None: 166 job_path = '/job:%s' % task_type 167 task_path = '/task:%s' % task_id 168 if job_path not in device.name or task_path not in device.name: 169 continue 170 mapping[device.device_type] += 1 171 return mapping 172 173 @property 174 def environment(self): 175 """Returns the current environment which TensorFlow is running in. 176 177 There are two possible return values, "google" (when TensorFlow is running 178 in a Google-internal environment) or an empty string (when TensorFlow is 179 running elsewhere). 180 181 If you are implementing a ClusterResolver that works in both the Google 182 environment and the open-source world (for instance, a TPU ClusterResolver 183 or similar), you will have to return the appropriate string depending on the 184 environment, which you will have to detect. 185 186 Otherwise, if you are implementing a ClusterResolver that will only work 187 in open-source TensorFlow, you do not need to implement this property. 188 """ 189 return '' 190 191 @property 192 def task_type(self): 193 """Returns the task type this `ClusterResolver` indicates. 194 195 In TensorFlow distributed environment, each job may have an applicable 196 task type. Valid task types in TensorFlow include 197 'chief': a worker that is designated with more responsibility, 198 'worker': a regular worker for training/evaluation, 199 'ps': a parameter server, or 200 'evaluator': an evaluator that evaluates the checkpoints for metrics. 201 202 See [Multi-worker configuration]( 203 https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration) 204 for more information about 'chief' and 'worker' task type, which are most 205 commonly used. 206 207 Having access to such information is useful when user needs to run specific 208 code according to task types. For example, 209 210 ```python 211 cluster_spec = tf.train.ClusterSpec({ 212 "ps": ["localhost:2222", "localhost:2223"], 213 "worker": ["localhost:2224", "localhost:2225", "localhost:2226"] 214 }) 215 216 # SimpleClusterResolver is used here for illustration; other cluster 217 # resolvers may be used for other source of task type/id. 218 simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker", 219 task_id=1) 220 221 ... 222 223 if cluster_resolver.task_type == 'worker': 224 # Perform something that's only applicable on workers. This block 225 # will run on this particular instance since we've specified this task to 226 # be a worker in above cluster resolver. 227 elif cluster_resolver.task_type == 'ps': 228 # Perform something that's only applicable on parameter servers. This 229 # block will not run on this particular instance. 230 ``` 231 232 Returns `None` if such information is not available or is not applicable 233 in the current distributed environment, such as training with 234 `tf.distribute.experimental.TPUStrategy`. 235 236 For more information, please see 237 `tf.distribute.cluster_resolver.ClusterResolver`'s class doc. 238 """ 239 return getattr(self, '_task_type', None) 240 241 @property 242 def task_id(self): 243 """Returns the task id this `ClusterResolver` indicates. 244 245 In TensorFlow distributed environment, each job may have an applicable 246 task id, which is the index of the instance within its task type. This is 247 useful when user needs to run specific code according to task index. For 248 example, 249 250 ```python 251 cluster_spec = tf.train.ClusterSpec({ 252 "ps": ["localhost:2222", "localhost:2223"], 253 "worker": ["localhost:2224", "localhost:2225", "localhost:2226"] 254 }) 255 256 # SimpleClusterResolver is used here for illustration; other cluster 257 # resolvers may be used for other source of task type/id. 258 simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker", 259 task_id=0) 260 261 ... 262 263 if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0: 264 # Perform something that's only applicable on 'worker' type, id 0. This 265 # block will run on this particular instance since we've specified this 266 # task to be a 'worker', id 0 in above cluster resolver. 267 else: 268 # Perform something that's only applicable on other ids. This block will 269 # not run on this particular instance. 270 ``` 271 272 Returns `None` if such information is not available or is not applicable 273 in the current distributed environment, such as training with 274 `tf.distribute.cluster_resolver.TPUClusterResolver`. 275 276 For more information, please see 277 `tf.distribute.cluster_resolver.ClusterResolver`'s class docstring. 278 """ 279 return getattr(self, '_task_id', None) 280 281 @task_type.setter 282 def task_type(self, task_type): 283 """Setter of `task_type` property. See `task_type` property doc.""" 284 self._task_type = task_type 285 286 @task_id.setter 287 def task_id(self, task_id): 288 """Setter of `task_id` property. See `task_type` property doc.""" 289 self._task_id = task_id 290 291 292@tf_export('distribute.cluster_resolver.SimpleClusterResolver') 293class SimpleClusterResolver(ClusterResolver): 294 """Simple implementation of ClusterResolver that accepts all attributes. 295 296 Please see the base class for documentation of arguments of its constructor. 297 298 It is useful if you want to specify some or all attributes. 299 300 Usage example with `tf.distribute.Strategy`: 301 302 ```Python 303 cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", 304 "worker1.example.com:2222"]}) 305 306 # On worker 0 307 cluster_resolver = SimpleClusterResolver(cluster, task_type="worker", 308 task_id=0, 309 num_accelerators={"GPU": 8}, 310 rpc_layer="grpc") 311 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( 312 cluster_resolver=cluster_resolver) 313 314 # On worker 1 315 cluster_resolver = SimpleClusterResolver(cluster, task_type="worker", 316 task_id=1, 317 num_accelerators={"GPU": 8}, 318 rpc_layer="grpc") 319 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( 320 cluster_resolver=cluster_resolver) 321 ``` 322 """ 323 324 def __init__(self, cluster_spec, master='', task_type=None, task_id=None, 325 environment='', num_accelerators=None, 326 rpc_layer=None): 327 """Creates a SimpleClusterResolver from a ClusterSpec.""" 328 super(SimpleClusterResolver, self).__init__() 329 330 self._task_type = task_type 331 self._task_id = task_id 332 self._environment = environment 333 334 self._num_accelerators = num_accelerators 335 self._rpc_layer = rpc_layer 336 337 if not isinstance(cluster_spec, ClusterSpec): 338 raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.') 339 self._cluster_spec = cluster_spec 340 341 if not isinstance(master, str): 342 raise TypeError('master must be a string.') 343 self._master = master 344 345 def cluster_spec(self): 346 """Returns the ClusterSpec passed into the constructor.""" 347 return self._cluster_spec 348 349 def master(self, task_type=None, task_id=None, rpc_layer=None): 350 """Returns the master address to use when creating a session. 351 352 Note: this is only useful for TensorFlow 1.x. 353 354 Args: 355 task_type: (Optional) The type of the TensorFlow task of the master. 356 task_id: (Optional) The index of the TensorFlow task of the master. 357 rpc_layer: (Optional) The RPC used by distributed TensorFlow. 358 359 Returns: 360 The name or URL of the session master. 361 362 If a task_type and task_id is given, this will override the `master` 363 string passed into the initialization function. 364 """ 365 if task_type is not None and task_id is not None: 366 master = self.cluster_spec().task_address(task_type, task_id) 367 else: 368 master = self._master 369 370 return format_master_url(master, rpc_layer=rpc_layer or self._rpc_layer) 371 372 @property 373 def task_type(self): 374 return self._task_type 375 376 @property 377 def task_id(self): 378 return self._task_id 379 380 @task_type.setter 381 def task_type(self, task_type): 382 self._task_type = task_type 383 384 @task_id.setter 385 def task_id(self, task_id): 386 self._task_id = task_id 387 388 @property 389 def environment(self): 390 return self._environment 391 392 def num_accelerators(self, 393 task_type=None, 394 task_id=None, 395 config_proto=None): 396 """Returns the number of accelerator cores per worker. 397 398 The SimpleClusterResolver does not do automatic detection of accelerators, 399 and thus all arguments are unused and we simply return the value provided 400 in the constructor. 401 402 Args: 403 task_type: Unused. 404 task_id: Unused. 405 config_proto: Unused. 406 """ 407 # Unused 408 del task_type, task_id, config_proto 409 if self._num_accelerators is None: 410 return {} 411 return self._num_accelerators 412 413 @property 414 def rpc_layer(self): 415 return self._rpc_layer 416 417 @rpc_layer.setter 418 def rpc_layer(self, rpc_layer): 419 self._rpc_layer = rpc_layer 420 421 422@tf_export('distribute.cluster_resolver.UnionResolver') 423class UnionClusterResolver(ClusterResolver): 424 """Performs a union on underlying ClusterResolvers. 425 426 This class performs a union given two or more existing ClusterResolvers. It 427 merges the underlying ClusterResolvers, and returns one unified ClusterSpec 428 when cluster_spec is called. The details of the merge function is 429 documented in the cluster_spec function. 430 431 For additional ClusterResolver properties such as task type, task index, 432 rpc layer, environment, etc..., we will return the value from the first 433 ClusterResolver in the union. 434 435 An example to combine two cluster resolvers: 436 437 ```Python 438 cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", 439 "worker1.example.com:2222"]}) 440 cluster_resolver_0 = SimpleClusterResolver(cluster, task_type="worker", 441 task_id=0, 442 rpc_layer="grpc") 443 444 cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222", 445 "ps1.example.com:2222"]}) 446 cluster_resolver_1 = SimpleClusterResolver(cluster, task_type="ps", 447 task_id=0, 448 rpc_layer="grpc") 449 450 # Its task type would be "worker". 451 cluster_resolver = UnionClusterResolver(cluster_resolver_0, 452 cluster_resolver_1) 453 ``` 454 455 An example to override the number of GPUs in a TFConfigClusterResolver 456 instance: 457 458 ```Python 459 tf_config = TFConfigClusterResolver() 460 gpu_override = SimpleClusterResolver(tf_config.cluster_spec(), 461 num_accelerators={"GPU": 1}) 462 cluster_resolver = UnionResolver(gpu_override, tf_config) 463 ``` 464 """ 465 466 def __init__(self, *args, **kwargs): 467 """Initializes a UnionClusterResolver with other ClusterResolvers. 468 469 Args: 470 *args: `ClusterResolver` objects to be unionized. 471 **kwargs: 472 rpc_layer - (Optional) Override value for the RPC layer used by 473 TensorFlow. 474 task_type - (Optional) Override value for the current task type. 475 task_id - (Optional) Override value for the current task index. 476 477 Raises: 478 TypeError: If any argument is not a subclass of `ClusterResolvers`. 479 ValueError: If there are no arguments passed. 480 """ 481 super(UnionClusterResolver, self).__init__() 482 483 self._rpc_layer = kwargs.pop('rpc_layer', None) 484 self._task_type = kwargs.pop('task_type', None) 485 self._task_id = kwargs.pop('task_id', None) 486 487 if kwargs: 488 raise ValueError('Unexpected kwargs provided {!r}'.format(kwargs)) 489 490 if not args: 491 raise ValueError('At least one ClusterResolver is required.') 492 493 for cluster_resolver in args: 494 if not isinstance(cluster_resolver, ClusterResolver): 495 raise TypeError('All arguments must be a sub-class of ' 496 '`ClusterResolver.`') 497 self._cluster_resolvers = args 498 499 def cluster_spec(self): 500 """Returns a union of all the ClusterSpecs from the ClusterResolvers. 501 502 Returns: 503 A ClusterSpec containing host information merged from all the underlying 504 ClusterResolvers. 505 506 Raises: 507 KeyError: If there are conflicting keys detected when merging two or 508 more dictionaries, this exception is raised. 509 510 Note: If there are multiple ClusterResolvers exposing ClusterSpecs with the 511 same job name, we will merge the list/dict of workers. 512 513 If *all* underlying ClusterSpecs expose the set of workers as lists, we will 514 concatenate the lists of workers, starting with the list of workers from 515 the first ClusterResolver passed into the constructor. 516 517 If *any* of the ClusterSpecs expose the set of workers as a dict, we will 518 treat all the sets of workers as dicts (even if they are returned as lists) 519 and will only merge them into a dict if there is no conflicting keys. If 520 there is a conflicting key, we will raise a `KeyError`. 521 """ 522 523 merged_cluster = {} 524 525 # We figure out whether it is all lists for a particular job, or whether 526 # there are dicts inside. 527 for cluster_resolver in self._cluster_resolvers: 528 cluster_spec = cluster_resolver.cluster_spec() 529 cluster_dict = cluster_spec.as_dict() 530 531 for job_name, tasks in cluster_dict.items(): 532 if job_name in merged_cluster: 533 # If we see a dict, then we write a dict out regardless. 534 if isinstance(tasks, dict): 535 merged_cluster[job_name] = {} 536 else: 537 # We take whichever type is present. 538 if isinstance(tasks, list): 539 merged_cluster[job_name] = [] 540 else: 541 merged_cluster[job_name] = {} 542 543 # We then do the merge as appropriate in merged_cluster[job]. 544 for cluster_resolver in self._cluster_resolvers: 545 cluster_spec = cluster_resolver.cluster_spec() 546 cluster_dict = cluster_spec.as_dict() 547 548 for job_name, tasks in cluster_dict.items(): 549 if isinstance(merged_cluster[job_name], list): 550 # We all have lists, we can just concatenate and be done. 551 merged_cluster[job_name].extend(tasks) 552 else: 553 if isinstance(tasks, list): 554 # We convert to a dictionary if the type is a list. 555 task_dict = dict(zip(range(0, len(tasks)), tasks)) 556 else: 557 # We can simply make a copy (for update) and be done. 558 task_dict = tasks.copy() 559 560 # We detect if there are duplicates, and raise an error if so. 561 task_keys = set(task_dict) 562 merged_keys = set(merged_cluster[job_name].keys()) 563 intersected_keys = task_keys.intersection(merged_keys) 564 if intersected_keys: 565 raise KeyError('Duplicate keys detected when merging two ' 566 'ClusterSpecs: %s' % repr(intersected_keys)) 567 568 # We do the merge after all the processing. 569 merged_cluster[job_name].update(task_dict) 570 571 return ClusterSpec(merged_cluster) 572 573 def master(self, task_type=None, task_id=None, rpc_layer=None): 574 """Returns the master address to use when creating a session. 575 576 This usually returns the master from the first ClusterResolver passed in, 577 but you can override this by specifying the task_type and task_id. 578 579 Note: this is only useful for TensorFlow 1.x. 580 581 Args: 582 task_type: (Optional) The type of the TensorFlow task of the master. 583 task_id: (Optional) The index of the TensorFlow task of the master. 584 rpc_layer: (Optional) The RPC protocol for the given cluster. 585 586 Returns: 587 The name or URL of the session master. 588 """ 589 if task_type is not None and task_id is not None: 590 master = self.cluster_spec().task_address(task_type, task_id) 591 return format_master_url(master, rpc_layer or self._rpc_layer) 592 593 return self._cluster_resolvers[0].master(rpc_layer=rpc_layer) 594 595 @property 596 def task_type(self): 597 return self._task_type or self._cluster_resolvers[0].task_type 598 599 @property 600 def task_id(self): 601 return self._task_id or self._cluster_resolvers[0].task_id 602 603 @task_type.setter 604 def task_type(self, task_type): 605 self._task_type = task_type 606 607 @task_id.setter 608 def task_id(self, task_id): 609 self._task_id = task_id 610 611 @property 612 def environment(self): 613 return self._cluster_resolvers[0].environment 614 615 def num_accelerators(self, 616 task_type=None, 617 task_id=None, 618 config_proto=None): 619 return self._cluster_resolvers[0].num_accelerators( 620 task_type, task_id, config_proto) 621 622 @property 623 def rpc_layer(self): 624 return self._rpc_layer or self._cluster_resolvers[0].rpc_layer 625 626 @rpc_layer.setter 627 def rpc_layer(self, rpc_layer): 628 self._rpc_layer = rpc_layer 629