• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'''Test runner and result class for the regression test suite.
2
3'''
4
5import functools
6import io
7import sys
8import time
9import traceback
10import unittest
11
12import xml.etree.ElementTree as ET
13
14from datetime import datetime
15
class RegressionTestResult(unittest.TextTestResult):
    '''Test result that mirrors every outcome into an XML element tree.

    In addition to the bookkeeping done by ``unittest.TextTestResult``,
    each started test gets a ``testcase`` child under a single
    ``testsuite`` root element.  The finished tree is available from
    ``get_xml_element()``.

    The base class is always constructed with ``verbosity=0``; the
    per-test progress text is written by this class instead, and only
    when the *verbosity* passed to the constructor is truthy.
    '''

    separator1 = '=' * 70 + '\n'
    separator2 = '-' * 70 + '\n'

    def __init__(self, stream, descriptions, verbosity):
        '''Create the result object and the root ``testsuite`` element.

        stream: writable text stream used for progress and error output.
        descriptions: forwarded unchanged to the base class.
        verbosity: only its truthiness is used; it enables progress text.
        '''
        # Imported locally so the block does not depend on an extra
        # module-level import.  datetime.utcnow() is deprecated since
        # Python 3.12; the expression below yields the same naive UTC value.
        from datetime import timezone

        super().__init__(stream=stream, descriptions=descriptions, verbosity=0)
        self.buffer = True
        self.__suite = ET.Element('testsuite')
        started = datetime.now(timezone.utc).replace(tzinfo=None)
        self.__suite.set('start', started.isoformat(' '))

        self.__e = None            # 'testcase' element of the running test
        self.__start_time = None   # perf_counter() value taken in startTest
        self.__results = []
        self.__verbose = bool(verbosity)

    @classmethod
    def __getId(cls, test):
        '''Return a string identifying *test*.

        Uses ``test.id()`` when available; falls back to ``str(test.id)``
        if calling it raises TypeError, and to ``str(test)`` if the object
        has no ``id`` attribute at all.
        '''
        try:
            test_id = test.id
        except AttributeError:
            return str(test)
        try:
            return test_id()
        except TypeError:
            return str(test_id)

    def startTest(self, test):
        '''Open a new ``testcase`` element and start timing *test*.'''
        super().startTest(test)
        self.__e = ET.SubElement(self.__suite, 'testcase')
        self.__start_time = time.perf_counter()
        if self.__verbose:
            self.stream.write(f'{self.getDescription(test)} ... ')
            self.stream.flush()

    def _add_result(self, test, capture=False, **args):
        '''Fill in the pending ``testcase`` element for *test*.

        capture: when true, the contents of the stdout/stderr buffers
            (if they exist) are stored as ``system-out`` / ``system-err``
            children.
        **args: ``name``, ``status`` and ``result`` override the
            corresponding attributes.  Every other non-empty keyword
            becomes a child element: a mapping value sets attributes (its
            ``''`` key, if any, becomes the element text), any other value
            becomes the element text.

        Does nothing when no element is pending, so each started test is
        recorded at most once.
        '''
        e = self.__e
        self.__e = None
        if e is None:
            return
        e.set('name', args.pop('name', self.__getId(test)))
        e.set('status', args.pop('status', 'run'))
        e.set('result', args.pop('result', 'completed'))
        # Compare against None explicitly: a start time of 0.0 is valid.
        if self.__start_time is not None:
            e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}')

        if capture:
            if self._stdout_buffer is not None:
                stdout = self._stdout_buffer.getvalue().rstrip()
                ET.SubElement(e, 'system-out').text = stdout
            if self._stderr_buffer is not None:
                stderr = self._stderr_buffer.getvalue().rstrip()
                ET.SubElement(e, 'system-err').text = stderr

        for k, v in args.items():
            if not k or not v:
                continue
            e2 = ET.SubElement(e, k)
            if hasattr(v, 'items'):
                for k2, v2 in v.items():
                    if k2:
                        e2.set(k2, str(v2))
                    else:
                        e2.text = str(v2)
            else:
                e2.text = str(v)

    def __write(self, c, word):
        '''Write *word* and a newline to the stream when verbose.

        *c* (the one-character status code) is accepted but not used.
        '''
        if self.__verbose:
            self.stream.write(f'{word}\n')

    @classmethod
    def __makeErrorDict(cls, err_type, err_value, err_tb):
        '''Convert an exc_info triple into a dict for ``_add_result``.

        Returns a mapping with ``type`` (qualified with the module name
        unless the exception is a builtin), ``message`` (the exception
        formatted without a traceback) and ``''`` (the exception formatted
        with its traceback, which ends up as the element text).
        '''
        if isinstance(err_type, type):
            if err_type.__module__ == 'builtins':
                typename = err_type.__name__
            else:
                typename = f'{err_type.__module__}.{err_type.__name__}'
        else:
            typename = repr(err_type)

        msg = traceback.format_exception(err_type, err_value, None)
        tb = traceback.format_exception(err_type, err_value, err_tb)

        return {
            'type': typename,
            'message': ''.join(msg),
            '': ''.join(tb),
        }

    def addError(self, test, err):
        '''Record an error, with captured output, as an ``error`` child.'''
        self._add_result(test, True, error=self.__makeErrorDict(*err))
        super().addError(test, err)
        self.__write('E', 'ERROR')

    def addExpectedFailure(self, test, err):
        '''Record an expected failure as an ``output`` child.'''
        self._add_result(test, True, output=self.__makeErrorDict(*err))
        super().addExpectedFailure(test, err)
        self.__write('x', 'expected failure')

    def addFailure(self, test, err):
        '''Record a failure, with captured output, as a ``failure`` child.'''
        self._add_result(test, True, failure=self.__makeErrorDict(*err))
        super().addFailure(test, err)
        self.__write('F', 'FAIL')

    def addSkip(self, test, reason):
        '''Record a skipped test; *reason* becomes the ``skipped`` text.'''
        self._add_result(test, skipped=reason)
        super().addSkip(test, reason)
        self.__write('S', f'skipped {reason!r}')

    def addSuccess(self, test):
        '''Record a passing test (a ``testcase`` with no outcome child).'''
        self._add_result(test)
        super().addSuccess(test)
        self.__write('.', 'ok')

    def addUnexpectedSuccess(self, test):
        '''Record an unexpected success as an ``outcome`` child.'''
        self._add_result(test, outcome='UNEXPECTED_SUCCESS')
        super().addUnexpectedSuccess(test)
        self.__write('u', 'unexpected success')

    def printErrors(self):
        '''Write the collected errors, then failures, to the stream.'''
        if self.__verbose:
            self.stream.write('\n')
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    def printErrorList(self, flavor, errors):
        '''Write one separator-framed section per ``(test, err)`` pair.'''
        for test, err in errors:
            self.stream.write(self.separator1)
            self.stream.write(f'{flavor}: {self.getDescription(test)}\n')
            self.stream.write(self.separator2)
            self.stream.write('%s\n' % err)

    def get_xml_element(self):
        '''Return the ``testsuite`` element with up-to-date counters.

        The ``tests``, ``errors`` and ``failures`` attributes are refreshed
        from the result's current totals on every call.
        '''
        e = self.__suite
        e.set('tests', str(self.testsRun))
        e.set('errors', str(len(self.errors)))
        e.set('failures', str(len(self.failures)))
        return e
155
class QuietRegressionTestRunner:
    '''Bare-bones runner that only collects results.

    It owns a single RegressionTestResult built with verbosity 0 and
    hands it to whatever test is run; the runner itself writes nothing.
    '''

    def __init__(self, stream, buffer=False):
        '''Build the result object on *stream* and apply *buffer* to it.'''
        quiet_result = RegressionTestResult(stream, None, 0)
        quiet_result.buffer = buffer
        self.result = quiet_result

    def run(self, test):
        '''Invoke *test* with the stored result and return that result.'''
        test(self.result)
        return self.result
164
def get_test_runner_class(verbosity, buffer=False):
    '''Return a factory that builds a runner from a stream.

    A falsy *verbosity* selects QuietRegressionTestRunner; otherwise the
    factory creates a unittest.TextTestRunner that reports through
    RegressionTestResult.  *buffer* is forwarded to either runner.
    '''
    if not verbosity:
        return functools.partial(QuietRegressionTestRunner, buffer=buffer)

    text_runner_options = {
        'resultclass': RegressionTestResult,
        'buffer': buffer,
        'verbosity': verbosity,
    }
    return functools.partial(unittest.TextTestRunner, **text_runner_options)
172
def get_test_runner(stream, verbosity, capture_output=False):
    '''Create a ready-to-use runner that writes to *stream*.

    *capture_output* is passed on as the runner's ``buffer`` setting.
    '''
    make_runner = get_test_runner_class(verbosity, buffer=capture_output)
    return make_runner(stream)
175
if __name__ == '__main__':
    # Manual smoke test: run a tiny suite covering a pass, a slow pass, a
    # failure and an error, then show the runner's text output followed by
    # the generated XML.  Pass -v on the command line for verbose output.
    class TestTests(unittest.TestCase):
        def test_pass(self):
            pass

        def test_pass_slow(self):
            time.sleep(1.0)

        def test_fail(self):
            print('stdout', file=sys.stdout)
            print('stderr', file=sys.stderr)
            self.fail('failure message')

        def test_error(self):
            print('stdout', file=sys.stdout)
            print('stderr', file=sys.stderr)
            raise RuntimeError('error message')

    suite = unittest.TestSuite()
    # unittest.makeSuite() was deprecated in 3.11 and removed in 3.13; the
    # loader method is the supported equivalent.
    suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestTests))
    stream = io.StringIO()
    runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv))
    # Give the runner the in-memory stream so that the 'Output:' line below
    # actually shows what the runner wrote (it was always empty before).
    runner = runner_cls(stream)
    result = runner.run(suite)
    print('Output:', stream.getvalue())
    print('XML: ', end='')
    for s in ET.tostringlist(result.get_xml_element()):
        print(s.decode(), end='')
    print()
205