• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Abseil Authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""A Python test reporter that generates test reports in JUnit XML format."""
16
17import datetime
18import re
19import sys
20import threading
21import time
22import traceback
23import unittest
24from xml.sax import saxutils
25from absl.testing import _pretty_print_reporter
26
27
28# See http://www.w3.org/TR/REC-xml/#NT-Char
29_bad_control_character_codes = set(range(0, 0x20)) - {0x9, 0xA, 0xD}
30
31
32_control_character_conversions = {
33    chr(i): '\\x{:02x}'.format(i) for i in _bad_control_character_codes}
34
35
36_escape_xml_attr_conversions = {
37    '"': '"',
38    "'": ''',
39    '\n': '
',
40    '\t': '	',
41    '\r': '
',
42    ' ': ' '}
43_escape_xml_attr_conversions.update(_control_character_conversions)
44
45
46# When class or module level function fails, unittest/suite.py adds a
47# _ErrorHolder instance instead of a real TestCase, and it has a description
48# like "setUpClass (__main__.MyTestCase)".
49_CLASS_OR_MODULE_LEVEL_TEST_DESC_REGEX = re.compile(r'^(\w+) \((\S+)\)$')
50
51
52# NOTE: while saxutils.quoteattr() theoretically does the same thing; it
53# seems to often end up being too smart for it's own good not escaping properly.
54# This function is much more reliable.
55def _escape_xml_attr(content):
56  """Escapes xml attributes."""
57  # Note: saxutils doesn't escape the quotes.
58  return saxutils.escape(content, _escape_xml_attr_conversions)
59
60
61def _escape_cdata(s):
62  """Escapes a string to be used as XML CDATA.
63
64  CDATA characters are treated strictly as character data, not as XML markup,
65  but there are still certain restrictions on them.
66
67  Args:
68    s: the string to be escaped.
69  Returns:
70    An escaped version of the input string.
71  """
72  for char, escaped in _control_character_conversions.items():
73    s = s.replace(char, escaped)
74  return s.replace(']]>', ']] >')
75
76
77def _iso8601_timestamp(timestamp):
78  """Produces an ISO8601 datetime.
79
80  Args:
81    timestamp: an Epoch based timestamp in seconds.
82
83  Returns:
84    A iso8601 format timestamp if the input is a valid timestamp, None otherwise
85  """
86  if timestamp is None or timestamp < 0:
87    return None
88  return datetime.datetime.fromtimestamp(
89      timestamp, tz=datetime.timezone.utc).isoformat()
90
91
92def _print_xml_element_header(element, attributes, stream, indentation=''):
93  """Prints an XML header of an arbitrary element.
94
95  Args:
96    element: element name (testsuites, testsuite, testcase)
97    attributes: 2-tuple list with (attributes, values) already escaped
98    stream: output stream to write test report XML to
99    indentation: indentation added to the element header
100  """
101  stream.write('%s<%s' % (indentation, element))
102  for attribute in attributes:
103    if (len(attribute) == 2 and attribute[0] is not None and
104        attribute[1] is not None):
105      stream.write(' %s="%s"' % (attribute[0], attribute[1]))
106  stream.write('>\n')
107
108# Copy time.time which ensures the real time is used internally.
109# This prevents bad interactions with tests that stub out time.
110_time_copy = time.time
111
112if hasattr(traceback, '_some_str'):
113  # Use the traceback module str function to format safely.
114  _safe_str = traceback._some_str
115else:
116  _safe_str = str  # pylint: disable=invalid-name
117
118
119class _TestCaseResult(object):
120  """Private helper for _TextAndXMLTestResult that represents a test result.
121
122  Attributes:
123    test: A TestCase instance of an individual test method.
124    name: The name of the individual test method.
125    full_class_name: The full name of the test class.
126    run_time: The duration (in seconds) it took to run the test.
127    start_time: Epoch relative timestamp of when test started (in seconds)
128    errors: A list of error 4-tuples. Error tuple entries are
129        1) a string identifier of either "failure" or "error"
130        2) an exception_type
131        3) an exception_message
132        4) a string version of a sys.exc_info()-style tuple of values
133           ('error', err[0], err[1], self._exc_info_to_string(err))
134           If the length of errors is 0, then the test is either passed or
135           skipped.
136    skip_reason: A string explaining why the test was skipped.
137  """
138
139  def __init__(self, test):
140    self.run_time = -1
141    self.start_time = -1
142    self.skip_reason = None
143    self.errors = []
144    self.test = test
145
146    # Parse the test id to get its test name and full class path.
147    # Unfortunately there is no better way of knowning the test and class.
148    # Worse, unittest uses _ErrorHandler instances to represent class / module
149    # level failures.
150    test_desc = test.id() or str(test)
151    # Check if it's something like "setUpClass (__main__.TestCase)".
152    match = _CLASS_OR_MODULE_LEVEL_TEST_DESC_REGEX.match(test_desc)
153    if match:
154      name = match.group(1)
155      full_class_name = match.group(2)
156    else:
157      class_name = unittest.util.strclass(test.__class__)
158      if isinstance(test, unittest.case._SubTest):
159        # If the test case is a _SubTest, the real TestCase instance is
160        # available as _SubTest.test_case.
161        class_name = unittest.util.strclass(test.test_case.__class__)
162      if test_desc.startswith(class_name + '.'):
163        # In a typical unittest.TestCase scenario, test.id() returns with
164        # a class name formatted using unittest.util.strclass.
165        name = test_desc[len(class_name)+1:]
166        full_class_name = class_name
167      else:
168        # Otherwise make a best effort to guess the test name and full class
169        # path.
170        parts = test_desc.rsplit('.', 1)
171        name = parts[-1]
172        full_class_name = parts[0] if len(parts) == 2 else ''
173    self.name = _escape_xml_attr(name)
174    self.full_class_name = _escape_xml_attr(full_class_name)
175
176  def set_run_time(self, time_in_secs):
177    self.run_time = time_in_secs
178
179  def set_start_time(self, time_in_secs):
180    self.start_time = time_in_secs
181
182  def print_xml_summary(self, stream):
183    """Prints an XML Summary of a TestCase.
184
185    Status and result are populated as per JUnit XML test result reporter.
186    A test that has been skipped will always have a skip reason,
187    as every skip method in Python's unittest requires the reason arg to be
188    passed.
189
190    Args:
191      stream: output stream to write test report XML to
192    """
193
194    if self.skip_reason is None:
195      status = 'run'
196      result = 'completed'
197    else:
198      status = 'notrun'
199      result = 'suppressed'
200
201    test_case_attributes = [
202        ('name', '%s' % self.name),
203        ('status', '%s' % status),
204        ('result', '%s' % result),
205        ('time', '%.3f' % self.run_time),
206        ('classname', self.full_class_name),
207        ('timestamp', _iso8601_timestamp(self.start_time)),
208    ]
209    _print_xml_element_header('testcase', test_case_attributes, stream, '  ')
210    self._print_testcase_details(stream)
211    stream.write('  </testcase>\n')
212
213  def _print_testcase_details(self, stream):
214    for error in self.errors:
215      outcome, exception_type, message, error_msg = error  # pylint: disable=unpacking-non-sequence
216      message = _escape_xml_attr(_safe_str(message))
217      exception_type = _escape_xml_attr(str(exception_type))
218      error_msg = _escape_cdata(error_msg)
219      stream.write('  <%s message="%s" type="%s"><![CDATA[%s]]></%s>\n'
220                   % (outcome, message, exception_type, error_msg, outcome))
221
222
223class _TestSuiteResult(object):
224  """Private helper for _TextAndXMLTestResult."""
225
226  def __init__(self):
227    self.suites = {}
228    self.failure_counts = {}
229    self.error_counts = {}
230    self.overall_start_time = -1
231    self.overall_end_time = -1
232    self._testsuites_properties = {}
233
234  def add_test_case_result(self, test_case_result):
235    suite_name = type(test_case_result.test).__name__
236    if suite_name == '_ErrorHolder':
237      # _ErrorHolder is a special case created by unittest for class / module
238      # level functions.
239      suite_name = test_case_result.full_class_name.rsplit('.')[-1]
240    if isinstance(test_case_result.test, unittest.case._SubTest):
241      # If the test case is a _SubTest, the real TestCase instance is
242      # available as _SubTest.test_case.
243      suite_name = type(test_case_result.test.test_case).__name__
244
245    self._setup_test_suite(suite_name)
246    self.suites[suite_name].append(test_case_result)
247    for error in test_case_result.errors:
248      # Only count the first failure or error so that the sum is equal to the
249      # total number of *testcases* that have failures or errors.
250      if error[0] == 'failure':
251        self.failure_counts[suite_name] += 1
252        break
253      elif error[0] == 'error':
254        self.error_counts[suite_name] += 1
255        break
256
257  def print_xml_summary(self, stream):
258    overall_test_count = sum(len(x) for x in self.suites.values())
259    overall_failures = sum(self.failure_counts.values())
260    overall_errors = sum(self.error_counts.values())
261    overall_attributes = [
262        ('name', ''),
263        ('tests', '%d' % overall_test_count),
264        ('failures', '%d' % overall_failures),
265        ('errors', '%d' % overall_errors),
266        ('time', '%.3f' % (self.overall_end_time - self.overall_start_time)),
267        ('timestamp', _iso8601_timestamp(self.overall_start_time)),
268    ]
269    _print_xml_element_header('testsuites', overall_attributes, stream)
270    if self._testsuites_properties:
271      stream.write('    <properties>\n')
272      for name, value in sorted(self._testsuites_properties.items()):
273        stream.write('      <property name="%s" value="%s"></property>\n' %
274                     (_escape_xml_attr(name), _escape_xml_attr(str(value))))
275      stream.write('    </properties>\n')
276
277    for suite_name in self.suites:
278      suite = self.suites[suite_name]
279      suite_end_time = max(x.start_time + x.run_time for x in suite)
280      suite_start_time = min(x.start_time for x in suite)
281      failures = self.failure_counts[suite_name]
282      errors = self.error_counts[suite_name]
283      suite_attributes = [
284          ('name', '%s' % suite_name),
285          ('tests', '%d' % len(suite)),
286          ('failures', '%d' % failures),
287          ('errors', '%d' % errors),
288          ('time', '%.3f' % (suite_end_time - suite_start_time)),
289          ('timestamp', _iso8601_timestamp(suite_start_time)),
290      ]
291      _print_xml_element_header('testsuite', suite_attributes, stream)
292
293      for test_case_result in suite:
294        test_case_result.print_xml_summary(stream)
295      stream.write('</testsuite>\n')
296    stream.write('</testsuites>\n')
297
298  def _setup_test_suite(self, suite_name):
299    """Adds a test suite to the set of suites tracked by this test run.
300
301    Args:
302      suite_name: string, The name of the test suite being initialized.
303    """
304    if suite_name in self.suites:
305      return
306    self.suites[suite_name] = []
307    self.failure_counts[suite_name] = 0
308    self.error_counts[suite_name] = 0
309
310  def set_end_time(self, timestamp_in_secs):
311    """Sets the start timestamp of this test suite.
312
313    Args:
314      timestamp_in_secs: timestamp in seconds since epoch
315    """
316    self.overall_end_time = timestamp_in_secs
317
318  def set_start_time(self, timestamp_in_secs):
319    """Sets the end timestamp of this test suite.
320
321    Args:
322      timestamp_in_secs: timestamp in seconds since epoch
323    """
324    self.overall_start_time = timestamp_in_secs
325
326
327class _TextAndXMLTestResult(_pretty_print_reporter.TextTestResult):
328  """Private TestResult class that produces both formatted text results and XML.
329
330  Used by TextAndXMLTestRunner.
331  """
332
333  _TEST_SUITE_RESULT_CLASS = _TestSuiteResult
334  _TEST_CASE_RESULT_CLASS = _TestCaseResult
335
336  def __init__(self, xml_stream, stream, descriptions, verbosity,
337               time_getter=_time_copy, testsuites_properties=None):
338    super(_TextAndXMLTestResult, self).__init__(stream, descriptions, verbosity)
339    self.xml_stream = xml_stream
340    self.pending_test_case_results = {}
341    self.suite = self._TEST_SUITE_RESULT_CLASS()
342    if testsuites_properties:
343      self.suite._testsuites_properties = testsuites_properties
344    self.time_getter = time_getter
345
346    # This lock guards any mutations on pending_test_case_results.
347    self._pending_test_case_results_lock = threading.RLock()
348
349  def startTest(self, test):
350    self.start_time = self.time_getter()
351    super(_TextAndXMLTestResult, self).startTest(test)
352
353  def stopTest(self, test):
354    # Grabbing the write lock to avoid conflicting with stopTestRun.
355    with self._pending_test_case_results_lock:
356      super(_TextAndXMLTestResult, self).stopTest(test)
357      result = self.get_pending_test_case_result(test)
358      if not result:
359        test_name = test.id() or str(test)
360        sys.stderr.write('No pending test case: %s\n' % test_name)
361        return
362      test_id = id(test)
363      run_time = self.time_getter() - self.start_time
364      result.set_run_time(run_time)
365      result.set_start_time(self.start_time)
366      self.suite.add_test_case_result(result)
367      del self.pending_test_case_results[test_id]
368
369  def startTestRun(self):
370    self.suite.set_start_time(self.time_getter())
371    super(_TextAndXMLTestResult, self).startTestRun()
372
373  def stopTestRun(self):
374    self.suite.set_end_time(self.time_getter())
375    # All pending_test_case_results will be added to the suite and removed from
376    # the pending_test_case_results dictionary. Grabbing the write lock to avoid
377    # results from being added during this process to avoid duplicating adds or
378    # accidentally erasing newly appended pending results.
379    with self._pending_test_case_results_lock:
380      # Errors in the test fixture (setUpModule, tearDownModule,
381      # setUpClass, tearDownClass) can leave a pending result which
382      # never gets added to the suite.  The runner calls stopTestRun
383      # which gives us an opportunity to add these errors for
384      # reporting here.
385      for test_id in self.pending_test_case_results:
386        result = self.pending_test_case_results[test_id]
387        if hasattr(self, 'start_time'):
388          run_time = self.suite.overall_end_time - self.start_time
389          result.set_run_time(run_time)
390          result.set_start_time(self.start_time)
391        self.suite.add_test_case_result(result)
392      self.pending_test_case_results.clear()
393
394  def _exc_info_to_string(self, err, test=None):
395    """Converts a sys.exc_info()-style tuple of values into a string.
396
397    This method must be overridden because the method signature in
398    unittest.TestResult changed between Python 2.2 and 2.4.
399
400    Args:
401      err: A sys.exc_info() tuple of values for an error.
402      test: The test method.
403
404    Returns:
405      A formatted exception string.
406    """
407    if test:
408      return super(_TextAndXMLTestResult, self)._exc_info_to_string(err, test)
409    return ''.join(traceback.format_exception(*err))
410
411  def add_pending_test_case_result(self, test, error_summary=None,
412                                   skip_reason=None):
413    """Adds result information to a test case result which may still be running.
414
415    If a result entry for the test already exists, add_pending_test_case_result
416    will add error summary tuples and/or overwrite skip_reason for the result.
417    If it does not yet exist, a result entry will be created.
418    Note that a test result is considered to have been run and passed
419    only if there are no errors or skip_reason.
420
421    Args:
422      test: A test method as defined by unittest
423      error_summary: A 4-tuple with the following entries:
424          1) a string identifier of either "failure" or "error"
425          2) an exception_type
426          3) an exception_message
427          4) a string version of a sys.exc_info()-style tuple of values
428             ('error', err[0], err[1], self._exc_info_to_string(err))
429             If the length of errors is 0, then the test is either passed or
430             skipped.
431      skip_reason: a string explaining why the test was skipped
432    """
433    with self._pending_test_case_results_lock:
434      test_id = id(test)
435      if test_id not in self.pending_test_case_results:
436        self.pending_test_case_results[test_id] = self._TEST_CASE_RESULT_CLASS(
437            test)
438      if error_summary:
439        self.pending_test_case_results[test_id].errors.append(error_summary)
440      if skip_reason:
441        self.pending_test_case_results[test_id].skip_reason = skip_reason
442
443  def delete_pending_test_case_result(self, test):
444    with self._pending_test_case_results_lock:
445      test_id = id(test)
446      del self.pending_test_case_results[test_id]
447
448  def get_pending_test_case_result(self, test):
449    test_id = id(test)
450    return self.pending_test_case_results.get(test_id, None)
451
452  def addSuccess(self, test):
453    super(_TextAndXMLTestResult, self).addSuccess(test)
454    self.add_pending_test_case_result(test)
455
456  def addError(self, test, err):
457    super(_TextAndXMLTestResult, self).addError(test, err)
458    error_summary = ('error', err[0], err[1],
459                     self._exc_info_to_string(err, test=test))
460    self.add_pending_test_case_result(test, error_summary=error_summary)
461
462  def addFailure(self, test, err):
463    super(_TextAndXMLTestResult, self).addFailure(test, err)
464    error_summary = ('failure', err[0], err[1],
465                     self._exc_info_to_string(err, test=test))
466    self.add_pending_test_case_result(test, error_summary=error_summary)
467
468  def addSkip(self, test, reason):
469    super(_TextAndXMLTestResult, self).addSkip(test, reason)
470    self.add_pending_test_case_result(test, skip_reason=reason)
471
472  def addExpectedFailure(self, test, err):
473    super(_TextAndXMLTestResult, self).addExpectedFailure(test, err)
474    if callable(getattr(test, 'recordProperty', None)):
475      test.recordProperty('EXPECTED_FAILURE',
476                          self._exc_info_to_string(err, test=test))
477    self.add_pending_test_case_result(test)
478
479  def addUnexpectedSuccess(self, test):
480    super(_TextAndXMLTestResult, self).addUnexpectedSuccess(test)
481    test_name = test.id() or str(test)
482    error_summary = ('error', '', '',
483                     'Test case %s should have failed, but passed.'
484                     % (test_name))
485    self.add_pending_test_case_result(test, error_summary=error_summary)
486
487  def addSubTest(self, test, subtest, err):  # pylint: disable=invalid-name
488    super(_TextAndXMLTestResult, self).addSubTest(test, subtest, err)
489    if err is not None:
490      if issubclass(err[0], test.failureException):
491        error_summary = ('failure', err[0], err[1],
492                         self._exc_info_to_string(err, test=test))
493      else:
494        error_summary = ('error', err[0], err[1],
495                         self._exc_info_to_string(err, test=test))
496    else:
497      error_summary = None
498    self.add_pending_test_case_result(subtest, error_summary=error_summary)
499
500  def printErrors(self):
501    super(_TextAndXMLTestResult, self).printErrors()
502    self.xml_stream.write('<?xml version="1.0"?>\n')
503    self.suite.print_xml_summary(self.xml_stream)
504
505
506class TextAndXMLTestRunner(unittest.TextTestRunner):
507  """A test runner that produces both formatted text results and XML.
508
509  It prints out the names of tests as they are run, errors as they
510  occur, and a summary of the results at the end of the test run.
511  """
512
513  _TEST_RESULT_CLASS = _TextAndXMLTestResult
514
515  _xml_stream = None
516  _testsuites_properties = {}
517
518  def __init__(self, xml_stream=None, *args, **kwargs):
519    """Initialize a TextAndXMLTestRunner.
520
521    Args:
522      xml_stream: file-like or None; XML-formatted test results are output
523          via this object's write() method.  If None (the default), the
524          new instance behaves as described in the set_default_xml_stream method
525          documentation below.
526      *args: passed unmodified to unittest.TextTestRunner.__init__.
527      **kwargs: passed unmodified to unittest.TextTestRunner.__init__.
528    """
529    super(TextAndXMLTestRunner, self).__init__(*args, **kwargs)
530    if xml_stream is not None:
531      self._xml_stream = xml_stream
532    # else, do not set self._xml_stream to None -- this allows implicit fallback
533    # to the class attribute's value.
534
535  @classmethod
536  def set_default_xml_stream(cls, xml_stream):
537    """Sets the default XML stream for the class.
538
539    Args:
540      xml_stream: file-like or None; used for instances when xml_stream is None
541          or not passed to their constructors.  If None is passed, instances
542          created with xml_stream=None will act as ordinary TextTestRunner
543          instances; this is the default state before any calls to this method
544          have been made.
545    """
546    cls._xml_stream = xml_stream
547
548  def _makeResult(self):
549    if self._xml_stream is None:
550      return super(TextAndXMLTestRunner, self)._makeResult()
551    else:
552      return self._TEST_RESULT_CLASS(
553          self._xml_stream, self.stream, self.descriptions, self.verbosity,
554          testsuites_properties=self._testsuites_properties)
555
556  @classmethod
557  def set_testsuites_property(cls, key, value):
558    cls._testsuites_properties[key] = value
559