• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test result object"""
2
3import io
4import sys
5import traceback
6
7from . import util
8from functools import wraps
9
10__unittest = True
11
12def failfast(method):
13    @wraps(method)
14    def inner(self, *args, **kw):
15        if getattr(self, 'failfast', False):
16            self.stop()
17        return method(self, *args, **kw)
18    return inner
19
20STDOUT_LINE = '\nStdout:\n%s'
21STDERR_LINE = '\nStderr:\n%s'
22
23
24class TestResult(object):
25    """Holder for test result information.
26
27    Test results are automatically managed by the TestCase and TestSuite
28    classes, and do not need to be explicitly manipulated by writers of tests.
29
30    Each instance holds the total number of tests run, and collections of
31    failures and errors that occurred among those test runs. The collections
32    contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
33    formatted traceback of the error that occurred.
34    """
35    _previousTestClass = None
36    _testRunEntered = False
37    _moduleSetUpFailed = False
38    def __init__(self, stream=None, descriptions=None, verbosity=None):
39        self.failfast = False
40        self.failures = []
41        self.errors = []
42        self.testsRun = 0
43        self.skipped = []
44        self.expectedFailures = []
45        self.unexpectedSuccesses = []
46        self.collectedDurations = []
47        self.shouldStop = False
48        self.buffer = False
49        self.tb_locals = False
50        self._stdout_buffer = None
51        self._stderr_buffer = None
52        self._original_stdout = sys.stdout
53        self._original_stderr = sys.stderr
54        self._mirrorOutput = False
55
56    def printErrors(self):
57        "Called by TestRunner after test run"
58
59    def startTest(self, test):
60        "Called when the given test is about to be run"
61        self.testsRun += 1
62        self._mirrorOutput = False
63        self._setupStdout()
64
65    def _setupStdout(self):
66        if self.buffer:
67            if self._stderr_buffer is None:
68                self._stderr_buffer = io.StringIO()
69                self._stdout_buffer = io.StringIO()
70            sys.stdout = self._stdout_buffer
71            sys.stderr = self._stderr_buffer
72
73    def startTestRun(self):
74        """Called once before any tests are executed.
75
76        See startTest for a method called before each test.
77        """
78
79    def stopTest(self, test):
80        """Called when the given test has been run"""
81        self._restoreStdout()
82        self._mirrorOutput = False
83
84    def _restoreStdout(self):
85        if self.buffer:
86            if self._mirrorOutput:
87                output = sys.stdout.getvalue()
88                error = sys.stderr.getvalue()
89                if output:
90                    if not output.endswith('\n'):
91                        output += '\n'
92                    self._original_stdout.write(STDOUT_LINE % output)
93                if error:
94                    if not error.endswith('\n'):
95                        error += '\n'
96                    self._original_stderr.write(STDERR_LINE % error)
97
98            sys.stdout = self._original_stdout
99            sys.stderr = self._original_stderr
100            self._stdout_buffer.seek(0)
101            self._stdout_buffer.truncate()
102            self._stderr_buffer.seek(0)
103            self._stderr_buffer.truncate()
104
105    def stopTestRun(self):
106        """Called once after all tests are executed.
107
108        See stopTest for a method called after each test.
109        """
110
111    @failfast
112    def addError(self, test, err):
113        """Called when an error has occurred. 'err' is a tuple of values as
114        returned by sys.exc_info().
115        """
116        self.errors.append((test, self._exc_info_to_string(err, test)))
117        self._mirrorOutput = True
118
119    @failfast
120    def addFailure(self, test, err):
121        """Called when an error has occurred. 'err' is a tuple of values as
122        returned by sys.exc_info()."""
123        self.failures.append((test, self._exc_info_to_string(err, test)))
124        self._mirrorOutput = True
125
126    def addSubTest(self, test, subtest, err):
127        """Called at the end of a subtest.
128        'err' is None if the subtest ended successfully, otherwise it's a
129        tuple of values as returned by sys.exc_info().
130        """
131        # By default, we don't do anything with successful subtests, but
132        # more sophisticated test results might want to record them.
133        if err is not None:
134            if getattr(self, 'failfast', False):
135                self.stop()
136            if issubclass(err[0], test.failureException):
137                errors = self.failures
138            else:
139                errors = self.errors
140            errors.append((subtest, self._exc_info_to_string(err, test)))
141            self._mirrorOutput = True
142
143    def addSuccess(self, test):
144        "Called when a test has completed successfully"
145        pass
146
147    def addSkip(self, test, reason):
148        """Called when a test is skipped."""
149        self.skipped.append((test, reason))
150
151    def addExpectedFailure(self, test, err):
152        """Called when an expected failure/error occurred."""
153        self.expectedFailures.append(
154            (test, self._exc_info_to_string(err, test)))
155
156    @failfast
157    def addUnexpectedSuccess(self, test):
158        """Called when a test was expected to fail, but succeed."""
159        self.unexpectedSuccesses.append(test)
160
161    def addDuration(self, test, elapsed):
162        """Called when a test finished to run, regardless of its outcome.
163        *test* is the test case corresponding to the test method.
164        *elapsed* is the time represented in seconds, and it includes the
165        execution of cleanup functions.
166        """
167        # support for a TextTestRunner using an old TestResult class
168        if hasattr(self, "collectedDurations"):
169            # Pass test repr and not the test object itself to avoid resources leak
170            self.collectedDurations.append((str(test), elapsed))
171
172    def wasSuccessful(self):
173        """Tells whether or not this result was a success."""
174        # The hasattr check is for test_result's OldResult test.  That
175        # way this method works on objects that lack the attribute.
176        # (where would such result instances come from? old stored pickles?)
177        return ((len(self.failures) == len(self.errors) == 0) and
178                (not hasattr(self, 'unexpectedSuccesses') or
179                 len(self.unexpectedSuccesses) == 0))
180
181    def stop(self):
182        """Indicates that the tests should be aborted."""
183        self.shouldStop = True
184
185    def _exc_info_to_string(self, err, test):
186        """Converts a sys.exc_info()-style tuple of values into a string."""
187        exctype, value, tb = err
188        tb = self._clean_tracebacks(exctype, value, tb, test)
189        tb_e = traceback.TracebackException(
190            exctype, value, tb,
191            capture_locals=self.tb_locals, compact=True)
192        msgLines = list(tb_e.format())
193
194        if self.buffer:
195            output = sys.stdout.getvalue()
196            error = sys.stderr.getvalue()
197            if output:
198                if not output.endswith('\n'):
199                    output += '\n'
200                msgLines.append(STDOUT_LINE % output)
201            if error:
202                if not error.endswith('\n'):
203                    error += '\n'
204                msgLines.append(STDERR_LINE % error)
205        return ''.join(msgLines)
206
207    def _clean_tracebacks(self, exctype, value, tb, test):
208        ret = None
209        first = True
210        excs = [(exctype, value, tb)]
211        seen = {id(value)}  # Detect loops in chained exceptions.
212        while excs:
213            (exctype, value, tb) = excs.pop()
214            # Skip test runner traceback levels
215            while tb and self._is_relevant_tb_level(tb):
216                tb = tb.tb_next
217
218            # Skip assert*() traceback levels
219            if exctype is test.failureException:
220                self._remove_unittest_tb_frames(tb)
221
222            if first:
223                ret = tb
224                first = False
225            else:
226                value.__traceback__ = tb
227
228            if value is not None:
229                for c in (value.__cause__, value.__context__):
230                    if c is not None and id(c) not in seen:
231                        excs.append((type(c), c, c.__traceback__))
232                        seen.add(id(c))
233        return ret
234
235    def _is_relevant_tb_level(self, tb):
236        return '__unittest' in tb.tb_frame.f_globals
237
238    def _remove_unittest_tb_frames(self, tb):
239        '''Truncates usercode tb at the first unittest frame.
240
241        If the first frame of the traceback is in user code,
242        the prefix up to the first unittest frame is returned.
243        If the first frame is already in the unittest module,
244        the traceback is not modified.
245        '''
246        prev = None
247        while tb and not self._is_relevant_tb_level(tb):
248            prev = tb
249            tb = tb.tb_next
250        if prev is not None:
251            prev.tb_next = None
252
253    def __repr__(self):
254        return ("<%s run=%i errors=%i failures=%i>" %
255               (util.strclass(self.__class__), self.testsRun, len(self.errors),
256                len(self.failures)))
257