• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# coding=utf-8
3
4# Copyright (c) 2015 Remko Tronçon (https://el-tramo.be)
5# Copied from https://github.com/remko/pycotap/
6#
7# Released under the MIT license
8#
9# Permission is hereby granted, free of charge, to any person obtaining a copy
10# of this software and associated documentation files (the "Software"), to deal
11# in the Software without restriction, including without limitation the rights
12# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13# copies of the Software, and to permit persons to whom the Software is
14# furnished to do so, subject to the following conditions:
15#
16# The above copyright notice and this permission notice shall be included in
17# all copies or substantial portions of the Software.
18#
19# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25# SOFTWARE.
26
27
28import unittest
29import sys
30import base64
31from io import StringIO
32
33
34# Log modes
35class LogMode(object):
36    LogToError, LogToDiagnostics, LogToYAML, LogToAttachment = range(4)
37
38
39class TAPTestResult(unittest.TestResult):
40    def __init__(self, output_stream, error_stream, message_log, test_output_log):
41        super(TAPTestResult, self).__init__(self, output_stream)
42        self.output_stream = output_stream
43        self.error_stream = error_stream
44        self.orig_stdout = None
45        self.orig_stderr = None
46        self.message = None
47        self.test_output = None
48        self.message_log = message_log
49        self.test_output_log = test_output_log
50        self.output_stream.write("TAP version 13\n")
51        self._set_streams()
52
53    def printErrors(self):
54        self.print_raw("1..%d\n" % self.testsRun)
55        self._reset_streams()
56
57    def _set_streams(self):
58        self.orig_stdout = sys.stdout
59        self.orig_stderr = sys.stderr
60        if self.message_log == LogMode.LogToError:
61            self.message = self.error_stream
62        else:
63            self.message = StringIO()
64        if self.test_output_log == LogMode.LogToError:
65            self.test_output = self.error_stream
66        else:
67            self.test_output = StringIO()
68
69        if self.message_log == self.test_output_log:
70            self.test_output = self.message
71        sys.stdout = sys.stderr = self.test_output
72
73    def _reset_streams(self):
74        sys.stdout = self.orig_stdout
75        sys.stderr = self.orig_stderr
76
77    def print_raw(self, text):
78        self.output_stream.write(text)
79        self.output_stream.flush()
80
81    def print_result(self, result, test, directive=None):
82        self.output_stream.write("%s %d %s" % (result, self.testsRun, test.id()))
83        if directive:
84            self.output_stream.write(" # " + directive)
85        self.output_stream.write("\n")
86        self.output_stream.flush()
87
88    def ok(self, test, directive=None):
89        self.print_result("ok", test, directive)
90
91    def not_ok(self, test):
92        self.print_result("not ok", test)
93
94    def startTest(self, test):
95        super(TAPTestResult, self).startTest(test)
96
97    def stopTest(self, test):
98        super(TAPTestResult, self).stopTest(test)
99        if self.message_log == self.test_output_log:
100            logs = [(self.message_log, self.message, "output")]
101        else:
102            logs = [
103                (self.test_output_log, self.test_output, "test_output"),
104                (self.message_log, self.message, "message"),
105            ]
106        for log_mode, log, log_name in logs:
107            if log_mode != LogMode.LogToError:
108                output = log.getvalue()
109                if len(output):
110                    if log_mode == LogMode.LogToYAML:
111                        self.print_raw("  ---\n")
112                        self.print_raw("    " + log_name + ": |\n")
113                        self.print_raw(
114                            "      " + output.rstrip().replace("\n", "\n      ") + "\n"
115                        )
116                        self.print_raw("  ...\n")
117                    elif log_mode == LogMode.LogToAttachment:
118                        self.print_raw("  ---\n")
119                        self.print_raw("    " + log_name + ":\n")
120                        self.print_raw("      File-Name: " + log_name + ".txt\n")
121                        self.print_raw("      File-Type: text/plain\n")
122                        self.print_raw(
123                            "      File-Content: " + base64.b64encode(output) + "\n"
124                        )
125                        self.print_raw("  ...\n")
126                    else:
127                        self.print_raw(
128                            "# " + output.rstrip().replace("\n", "\n# ") + "\n"
129                        )
130                # Truncate doesn't change the current stream position.
131                # Seek to the beginning to avoid extensions on subsequent writes.
132                log.seek(0)
133                log.truncate(0)
134
135    def addSuccess(self, test):
136        super(TAPTestResult, self).addSuccess(test)
137        self.ok(test)
138
139    def addError(self, test, err):
140        super(TAPTestResult, self).addError(test, err)
141        self.message.write(self.errors[-1][1] + "\n")
142        self.not_ok(test)
143
144    def addFailure(self, test, err):
145        super(TAPTestResult, self).addFailure(test, err)
146        self.message.write(self.failures[-1][1] + "\n")
147        self.not_ok(test)
148
149    def addSkip(self, test, reason):
150        super(TAPTestResult, self).addSkip(test, reason)
151        self.ok(test, "SKIP " + reason)
152
153    def addExpectedFailure(self, test, err):
154        super(TAPTestResult, self).addExpectedFailure(test, err)
155        self.ok(test)
156
157    def addUnexpectedSuccess(self, test):
158        super(TAPTestResult, self).addUnexpectedSuccess(test)
159        self.message.write("Unexpected success" + "\n")
160        self.not_ok(test)
161
162
163class TAPTestRunner(object):
164    def __init__(
165        self,
166        message_log=LogMode.LogToYAML,
167        test_output_log=LogMode.LogToDiagnostics,
168        output_stream=sys.stdout,
169        error_stream=sys.stderr,
170    ):
171        self.output_stream = output_stream
172        self.error_stream = error_stream
173        self.message_log = message_log
174        self.test_output_log = test_output_log
175
176    def run(self, test):
177        result = TAPTestResult(
178            self.output_stream,
179            self.error_stream,
180            self.message_log,
181            self.test_output_log,
182        )
183        test(result)
184        result.printErrors()
185
186        return result
187