• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Abseil Authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Base functionality for Abseil Python tests.
16
17This module contains base classes and high-level functions for Abseil-style
18tests.
19"""
20
21from collections import abc
22import contextlib
23import difflib
24import enum
25import errno
26import getpass
27import inspect
28import io
29import itertools
30import json
31import os
32import random
33import re
34import shlex
35import shutil
36import signal
37import stat
38import subprocess
39import sys
40import tempfile
41import textwrap
42import unittest
43from unittest import mock  # pylint: disable=unused-import Allow absltest.mock.
44from urllib import parse
45
46try:
47  # The faulthandler module isn't always available, and pytype doesn't
48  # understand that we're catching ImportError, so suppress the error.
49  # pytype: disable=import-error
50  import faulthandler
51  # pytype: enable=import-error
52except ImportError:
53  # We use faulthandler if it is available.
54  faulthandler = None
55
56from absl import app
57from absl import flags
58from absl import logging
59from absl.testing import _pretty_print_reporter
60from absl.testing import xml_reporter
61
62# Make typing an optional import to avoid it being a required dependency
63# in Python 2. Type checkers will still understand the imports.
64try:
65  # pylint: disable=unused-import
66  import typing
67  from typing import Any, AnyStr, BinaryIO, Callable, ContextManager, IO, Iterator, List, Mapping, MutableMapping, MutableSequence, Optional, Sequence, Text, TextIO, Tuple, Type, Union
68  # pylint: enable=unused-import
69except ImportError:
70  pass
71else:
72  # Use an if-type-checking block to prevent leakage of type-checking only
73  # symbols. We don't want people relying on these at runtime.
74  if typing.TYPE_CHECKING:
75    # Unbounded TypeVar for general usage
76    _T = typing.TypeVar('_T')
77
78    import unittest.case
79    _OutcomeType = unittest.case._Outcome  # pytype: disable=module-attr
80
81
82
83# Re-export a bunch of unittest functions we support so that people don't
84# have to import unittest to get them
85# pylint: disable=invalid-name
86skip = unittest.skip
87skipIf = unittest.skipIf
88skipUnless = unittest.skipUnless
89SkipTest = unittest.SkipTest
90expectedFailure = unittest.expectedFailure
91# pylint: enable=invalid-name
92
93# End unittest re-exports
94
95FLAGS = flags.FLAGS
96
97_TEXT_OR_BINARY_TYPES = (str, bytes)
98
99# Suppress surplus entries in AssertionError stack traces.
100__unittest = True  # pylint: disable=invalid-name
101
102
103def expectedFailureIf(condition, reason):  # pylint: disable=invalid-name
104  """Expects the test to fail if the run condition is True.
105
106  Example usage::
107
108      @expectedFailureIf(sys.version.major == 2, "Not yet working in py2")
109      def test_foo(self):
110        ...
111
112  Args:
113    condition: bool, whether to expect failure or not.
114    reason: Text, the reason to expect failure.
115  Returns:
116    Decorator function
117  """
118  del reason  # Unused
119  if condition:
120    return unittest.expectedFailure
121  else:
122    return lambda f: f
123
124
125class TempFileCleanup(enum.Enum):
126  # Always cleanup temp files when the test completes.
127  ALWAYS = 'always'
128  # Only cleanup temp file if the test passes. This allows easier inspection
129  # of tempfile contents on test failure. absltest.TEST_TMPDIR.value determines
130  # where tempfiles are created.
131  SUCCESS = 'success'
132  # Never cleanup temp files.
133  OFF = 'never'
134
135
136# Many of the methods in this module have names like assertSameElements.
137# This kind of name does not comply with PEP8 style,
138# but it is consistent with the naming of methods in unittest.py.
139# pylint: disable=invalid-name
140
141
142def _get_default_test_random_seed():
143  # type: () -> int
144  random_seed = 301
145  value = os.environ.get('TEST_RANDOM_SEED', '')
146  try:
147    random_seed = int(value)
148  except ValueError:
149    pass
150  return random_seed
151
152
153def get_default_test_srcdir():
154  # type: () -> Text
155  """Returns default test source dir."""
156  return os.environ.get('TEST_SRCDIR', '')
157
158
159def get_default_test_tmpdir():
160  # type: () -> Text
161  """Returns default test temp dir."""
162  tmpdir = os.environ.get('TEST_TMPDIR', '')
163  if not tmpdir:
164    tmpdir = os.path.join(tempfile.gettempdir(), 'absl_testing')
165
166  return tmpdir
167
168
169def _get_default_randomize_ordering_seed():
170  # type: () -> int
171  """Returns default seed to use for randomizing test order.
172
173  This function first checks the --test_randomize_ordering_seed flag, and then
174  the TEST_RANDOMIZE_ORDERING_SEED environment variable. If the first value
175  we find is:
176    * (not set): disable test randomization
177    * 0: disable test randomization
178    * 'random': choose a random seed in [1, 4294967295] for test order
179      randomization
180    * positive integer: use this seed for test order randomization
181
182  (The values used are patterned after
183  https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHASHSEED).
184
185  In principle, it would be simpler to return None if no override is provided;
186  however, the python random module has no `get_seed()`, only `getstate()`,
187  which returns far more data than we want to pass via an environment variable
188  or flag.
189
190  Returns:
191    A default value for test case randomization (int). 0 means do not randomize.
192
193  Raises:
194    ValueError: Raised when the flag or env value is not one of the options
195        above.
196  """
197  if FLAGS['test_randomize_ordering_seed'].present:
198    randomize = FLAGS.test_randomize_ordering_seed
199  elif 'TEST_RANDOMIZE_ORDERING_SEED' in os.environ:
200    randomize = os.environ['TEST_RANDOMIZE_ORDERING_SEED']
201  else:
202    randomize = ''
203  if not randomize:
204    return 0
205  if randomize == 'random':
206    return random.Random().randint(1, 4294967295)
207  if randomize == '0':
208    return 0
209  try:
210    seed = int(randomize)
211    if seed > 0:
212      return seed
213  except ValueError:
214    pass
215  raise ValueError(
216      'Unknown test randomization seed value: {}'.format(randomize))
217
218
219TEST_SRCDIR = flags.DEFINE_string(
220    'test_srcdir',
221    get_default_test_srcdir(),
222    'Root of directory tree where source files live',
223    allow_override_cpp=True)
224TEST_TMPDIR = flags.DEFINE_string(
225    'test_tmpdir',
226    get_default_test_tmpdir(),
227    'Directory for temporary testing files',
228    allow_override_cpp=True)
229
230flags.DEFINE_integer(
231    'test_random_seed',
232    _get_default_test_random_seed(),
233    'Random seed for testing. Some test frameworks may '
234    'change the default value of this flag between runs, so '
235    'it is not appropriate for seeding probabilistic tests.',
236    allow_override_cpp=True)
237flags.DEFINE_string(
238    'test_randomize_ordering_seed',
239    '',
240    'If positive, use this as a seed to randomize the '
241    'execution order for test cases. If "random", pick a '
242    'random seed to use. If 0 or not set, do not randomize '
243    'test case execution order. This flag also overrides '
244    'the TEST_RANDOMIZE_ORDERING_SEED environment variable.',
245    allow_override_cpp=True)
246flags.DEFINE_string('xml_output_file', '', 'File to store XML test results')
247
248
249# We might need to monkey-patch TestResult so that it stops considering an
250# unexpected pass as a as a "successful result".  For details, see
251# http://bugs.python.org/issue20165
252def _monkey_patch_test_result_for_unexpected_passes():
253  # type: () -> None
254  """Workaround for <http://bugs.python.org/issue20165>."""
255
256  def wasSuccessful(self):
257    # type: () -> bool
258    """Tells whether or not this result was a success.
259
260    Any unexpected pass is to be counted as a non-success.
261
262    Args:
263      self: The TestResult instance.
264
265    Returns:
266      Whether or not this result was a success.
267    """
268    return (len(self.failures) == len(self.errors) ==
269            len(self.unexpectedSuccesses) == 0)
270
271  test_result = unittest.TestResult()
272  test_result.addUnexpectedSuccess(unittest.FunctionTestCase(lambda: None))
273  if test_result.wasSuccessful():  # The bug is present.
274    unittest.TestResult.wasSuccessful = wasSuccessful
275    if test_result.wasSuccessful():  # Warn the user if our hot-fix failed.
276      sys.stderr.write('unittest.result.TestResult monkey patch to report'
277                       ' unexpected passes as failures did not work.\n')
278
279
280_monkey_patch_test_result_for_unexpected_passes()
281
282
283def _open(filepath, mode, _open_func=open):
284  # type: (Text, Text, Callable[..., IO]) -> IO
285  """Opens a file.
286
287  Like open(), but ensure that we can open real files even if tests stub out
288  open().
289
290  Args:
291    filepath: A filepath.
292    mode: A mode.
293    _open_func: A built-in open() function.
294
295  Returns:
296    The opened file object.
297  """
298  return _open_func(filepath, mode, encoding='utf-8')
299
300
301class _TempDir(object):
302  """Represents a temporary directory for tests.
303
304  Creation of this class is internal. Using its public methods is OK.
305
306  This class implements the `os.PathLike` interface (specifically,
307  `os.PathLike[str]`). This means, in Python 3, it can be directly passed
308  to e.g. `os.path.join()`.
309  """
310
311  def __init__(self, path):
312    # type: (Text) -> None
313    """Module-private: do not instantiate outside module."""
314    self._path = path
315
316  @property
317  def full_path(self):
318    # type: () -> Text
319    """Returns the path, as a string, for the directory.
320
321    TIP: Instead of e.g. `os.path.join(temp_dir.full_path)`, you can simply
322    do `os.path.join(temp_dir)` because `__fspath__()` is implemented.
323    """
324    return self._path
325
326  def __fspath__(self):
327    # type: () -> Text
328    """See os.PathLike."""
329    return self.full_path
330
331  def create_file(self, file_path=None, content=None, mode='w', encoding='utf8',
332                  errors='strict'):
333    # type: (Optional[Text], Optional[AnyStr], Text, Text, Text) -> _TempFile
334    """Create a file in the directory.
335
336    NOTE: If the file already exists, it will be made writable and overwritten.
337
338    Args:
339      file_path: Optional file path for the temp file. If not given, a unique
340        file name will be generated and used. Slashes are allowed in the name;
341        any missing intermediate directories will be created. NOTE: This path
342        is the path that will be cleaned up, including any directories in the
343        path, e.g., 'foo/bar/baz.txt' will `rm -r foo`
344      content: Optional string or bytes to initially write to the file. If not
345        specified, then an empty file is created.
346      mode: Mode string to use when writing content. Only used if `content` is
347        non-empty.
348      encoding: Encoding to use when writing string content. Only used if
349        `content` is text.
350      errors: How to handle text to bytes encoding errors. Only used if
351        `content` is text.
352
353    Returns:
354      A _TempFile representing the created file.
355    """
356    tf, _ = _TempFile._create(self._path, file_path, content, mode, encoding,
357                              errors)
358    return tf
359
360  def mkdir(self, dir_path=None):
361    # type: (Optional[Text]) -> _TempDir
362    """Create a directory in the directory.
363
364    Args:
365      dir_path: Optional path to the directory to create. If not given,
366        a unique name will be generated and used.
367
368    Returns:
369      A _TempDir representing the created directory.
370    """
371    if dir_path:
372      path = os.path.join(self._path, dir_path)
373    else:
374      path = tempfile.mkdtemp(dir=self._path)
375
376    # Note: there's no need to clear the directory since the containing
377    # dir was cleared by the tempdir() function.
378    os.makedirs(path, exist_ok=True)
379    return _TempDir(path)
380
381
382class _TempFile(object):
383  """Represents a tempfile for tests.
384
385  Creation of this class is internal. Using its public methods is OK.
386
387  This class implements the `os.PathLike` interface (specifically,
388  `os.PathLike[str]`). This means, in Python 3, it can be directly passed
389  to e.g. `os.path.join()`.
390  """
391
392  def __init__(self, path):
393    # type: (Text) -> None
394    """Private: use _create instead."""
395    self._path = path
396
397  # pylint: disable=line-too-long
398  @classmethod
399  def _create(cls, base_path, file_path, content, mode, encoding, errors):
400    # type: (Text, Optional[Text], AnyStr, Text, Text, Text) -> Tuple[_TempFile, Text]
401    # pylint: enable=line-too-long
402    """Module-private: create a tempfile instance."""
403    if file_path:
404      cleanup_path = os.path.join(base_path, _get_first_part(file_path))
405      path = os.path.join(base_path, file_path)
406      os.makedirs(os.path.dirname(path), exist_ok=True)
407      # The file may already exist, in which case, ensure it's writable so that
408      # it can be truncated.
409      if os.path.exists(path) and not os.access(path, os.W_OK):
410        stat_info = os.stat(path)
411        os.chmod(path, stat_info.st_mode | stat.S_IWUSR)
412    else:
413      os.makedirs(base_path, exist_ok=True)
414      fd, path = tempfile.mkstemp(dir=str(base_path))
415      os.close(fd)
416      cleanup_path = path
417
418    tf = cls(path)
419
420    if content:
421      if isinstance(content, str):
422        tf.write_text(content, mode=mode, encoding=encoding, errors=errors)
423      else:
424        tf.write_bytes(content, mode)
425
426    else:
427      tf.write_bytes(b'')
428
429    return tf, cleanup_path
430
431  @property
432  def full_path(self):
433    # type: () -> Text
434    """Returns the path, as a string, for the file.
435
436    TIP: Instead of e.g. `os.path.join(temp_file.full_path)`, you can simply
437    do `os.path.join(temp_file)` because `__fspath__()` is implemented.
438    """
439    return self._path
440
441  def __fspath__(self):
442    # type: () -> Text
443    """See os.PathLike."""
444    return self.full_path
445
446  def read_text(self, encoding='utf8', errors='strict'):
447    # type: (Text, Text) -> Text
448    """Return the contents of the file as text."""
449    with self.open_text(encoding=encoding, errors=errors) as fp:
450      return fp.read()
451
452  def read_bytes(self):
453    # type: () -> bytes
454    """Return the content of the file as bytes."""
455    with self.open_bytes() as fp:
456      return fp.read()
457
458  def write_text(self, text, mode='w', encoding='utf8', errors='strict'):
459    # type: (Text, Text, Text, Text) -> None
460    """Write text to the file.
461
462    Args:
463      text: Text to write. In Python 2, it can be bytes, which will be
464        decoded using the `encoding` arg (this is as an aid for code that
465        is 2 and 3 compatible).
466      mode: The mode to open the file for writing.
467      encoding: The encoding to use when writing the text to the file.
468      errors: The error handling strategy to use when converting text to bytes.
469    """
470    with self.open_text(mode, encoding=encoding, errors=errors) as fp:
471      fp.write(text)
472
473  def write_bytes(self, data, mode='wb'):
474    # type: (bytes, Text) -> None
475    """Write bytes to the file.
476
477    Args:
478      data: bytes to write.
479      mode: Mode to open the file for writing. The "b" flag is implicit if
480        not already present. It must not have the "t" flag.
481    """
482    with self.open_bytes(mode) as fp:
483      fp.write(data)
484
485  def open_text(self, mode='rt', encoding='utf8', errors='strict'):
486    # type: (Text, Text, Text) -> ContextManager[TextIO]
487    """Return a context manager for opening the file in text mode.
488
489    Args:
490      mode: The mode to open the file in. The "t" flag is implicit if not
491        already present. It must not have the "b" flag.
492      encoding: The encoding to use when opening the file.
493      errors: How to handle decoding errors.
494
495    Returns:
496      Context manager that yields an open file.
497
498    Raises:
499      ValueError: if invalid inputs are provided.
500    """
501    if 'b' in mode:
502      raise ValueError('Invalid mode {!r}: "b" flag not allowed when opening '
503                       'file in text mode'.format(mode))
504    if 't' not in mode:
505      mode += 't'
506    cm = self._open(mode, encoding, errors)
507    return cm
508
509  def open_bytes(self, mode='rb'):
510    # type: (Text) -> ContextManager[BinaryIO]
511    """Return a context manager for opening the file in binary mode.
512
513    Args:
514      mode: The mode to open the file in. The "b" mode is implicit if not
515        already present. It must not have the "t" flag.
516
517    Returns:
518      Context manager that yields an open file.
519
520    Raises:
521      ValueError: if invalid inputs are provided.
522    """
523    if 't' in mode:
524      raise ValueError('Invalid mode {!r}: "t" flag not allowed when opening '
525                       'file in binary mode'.format(mode))
526    if 'b' not in mode:
527      mode += 'b'
528    cm = self._open(mode, encoding=None, errors=None)
529    return cm
530
531  # TODO(b/123775699): Once pytype supports typing.Literal, use overload and
532  # Literal to express more precise return types. The contained type is
533  # currently `Any` to avoid [bad-return-type] errors in the open_* methods.
534  @contextlib.contextmanager
535  def _open(
536      self, mode: str, encoding: str = 'utf8', errors: str = 'strict'
537  ) -> Iterator[Any]:
538    with io.open(
539        self.full_path, mode=mode, encoding=encoding, errors=errors) as fp:
540      yield fp
541
542
543class _method(object):
544  """A decorator that supports both instance and classmethod invocations.
545
546  Using similar semantics to the @property builtin, this decorator can augment
547  an instance method to support conditional logic when invoked on a class
548  object. This breaks support for invoking an instance method via the class
549  (e.g. Cls.method(self, ...)) but is still situationally useful.
550  """
551
552  def __init__(self, finstancemethod):
553    # type: (Callable[..., Any]) -> None
554    self._finstancemethod = finstancemethod
555    self._fclassmethod = None
556
557  def classmethod(self, fclassmethod):
558    # type: (Callable[..., Any]) -> _method
559    self._fclassmethod = classmethod(fclassmethod)
560    return self
561
562  def __doc__(self):
563    # type: () -> str
564    if getattr(self._finstancemethod, '__doc__'):
565      return self._finstancemethod.__doc__
566    elif getattr(self._fclassmethod, '__doc__'):
567      return self._fclassmethod.__doc__
568    return ''
569
570  def __get__(self, obj, type_):
571    # type: (Optional[Any], Optional[Type[Any]]) -> Callable[..., Any]
572    func = self._fclassmethod if obj is None else self._finstancemethod
573    return func.__get__(obj, type_)  # pytype: disable=attribute-error
574
575
576class TestCase(unittest.TestCase):
577  """Extension of unittest.TestCase providing more power."""
578
579  # When to cleanup files/directories created by our `create_tempfile()` and
580  # `create_tempdir()` methods after each test case completes. This does *not*
581  # affect e.g., files created outside of those methods, e.g., using the stdlib
582  # tempfile module. This can be overridden at the class level, instance level,
583  # or with the `cleanup` arg of `create_tempfile()` and `create_tempdir()`. See
584  # `TempFileCleanup` for details on the different values.
585  # TODO(b/70517332): Remove the type comment and the disable once pytype has
586  # better support for enums.
587  tempfile_cleanup = TempFileCleanup.ALWAYS  # type: TempFileCleanup  # pytype: disable=annotation-type-mismatch
588
589  maxDiff = 80 * 20
590  longMessage = True
591
592  # Exit stacks for per-test and per-class scopes.
593  _exit_stack = None
594  _cls_exit_stack = None
595
596  def __init__(self, *args, **kwargs):
597    super(TestCase, self).__init__(*args, **kwargs)
598    # This is to work around missing type stubs in unittest.pyi
599    self._outcome = getattr(self, '_outcome')  # type: Optional[_OutcomeType]
600
601  def setUp(self):
602    super(TestCase, self).setUp()
603    # NOTE: Only Python 3 contextlib has ExitStack
604    if hasattr(contextlib, 'ExitStack'):
605      self._exit_stack = contextlib.ExitStack()
606      self.addCleanup(self._exit_stack.close)
607
608  @classmethod
609  def setUpClass(cls):
610    super(TestCase, cls).setUpClass()
611    # NOTE: Only Python 3 contextlib has ExitStack and only Python 3.8+ has
612    # addClassCleanup.
613    if hasattr(contextlib, 'ExitStack') and hasattr(cls, 'addClassCleanup'):
614      cls._cls_exit_stack = contextlib.ExitStack()
615      cls.addClassCleanup(cls._cls_exit_stack.close)
616
617  def create_tempdir(self, name=None, cleanup=None):
618    # type: (Optional[Text], Optional[TempFileCleanup]) -> _TempDir
619    """Create a temporary directory specific to the test.
620
621    NOTE: The directory and its contents will be recursively cleared before
622    creation. This ensures that there is no pre-existing state.
623
624    This creates a named directory on disk that is isolated to this test, and
625    will be properly cleaned up by the test. This avoids several pitfalls of
626    creating temporary directories for test purposes, as well as makes it easier
627    to setup directories and verify their contents. For example::
628
629        def test_foo(self):
630          out_dir = self.create_tempdir()
631          out_log = out_dir.create_file('output.log')
632          expected_outputs = [
633              os.path.join(out_dir, 'data-0.txt'),
634              os.path.join(out_dir, 'data-1.txt'),
635          ]
636          code_under_test(out_dir)
637          self.assertTrue(os.path.exists(expected_paths[0]))
638          self.assertTrue(os.path.exists(expected_paths[1]))
639          self.assertEqual('foo', out_log.read_text())
640
641    See also: :meth:`create_tempdir` for creating temporary files.
642
643    Args:
644      name: Optional name of the directory. If not given, a unique
645        name will be generated and used.
646      cleanup: Optional cleanup policy on when/if to remove the directory (and
647        all its contents) at the end of the test. If None, then uses
648        :attr:`tempfile_cleanup`.
649
650    Returns:
651      A _TempDir representing the created directory; see _TempDir class docs
652      for usage.
653    """
654    test_path = self._get_tempdir_path_test()
655
656    if name:
657      path = os.path.join(test_path, name)
658      cleanup_path = os.path.join(test_path, _get_first_part(name))
659    else:
660      os.makedirs(test_path, exist_ok=True)
661      path = tempfile.mkdtemp(dir=test_path)
662      cleanup_path = path
663
664    _rmtree_ignore_errors(cleanup_path)
665    os.makedirs(path, exist_ok=True)
666
667    self._maybe_add_temp_path_cleanup(cleanup_path, cleanup)
668
669    return _TempDir(path)
670
671  # pylint: disable=line-too-long
672  def create_tempfile(self, file_path=None, content=None, mode='w',
673                      encoding='utf8', errors='strict', cleanup=None):
674    # type: (Optional[Text], Optional[AnyStr], Text, Text, Text, Optional[TempFileCleanup]) -> _TempFile
675    # pylint: enable=line-too-long
676    """Create a temporary file specific to the test.
677
678    This creates a named file on disk that is isolated to this test, and will
679    be properly cleaned up by the test. This avoids several pitfalls of
680    creating temporary files for test purposes, as well as makes it easier
681    to setup files, their data, read them back, and inspect them when
682    a test fails. For example::
683
684        def test_foo(self):
685          output = self.create_tempfile()
686          code_under_test(output)
687          self.assertGreater(os.path.getsize(output), 0)
688          self.assertEqual('foo', output.read_text())
689
690    NOTE: This will zero-out the file. This ensures there is no pre-existing
691    state.
692    NOTE: If the file already exists, it will be made writable and overwritten.
693
694    See also: :meth:`create_tempdir` for creating temporary directories, and
695    ``_TempDir.create_file`` for creating files within a temporary directory.
696
697    Args:
698      file_path: Optional file path for the temp file. If not given, a unique
699        file name will be generated and used. Slashes are allowed in the name;
700        any missing intermediate directories will be created. NOTE: This path is
701        the path that will be cleaned up, including any directories in the path,
702        e.g., ``'foo/bar/baz.txt'`` will ``rm -r foo``.
703      content: Optional string or
704        bytes to initially write to the file. If not
705        specified, then an empty file is created.
706      mode: Mode string to use when writing content. Only used if `content` is
707        non-empty.
708      encoding: Encoding to use when writing string content. Only used if
709        `content` is text.
710      errors: How to handle text to bytes encoding errors. Only used if
711        `content` is text.
712      cleanup: Optional cleanup policy on when/if to remove the directory (and
713        all its contents) at the end of the test. If None, then uses
714        :attr:`tempfile_cleanup`.
715
716    Returns:
717      A _TempFile representing the created file; see _TempFile class docs for
718      usage.
719    """
720    test_path = self._get_tempdir_path_test()
721    tf, cleanup_path = _TempFile._create(test_path, file_path, content=content,
722                                         mode=mode, encoding=encoding,
723                                         errors=errors)
724    self._maybe_add_temp_path_cleanup(cleanup_path, cleanup)
725    return tf
726
727  @_method
728  def enter_context(self, manager):
729    # type: (ContextManager[_T]) -> _T
730    """Returns the CM's value after registering it with the exit stack.
731
732    Entering a context pushes it onto a stack of contexts. When `enter_context`
733    is called on the test instance (e.g. `self.enter_context`), the context is
734    exited after the test case's tearDown call. When called on the test class
735    (e.g. `TestCase.enter_context`), the context is exited after the test
736    class's tearDownClass call.
737
738    Contexts are exited in the reverse order of entering. They will always
739    be exited, regardless of test failure/success.
740
741    This is useful to eliminate per-test boilerplate when context managers
742    are used. For example, instead of decorating every test with `@mock.patch`,
743    simply do `self.foo = self.enter_context(mock.patch(...))' in `setUp()`.
744
745    NOTE: The context managers will always be exited without any error
746    information. This is an unfortunate implementation detail due to some
747    internals of how unittest runs tests.
748
749    Args:
750      manager: The context manager to enter.
751    """
752    if not self._exit_stack:
753      raise AssertionError(
754          'self._exit_stack is not set: enter_context is Py3-only; also make '
755          'sure that AbslTest.setUp() is called.')
756    return self._exit_stack.enter_context(manager)
757
758  @enter_context.classmethod
759  def enter_context(cls, manager):  # pylint: disable=no-self-argument
760    # type: (ContextManager[_T]) -> _T
761    if not cls._cls_exit_stack:
762      raise AssertionError(
763          'cls._cls_exit_stack is not set: cls.enter_context requires '
764          'Python 3.8+; also make sure that AbslTest.setUpClass() is called.')
765    return cls._cls_exit_stack.enter_context(manager)
766
767  @classmethod
768  def _get_tempdir_path_cls(cls):
769    # type: () -> Text
770    return os.path.join(TEST_TMPDIR.value,
771                        cls.__qualname__.replace('__main__.', ''))
772
773  def _get_tempdir_path_test(self):
774    # type: () -> Text
775    return os.path.join(self._get_tempdir_path_cls(), self._testMethodName)
776
777  def _get_tempfile_cleanup(self, override):
778    # type: (Optional[TempFileCleanup]) -> TempFileCleanup
779    if override is not None:
780      return override
781    return self.tempfile_cleanup
782
783  def _maybe_add_temp_path_cleanup(self, path, cleanup):
784    # type: (Text, Optional[TempFileCleanup]) -> None
785    cleanup = self._get_tempfile_cleanup(cleanup)
786    if cleanup == TempFileCleanup.OFF:
787      return
788    elif cleanup == TempFileCleanup.ALWAYS:
789      self.addCleanup(_rmtree_ignore_errors, path)
790    elif cleanup == TempFileCleanup.SUCCESS:
791      self._internal_add_cleanup_on_success(_rmtree_ignore_errors, path)
792    else:
793      raise AssertionError('Unexpected cleanup value: {}'.format(cleanup))
794
795  def _internal_add_cleanup_on_success(
796      self,
797      function: Callable[..., Any],
798      *args: Any,
799      **kwargs: Any,
800  ) -> None:
801    """Adds `function` as cleanup when the test case succeeds."""
802    outcome = self._outcome
803    previous_failure_count = (
804        len(outcome.result.failures)
805        + len(outcome.result.errors)
806        + len(outcome.result.unexpectedSuccesses)
807    )
808    def _call_cleaner_on_success(*args, **kwargs):
809      if not self._internal_ran_and_passed_when_called_during_cleanup(
810          previous_failure_count):
811        return
812      function(*args, **kwargs)
813    self.addCleanup(_call_cleaner_on_success, *args, **kwargs)
814
815  def _internal_ran_and_passed_when_called_during_cleanup(
816      self,
817      previous_failure_count: int,
818  ) -> bool:
819    """Returns whether test is passed. Expected to be called during cleanup."""
820    outcome = self._outcome
821    if sys.version_info[:2] >= (3, 11):
822      current_failure_count = (
823          len(outcome.result.failures)
824          + len(outcome.result.errors)
825          + len(outcome.result.unexpectedSuccesses)
826      )
827      return current_failure_count == previous_failure_count
828    else:
829      # Before Python 3.11 https://github.com/python/cpython/pull/28180, errors
830      # were bufferred in _Outcome before calling cleanup.
831      result = self.defaultTestResult()
832      self._feedErrorsToResult(result, outcome.errors)  # pytype: disable=attribute-error
833      return result.wasSuccessful()
834
835  def shortDescription(self):
836    # type: () -> Text
837    """Formats both the test method name and the first line of its docstring.
838
839    If no docstring is given, only returns the method name.
840
841    This method overrides unittest.TestCase.shortDescription(), which
842    only returns the first line of the docstring, obscuring the name
843    of the test upon failure.
844
845    Returns:
846      desc: A short description of a test method.
847    """
848    desc = self.id()
849
850    # Omit the main name so that test name can be directly copy/pasted to
851    # the command line.
852    if desc.startswith('__main__.'):
853      desc = desc[len('__main__.'):]
854
855    # NOTE: super() is used here instead of directly invoking
856    # unittest.TestCase.shortDescription(self), because of the
857    # following line that occurs later on:
858    #       unittest.TestCase = TestCase
859    # Because of this, direct invocation of what we think is the
860    # superclass will actually cause infinite recursion.
861    doc_first_line = super(TestCase, self).shortDescription()
862    if doc_first_line is not None:
863      desc = '\n'.join((desc, doc_first_line))
864    return desc
865
866  def assertStartsWith(self, actual, expected_start, msg=None):
867    """Asserts that actual.startswith(expected_start) is True.
868
869    Args:
870      actual: str
871      expected_start: str
872      msg: Optional message to report on failure.
873    """
874    if not actual.startswith(expected_start):
875      self.fail('%r does not start with %r' % (actual, expected_start), msg)
876
877  def assertNotStartsWith(self, actual, unexpected_start, msg=None):
878    """Asserts that actual.startswith(unexpected_start) is False.
879
880    Args:
881      actual: str
882      unexpected_start: str
883      msg: Optional message to report on failure.
884    """
885    if actual.startswith(unexpected_start):
886      self.fail('%r does start with %r' % (actual, unexpected_start), msg)
887
888  def assertEndsWith(self, actual, expected_end, msg=None):
889    """Asserts that actual.endswith(expected_end) is True.
890
891    Args:
892      actual: str
893      expected_end: str
894      msg: Optional message to report on failure.
895    """
896    if not actual.endswith(expected_end):
897      self.fail('%r does not end with %r' % (actual, expected_end), msg)
898
899  def assertNotEndsWith(self, actual, unexpected_end, msg=None):
900    """Asserts that actual.endswith(unexpected_end) is False.
901
902    Args:
903      actual: str
904      unexpected_end: str
905      msg: Optional message to report on failure.
906    """
907    if actual.endswith(unexpected_end):
908      self.fail('%r does end with %r' % (actual, unexpected_end), msg)
909
910  def assertSequenceStartsWith(self, prefix, whole, msg=None):
911    """An equality assertion for the beginning of ordered sequences.
912
913    If prefix is an empty sequence, it will raise an error unless whole is also
914    an empty sequence.
915
916    If prefix is not a sequence, it will raise an error if the first element of
917    whole does not match.
918
919    Args:
920      prefix: A sequence expected at the beginning of the whole parameter.
921      whole: The sequence in which to look for prefix.
922      msg: Optional message to report on failure.
923    """
924    try:
925      prefix_len = len(prefix)
926    except (TypeError, NotImplementedError):
927      prefix = [prefix]
928      prefix_len = 1
929
930    try:
931      whole_len = len(whole)
932    except (TypeError, NotImplementedError):
933      self.fail('For whole: len(%s) is not supported, it appears to be type: '
934                '%s' % (whole, type(whole)), msg)
935
936    assert prefix_len <= whole_len, self._formatMessage(
937        msg,
938        'Prefix length (%d) is longer than whole length (%d).' %
939        (prefix_len, whole_len)
940    )
941
942    if not prefix_len and whole_len:
943      self.fail('Prefix length is 0 but whole length is %d: %s' %
944                (len(whole), whole), msg)
945
946    try:
947      self.assertSequenceEqual(prefix, whole[:prefix_len], msg)
948    except AssertionError:
949      self.fail('prefix: %s not found at start of whole: %s.' %
950                (prefix, whole), msg)
951
952  def assertEmpty(self, container, msg=None):
953    """Asserts that an object has zero length.
954
955    Args:
956      container: Anything that implements the collections.abc.Sized interface.
957      msg: Optional message to report on failure.
958    """
959    if not isinstance(container, abc.Sized):
960      self.fail('Expected a Sized object, got: '
961                '{!r}'.format(type(container).__name__), msg)
962
963    # explicitly check the length since some Sized objects (e.g. numpy.ndarray)
964    # have strange __nonzero__/__bool__ behavior.
965    if len(container):  # pylint: disable=g-explicit-length-test
966      self.fail('{!r} has length of {}.'.format(container, len(container)), msg)
967
968  def assertNotEmpty(self, container, msg=None):
969    """Asserts that an object has non-zero length.
970
971    Args:
972      container: Anything that implements the collections.abc.Sized interface.
973      msg: Optional message to report on failure.
974    """
975    if not isinstance(container, abc.Sized):
976      self.fail('Expected a Sized object, got: '
977                '{!r}'.format(type(container).__name__), msg)
978
979    # explicitly check the length since some Sized objects (e.g. numpy.ndarray)
980    # have strange __nonzero__/__bool__ behavior.
981    if not len(container):  # pylint: disable=g-explicit-length-test
982      self.fail('{!r} has length of 0.'.format(container), msg)
983
984  def assertLen(self, container, expected_len, msg=None):
985    """Asserts that an object has the expected length.
986
987    Args:
988      container: Anything that implements the collections.abc.Sized interface.
989      expected_len: The expected length of the container.
990      msg: Optional message to report on failure.
991    """
992    if not isinstance(container, abc.Sized):
993      self.fail('Expected a Sized object, got: '
994                '{!r}'.format(type(container).__name__), msg)
995    if len(container) != expected_len:
996      container_repr = unittest.util.safe_repr(container)  # pytype: disable=module-attr
997      self.fail('{} has length of {}, expected {}.'.format(
998          container_repr, len(container), expected_len), msg)
999
1000  def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None,
1001                                msg=None, delta=None):
1002    """An approximate equality assertion for ordered sequences.
1003
1004    Fail if the two sequences are unequal as determined by their value
1005    differences rounded to the given number of decimal places (default 7) and
1006    comparing to zero, or by comparing that the difference between each value
1007    in the two sequences is more than the given delta.
1008
1009    Note that decimal places (from zero) are usually not the same as significant
1010    digits (measured from the most significant digit).
1011
1012    If the two sequences compare equal then they will automatically compare
1013    almost equal.
1014
1015    Args:
1016      expected_seq: A sequence containing elements we are expecting.
1017      actual_seq: The sequence that we are testing.
1018      places: The number of decimal places to compare.
1019      msg: The message to be printed if the test fails.
1020      delta: The OK difference between compared values.
1021    """
1022    if len(expected_seq) != len(actual_seq):
1023      self.fail('Sequence size mismatch: {} vs {}'.format(
1024          len(expected_seq), len(actual_seq)), msg)
1025
1026    err_list = []
1027    for idx, (exp_elem, act_elem) in enumerate(zip(expected_seq, actual_seq)):
1028      try:
1029        # assertAlmostEqual should be called with at most one of `places` and
1030        # `delta`. However, it's okay for assertSequenceAlmostEqual to pass
1031        # both because we want the latter to fail if the former does.
1032        # pytype: disable=wrong-keyword-args
1033        self.assertAlmostEqual(exp_elem, act_elem, places=places, msg=msg,
1034                               delta=delta)
1035        # pytype: enable=wrong-keyword-args
1036      except self.failureException as err:
1037        err_list.append('At index {}: {}'.format(idx, err))
1038
1039    if err_list:
1040      if len(err_list) > 30:
1041        err_list = err_list[:30] + ['...']
1042      msg = self._formatMessage(msg, '\n'.join(err_list))
1043      self.fail(msg)
1044
1045  def assertContainsSubset(self, expected_subset, actual_set, msg=None):
1046    """Checks whether actual iterable is a superset of expected iterable."""
1047    missing = set(expected_subset) - set(actual_set)
1048    if not missing:
1049      return
1050
1051    self.fail('Missing elements %s\nExpected: %s\nActual: %s' % (
1052        missing, expected_subset, actual_set), msg)
1053
1054  def assertNoCommonElements(self, expected_seq, actual_seq, msg=None):
1055    """Checks whether actual iterable and expected iterable are disjoint."""
1056    common = set(expected_seq) & set(actual_seq)
1057    if not common:
1058      return
1059
1060    self.fail('Common elements %s\nExpected: %s\nActual: %s' % (
1061        common, expected_seq, actual_seq), msg)
1062
1063  def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
1064    """Deprecated, please use assertCountEqual instead.
1065
1066    This is equivalent to assertCountEqual.
1067
1068    Args:
1069      expected_seq: A sequence containing elements we are expecting.
1070      actual_seq: The sequence that we are testing.
1071      msg: The message to be printed if the test fails.
1072    """
1073    super().assertCountEqual(expected_seq, actual_seq, msg)
1074
1075  def assertSameElements(self, expected_seq, actual_seq, msg=None):
1076    """Asserts that two sequences have the same elements (in any order).
1077
1078    This method, unlike assertCountEqual, doesn't care about any
1079    duplicates in the expected and actual sequences::
1080
1081        # Doesn't raise an AssertionError
1082        assertSameElements([1, 1, 1, 0, 0, 0], [0, 1])
1083
1084    If possible, you should use assertCountEqual instead of
1085    assertSameElements.
1086
1087    Args:
1088      expected_seq: A sequence containing elements we are expecting.
1089      actual_seq: The sequence that we are testing.
1090      msg: The message to be printed if the test fails.
1091    """
1092    # `unittest2.TestCase` used to have assertSameElements, but it was
1093    # removed in favor of assertItemsEqual. As there's a unit test
1094    # that explicitly checks this behavior, I am leaving this method
1095    # alone.
1096    # Fail on strings: empirically, passing strings to this test method
1097    # is almost always a bug. If comparing the character sets of two strings
1098    # is desired, cast the inputs to sets or lists explicitly.
1099    if (isinstance(expected_seq, _TEXT_OR_BINARY_TYPES) or
1100        isinstance(actual_seq, _TEXT_OR_BINARY_TYPES)):
1101      self.fail('Passing string/bytes to assertSameElements is usually a bug. '
1102                'Did you mean to use assertEqual?\n'
1103                'Expected: %s\nActual: %s' % (expected_seq, actual_seq))
1104    try:
1105      expected = dict([(element, None) for element in expected_seq])
1106      actual = dict([(element, None) for element in actual_seq])
1107      missing = [element for element in expected if element not in actual]
1108      unexpected = [element for element in actual if element not in expected]
1109      missing.sort()
1110      unexpected.sort()
1111    except TypeError:
1112      # Fall back to slower list-compare if any of the objects are
1113      # not hashable.
1114      expected = list(expected_seq)
1115      actual = list(actual_seq)
1116      expected.sort()
1117      actual.sort()
1118      missing, unexpected = _sorted_list_difference(expected, actual)
1119    errors = []
1120    if msg:
1121      errors.extend((msg, ':\n'))
1122    if missing:
1123      errors.append('Expected, but missing:\n  %r\n' % missing)
1124    if unexpected:
1125      errors.append('Unexpected, but present:\n  %r\n' % unexpected)
1126    if missing or unexpected:
1127      self.fail(''.join(errors))
1128
1129  # unittest.TestCase.assertMultiLineEqual works very similarly, but it
1130  # has a different error format. However, I find this slightly more readable.
1131  def assertMultiLineEqual(self, first, second, msg=None, **kwargs):
1132    """Asserts that two multi-line strings are equal."""
1133    assert isinstance(first,
1134                      str), ('First argument is not a string: %r' % (first,))
1135    assert isinstance(second,
1136                      str), ('Second argument is not a string: %r' % (second,))
1137    line_limit = kwargs.pop('line_limit', 0)
1138    if kwargs:
1139      raise TypeError('Unexpected keyword args {}'.format(tuple(kwargs)))
1140
1141    if first == second:
1142      return
1143    if msg:
1144      failure_message = [msg + ':\n']
1145    else:
1146      failure_message = ['\n']
1147    if line_limit:
1148      line_limit += len(failure_message)
1149    for line in difflib.ndiff(first.splitlines(True), second.splitlines(True)):
1150      failure_message.append(line)
1151      if not line.endswith('\n'):
1152        failure_message.append('\n')
1153    if line_limit and len(failure_message) > line_limit:
1154      n_omitted = len(failure_message) - line_limit
1155      failure_message = failure_message[:line_limit]
1156      failure_message.append(
1157          '(... and {} more delta lines omitted for brevity.)\n'.format(
1158              n_omitted))
1159
1160    raise self.failureException(''.join(failure_message))
1161
1162  def assertBetween(self, value, minv, maxv, msg=None):
1163    """Asserts that value is between minv and maxv (inclusive)."""
1164    msg = self._formatMessage(msg,
1165                              '"%r" unexpectedly not between "%r" and "%r"' %
1166                              (value, minv, maxv))
1167    self.assertTrue(minv <= value, msg)
1168    self.assertTrue(maxv >= value, msg)
1169
1170  def assertRegexMatch(self, actual_str, regexes, message=None):
1171    r"""Asserts that at least one regex in regexes matches str.
1172
1173    If possible you should use `assertRegex`, which is a simpler
1174    version of this method. `assertRegex` takes a single regular
1175    expression (a string or re compiled object) instead of a list.
1176
1177    Notes:
1178
1179    1. This function uses substring matching, i.e. the matching
1180       succeeds if *any* substring of the error message matches *any*
1181       regex in the list.  This is more convenient for the user than
1182       full-string matching.
1183
1184    2. If regexes is the empty list, the matching will always fail.
1185
1186    3. Use regexes=[''] for a regex that will always pass.
1187
1188    4. '.' matches any single character *except* the newline.  To
1189       match any character, use '(.|\n)'.
1190
1191    5. '^' matches the beginning of each line, not just the beginning
1192       of the string.  Similarly, '$' matches the end of each line.
1193
1194    6. An exception will be thrown if regexes contains an invalid
1195       regex.
1196
1197    Args:
1198      actual_str:  The string we try to match with the items in regexes.
1199      regexes:  The regular expressions we want to match against str.
1200          See "Notes" above for detailed notes on how this is interpreted.
1201      message:  The message to be printed if the test fails.
1202    """
1203    if isinstance(regexes, _TEXT_OR_BINARY_TYPES):
1204      self.fail('regexes is string or bytes; use assertRegex instead.',
1205                message)
1206    if not regexes:
1207      self.fail('No regexes specified.', message)
1208
1209    regex_type = type(regexes[0])
1210    for regex in regexes[1:]:
1211      if type(regex) is not regex_type:  # pylint: disable=unidiomatic-typecheck
1212        self.fail('regexes list must all be the same type.', message)
1213
1214    if regex_type is bytes and isinstance(actual_str, str):
1215      regexes = [regex.decode('utf-8') for regex in regexes]
1216      regex_type = str
1217    elif regex_type is str and isinstance(actual_str, bytes):
1218      regexes = [regex.encode('utf-8') for regex in regexes]
1219      regex_type = bytes
1220
1221    if regex_type is str:
1222      regex = u'(?:%s)' % u')|(?:'.join(regexes)
1223    elif regex_type is bytes:
1224      regex = b'(?:' + (b')|(?:'.join(regexes)) + b')'
1225    else:
1226      self.fail('Only know how to deal with unicode str or bytes regexes.',
1227                message)
1228
1229    if not re.search(regex, actual_str, re.MULTILINE):
1230      self.fail('"%s" does not contain any of these regexes: %s.' %
1231                (actual_str, regexes), message)
1232
1233  def assertCommandSucceeds(self, command, regexes=(b'',), env=None,
1234                            close_fds=True, msg=None):
1235    """Asserts that a shell command succeeds (i.e. exits with code 0).
1236
1237    Args:
1238      command: List or string representing the command to run.
1239      regexes: List of regular expression byte strings that match success.
1240      env: Dictionary of environment variable settings. If None, no environment
1241          variables will be set for the child process. This is to make tests
1242          more hermetic. NOTE: this behavior is different than the standard
1243          subprocess module.
1244      close_fds: Whether or not to close all open fd's in the child after
1245          forking.
1246      msg: Optional message to report on failure.
1247    """
1248    (ret_code, err) = get_command_stderr(command, env, close_fds)
1249
1250    # We need bytes regexes here because `err` is bytes.
1251    # Accommodate code which listed their output regexes w/o the b'' prefix by
1252    # converting them to bytes for the user.
1253    if isinstance(regexes[0], str):
1254      regexes = [regex.encode('utf-8') for regex in regexes]
1255
1256    command_string = get_command_string(command)
1257    self.assertEqual(
1258        ret_code, 0,
1259        self._formatMessage(msg,
1260                            'Running command\n'
1261                            '%s failed with error code %s and message\n'
1262                            '%s' % (_quote_long_string(command_string),
1263                                    ret_code,
1264                                    _quote_long_string(err)))
1265    )
1266    self.assertRegexMatch(
1267        err,
1268        regexes,
1269        message=self._formatMessage(
1270            msg,
1271            'Running command\n'
1272            '%s failed with error code %s and message\n'
1273            '%s which matches no regex in %s' % (
1274                _quote_long_string(command_string),
1275                ret_code,
1276                _quote_long_string(err),
1277                regexes)))
1278
1279  def assertCommandFails(self, command, regexes, env=None, close_fds=True,
1280                         msg=None):
1281    """Asserts a shell command fails and the error matches a regex in a list.
1282
1283    Args:
1284      command: List or string representing the command to run.
1285      regexes: the list of regular expression strings.
1286      env: Dictionary of environment variable settings. If None, no environment
1287          variables will be set for the child process. This is to make tests
1288          more hermetic. NOTE: this behavior is different than the standard
1289          subprocess module.
1290      close_fds: Whether or not to close all open fd's in the child after
1291          forking.
1292      msg: Optional message to report on failure.
1293    """
1294    (ret_code, err) = get_command_stderr(command, env, close_fds)
1295
1296    # We need bytes regexes here because `err` is bytes.
1297    # Accommodate code which listed their output regexes w/o the b'' prefix by
1298    # converting them to bytes for the user.
1299    if isinstance(regexes[0], str):
1300      regexes = [regex.encode('utf-8') for regex in regexes]
1301
1302    command_string = get_command_string(command)
1303    self.assertNotEqual(
1304        ret_code, 0,
1305        self._formatMessage(msg, 'The following command succeeded '
1306                            'while expected to fail:\n%s' %
1307                            _quote_long_string(command_string)))
1308    self.assertRegexMatch(
1309        err,
1310        regexes,
1311        message=self._formatMessage(
1312            msg,
1313            'Running command\n'
1314            '%s failed with error code %s and message\n'
1315            '%s which matches no regex in %s' % (
1316                _quote_long_string(command_string),
1317                ret_code,
1318                _quote_long_string(err),
1319                regexes)))
1320
1321  class _AssertRaisesContext(object):
1322
1323    def __init__(self, expected_exception, test_case, test_func, msg=None):
1324      self.expected_exception = expected_exception
1325      self.test_case = test_case
1326      self.test_func = test_func
1327      self.msg = msg
1328
1329    def __enter__(self):
1330      return self
1331
1332    def __exit__(self, exc_type, exc_value, tb):
1333      if exc_type is None:
1334        self.test_case.fail(self.expected_exception.__name__ + ' not raised',
1335                            self.msg)
1336      if not issubclass(exc_type, self.expected_exception):
1337        return False
1338      self.test_func(exc_value)
1339      if exc_value:
1340        self.exception = exc_value.with_traceback(None)
1341      return True
1342
1343  @typing.overload
1344  def assertRaisesWithPredicateMatch(
1345      self, expected_exception, predicate) -> _AssertRaisesContext:
1346    # The purpose of this return statement is to work around
1347    # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored.
1348    return self._AssertRaisesContext(None, None, None)
1349
1350  @typing.overload
1351  def assertRaisesWithPredicateMatch(
1352      self, expected_exception, predicate, callable_obj: Callable[..., Any],
1353      *args, **kwargs) -> None:
1354    # The purpose of this return statement is to work around
1355    # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored.
1356    return self._AssertRaisesContext(None, None, None)
1357
1358  def assertRaisesWithPredicateMatch(self, expected_exception, predicate,
1359                                     callable_obj=None, *args, **kwargs):
1360    """Asserts that exception is thrown and predicate(exception) is true.
1361
1362    Args:
1363      expected_exception: Exception class expected to be raised.
1364      predicate: Function of one argument that inspects the passed-in exception
1365          and returns True (success) or False (please fail the test).
1366      callable_obj: Function to be called.
1367      *args: Extra args.
1368      **kwargs: Extra keyword args.
1369
1370    Returns:
1371      A context manager if callable_obj is None. Otherwise, None.
1372
1373    Raises:
1374      self.failureException if callable_obj does not raise a matching exception.
1375    """
1376    def Check(err):
1377      self.assertTrue(predicate(err),
1378                      '%r does not match predicate %r' % (err, predicate))
1379
1380    context = self._AssertRaisesContext(expected_exception, self, Check)
1381    if callable_obj is None:
1382      return context
1383    with context:
1384      callable_obj(*args, **kwargs)
1385
1386  @typing.overload
1387  def assertRaisesWithLiteralMatch(
1388      self, expected_exception, expected_exception_message
1389  ) -> _AssertRaisesContext:
1390    # The purpose of this return statement is to work around
1391    # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored.
1392    return self._AssertRaisesContext(None, None, None)
1393
1394  @typing.overload
1395  def assertRaisesWithLiteralMatch(
1396      self, expected_exception, expected_exception_message,
1397      callable_obj: Callable[..., Any], *args, **kwargs) -> None:
1398    # The purpose of this return statement is to work around
1399    # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored.
1400    return self._AssertRaisesContext(None, None, None)
1401
1402  def assertRaisesWithLiteralMatch(self, expected_exception,
1403                                   expected_exception_message,
1404                                   callable_obj=None, *args, **kwargs):
1405    """Asserts that the message in a raised exception equals the given string.
1406
1407    Unlike assertRaisesRegex, this method takes a literal string, not
1408    a regular expression.
1409
1410    with self.assertRaisesWithLiteralMatch(ExType, 'message'):
1411      DoSomething()
1412
1413    Args:
1414      expected_exception: Exception class expected to be raised.
1415      expected_exception_message: String message expected in the raised
1416          exception.  For a raise exception e, expected_exception_message must
1417          equal str(e).
1418      callable_obj: Function to be called, or None to return a context.
1419      *args: Extra args.
1420      **kwargs: Extra kwargs.
1421
1422    Returns:
1423      A context manager if callable_obj is None. Otherwise, None.
1424
1425    Raises:
1426      self.failureException if callable_obj does not raise a matching exception.
1427    """
1428    def Check(err):
1429      actual_exception_message = str(err)
1430      self.assertTrue(expected_exception_message == actual_exception_message,
1431                      'Exception message does not match.\n'
1432                      'Expected: %r\n'
1433                      'Actual: %r' % (expected_exception_message,
1434                                      actual_exception_message))
1435
1436    context = self._AssertRaisesContext(expected_exception, self, Check)
1437    if callable_obj is None:
1438      return context
1439    with context:
1440      callable_obj(*args, **kwargs)
1441
1442  def assertContainsInOrder(self, strings, target, msg=None):
1443    """Asserts that the strings provided are found in the target in order.
1444
1445    This may be useful for checking HTML output.
1446
1447    Args:
1448      strings: A list of strings, such as [ 'fox', 'dog' ]
1449      target: A target string in which to look for the strings, such as
1450          'The quick brown fox jumped over the lazy dog'.
1451      msg: Optional message to report on failure.
1452    """
1453    if isinstance(strings, (bytes, unicode if str is bytes else str)):
1454      strings = (strings,)
1455
1456    current_index = 0
1457    last_string = None
1458    for string in strings:
1459      index = target.find(str(string), current_index)
1460      if index == -1 and current_index == 0:
1461        self.fail("Did not find '%s' in '%s'" %
1462                  (string, target), msg)
1463      elif index == -1:
1464        self.fail("Did not find '%s' after '%s' in '%s'" %
1465                  (string, last_string, target), msg)
1466      last_string = string
1467      current_index = index
1468
1469  def assertContainsSubsequence(self, container, subsequence, msg=None):
1470    """Asserts that "container" contains "subsequence" as a subsequence.
1471
1472    Asserts that "container" contains all the elements of "subsequence", in
1473    order, but possibly with other elements interspersed. For example, [1, 2, 3]
1474    is a subsequence of [0, 0, 1, 2, 0, 3, 0] but not of [0, 0, 1, 3, 0, 2, 0].
1475
1476    Args:
1477      container: the list we're testing for subsequence inclusion.
1478      subsequence: the list we hope will be a subsequence of container.
1479      msg: Optional message to report on failure.
1480    """
1481    first_nonmatching = None
1482    reversed_container = list(reversed(container))
1483    subsequence = list(subsequence)
1484
1485    for e in subsequence:
1486      if e not in reversed_container:
1487        first_nonmatching = e
1488        break
1489      while e != reversed_container.pop():
1490        pass
1491
1492    if first_nonmatching is not None:
1493      self.fail('%s not a subsequence of %s. First non-matching element: %s' %
1494                (subsequence, container, first_nonmatching), msg)
1495
1496  def assertContainsExactSubsequence(self, container, subsequence, msg=None):
1497    """Asserts that "container" contains "subsequence" as an exact subsequence.
1498
1499    Asserts that "container" contains all the elements of "subsequence", in
1500    order, and without other elements interspersed. For example, [1, 2, 3] is an
1501    exact subsequence of [0, 0, 1, 2, 3, 0] but not of [0, 0, 1, 2, 0, 3, 0].
1502
1503    Args:
1504      container: the list we're testing for subsequence inclusion.
1505      subsequence: the list we hope will be an exact subsequence of container.
1506      msg: Optional message to report on failure.
1507    """
1508    container = list(container)
1509    subsequence = list(subsequence)
1510    longest_match = 0
1511
1512    for start in range(1 + len(container) - len(subsequence)):
1513      if longest_match == len(subsequence):
1514        break
1515      index = 0
1516      while (index < len(subsequence) and
1517             subsequence[index] == container[start + index]):
1518        index += 1
1519      longest_match = max(longest_match, index)
1520
1521    if longest_match < len(subsequence):
1522      self.fail('%s not an exact subsequence of %s. '
1523                'Longest matching prefix: %s' %
1524                (subsequence, container, subsequence[:longest_match]), msg)
1525
1526  def assertTotallyOrdered(self, *groups, **kwargs):
1527    """Asserts that total ordering has been implemented correctly.
1528
1529    For example, say you have a class A that compares only on its attribute x.
1530    Comparators other than ``__lt__`` are omitted for brevity::
1531
1532        class A(object):
1533          def __init__(self, x, y):
1534            self.x = x
1535            self.y = y
1536
1537          def __hash__(self):
1538            return hash(self.x)
1539
1540          def __lt__(self, other):
1541            try:
1542              return self.x < other.x
1543            except AttributeError:
1544              return NotImplemented
1545
1546    assertTotallyOrdered will check that instances can be ordered correctly.
1547    For example::
1548
1549        self.assertTotallyOrdered(
1550            [None],  # None should come before everything else.
1551            [1],  # Integers sort earlier.
1552            [A(1, 'a')],
1553            [A(2, 'b')],  # 2 is after 1.
1554            [A(3, 'c'), A(3, 'd')],  # The second argument is irrelevant.
1555            [A(4, 'z')],
1556            ['foo'])  # Strings sort last.
1557
1558    Args:
1559      *groups: A list of groups of elements.  Each group of elements is a list
1560        of objects that are equal.  The elements in each group must be less
1561        than the elements in the group after it.  For example, these groups are
1562        totally ordered: ``[None]``, ``[1]``, ``[2, 2]``, ``[3]``.
1563      **kwargs: optional msg keyword argument can be passed.
1564    """
1565
1566    def CheckOrder(small, big):
1567      """Ensures small is ordered before big."""
1568      self.assertFalse(small == big,
1569                       self._formatMessage(msg, '%r unexpectedly equals %r' %
1570                                           (small, big)))
1571      self.assertTrue(small != big,
1572                      self._formatMessage(msg, '%r unexpectedly equals %r' %
1573                                          (small, big)))
1574      self.assertLess(small, big, msg)
1575      self.assertFalse(big < small,
1576                       self._formatMessage(msg,
1577                                           '%r unexpectedly less than %r' %
1578                                           (big, small)))
1579      self.assertLessEqual(small, big, msg)
1580      self.assertFalse(big <= small, self._formatMessage(
1581          '%r unexpectedly less than or equal to %r' % (big, small), msg
1582      ))
1583      self.assertGreater(big, small, msg)
1584      self.assertFalse(small > big,
1585                       self._formatMessage(msg,
1586                                           '%r unexpectedly greater than %r' %
1587                                           (small, big)))
1588      self.assertGreaterEqual(big, small)
1589      self.assertFalse(small >= big, self._formatMessage(
1590          msg,
1591          '%r unexpectedly greater than or equal to %r' % (small, big)))
1592
1593    def CheckEqual(a, b):
1594      """Ensures that a and b are equal."""
1595      self.assertEqual(a, b, msg)
1596      self.assertFalse(a != b,
1597                       self._formatMessage(msg, '%r unexpectedly unequals %r' %
1598                                           (a, b)))
1599
1600      # Objects that compare equal must hash to the same value, but this only
1601      # applies if both objects are hashable.
1602      if (isinstance(a, abc.Hashable) and
1603          isinstance(b, abc.Hashable)):
1604        self.assertEqual(
1605            hash(a), hash(b),
1606            self._formatMessage(
1607                msg, 'hash %d of %r unexpectedly not equal to hash %d of %r' %
1608                (hash(a), a, hash(b), b)))
1609
1610      self.assertFalse(a < b,
1611                       self._formatMessage(msg,
1612                                           '%r unexpectedly less than %r' %
1613                                           (a, b)))
1614      self.assertFalse(b < a,
1615                       self._formatMessage(msg,
1616                                           '%r unexpectedly less than %r' %
1617                                           (b, a)))
1618      self.assertLessEqual(a, b, msg)
1619      self.assertLessEqual(b, a, msg)  # pylint: disable=arguments-out-of-order
1620      self.assertFalse(a > b,
1621                       self._formatMessage(msg,
1622                                           '%r unexpectedly greater than %r' %
1623                                           (a, b)))
1624      self.assertFalse(b > a,
1625                       self._formatMessage(msg,
1626                                           '%r unexpectedly greater than %r' %
1627                                           (b, a)))
1628      self.assertGreaterEqual(a, b, msg)
1629      self.assertGreaterEqual(b, a, msg)  # pylint: disable=arguments-out-of-order
1630
1631    msg = kwargs.get('msg')
1632
1633    # For every combination of elements, check the order of every pair of
1634    # elements.
1635    for elements in itertools.product(*groups):
1636      elements = list(elements)
1637      for index, small in enumerate(elements[:-1]):
1638        for big in elements[index + 1:]:
1639          CheckOrder(small, big)
1640
1641    # Check that every element in each group is equal.
1642    for group in groups:
1643      for a in group:
1644        CheckEqual(a, a)
1645      for a, b in itertools.product(group, group):
1646        CheckEqual(a, b)
1647
1648  def assertDictEqual(self, a, b, msg=None):
1649    """Raises AssertionError if a and b are not equal dictionaries.
1650
1651    Args:
1652      a: A dict, the expected value.
1653      b: A dict, the actual value.
1654      msg: An optional str, the associated message.
1655
1656    Raises:
1657      AssertionError: if the dictionaries are not equal.
1658    """
1659    self.assertIsInstance(a, dict, self._formatMessage(
1660        msg,
1661        'First argument is not a dictionary'
1662    ))
1663    self.assertIsInstance(b, dict, self._formatMessage(
1664        msg,
1665        'Second argument is not a dictionary'
1666    ))
1667
1668    def Sorted(list_of_items):
1669      try:
1670        return sorted(list_of_items)  # In 3.3, unordered are possible.
1671      except TypeError:
1672        return list_of_items
1673
1674    if a == b:
1675      return
1676    a_items = Sorted(list(a.items()))
1677    b_items = Sorted(list(b.items()))
1678
1679    unexpected = []
1680    missing = []
1681    different = []
1682
1683    safe_repr = unittest.util.safe_repr  # pytype: disable=module-attr
1684
1685    def Repr(dikt):
1686      """Deterministic repr for dict."""
1687      # Sort the entries based on their repr, not based on their sort order,
1688      # which will be non-deterministic across executions, for many types.
1689      entries = sorted((safe_repr(k), safe_repr(v)) for k, v in dikt.items())
1690      return '{%s}' % (', '.join('%s: %s' % pair for pair in entries))
1691
1692    message = ['%s != %s%s' % (Repr(a), Repr(b), ' (%s)' % msg if msg else '')]
1693
1694    # The standard library default output confounds lexical difference with
1695    # value difference; treat them separately.
1696    for a_key, a_value in a_items:
1697      if a_key not in b:
1698        missing.append((a_key, a_value))
1699      elif a_value != b[a_key]:
1700        different.append((a_key, a_value, b[a_key]))
1701
1702    for b_key, b_value in b_items:
1703      if b_key not in a:
1704        unexpected.append((b_key, b_value))
1705
1706    if unexpected:
1707      message.append(
1708          'Unexpected, but present entries:\n%s' % ''.join(
1709              '%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in unexpected))
1710
1711    if different:
1712      message.append(
1713          'repr() of differing entries:\n%s' % ''.join(
1714              '%s: %s != %s\n' % (safe_repr(k), safe_repr(a_value),
1715                                  safe_repr(b_value))
1716              for k, a_value, b_value in different))
1717
1718    if missing:
1719      message.append(
1720          'Missing entries:\n%s' % ''.join(
1721              ('%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in missing)))
1722
1723    raise self.failureException('\n'.join(message))
1724
1725  def assertUrlEqual(self, a, b, msg=None):
1726    """Asserts that urls are equal, ignoring ordering of query params."""
1727    parsed_a = parse.urlparse(a)
1728    parsed_b = parse.urlparse(b)
1729    self.assertEqual(parsed_a.scheme, parsed_b.scheme, msg)
1730    self.assertEqual(parsed_a.netloc, parsed_b.netloc, msg)
1731    self.assertEqual(parsed_a.path, parsed_b.path, msg)
1732    self.assertEqual(parsed_a.fragment, parsed_b.fragment, msg)
1733    self.assertEqual(sorted(parsed_a.params.split(';')),
1734                     sorted(parsed_b.params.split(';')), msg)
1735    self.assertDictEqual(
1736        parse.parse_qs(parsed_a.query, keep_blank_values=True),
1737        parse.parse_qs(parsed_b.query, keep_blank_values=True), msg)
1738
1739  def assertSameStructure(self, a, b, aname='a', bname='b', msg=None):
1740    """Asserts that two values contain the same structural content.
1741
1742    The two arguments should be data trees consisting of trees of dicts and
1743    lists. They will be deeply compared by walking into the contents of dicts
1744    and lists; other items will be compared using the == operator.
1745    If the two structures differ in content, the failure message will indicate
1746    the location within the structures where the first difference is found.
1747    This may be helpful when comparing large structures.
1748
1749    Mixed Sequence and Set types are supported. Mixed Mapping types are
1750    supported, but the order of the keys will not be considered in the
1751    comparison.
1752
1753    Args:
1754      a: The first structure to compare.
1755      b: The second structure to compare.
1756      aname: Variable name to use for the first structure in assertion messages.
1757      bname: Variable name to use for the second structure.
1758      msg: Additional text to include in the failure message.
1759    """
1760
1761    # Accumulate all the problems found so we can report all of them at once
1762    # rather than just stopping at the first
1763    problems = []
1764
1765    _walk_structure_for_problems(a, b, aname, bname, problems)
1766
1767    # Avoid spamming the user toooo much
1768    if self.maxDiff is not None:
1769      max_problems_to_show = self.maxDiff // 80
1770      if len(problems) > max_problems_to_show:
1771        problems = problems[0:max_problems_to_show-1] + ['...']
1772
1773    if problems:
1774      self.fail('; '.join(problems), msg)
1775
1776  def assertJsonEqual(self, first, second, msg=None):
1777    """Asserts that the JSON objects defined in two strings are equal.
1778
1779    A summary of the differences will be included in the failure message
1780    using assertSameStructure.
1781
1782    Args:
1783      first: A string containing JSON to decode and compare to second.
1784      second: A string containing JSON to decode and compare to first.
1785      msg: Additional text to include in the failure message.
1786    """
1787    try:
1788      first_structured = json.loads(first)
1789    except ValueError as e:
1790      raise ValueError(self._formatMessage(
1791          msg,
1792          'could not decode first JSON value %s: %s' % (first, e)))
1793
1794    try:
1795      second_structured = json.loads(second)
1796    except ValueError as e:
1797      raise ValueError(self._formatMessage(
1798          msg,
1799          'could not decode second JSON value %s: %s' % (second, e)))
1800
1801    self.assertSameStructure(first_structured, second_structured,
1802                             aname='first', bname='second', msg=msg)
1803
1804  def _getAssertEqualityFunc(self, first, second):
1805    # type: (Any, Any) -> Callable[..., None]
1806    try:
1807      return super(TestCase, self)._getAssertEqualityFunc(first, second)
1808    except AttributeError:
1809      # This is a workaround if unittest.TestCase.__init__ was never run.
1810      # It usually means that somebody created a subclass just for the
1811      # assertions and has overridden __init__. "assertTrue" is a safe
1812      # value that will not make __init__ raise a ValueError.
1813      test_method = getattr(self, '_testMethodName', 'assertTrue')
1814      super(TestCase, self).__init__(test_method)
1815
1816    return super(TestCase, self)._getAssertEqualityFunc(first, second)
1817
1818  def fail(self, msg=None, prefix=None):
1819    """Fail immediately with the given message, optionally prefixed."""
1820    return super(TestCase, self).fail(self._formatMessage(prefix, msg))
1821
1822
1823def _sorted_list_difference(expected, actual):
1824  # type: (List[_T], List[_T]) -> Tuple[List[_T], List[_T]]
1825  """Finds elements in only one or the other of two, sorted input lists.
1826
1827  Returns a two-element tuple of lists.  The first list contains those
1828  elements in the "expected" list but not in the "actual" list, and the
1829  second contains those elements in the "actual" list but not in the
1830  "expected" list.  Duplicate elements in either input list are ignored.
1831
1832  Args:
1833    expected:  The list we expected.
1834    actual:  The list we actually got.
1835  Returns:
1836    (missing, unexpected)
1837    missing: items in expected that are not in actual.
1838    unexpected: items in actual that are not in expected.
1839  """
1840  i = j = 0
1841  missing = []
1842  unexpected = []
1843  while True:
1844    try:
1845      e = expected[i]
1846      a = actual[j]
1847      if e < a:
1848        missing.append(e)
1849        i += 1
1850        while expected[i] == e:
1851          i += 1
1852      elif e > a:
1853        unexpected.append(a)
1854        j += 1
1855        while actual[j] == a:
1856          j += 1
1857      else:
1858        i += 1
1859        try:
1860          while expected[i] == e:
1861            i += 1
1862        finally:
1863          j += 1
1864          while actual[j] == a:
1865            j += 1
1866    except IndexError:
1867      missing.extend(expected[i:])
1868      unexpected.extend(actual[j:])
1869      break
1870  return missing, unexpected
1871
1872
1873def _are_both_of_integer_type(a, b):
1874  # type: (object, object) -> bool
1875  return isinstance(a, int) and isinstance(b, int)
1876
1877
1878def _are_both_of_sequence_type(a, b):
1879  # type: (object, object) -> bool
1880  return isinstance(a, abc.Sequence) and isinstance(
1881      b, abc.Sequence) and not isinstance(
1882          a, _TEXT_OR_BINARY_TYPES) and not isinstance(b, _TEXT_OR_BINARY_TYPES)
1883
1884
1885def _are_both_of_set_type(a, b):
1886  # type: (object, object) -> bool
1887  return isinstance(a, abc.Set) and isinstance(b, abc.Set)
1888
1889
1890def _are_both_of_mapping_type(a, b):
1891  # type: (object, object) -> bool
1892  return isinstance(a, abc.Mapping) and isinstance(
1893      b, abc.Mapping)
1894
1895
1896def _walk_structure_for_problems(a, b, aname, bname, problem_list):
1897  """The recursive comparison behind assertSameStructure."""
1898  if type(a) != type(b) and not (  # pylint: disable=unidiomatic-typecheck
1899      _are_both_of_integer_type(a, b) or _are_both_of_sequence_type(a, b) or
1900      _are_both_of_set_type(a, b) or _are_both_of_mapping_type(a, b)):
1901    # We do not distinguish between int and long types as 99.99% of Python 2
1902    # code should never care.  They collapse into a single type in Python 3.
1903    problem_list.append('%s is a %r but %s is a %r' %
1904                        (aname, type(a), bname, type(b)))
1905    # If they have different types there's no point continuing
1906    return
1907
1908  if isinstance(a, abc.Set):
1909    for k in a:
1910      if k not in b:
1911        problem_list.append(
1912            '%s has %r but %s does not' % (aname, k, bname))
1913    for k in b:
1914      if k not in a:
1915        problem_list.append('%s lacks %r but %s has it' % (aname, k, bname))
1916
1917  # NOTE: a or b could be a defaultdict, so we must take care that the traversal
1918  # doesn't modify the data.
1919  elif isinstance(a, abc.Mapping):
1920    for k in a:
1921      if k in b:
1922        _walk_structure_for_problems(
1923            a[k], b[k], '%s[%r]' % (aname, k), '%s[%r]' % (bname, k),
1924            problem_list)
1925      else:
1926        problem_list.append(
1927            "%s has [%r] with value %r but it's missing in %s" %
1928            (aname, k, a[k], bname))
1929    for k in b:
1930      if k not in a:
1931        problem_list.append(
1932            '%s lacks [%r] but %s has it with value %r' %
1933            (aname, k, bname, b[k]))
1934
1935  # Strings/bytes are Sequences but we'll just do those with regular !=
1936  elif (isinstance(a, abc.Sequence) and
1937        not isinstance(a, _TEXT_OR_BINARY_TYPES)):
1938    minlen = min(len(a), len(b))
1939    for i in range(minlen):
1940      _walk_structure_for_problems(
1941          a[i], b[i], '%s[%d]' % (aname, i), '%s[%d]' % (bname, i),
1942          problem_list)
1943    for i in range(minlen, len(a)):
1944      problem_list.append('%s has [%i] with value %r but %s does not' %
1945                          (aname, i, a[i], bname))
1946    for i in range(minlen, len(b)):
1947      problem_list.append('%s lacks [%i] but %s has it with value %r' %
1948                          (aname, i, bname, b[i]))
1949
1950  else:
1951    if a != b:
1952      problem_list.append('%s is %r but %s is %r' % (aname, a, bname, b))
1953
1954
1955def get_command_string(command):
1956  """Returns an escaped string that can be used as a shell command.
1957
1958  Args:
1959    command: List or string representing the command to run.
1960  Returns:
1961    A string suitable for use as a shell command.
1962  """
1963  if isinstance(command, str):
1964    return command
1965  else:
1966    if os.name == 'nt':
1967      return ' '.join(command)
1968    else:
1969      # The following is identical to Python 3's shlex.quote function.
1970      command_string = ''
1971      for word in command:
1972        # Single quote word, and replace each ' in word with '"'"'
1973        command_string += "'" + word.replace("'", "'\"'\"'") + "' "
1974      return command_string[:-1]
1975
1976
1977def get_command_stderr(command, env=None, close_fds=True):
1978  """Runs the given shell command and returns a tuple.
1979
1980  Args:
1981    command: List or string representing the command to run.
1982    env: Dictionary of environment variable settings. If None, no environment
1983        variables will be set for the child process. This is to make tests
1984        more hermetic. NOTE: this behavior is different than the standard
1985        subprocess module.
1986    close_fds: Whether or not to close all open fd's in the child after forking.
1987        On Windows, this is ignored and close_fds is always False.
1988
1989  Returns:
1990    Tuple of (exit status, text printed to stdout and stderr by the command).
1991  """
1992  if env is None: env = {}
1993  if os.name == 'nt':
1994    # Windows does not support setting close_fds to True while also redirecting
1995    # standard handles.
1996    close_fds = False
1997
1998  use_shell = isinstance(command, str)
1999  process = subprocess.Popen(
2000      command,
2001      close_fds=close_fds,
2002      env=env,
2003      shell=use_shell,
2004      stderr=subprocess.STDOUT,
2005      stdout=subprocess.PIPE)
2006  output = process.communicate()[0]
2007  exit_status = process.wait()
2008  return (exit_status, output)
2009
2010
2011def _quote_long_string(s):
2012  # type: (Union[Text, bytes, bytearray]) -> Text
2013  """Quotes a potentially multi-line string to make the start and end obvious.
2014
2015  Args:
2016    s: A string.
2017
2018  Returns:
2019    The quoted string.
2020  """
2021  if isinstance(s, (bytes, bytearray)):
2022    try:
2023      s = s.decode('utf-8')
2024    except UnicodeDecodeError:
2025      s = str(s)
2026  return ('8<-----------\n' +
2027          s + '\n' +
2028          '----------->8\n')
2029
2030
2031def print_python_version():
2032  # type: () -> None
2033  # Having this in the test output logs by default helps debugging when all
2034  # you've got is the log and no other idea of which Python was used.
2035  sys.stderr.write('Running tests under Python {0[0]}.{0[1]}.{0[2]}: '
2036                   '{1}\n'.format(
2037                       sys.version_info,
2038                       sys.executable if sys.executable else 'embedded.'))
2039
2040
2041def main(*args, **kwargs):
2042  # type: (Text, Any) -> None
2043  """Executes a set of Python unit tests.
2044
2045  Usually this function is called without arguments, so the
2046  unittest.TestProgram instance will get created with the default settings,
2047  so it will run all test methods of all TestCase classes in the ``__main__``
2048  module.
2049
2050  Args:
2051    *args: Positional arguments passed through to
2052        ``unittest.TestProgram.__init__``.
2053    **kwargs: Keyword arguments passed through to
2054        ``unittest.TestProgram.__init__``.
2055  """
2056  print_python_version()
2057  _run_in_app(run_tests, args, kwargs)
2058
2059
2060def _is_in_app_main():
2061  # type: () -> bool
2062  """Returns True iff app.run is active."""
2063  f = sys._getframe().f_back  # pylint: disable=protected-access
2064  while f:
2065    if f.f_code == app.run.__code__:
2066      return True
2067    f = f.f_back
2068  return False
2069
2070
2071def _register_sigterm_with_faulthandler():
2072  # type: () -> None
2073  """Have faulthandler dump stacks on SIGTERM.  Useful to diagnose timeouts."""
2074  if faulthandler and getattr(faulthandler, 'register', None):
2075    # faulthandler.register is not available on Windows.
2076    # faulthandler.enable() is already called by app.run.
2077    try:
2078      faulthandler.register(signal.SIGTERM, chain=True)  # pytype: disable=module-attr
2079    except Exception as e:  # pylint: disable=broad-except
2080      sys.stderr.write('faulthandler.register(SIGTERM) failed '
2081                       '%r; ignoring.\n' % e)
2082
2083
2084def _run_in_app(function, args, kwargs):
2085  # type: (Callable[..., None], Sequence[Text], Mapping[Text, Any]) -> None
2086  """Executes a set of Python unit tests, ensuring app.run.
2087
2088  This is a private function, users should call absltest.main().
2089
2090  _run_in_app calculates argv to be the command-line arguments of this program
2091  (without the flags), sets the default of FLAGS.alsologtostderr to True,
2092  then it calls function(argv, args, kwargs), making sure that `function'
2093  will get called within app.run(). _run_in_app does this by checking whether
2094  it is called by app.run(), or by calling app.run() explicitly.
2095
2096  The reason why app.run has to be ensured is to make sure that
2097  flags are parsed and stripped properly, and other initializations done by
2098  the app module are also carried out, no matter if absltest.run() is called
2099  from within or outside app.run().
2100
2101  If _run_in_app is called from within app.run(), then it will reparse
2102  sys.argv and pass the result without command-line flags into the argv
2103  argument of `function'. The reason why this parsing is needed is that
2104  __main__.main() calls absltest.main() without passing its argv. So the
2105  only way _run_in_app could get to know the argv without the flags is that
2106  it reparses sys.argv.
2107
2108  _run_in_app changes the default of FLAGS.alsologtostderr to True so that the
2109  test program's stderr will contain all the log messages unless otherwise
2110  specified on the command-line. This overrides any explicit assignment to
2111  FLAGS.alsologtostderr by the test program prior to the call to _run_in_app()
2112  (e.g. in __main__.main).
2113
2114  Please note that _run_in_app (and the function it calls) is allowed to make
2115  changes to kwargs.
2116
2117  Args:
2118    function: absltest.run_tests or a similar function. It will be called as
2119        function(argv, args, kwargs) where argv is a list containing the
2120        elements of sys.argv without the command-line flags.
2121    args: Positional arguments passed through to unittest.TestProgram.__init__.
2122    kwargs: Keyword arguments passed through to unittest.TestProgram.__init__.
2123  """
2124  if _is_in_app_main():
2125    _register_sigterm_with_faulthandler()
2126
2127    # Change the default of alsologtostderr from False to True, so the test
2128    # programs's stderr will contain all the log messages.
2129    # If --alsologtostderr=false is specified in the command-line, or user
2130    # has called FLAGS.alsologtostderr = False before, then the value is kept
2131    # False.
2132    FLAGS.set_default('alsologtostderr', True)
2133
2134    # Here we only want to get the `argv` without the flags. To avoid any
2135    # side effects of parsing flags, we temporarily stub out the `parse` method
2136    stored_parse_methods = {}
2137    noop_parse = lambda _: None
2138    for name in FLAGS:
2139      # Avoid any side effects of parsing flags.
2140      stored_parse_methods[name] = FLAGS[name].parse
2141    # This must be a separate loop since multiple flag names (short_name=) can
2142    # point to the same flag object.
2143    for name in FLAGS:
2144      FLAGS[name].parse = noop_parse
2145    try:
2146      argv = FLAGS(sys.argv)
2147    finally:
2148      for name in FLAGS:
2149        FLAGS[name].parse = stored_parse_methods[name]
2150      sys.stdout.flush()
2151
2152    function(argv, args, kwargs)
2153  else:
2154    # Send logging to stderr. Use --alsologtostderr instead of --logtostderr
2155    # in case tests are reading their own logs.
2156    FLAGS.set_default('alsologtostderr', True)
2157
2158    def main_function(argv):
2159      _register_sigterm_with_faulthandler()
2160      function(argv, args, kwargs)
2161
2162    app.run(main=main_function)
2163
2164
2165def _is_suspicious_attribute(testCaseClass, name):
2166  # type: (Type, Text) -> bool
2167  """Returns True if an attribute is a method named like a test method."""
2168  if name.startswith('Test') and len(name) > 4 and name[4].isupper():
2169    attr = getattr(testCaseClass, name)
2170    if inspect.isfunction(attr) or inspect.ismethod(attr):
2171      args = inspect.getfullargspec(attr)
2172      return (len(args.args) == 1 and args.args[0] == 'self' and
2173              args.varargs is None and args.varkw is None and
2174              not args.kwonlyargs)
2175  return False
2176
2177
2178def skipThisClass(reason):
2179  # type: (Text) -> Callable[[_T], _T]
2180  """Skip tests in the decorated TestCase, but not any of its subclasses.
2181
2182  This decorator indicates that this class should skip all its tests, but not
2183  any of its subclasses. Useful for if you want to share testMethod or setUp
2184  implementations between a number of concrete testcase classes.
2185
2186  Example usage, showing how you can share some common test methods between
2187  subclasses. In this example, only ``BaseTest`` will be marked as skipped, and
2188  not RealTest or SecondRealTest::
2189
2190      @absltest.skipThisClass("Shared functionality")
2191      class BaseTest(absltest.TestCase):
2192        def test_simple_functionality(self):
2193          self.assertEqual(self.system_under_test.method(), 1)
2194
2195      class RealTest(BaseTest):
2196        def setUp(self):
2197          super().setUp()
2198          self.system_under_test = MakeSystem(argument)
2199
2200        def test_specific_behavior(self):
2201          ...
2202
2203      class SecondRealTest(BaseTest):
2204        def setUp(self):
2205          super().setUp()
2206          self.system_under_test = MakeSystem(other_arguments)
2207
2208        def test_other_behavior(self):
2209          ...
2210
2211  Args:
2212    reason: The reason we have a skip in place. For instance: 'shared test
2213      methods' or 'shared assertion methods'.
2214
2215  Returns:
2216    Decorator function that will cause a class to be skipped.
2217  """
2218  if isinstance(reason, type):
2219    raise TypeError('Got {!r}, expected reason as string'.format(reason))
2220
2221  def _skip_class(test_case_class):
2222    if not issubclass(test_case_class, unittest.TestCase):
2223      raise TypeError(
2224          'Decorating {!r}, expected TestCase subclass'.format(test_case_class))
2225
2226    # Only shadow the setUpClass method if it is directly defined. If it is
2227    # in the parent class we invoke it via a super() call instead of holding
2228    # a reference to it.
2229    shadowed_setupclass = test_case_class.__dict__.get('setUpClass', None)
2230
2231    @classmethod
2232    def replacement_setupclass(cls, *args, **kwargs):
2233      # Skip this class if it is the one that was decorated with @skipThisClass
2234      if cls is test_case_class:
2235        raise SkipTest(reason)
2236      if shadowed_setupclass:
2237        # Pass along `cls` so the MRO chain doesn't break.
2238        # The original method is a `classmethod` descriptor, which can't
2239        # be directly called, but `__func__` has the underlying function.
2240        return shadowed_setupclass.__func__(cls, *args, **kwargs)
2241      else:
2242        # Because there's no setUpClass() defined directly on test_case_class,
2243        # we call super() ourselves to continue execution of the inheritance
2244        # chain.
2245        return super(test_case_class, cls).setUpClass(*args, **kwargs)
2246
2247    test_case_class.setUpClass = replacement_setupclass
2248    return test_case_class
2249
2250  return _skip_class
2251
2252
2253class TestLoader(unittest.TestLoader):
2254  """A test loader which supports common test features.
2255
2256  Supported features include:
2257   * Banning untested methods with test-like names: methods attached to this
2258     testCase with names starting with `Test` are ignored by the test runner,
2259     and often represent mistakenly-omitted test cases. This loader will raise
2260     a TypeError when attempting to load a TestCase with such methods.
2261   * Randomization of test case execution order (optional).
2262  """
2263
2264  _ERROR_MSG = textwrap.dedent("""Method '%s' is named like a test case but
2265  is not one. This is often a bug. If you want it to be a test method,
2266  name it with 'test' in lowercase. If not, rename the method to not begin
2267  with 'Test'.""")
2268
2269  def __init__(self, *args, **kwds):
2270    super(TestLoader, self).__init__(*args, **kwds)
2271    seed = _get_default_randomize_ordering_seed()
2272    if seed:
2273      self._randomize_ordering_seed = seed
2274      self._random = random.Random(self._randomize_ordering_seed)
2275    else:
2276      self._randomize_ordering_seed = None
2277      self._random = None
2278
2279  def getTestCaseNames(self, testCaseClass):  # pylint:disable=invalid-name
2280    """Validates and returns a (possibly randomized) list of test case names."""
2281    for name in dir(testCaseClass):
2282      if _is_suspicious_attribute(testCaseClass, name):
2283        raise TypeError(TestLoader._ERROR_MSG % name)
2284    names = super(TestLoader, self).getTestCaseNames(testCaseClass)
2285    if self._randomize_ordering_seed is not None:
2286      logging.info(
2287          'Randomizing test order with seed: %d', self._randomize_ordering_seed)
2288      logging.info(
2289          'To reproduce this order, re-run with '
2290          '--test_randomize_ordering_seed=%d', self._randomize_ordering_seed)
2291      self._random.shuffle(names)
2292    return names
2293
2294
2295def get_default_xml_output_filename():
2296  # type: () -> Optional[Text]
2297  if os.environ.get('XML_OUTPUT_FILE'):
2298    return os.environ['XML_OUTPUT_FILE']
2299  elif os.environ.get('RUNNING_UNDER_TEST_DAEMON'):
2300    return os.path.join(os.path.dirname(TEST_TMPDIR.value), 'test_detail.xml')
2301  elif os.environ.get('TEST_XMLOUTPUTDIR'):
2302    return os.path.join(
2303        os.environ['TEST_XMLOUTPUTDIR'],
2304        os.path.splitext(os.path.basename(sys.argv[0]))[0] + '.xml')
2305
2306
2307def _setup_filtering(argv):
2308  # type: (MutableSequence[Text]) -> None
2309  """Implements the bazel test filtering protocol.
2310
2311  The following environment variable is used in this method:
2312
2313    TESTBRIDGE_TEST_ONLY: string, if set, is forwarded to the unittest
2314      framework to use as a test filter. Its value is split with shlex, then:
2315      1. On Python 3.6 and before, split values are passed as positional
2316         arguments on argv.
2317      2. On Python 3.7+, split values are passed to unittest's `-k` flag. Tests
2318         are matched by glob patterns or substring. See
2319         https://docs.python.org/3/library/unittest.html#cmdoption-unittest-k
2320
2321  Args:
2322    argv: the argv to mutate in-place.
2323  """
2324  test_filter = os.environ.get('TESTBRIDGE_TEST_ONLY')
2325  if argv is None or not test_filter:
2326    return
2327
2328  filters = shlex.split(test_filter)
2329  if sys.version_info[:2] >= (3, 7):
2330    filters = ['-k=' + test_filter for test_filter in filters]
2331
2332  argv[1:1] = filters
2333
2334
2335def _setup_test_runner_fail_fast(argv):
2336  # type: (MutableSequence[Text]) -> None
2337  """Implements the bazel test fail fast protocol.
2338
2339  The following environment variable is used in this method:
2340
2341    TESTBRIDGE_TEST_RUNNER_FAIL_FAST=<1|0>
2342
2343  If set to 1, --failfast is passed to the unittest framework to return upon
2344  first failure.
2345
2346  Args:
2347    argv: the argv to mutate in-place.
2348  """
2349
2350  if argv is None:
2351    return
2352
2353  if os.environ.get('TESTBRIDGE_TEST_RUNNER_FAIL_FAST') != '1':
2354    return
2355
2356  argv[1:1] = ['--failfast']
2357
2358
2359def _setup_sharding(custom_loader=None):
2360  # type: (Optional[unittest.TestLoader]) -> unittest.TestLoader
2361  """Implements the bazel sharding protocol.
2362
2363  The following environment variables are used in this method:
2364
2365    TEST_SHARD_STATUS_FILE: string, if set, points to a file. We write a blank
2366      file to tell the test runner that this test implements the test sharding
2367      protocol.
2368
2369    TEST_TOTAL_SHARDS: int, if set, sharding is requested.
2370
2371    TEST_SHARD_INDEX: int, must be set if TEST_TOTAL_SHARDS is set. Specifies
2372      the shard index for this instance of the test process. Must satisfy:
2373      0 <= TEST_SHARD_INDEX < TEST_TOTAL_SHARDS.
2374
2375  Args:
2376    custom_loader: A TestLoader to be made sharded.
2377
2378  Returns:
2379    The test loader for shard-filtering or the standard test loader, depending
2380    on the sharding environment variables.
2381  """
2382
2383  # It may be useful to write the shard file even if the other sharding
2384  # environment variables are not set. Test runners may use this functionality
2385  # to query whether a test binary implements the test sharding protocol.
2386  if 'TEST_SHARD_STATUS_FILE' in os.environ:
2387    try:
2388      with open(os.environ['TEST_SHARD_STATUS_FILE'], 'w') as f:
2389        f.write('')
2390    except IOError:
2391      sys.stderr.write('Error opening TEST_SHARD_STATUS_FILE (%s). Exiting.'
2392                       % os.environ['TEST_SHARD_STATUS_FILE'])
2393      sys.exit(1)
2394
2395  base_loader = custom_loader or TestLoader()
2396  if 'TEST_TOTAL_SHARDS' not in os.environ:
2397    # Not using sharding, use the expected test loader.
2398    return base_loader
2399
2400  total_shards = int(os.environ['TEST_TOTAL_SHARDS'])
2401  shard_index = int(os.environ['TEST_SHARD_INDEX'])
2402
2403  if shard_index < 0 or shard_index >= total_shards:
2404    sys.stderr.write('ERROR: Bad sharding values. index=%d, total=%d\n' %
2405                     (shard_index, total_shards))
2406    sys.exit(1)
2407
2408  # Replace the original getTestCaseNames with one that returns
2409  # the test case names for this shard.
2410  delegate_get_names = base_loader.getTestCaseNames
2411
2412  bucket_iterator = itertools.cycle(range(total_shards))
2413
2414  def getShardedTestCaseNames(testCaseClass):
2415    filtered_names = []
2416    # We need to sort the list of tests in order to determine which tests this
2417    # shard is responsible for; however, it's important to preserve the order
2418    # returned by the base loader, e.g. in the case of randomized test ordering.
2419    ordered_names = delegate_get_names(testCaseClass)
2420    for testcase in sorted(ordered_names):
2421      bucket = next(bucket_iterator)
2422      if bucket == shard_index:
2423        filtered_names.append(testcase)
2424    return [x for x in ordered_names if x in filtered_names]
2425
2426  base_loader.getTestCaseNames = getShardedTestCaseNames
2427  return base_loader
2428
2429
2430# pylint: disable=line-too-long
2431def _run_and_get_tests_result(argv, args, kwargs, xml_test_runner_class):
2432  # type: (MutableSequence[Text], Sequence[Any], MutableMapping[Text, Any], Type) -> unittest.TestResult
2433  # pylint: enable=line-too-long
2434  """Same as run_tests, except it returns the result instead of exiting."""
2435
2436  # The entry from kwargs overrides argv.
2437  argv = kwargs.pop('argv', argv)
2438
2439  # Set up test filtering if requested in environment.
2440  _setup_filtering(argv)
2441  # Set up --failfast as requested in environment
2442  _setup_test_runner_fail_fast(argv)
2443
2444  # Shard the (default or custom) loader if sharding is turned on.
2445  kwargs['testLoader'] = _setup_sharding(kwargs.get('testLoader', None))
2446
2447  # XML file name is based upon (sorted by priority):
2448  # --xml_output_file flag, XML_OUTPUT_FILE variable,
2449  # TEST_XMLOUTPUTDIR variable or RUNNING_UNDER_TEST_DAEMON variable.
2450  if not FLAGS.xml_output_file:
2451    FLAGS.xml_output_file = get_default_xml_output_filename()
2452  xml_output_file = FLAGS.xml_output_file
2453
2454  xml_buffer = None
2455  if xml_output_file:
2456    xml_output_dir = os.path.dirname(xml_output_file)
2457    if xml_output_dir and not os.path.isdir(xml_output_dir):
2458      try:
2459        os.makedirs(xml_output_dir)
2460      except OSError as e:
2461        # File exists error can occur with concurrent tests
2462        if e.errno != errno.EEXIST:
2463          raise
2464    # Fail early if we can't write to the XML output file. This is so that we
2465    # don't waste people's time running tests that will just fail anyways.
2466    with _open(xml_output_file, 'w'):
2467      pass
2468
2469    # We can reuse testRunner if it supports XML output (e. g. by inheriting
2470    # from xml_reporter.TextAndXMLTestRunner). Otherwise we need to use
2471    # xml_reporter.TextAndXMLTestRunner.
2472    if (kwargs.get('testRunner') is not None
2473        and not hasattr(kwargs['testRunner'], 'set_default_xml_stream')):
2474      sys.stderr.write('WARNING: XML_OUTPUT_FILE or --xml_output_file setting '
2475                       'overrides testRunner=%r setting (possibly from --pdb)'
2476                       % (kwargs['testRunner']))
2477      # Passing a class object here allows TestProgram to initialize
2478      # instances based on its kwargs and/or parsed command-line args.
2479      kwargs['testRunner'] = xml_test_runner_class
2480    if kwargs.get('testRunner') is None:
2481      kwargs['testRunner'] = xml_test_runner_class
2482    # Use an in-memory buffer (not backed by the actual file) to store the XML
2483    # report, because some tools modify the file (e.g., create a placeholder
2484    # with partial information, in case the test process crashes).
2485    xml_buffer = io.StringIO()
2486    kwargs['testRunner'].set_default_xml_stream(xml_buffer)  # pytype: disable=attribute-error
2487
2488    # If we've used a seed to randomize test case ordering, we want to record it
2489    # as a top-level attribute in the `testsuites` section of the XML output.
2490    randomize_ordering_seed = getattr(
2491        kwargs['testLoader'], '_randomize_ordering_seed', None)
2492    setter = getattr(kwargs['testRunner'], 'set_testsuites_property', None)
2493    if randomize_ordering_seed and setter:
2494      setter('test_randomize_ordering_seed', randomize_ordering_seed)
2495  elif kwargs.get('testRunner') is None:
2496    kwargs['testRunner'] = _pretty_print_reporter.TextTestRunner
2497
2498  if FLAGS.pdb_post_mortem:
2499    runner = kwargs['testRunner']
2500    # testRunner can be a class or an instance, which must be tested for
2501    # differently.
2502    # Overriding testRunner isn't uncommon, so only enable the debugging
2503    # integration if the runner claims it does; we don't want to accidentally
2504    # clobber something on the runner.
2505    if ((isinstance(runner, type) and
2506         issubclass(runner, _pretty_print_reporter.TextTestRunner)) or
2507        isinstance(runner, _pretty_print_reporter.TextTestRunner)):
2508      runner.run_for_debugging = True
2509
2510  # Make sure tmpdir exists.
2511  if not os.path.isdir(TEST_TMPDIR.value):
2512    try:
2513      os.makedirs(TEST_TMPDIR.value)
2514    except OSError as e:
2515      # Concurrent test might have created the directory.
2516      if e.errno != errno.EEXIST:
2517        raise
2518
2519  # Let unittest.TestProgram.__init__ do its own argv parsing, e.g. for '-v',
2520  # on argv, which is sys.argv without the command-line flags.
2521  kwargs['argv'] = argv
2522
2523  try:
2524    test_program = unittest.TestProgram(*args, **kwargs)
2525    return test_program.result
2526  finally:
2527    if xml_buffer:
2528      try:
2529        with _open(xml_output_file, 'w') as f:
2530          f.write(xml_buffer.getvalue())
2531      finally:
2532        xml_buffer.close()
2533
2534
2535def run_tests(argv, args, kwargs):  # pylint: disable=line-too-long
2536  # type: (MutableSequence[Text], Sequence[Any], MutableMapping[Text, Any]) -> None
2537  # pylint: enable=line-too-long
2538  """Executes a set of Python unit tests.
2539
2540  Most users should call absltest.main() instead of run_tests.
2541
2542  Please note that run_tests should be called from app.run.
2543  Calling absltest.main() would ensure that.
2544
2545  Please note that run_tests is allowed to make changes to kwargs.
2546
2547  Args:
2548    argv: sys.argv with the command-line flags removed from the front, i.e. the
2549      argv with which :func:`app.run()<absl.app.run>` has called
2550      ``__main__.main``. It is passed to
2551      ``unittest.TestProgram.__init__(argv=)``, which does its own flag parsing.
2552      It is ignored if kwargs contains an argv entry.
2553    args: Positional arguments passed through to
2554      ``unittest.TestProgram.__init__``.
2555    kwargs: Keyword arguments passed through to
2556      ``unittest.TestProgram.__init__``.
2557  """
2558  result = _run_and_get_tests_result(
2559      argv, args, kwargs, xml_reporter.TextAndXMLTestRunner)
2560  sys.exit(not result.wasSuccessful())
2561
2562
2563def _rmtree_ignore_errors(path):
2564  # type: (Text) -> None
2565  if os.path.isfile(path):
2566    try:
2567      os.unlink(path)
2568    except OSError:
2569      pass
2570  else:
2571    shutil.rmtree(path, ignore_errors=True)
2572
2573
2574def _get_first_part(path):
2575  # type: (Text) -> Text
2576  parts = path.split(os.sep, 1)
2577  return parts[0]
2578