# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Facilities for creating multiple test combinations.

Here is an example of testing various optimizers in Eager and Graph mode:

class AdditionExample(test.TestCase, parameterized.TestCase):
  @combinations.generate(
     combinations.combine(mode=["graph", "eager"],
                          optimizer=[AdamOptimizer(),
                                     GradientDescentOptimizer()]))
  def testOptimizer(self, optimizer):
    ... f(optimizer)...

This will run `testOptimizer` 4 times with the specified optimizers: 2 in
Eager and 2 in Graph mode.
The test will be provided with arguments that match the arguments of
`combine()` by name. It is necessary to request all arguments, except for
`mode`, which is optional.

The `combine()` function is available for creating a cross product of various
options. The `times()` function exists for creating a product of N
`combine()`-ed results. See below.
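
As an illustration of `combine()` and `times()` (the option name `use_bias`
below is only an example, not part of this module):

  combinations.times(combinations.combine(mode=["graph", "eager"]),
                     combinations.combine(use_bias=[True, False]))

is expected to produce the same four combinations as
`combinations.combine(mode=["graph", "eager"], use_bias=[True, False])`.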
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import OrderedDict
import sys
import types
import unittest
from absl.testing import parameterized
import six
from tensorflow.contrib.distribute.python import mirrored_strategy as mirrored_lib
from tensorflow.contrib.distribute.python import parameter_server_strategy
from tensorflow.contrib.optimizer_v2 import adagrad as adagrad_v2
from tensorflow.contrib.optimizer_v2 import adam as adam_v2
from tensorflow.contrib.optimizer_v2 import gradient_descent as gradient_descent_v2
from tensorflow.python.distribute import distribution_strategy_context
from tensorflow.python.distribute import one_device_strategy as one_device_lib
from tensorflow.python.distribute import tpu_strategy as tpu_lib
from tensorflow.python.distribute.cluster_resolver import tpu_cluster_resolver
from tensorflow.python.eager import context
from tensorflow.python.framework import ops
from tensorflow.python.keras.optimizer_v2 import adagrad as adagrad_keras_v2
from tensorflow.python.keras.optimizer_v2 import adam as adam_keras_v2
from tensorflow.python.keras.optimizer_v2 import gradient_descent as gradient_descent_keras_v2
from tensorflow.python.keras.optimizer_v2 import rmsprop as rmsprop_keras_v2
from tensorflow.python.tpu import device_assignment as device_assignment_lib
from tensorflow.python.tpu import tpu_strategy_util
from tensorflow.python.training import adagrad
from tensorflow.python.training import adam
from tensorflow.python.training import gradient_descent
from tensorflow.python.training import rmsprop
from tensorflow.python.util import tf_inspect


GPU_TEST = "test_gpu" in sys.argv[0]
TPU_TEST = "test_tpu" in sys.argv[0]


def generate(combinations):
  """A decorator for generating test cases of a test method or a test class.

  Args:
    combinations: a list of dictionaries created using combine() and times().

  Restrictions:
   -- the "mode" argument can be either "eager" or "graph". It's "graph" by
      default.
   -- arguments of the test method must match by name to get the corresponding
      value of the combination. Tests must accept all arguments except
      "mode", "required_tpu" and "required_gpus".
   -- the "distribution" argument is special and optional. It is meant for
      passing instances of DistributionStrategy. Each instance is to be passed
      via a `NamedDistribution`. If using "distribution", "required_gpus" and
      "required_tpu" should be specified via the NamedDistribution instance,
      rather than as separate arguments.
   -- the "required_tpu" argument is special and optional. If `True`, the
      test will be skipped if TPUs aren't available.
   -- the "required_gpus" argument is special and optional. If not `None`,
      the test will be skipped if the specified number of GPUs aren't
      available.

  Returns:
    a decorator that will cause the test method or the test class to be run
    under the specified conditions.

  Raises:
    ValueError: if the "mode" argument wasn't either "eager" or "graph", or if
      other arguments were not accepted by the test method.
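
  For example (`MyDistributedTest` and `testStep` below are hypothetical; only
  the `combinations` API is from this module), a whole test class can be
  decorated, in which case every test method is expanded. The `required_gpus`
  values are consumed by the decorator for skipping and are not passed to the
  test method:

    @combinations.generate(
        combinations.combine(mode=["graph", "eager"], required_gpus=[0, 2]))
    class MyDistributedTest(test.TestCase, parameterized.TestCase):

      def testStep(self):
        ...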
  """

  def decorator(test_method_or_class):
    """The decorator to be returned."""

    # Generate good test names that can be used with --test_filter.
    named_combinations = []
    for combination in combinations:
      # We use OrderedDicts in `combine()` and `times()` to ensure stable
      # order of keys in each dictionary.
      assert isinstance(combination, OrderedDict)
      name = "".join([
          "_{}_{}".format(
              "".join(filter(str.isalnum, key)),
              "".join(filter(str.isalnum, str(value))))
          for key, value in combination.items()
      ])
      named_combinations.append(
          OrderedDict(
              list(combination.items()) + [("testcase_name",
                                            "_test{}".format(name))]))

    if isinstance(test_method_or_class, type):
      class_object = test_method_or_class
      class_object._test_method_ids = test_method_ids = {}
      for name, test_method in six.iteritems(class_object.__dict__.copy()):
        if (name.startswith(unittest.TestLoader.testMethodPrefix) and
            isinstance(test_method, types.FunctionType)):
          delattr(class_object, name)
          methods = {}
          parameterized._update_class_dict_for_param_test_case(
              class_object.__name__, methods, test_method_ids, name,
              parameterized._ParameterizedTestIter(
                  _augment_with_special_arguments(test_method),
                  named_combinations, parameterized._NAMED, name))
          for method_name, method in six.iteritems(methods):
            setattr(class_object, method_name, method)

      return class_object
    else:
      test_method = _augment_with_special_arguments(test_method_or_class)
      return parameterized.named_parameters(*named_combinations)(test_method)

  return decorator


def _augment_with_special_arguments(test_method):
  def decorated(self, **kwargs):
    """A wrapped test method that treats some arguments in a special way."""
    mode = kwargs.pop("mode", "graph")

    distribution = kwargs.get("distribution", None)
    required_tpu = kwargs.pop("required_tpu", False)
    required_gpus = kwargs.pop("required_gpus", None)

    if distribution:
      assert required_gpus is None, (
          "Do not use `required_gpus` and `distribution` together.")
      assert required_tpu is False, (
          "Do not use `required_tpu` and `distribution` together.")
      required_gpus = distribution.required_gpus
      required_tpu = distribution.required_tpu

    if required_tpu and not TPU_TEST:
      self.skipTest("Test requires a TPU, but it's not available.")
    if not required_tpu and TPU_TEST:
      self.skipTest("Test that doesn't require a TPU.")

    if not required_gpus:
      if GPU_TEST:
        self.skipTest("Test that doesn't require GPUs.")
    elif context.num_gpus() < required_gpus:
      # TODO(priyag): Consider allowing tests in graph mode using soft
      # placement.
      self.skipTest(
          "{} GPUs are not available for this test. {} GPUs are available".
          format(required_gpus, context.num_gpus()))

    # At this point, `kwargs` doesn't have `required_gpus` or `required_tpu`
    # that the user might have specified. `kwargs` still has `mode`, which
    # the test is allowed to accept or ignore.
    requested_arguments = tf_inspect.getfullargspec(test_method).args
    missing_arguments = set(list(kwargs.keys()) + ["self"]).difference(
        set(requested_arguments + ["mode"]))
    if missing_arguments:
      raise ValueError("The test is missing arguments {} .".format(
          missing_arguments))

    kwargs_to_pass = {}
    for arg in requested_arguments:
      if arg == "self":
        kwargs_to_pass[arg] = self
      else:
        kwargs_to_pass[arg] = kwargs[arg]

    if mode == "eager":
      with context.eager_mode():
        if distribution:
          kwargs_to_pass["distribution"] = distribution.strategy
        test_method(**kwargs_to_pass)
    elif mode == "graph":
      with ops.Graph().as_default(), context.graph_mode():
        if distribution:
          kwargs_to_pass["distribution"] = distribution.strategy
        test_method(**kwargs_to_pass)
    else:
      raise ValueError(
          "'mode' has to be either 'eager' or 'graph' and not {}".format(
              mode))
  return decorated


def combine(**kwargs):
  """Generate combinations based on its keyword arguments.

  Two sets of returned combinations can be concatenated using +. Their product
  can be computed using `times()`.

  Args:
    **kwargs: keyword arguments of form `option=[possibilities, ...]`
         or `option=the_only_possibility`.

  Returns:
    a list of dictionaries for each combination. Keys in the dictionaries are
    the keyword argument names. Each key has one value - one of the
    corresponding keyword argument values.
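
  For example (argument names here are purely illustrative):

    combine(a=[1, 2], b=3)

  is expected to return

    [OrderedDict([("a", 1), ("b", 3)]),
     OrderedDict([("a", 2), ("b", 3)])]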
  """
  if not kwargs:
    return [OrderedDict()]

  sort_by_key = lambda k: k[0]
  kwargs = OrderedDict(sorted(kwargs.items(), key=sort_by_key))
  first = list(kwargs.items())[0]

  rest = dict(list(kwargs.items())[1:])
  rest_combined = combine(**rest)

  key = first[0]
  values = first[1]
  if not isinstance(values, list):
    values = [values]

  return [
      OrderedDict(sorted(list(combined.items()) + [(key, v)], key=sort_by_key))
      for v in values
      for combined in rest_combined
  ]


def times(*combined):
  """Generate a product of N sets of combinations.

  times(combine(a=[1,2]), combine(b=[3,4])) == combine(a=[1,2], b=[3,4])

  Args:
    *combined: N lists of dictionaries that specify combinations.

  Returns:
    a list of dictionaries for each combination.

  Raises:
    ValueError: if some of the inputs have overlapping keys.
  """
  assert combined

  if len(combined) == 1:
    return combined[0]

  first = combined[0]
  rest_combined = times(*combined[1:])

  combined_results = []
  for a in first:
    for b in rest_combined:
      if set(a.keys()).intersection(set(b.keys())):
        raise ValueError("Keys need to not overlap: {} vs {}".format(
            a.keys(), b.keys()))

      combined_results.append(OrderedDict(list(a.items()) + list(b.items())))
  return combined_results


class NamedObject(object):
  """A class that translates an object into a good test name."""

  def __init__(self, name, obj):
    self._name = name
    self._obj = obj

  def __getattr__(self, name):
    return getattr(self._obj, name)

  def __call__(self, *args, **kwargs):
    return self._obj(*args, **kwargs)

  def __repr__(self):
    return self._name


class NamedDistribution(object):
  """Translates DistributionStrategy and its data into a good name."""

  def __init__(self, name, distribution_fn, required_gpus=None,
               required_tpu=False):
    self._distribution_fn = distribution_fn
    self._name = name
    self._required_gpus = required_gpus
    self._required_tpu = required_tpu

  def __repr__(self):
    return self._name

  @property
  def strategy(self):
    return self._distribution_fn()

  @property
  def required_gpus(self):
    return self._required_gpus

  @property
  def required_tpu(self):
    return self._required_tpu


def _get_tpu_strategy_creator(steps_per_run, use_single_core=False, **kwargs):
  def _create_tpu_strategy():
    resolver = tpu_cluster_resolver.TPUClusterResolver("")
    topology = tpu_strategy_util.initialize_tpu_system(resolver)
    device_assignment = None
    if use_single_core:
      device_assignment = device_assignment_lib.DeviceAssignment(
          topology, core_assignment=device_assignment_lib.
          SINGLE_CORE_ASSIGNMENT)

    strategy = tpu_lib.TPUStrategy(resolver, steps_per_run=steps_per_run,
                                   device_assignment=device_assignment,
                                   **kwargs)
    return strategy
  return _create_tpu_strategy


# pylint: disable=g-long-lambda
default_strategy = NamedDistribution(
    "Default",
    distribution_strategy_context._get_default_strategy,  # pylint: disable=protected-access
    required_gpus=None)
one_device_strategy = NamedDistribution(
    "OneDeviceCPU", lambda: one_device_lib.OneDeviceStrategy("/cpu:0"),
    required_gpus=None)
one_device_strategy_gpu = NamedDistribution(
    "OneDeviceGPU", lambda: one_device_lib.OneDeviceStrategy("/gpu:0"),
    required_gpus=1)
tpu_strategy = NamedDistribution(
    "TPU", _get_tpu_strategy_creator(steps_per_run=2),
    required_tpu=True)
tpu_strategy_one_step = NamedDistribution(
    "TPUOneStep", _get_tpu_strategy_creator(steps_per_run=1),
    required_tpu=True)
tpu_strategy_one_core = NamedDistribution(
    "TPUOneCore", _get_tpu_strategy_creator(
        steps_per_run=2, use_single_core=True),
    required_tpu=True)
tpu_strategy_one_step_one_core = NamedDistribution(
    "TPUOneStepOneCore", _get_tpu_strategy_creator(
        steps_per_run=1, use_single_core=True),
    required_tpu=True)

mirrored_strategy_with_one_cpu = NamedDistribution(
    "Mirrored1CPU",
    lambda: mirrored_lib.MirroredStrategy(["/cpu:0"]))
mirrored_strategy_with_one_gpu = NamedDistribution(
    "Mirrored1GPU",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0"]),
    required_gpus=1)
mirrored_strategy_with_gpu_and_cpu = NamedDistribution(
    "MirroredCPUAndGPU",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0", "/cpu:0"]),
    required_gpus=1)
mirrored_strategy_with_two_gpus = NamedDistribution(
    "Mirrored2GPUs",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0", "/gpu:1"]),
    required_gpus=2)
core_mirrored_strategy_with_one_cpu = NamedDistribution(
    "CoreMirrored1CPU",
    lambda: mirrored_lib.CoreMirroredStrategy(["/cpu:0"]))
core_mirrored_strategy_with_one_gpu = NamedDistribution(
    "CoreMirrored1GPU",
    lambda: mirrored_lib.CoreMirroredStrategy(["/gpu:0"]),
    required_gpus=1)
core_mirrored_strategy_with_gpu_and_cpu = NamedDistribution(
    "CoreMirroredCPUAndGPU",
    lambda: mirrored_lib.CoreMirroredStrategy(["/gpu:0", "/cpu:0"]),
    required_gpus=1)
core_mirrored_strategy_with_two_gpus = NamedDistribution(
    "CoreMirrored2GPUs",
    lambda: mirrored_lib.CoreMirroredStrategy(["/gpu:0", "/gpu:1"]),
    required_gpus=2)
parameter_server_strategy_with_two_gpus = NamedDistribution(
    "ParameterServer2GPUs",
    lambda: parameter_server_strategy.ParameterServerStrategy(
        num_gpus_per_worker=2),
    required_gpus=2)


gradient_descent_optimizer_v1_fn = NamedObject(
    "GradientDescentV1", lambda: gradient_descent.GradientDescentOptimizer(0.2))
adagrad_optimizer_v1_fn = NamedObject(
    "AdagradV1", lambda: adagrad.AdagradOptimizer(0.001))
adam_optimizer_v1_fn = NamedObject("AdamV1",
                                   lambda: adam.AdamOptimizer(0.001, epsilon=1))
rmsprop_optimizer_v1_fn = NamedObject(
    "RmsPropV1", lambda: rmsprop.RMSPropOptimizer(0.001))

optimizers_v1 = [gradient_descent_optimizer_v1_fn, adagrad_optimizer_v1_fn]

gradient_descent_optimizer_v2_fn = NamedObject(
    "GradientDescentV2",
    lambda: gradient_descent_v2.GradientDescentOptimizer(0.2))
adagrad_optimizer_v2_fn = NamedObject(
    "AdagradV2", lambda: adagrad_v2.AdagradOptimizer(0.001))
adam_optimizer_v2_fn = NamedObject(
    "AdamV2", lambda: adam_v2.AdamOptimizer(0.001, epsilon=1.0))

optimizers_v2 = [gradient_descent_optimizer_v2_fn, adagrad_optimizer_v2_fn]

gradient_descent_optimizer_keras_v2_fn = NamedObject(
    "GradientDescentKerasV2",
    lambda: gradient_descent_keras_v2.SGD(0.2))
adagrad_optimizer_keras_v2_fn = NamedObject(
    "AdagradKerasV2", lambda: adagrad_keras_v2.Adagrad(0.001))
adam_optimizer_keras_v2_fn = NamedObject(
    "AdamKerasV2", lambda: adam_keras_v2.Adam(0.001, epsilon=1.0))
rmsprop_optimizer_keras_v2_fn = NamedObject(
    "RmsPropKerasV2", lambda: rmsprop_keras_v2.RMSprop(0.001))

graph_and_eager_modes = ["graph", "eager"]


def distributions_and_v1_optimizers():
  """A common set of combinations with DistributionStrategies and Optimizers."""
  return combine(
      distribution=[
          one_device_strategy,
          mirrored_strategy_with_gpu_and_cpu,
          mirrored_strategy_with_two_gpus,
          core_mirrored_strategy_with_gpu_and_cpu,
          core_mirrored_strategy_with_two_gpus,
      ],
      optimizer_fn=optimizers_v1)


def distributions_and_v2_optimizers():
  """DistributionStrategies and V2 Optimizers."""
  return combine(
      distribution=[
          one_device_strategy,
          mirrored_strategy_with_gpu_and_cpu,
          mirrored_strategy_with_two_gpus,
          core_mirrored_strategy_with_gpu_and_cpu,
          core_mirrored_strategy_with_two_gpus,
      ],
      optimizer_fn=optimizers_v2)
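

# For illustration only: a sketch of how the predefined combinations above are
# typically consumed by a test. The test class and method names here are
# hypothetical; only the `combinations` helpers are from this module.
#
#   class MyDistributedTest(test.TestCase, parameterized.TestCase):
#
#     @combinations.generate(
#         combinations.times(distributions_and_v1_optimizers(),
#                            combinations.combine(mode=graph_and_eager_modes)))
#     def testTrainingStep(self, distribution, optimizer_fn):
#       ...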