1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from __future__ import absolute_import 16 17import collections 18import os 19import select 20import signal 21import sys 22import tempfile 23import threading 24import time 25import unittest 26import uuid 27 28import six 29from six import moves 30 31from tests import _loader 32from tests import _result 33 34 35class CaptureFile(object): 36 """A context-managed file to redirect output to a byte array. 37 38 Use by invoking `start` (`__enter__`) and at some point invoking `stop` 39 (`__exit__`). At any point after the initial call to `start` call `output` to 40 get the current redirected output. Note that we don't currently use file 41 locking, so calling `output` between calls to `start` and `stop` may muddle 42 the result (you should only be doing this during a Python-handled interrupt as 43 a last ditch effort to provide output to the user). 44 45 Attributes: 46 _redirected_fd (int): File descriptor of file to redirect writes from. 47 _saved_fd (int): A copy of the original value of the redirected file 48 descriptor. 49 _into_file (TemporaryFile or None): File to which writes are redirected. 50 Only non-None when self is started. 
51 """ 52 53 def __init__(self, fd): 54 self._redirected_fd = fd 55 self._saved_fd = os.dup(self._redirected_fd) 56 self._into_file = None 57 58 def output(self): 59 """Get all output from the redirected-to file if it exists.""" 60 if self._into_file: 61 self._into_file.seek(0) 62 return bytes(self._into_file.read()) 63 else: 64 return bytes() 65 66 def start(self): 67 """Start redirection of writes to the file descriptor.""" 68 self._into_file = tempfile.TemporaryFile() 69 os.dup2(self._into_file.fileno(), self._redirected_fd) 70 71 def stop(self): 72 """Stop redirection of writes to the file descriptor.""" 73 # n.b. this dup2 call auto-closes self._redirected_fd 74 os.dup2(self._saved_fd, self._redirected_fd) 75 76 def write_bypass(self, value): 77 """Bypass the redirection and write directly to the original file. 78 79 Arguments: 80 value (str): What to write to the original file. 81 """ 82 if six.PY3 and not isinstance(value, six.binary_type): 83 value = value.encode('ascii') 84 if self._saved_fd is None: 85 os.write(self._redirect_fd, value) 86 else: 87 os.write(self._saved_fd, value) 88 89 def __enter__(self): 90 self.start() 91 return self 92 93 def __exit__(self, type, value, traceback): 94 self.stop() 95 96 def close(self): 97 """Close any resources used by self not closed by stop().""" 98 os.close(self._saved_fd) 99 100 101class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])): 102 """A test case with a guaranteed unique externally specified identifier. 103 104 Attributes: 105 case (unittest.TestCase): TestCase we're decorating with an additional 106 identifier. 107 id (object): Any identifier that may be considered 'unique' for testing 108 purposes. 109 """ 110 111 def __new__(cls, case, id=None): 112 if id is None: 113 id = uuid.uuid4() 114 return super(cls, AugmentedCase).__new__(cls, case, id) 115 116 117# NOTE(lidiz) This complex wrapper is not triggering setUpClass nor 118# tearDownClass. 
# Do not use those methods, or fix this wrapper!
class Runner(object):
    """Runs the test cases of a suite one at a time and reports the results.

    NOTE: this runner does not trigger setUpClass nor tearDownClass on the
    test cases it executes.
    """

    def __init__(self, dedicated_threads=False):
        """Constructs the Runner object.

        Args:
          dedicated_threads: A bool indicates whether to spawn each unit test
            in separate thread or not.
        """
        self._skipped_tests = []
        self._dedicated_threads = dedicated_threads

    def skip_tests(self, tests):
        """Set the tests to skip.

        Args:
          tests: Iterable of strings; a test case is skipped when any of them
            is contained in the test case's id.
        """
        self._skipped_tests = tests

    def run(self, suite):
        """See setuptools' test_runner setup argument for information.

        Only test cases whose id starts with the value of the
        GRPC_PYTHON_TESTRUNNER_FILTER environment variable are run (all of
        them when it is unset or empty). The result is also written to
        `report.xml` in the current working directory.

        Args:
          suite: The test suite handed to `_loader.iterate_suite_cases`.

        Returns:
          The `_result.TerminalResult` accumulated over the run.
        """
        # only run test cases with id starting with given prefix
        testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
        filtered_cases = [
            case for case in _loader.iterate_suite_cases(suite)
            if not testcase_filter or case.id().startswith(testcase_filter)
        ]

        # Ensure that every test case has no collision with any other test case
        # in the augmented results.
        augmented_cases = [
            AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
        ]
        case_id_by_case = {
            augmented_case.case: augmented_case.id
            for augmented_case in augmented_cases
        }
        result_out = moves.cStringIO()
        result = _result.TerminalResult(
            result_out, id_map=lambda case: case_id_by_case[case])
        stdout_pipe = CaptureFile(sys.stdout.fileno())
        stderr_pipe = CaptureFile(sys.stderr.fileno())
        kill_flag = [False]

        def sigint_handler(signal_number, frame):
            """Flag the run for termination and restore the default handler."""
            if signal_number == signal.SIGINT:
                kill_flag[0] = True  # Python 2.7 not having 'local'... :-(
            signal.signal(signal_number, signal.SIG_DFL)

        def fault_handler(signal_number, frame):
            """Dump whatever was captured so far and exit immediately."""
            # NOTE: output() returns bytes, which are formatted as-is here.
            stdout_pipe.write_bypass(
                'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'.format(
                    signal_number, stdout_pipe.output(), stderr_pipe.output()))
            os._exit(1)

        def check_kill_self():
            """Report partial results and exit if SIGINT has been received."""
            if kill_flag[0]:
                stdout_pipe.write_bypass('Stopping tests short...')
                result.stopTestRun()
                stdout_pipe.write_bypass(result_out.getvalue())
                stdout_pipe.write_bypass('\ninterrupted stdout:\n{}\n'.format(
                    stdout_pipe.output().decode()))
                stderr_pipe.write_bypass('\ninterrupted stderr:\n{}\n'.format(
                    stderr_pipe.output().decode()))
                os._exit(1)

        def try_set_handler(name, handler):
            """Install `handler` for signal `name` if the signal exists."""
            try:
                signal.signal(getattr(signal, name), handler)
            except AttributeError:
                # The signal module has no attribute of that name here.
                pass

        try_set_handler('SIGINT', sigint_handler)
        try_set_handler('SIGBUS', fault_handler)
        try_set_handler('SIGABRT', fault_handler)
        try_set_handler('SIGFPE', fault_handler)
        try_set_handler('SIGILL', fault_handler)
        # Sometimes output will lag after a test has successfully finished; we
        # ignore such writes to our pipes.
        try_set_handler('SIGPIPE', signal.SIG_IGN)

        # Run the tests
        result.startTestRun()
        try:
            for augmented_case in augmented_cases:
                test_case = augmented_case.case
                test_id = test_case.id()
                if any(skipped_test in test_id
                       for skipped_test in self._skipped_tests):
                    continue
                sys.stdout.write('Running {}\n'.format(test_id))
                sys.stdout.flush()
                if not self._dedicated_threads:
                    # Donates current thread to test case execution.
                    test_case.run(result)
                    continue
                # (Deprecated) Spawns dedicated thread for each test case.
                case_thread = threading.Thread(
                    target=test_case.run, args=(result,))
                # Leaving the with-block restores stdout/stderr even when an
                # exception propagates out of it.
                with stdout_pipe, stderr_pipe:
                    case_thread.start()
                    # If the thread is exited unexpected, stop testing.
                    while case_thread.is_alive():
                        check_kill_self()
                        time.sleep(0)
                    case_thread.join()
                # Records the result of the test case run.
                result.set_output(test_case, stdout_pipe.output(),
                                  stderr_pipe.output())
                sys.stdout.write(result_out.getvalue())
                sys.stdout.flush()
                # truncate() does not move the stream position; rewind first so
                # the next write starts at offset 0 instead of padding the
                # buffer up to the old offset.
                result_out.seek(0)
                result_out.truncate()
                check_kill_self()
            result.stopTestRun()
        finally:
            # Release the duplicated descriptors even if a test run raised.
            stdout_pipe.close()
            stderr_pipe.close()

        # Report results
        sys.stdout.write(result_out.getvalue())
        sys.stdout.flush()
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        with open('report.xml', 'wb') as report_xml_file:
            _result.jenkins_junit_xml(result).write(report_xml_file)
        return result