#!/usr/bin/env python3
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from contextlib import contextmanager
from multiprocessing import Process, Queue
import os
import signal
import time
import traceback

try:
  from queue import Empty  # Python 3
except ImportError:
  from Queue import Empty  # Python 2

from . import command
from . import utils


def setup_testing():
  """For testing only: Use threading under the hood instead of multiprocessing
  to make coverage work.
  """
  global Queue
  global Process
  del Queue
  del Process
  try:
    from queue import Queue  # Python 3
  except ImportError:
    from Queue import Queue  # Python 2

  from threading import Thread as Process
  # Monkeypatch threading Queue to look like multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None
  # Monkeypatch os.kill and add fake pid property on Thread.
  os.kill = lambda *args: None
  Process.pid = property(lambda self: None)


class NormalResult():
  def __init__(self, result):
    self.result = result
    self.exception = None

class ExceptionResult():
  def __init__(self, exception):
    self.exception = exception


class MaybeResult():
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops when the poison pill "STOP" is reached.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except command.AbortException:
        # SIGINT, SIGTERM or internal hard timeout.
        break
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult(e))
    # When we reach here on normal tear down, all items have been pulled from
    # the done_queue before and this should have no effect. On fast abort, it's
    # possible that a fast worker left items on the done_queue in memory, which
    # will never be pulled. This call purges those to avoid a deadlock.
    done_queue.cancel_join_thread()
  except KeyboardInterrupt:
    assert False, 'Unreachable'


@contextmanager
def without_sig():
  int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
  try:
    yield
  finally:
    signal.signal(signal.SIGINT, int_handler)
    signal.signal(signal.SIGTERM, term_handler)


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
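  # E.g. with num_workers=8 this bounds the number of outstanding items
  # (tracked by processing_count) at 8 * 4 = 32; see _advance_more below.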
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=1, notify_fun=None):
    """
    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
        the timeout is reached, a heartbeat is signalled and the timeout is
        reset.
      notify_fun: Callable called to signal some events like termination. The
        event name is passed as a string.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False
    self.abort_now = False

    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # when a worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.processing_count = 0
    self.heartbeat_timeout = heartbeat_timeout
    self.notify = notify_fun or (lambda x: x)

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat, i.e. that the runner is still
    waiting for a result to be computed, or it wraps the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
        return a process-context object. If present, this object is passed
        as an additional keyword argument to each call to fn.
      process_context_args: List of arguments for the invocation of
        process_context_fn. All arguments will be pickled and sent across the
        process boundary.
    """
    if self.terminated:
      return
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for w in range(self.num_workers):
          p = Process(target=Worker, args=(fn,
                                           self.work_queue,
                                           self.done_queue,
                                           process_context_fn,
                                           process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from result queue in a responsive fashion. If available,
            # this will return a normal result immediately or a heartbeat on
            # heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            internal_error = True
            continue
          finally:
            if self.abort_now:
              # SIGINT, SIGTERM or internal hard timeout.
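              # Returning here ends the results generator early; the outer
              # finally clause below still runs _terminate(), which feeds the
              # poison pills and, since abort_now is set, kills the workers.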
              return

          yield result
          break

        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception as e:
      traceback.print_exc()
      print(">>> EXCEPTION: %s" % e)
    finally:
      self._terminate()

    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True

  def _terminate_processes(self):
    for p in self.processes:
      if utils.IsWindows():
        command.taskkill_windows(p, verbose=True, force=False)
      else:
        os.kill(p.pid, signal.SIGTERM)

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain remaining tests from the work queue.
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop.
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    if self.abort_now:
      self._terminate_processes()

    self.notify("Joining workers")
    for p in self.processes:
      p.join()

    # Drain the queues to prevent stderr chatter when queues are garbage
    # collected.
    self.notify("Draining queues")
    try:
      while True:
        self.work_queue.get(False)
    except:
      pass
    try:
      while True:
        self.done_queue.get(False)
    except:
      pass

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within heartbeat timeout,
      a heartbeat result otherwise.
    Raises:
      Exception: If an exception occurred when processing the task on the
        worker side, it is re-raised here.
    """
    while True:
      try:
        result = self.done_queue.get(timeout=self.heartbeat_timeout)
        self.processing_count -= 1
        if result.exception:
          raise result.exception
        return MaybeResult.create_result(result.result)
      except Empty:
        return MaybeResult.create_heartbeat()
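

# Illustrative usage sketch, not part of the upstream module: roughly how a
# caller on the parent process drives the pool. The function name 'square' and
# the argument tuples are hypothetical; the real call sites live in the test
# runner. Tasks and results cross the process boundary, so both must be
# picklable, and fn must be a module-level function.
#
#   def square(n):
#     return n * n
#
#   pool = Pool(num_workers=4, heartbeat_timeout=1)
#   for maybe_result in pool.imap_unordered(square, [(n,) for n in range(20)]):
#     if maybe_result.heartbeat:
#       continue  # No result arrived within heartbeat_timeout; keep waiting.
#     print(maybe_result.value)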