1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright (c) 2015 Remko Tronçon (https://el-tramo.be) 5# Copied from https://github.com/remko/pycotap/ 6# 7# Released under the MIT license 8# 9# Permission is hereby granted, free of charge, to any person obtaining a copy 10# of this software and associated documentation files (the "Software"), to deal 11# in the Software without restriction, including without limitation the rights 12# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 13# copies of the Software, and to permit persons to whom the Software is 14# furnished to do so, subject to the following conditions: 15# 16# The above copyright notice and this permission notice shall be included in 17# all copies or substantial portions of the Software. 18# 19# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 20# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 21# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 22# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 23# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 24# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 25# SOFTWARE. 
import unittest
import sys
import base64
from io import StringIO


# Log modes
class LogMode(object):
    """Selects where a captured log (test output or failure message) goes.

    LogToError        write directly to the error stream as it is produced.
    LogToDiagnostics  emit after the test line as "# "-prefixed TAP comments.
    LogToYAML         emit after the test line as a YAML block scalar.
    LogToAttachment   emit after the test line as a YAML block describing a
                      base64-encoded text/plain attachment.
    """

    LogToError, LogToDiagnostics, LogToYAML, LogToAttachment = range(4)


class TAPTestResult(unittest.TestResult):
    """A unittest result object that reports each test in TAP 13 format.

    While the result is alive (from construction until printErrors() is
    called) sys.stdout and sys.stderr are redirected to the stream chosen by
    ``test_output_log`` so that output printed by tests can be captured.
    """

    def __init__(self, output_stream, error_stream, message_log, test_output_log):
        """Write the TAP header and start capturing stdout/stderr.

        output_stream    stream that receives the TAP report.
        error_stream     stream used for logs in LogMode.LogToError mode.
        message_log      LogMode for failure/error messages.
        test_output_log  LogMode for output printed by the tests.
        """
        # TestResult.__init__(stream, descriptions, verbosity): the stream is
        # the first argument after the implicit self.
        super(TAPTestResult, self).__init__(output_stream)
        self.output_stream = output_stream
        self.error_stream = error_stream
        self.orig_stdout = None
        self.orig_stderr = None
        self.message = None
        self.test_output = None
        self.message_log = message_log
        self.test_output_log = test_output_log
        self.output_stream.write("TAP version 13\n")
        self._set_streams()

    def printErrors(self):
        """Emit the trailing TAP plan line and restore stdout/stderr."""
        self.print_raw("1..%d\n" % self.testsRun)
        self._reset_streams()

    def _set_streams(self):
        """Save the current stdout/stderr and redirect both to the capture."""
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr
        if self.message_log == LogMode.LogToError:
            self.message = self.error_stream
        else:
            self.message = StringIO()
        if self.test_output_log == LogMode.LogToError:
            self.test_output = self.error_stream
        else:
            self.test_output = StringIO()

        # When both logs use the same mode they share one buffer so that
        # messages and test output are reported together, in order.
        if self.message_log == self.test_output_log:
            self.test_output = self.message
        sys.stdout = sys.stderr = self.test_output

    def _reset_streams(self):
        """Restore the stdout/stderr saved by _set_streams()."""
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr

    def print_raw(self, text):
        """Write ``text`` verbatim to the TAP output stream and flush it."""
        self.output_stream.write(text)
        self.output_stream.flush()

    def print_result(self, result, test, directive=None):
        """Write one TAP test line: "<result> <number> <test id>[ # <directive>]"."""
        line = "%s %d %s" % (result, self.testsRun, test.id())
        if directive:
            line += " # " + directive
        self.print_raw(line + "\n")

    def ok(self, test, directive=None):
        """Report ``test`` as "ok", with an optional TAP directive."""
        self.print_result("ok", test, directive)

    def not_ok(self, test):
        """Report ``test`` as "not ok"."""
        self.print_result("not ok", test)

    def startTest(self, test):
        super(TAPTestResult, self).startTest(test)

    def stopTest(self, test):
        """Flush the logs buffered during ``test`` after its TAP test line."""
        super(TAPTestResult, self).stopTest(test)
        if self.message_log == self.test_output_log:
            # Shared buffer (see _set_streams): report it once.
            logs = [(self.message_log, self.message, "output")]
        else:
            logs = [
                (self.test_output_log, self.test_output, "test_output"),
                (self.message_log, self.message, "message"),
            ]
        for log_mode, log, log_name in logs:
            # LogToError logs were written straight to the error stream.
            if log_mode == LogMode.LogToError:
                continue
            output = log.getvalue()
            if output:
                self._emit_log(log_mode, log_name, output)
                # Truncate doesn't change the current stream position.
                # Seek to the beginning to avoid extensions on subsequent writes.
                log.seek(0)
                log.truncate(0)

    def _emit_log(self, log_mode, log_name, output):
        """Write one non-empty buffered log in the format chosen by ``log_mode``."""
        if log_mode == LogMode.LogToYAML:
            # Block-scalar content must be indented deeper than its key.
            self.print_raw("  ---\n")
            self.print_raw("    " + log_name + ": |\n")
            self.print_raw(
                "      " + output.rstrip().replace("\n", "\n      ") + "\n"
            )
            self.print_raw("  ...\n")
        elif log_mode == LogMode.LogToAttachment:
            # b64encode() works on bytes: encode the captured text first and
            # decode the (pure ASCII) result so it can be joined with str.
            encoded = base64.b64encode(output.encode("utf-8")).decode("ascii")
            self.print_raw("  ---\n")
            self.print_raw("    " + log_name + ":\n")
            self.print_raw("      File-Name: " + log_name + ".txt\n")
            self.print_raw("      File-Type: text/plain\n")
            self.print_raw("      File-Content: " + encoded + "\n")
            self.print_raw("  ...\n")
        else:
            self.print_raw("# " + output.rstrip().replace("\n", "\n# ") + "\n")

    def addSuccess(self, test):
        super(TAPTestResult, self).addSuccess(test)
        self.ok(test)

    def addError(self, test, err):
        super(TAPTestResult, self).addError(test, err)
        # The base class stores (test, formatted traceback) in self.errors.
        self.message.write(self.errors[-1][1] + "\n")
        self.not_ok(test)

    def addFailure(self, test, err):
        super(TAPTestResult, self).addFailure(test, err)
        # The base class stores (test, formatted traceback) in self.failures.
        self.message.write(self.failures[-1][1] + "\n")
        self.not_ok(test)

    def addSkip(self, test, reason):
        super(TAPTestResult, self).addSkip(test, reason)
        self.ok(test, "SKIP " + reason)

    def addExpectedFailure(self, test, err):
        super(TAPTestResult, self).addExpectedFailure(test, err)
        self.ok(test)

    def addUnexpectedSuccess(self, test):
        super(TAPTestResult, self).addUnexpectedSuccess(test)
        self.message.write("Unexpected success" + "\n")
        self.not_ok(test)


class TAPTestRunner(object):
    """Runs a unittest test or suite and reports the results as TAP."""

    def __init__(
        self,
        message_log=LogMode.LogToYAML,
        test_output_log=LogMode.LogToDiagnostics,
        output_stream=sys.stdout,
        error_stream=sys.stderr,
    ):
        """Store the reporting configuration handed to each TAPTestResult.

        message_log      LogMode for failure/error messages.
        test_output_log  LogMode for output printed by the tests.
        output_stream    stream that receives the TAP report.
        error_stream     stream used for logs in LogMode.LogToError mode.
        """
        self.output_stream = output_stream
        self.error_stream = error_stream
        self.message_log = message_log
        self.test_output_log = test_output_log

    def run(self, test):
        """Run ``test`` (a callable taking a result) and return the result."""
        result = TAPTestResult(
            self.output_stream,
            self.error_stream,
            self.message_log,
            self.test_output_log,
        )
        try:
            test(result)
            result.printErrors()
        finally:
            # The result redirects sys.stdout/sys.stderr on construction;
            # make sure they are restored even if the run is interrupted.
            result._reset_streams()

        return result