1# Copyright 2017 The Abseil Authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Base functionality for Abseil Python tests. 16 17This module contains base classes and high-level functions for Abseil-style 18tests. 19""" 20 21from collections import abc 22import contextlib 23import difflib 24import enum 25import errno 26import getpass 27import inspect 28import io 29import itertools 30import json 31import os 32import random 33import re 34import shlex 35import shutil 36import signal 37import stat 38import subprocess 39import sys 40import tempfile 41import textwrap 42import unittest 43from unittest import mock # pylint: disable=unused-import Allow absltest.mock. 44from urllib import parse 45 46try: 47 # The faulthandler module isn't always available, and pytype doesn't 48 # understand that we're catching ImportError, so suppress the error. 49 # pytype: disable=import-error 50 import faulthandler 51 # pytype: enable=import-error 52except ImportError: 53 # We use faulthandler if it is available. 54 faulthandler = None 55 56from absl import app 57from absl import flags 58from absl import logging 59from absl.testing import _pretty_print_reporter 60from absl.testing import xml_reporter 61 62# Make typing an optional import to avoid it being a required dependency 63# in Python 2. Type checkers will still understand the imports. 
def expectedFailureIf(condition, reason):  # pylint: disable=invalid-name
  """Returns a decorator that expects the test to fail only when asked to.

  Example usage::

      @expectedFailureIf(sys.version_info.major == 2, "Not yet working in py2")
      def test_foo(self):
        ...

  Args:
    condition: bool, whether the decorated test is expected to fail.
    reason: Text, why the failure is expected. It only documents the call
      site; the value is discarded and never reported.

  Returns:
    `unittest.expectedFailure` when `condition` is truthy, otherwise a
    pass-through decorator that hands back the decorated object unchanged.
  """
  del reason  # Unused.

  def _passthrough(test_item):
    return test_item

  return unittest.expectedFailure if condition else _passthrough
def get_default_test_tmpdir():
  # type: () -> Text
  """Returns the default directory for temporary test files.

  Returns:
    The value of the TEST_TMPDIR environment variable when it is set to a
    non-empty string; otherwise the `absl_testing` directory path under
    `tempfile.gettempdir()`. The directory is not created here.
  """
  # An unset variable and an empty one are treated the same way.
  return os.environ.get('TEST_TMPDIR') or os.path.join(
      tempfile.gettempdir(), 'absl_testing')
184 185 In principle, it would be simpler to return None if no override is provided; 186 however, the python random module has no `get_seed()`, only `getstate()`, 187 which returns far more data than we want to pass via an environment variable 188 or flag. 189 190 Returns: 191 A default value for test case randomization (int). 0 means do not randomize. 192 193 Raises: 194 ValueError: Raised when the flag or env value is not one of the options 195 above. 196 """ 197 if FLAGS['test_randomize_ordering_seed'].present: 198 randomize = FLAGS.test_randomize_ordering_seed 199 elif 'TEST_RANDOMIZE_ORDERING_SEED' in os.environ: 200 randomize = os.environ['TEST_RANDOMIZE_ORDERING_SEED'] 201 else: 202 randomize = '' 203 if not randomize: 204 return 0 205 if randomize == 'random': 206 return random.Random().randint(1, 4294967295) 207 if randomize == '0': 208 return 0 209 try: 210 seed = int(randomize) 211 if seed > 0: 212 return seed 213 except ValueError: 214 pass 215 raise ValueError( 216 'Unknown test randomization seed value: {}'.format(randomize)) 217 218 219TEST_SRCDIR = flags.DEFINE_string( 220 'test_srcdir', 221 get_default_test_srcdir(), 222 'Root of directory tree where source files live', 223 allow_override_cpp=True) 224TEST_TMPDIR = flags.DEFINE_string( 225 'test_tmpdir', 226 get_default_test_tmpdir(), 227 'Directory for temporary testing files', 228 allow_override_cpp=True) 229 230flags.DEFINE_integer( 231 'test_random_seed', 232 _get_default_test_random_seed(), 233 'Random seed for testing. Some test frameworks may ' 234 'change the default value of this flag between runs, so ' 235 'it is not appropriate for seeding probabilistic tests.', 236 allow_override_cpp=True) 237flags.DEFINE_string( 238 'test_randomize_ordering_seed', 239 '', 240 'If positive, use this as a seed to randomize the ' 241 'execution order for test cases. If "random", pick a ' 242 'random seed to use. If 0 or not set, do not randomize ' 243 'test case execution order. 
def _open(filepath, mode, _open_func=open):
  # type: (Text, Text, Callable[..., IO]) -> IO
  """Opens a file.

  Like open(), but ensure that we can open real files even if tests stub out
  open(): the real built-in is captured as a default argument when this
  function is defined.

  Text modes are always opened as UTF-8. Binary modes are opened without an
  encoding, because the built-in open() raises ValueError when an encoding is
  supplied together with the "b" flag.

  Args:
    filepath: A filepath.
    mode: A mode string, as accepted by the built-in open().
    _open_func: A built-in open() function.

  Returns:
    The opened file object.
  """
  if 'b' in mode:
    # Binary streams do not take an encoding argument.
    return _open_func(filepath, mode)
  return _open_func(filepath, mode, encoding='utf-8')
Using its public methods is OK. 305 306 This class implements the `os.PathLike` interface (specifically, 307 `os.PathLike[str]`). This means, in Python 3, it can be directly passed 308 to e.g. `os.path.join()`. 309 """ 310 311 def __init__(self, path): 312 # type: (Text) -> None 313 """Module-private: do not instantiate outside module.""" 314 self._path = path 315 316 @property 317 def full_path(self): 318 # type: () -> Text 319 """Returns the path, as a string, for the directory. 320 321 TIP: Instead of e.g. `os.path.join(temp_dir.full_path)`, you can simply 322 do `os.path.join(temp_dir)` because `__fspath__()` is implemented. 323 """ 324 return self._path 325 326 def __fspath__(self): 327 # type: () -> Text 328 """See os.PathLike.""" 329 return self.full_path 330 331 def create_file(self, file_path=None, content=None, mode='w', encoding='utf8', 332 errors='strict'): 333 # type: (Optional[Text], Optional[AnyStr], Text, Text, Text) -> _TempFile 334 """Create a file in the directory. 335 336 NOTE: If the file already exists, it will be made writable and overwritten. 337 338 Args: 339 file_path: Optional file path for the temp file. If not given, a unique 340 file name will be generated and used. Slashes are allowed in the name; 341 any missing intermediate directories will be created. NOTE: This path 342 is the path that will be cleaned up, including any directories in the 343 path, e.g., 'foo/bar/baz.txt' will `rm -r foo` 344 content: Optional string or bytes to initially write to the file. If not 345 specified, then an empty file is created. 346 mode: Mode string to use when writing content. Only used if `content` is 347 non-empty. 348 encoding: Encoding to use when writing string content. Only used if 349 `content` is text. 350 errors: How to handle text to bytes encoding errors. Only used if 351 `content` is text. 352 353 Returns: 354 A _TempFile representing the created file. 
def mkdir(self, dir_path=None):
  # type: (Optional[Text]) -> _TempDir
  """Create a directory inside this directory.

  Args:
    dir_path: Optional path of the directory to create, joined onto this
      directory's path. If not given (or empty), `tempfile.mkdtemp()` picks
      a unique name inside this directory.

  Returns:
    A _TempDir representing the created directory.
  """
  new_dir = (
      os.path.join(self._path, dir_path)
      if dir_path else tempfile.mkdtemp(dir=self._path))

  # Note: there's no need to clear the directory since the containing
  # dir was cleared by the tempdir() function. An already existing directory
  # (including the one mkdtemp just made) is accepted as-is.
  os.makedirs(new_dir, exist_ok=True)
  return _TempDir(new_dir)
def read_text(self, encoding='utf8', errors='strict'):
  # type: (Text, Text) -> Text
  """Return the contents of the file as text.

  Args:
    encoding: The encoding forwarded to `open_text()` for decoding.
    errors: The decoding error strategy forwarded to `open_text()`.

  Returns:
    Everything `read()` yields from the file opened via `open_text()`.
  """
  with self.open_text(encoding=encoding, errors=errors) as text_file:
    contents = text_file.read()
  return contents
def open_text(self, mode='rt', encoding='utf8', errors='strict'):
  # type: (Text, Text, Text) -> ContextManager[TextIO]
  """Return a context manager for opening the file in text mode.

  Args:
    mode: The mode to open the file in. A "t" flag is appended when the mode
      does not already contain one. The "b" flag is rejected.
    encoding: The encoding to use when opening the file.
    errors: How to handle decoding errors.

  Returns:
    Context manager that yields an open file.

  Raises:
    ValueError: if `mode` contains the "b" flag.
  """
  if 'b' in mode:
    raise ValueError('Invalid mode {!r}: "b" flag not allowed when opening '
                     'file in text mode'.format(mode))
  text_mode = mode if 't' in mode else mode + 't'
  return self._open(text_mode, encoding, errors)
class _method(object):
  """A decorator that supports both instance and classmethod invocations.

  Using similar semantics to the @property builtin, this decorator can augment
  an instance method to support conditional logic when invoked on a class
  object. Once a class-level implementation is registered, this breaks support
  for invoking the instance method via the class (e.g. Cls.method(self, ...))
  but is still situationally useful.

  Example::

      class Foo(object):

        @_method
        def bar(self):
          ...  # Runs for `Foo().bar()`.

        @bar.classmethod
        def bar(cls):
          ...  # Runs for `Foo.bar()`.
  """

  def __init__(self, finstancemethod):
    # type: (Callable[..., Any]) -> None
    self._finstancemethod = finstancemethod
    # Stays None until `classmethod()` registers a class-level implementation.
    self._fclassmethod = None

  def classmethod(self, fclassmethod):
    # type: (Callable[..., Any]) -> _method
    """Registers `fclassmethod` as the implementation for class access.

    Args:
      fclassmethod: Function taking the class as its first argument.

    Returns:
      This descriptor, so the decorated name keeps referring to it.
    """
    self._fclassmethod = classmethod(fclassmethod)
    return self

  # Exposed as a property so that reading `__doc__` on the descriptor yields a
  # string (as documentation tools expect) instead of a bound method.
  @property
  def __doc__(self):
    # type: () -> str
    """Docstring of the instance function, else of the class function."""
    instance_doc = getattr(self._finstancemethod, '__doc__', None)
    if instance_doc:
      return instance_doc
    if self._fclassmethod is not None:
      # Read the doc from the wrapped function rather than from the
      # `classmethod` wrapper object itself.
      class_doc = getattr(self._fclassmethod.__func__, '__doc__', None)
      if class_doc:
        return class_doc
    return ''

  def __get__(self, obj, type_=None):
    # type: (Optional[Any], Optional[Type[Any]]) -> Callable[..., Any]
    """Binds the class implementation on class access, else the instance one.

    Args:
      obj: The instance the attribute was looked up on, or None when it was
        looked up on the class.
      type_: The owner class; optional, as in the descriptor protocol.

    Returns:
      The bound callable.
    """
    if obj is None and self._fclassmethod is not None:
      func = self._fclassmethod
    else:
      # Without a registered class implementation, class access falls back to
      # plain function semantics instead of failing on `None.__get__`.
      func = self._finstancemethod
    return func.__get__(obj, type_)  # pytype: disable=attribute-error
def create_tempdir(self, name=None, cleanup=None):
  # type: (Optional[Text], Optional[TempFileCleanup]) -> _TempDir
  """Create a temporary directory specific to the test.

  NOTE: The directory and its contents will be recursively cleared before
  creation. This ensures that there is no pre-existing state.

  This creates a named directory on disk that is isolated to this test, and
  will be properly cleaned up by the test. This avoids several pitfalls of
  creating temporary directories for test purposes, as well as makes it easier
  to setup directories and verify their contents. For example::

      def test_foo(self):
        out_dir = self.create_tempdir()
        out_log = out_dir.create_file('output.log')
        expected_paths = [
            os.path.join(out_dir, 'data-0.txt'),
            os.path.join(out_dir, 'data-1.txt'),
        ]
        code_under_test(out_dir)
        self.assertTrue(os.path.exists(expected_paths[0]))
        self.assertTrue(os.path.exists(expected_paths[1]))
        self.assertEqual('foo', out_log.read_text())

  See also: :meth:`create_tempfile` for creating temporary files.

  Args:
    name: Optional name of the directory. If not given, a unique
      name will be generated and used.
    cleanup: Optional cleanup policy on when/if to remove the directory (and
      all its contents) at the end of the test. If None, then uses
      :attr:`tempfile_cleanup`.

  Returns:
    A _TempDir representing the created directory; see _TempDir class docs
    for usage.
  """
  base_dir = self._get_tempdir_path_test()

  if not name:
    # mkdtemp() needs its parent directory to exist already.
    os.makedirs(base_dir, exist_ok=True)
    dir_path = tempfile.mkdtemp(dir=base_dir)
    removal_root = dir_path
  else:
    dir_path = os.path.join(base_dir, name)
    # For a nested name the cleanup path is the one built from
    # `_get_first_part(name)`, not the leaf directory itself.
    removal_root = os.path.join(base_dir, _get_first_part(name))

  # Start from a clean slate, then (re)create the requested directory.
  _rmtree_ignore_errors(removal_root)
  os.makedirs(dir_path, exist_ok=True)

  self._maybe_add_temp_path_cleanup(removal_root, cleanup)
  return _TempDir(dir_path)
This avoids several pitfalls of 680 creating temporary files for test purposes, as well as makes it easier 681 to setup files, their data, read them back, and inspect them when 682 a test fails. For example:: 683 684 def test_foo(self): 685 output = self.create_tempfile() 686 code_under_test(output) 687 self.assertGreater(os.path.getsize(output), 0) 688 self.assertEqual('foo', output.read_text()) 689 690 NOTE: This will zero-out the file. This ensures there is no pre-existing 691 state. 692 NOTE: If the file already exists, it will be made writable and overwritten. 693 694 See also: :meth:`create_tempdir` for creating temporary directories, and 695 ``_TempDir.create_file`` for creating files within a temporary directory. 696 697 Args: 698 file_path: Optional file path for the temp file. If not given, a unique 699 file name will be generated and used. Slashes are allowed in the name; 700 any missing intermediate directories will be created. NOTE: This path is 701 the path that will be cleaned up, including any directories in the path, 702 e.g., ``'foo/bar/baz.txt'`` will ``rm -r foo``. 703 content: Optional string or 704 bytes to initially write to the file. If not 705 specified, then an empty file is created. 706 mode: Mode string to use when writing content. Only used if `content` is 707 non-empty. 708 encoding: Encoding to use when writing string content. Only used if 709 `content` is text. 710 errors: How to handle text to bytes encoding errors. Only used if 711 `content` is text. 712 cleanup: Optional cleanup policy on when/if to remove the directory (and 713 all its contents) at the end of the test. If None, then uses 714 :attr:`tempfile_cleanup`. 715 716 Returns: 717 A _TempFile representing the created file; see _TempFile class docs for 718 usage. 
719 """ 720 test_path = self._get_tempdir_path_test() 721 tf, cleanup_path = _TempFile._create(test_path, file_path, content=content, 722 mode=mode, encoding=encoding, 723 errors=errors) 724 self._maybe_add_temp_path_cleanup(cleanup_path, cleanup) 725 return tf 726 727 @_method 728 def enter_context(self, manager): 729 # type: (ContextManager[_T]) -> _T 730 """Returns the CM's value after registering it with the exit stack. 731 732 Entering a context pushes it onto a stack of contexts. When `enter_context` 733 is called on the test instance (e.g. `self.enter_context`), the context is 734 exited after the test case's tearDown call. When called on the test class 735 (e.g. `TestCase.enter_context`), the context is exited after the test 736 class's tearDownClass call. 737 738 Contexts are exited in the reverse order of entering. They will always 739 be exited, regardless of test failure/success. 740 741 This is useful to eliminate per-test boilerplate when context managers 742 are used. For example, instead of decorating every test with `@mock.patch`, 743 simply do `self.foo = self.enter_context(mock.patch(...))' in `setUp()`. 744 745 NOTE: The context managers will always be exited without any error 746 information. This is an unfortunate implementation detail due to some 747 internals of how unittest runs tests. 748 749 Args: 750 manager: The context manager to enter. 
def _get_tempfile_cleanup(self, override):
  # type: (Optional[TempFileCleanup]) -> TempFileCleanup
  """Resolves the cleanup policy to apply to a temp file or directory.

  Args:
    override: An explicit policy, or None to defer to `self.tempfile_cleanup`.

  Returns:
    `override` whenever it is not None (even if it is falsy); otherwise the
    value of `self.tempfile_cleanup`.
  """
  return self.tempfile_cleanup if override is None else override
def _internal_ran_and_passed_when_called_during_cleanup(
    self,
    previous_failure_count: int,
) -> bool:
  """Returns whether the test passed. Expected to be called during cleanup.

  Args:
    previous_failure_count: Combined number of failures, errors and
      unexpected successes to compare the current total against. Only
      consulted on Python 3.11+.

  Returns:
    On Python 3.11+, True if the current combined count equals
    `previous_failure_count`. On older versions, whether a fresh default
    result fed with the outcome's errors reports success.
  """
  outcome = self._outcome
  if sys.version_info[:2] < (3, 11):
    # Before Python 3.11 https://github.com/python/cpython/pull/28180, errors
    # were buffered in _Outcome before calling cleanup.
    scratch_result = self.defaultTestResult()
    self._feedErrorsToResult(scratch_result, outcome.errors)  # pytype: disable=attribute-error
    return scratch_result.wasSuccessful()

  current_failure_count = sum((
      len(outcome.result.failures),
      len(outcome.result.errors),
      len(outcome.result.unexpectedSuccesses),
  ))
  return current_failure_count == previous_failure_count
def assertStartsWith(self, actual, expected_start, msg=None):
  """Asserts that actual.startswith(expected_start) is True.

  Args:
    actual: str
    expected_start: str
    msg: Optional message to report on failure.
  """
  if actual.startswith(expected_start):
    return
  self.fail('%r does not start with %r' % (actual, expected_start), msg)
def assertSequenceStartsWith(self, prefix, whole, msg=None):
  """An equality assertion for the beginning of ordered sequences.

  If prefix is an empty sequence, it will raise an error unless whole is also
  an empty sequence.

  If prefix is not a sequence (it does not support len()), it is treated as a
  one-element prefix, so an error is raised if the first element of whole
  does not match it.

  Args:
    prefix: A sequence expected at the beginning of the whole parameter.
    whole: The sequence in which to look for prefix.
    msg: Optional message to report on failure.
  """
  try:
    prefix_len = len(prefix)
  except (TypeError, NotImplementedError):
    prefix = [prefix]
    prefix_len = 1

  try:
    whole_len = len(whole)
  except (TypeError, NotImplementedError):
    self.fail('For whole: len(%s) is not supported, it appears to be type: '
              '%s' % (whole, type(whole)), msg)

  # Reported through self.fail() rather than an `assert` statement, so the
  # check is still enforced when Python runs with -O (which strips asserts).
  if prefix_len > whole_len:
    self.fail('Prefix length (%d) is longer than whole length (%d).' %
              (prefix_len, whole_len), msg)

  if not prefix_len and whole_len:
    self.fail('Prefix length is 0 but whole length is %d: %s' %
              (len(whole), whole), msg)

  try:
    self.assertSequenceEqual(prefix, whole[:prefix_len], msg)
  except AssertionError:
    self.fail('prefix: %s not found at start of whole: %s.' %
              (prefix, whole), msg)
def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None,
                              msg=None, delta=None):
  """Element-wise approximate equality assertion for ordered sequences.

  The sequences must have the same length. Each pair of elements is then
  compared with assertAlmostEqual, i.e. either by rounding the difference to
  `places` decimal places (default 7) and comparing to zero, or by checking
  that the difference does not exceed `delta`. All mismatching positions are
  collected and reported together; at most 30 are listed.

  Note that decimal places (from zero) are usually not the same as significant
  digits (measured from the most significant digit).

  Elements that compare equal are automatically almost equal.

  Args:
    expected_seq: A sequence containing elements we are expecting.
    actual_seq: The sequence that we are testing.
    places: The number of decimal places to compare.
    msg: The message to be printed if the test fails.
    delta: The OK difference between compared values.
  """
  expected_len = len(expected_seq)
  actual_len = len(actual_seq)
  if expected_len != actual_len:
    self.fail(
        'Sequence size mismatch: {} vs {}'.format(expected_len, actual_len),
        msg)

  mismatches = []
  for position, (wanted, got) in enumerate(zip(expected_seq, actual_seq)):
    try:
      # assertAlmostEqual accepts at most one of `places` and `delta`.
      # Forwarding both is deliberate: if the caller supplied both, this
      # assertion should fail the same way assertAlmostEqual does.
      # pytype: disable=wrong-keyword-args
      self.assertAlmostEqual(wanted, got, places=places, msg=msg, delta=delta)
      # pytype: enable=wrong-keyword-args
    except self.failureException as failure:
      mismatches.append('At index {}: {}'.format(position, failure))

  if not mismatches:
    return

  # Cap the report so a long sequence does not flood the output.
  if len(mismatches) > 30:
    del mismatches[30:]
    mismatches.append('...')
  self.fail(self._formatMessage(msg, '\n'.join(mismatches)))
1077 1078 This method, unlike assertCountEqual, doesn't care about any 1079 duplicates in the expected and actual sequences:: 1080 1081 # Doesn't raise an AssertionError 1082 assertSameElements([1, 1, 1, 0, 0, 0], [0, 1]) 1083 1084 If possible, you should use assertCountEqual instead of 1085 assertSameElements. 1086 1087 Args: 1088 expected_seq: A sequence containing elements we are expecting. 1089 actual_seq: The sequence that we are testing. 1090 msg: The message to be printed if the test fails. 1091 """ 1092 # `unittest2.TestCase` used to have assertSameElements, but it was 1093 # removed in favor of assertItemsEqual. As there's a unit test 1094 # that explicitly checks this behavior, I am leaving this method 1095 # alone. 1096 # Fail on strings: empirically, passing strings to this test method 1097 # is almost always a bug. If comparing the character sets of two strings 1098 # is desired, cast the inputs to sets or lists explicitly. 1099 if (isinstance(expected_seq, _TEXT_OR_BINARY_TYPES) or 1100 isinstance(actual_seq, _TEXT_OR_BINARY_TYPES)): 1101 self.fail('Passing string/bytes to assertSameElements is usually a bug. ' 1102 'Did you mean to use assertEqual?\n' 1103 'Expected: %s\nActual: %s' % (expected_seq, actual_seq)) 1104 try: 1105 expected = dict([(element, None) for element in expected_seq]) 1106 actual = dict([(element, None) for element in actual_seq]) 1107 missing = [element for element in expected if element not in actual] 1108 unexpected = [element for element in actual if element not in expected] 1109 missing.sort() 1110 unexpected.sort() 1111 except TypeError: 1112 # Fall back to slower list-compare if any of the objects are 1113 # not hashable. 
# unittest.TestCase.assertMultiLineEqual works very similarly, but it
# has a different error format. However, I find this slightly more readable.
def assertMultiLineEqual(self, first, second, msg=None, **kwargs):
  """Asserts that two multi-line strings are equal, reporting an ndiff.

  Args:
    first: str, the first string to compare.
    second: str, the second string to compare.
    msg: Optional message placed at the top of the failure report.
    **kwargs: Only `line_limit` is accepted. When non-zero, the diff portion
      of the report is cut off after that many entries and a note says how
      many were omitted.

  Raises:
    AssertionError: if either argument is not a str.
    TypeError: if any keyword argument other than `line_limit` is given.
    self.failureException: if the strings differ.
  """
  assert isinstance(first, str), (
      'First argument is not a string: %r' % (first,))
  assert isinstance(second, str), (
      'Second argument is not a string: %r' % (second,))
  line_limit = kwargs.pop('line_limit', 0)
  if kwargs:
    raise TypeError('Unexpected keyword args {}'.format(tuple(kwargs)))

  if first == second:
    return

  report = [msg + ':\n' if msg else '\n']
  # The header entry does not count against the caller's limit.
  max_entries = line_limit + len(report) if line_limit else line_limit

  for delta in difflib.ndiff(first.splitlines(True), second.splitlines(True)):
    report.append(delta)
    # A final line without a trailing newline would otherwise run into the
    # next entry.
    if not delta.endswith('\n'):
      report.append('\n')

  if max_entries and len(report) > max_entries:
    omitted = len(report) - max_entries
    del report[max_entries:]
    report.append(
        '(... and {} more delta lines omitted for brevity.)\n'.format(omitted))

  raise self.failureException(''.join(report))
def assertCommandSucceeds(self, command, regexes=(b'',), env=None,
                          close_fds=True, msg=None):
  """Asserts that a shell command succeeds (i.e. exits with code 0).

  After the exit-code check, the error output returned by get_command_stderr
  must also match at least one entry of `regexes` (see assertRegexMatch).

  Args:
    command: List or string representing the command to run.
    regexes: List of regular expression byte strings that match success.
    env: Dictionary of environment variable settings. If None, no environment
      variables will be set for the child process. This is to make tests
      more hermetic. NOTE: this behavior is different than the standard
      subprocess module.
    close_fds: Whether or not to close all open fd's in the child after
      forking.
    msg: Optional message to report on failure.
  """
  exit_status, error_output = get_command_stderr(command, env, close_fds)

  # `error_output` is bytes, so the patterns must be bytes as well. Callers
  # that wrote their patterns without the b'' prefix get them encoded here;
  # only the first entry is inspected to decide.
  first_pattern = regexes[0]
  if isinstance(first_pattern, str):
    regexes = [pattern.encode('utf-8') for pattern in regexes]

  shell_form = get_command_string(command)

  exit_failure = (
      'Running command\n'
      '%s failed with error code %s and message\n'
      '%s' % (_quote_long_string(shell_form),
              exit_status,
              _quote_long_string(error_output)))
  self.assertEqual(exit_status, 0, self._formatMessage(msg, exit_failure))

  no_match_failure = (
      'Running command\n'
      '%s failed with error code %s and message\n'
      '%s which matches no regex in %s' % (
          _quote_long_string(shell_form),
          exit_status,
          _quote_long_string(error_output),
          regexes))
  self.assertRegexMatch(
      error_output,
      regexes,
      message=self._formatMessage(msg, no_match_failure))
1299 if isinstance(regexes[0], str): 1300 regexes = [regex.encode('utf-8') for regex in regexes] 1301 1302 command_string = get_command_string(command) 1303 self.assertNotEqual( 1304 ret_code, 0, 1305 self._formatMessage(msg, 'The following command succeeded ' 1306 'while expected to fail:\n%s' % 1307 _quote_long_string(command_string))) 1308 self.assertRegexMatch( 1309 err, 1310 regexes, 1311 message=self._formatMessage( 1312 msg, 1313 'Running command\n' 1314 '%s failed with error code %s and message\n' 1315 '%s which matches no regex in %s' % ( 1316 _quote_long_string(command_string), 1317 ret_code, 1318 _quote_long_string(err), 1319 regexes))) 1320 1321 class _AssertRaisesContext(object): 1322 1323 def __init__(self, expected_exception, test_case, test_func, msg=None): 1324 self.expected_exception = expected_exception 1325 self.test_case = test_case 1326 self.test_func = test_func 1327 self.msg = msg 1328 1329 def __enter__(self): 1330 return self 1331 1332 def __exit__(self, exc_type, exc_value, tb): 1333 if exc_type is None: 1334 self.test_case.fail(self.expected_exception.__name__ + ' not raised', 1335 self.msg) 1336 if not issubclass(exc_type, self.expected_exception): 1337 return False 1338 self.test_func(exc_value) 1339 if exc_value: 1340 self.exception = exc_value.with_traceback(None) 1341 return True 1342 1343 @typing.overload 1344 def assertRaisesWithPredicateMatch( 1345 self, expected_exception, predicate) -> _AssertRaisesContext: 1346 # The purpose of this return statement is to work around 1347 # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored. 1348 return self._AssertRaisesContext(None, None, None) 1349 1350 @typing.overload 1351 def assertRaisesWithPredicateMatch( 1352 self, expected_exception, predicate, callable_obj: Callable[..., Any], 1353 *args, **kwargs) -> None: 1354 # The purpose of this return statement is to work around 1355 # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored. 
def assertRaisesWithPredicateMatch(self, expected_exception, predicate,
                                   callable_obj=None, *args, **kwargs):
  """Asserts that an exception is raised and that predicate(exception) holds.

  Args:
    expected_exception: Exception class expected to be raised.
    predicate: Function of one argument that inspects the passed-in exception
      and returns True (success) or False (please fail the test).
    callable_obj: Function to be called, or None to get a context manager.
    *args: Extra args passed to callable_obj.
    **kwargs: Extra keyword args passed to callable_obj.

  Returns:
    A context manager if callable_obj is None. Otherwise, None.

  Raises:
    self.failureException if callable_obj does not raise a matching exception.
  """
  def _verify(raised):
    accepted = predicate(raised)
    self.assertTrue(
        accepted, '%r does not match predicate %r' % (raised, predicate))

  raises_context = self._AssertRaisesContext(expected_exception, self, _verify)
  if callable_obj is not None:
    with raises_context:
      callable_obj(*args, **kwargs)
    return None
  return raises_context
def assertRaisesWithLiteralMatch(self, expected_exception,
                                 expected_exception_message,
                                 callable_obj=None, *args, **kwargs):
  """Asserts that str() of a raised exception equals the given string exactly.

  Unlike assertRaisesRegex, this method takes a literal string, not
  a regular expression.

    with self.assertRaisesWithLiteralMatch(ExType, 'message'):
      DoSomething()

  Args:
    expected_exception: Exception class expected to be raised.
    expected_exception_message: String message expected in the raised
      exception. For a raised exception e, expected_exception_message must
      equal str(e).
    callable_obj: Function to be called, or None to return a context.
    *args: Extra args passed to callable_obj.
    **kwargs: Extra kwargs passed to callable_obj.

  Returns:
    A context manager if callable_obj is None. Otherwise, None.

  Raises:
    self.failureException if callable_obj does not raise a matching exception.
  """
  def _verify_message(raised):
    observed = str(raised)
    self.assertTrue(
        expected_exception_message == observed,
        'Exception message does not match.\n'
        'Expected: %r\n'
        'Actual: %r' % (expected_exception_message, observed))

  raises_context = self._AssertRaisesContext(
      expected_exception, self, _verify_message)
  if callable_obj is not None:
    with raises_context:
      callable_obj(*args, **kwargs)
    return None
  return raises_context
def assertContainsSubsequence(self, container, subsequence, msg=None):
  """Asserts that "container" contains "subsequence" as a subsequence.

  Asserts that "container" contains all the elements of "subsequence", in
  order, but possibly with other elements interspersed. For example, [1, 2, 3]
  is a subsequence of [0, 0, 1, 2, 0, 3, 0] but not of [0, 0, 1, 3, 0, 2, 0].

  Args:
    container: the list we're testing for subsequence inclusion.
    subsequence: the list we hope will be a subsequence of container.
    msg: Optional message to report on failure.
  """
  subsequence = list(subsequence)
  # One iterator is shared by all searches, so each element of subsequence
  # must be found strictly after the match for the previous one.
  remaining = iter(container)

  for wanted in subsequence:
    for candidate in remaining:
      # Identity is checked first so objects that are not equal to themselves
      # still match, as they do with the `in` operator.
      if candidate is wanted or candidate == wanted:
        break
    else:
      # The rest of container was exhausted without a match. Failing here,
      # rather than recording the element in a None-initialised variable,
      # means a missing `None` element is reported as well.
      self.fail('%s not a subsequence of %s. First non-matching element: %s' %
                (subsequence, container, wanted), msg)
def assertTotallyOrdered(self, *groups, **kwargs):
  """Asserts that total ordering has been implemented correctly.

  For example, say you have a class A that compares only on its attribute x.
  Comparators other than ``__lt__`` are omitted for brevity::

    class A(object):
      def __init__(self, x, y):
        self.x = x
        self.y = y

      def __hash__(self):
        return hash(self.x)

      def __lt__(self, other):
        try:
          return self.x < other.x
        except AttributeError:
          return NotImplemented

  assertTotallyOrdered will check that instances can be ordered correctly.
  For example::

    self.assertTotallyOrdered(
        [None],  # None should come before everything else.
        [1],  # Integers sort earlier.
        [A(1, 'a')],
        [A(2, 'b')],  # 2 is after 1.
        [A(3, 'c'), A(3, 'd')],  # The second argument is irrelevant.
        [A(4, 'z')],
        ['foo'])  # Strings sort last.

  Args:
    *groups: A list of groups of elements.  Each group of elements is a list
      of objects that are equal.  The elements in each group must be less
      than the elements in the group after it.  For example, these groups are
      totally ordered: ``[None]``, ``[1]``, ``[2, 2]``, ``[3]``.
    **kwargs: optional msg keyword argument can be passed.

  Raises:
    self.failureException: if any of the ``==``, ``!=``, ``<``, ``<=``, ``>``,
      ``>=`` or hash checks disagrees with the ordering given by the groups.
  """
  msg = kwargs.get('msg')

  def CheckOrder(small, big):
    """Ensures small is ordered before big."""
    self.assertFalse(small == big,
                     self._formatMessage(msg, '%r unexpectedly equals %r' %
                                         (small, big)))
    self.assertTrue(small != big,
                    self._formatMessage(msg, '%r unexpectedly equals %r' %
                                        (small, big)))
    self.assertLess(small, big, msg)
    self.assertFalse(big < small,
                     self._formatMessage(msg,
                                         '%r unexpectedly less than %r' %
                                         (big, small)))
    self.assertLessEqual(small, big, msg)
    # _formatMessage takes (msg, standardMsg) in that order; swapping them
    # yields "None : ..." when the caller gave no msg.
    self.assertFalse(big <= small, self._formatMessage(
        msg,
        '%r unexpectedly less than or equal to %r' % (big, small)))
    self.assertGreater(big, small, msg)
    self.assertFalse(small > big,
                     self._formatMessage(msg,
                                         '%r unexpectedly greater than %r' %
                                         (small, big)))
    self.assertGreaterEqual(big, small, msg)
    self.assertFalse(small >= big, self._formatMessage(
        msg,
        '%r unexpectedly greater than or equal to %r' % (small, big)))

  def CheckEqual(a, b):
    """Ensures that a and b are equal."""
    self.assertEqual(a, b, msg)
    self.assertFalse(a != b,
                     self._formatMessage(msg, '%r unexpectedly unequals %r' %
                                         (a, b)))

    # Objects that compare equal must hash to the same value, but this only
    # applies if both objects are hashable.
    if (isinstance(a, abc.Hashable) and
        isinstance(b, abc.Hashable)):
      self.assertEqual(
          hash(a), hash(b),
          self._formatMessage(
              msg, 'hash %d of %r unexpectedly not equal to hash %d of %r' %
              (hash(a), a, hash(b), b)))

    self.assertFalse(a < b,
                     self._formatMessage(msg,
                                         '%r unexpectedly less than %r' %
                                         (a, b)))
    self.assertFalse(b < a,
                     self._formatMessage(msg,
                                         '%r unexpectedly less than %r' %
                                         (b, a)))
    self.assertLessEqual(a, b, msg)
    self.assertLessEqual(b, a, msg)  # pylint: disable=arguments-out-of-order
    self.assertFalse(a > b,
                     self._formatMessage(msg,
                                         '%r unexpectedly greater than %r' %
                                         (a, b)))
    self.assertFalse(b > a,
                     self._formatMessage(msg,
                                         '%r unexpectedly greater than %r' %
                                         (b, a)))
    self.assertGreaterEqual(a, b, msg)
    self.assertGreaterEqual(b, a, msg)  # pylint: disable=arguments-out-of-order

  # For every combination of elements, check the order of every pair of
  # elements.
  for elements in itertools.product(*groups):
    elements = list(elements)
    for index, small in enumerate(elements[:-1]):
      for big in elements[index + 1:]:
        CheckOrder(small, big)

  # Check that every element in each group is equal.
  for group in groups:
    for a in group:
      CheckEqual(a, a)
    for a, b in itertools.product(group, group):
      CheckEqual(a, b)
1658 """ 1659 self.assertIsInstance(a, dict, self._formatMessage( 1660 msg, 1661 'First argument is not a dictionary' 1662 )) 1663 self.assertIsInstance(b, dict, self._formatMessage( 1664 msg, 1665 'Second argument is not a dictionary' 1666 )) 1667 1668 def Sorted(list_of_items): 1669 try: 1670 return sorted(list_of_items) # In 3.3, unordered are possible. 1671 except TypeError: 1672 return list_of_items 1673 1674 if a == b: 1675 return 1676 a_items = Sorted(list(a.items())) 1677 b_items = Sorted(list(b.items())) 1678 1679 unexpected = [] 1680 missing = [] 1681 different = [] 1682 1683 safe_repr = unittest.util.safe_repr # pytype: disable=module-attr 1684 1685 def Repr(dikt): 1686 """Deterministic repr for dict.""" 1687 # Sort the entries based on their repr, not based on their sort order, 1688 # which will be non-deterministic across executions, for many types. 1689 entries = sorted((safe_repr(k), safe_repr(v)) for k, v in dikt.items()) 1690 return '{%s}' % (', '.join('%s: %s' % pair for pair in entries)) 1691 1692 message = ['%s != %s%s' % (Repr(a), Repr(b), ' (%s)' % msg if msg else '')] 1693 1694 # The standard library default output confounds lexical difference with 1695 # value difference; treat them separately. 
def assertUrlEqual(self, a, b, msg=None):
  """Asserts that two URLs are equal, ignoring ordering of query params.

  Scheme, netloc, path and fragment must match exactly. The ';'-separated
  path parameters are compared after sorting, and the query strings are
  compared as parsed dictionaries (blank values are kept).

  Args:
    a: The first URL.
    b: The second URL.
    msg: Optional message to report on failure.
  """
  url_a = parse.urlparse(a)
  url_b = parse.urlparse(b)

  for component in ('scheme', 'netloc', 'path', 'fragment'):
    self.assertEqual(getattr(url_a, component), getattr(url_b, component), msg)

  params_a = sorted(url_a.params.split(';'))
  params_b = sorted(url_b.params.split(';'))
  self.assertEqual(params_a, params_b, msg)

  query_a = parse.parse_qs(url_a.query, keep_blank_values=True)
  query_b = parse.parse_qs(url_b.query, keep_blank_values=True)
  self.assertDictEqual(query_a, query_b, msg)
def assertJsonEqual(self, first, second, msg=None):
  """Asserts that the JSON objects defined in two strings are equal.

  A summary of the differences will be included in the failure message
  using assertSameStructure.

  Args:
    first: A string containing JSON to decode and compare to second.
    second: A string containing JSON to decode and compare to first.
    msg: Additional text to include in the failure message.

  Raises:
    ValueError: if either string cannot be decoded as JSON. The decoder's
      own error is attached as ``__cause__``.
  """
  try:
    first_structured = json.loads(first)
  except ValueError as e:
    # Chain explicitly so the traceback presents the decode error as the
    # cause rather than as a second failure inside the handler.
    raise ValueError(self._formatMessage(
        msg,
        'could not decode first JSON value %s: %s' % (first, e))) from e

  try:
    second_structured = json.loads(second)
  except ValueError as e:
    raise ValueError(self._formatMessage(
        msg,
        'could not decode second JSON value %s: %s' % (second, e))) from e

  self.assertSameStructure(first_structured, second_structured,
                           aname='first', bname='second', msg=msg)
1835 Returns: 1836 (missing, unexpected) 1837 missing: items in expected that are not in actual. 1838 unexpected: items in actual that are not in expected. 1839 """ 1840 i = j = 0 1841 missing = [] 1842 unexpected = [] 1843 while True: 1844 try: 1845 e = expected[i] 1846 a = actual[j] 1847 if e < a: 1848 missing.append(e) 1849 i += 1 1850 while expected[i] == e: 1851 i += 1 1852 elif e > a: 1853 unexpected.append(a) 1854 j += 1 1855 while actual[j] == a: 1856 j += 1 1857 else: 1858 i += 1 1859 try: 1860 while expected[i] == e: 1861 i += 1 1862 finally: 1863 j += 1 1864 while actual[j] == a: 1865 j += 1 1866 except IndexError: 1867 missing.extend(expected[i:]) 1868 unexpected.extend(actual[j:]) 1869 break 1870 return missing, unexpected 1871 1872 1873def _are_both_of_integer_type(a, b): 1874 # type: (object, object) -> bool 1875 return isinstance(a, int) and isinstance(b, int) 1876 1877 1878def _are_both_of_sequence_type(a, b): 1879 # type: (object, object) -> bool 1880 return isinstance(a, abc.Sequence) and isinstance( 1881 b, abc.Sequence) and not isinstance( 1882 a, _TEXT_OR_BINARY_TYPES) and not isinstance(b, _TEXT_OR_BINARY_TYPES) 1883 1884 1885def _are_both_of_set_type(a, b): 1886 # type: (object, object) -> bool 1887 return isinstance(a, abc.Set) and isinstance(b, abc.Set) 1888 1889 1890def _are_both_of_mapping_type(a, b): 1891 # type: (object, object) -> bool 1892 return isinstance(a, abc.Mapping) and isinstance( 1893 b, abc.Mapping) 1894 1895 1896def _walk_structure_for_problems(a, b, aname, bname, problem_list): 1897 """The recursive comparison behind assertSameStructure.""" 1898 if type(a) != type(b) and not ( # pylint: disable=unidiomatic-typecheck 1899 _are_both_of_integer_type(a, b) or _are_both_of_sequence_type(a, b) or 1900 _are_both_of_set_type(a, b) or _are_both_of_mapping_type(a, b)): 1901 # We do not distinguish between int and long types as 99.99% of Python 2 1902 # code should never care. They collapse into a single type in Python 3. 
1903 problem_list.append('%s is a %r but %s is a %r' % 1904 (aname, type(a), bname, type(b))) 1905 # If they have different types there's no point continuing 1906 return 1907 1908 if isinstance(a, abc.Set): 1909 for k in a: 1910 if k not in b: 1911 problem_list.append( 1912 '%s has %r but %s does not' % (aname, k, bname)) 1913 for k in b: 1914 if k not in a: 1915 problem_list.append('%s lacks %r but %s has it' % (aname, k, bname)) 1916 1917 # NOTE: a or b could be a defaultdict, so we must take care that the traversal 1918 # doesn't modify the data. 1919 elif isinstance(a, abc.Mapping): 1920 for k in a: 1921 if k in b: 1922 _walk_structure_for_problems( 1923 a[k], b[k], '%s[%r]' % (aname, k), '%s[%r]' % (bname, k), 1924 problem_list) 1925 else: 1926 problem_list.append( 1927 "%s has [%r] with value %r but it's missing in %s" % 1928 (aname, k, a[k], bname)) 1929 for k in b: 1930 if k not in a: 1931 problem_list.append( 1932 '%s lacks [%r] but %s has it with value %r' % 1933 (aname, k, bname, b[k])) 1934 1935 # Strings/bytes are Sequences but we'll just do those with regular != 1936 elif (isinstance(a, abc.Sequence) and 1937 not isinstance(a, _TEXT_OR_BINARY_TYPES)): 1938 minlen = min(len(a), len(b)) 1939 for i in range(minlen): 1940 _walk_structure_for_problems( 1941 a[i], b[i], '%s[%d]' % (aname, i), '%s[%d]' % (bname, i), 1942 problem_list) 1943 for i in range(minlen, len(a)): 1944 problem_list.append('%s has [%i] with value %r but %s does not' % 1945 (aname, i, a[i], bname)) 1946 for i in range(minlen, len(b)): 1947 problem_list.append('%s lacks [%i] but %s has it with value %r' % 1948 (aname, i, bname, b[i])) 1949 1950 else: 1951 if a != b: 1952 problem_list.append('%s is %r but %s is %r' % (aname, a, bname, b)) 1953 1954 1955def get_command_string(command): 1956 """Returns an escaped string that can be used as a shell command. 1957 1958 Args: 1959 command: List or string representing the command to run. 
def get_command_string(command):
  """Returns an escaped string that can be used as a shell command.

  A string is returned unchanged. For a list, the words are joined with
  single spaces: on Windows (os.name == 'nt') without any quoting, elsewhere
  with every word wrapped in single quotes.

  Args:
    command: List or string representing the command to run.

  Returns:
    A string suitable for use as a shell command.
  """
  if isinstance(command, str):
    return command
  if os.name == 'nt':
    return ' '.join(command)
  # Same escaping scheme as shlex.quote, but applied to every word
  # unconditionally: single quote the word, and replace each ' inside it
  # with '"'"'.
  return ' '.join(
      "'" + word.replace("'", "'\"'\"'") + "'" for word in command)


def get_command_stderr(command, env=None, close_fds=True):
  """Runs the given shell command and returns a tuple.

  A string command is run through the shell; a list is executed directly.
  The child's stderr is merged into its stdout.

  Args:
    command: List or string representing the command to run.
    env: Dictionary of environment variable settings. If None, no environment
      variables will be set for the child process. This is to make tests
      more hermetic. NOTE: this behavior is different than the standard
      subprocess module.
    close_fds: Whether or not to close all open fd's in the child after
      forking. On Windows, this is ignored and close_fds is always False.

  Returns:
    Tuple of (exit status, text printed to stdout and stderr by the command).
  """
  child_env = {} if env is None else env
  # Windows does not support setting close_fds to True while also redirecting
  # standard handles.
  effective_close_fds = False if os.name == 'nt' else close_fds

  process = subprocess.Popen(
      command,
      close_fds=effective_close_fds,
      env=child_env,
      shell=isinstance(command, str),
      stderr=subprocess.STDOUT,
      stdout=subprocess.PIPE)
  output, _ = process.communicate()
  return (process.wait(), output)
2020 """ 2021 if isinstance(s, (bytes, bytearray)): 2022 try: 2023 s = s.decode('utf-8') 2024 except UnicodeDecodeError: 2025 s = str(s) 2026 return ('8<-----------\n' + 2027 s + '\n' + 2028 '----------->8\n') 2029 2030 2031def print_python_version(): 2032 # type: () -> None 2033 # Having this in the test output logs by default helps debugging when all 2034 # you've got is the log and no other idea of which Python was used. 2035 sys.stderr.write('Running tests under Python {0[0]}.{0[1]}.{0[2]}: ' 2036 '{1}\n'.format( 2037 sys.version_info, 2038 sys.executable if sys.executable else 'embedded.')) 2039 2040 2041def main(*args, **kwargs): 2042 # type: (Text, Any) -> None 2043 """Executes a set of Python unit tests. 2044 2045 Usually this function is called without arguments, so the 2046 unittest.TestProgram instance will get created with the default settings, 2047 so it will run all test methods of all TestCase classes in the ``__main__`` 2048 module. 2049 2050 Args: 2051 *args: Positional arguments passed through to 2052 ``unittest.TestProgram.__init__``. 2053 **kwargs: Keyword arguments passed through to 2054 ``unittest.TestProgram.__init__``. 2055 """ 2056 print_python_version() 2057 _run_in_app(run_tests, args, kwargs) 2058 2059 2060def _is_in_app_main(): 2061 # type: () -> bool 2062 """Returns True iff app.run is active.""" 2063 f = sys._getframe().f_back # pylint: disable=protected-access 2064 while f: 2065 if f.f_code == app.run.__code__: 2066 return True 2067 f = f.f_back 2068 return False 2069 2070 2071def _register_sigterm_with_faulthandler(): 2072 # type: () -> None 2073 """Have faulthandler dump stacks on SIGTERM. Useful to diagnose timeouts.""" 2074 if faulthandler and getattr(faulthandler, 'register', None): 2075 # faulthandler.register is not available on Windows. 2076 # faulthandler.enable() is already called by app.run. 
2077 try: 2078 faulthandler.register(signal.SIGTERM, chain=True) # pytype: disable=module-attr 2079 except Exception as e: # pylint: disable=broad-except 2080 sys.stderr.write('faulthandler.register(SIGTERM) failed ' 2081 '%r; ignoring.\n' % e) 2082 2083 2084def _run_in_app(function, args, kwargs): 2085 # type: (Callable[..., None], Sequence[Text], Mapping[Text, Any]) -> None 2086 """Executes a set of Python unit tests, ensuring app.run. 2087 2088 This is a private function, users should call absltest.main(). 2089 2090 _run_in_app calculates argv to be the command-line arguments of this program 2091 (without the flags), sets the default of FLAGS.alsologtostderr to True, 2092 then it calls function(argv, args, kwargs), making sure that `function' 2093 will get called within app.run(). _run_in_app does this by checking whether 2094 it is called by app.run(), or by calling app.run() explicitly. 2095 2096 The reason why app.run has to be ensured is to make sure that 2097 flags are parsed and stripped properly, and other initializations done by 2098 the app module are also carried out, no matter if absltest.run() is called 2099 from within or outside app.run(). 2100 2101 If _run_in_app is called from within app.run(), then it will reparse 2102 sys.argv and pass the result without command-line flags into the argv 2103 argument of `function'. The reason why this parsing is needed is that 2104 __main__.main() calls absltest.main() without passing its argv. So the 2105 only way _run_in_app could get to know the argv without the flags is that 2106 it reparses sys.argv. 2107 2108 _run_in_app changes the default of FLAGS.alsologtostderr to True so that the 2109 test program's stderr will contain all the log messages unless otherwise 2110 specified on the command-line. This overrides any explicit assignment to 2111 FLAGS.alsologtostderr by the test program prior to the call to _run_in_app() 2112 (e.g. in __main__.main). 
2113 2114 Please note that _run_in_app (and the function it calls) is allowed to make 2115 changes to kwargs. 2116 2117 Args: 2118 function: absltest.run_tests or a similar function. It will be called as 2119 function(argv, args, kwargs) where argv is a list containing the 2120 elements of sys.argv without the command-line flags. 2121 args: Positional arguments passed through to unittest.TestProgram.__init__. 2122 kwargs: Keyword arguments passed through to unittest.TestProgram.__init__. 2123 """ 2124 if _is_in_app_main(): 2125 _register_sigterm_with_faulthandler() 2126 2127 # Change the default of alsologtostderr from False to True, so the test 2128 # programs's stderr will contain all the log messages. 2129 # If --alsologtostderr=false is specified in the command-line, or user 2130 # has called FLAGS.alsologtostderr = False before, then the value is kept 2131 # False. 2132 FLAGS.set_default('alsologtostderr', True) 2133 2134 # Here we only want to get the `argv` without the flags. To avoid any 2135 # side effects of parsing flags, we temporarily stub out the `parse` method 2136 stored_parse_methods = {} 2137 noop_parse = lambda _: None 2138 for name in FLAGS: 2139 # Avoid any side effects of parsing flags. 2140 stored_parse_methods[name] = FLAGS[name].parse 2141 # This must be a separate loop since multiple flag names (short_name=) can 2142 # point to the same flag object. 2143 for name in FLAGS: 2144 FLAGS[name].parse = noop_parse 2145 try: 2146 argv = FLAGS(sys.argv) 2147 finally: 2148 for name in FLAGS: 2149 FLAGS[name].parse = stored_parse_methods[name] 2150 sys.stdout.flush() 2151 2152 function(argv, args, kwargs) 2153 else: 2154 # Send logging to stderr. Use --alsologtostderr instead of --logtostderr 2155 # in case tests are reading their own logs. 
2156 FLAGS.set_default('alsologtostderr', True) 2157 2158 def main_function(argv): 2159 _register_sigterm_with_faulthandler() 2160 function(argv, args, kwargs) 2161 2162 app.run(main=main_function) 2163 2164 2165def _is_suspicious_attribute(testCaseClass, name): 2166 # type: (Type, Text) -> bool 2167 """Returns True if an attribute is a method named like a test method.""" 2168 if name.startswith('Test') and len(name) > 4 and name[4].isupper(): 2169 attr = getattr(testCaseClass, name) 2170 if inspect.isfunction(attr) or inspect.ismethod(attr): 2171 args = inspect.getfullargspec(attr) 2172 return (len(args.args) == 1 and args.args[0] == 'self' and 2173 args.varargs is None and args.varkw is None and 2174 not args.kwonlyargs) 2175 return False 2176 2177 2178def skipThisClass(reason): 2179 # type: (Text) -> Callable[[_T], _T] 2180 """Skip tests in the decorated TestCase, but not any of its subclasses. 2181 2182 This decorator indicates that this class should skip all its tests, but not 2183 any of its subclasses. Useful for if you want to share testMethod or setUp 2184 implementations between a number of concrete testcase classes. 2185 2186 Example usage, showing how you can share some common test methods between 2187 subclasses. In this example, only ``BaseTest`` will be marked as skipped, and 2188 not RealTest or SecondRealTest:: 2189 2190 @absltest.skipThisClass("Shared functionality") 2191 class BaseTest(absltest.TestCase): 2192 def test_simple_functionality(self): 2193 self.assertEqual(self.system_under_test.method(), 1) 2194 2195 class RealTest(BaseTest): 2196 def setUp(self): 2197 super().setUp() 2198 self.system_under_test = MakeSystem(argument) 2199 2200 def test_specific_behavior(self): 2201 ... 2202 2203 class SecondRealTest(BaseTest): 2204 def setUp(self): 2205 super().setUp() 2206 self.system_under_test = MakeSystem(other_arguments) 2207 2208 def test_other_behavior(self): 2209 ... 2210 2211 Args: 2212 reason: The reason we have a skip in place. 
For instance: 'shared test 2213 methods' or 'shared assertion methods'. 2214 2215 Returns: 2216 Decorator function that will cause a class to be skipped. 2217 """ 2218 if isinstance(reason, type): 2219 raise TypeError('Got {!r}, expected reason as string'.format(reason)) 2220 2221 def _skip_class(test_case_class): 2222 if not issubclass(test_case_class, unittest.TestCase): 2223 raise TypeError( 2224 'Decorating {!r}, expected TestCase subclass'.format(test_case_class)) 2225 2226 # Only shadow the setUpClass method if it is directly defined. If it is 2227 # in the parent class we invoke it via a super() call instead of holding 2228 # a reference to it. 2229 shadowed_setupclass = test_case_class.__dict__.get('setUpClass', None) 2230 2231 @classmethod 2232 def replacement_setupclass(cls, *args, **kwargs): 2233 # Skip this class if it is the one that was decorated with @skipThisClass 2234 if cls is test_case_class: 2235 raise SkipTest(reason) 2236 if shadowed_setupclass: 2237 # Pass along `cls` so the MRO chain doesn't break. 2238 # The original method is a `classmethod` descriptor, which can't 2239 # be directly called, but `__func__` has the underlying function. 2240 return shadowed_setupclass.__func__(cls, *args, **kwargs) 2241 else: 2242 # Because there's no setUpClass() defined directly on test_case_class, 2243 # we call super() ourselves to continue execution of the inheritance 2244 # chain. 2245 return super(test_case_class, cls).setUpClass(*args, **kwargs) 2246 2247 test_case_class.setUpClass = replacement_setupclass 2248 return test_case_class 2249 2250 return _skip_class 2251 2252 2253class TestLoader(unittest.TestLoader): 2254 """A test loader which supports common test features. 2255 2256 Supported features include: 2257 * Banning untested methods with test-like names: methods attached to this 2258 testCase with names starting with `Test` are ignored by the test runner, 2259 and often represent mistakenly-omitted test cases. 
class TestLoader(unittest.TestLoader):
  """A test loader which supports common test features.

  Supported features include:
   * Banning untested methods with test-like names: methods attached to this
     testCase with names starting with `Test` are ignored by the test runner,
     and often represent mistakenly-omitted test cases. This loader will raise
     a TypeError when attempting to load a TestCase with such methods.
   * Randomization of test case execution order (optional).
  """

  _ERROR_MSG = textwrap.dedent("""Method '%s' is named like a test case but
  is not one. This is often a bug. If you want it to be a test method,
  name it with 'test' in lowercase. If not, rename the method to not begin
  with 'Test'.""")

  def __init__(self, *args, **kwds):
    """Initializes the loader and its optional test-order randomizer.

    Args:
      *args: Positional arguments passed through to unittest.TestLoader.
      **kwds: Keyword arguments passed through to unittest.TestLoader.
    """
    super().__init__(*args, **kwds)
    seed = _get_default_randomize_ordering_seed()
    # A falsy seed means that the order is not randomized.
    self._randomize_ordering_seed = seed if seed else None
    self._random = random.Random(seed) if seed else None

  def getTestCaseNames(self, testCaseClass):  # pylint:disable=invalid-name
    """Validates and returns a (possibly randomized) list of test case names.

    Args:
      testCaseClass: The test case class to collect names from.

    Returns:
      The names found by the parent loader, shuffled in place when a
      randomization seed is in effect.

    Raises:
      TypeError: If the class has an attribute that
        _is_suspicious_attribute flags.
    """
    offender = next(
        (name for name in dir(testCaseClass)
         if _is_suspicious_attribute(testCaseClass, name)),
        None)
    if offender is not None:
      raise TypeError(TestLoader._ERROR_MSG % offender)

    names = super().getTestCaseNames(testCaseClass)
    if self._randomize_ordering_seed is None:
      return names

    logging.info(
        'Randomizing test order with seed: %d', self._randomize_ordering_seed)
    logging.info(
        'To reproduce this order, re-run with '
        '--test_randomize_ordering_seed=%d', self._randomize_ordering_seed)
    self._random.shuffle(names)
    return names


def get_default_xml_output_filename():
  # type: () -> Optional[Text]
  """Returns the default XML output file name derived from the environment.

  The first of these that applies is used:
   * XML_OUTPUT_FILE: its value is returned as is.
   * RUNNING_UNDER_TEST_DAEMON: 'test_detail.xml' in the parent directory of
     TEST_TMPDIR.value.
   * TEST_XMLOUTPUTDIR: '<base name of sys.argv[0] without extension>.xml'
     inside that directory.

  Returns:
    The file name, or None if none of the variables is set to a non-empty
    value.
  """
  environ = os.environ
  explicit_file = environ.get('XML_OUTPUT_FILE')
  if explicit_file:
    return explicit_file

  if environ.get('RUNNING_UNDER_TEST_DAEMON'):
    return os.path.join(os.path.dirname(TEST_TMPDIR.value), 'test_detail.xml')

  output_dir = environ.get('TEST_XMLOUTPUTDIR')
  if output_dir:
    program_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    return os.path.join(output_dir, program_name + '.xml')

  return None
_setup_filtering(argv): 2308 # type: (MutableSequence[Text]) -> None 2309 """Implements the bazel test filtering protocol. 2310 2311 The following environment variable is used in this method: 2312 2313 TESTBRIDGE_TEST_ONLY: string, if set, is forwarded to the unittest 2314 framework to use as a test filter. Its value is split with shlex, then: 2315 1. On Python 3.6 and before, split values are passed as positional 2316 arguments on argv. 2317 2. On Python 3.7+, split values are passed to unittest's `-k` flag. Tests 2318 are matched by glob patterns or substring. See 2319 https://docs.python.org/3/library/unittest.html#cmdoption-unittest-k 2320 2321 Args: 2322 argv: the argv to mutate in-place. 2323 """ 2324 test_filter = os.environ.get('TESTBRIDGE_TEST_ONLY') 2325 if argv is None or not test_filter: 2326 return 2327 2328 filters = shlex.split(test_filter) 2329 if sys.version_info[:2] >= (3, 7): 2330 filters = ['-k=' + test_filter for test_filter in filters] 2331 2332 argv[1:1] = filters 2333 2334 2335def _setup_test_runner_fail_fast(argv): 2336 # type: (MutableSequence[Text]) -> None 2337 """Implements the bazel test fail fast protocol. 2338 2339 The following environment variable is used in this method: 2340 2341 TESTBRIDGE_TEST_RUNNER_FAIL_FAST=<1|0> 2342 2343 If set to 1, --failfast is passed to the unittest framework to return upon 2344 first failure. 2345 2346 Args: 2347 argv: the argv to mutate in-place. 2348 """ 2349 2350 if argv is None: 2351 return 2352 2353 if os.environ.get('TESTBRIDGE_TEST_RUNNER_FAIL_FAST') != '1': 2354 return 2355 2356 argv[1:1] = ['--failfast'] 2357 2358 2359def _setup_sharding(custom_loader=None): 2360 # type: (Optional[unittest.TestLoader]) -> unittest.TestLoader 2361 """Implements the bazel sharding protocol. 2362 2363 The following environment variables are used in this method: 2364 2365 TEST_SHARD_STATUS_FILE: string, if set, points to a file. 
We write a blank 2366 file to tell the test runner that this test implements the test sharding 2367 protocol. 2368 2369 TEST_TOTAL_SHARDS: int, if set, sharding is requested. 2370 2371 TEST_SHARD_INDEX: int, must be set if TEST_TOTAL_SHARDS is set. Specifies 2372 the shard index for this instance of the test process. Must satisfy: 2373 0 <= TEST_SHARD_INDEX < TEST_TOTAL_SHARDS. 2374 2375 Args: 2376 custom_loader: A TestLoader to be made sharded. 2377 2378 Returns: 2379 The test loader for shard-filtering or the standard test loader, depending 2380 on the sharding environment variables. 2381 """ 2382 2383 # It may be useful to write the shard file even if the other sharding 2384 # environment variables are not set. Test runners may use this functionality 2385 # to query whether a test binary implements the test sharding protocol. 2386 if 'TEST_SHARD_STATUS_FILE' in os.environ: 2387 try: 2388 with open(os.environ['TEST_SHARD_STATUS_FILE'], 'w') as f: 2389 f.write('') 2390 except IOError: 2391 sys.stderr.write('Error opening TEST_SHARD_STATUS_FILE (%s). Exiting.' 2392 % os.environ['TEST_SHARD_STATUS_FILE']) 2393 sys.exit(1) 2394 2395 base_loader = custom_loader or TestLoader() 2396 if 'TEST_TOTAL_SHARDS' not in os.environ: 2397 # Not using sharding, use the expected test loader. 2398 return base_loader 2399 2400 total_shards = int(os.environ['TEST_TOTAL_SHARDS']) 2401 shard_index = int(os.environ['TEST_SHARD_INDEX']) 2402 2403 if shard_index < 0 or shard_index >= total_shards: 2404 sys.stderr.write('ERROR: Bad sharding values. index=%d, total=%d\n' % 2405 (shard_index, total_shards)) 2406 sys.exit(1) 2407 2408 # Replace the original getTestCaseNames with one that returns 2409 # the test case names for this shard. 
2410 delegate_get_names = base_loader.getTestCaseNames 2411 2412 bucket_iterator = itertools.cycle(range(total_shards)) 2413 2414 def getShardedTestCaseNames(testCaseClass): 2415 filtered_names = [] 2416 # We need to sort the list of tests in order to determine which tests this 2417 # shard is responsible for; however, it's important to preserve the order 2418 # returned by the base loader, e.g. in the case of randomized test ordering. 2419 ordered_names = delegate_get_names(testCaseClass) 2420 for testcase in sorted(ordered_names): 2421 bucket = next(bucket_iterator) 2422 if bucket == shard_index: 2423 filtered_names.append(testcase) 2424 return [x for x in ordered_names if x in filtered_names] 2425 2426 base_loader.getTestCaseNames = getShardedTestCaseNames 2427 return base_loader 2428 2429 2430# pylint: disable=line-too-long 2431def _run_and_get_tests_result(argv, args, kwargs, xml_test_runner_class): 2432 # type: (MutableSequence[Text], Sequence[Any], MutableMapping[Text, Any], Type) -> unittest.TestResult 2433 # pylint: enable=line-too-long 2434 """Same as run_tests, except it returns the result instead of exiting.""" 2435 2436 # The entry from kwargs overrides argv. 2437 argv = kwargs.pop('argv', argv) 2438 2439 # Set up test filtering if requested in environment. 2440 _setup_filtering(argv) 2441 # Set up --failfast as requested in environment 2442 _setup_test_runner_fail_fast(argv) 2443 2444 # Shard the (default or custom) loader if sharding is turned on. 2445 kwargs['testLoader'] = _setup_sharding(kwargs.get('testLoader', None)) 2446 2447 # XML file name is based upon (sorted by priority): 2448 # --xml_output_file flag, XML_OUTPUT_FILE variable, 2449 # TEST_XMLOUTPUTDIR variable or RUNNING_UNDER_TEST_DAEMON variable. 
# pylint: disable=line-too-long
def _run_and_get_tests_result(argv, args, kwargs, xml_test_runner_class):
  # type: (MutableSequence[Text], Sequence[Any], MutableMapping[Text, Any], Type) -> unittest.TestResult
  # pylint: enable=line-too-long
  """Same as run_tests, except it returns the result instead of exiting.

  Prepares argv and kwargs from the environment and flags (test filter,
  fail-fast, sharding, XML output, post-mortem debugging, temp directory) and
  then constructs unittest.TestProgram with them.

  Side effects visible in this function: `kwargs` is modified in place
  ('argv' is popped and set again, 'testLoader' and possibly 'testRunner' are
  set), `argv` may be mutated by the filtering/fail-fast helpers,
  FLAGS.xml_output_file may be assigned, and the XML output directory and
  TEST_TMPDIR.value are created if missing.

  NOTE(review): whether the `return` below is reached depends on
  unittest.TestProgram not exiting the process (see its `exit` argument); the
  `finally` clause writes the XML report either way.

  Args:
    argv: Command-line arguments for unittest.TestProgram. Ignored if kwargs
      contains an 'argv' entry.
    args: Positional arguments passed through to unittest.TestProgram.
    kwargs: Keyword arguments passed through to unittest.TestProgram.
    xml_test_runner_class: Runner class installed as kwargs['testRunner']
      when an XML output file is requested and the given runner is None or
      has no `set_default_xml_stream` attribute.

  Returns:
    The `result` attribute of the constructed unittest.TestProgram.
  """

  # The entry from kwargs overrides argv.
  argv = kwargs.pop('argv', argv)

  # Set up test filtering if requested in environment.
  _setup_filtering(argv)
  # Set up --failfast as requested in environment
  _setup_test_runner_fail_fast(argv)

  # Shard the (default or custom) loader if sharding is turned on.
  kwargs['testLoader'] = _setup_sharding(kwargs.get('testLoader', None))

  # XML file name is based upon (sorted by priority):
  # --xml_output_file flag, XML_OUTPUT_FILE variable,
  # TEST_XMLOUTPUTDIR variable or RUNNING_UNDER_TEST_DAEMON variable.
  if not FLAGS.xml_output_file:
    FLAGS.xml_output_file = get_default_xml_output_filename()
  xml_output_file = FLAGS.xml_output_file

  # Stays None unless XML output is requested; the `finally` clause at the
  # bottom uses it to decide whether a report has to be written.
  xml_buffer = None
  if xml_output_file:
    xml_output_dir = os.path.dirname(xml_output_file)
    if xml_output_dir and not os.path.isdir(xml_output_dir):
      try:
        os.makedirs(xml_output_dir)
      except OSError as e:
        # File exists error can occur with concurrent tests
        if e.errno != errno.EEXIST:
          raise
    # Fail early if we can't write to the XML output file. This is so that we
    # don't waste people's time running tests that will just fail anyways.
    with _open(xml_output_file, 'w'):
      pass

    # We can reuse testRunner if it supports XML output (e. g. by inheriting
    # from xml_reporter.TextAndXMLTestRunner). Otherwise we need to use
    # xml_reporter.TextAndXMLTestRunner.
    if (kwargs.get('testRunner') is not None
        and not hasattr(kwargs['testRunner'], 'set_default_xml_stream')):
      sys.stderr.write('WARNING: XML_OUTPUT_FILE or --xml_output_file setting '
                       'overrides testRunner=%r setting (possibly from --pdb)'
                       % (kwargs['testRunner']))
      # Passing a class object here allows TestProgram to initialize
      # instances based on its kwargs and/or parsed command-line args.
      kwargs['testRunner'] = xml_test_runner_class
    if kwargs.get('testRunner') is None:
      kwargs['testRunner'] = xml_test_runner_class
    # Use an in-memory buffer (not backed by the actual file) to store the XML
    # report, because some tools modify the file (e.g., create a placeholder
    # with partial information, in case the test process crashes).
    xml_buffer = io.StringIO()
    kwargs['testRunner'].set_default_xml_stream(xml_buffer)  # pytype: disable=attribute-error

    # If we've used a seed to randomize test case ordering, we want to record it
    # as a top-level attribute in the `testsuites` section of the XML output.
    randomize_ordering_seed = getattr(
        kwargs['testLoader'], '_randomize_ordering_seed', None)
    setter = getattr(kwargs['testRunner'], 'set_testsuites_property', None)
    if randomize_ordering_seed and setter:
      setter('test_randomize_ordering_seed', randomize_ordering_seed)
  elif kwargs.get('testRunner') is None:
    # No XML output requested and no runner given: use the default text runner.
    kwargs['testRunner'] = _pretty_print_reporter.TextTestRunner

  if FLAGS.pdb_post_mortem:
    runner = kwargs['testRunner']
    # testRunner can be a class or an instance, which must be tested for
    # differently.
    # Overriding testRunner isn't uncommon, so only enable the debugging
    # integration if the runner claims it does; we don't want to accidentally
    # clobber something on the runner.
    if ((isinstance(runner, type) and
         issubclass(runner, _pretty_print_reporter.TextTestRunner)) or
        isinstance(runner, _pretty_print_reporter.TextTestRunner)):
      runner.run_for_debugging = True

  # Make sure tmpdir exists.
  if not os.path.isdir(TEST_TMPDIR.value):
    try:
      os.makedirs(TEST_TMPDIR.value)
    except OSError as e:
      # Concurrent test might have created the directory.
      if e.errno != errno.EEXIST:
        raise

  # Let unittest.TestProgram.__init__ do its own argv parsing, e.g. for '-v',
  # on argv, which is sys.argv without the command-line flags.
  kwargs['argv'] = argv

  try:
    test_program = unittest.TestProgram(*args, **kwargs)
    return test_program.result
  finally:
    # Copy the buffered report to the real output file however TestProgram
    # finished, and always release the buffer.
    if xml_buffer:
      try:
        with _open(xml_output_file, 'w') as f:
          f.write(xml_buffer.getvalue())
      finally:
        xml_buffer.close()
2541 2542 Please note that run_tests should be called from app.run. 2543 Calling absltest.main() would ensure that. 2544 2545 Please note that run_tests is allowed to make changes to kwargs. 2546 2547 Args: 2548 argv: sys.argv with the command-line flags removed from the front, i.e. the 2549 argv with which :func:`app.run()<absl.app.run>` has called 2550 ``__main__.main``. It is passed to 2551 ``unittest.TestProgram.__init__(argv=)``, which does its own flag parsing. 2552 It is ignored if kwargs contains an argv entry. 2553 args: Positional arguments passed through to 2554 ``unittest.TestProgram.__init__``. 2555 kwargs: Keyword arguments passed through to 2556 ``unittest.TestProgram.__init__``. 2557 """ 2558 result = _run_and_get_tests_result( 2559 argv, args, kwargs, xml_reporter.TextAndXMLTestRunner) 2560 sys.exit(not result.wasSuccessful()) 2561 2562 2563def _rmtree_ignore_errors(path): 2564 # type: (Text) -> None 2565 if os.path.isfile(path): 2566 try: 2567 os.unlink(path) 2568 except OSError: 2569 pass 2570 else: 2571 shutil.rmtree(path, ignore_errors=True) 2572 2573 2574def _get_first_part(path): 2575 # type: (Text) -> Text 2576 parts = path.split(os.sep, 1) 2577 return parts[0] 2578