• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'''Test runner and result class for the regression test suite.
2
3'''
4
5import functools
6import io
7import sys
8import time
9import traceback
10import unittest
11
class RegressionTestResult(unittest.TextTestResult):
    """Text test result that can additionally record outcomes as XML.

    When the class attribute ``USE_XML`` is true at construction time, a
    ``<testsuite>`` element is created and every test started through
    :meth:`startTest` gets a ``<testcase>`` child describing its outcome.
    The accumulated tree is returned by :meth:`get_xml_element`.
    """

    # Enable XML collection.  Must be set before instances are created:
    # the XML state is only initialised in __init__.
    USE_XML = False

    def __init__(self, stream, descriptions, verbosity):
        """Create the result object.

        Any truthy *verbosity* is forwarded to the base class as 2, a falsy
        one as 0.  Output buffering is switched on by default.
        """
        super().__init__(stream=stream, descriptions=descriptions,
                         verbosity=2 if verbosity else 0)
        self.buffer = True
        if self.USE_XML:
            from xml.etree import ElementTree as ET
            from datetime import datetime, timezone
            self.__ET = ET
            self.__suite = ET.Element('testsuite')
            # datetime.utcnow() is deprecated; derive the same naive UTC
            # timestamp from an aware "now" so the text format is unchanged.
            started = datetime.now(timezone.utc).replace(tzinfo=None)
            self.__suite.set('start', started.isoformat(' '))
            self.__e = None
        self.__start_time = None

    @classmethod
    def __getId(cls, test):
        """Return an identifier string for *test*.

        Uses ``test.id()`` when available, ``str(test.id)`` when ``id`` is
        not callable, and ``str(test)`` when there is no ``id`` attribute.
        """
        try:
            test_id = test.id
        except AttributeError:
            return str(test)
        try:
            return test_id()
        except TypeError:
            return str(test_id)

    def startTest(self, test):
        """Start *test*: open its ``<testcase>`` element and start timing."""
        super().startTest(test)
        if self.USE_XML:
            self.__e = self.__ET.SubElement(self.__suite, 'testcase')
        self.__start_time = time.perf_counter()

    def _add_result(self, test, capture=False, **args):
        """Fill in the pending ``<testcase>`` element for *test*.

        Does nothing when XML is disabled or no element is pending.

        The keyword arguments ``name``, ``status`` and ``result`` become
        attributes of the element (defaulting to the test id, ``'run'`` and
        ``'completed'``).  Every other keyword with a truthy name and value
        becomes a child element: a mapping value sets attributes on the
        child, with its ``''`` key supplying the text; any other value is
        stored as the child's text.

        If *capture* is true, the buffered stdout/stderr contents are added
        as ``<system-out>`` / ``<system-err>`` children.
        """
        if not self.USE_XML:
            return
        # Consume the pending element so a test is recorded at most once.
        e = self.__e
        self.__e = None
        if e is None:
            return
        ET = self.__ET

        e.set('name', args.pop('name', self.__getId(test)))
        e.set('status', args.pop('status', 'run'))
        e.set('result', args.pop('result', 'completed'))
        # Compare against None: a timer reading of 0.0 is still valid.
        if self.__start_time is not None:
            e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}')

        if capture:
            if self._stdout_buffer is not None:
                stdout = self._stdout_buffer.getvalue().rstrip()
                ET.SubElement(e, 'system-out').text = stdout
            if self._stderr_buffer is not None:
                stderr = self._stderr_buffer.getvalue().rstrip()
                ET.SubElement(e, 'system-err').text = stderr

        for k, v in args.items():
            if not k or not v:
                continue
            e2 = ET.SubElement(e, k)
            if hasattr(v, 'items'):
                for k2, v2 in v.items():
                    if k2:
                        e2.set(k2, str(v2))
                    else:
                        e2.text = str(v2)
            else:
                e2.text = str(v)

    @classmethod
    def __makeErrorDict(cls, err_type, err_value, err_tb):
        """Convert an ``exc_info`` triple into a dict for :meth:`_add_result`.

        The result has ``'type'`` (exception name, module-qualified unless
        the module is ``builtins``), ``'message'`` (the exception formatted
        without traceback) and ``''`` (the exception formatted with
        traceback, used as the element text).
        """
        if isinstance(err_type, type):
            if err_type.__module__ == 'builtins':
                typename = err_type.__name__
            else:
                typename = f'{err_type.__module__}.{err_type.__name__}'
        else:
            typename = repr(err_type)

        msg = traceback.format_exception(err_type, err_value, None)
        tb = traceback.format_exception(err_type, err_value, err_tb)

        return {
            'type': typename,
            'message': ''.join(msg),
            '': ''.join(tb),
        }

    def addError(self, test, err):
        """Record an error, with captured output, as an ``<error>`` child."""
        self._add_result(test, True, error=self.__makeErrorDict(*err))
        super().addError(test, err)

    def addExpectedFailure(self, test, err):
        """Record an expected failure as an ``<output>`` child."""
        self._add_result(test, True, output=self.__makeErrorDict(*err))
        super().addExpectedFailure(test, err)

    def addFailure(self, test, err):
        """Record a failure, with captured output, as a ``<failure>`` child."""
        self._add_result(test, True, failure=self.__makeErrorDict(*err))
        super().addFailure(test, err)

    def addSkip(self, test, reason):
        """Record a skip; *reason* becomes the ``<skipped>`` child text."""
        self._add_result(test, skipped=reason)
        super().addSkip(test, reason)

    def addSuccess(self, test):
        """Record a success (no extra child elements)."""
        self._add_result(test)
        super().addSuccess(test)

    def addUnexpectedSuccess(self, test):
        """Record an unexpected success as an ``<outcome>`` child."""
        self._add_result(test, outcome='UNEXPECTED_SUCCESS')
        super().addUnexpectedSuccess(test)

    def get_xml_element(self):
        """Return the ``<testsuite>`` element with up-to-date totals.

        The ``tests``, ``errors`` and ``failures`` attributes are refreshed
        on every call.

        Raises:
            ValueError: if ``USE_XML`` is false.
        """
        if not self.USE_XML:
            raise ValueError("USE_XML is false")
        e = self.__suite
        e.set('tests', str(self.testsRun))
        e.set('errors', str(len(self.errors)))
        e.set('failures', str(len(self.failures)))
        return e
133
class QuietRegressionTestRunner:
    """Minimal runner that feeds tests into a RegressionTestResult.

    The result is created with no descriptions and verbosity 0, and is
    exposed as the ``result`` attribute.
    """

    def __init__(self, stream, buffer=False):
        """Create the result bound to *stream* and set its ``buffer`` flag."""
        quiet_result = RegressionTestResult(stream, None, 0)
        quiet_result.buffer = buffer
        self.result = quiet_result

    def run(self, test):
        """Call *test* with the stored result and return that result."""
        test(self.result)
        return self.result
142
def get_test_runner_class(verbosity, buffer=False):
    """Return a callable that builds a test runner from a stream.

    With a falsy *verbosity* the callable creates a
    QuietRegressionTestRunner; otherwise it creates a
    unittest.TextTestRunner using RegressionTestResult as its result class
    and the given *verbosity*.  *buffer* is forwarded in both cases.
    """
    if not verbosity:
        return functools.partial(QuietRegressionTestRunner, buffer=buffer)
    return functools.partial(
        unittest.TextTestRunner,
        resultclass=RegressionTestResult,
        buffer=buffer,
        verbosity=verbosity,
    )
150
def get_test_runner(stream, verbosity, capture_output=False):
    """Build a test runner writing to *stream*.

    The runner type is chosen by get_test_runner_class() from *verbosity*;
    *capture_output* is passed through as its ``buffer`` argument.
    """
    runner_factory = get_test_runner_class(verbosity, capture_output)
    return runner_factory(stream)
153
if __name__ == '__main__':
    # Demo / manual self-test: run a tiny suite with XML collection enabled
    # and print both the runner's text output and the resulting XML.
    # Pass -v (one or more times) to use the verbose TextTestRunner.
    import xml.etree.ElementTree as ET
    RegressionTestResult.USE_XML = True

    class TestTests(unittest.TestCase):
        def test_pass(self):
            pass

        def test_pass_slow(self):
            time.sleep(1.0)

        def test_fail(self):
            print('stdout', file=sys.stdout)
            print('stderr', file=sys.stderr)
            self.fail('failure message')

        def test_error(self):
            print('stdout', file=sys.stdout)
            print('stderr', file=sys.stderr)
            raise RuntimeError('error message')

    suite = unittest.TestSuite()
    # unittest.makeSuite() was deprecated in 3.11 and removed in 3.13;
    # the default loader provides the same behaviour.
    suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestTests))
    stream = io.StringIO()
    runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv))
    # Run against the in-memory stream so that 'Output:' below actually
    # shows what the runner wrote (it was always empty before).
    runner = runner_cls(stream)
    result = runner.run(suite)
    print('Output:', stream.getvalue())
    print('XML: ', end='')
    for s in ET.tostringlist(result.get_xml_element()):
        print(s.decode(), end='')
    print()
186