1'''Test runner and result class for the regression test suite. 2 3''' 4 5import functools 6import io 7import sys 8import time 9import traceback 10import unittest 11 12import xml.etree.ElementTree as ET 13 14from datetime import datetime 15 16class RegressionTestResult(unittest.TextTestResult): 17 separator1 = '=' * 70 + '\n' 18 separator2 = '-' * 70 + '\n' 19 20 def __init__(self, stream, descriptions, verbosity): 21 super().__init__(stream=stream, descriptions=descriptions, verbosity=0) 22 self.buffer = True 23 self.__suite = ET.Element('testsuite') 24 self.__suite.set('start', datetime.utcnow().isoformat(' ')) 25 26 self.__e = None 27 self.__start_time = None 28 self.__results = [] 29 self.__verbose = bool(verbosity) 30 31 @classmethod 32 def __getId(cls, test): 33 try: 34 test_id = test.id 35 except AttributeError: 36 return str(test) 37 try: 38 return test_id() 39 except TypeError: 40 return str(test_id) 41 return repr(test) 42 43 def startTest(self, test): 44 super().startTest(test) 45 self.__e = e = ET.SubElement(self.__suite, 'testcase') 46 self.__start_time = time.perf_counter() 47 if self.__verbose: 48 self.stream.write(f'{self.getDescription(test)} ... ') 49 self.stream.flush() 50 51 def _add_result(self, test, capture=False, **args): 52 e = self.__e 53 self.__e = None 54 if e is None: 55 return 56 e.set('name', args.pop('name', self.__getId(test))) 57 e.set('status', args.pop('status', 'run')) 58 e.set('result', args.pop('result', 'completed')) 59 if self.__start_time: 60 e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}') 61 62 if capture: 63 if self._stdout_buffer is not None: 64 stdout = self._stdout_buffer.getvalue().rstrip() 65 ET.SubElement(e, 'system-out').text = stdout 66 if self._stderr_buffer is not None: 67 stderr = self._stderr_buffer.getvalue().rstrip() 68 ET.SubElement(e, 'system-err').text = stderr 69 70 for k, v in args.items(): 71 if not k or not v: 72 continue 73 e2 = ET.SubElement(e, k) 74 if hasattr(v, 'items'): 75 for k2, v2 in v.items(): 76 if k2: 77 e2.set(k2, str(v2)) 78 else: 79 e2.text = str(v2) 80 else: 81 e2.text = str(v) 82 83 def __write(self, c, word): 84 if self.__verbose: 85 self.stream.write(f'{word}\n') 86 87 @classmethod 88 def __makeErrorDict(cls, err_type, err_value, err_tb): 89 if isinstance(err_type, type): 90 if err_type.__module__ == 'builtins': 91 typename = err_type.__name__ 92 else: 93 typename = f'{err_type.__module__}.{err_type.__name__}' 94 else: 95 typename = repr(err_type) 96 97 msg = traceback.format_exception(err_type, err_value, None) 98 tb = traceback.format_exception(err_type, err_value, err_tb) 99 100 return { 101 'type': typename, 102 'message': ''.join(msg), 103 '': ''.join(tb), 104 } 105 106 def addError(self, test, err): 107 self._add_result(test, True, error=self.__makeErrorDict(*err)) 108 super().addError(test, err) 109 self.__write('E', 'ERROR') 110 111 def addExpectedFailure(self, test, err): 112 self._add_result(test, True, output=self.__makeErrorDict(*err)) 113 super().addExpectedFailure(test, err) 114 self.__write('x', 'expected failure') 115 116 def addFailure(self, test, err): 117 self._add_result(test, True, failure=self.__makeErrorDict(*err)) 118 super().addFailure(test, err) 119 self.__write('F', 'FAIL') 120 121 def addSkip(self, test, reason): 122 self._add_result(test, skipped=reason) 123 super().addSkip(test, reason) 124 self.__write('S', f'skipped {reason!r}') 125 126 def addSuccess(self, test): 127 self._add_result(test) 128 super().addSuccess(test) 129 self.__write('.', 'ok') 130 131 def addUnexpectedSuccess(self, test): 132 self._add_result(test, outcome='UNEXPECTED_SUCCESS') 133 super().addUnexpectedSuccess(test) 134 self.__write('u', 'unexpected success') 135 136 def printErrors(self): 137 if self.__verbose: 138 self.stream.write('\n') 139 self.printErrorList('ERROR', self.errors) 140 self.printErrorList('FAIL', self.failures) 141 142 def printErrorList(self, flavor, errors): 143 for test, err in errors: 144 self.stream.write(self.separator1) 145 self.stream.write(f'{flavor}: {self.getDescription(test)}\n') 146 self.stream.write(self.separator2) 147 self.stream.write('%s\n' % err) 148 149 def get_xml_element(self): 150 e = self.__suite 151 e.set('tests', str(self.testsRun)) 152 e.set('errors', str(len(self.errors))) 153 e.set('failures', str(len(self.failures))) 154 return e 155 156class QuietRegressionTestRunner: 157 def __init__(self, stream, buffer=False): 158 self.result = RegressionTestResult(stream, None, 0) 159 self.result.buffer = buffer 160 161 def run(self, test): 162 test(self.result) 163 return self.result 164 165def get_test_runner_class(verbosity, buffer=False): 166 if verbosity: 167 return functools.partial(unittest.TextTestRunner, 168 resultclass=RegressionTestResult, 169 buffer=buffer, 170 verbosity=verbosity) 171 return functools.partial(QuietRegressionTestRunner, buffer=buffer) 172 173def get_test_runner(stream, verbosity, capture_output=False): 174 return get_test_runner_class(verbosity, capture_output)(stream) 175 176if __name__ == '__main__': 177 class TestTests(unittest.TestCase): 178 def test_pass(self): 179 pass 180 181 def test_pass_slow(self): 182 time.sleep(1.0) 183 184 def test_fail(self): 185 print('stdout', file=sys.stdout) 186 print('stderr', file=sys.stderr) 187 self.fail('failure message') 188 189 def test_error(self): 190 print('stdout', file=sys.stdout) 191 print('stderr', file=sys.stderr) 192 raise RuntimeError('error message') 193 194 suite = unittest.TestSuite() 195 suite.addTest(unittest.makeSuite(TestTests)) 196 stream = io.StringIO() 197 runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv)) 198 runner = runner_cls(sys.stdout) 199 result = runner.run(suite) 200 print('Output:', stream.getvalue()) 201 print('XML: ', end='') 202 for s in ET.tostringlist(result.get_xml_element()): 203 print(s.decode(), end='') 204 print() 205