#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Process pool that maps a function over a dynamically growing work list."""

try:
  from Queue import Empty  # Python 2
except ImportError:
  from queue import Empty  # Python 3
from multiprocessing import Event, Process, Queue
import traceback


class NormalResult():
  """Message sent by a worker when the mapped function returned a value."""

  def __init__(self, result):
    self.result = result
    self.exception = False
    self.break_now = False


class ExceptionResult():
  """Message sent by a worker when the mapped function raised an exception."""

  def __init__(self):
    self.exception = True
    self.break_now = False


class BreakResult():
  """Message sent by a worker that received a keyboard interrupt."""

  def __init__(self):
    self.exception = False
    self.break_now = True


class MaybeResult():
  """Item yielded by Pool.imap_unordered: a heartbeat or a wrapped result."""

  def __init__(self, heartbeat, value):
    # True if this item only signals that the pool is still waiting.
    self.heartbeat = heartbeat
    # The wrapped result; None for heartbeats.
    self.value = value

  @staticmethod
  def create_heartbeat():
    """Returns a MaybeResult that marks a heartbeat and carries no value."""
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    """Returns a MaybeResult that wraps the real result "value"."""
    return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.

  The worker stops on two conditions. 1. When the poison pill "STOP" is
  reached or 2. when the event "done" is set.

  Args:
    fn: Function called as fn(*args, **kwargs) for every item "args" taken
        from work_queue.
    work_queue: Queue the argument lists (and the poison pill) are read from.
    done_queue: Queue that receives one NormalResult or ExceptionResult per
        processed item, or a BreakResult after a keyboard interrupt.
    done: Event checked after each item is fetched; when set, the worker
        stops without processing the fetched item.
    process_context_fn: Optional function called once; its return value is
        passed to every fn call as keyword argument "process_context".
    process_context_args: Argument list for process_context_fn. The context
        is only created if this is not None.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except Exception as e:
        # Report the failure to the parent instead of killing the worker, so
        # the remaining items are still processed.
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=30):
    """Creates the queues and the "done" event; no process is started yet.

    Args:
      num_workers: Number of worker processes started by imap_unordered.
      heartbeat_timeout: Timeout passed to each blocking read of the done
          queue; when it expires, imap_unordered yields a heartbeat.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
    self.heartbeat_timeout = heartbeat_timeout

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
    the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.

    Raises:
      KeyboardInterrupt: If a worker reported a keyboard interrupt.
      Exception: After tear down, if any worker reported an exception.
    """
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      for _ in range(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done,
                                         process_context_fn,
                                         process_context_args))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
        self.count -= 1
        if result.exception:
          # TODO(machenbach): Handle a few known types of internal errors
          # gracefully, e.g. missing test files.
          internal_error = True
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield MaybeResult.create_result(result.result)
        self.advance(gen)
    finally:
      self.terminate()
      if internal_error:
        raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    """Refills the work queue from "gen" up to the buffer limit.

    Once "gen" is exhausted, self.advance is switched to _advance_empty so
    later calls do not touch the generator again.
    """
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """No-op replacement for _advance_more after "gen" is exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    """Stops and joins all workers, then drains both queues.

    Only the first call has an effect; later calls return immediately.
    """
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    # NOTE(review): the multiprocessing programming guidelines warn that
    # joining a process which still has items buffered on a queue can block.
    # BUFFER_FACTOR is meant to keep the number of pending items small enough
    # for this to be safe - confirm that callers of add() respect that bound.
    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when queues are garbage collected.
    self._drain_queue(self.work_queue)
    self._drain_queue(self.done_queue)

  @staticmethod
  def _drain_queue(queue_to_drain):
    """Removes all items that are immediately available from the queue."""
    try:
      while True:
        queue_to_drain.get(False)
    except Empty:
      # Nothing left to fetch: the queue is drained.
      pass
    except Exception:
      # Best effort only: tear down must not fail because a queue is already
      # broken. KeyboardInterrupt and SystemExit are deliberately not caught.
      pass