• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import sys
3import threading
4import time
5import traceback
6try:
7    import Queue as queue
8except ImportError:
9    import queue
10
11try:
12    import win32api
13except ImportError:
14    win32api = None
15
16import multiprocessing
17import lit.Test
18
19def abort_now():
20    """Abort the current process without doing any exception teardown"""
21    sys.stdout.flush()
22    if win32api:
23        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
24    else:
25        os.kill(0, 9)
26
27class _Display(object):
28    def __init__(self, display, provider, maxFailures):
29        self.display = display
30        self.provider = provider
31        self.maxFailures = maxFailures or object()
32        self.failedCount = 0
33    def update(self, test):
34        self.display.update(test)
35        self.failedCount += (test.result.code == lit.Test.FAIL)
36        if self.failedCount == self.maxFailures:
37            self.provider.cancel()
38
39class Run(object):
40    """
41    This class represents a concrete, configured testing run.
42    """
43
44    def __init__(self, lit_config, tests):
45        self.lit_config = lit_config
46        self.tests = tests
47        # Set up semaphores to limit parallelism of certain classes of tests.
48        # For example, some ASan tests require lots of virtual memory and run
49        # faster with less parallelism on OS X.
50        self.parallelism_semaphores = \
51                {k: multiprocessing.Semaphore(v) for k, v in
52                 self.lit_config.parallelism_groups.items()}
53
54    def execute_test(self, test):
55        return _execute_test_impl(test, self.lit_config,
56                                  self.parallelism_semaphores)
57
58    def execute_tests_in_pool(self, jobs, max_time):
59        # We need to issue many wait calls, so compute the final deadline and
60        # subtract time.time() from that as we go along.
61        deadline = None
62        if max_time:
63            deadline = time.time() + max_time
64
65        # Start a process pool. Copy over the data shared between all test runs.
66        # FIXME: Find a way to capture the worker process stderr. If the user
67        # interrupts the workers before we make it into our task callback, they
68        # will each raise a KeyboardInterrupt exception and print to stderr at
69        # the same time.
70        pool = multiprocessing.Pool(jobs, worker_initializer,
71                                    (self.lit_config,
72                                     self.parallelism_semaphores))
73
74        # Install a console-control signal handler on Windows.
75        if win32api is not None:
76            def console_ctrl_handler(type):
77                print('\nCtrl-C detected, terminating.')
78                pool.terminate()
79                pool.join()
80                abort_now()
81                return True
82            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
83
84        try:
85            async_results = [pool.apply_async(worker_run_one_test,
86                                              args=(test_index, test),
87                                              callback=self.consume_test_result)
88                             for test_index, test in enumerate(self.tests)]
89            pool.close()
90
91            # Wait for all results to come in. The callback that runs in the
92            # parent process will update the display.
93            for a in async_results:
94                if deadline:
95                    a.wait(deadline - time.time())
96                else:
97                    # Python condition variables cannot be interrupted unless
98                    # they have a timeout. This can make lit unresponsive to
99                    # KeyboardInterrupt, so do a busy wait with a timeout.
100                    while not a.ready():
101                        a.wait(1)
102                if not a.successful():
103                    a.get() # Exceptions raised here come from the worker.
104                if self.hit_max_failures:
105                    break
106        except:
107            # Stop the workers and wait for any straggling results to come in
108            # if we exited without waiting on every async result.
109            pool.terminate()
110            raise
111        finally:
112            pool.join()
113
114    def execute_tests(self, display, jobs, max_time=None):
115        """
116        execute_tests(display, jobs, [max_time])
117
118        Execute each of the tests in the run, using up to jobs number of
119        parallel tasks, and inform the display of each individual result. The
120        provided tests should be a subset of the tests available in this run
121        object.
122
123        If max_time is non-None, it should be a time in seconds after which to
124        stop executing tests.
125
126        The display object will have its update method called with each test as
127        it is completed. The calls are guaranteed to be locked with respect to
128        one another, but are *not* guaranteed to be called on the same thread as
129        this method was invoked on.
130
131        Upon completion, each test in the run will have its result
132        computed. Tests which were not actually executed (for any reason) will
133        be given an UNRESOLVED result.
134        """
135        # Don't do anything if we aren't going to run any tests.
136        if not self.tests or jobs == 0:
137            return
138
139        # Save the display object on the runner so that we can update it from
140        # our task completion callback.
141        self.display = display
142
143        self.failure_count = 0
144        self.hit_max_failures = False
145        if self.lit_config.singleProcess:
146            global child_lit_config
147            child_lit_config = self.lit_config
148            for test_index, test in enumerate(self.tests):
149                result = worker_run_one_test(test_index, test)
150                self.consume_test_result(result)
151        else:
152            self.execute_tests_in_pool(jobs, max_time)
153
154        # Mark any tests that weren't run as UNRESOLVED.
155        for test in self.tests:
156            if test.result is None:
157                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
158
159    def consume_test_result(self, pool_result):
160        """Test completion callback for worker_run_one_test
161
162        Updates the test result status in the parent process. Each task in the
163        pool returns the test index and the result, and we use the index to look
164        up the original test object. Also updates the progress bar as tasks
165        complete.
166        """
167        # Don't add any more test results after we've hit the maximum failure
168        # count.  Otherwise we're racing with the main thread, which is going
169        # to terminate the process pool soon.
170        if self.hit_max_failures:
171            return
172
173        (test_index, test_with_result) = pool_result
174        # Update the parent process copy of the test. This includes the result,
175        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
176        assert self.tests[test_index].file_path == test_with_result.file_path, \
177                "parent and child disagree on test path"
178        self.tests[test_index] = test_with_result
179        self.display.update(test_with_result)
180
181        # If we've finished all the tests or too many tests have failed, notify
182        # the main thread that we've stopped testing.
183        self.failure_count += (test_with_result.result.code == lit.Test.FAIL)
184        if self.lit_config.maxFailures and \
185                self.failure_count == self.lit_config.maxFailures:
186            self.hit_max_failures = True
187
188def _execute_test_impl(test, lit_config, parallelism_semaphores):
189    """Execute one test"""
190    pg = test.config.parallelism_group
191    if callable(pg):
192        pg = pg(test)
193
194    result = None
195    semaphore = None
196    try:
197        if pg:
198            semaphore = parallelism_semaphores[pg]
199        if semaphore:
200            semaphore.acquire()
201        start_time = time.time()
202        result = test.config.test_format.execute(test, lit_config)
203        # Support deprecated result from execute() which returned the result
204        # code and additional output as a tuple.
205        if isinstance(result, tuple):
206            code, output = result
207            result = lit.Test.Result(code, output)
208        elif not isinstance(result, lit.Test.Result):
209            raise ValueError("unexpected result from test execution")
210        result.elapsed = time.time() - start_time
211    except KeyboardInterrupt:
212        raise
213    except:
214        if lit_config.debug:
215            raise
216        output = 'Exception during script execution:\n'
217        output += traceback.format_exc()
218        output += '\n'
219        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
220    finally:
221        if semaphore:
222            semaphore.release()
223
224    test.setResult(result)
225
226child_lit_config = None
227child_parallelism_semaphores = None
228
229def worker_initializer(lit_config, parallelism_semaphores):
230    """Copy expensive repeated data into worker processes"""
231    global child_lit_config
232    child_lit_config = lit_config
233    global child_parallelism_semaphores
234    child_parallelism_semaphores = parallelism_semaphores
235
236def worker_run_one_test(test_index, test):
237    """Run one test in a multiprocessing.Pool
238
239    Side effects in this function and functions it calls are not visible in the
240    main lit process.
241
242    Arguments and results of this function are pickled, so they should be cheap
243    to copy. For efficiency, we copy all data needed to execute all tests into
244    each worker and store it in the child_* global variables. This reduces the
245    cost of each task.
246
247    Returns an index and a Result, which the parent process uses to update
248    the display.
249    """
250    try:
251        _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
252        return (test_index, test)
253    except KeyboardInterrupt as e:
254        # If a worker process gets an interrupt, abort it immediately.
255        abort_now()
256    except:
257        traceback.print_exc()
258