# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Strategy and optimizer combinations for combinations.combine()."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.python import tf2
from tensorflow.python.distribute import central_storage_strategy
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import distribution_strategy_context
from tensorflow.python.distribute import mirrored_strategy as mirrored_lib
from tensorflow.python.distribute import one_device_strategy as one_device_lib
from tensorflow.python.distribute import tpu_strategy as tpu_lib
from tensorflow.python.distribute.cluster_resolver import tpu_cluster_resolver
from tensorflow.python.eager import context
from tensorflow.python.eager import remote
from tensorflow.python.framework import config
from tensorflow.python.keras.optimizer_v2 import adadelta as adadelta_keras_v2
from tensorflow.python.keras.optimizer_v2 import adagrad as adagrad_keras_v2
from tensorflow.python.keras.optimizer_v2 import adam as adam_keras_v2
from tensorflow.python.keras.optimizer_v2 import adamax as adamax_keras_v2
from tensorflow.python.keras.optimizer_v2 import ftrl as ftrl_keras_v2
from tensorflow.python.keras.optimizer_v2 import gradient_descent as gradient_descent_keras_v2
from tensorflow.python.keras.optimizer_v2 import nadam as nadam_keras_v2
from tensorflow.python.keras.optimizer_v2 import rmsprop as rmsprop_keras_v2
from tensorflow.python.platform import flags
from tensorflow.python.tpu import device_assignment as device_assignment_lib
from tensorflow.python.tpu import tpu_strategy_util
from tensorflow.python.training import adagrad
from tensorflow.python.training import adam
from tensorflow.python.training import ftrl
from tensorflow.python.training import gradient_descent
from tensorflow.python.training import rmsprop


FLAGS = flags.FLAGS

_did_connect_to_cluster = False


# pylint: disable=missing-docstring
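# Note: `_get_tpu_strategy_creator` returns a zero-argument factory rather
# than a strategy instance. Connecting to the cluster and initializing the
# TPU system therefore only happen when the factory is invoked inside a test,
# so importing this module never touches a TPU.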
def _get_tpu_strategy_creator(steps_per_run, use_single_core=False, **kwargs):
  def _create_tpu_strategy():
    global _did_connect_to_cluster

    # These flags will be defined by tpu_test_wrapper.py.
    resolver = tpu_cluster_resolver.TPUClusterResolver(
        tpu=hasattr(FLAGS, "tpu") and FLAGS.tpu or "",
        zone=hasattr(FLAGS, "zone") and FLAGS.zone or None,
        project=hasattr(FLAGS, "project") and FLAGS.project or None,
    )
    # Only connect once per process, rather than per test method.
    if hasattr(FLAGS, "tpu") and FLAGS.tpu and not _did_connect_to_cluster:
      remote.connect_to_cluster(resolver)
      _did_connect_to_cluster = True

    topology = tpu_strategy_util.initialize_tpu_system(resolver)
    device_assignment = None
    if use_single_core:
      device_assignment = device_assignment_lib.DeviceAssignment(
          topology,
          core_assignment=device_assignment_lib.SINGLE_CORE_ASSIGNMENT)

    # Steps per run is only supported in TF 1.x.
    if tf2.enabled():
      return tpu_lib.TPUStrategy(resolver, device_assignment, **kwargs)
    else:
      return tpu_lib.TPUStrategyV1(resolver, steps_per_run,
                                   device_assignment, **kwargs)
  return _create_tpu_strategy


# pylint: disable=g-long-lambda
default_strategy = combinations.NamedDistribution(
    "Default",
    distribution_strategy_context._get_default_strategy,  # pylint: disable=protected-access
    required_gpus=None)
one_device_strategy = combinations.NamedDistribution(
    "OneDeviceCPU",
    lambda: one_device_lib.OneDeviceStrategy("/cpu:0"),
    required_gpus=None)
one_device_strategy_gpu = combinations.NamedDistribution(
    "OneDeviceGPU",
    lambda: one_device_lib.OneDeviceStrategy("/gpu:0"),
    required_gpus=1)
one_device_strategy_on_worker_1 = combinations.NamedDistribution(
    "OneDeviceOnWorker1CPU",
    lambda: one_device_lib.OneDeviceStrategy("/job:worker/replica:0/task:1/cpu:0"),  # pylint: disable=line-too-long
    required_gpus=None)
one_device_strategy_gpu_on_worker_1 = combinations.NamedDistribution(
    "OneDeviceOnWorker1GPU",
    lambda: one_device_lib.OneDeviceStrategy("/job:worker/replica:0/task:1/gpu:0"),  # pylint: disable=line-too-long
    required_gpus=1)
tpu_strategy = combinations.NamedDistribution(
    "TPU", _get_tpu_strategy_creator(steps_per_run=2), required_tpu=True)
tpu_strategy_one_step = combinations.NamedDistribution(
    "TPUOneStep", _get_tpu_strategy_creator(steps_per_run=1), required_tpu=True)
tpu_strategy_one_core = combinations.NamedDistribution(
    "TPUOneCore",
    _get_tpu_strategy_creator(steps_per_run=2, use_single_core=True),
    required_tpu=True)
tpu_strategy_one_step_one_core = combinations.NamedDistribution(
    "TPUOneStepOneCore",
    _get_tpu_strategy_creator(steps_per_run=1, use_single_core=True),
    required_tpu=True)
cloud_tpu_strategy = combinations.NamedDistribution(
    "CloudTPU",
    _get_tpu_strategy_creator(steps_per_run=2),
    required_tpu=True,
    use_cloud_tpu=True)
mirrored_strategy_with_one_cpu = combinations.NamedDistribution(
    "Mirrored1CPU", lambda: mirrored_lib.MirroredStrategy(["/cpu:0"]))
mirrored_strategy_with_one_gpu = combinations.NamedDistribution(
    "Mirrored1GPU",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0"]),
    required_gpus=1)
mirrored_strategy_with_gpu_and_cpu = combinations.NamedDistribution(
    "MirroredCPUAndGPU",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0", "/cpu:0"]),
    required_gpus=1)
mirrored_strategy_with_two_gpus = combinations.NamedDistribution(
    "Mirrored2GPUs",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0", "/gpu:1"]),
    required_gpus=2)
# Tests that use this strategy should call set_virtual_cpus_to_at_least(3) in
# their setUp method so the virtual CPU devices below exist; a sketch follows.
mirrored_strategy_with_cpu_1_and_2 = combinations.NamedDistribution(
    "Mirrored2CPU", lambda: mirrored_lib.MirroredStrategy(["/cpu:1", "/cpu:2"]))
central_storage_strategy_with_two_gpus = combinations.NamedDistribution(
    "CentralStorage2GPUs",
    lambda: central_storage_strategy.CentralStorageStrategy._from_num_gpus(2),  # pylint: disable=protected-access
    required_gpus=2)
central_storage_strategy_with_gpu_and_cpu = combinations.NamedDistribution(
    "CentralStorageCPUAndGPU",
    lambda: central_storage_strategy.CentralStorageStrategy(
        ["/gpu:0", "/cpu:0"]),
    required_gpus=1)

gradient_descent_optimizer_v1_fn = combinations.NamedObject(
    "GradientDescentV1",
    lambda: gradient_descent.GradientDescentOptimizer(0.001))
adagrad_optimizer_v1_fn = combinations.NamedObject(
    "AdagradV1", lambda: adagrad.AdagradOptimizer(0.001))
adam_optimizer_v1_fn = combinations.NamedObject(
    "AdamV1", lambda: adam.AdamOptimizer(0.001, epsilon=1))
ftrl_optimizer_v1_fn = combinations.NamedObject(
    "FtrlV1", lambda: ftrl.FtrlOptimizer(0.001))
rmsprop_optimizer_v1_fn = combinations.NamedObject(
    "RmsPropV1", lambda: rmsprop.RMSPropOptimizer(0.001))

# TODO(shiningsun): consider adding the other v1 optimizers
optimizers_v1 = [
    gradient_descent_optimizer_v1_fn, adagrad_optimizer_v1_fn,
    ftrl_optimizer_v1_fn, rmsprop_optimizer_v1_fn
]

adadelta_optimizer_keras_v2_fn = combinations.NamedObject(
    "AdadeltaKerasV2", lambda: adadelta_keras_v2.Adadelta(0.001))
adagrad_optimizer_keras_v2_fn = combinations.NamedObject(
    "AdagradKerasV2", lambda: adagrad_keras_v2.Adagrad(0.001))
adam_optimizer_keras_v2_fn = combinations.NamedObject(
    "AdamKerasV2", lambda: adam_keras_v2.Adam(0.001, epsilon=1.0))
adamax_optimizer_keras_v2_fn = combinations.NamedObject(
    "AdamaxKerasV2", lambda: adamax_keras_v2.Adamax(0.001, epsilon=1.0))
nadam_optimizer_keras_v2_fn = combinations.NamedObject(
    "NadamKerasV2", lambda: nadam_keras_v2.Nadam(0.001, epsilon=1.0))
ftrl_optimizer_keras_v2_fn = combinations.NamedObject(
    "FtrlKerasV2", lambda: ftrl_keras_v2.Ftrl(0.001))
gradient_descent_optimizer_keras_v2_fn = combinations.NamedObject(
    "GradientDescentKerasV2", lambda: gradient_descent_keras_v2.SGD(0.001))
rmsprop_optimizer_keras_v2_fn = combinations.NamedObject(
    "RmsPropKerasV2", lambda: rmsprop_keras_v2.RMSprop(0.001))

# TODO(shiningsun): consider adding the other v2 optimizers
optimizers_v2 = [
    gradient_descent_optimizer_keras_v2_fn, adagrad_optimizer_keras_v2_fn
]

optimizers_v1_and_v2 = optimizers_v1 + optimizers_v2

graph_and_eager_modes = ["graph", "eager"]


# This function should be called in a test's `setUp` method with the
# maximum value needed in any test.
def set_virtual_cpus_to_at_least(num_virtual_cpus):
  """Create virtual CPU devices if they haven't yet been created."""
  if num_virtual_cpus < 1:
    raise ValueError("`num_virtual_cpus` must be at least 1, not %r" %
                     (num_virtual_cpus,))
  physical_devices = config.list_physical_devices("CPU")
  if not physical_devices:
    raise RuntimeError("No CPUs found")
  configs = config.get_logical_device_configuration(physical_devices[0])
  if configs is None:
    logical_devices = [
        context.LogicalDeviceConfiguration() for _ in range(num_virtual_cpus)
    ]
    config.set_logical_device_configuration(physical_devices[0],
                                            logical_devices)
  elif len(configs) < num_virtual_cpus:
    raise RuntimeError(
        "Virtual CPUs already configured: %d exist, but %d were requested" %
        (len(configs), num_virtual_cpus))


def distributions_and_v1_optimizers():
  """Combinations of DistributionStrategies and V1 Optimizers."""
  return combinations.combine(
      distribution=[
          one_device_strategy,
          mirrored_strategy_with_gpu_and_cpu,
          mirrored_strategy_with_two_gpus,
      ],
      optimizer_fn=optimizers_v1)


def distributions_and_v2_optimizers():
  """Combinations of DistributionStrategies and V2 (Keras) Optimizers."""
  return combinations.combine(
      distribution=[
          one_device_strategy,
          mirrored_strategy_with_gpu_and_cpu,
          mirrored_strategy_with_two_gpus,
      ],
      optimizer_fn=optimizers_v2)


def distributions_and_v1_and_v2_optimizers():
  """Combinations of DistributionStrategies and both V1 and V2 Optimizers."""
  return combinations.combine(
      distribution=[
          one_device_strategy,
          mirrored_strategy_with_gpu_and_cpu,
          mirrored_strategy_with_two_gpus,
      ],
      optimizer_fn=optimizers_v1_and_v2)


strategies_minus_tpu = [
    default_strategy, one_device_strategy, one_device_strategy_gpu,
    mirrored_strategy_with_gpu_and_cpu, mirrored_strategy_with_two_gpus
]

tpu_strategies = [
    tpu_strategy,  # steps_per_run=2
    tpu_strategy_one_step,
    cloud_tpu_strategy,
]

all_strategies = strategies_minus_tpu + tpu_strategies

multidevice_strategies = [
    mirrored_strategy_with_gpu_and_cpu,
    mirrored_strategy_with_two_gpus,
    tpu_strategy,  # steps_per_run=2
    tpu_strategy_one_step
]


def strategy_minus_tpu_combinations():
  return combinations.combine(
      distribution=strategies_minus_tpu, mode=["graph", "eager"])


def tpu_strategy_combinations():
  return combinations.combine(distribution=tpu_strategies, mode=["graph"])


def all_strategy_combinations():
  return strategy_minus_tpu_combinations() + tpu_strategy_combinations()


def all_strategy_minus_default_and_tpu_combinations():
  return combinations.combine(
      distribution=[
          one_device_strategy, one_device_strategy_gpu,
          mirrored_strategy_with_gpu_and_cpu, mirrored_strategy_with_two_gpus
      ],
      mode=["graph", "eager"])


def all_strategy_combinations_minus_default():
  return (all_strategy_minus_default_and_tpu_combinations() +
          tpu_strategy_combinations())
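
# An illustrative sketch of how the combination helpers above are consumed
# (the test method name is hypothetical; it assumes a test file that imports
# this module as `strategy_combinations` and uses the `combinations.generate`
# decorator from the distribute combinations package):
#
#   @combinations.generate(strategy_combinations.all_strategy_combinations())
#   def test_reduce_sum(self, distribution):
#     with distribution.scope():
#       ...  # build and run the computation under the parameterized strategy
#
# The names given to the NamedDistribution and NamedObject instances above
# ("OneDeviceCPU", "AdamKerasV2", ...) become part of the generated test case
# names, which is how individual failing combinations show up in test logs.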