• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import collections
18import datetime
19import io
20import itertools
21import traceback
22import unittest
23from xml.etree import ElementTree
24
25import coverage
26
27from tests import _loader
28
29
class CaseResult(
    collections.namedtuple(
        "CaseResult",
        ["id", "name", "kind", "stdout", "stderr", "skip_reason", "traceback"],
    )
):
    """A serializable result of a single test case.

    Attributes:
      id (object): Any serializable object used to denote the identity of this
        test case.
      name (str or None): A human-readable name of the test case.
      kind (CaseResult.Kind): The kind of test result.
      stdout (object or None): Output on stdout, or None if nothing was captured.
      stderr (object or None): Output on stderr, or None if nothing was captured.
      skip_reason (object or None): The reason the test was skipped. Must be
        something if self.kind is CaseResult.Kind.SKIP.
      traceback (object or None): The traceback of the test. Must be something if
        self.kind is CaseResult.Kind.{ERROR, FAILURE, EXPECTED_FAILURE}.
    """

    class Kind(object):
        """String constants naming every state a test case can be in."""

        UNTESTED = "untested"
        RUNNING = "running"
        ERROR = "error"
        FAILURE = "failure"
        SUCCESS = "success"
        SKIP = "skip"
        EXPECTED_FAILURE = "expected failure"
        UNEXPECTED_SUCCESS = "unexpected success"

    # Every value accepted for `kind`. Tuples (not sets) so that membership
    # tests rely on equality only and never require `kind` to be hashable.
    _ALL_KINDS = (
        Kind.UNTESTED,
        Kind.RUNNING,
        Kind.ERROR,
        Kind.FAILURE,
        Kind.SUCCESS,
        Kind.SKIP,
        Kind.EXPECTED_FAILURE,
        Kind.UNEXPECTED_SUCCESS,
    )
    # Kinds for which a traceback must be supplied.
    _KINDS_REQUIRING_TRACEBACK = (
        Kind.ERROR,
        Kind.FAILURE,
        Kind.EXPECTED_FAILURE,
    )

    def __new__(
        cls,
        id=None,
        name=None,
        kind=None,
        stdout=None,
        stderr=None,
        skip_reason=None,
        traceback=None,
    ):
        """Helper keyword constructor for the namedtuple.

        See this class' attributes for information on the arguments.

        Raises:
          AssertionError: If `id` is None, `name` is neither None nor a str,
            `kind` is not one of the CaseResult.Kind values, or a field
            required by `kind` (traceback / skip_reason) is missing.
        """
        # Plain asserts are kept so callers keep seeing AssertionError.
        assert id is not None, "id is required"
        assert name is None or isinstance(name, str), "name must be a str"
        # Kinds are compared by value rather than identity so that an equal
        # string that is a different object (e.g. one rebuilt after
        # serialization) is still recognized.
        assert kind in cls._ALL_KINDS, "unknown kind: {!r}".format(kind)
        if kind in cls._KINDS_REQUIRING_TRACEBACK:
            assert traceback is not None, "{} needs a traceback".format(kind)
        elif kind == CaseResult.Kind.SKIP:
            assert skip_reason is not None, "skip needs a skip_reason"
        # super(CaseResult, cls): the class being defined comes first. The
        # reversed order only worked when cls was exactly CaseResult and
        # raised TypeError for any subclass.
        return super(CaseResult, cls).__new__(
            cls, id, name, kind, stdout, stderr, skip_reason, traceback
        )

    def updated(
        self,
        name=None,
        kind=None,
        stdout=None,
        stderr=None,
        skip_reason=None,
        traceback=None,
    ):
        """Get a new validated CaseResult with the fields updated.

        An argument left as None keeps the current value of that field, so
        this method cannot reset a field back to None. The id is always
        carried over unchanged.

        See this class' attributes for information on the arguments.

        Returns:
          CaseResult: A new, validated instance; `self` is not modified.
        """
        return CaseResult(
            id=self.id,
            name=self.name if name is None else name,
            kind=self.kind if kind is None else kind,
            stdout=self.stdout if stdout is None else stdout,
            stderr=self.stderr if stderr is None else stderr,
            skip_reason=(
                self.skip_reason if skip_reason is None else skip_reason
            ),
            traceback=self.traceback if traceback is None else traceback,
        )
126
127
class AugmentedResult(unittest.TestResult):
    """unittest.TestResult that keeps track of additional information.

    Uses CaseResult objects to store test-case results, providing additional
    information beyond that of the standard Python unittest library, such as
    standard output.

    Attributes:
      id_map (callable): A unary callable mapping unittest.TestCase objects to
        unique identifiers.
      cases (dict or None): A dictionary mapping from the identifiers returned
        by id_map to CaseResult objects corresponding to those IDs. None until
        startTestRun has been called.
    """

    def __init__(self, id_map):
        """Initialize the object with an identifier mapping.

        Arguments:
          id_map (callable): Corresponds to the attribute `id_map`."""
        super(AugmentedResult, self).__init__()
        self.id_map = id_map
        self.cases = None

    def _update_case(self, test, **fields):
        """Replace the stored CaseResult of `test` with an updated copy.

        Args:
          test (unittest.TestCase): The test whose record is replaced.
          **fields: Keyword arguments forwarded to CaseResult.updated.

        Raises:
          KeyError: If no record exists for `test`, i.e. startTest was not
            called for it during the current run.
        """
        case_id = self.id_map(test)
        self.cases[case_id] = self.cases[case_id].updated(**fields)

    def startTestRun(self):
        """See unittest.TestResult.startTestRun.

        Additionally discards any previously recorded cases."""
        super(AugmentedResult, self).startTestRun()
        self.cases = dict()

    def startTest(self, test):
        """See unittest.TestResult.startTest.

        Additionally records a fresh RUNNING CaseResult for the test."""
        super(AugmentedResult, self).startTest(test)
        case_id = self.id_map(test)
        self.cases[case_id] = CaseResult(
            id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING
        )

    def addError(self, test, err):
        """See unittest.TestResult.addError."""
        super(AugmentedResult, self).addError(test, err)
        self._update_case(test, kind=CaseResult.Kind.ERROR, traceback=err)

    def addFailure(self, test, err):
        """See unittest.TestResult.addFailure."""
        super(AugmentedResult, self).addFailure(test, err)
        self._update_case(test, kind=CaseResult.Kind.FAILURE, traceback=err)

    def addSuccess(self, test):
        """See unittest.TestResult.addSuccess."""
        super(AugmentedResult, self).addSuccess(test)
        self._update_case(test, kind=CaseResult.Kind.SUCCESS)

    def addSkip(self, test, reason):
        """See unittest.TestResult.addSkip."""
        super(AugmentedResult, self).addSkip(test, reason)
        self._update_case(test, kind=CaseResult.Kind.SKIP, skip_reason=reason)

    def addExpectedFailure(self, test, err):
        """See unittest.TestResult.addExpectedFailure."""
        super(AugmentedResult, self).addExpectedFailure(test, err)
        self._update_case(
            test, kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=err
        )

    def addUnexpectedSuccess(self, test):
        """See unittest.TestResult.addUnexpectedSuccess."""
        super(AugmentedResult, self).addUnexpectedSuccess(test)
        self._update_case(test, kind=CaseResult.Kind.UNEXPECTED_SUCCESS)

    def set_output(self, test, stdout, stderr):
        """Set the output attributes for the CaseResult corresponding to a test.

        Both streams are decoded with their `.decode()` method (default
        codec) before being stored, so they must be bytes-like rather than
        str.

        Args:
          test (unittest.TestCase): The TestCase to set the outputs of.
          stdout (bytes): Raw stdout capture to assign to self.id_map(test).
          stderr (bytes): Raw stderr capture to assign to self.id_map(test).
        """
        self._update_case(
            test, stdout=stdout.decode(), stderr=stderr.decode()
        )

    def augmented_results(self, filter):
        """Convenience method to retrieve filtered case results.

        Args:
          filter (callable): A unary predicate to filter over CaseResult
            objects. (The name shadows the builtin; it is kept because it is
            part of the public signature.)

        Returns:
          generator: Lazily yields the stored CaseResult objects for which
            `filter` returns a true value.
        """
        return (case for case in self.cases.values() if filter(case))
236
237
class CoverageResult(AugmentedResult):
    """AugmentedResult that additionally records coverage.py data per test.

    Attributes:
      coverage_context (coverage.Coverage or None): coverage.py management
        object of the test currently running; None whenever no test is
        between startTest and stopTest.
    """

    def __init__(self, id_map):
        """See AugmentedResult.__init__."""
        super(CoverageResult, self).__init__(id_map=id_map)
        # No test is running yet, so there is nothing to measure.
        self.coverage_context = None

    def startTest(self, test):
        """See unittest.TestResult.startTest.

        Creates a new coverage.Coverage object for this test and starts it."""
        super(CoverageResult, self).startTest(test)
        tracker = coverage.Coverage(data_suffix=True)
        # Publish the tracker on the instance before starting it.
        self.coverage_context = tracker
        tracker.start()

    def stopTest(self, test):
        """See unittest.TestResult.stopTest.

        Stops the coverage object of this test, saves its data and drops it."""
        super(CoverageResult, self).stopTest(test)
        tracker = self.coverage_context
        tracker.stop()
        tracker.save()
        self.coverage_context = None
266
267
class _Colors(object):
    """Namespaced constants for terminal color magic numbers.

    Each value is a terminal escape sequence. In this file a constant is
    written directly before a piece of text, and END is written directly
    after that text to close the styled span.
    """

    # Used for the banner printed when a test run starts.
    HEADER = "\033[95m"
    # Used for START / SKIP / FAILURE_OK / UNEXPECTED_OK lines.
    INFO = "\033[94m"
    # Used for SUCCESS lines.
    OK = "\033[92m"
    # Not referenced in the visible part of this file.
    WARN = "\033[93m"
    # Used for ERROR / FAILURE lines and failing test names in the summary.
    FAIL = "\033[91m"
    # Used for the section labels inside the summary.
    BOLD = "\033[1m"
    # Not referenced in the visible part of this file.
    UNDERLINE = "\033[4m"
    # Written after styled text to end the styled span.
    END = "\033[0m"
279
280
class TerminalResult(CoverageResult):
    """Extension to CoverageResult adding basic terminal reporting.

    Attributes:
      out (file-like): Stream the colored progress lines are written to.
      start_time (datetime.datetime or None): When the most recently started
        test began; None until the first startTest call.
    """

    def __init__(self, out, id_map):
        """Initialize the result object.

        Args:
          out (file-like): Output file to which terminal-colored live results will
            be written.
          id_map (callable): See AugmentedResult.__init__.
        """
        super(TerminalResult, self).__init__(id_map=id_map)
        self.out = out
        self.start_time = None

    def _write_colored(self, color, text):
        """Write `text` wrapped in `color` ... _Colors.END and flush.

        Flushing after every line keeps the live report current even when
        `out` is buffered."""
        self.out.write(color + text + _Colors.END)
        self.out.flush()

    def _write_timed(self, color, template, test):
        """Report an outcome line that carries a timestamp and a duration.

        The clock is read exactly once, so the printed timestamp is the same
        instant that the duration is measured against.

        Args:
          color (str): One of the _Colors escape sequences.
          template (str): Format string with three positional fields: end
            time, test id and duration.
          test (unittest.TestCase): The test that just finished.
        """
        end_time = datetime.datetime.now()
        duration = end_time - self.start_time
        self._write_colored(
            color, template.format(end_time, test.id(), duration)
        )

    def startTestRun(self):
        """See unittest.TestResult.startTestRun."""
        super(TerminalResult, self).startTestRun()
        self._write_colored(
            _Colors.HEADER,
            " [{}]Testing gRPC Python...\n".format(datetime.datetime.now()),
        )

    def startTest(self, test):
        """See unittest.TestResult.startTest."""
        super(TerminalResult, self).startTest(test)
        # Remembered so the outcome line can report how long the test took.
        self.start_time = datetime.datetime.now()
        self._write_colored(
            _Colors.INFO,
            " [{}]START         {}\n".format(self.start_time, test.id()),
        )

    def stopTestRun(self):
        """See unittest.TestResult.stopTestRun.

        Additionally writes the uncolored text returned by summary()."""
        super(TerminalResult, self).stopTestRun()
        self.out.write(summary(self))
        self.out.flush()

    def addError(self, test, err):
        """See unittest.TestResult.addError."""
        super(TerminalResult, self).addError(test, err)
        self._write_timed(
            _Colors.FAIL, " [{}]ERROR         {}[Duration: {}]\n", test
        )

    def addFailure(self, test, err):
        """See unittest.TestResult.addFailure."""
        super(TerminalResult, self).addFailure(test, err)
        self._write_timed(
            _Colors.FAIL, " [{}]FAILURE       {}[Duration: {}]\n", test
        )

    def addSuccess(self, test):
        """See unittest.TestResult.addSuccess."""
        super(TerminalResult, self).addSuccess(test)
        self._write_timed(
            _Colors.OK, " [{}]SUCCESS       {}[Duration: {}]\n", test
        )

    # The three outcomes below are reported without timestamp or duration.

    def addSkip(self, test, reason):
        """See unittest.TestResult.addSkip."""
        super(TerminalResult, self).addSkip(test, reason)
        self._write_colored(
            _Colors.INFO, "SKIP          {}\n".format(test.id())
        )

    def addExpectedFailure(self, test, err):
        """See unittest.TestResult.addExpectedFailure."""
        super(TerminalResult, self).addExpectedFailure(test, err)
        self._write_colored(
            _Colors.INFO, "FAILURE_OK    {}\n".format(test.id())
        )

    def addUnexpectedSuccess(self, test):
        """See unittest.TestResult.addUnexpectedSuccess."""
        super(TerminalResult, self).addUnexpectedSuccess(test)
        self._write_colored(
            _Colors.INFO, "UNEXPECTED_OK {}\n".format(test.id())
        )
387
388
def _traceback_string(type, value, trace):
    """Render exception information as one printable string.

    Args:
      type (class): The type of the exception.
      value (Exception): The value of the exception.
      trace (traceback): Traceback of the exception.

    Returns:
      str: The text the traceback module produces for this exception.
    """
    rendered_lines = traceback.format_exception(type, value, trace)
    return "".join(rendered_lines)
403
404
def summary(result):
    """A summary string of a result object.

    The text consists of three parts: counts per outcome plus the names of
    tests still marked as running, a traceback/stdout/stderr section for
    every failure and error, and the names of unexpected successes.

    Skipped tests are listed on their own line and are not included in the
    "tests finished" total.

    Args:
      result (AugmentedResult): The result object to get the summary of.

    Returns:
      str: The summary string.
    """
    assert isinstance(result, AugmentedResult)
    kinds = CaseResult.Kind

    # Group every case by kind in one pass over the recorded cases instead
    # of rescanning them once per kind.
    cases_by_kind = collections.defaultdict(list)
    for case in result.cases.values():
        cases_by_kind[case.kind].append(case)

    running = cases_by_kind[kinds.RUNNING]
    failures = cases_by_kind[kinds.FAILURE]
    errors = cases_by_kind[kinds.ERROR]
    successes = cases_by_kind[kinds.SUCCESS]
    skips = cases_by_kind[kinds.SKIP]
    expected_failures = cases_by_kind[kinds.EXPECTED_FAILURE]
    unexpected_successes = cases_by_kind[kinds.UNEXPECTED_SUCCESS]

    finished_count = (
        len(failures)
        + len(errors)
        + len(successes)
        + len(expected_failures)
        + len(unexpected_successes)
    )
    statistics = (
        "{finished} tests finished:\n"
        "\t{successful} successful\n"
        "\t{unsuccessful} unsuccessful\n"
        "\t{skipped} skipped\n"
        "\t{expected_fail} expected failures\n"
        "\t{unexpected_successful} unexpected successes\n"
        "Interrupted Tests:\n"
        "\t{interrupted}\n".format(
            finished=finished_count,
            successful=len(successes),
            unsuccessful=(len(failures) + len(errors)),
            skipped=len(skips),
            expected_fail=len(expected_failures),
            unexpected_successful=len(unexpected_successes),
            # Tests still marked RUNNING never reported an outcome.
            interrupted=str([case.name for case in running]),
        )
    )

    # Built once; only the four named fields vary between cases.
    case_template = (
        _Colors.FAIL
        + "{test_name}"
        + _Colors.END
        + "\n"
        + _Colors.BOLD
        + "traceback:"
        + _Colors.END
        + "\n"
        + "{traceback}\n"
        + _Colors.BOLD
        + "stdout:"
        + _Colors.END
        + "\n"
        + "{stdout}\n"
        + _Colors.BOLD
        + "stderr:"
        + _Colors.END
        + "\n"
        + "{stderr}\n"
    )
    # Failures are listed first, then errors.
    tracebacks = "\n\n".join(
        case_template.format(
            test_name=case.name,
            traceback=_traceback_string(*case.traceback),
            stdout=case.stdout,
            stderr=case.stderr,
        )
        for case in itertools.chain(failures, errors)
    )
    notes = "Unexpected successes: {}\n".format(
        [case.name for case in unexpected_successes]
    )
    return statistics + "\nErrors/Failures: \n" + tracebacks + "\n" + notes
518
519
def _xml_safe_text(text):
    """Drop control characters that XML 1.0 does not allow in text.

    Captured stderr can contain such characters (this file itself emits
    escape sequences for terminal colors); tab, newline and carriage return
    are kept."""
    return "".join(ch for ch in text if ch >= " " or ch in "\t\n\r")


def jenkins_junit_xml(result):
    """An XML tree object that when written is recognizable by Jenkins.

    Only cases whose kind is SUCCESS, ERROR or FAILURE are emitted as
    <testcase> elements. ERROR and FAILURE cases additionally get an <error>
    child whose text is the captured stderr followed by the formatted
    traceback.

    Args:
      result (AugmentedResult): The result object to get the junit xml output of.

    Returns:
      ElementTree.ElementTree: The XML tree.
    """
    assert isinstance(result, AugmentedResult)
    kinds = CaseResult.Kind
    root = ElementTree.Element("testsuites")
    suite = ElementTree.SubElement(
        root,
        "testsuite",
        {
            "name": "Python gRPC tests",
        },
    )
    for case in result.cases.values():
        if case.kind not in (kinds.SUCCESS, kinds.ERROR, kinds.FAILURE):
            # Other kinds are not reported in the XML output.
            continue
        case_xml = ElementTree.SubElement(
            suite,
            "testcase",
            {
                "name": case.name,
            },
        )
        if case.kind == kinds.SUCCESS:
            continue
        error_xml = ElementTree.SubElement(case_xml, "error", {})
        # An empty format string here used to discard both arguments and
        # leave the element without any text.
        traceback_text = (
            ""
            if case.traceback is None
            else _traceback_string(*case.traceback)
        )
        error_xml.text = _xml_safe_text(
            "{}\n{}".format(case.stderr or "", traceback_text)
        )
    return ElementTree.ElementTree(element=root)
558