1import os 2import sys 3import threading 4import time 5import traceback 6try: 7 import Queue as queue 8except ImportError: 9 import queue 10 11try: 12 import win32api 13except ImportError: 14 win32api = None 15 16import multiprocessing 17import lit.Test 18 19def abort_now(): 20 """Abort the current process without doing any exception teardown""" 21 sys.stdout.flush() 22 if win32api: 23 win32api.TerminateProcess(win32api.GetCurrentProcess(), 3) 24 else: 25 os.kill(0, 9) 26 27class _Display(object): 28 def __init__(self, display, provider, maxFailures): 29 self.display = display 30 self.provider = provider 31 self.maxFailures = maxFailures or object() 32 self.failedCount = 0 33 def update(self, test): 34 self.display.update(test) 35 self.failedCount += (test.result.code == lit.Test.FAIL) 36 if self.failedCount == self.maxFailures: 37 self.provider.cancel() 38 39class Run(object): 40 """ 41 This class represents a concrete, configured testing run. 42 """ 43 44 def __init__(self, lit_config, tests): 45 self.lit_config = lit_config 46 self.tests = tests 47 # Set up semaphores to limit parallelism of certain classes of tests. 48 # For example, some ASan tests require lots of virtual memory and run 49 # faster with less parallelism on OS X. 50 self.parallelism_semaphores = \ 51 {k: multiprocessing.Semaphore(v) for k, v in 52 self.lit_config.parallelism_groups.items()} 53 54 def execute_test(self, test): 55 return _execute_test_impl(test, self.lit_config, 56 self.parallelism_semaphores) 57 58 def execute_tests_in_pool(self, jobs, max_time): 59 # We need to issue many wait calls, so compute the final deadline and 60 # subtract time.time() from that as we go along. 61 deadline = None 62 if max_time: 63 deadline = time.time() + max_time 64 65 # Start a process pool. Copy over the data shared between all test runs. 66 # FIXME: Find a way to capture the worker process stderr. If the user 67 # interrupts the workers before we make it into our task callback, they 68 # will each raise a KeyboardInterrupt exception and print to stderr at 69 # the same time. 70 pool = multiprocessing.Pool(jobs, worker_initializer, 71 (self.lit_config, 72 self.parallelism_semaphores)) 73 74 # Install a console-control signal handler on Windows. 75 if win32api is not None: 76 def console_ctrl_handler(type): 77 print('\nCtrl-C detected, terminating.') 78 pool.terminate() 79 pool.join() 80 abort_now() 81 return True 82 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) 83 84 try: 85 async_results = [pool.apply_async(worker_run_one_test, 86 args=(test_index, test), 87 callback=self.consume_test_result) 88 for test_index, test in enumerate(self.tests)] 89 pool.close() 90 91 # Wait for all results to come in. The callback that runs in the 92 # parent process will update the display. 93 for a in async_results: 94 if deadline: 95 a.wait(deadline - time.time()) 96 else: 97 # Python condition variables cannot be interrupted unless 98 # they have a timeout. This can make lit unresponsive to 99 # KeyboardInterrupt, so do a busy wait with a timeout. 100 while not a.ready(): 101 a.wait(1) 102 if not a.successful(): 103 a.get() # Exceptions raised here come from the worker. 104 if self.hit_max_failures: 105 break 106 except: 107 # Stop the workers and wait for any straggling results to come in 108 # if we exited without waiting on every async result. 109 pool.terminate() 110 raise 111 finally: 112 pool.join() 113 114 def execute_tests(self, display, jobs, max_time=None): 115 """ 116 execute_tests(display, jobs, [max_time]) 117 118 Execute each of the tests in the run, using up to jobs number of 119 parallel tasks, and inform the display of each individual result. The 120 provided tests should be a subset of the tests available in this run 121 object. 122 123 If max_time is non-None, it should be a time in seconds after which to 124 stop executing tests. 125 126 The display object will have its update method called with each test as 127 it is completed. The calls are guaranteed to be locked with respect to 128 one another, but are *not* guaranteed to be called on the same thread as 129 this method was invoked on. 130 131 Upon completion, each test in the run will have its result 132 computed. Tests which were not actually executed (for any reason) will 133 be given an UNRESOLVED result. 134 """ 135 # Don't do anything if we aren't going to run any tests. 136 if not self.tests or jobs == 0: 137 return 138 139 # Save the display object on the runner so that we can update it from 140 # our task completion callback. 141 self.display = display 142 143 self.failure_count = 0 144 self.hit_max_failures = False 145 if self.lit_config.singleProcess: 146 global child_lit_config 147 child_lit_config = self.lit_config 148 for test_index, test in enumerate(self.tests): 149 result = worker_run_one_test(test_index, test) 150 self.consume_test_result(result) 151 else: 152 self.execute_tests_in_pool(jobs, max_time) 153 154 # Mark any tests that weren't run as UNRESOLVED. 155 for test in self.tests: 156 if test.result is None: 157 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) 158 159 def consume_test_result(self, pool_result): 160 """Test completion callback for worker_run_one_test 161 162 Updates the test result status in the parent process. Each task in the 163 pool returns the test index and the result, and we use the index to look 164 up the original test object. Also updates the progress bar as tasks 165 complete. 166 """ 167 # Don't add any more test results after we've hit the maximum failure 168 # count. Otherwise we're racing with the main thread, which is going 169 # to terminate the process pool soon. 170 if self.hit_max_failures: 171 return 172 173 (test_index, test_with_result) = pool_result 174 # Update the parent process copy of the test. This includes the result, 175 # XFAILS, REQUIRES, and UNSUPPORTED statuses. 176 assert self.tests[test_index].file_path == test_with_result.file_path, \ 177 "parent and child disagree on test path" 178 self.tests[test_index] = test_with_result 179 self.display.update(test_with_result) 180 181 # If we've finished all the tests or too many tests have failed, notify 182 # the main thread that we've stopped testing. 183 self.failure_count += (test_with_result.result.code == lit.Test.FAIL) 184 if self.lit_config.maxFailures and \ 185 self.failure_count == self.lit_config.maxFailures: 186 self.hit_max_failures = True 187 188def _execute_test_impl(test, lit_config, parallelism_semaphores): 189 """Execute one test""" 190 pg = test.config.parallelism_group 191 if callable(pg): 192 pg = pg(test) 193 194 result = None 195 semaphore = None 196 try: 197 if pg: 198 semaphore = parallelism_semaphores[pg] 199 if semaphore: 200 semaphore.acquire() 201 start_time = time.time() 202 result = test.config.test_format.execute(test, lit_config) 203 # Support deprecated result from execute() which returned the result 204 # code and additional output as a tuple. 205 if isinstance(result, tuple): 206 code, output = result 207 result = lit.Test.Result(code, output) 208 elif not isinstance(result, lit.Test.Result): 209 raise ValueError("unexpected result from test execution") 210 result.elapsed = time.time() - start_time 211 except KeyboardInterrupt: 212 raise 213 except: 214 if lit_config.debug: 215 raise 216 output = 'Exception during script execution:\n' 217 output += traceback.format_exc() 218 output += '\n' 219 result = lit.Test.Result(lit.Test.UNRESOLVED, output) 220 finally: 221 if semaphore: 222 semaphore.release() 223 224 test.setResult(result) 225 226child_lit_config = None 227child_parallelism_semaphores = None 228 229def worker_initializer(lit_config, parallelism_semaphores): 230 """Copy expensive repeated data into worker processes""" 231 global child_lit_config 232 child_lit_config = lit_config 233 global child_parallelism_semaphores 234 child_parallelism_semaphores = parallelism_semaphores 235 236def worker_run_one_test(test_index, test): 237 """Run one test in a multiprocessing.Pool 238 239 Side effects in this function and functions it calls are not visible in the 240 main lit process. 241 242 Arguments and results of this function are pickled, so they should be cheap 243 to copy. For efficiency, we copy all data needed to execute all tests into 244 each worker and store it in the child_* global variables. This reduces the 245 cost of each task. 246 247 Returns an index and a Result, which the parent process uses to update 248 the display. 249 """ 250 try: 251 _execute_test_impl(test, child_lit_config, child_parallelism_semaphores) 252 return (test_index, test) 253 except KeyboardInterrupt as e: 254 # If a worker process gets an interrupt, abort it immediately. 255 abort_now() 256 except: 257 traceback.print_exc() 258