1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright (c) 2015 Remko Tronçon (https://el-tramo.be) 5# Copied from https://github.com/remko/pycotap/ 6# 7# Released under the MIT license 8# 9# Permission is hereby granted, free of charge, to any person obtaining a copy 10# of this software and associated documentation files (the "Software"), to deal 11# in the Software without restriction, including without limitation the rights 12# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 13# copies of the Software, and to permit persons to whom the Software is 14# furnished to do so, subject to the following conditions: 15# 16# The above copyright notice and this permission notice shall be included in 17# all copies or substantial portions of the Software. 18# 19# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 20# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 21# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 22# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 23# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 24# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 25# SOFTWARE. 26 27 28import unittest 29import sys 30import base64 31from io import StringIO 32 33# Log modes 34class LogMode(object) : 35 LogToError, LogToDiagnostics, LogToYAML, LogToAttachment = range(4) 36 37 38class TAPTestResult(unittest.TestResult): 39 def __init__(self, output_stream, error_stream, message_log, test_output_log): 40 super(TAPTestResult, self).__init__(self, output_stream) 41 self.output_stream = output_stream 42 self.error_stream = error_stream 43 self.orig_stdout = None 44 self.orig_stderr = None 45 self.message = None 46 self.test_output = None 47 self.message_log = message_log 48 self.test_output_log = test_output_log 49 self.output_stream.write("TAP version 13\n") 50 self._set_streams() 51 52 def printErrors(self): 53 self.print_raw("1..%d\n" % self.testsRun) 54 self._reset_streams() 55 56 def _set_streams(self): 57 self.orig_stdout = sys.stdout 58 self.orig_stderr = sys.stderr 59 if self.message_log == LogMode.LogToError: 60 self.message = self.error_stream 61 else: 62 self.message = StringIO() 63 if self.test_output_log == LogMode.LogToError: 64 self.test_output = self.error_stream 65 else: 66 self.test_output = StringIO() 67 68 if self.message_log == self.test_output_log: 69 self.test_output = self.message 70 sys.stdout = sys.stderr = self.test_output 71 72 def _reset_streams(self): 73 sys.stdout = self.orig_stdout 74 sys.stderr = self.orig_stderr 75 76 77 def print_raw(self, text): 78 self.output_stream.write(text) 79 self.output_stream.flush() 80 81 def print_result(self, result, test, directive = None): 82 self.output_stream.write("%s %d %s" % (result, self.testsRun, test.id())) 83 if directive: 84 self.output_stream.write(" # " + directive) 85 self.output_stream.write("\n") 86 self.output_stream.flush() 87 88 def ok(self, test, directive = None): 89 self.print_result("ok", test, directive) 90 91 def not_ok(self, test): 92 self.print_result("not ok", test) 93 94 def startTest(self, test): 95 super(TAPTestResult, self).startTest(test) 96 97 def stopTest(self, test): 98 super(TAPTestResult, self).stopTest(test) 99 if self.message_log == self.test_output_log: 100 logs = [(self.message_log, self.message, "output")] 101 else: 102 logs = [ 103 (self.test_output_log, self.test_output, "test_output"), 104 (self.message_log, self.message, "message") 105 ] 106 for log_mode, log, log_name in logs: 107 if log_mode != LogMode.LogToError: 108 output = log.getvalue() 109 if len(output): 110 if log_mode == LogMode.LogToYAML: 111 self.print_raw(" ---\n") 112 self.print_raw(" " + log_name + ": |\n") 113 self.print_raw(" " + output.rstrip().replace("\n", "\n ") + "\n") 114 self.print_raw(" ...\n") 115 elif log_mode == LogMode.LogToAttachment: 116 self.print_raw(" ---\n") 117 self.print_raw(" " + log_name + ":\n") 118 self.print_raw(" File-Name: " + log_name + ".txt\n") 119 self.print_raw(" File-Type: text/plain\n") 120 self.print_raw(" File-Content: " + base64.b64encode(output) + "\n") 121 self.print_raw(" ...\n") 122 else: 123 self.print_raw("# " + output.rstrip().replace("\n", "\n# ") + "\n") 124 # Truncate doesn't change the current stream position. 125 # Seek to the beginning to avoid extensions on subsequent writes. 126 log.seek(0) 127 log.truncate(0) 128 129 def addSuccess(self, test): 130 super(TAPTestResult, self).addSuccess(test) 131 self.ok(test) 132 133 def addError(self, test, err): 134 super(TAPTestResult, self).addError(test, err) 135 self.message.write(self.errors[-1][1] + "\n") 136 self.not_ok(test) 137 138 def addFailure(self, test, err): 139 super(TAPTestResult, self).addFailure(test, err) 140 self.message.write(self.failures[-1][1] + "\n") 141 self.not_ok(test) 142 143 def addSkip(self, test, reason): 144 super(TAPTestResult, self).addSkip(test, reason) 145 self.ok(test, "SKIP " + reason) 146 147 def addExpectedFailure(self, test, err): 148 super(TAPTestResult, self).addExpectedFailure(test, err) 149 self.ok(test) 150 151 def addUnexpectedSuccess(self, test): 152 super(TAPTestResult, self).addUnexpectedSuccess(test) 153 self.message.write("Unexpected success" + "\n") 154 self.not_ok(test) 155 156 157class TAPTestRunner(object): 158 def __init__(self, 159 message_log = LogMode.LogToYAML, 160 test_output_log = LogMode.LogToDiagnostics, 161 output_stream = sys.stdout, error_stream = sys.stderr): 162 self.output_stream = output_stream 163 self.error_stream = error_stream 164 self.message_log = message_log 165 self.test_output_log = test_output_log 166 167 def run(self, test): 168 result = TAPTestResult( 169 self.output_stream, 170 self.error_stream, 171 self.message_log, 172 self.test_output_log) 173 test(result) 174 result.printErrors() 175 176 return result 177