# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Cluster Resolvers are used for dynamic cluster IP/hostname resolution."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc

import collections

import six

from tensorflow.python.client import session
from tensorflow.python.eager import context
from tensorflow.python.framework import config
from tensorflow.python.framework import ops
from tensorflow.python.training.server_lib import ClusterSpec
from tensorflow.python.util.tf_export import tf_export


def format_master_url(master, rpc_layer=None):
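  """Formats the master address with an optional RPC protocol prefix.

  For example, `format_master_url('host:2222', rpc_layer='grpc')` returns
  `'grpc://host:2222'`, while `format_master_url('host:2222')` returns
  `'host:2222'` unchanged.
  """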
  if rpc_layer:
    return '%s://%s' % (rpc_layer, master)
  else:
    return master


def get_accelerator_devices(master, config_proto):
  """Returns accelerator devices given a master and a configuration."""
  if context.executing_eagerly():
    logical_devices = config.list_logical_devices()
    devices = []
    for d in logical_devices:
      if d.device_type == 'CPU' or d.device_type == 'XLA_CPU':  # Filter CPUs
        continue
      devices.append(session._DeviceAttributes(d.name, d.device_type, 0, 0))  # pylint: disable=protected-access
    return devices
  else:
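    # In graph mode, connect a temporary session to the given master and ask
    # it to enumerate the devices it can see.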
    with ops.Graph().as_default():
      with session.Session(master, config=config_proto) as s:
        devices = s.list_devices()
    return devices


@tf_export('distribute.cluster_resolver.ClusterResolver')
@six.add_metaclass(abc.ABCMeta)
class ClusterResolver(object):
  """Abstract class for all implementations of ClusterResolvers.

  This defines the skeleton for all implementations of ClusterResolvers.
  ClusterResolvers are a way for TensorFlow to communicate with various cluster
  management systems (e.g. GCE, AWS, etc.) and give TensorFlow the necessary
  information to set up distributed training.

  By letting TensorFlow communicate with these systems, we will be able to
  automatically discover and resolve IP addresses for various TensorFlow
  workers. This will eventually allow us to automatically recover from
  underlying machine failures and scale TensorFlow worker clusters up and down.

  Note to implementors of `tf.distribute.cluster_resolver.ClusterResolver`
  subclasses: in addition to these abstract methods, when task_type, task_id,
  and rpc_layer attributes are applicable, you should also implement them
  either as properties with getters or setters, or directly set the attributes
  `self._task_type`, `self._task_id`, or `self._rpc_layer` so the base class'
  getters and setters are used. See
  `tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an
  example.

  In general, multi-client tf.distribute strategies such as
  `tf.distribute.experimental.MultiWorkerMirroredStrategy` require task_type and
  task_id properties to be available in the `ClusterResolver` they are using. On
  the other hand, these concepts are not applicable in single-client strategies,
  such as `tf.distribute.experimental.TPUStrategy`, because the program is only
  expected to be run on one task, so there should not be a need to have code
  branches according to task type and task id.

  - task_type is the name of the server's current named job (e.g. 'worker' or
    'ps' in a distributed parameter server training job).
  - task_id is the ordinal index of the server within the task type.
  - rpc_layer is the protocol used by TensorFlow to communicate with other
    TensorFlow servers in a distributed environment.
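
  As an illustration only (the class below is hypothetical, not part of this
  module), a minimal resolver for a fixed cluster could be implemented like
  this:

    ```Python
    class StaticClusterResolver(ClusterResolver):
      # Hypothetical resolver that always serves the same ClusterSpec.

      def __init__(self, cluster_spec, master, task_type=None, task_id=None):
        self._cluster_spec = cluster_spec
        self._master = master
        # Setting these attributes lets the base class' getters and setters
        # work without overriding them.
        self._task_type = task_type
        self._task_id = task_id

      def cluster_spec(self):
        return self._cluster_spec

      def master(self, task_type=None, task_id=None, rpc_layer=None):
        if task_type is not None and task_id is not None:
          return self._cluster_spec.task_address(task_type, task_id)
        return self._master
    ```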
96  """
97
98  @abc.abstractmethod
99  def cluster_spec(self):
100    """Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`.
101
102    Returns:
103      A `tf.train.ClusterSpec` representing the state of the cluster at the
104      moment this function is called.
105
106    Implementors of this function must take care in ensuring that the
107    ClusterSpec returned is up-to-date at the time of calling this function.
108    This usually means retrieving the information from the underlying cluster
109    management system every time this function is invoked and reconstructing
110    a cluster_spec, rather than attempting to cache anything.
111    """
112    raise NotImplementedError()
113
114  @abc.abstractmethod
115  def master(self, task_type=None, task_id=None, rpc_layer=None):
116    """Retrieves the name or URL of the session master.
117
118    Note: this is only useful for TensorFlow 1.x.
119
120    Args:
121      task_type: (Optional) The type of the TensorFlow task of the master.
122      task_id: (Optional) The index of the TensorFlow task of the master.
123      rpc_layer: (Optional) The RPC protocol for the given cluster.
124
125    Returns:
126      The name or URL of the session master.
127
128    Implementors of this function must take care in ensuring that the master
129    returned is up-to-date at the time to calling this function. This usually
130    means retrieving the master every time this function is invoked.
131    """
132    raise NotImplementedError()
133
134  def num_accelerators(self,
135                       task_type=None,
136                       task_id=None,
137                       config_proto=None):
138    """Returns the number of accelerator cores per worker.
139
140    This returns the number of accelerator cores (such as GPUs and TPUs)
141    available per worker.
142
143    Optionally, we allow callers to specify the task_type, and task_id, for
144    if they want to target a specific TensorFlow task to query
145    the number of accelerators. This is to support heterogenous environments,
146    where the number of accelerators cores per host is different.
147
148    Args:
149      task_type: (Optional) The type of the TensorFlow task of the machine we
150        want to query.
151      task_id: (Optional) The index of the TensorFlow task of the machine we
152        want to query.
153      config_proto: (Optional) Configuration for starting a new session to
154        query how many accelerator cores it has.
155
156    Returns:
157      A map of accelerator types to number of cores.
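
    For example, a worker with two GPUs might report `{'GPU': 2}`; the exact
    device type strings depend on the devices visible to that task.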
158    """
159    master = self.master(task_type, task_id)
160    # TODO(b/126786766): in eager mode, we should check whether
161    # `tf.config.experimental_connect_to_cluster` is called or not.
162    devices = get_accelerator_devices(master, config_proto)
163    mapping = collections.defaultdict(int)
164    for device in devices:
165      if task_type is not None and task_id is not None:
166        job_path = '/job:%s' % task_type
167        task_path = '/task:%s' % task_id
168        if job_path not in device.name or task_path not in device.name:
169          continue
170      mapping[device.device_type] += 1
171    return mapping
172
173  @property
174  def environment(self):
175    """Returns the current environment which TensorFlow is running in.
176
177    There are two possible return values, "google" (when TensorFlow is running
178    in a Google-internal environment) or an empty string (when TensorFlow is
179    running elsewhere).
180
181    If you are implementing a ClusterResolver that works in both the Google
182    environment and the open-source world (for instance, a TPU ClusterResolver
183    or similar), you will have to return the appropriate string depending on the
184    environment, which you will have to detect.
185
186    Otherwise, if you are implementing a ClusterResolver that will only work
187    in open-source TensorFlow, you do not need to implement this property.
188    """
189    return ''
190
  @property
  def task_type(self):
    """Returns the task type this `ClusterResolver` indicates.

    In a TensorFlow distributed environment, each job may have an applicable
    task type. Valid task types in TensorFlow include
    'chief': a worker that is designated with more responsibility,
    'worker': a regular worker for training/evaluation,
    'ps': a parameter server, or
    'evaluator': an evaluator that evaluates the checkpoints for metrics.

    See [Multi-worker configuration](
    https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration)
    for more information about the 'chief' and 'worker' task types, which are
    the most commonly used.

    Having access to such information is useful when a user needs to run
    specific code according to task types. For example,

    ```python
    cluster_spec = tf.train.ClusterSpec({
        "ps": ["localhost:2222", "localhost:2223"],
        "worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
    })

    # SimpleClusterResolver is used here for illustration; other cluster
    # resolvers may be used for other sources of task type/id.
    cluster_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
                                             task_id=1)

    ...

    if cluster_resolver.task_type == 'worker':
      # Perform something that's only applicable on workers. This block
      # will run on this particular instance since we've specified this task to
      # be a worker in the above cluster resolver.
    elif cluster_resolver.task_type == 'ps':
      # Perform something that's only applicable on parameter servers. This
      # block will not run on this particular instance.
    ```

    Returns `None` if such information is not available or is not applicable
    in the current distributed environment, such as training with
    `tf.distribute.experimental.TPUStrategy`.

    For more information, please see
    `tf.distribute.cluster_resolver.ClusterResolver`'s class docstring.
    """
    return getattr(self, '_task_type', None)

  @property
  def task_id(self):
    """Returns the task id this `ClusterResolver` indicates.

    In a TensorFlow distributed environment, each job may have an applicable
    task id, which is the index of the instance within its task type. This is
    useful when a user needs to run specific code according to task index. For
    example,

    ```python
    cluster_spec = tf.train.ClusterSpec({
        "ps": ["localhost:2222", "localhost:2223"],
        "worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
    })

    # SimpleClusterResolver is used here for illustration; other cluster
    # resolvers may be used for other sources of task type/id.
    cluster_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
                                             task_id=0)

    ...

    if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0:
      # Perform something that's only applicable on 'worker' type, id 0. This
      # block will run on this particular instance since we've specified this
      # task to be a 'worker', id 0 in the above cluster resolver.
    else:
      # Perform something that's only applicable on other ids. This block will
      # not run on this particular instance.
    ```

    Returns `None` if such information is not available or is not applicable
    in the current distributed environment, such as training with
    `tf.distribute.cluster_resolver.TPUClusterResolver`.

    For more information, please see
    `tf.distribute.cluster_resolver.ClusterResolver`'s class docstring.
    """
    return getattr(self, '_task_id', None)

  @task_type.setter
  def task_type(self, task_type):
    """Setter of `task_type` property. See `task_type` property doc."""
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    """Setter of `task_id` property. See `task_id` property doc."""
    self._task_id = task_id


@tf_export('distribute.cluster_resolver.SimpleClusterResolver')
class SimpleClusterResolver(ClusterResolver):
  """Simple implementation of ClusterResolver that accepts all attributes.

  Please see the base class for documentation of arguments of its constructor.

  It is useful if you want to specify some or all attributes.

  Usage example with `tf.distribute.Strategy`:

    ```Python
    cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                               "worker1.example.com:2222"]})

    # On worker 0
    cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
                                             task_id=0,
                                             num_accelerators={"GPU": 8},
                                             rpc_layer="grpc")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        cluster_resolver=cluster_resolver)

    # On worker 1
    cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
                                             task_id=1,
                                             num_accelerators={"GPU": 8},
                                             rpc_layer="grpc")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        cluster_resolver=cluster_resolver)
    ```
  """

  def __init__(self, cluster_spec, master='', task_type=None, task_id=None,
               environment='', num_accelerators=None,
               rpc_layer=None):
    """Creates a SimpleClusterResolver from a ClusterSpec.
    super(SimpleClusterResolver, self).__init__()

    self._task_type = task_type
    self._task_id = task_id
    self._environment = environment

    self._num_accelerators = num_accelerators
    self._rpc_layer = rpc_layer

    if not isinstance(cluster_spec, ClusterSpec):
      raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.')
    self._cluster_spec = cluster_spec

    if not isinstance(master, str):
      raise TypeError('master must be a string.')
    self._master = master

  def cluster_spec(self):
    """Returns the ClusterSpec passed into the constructor."""
    return self._cluster_spec

  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master address to use when creating a session.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol used by distributed TensorFlow.

    Returns:
      The name or URL of the session master.

    If a task_type and task_id are given, they override the `master`
    string passed into the initialization function.
    """
    if task_type is not None and task_id is not None:
      master = self.cluster_spec().task_address(task_type, task_id)
    else:
      master = self._master

    return format_master_url(master, rpc_layer=rpc_layer or self._rpc_layer)

  @property
  def task_type(self):
    return self._task_type

  @property
  def task_id(self):
    return self._task_id

  @task_type.setter
  def task_type(self, task_type):
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    self._task_id = task_id

  @property
  def environment(self):
    return self._environment

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
396    """Returns the number of accelerator cores per worker.
397
398    The SimpleClusterResolver does not do automatic detection of accelerators,
399    and thus all arguments are unused and we simply return the value provided
400    in the constructor.
401
402    Args:
403      task_type: Unused.
404      task_id: Unused.
405      config_proto: Unused.
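
    Returns:
      A dict mapping accelerator types to the number of cores, as given by the
      `num_accelerators` constructor argument, or an empty dict if it was not
      provided.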
406    """
407    # Unused
408    del task_type, task_id, config_proto
409    if self._num_accelerators is None:
410      return {}
411    return self._num_accelerators
412
413  @property
414  def rpc_layer(self):
415    return self._rpc_layer
416
417  @rpc_layer.setter
418  def rpc_layer(self, rpc_layer):
419    self._rpc_layer = rpc_layer
420
421
@tf_export('distribute.cluster_resolver.UnionResolver')
class UnionClusterResolver(ClusterResolver):
  """Performs a union on underlying ClusterResolvers.

  This class performs a union given two or more existing ClusterResolvers. It
  merges the underlying ClusterResolvers, and returns one unified ClusterSpec
  when cluster_spec is called. The details of the merge function are
  documented in the cluster_spec function.

  For additional ClusterResolver properties such as task type, task index,
  rpc layer, environment, etc., we will return the value from the first
  ClusterResolver in the union.

  An example to combine two cluster resolvers:

    ```Python
    cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                                 "worker1.example.com:2222"]})
    cluster_resolver_0 = SimpleClusterResolver(cluster_0, task_type="worker",
                                               task_id=0,
                                               rpc_layer="grpc")

    cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222",
                                             "ps1.example.com:2222"]})
    cluster_resolver_1 = SimpleClusterResolver(cluster_1, task_type="ps",
                                               task_id=0,
                                               rpc_layer="grpc")

    # Its task type would be "worker".
    cluster_resolver = UnionClusterResolver(cluster_resolver_0,
                                            cluster_resolver_1)
    ```

  An example to override the number of GPUs in a TFConfigClusterResolver
  instance:

    ```Python
    tf_config = TFConfigClusterResolver()
    gpu_override = SimpleClusterResolver(tf_config.cluster_spec(),
                                         num_accelerators={"GPU": 1})
    cluster_resolver = UnionClusterResolver(gpu_override, tf_config)
    ```
  """

  def __init__(self, *args, **kwargs):
    """Initializes a UnionClusterResolver with other ClusterResolvers.

    Args:
      *args: `ClusterResolver` objects to be unionized.
      **kwargs:
        rpc_layer - (Optional) Override value for the RPC layer used by
          TensorFlow.
        task_type - (Optional) Override value for the current task type.
        task_id - (Optional) Override value for the current task index.

    Raises:
      TypeError: If any argument is not a subclass of `ClusterResolver`.
      ValueError: If there are no arguments passed.
    """
    super(UnionClusterResolver, self).__init__()

    self._rpc_layer = kwargs.pop('rpc_layer', None)
    self._task_type = kwargs.pop('task_type', None)
    self._task_id = kwargs.pop('task_id', None)

    if kwargs:
      raise ValueError('Unexpected kwargs provided {!r}'.format(kwargs))

    if not args:
      raise ValueError('At least one ClusterResolver is required.')

    for cluster_resolver in args:
      if not isinstance(cluster_resolver, ClusterResolver):
        raise TypeError('All arguments must be a subclass of '
                        '`ClusterResolver`.')
    self._cluster_resolvers = args

  def cluster_spec(self):
    """Returns a union of all the ClusterSpecs from the ClusterResolvers.

    Returns:
      A ClusterSpec containing host information merged from all the underlying
      ClusterResolvers.

    Raises:
      KeyError: If there are conflicting keys detected when merging two or
        more dictionaries.

    Note: If there are multiple ClusterResolvers exposing ClusterSpecs with the
    same job name, we will merge the list/dict of workers.

    If *all* underlying ClusterSpecs expose the set of workers as lists, we will
    concatenate the lists of workers, starting with the list of workers from
    the first ClusterResolver passed into the constructor.

    If *any* of the ClusterSpecs expose the set of workers as a dict, we will
    treat all the sets of workers as dicts (even if they are returned as lists)
    and will only merge them into a dict if there are no conflicting keys. If
    there is a conflicting key, we will raise a `KeyError`.
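
    For illustration, unioning a resolver whose ClusterSpec is
    `{"worker": ["host1:2222"]}` with one whose ClusterSpec is
    `{"worker": ["host2:2222"], "ps": ["host3:2222"]}` yields
    `{"worker": ["host1:2222", "host2:2222"], "ps": ["host3:2222"]}`. If
    instead both specs expressed the "worker" job as a dict keyed by task
    index 0, the merge would raise a `KeyError`.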
521    """
522
523    merged_cluster = {}
524
525    # We figure out whether it is all lists for a particular job, or whether
526    # there are dicts inside.
527    for cluster_resolver in self._cluster_resolvers:
528      cluster_spec = cluster_resolver.cluster_spec()
529      cluster_dict = cluster_spec.as_dict()
530
531      for job_name, tasks in cluster_dict.items():
532        if job_name in merged_cluster:
533          # If we see a dict, then we write a dict out regardless.
534          if isinstance(tasks, dict):
535            merged_cluster[job_name] = {}
536        else:
537          # We take whichever type is present.
538          if isinstance(tasks, list):
539            merged_cluster[job_name] = []
540          else:
541            merged_cluster[job_name] = {}
542
543    # We then do the merge as appropriate in merged_cluster[job].
544    for cluster_resolver in self._cluster_resolvers:
545      cluster_spec = cluster_resolver.cluster_spec()
546      cluster_dict = cluster_spec.as_dict()
547
548      for job_name, tasks in cluster_dict.items():
549        if isinstance(merged_cluster[job_name], list):
550          # We all have lists, we can just concatenate and be done.
551          merged_cluster[job_name].extend(tasks)
552        else:
553          if isinstance(tasks, list):
554            # We convert to a dictionary if the type is a list.
555            task_dict = dict(zip(range(0, len(tasks)), tasks))
556          else:
557            # We can simply make a copy (for update) and be done.
558            task_dict = tasks.copy()
559
560          # We detect if there are duplicates, and raise an error if so.
561          task_keys = set(task_dict)
562          merged_keys = set(merged_cluster[job_name].keys())
563          intersected_keys = task_keys.intersection(merged_keys)
564          if intersected_keys:
565            raise KeyError('Duplicate keys detected when merging two '
566                           'ClusterSpecs: %s' % repr(intersected_keys))
567
568          # We do the merge after all the processing.
569          merged_cluster[job_name].update(task_dict)
570
571    return ClusterSpec(merged_cluster)
572
  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master address to use when creating a session.

    This usually returns the master from the first ClusterResolver passed in,
    but you can override this by specifying the task_type and task_id.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol for the given cluster.

    Returns:
      The name or URL of the session master.
    """
    if task_type is not None and task_id is not None:
      master = self.cluster_spec().task_address(task_type, task_id)
      return format_master_url(master, rpc_layer or self._rpc_layer)

    return self._cluster_resolvers[0].master(rpc_layer=rpc_layer)

  @property
  def task_type(self):
    return self._task_type or self._cluster_resolvers[0].task_type

  @property
  def task_id(self):
    return self._task_id or self._cluster_resolvers[0].task_id

  @task_type.setter
  def task_type(self, task_type):
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    self._task_id = task_id

  @property
  def environment(self):
    return self._cluster_resolvers[0].environment

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    return self._cluster_resolvers[0].num_accelerators(
        task_type, task_id, config_proto)

  @property
  def rpc_layer(self):
    return self._rpc_layer or self._cluster_resolvers[0].rpc_layer

  @rpc_layer.setter
  def rpc_layer(self, rpc_layer):
    self._rpc_layer = rpc_layer