#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from multiprocessing import Event, Process, Queue

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2


class NormalResult(object):
    """Result wrapper for a task whose function returned normally."""

    def __init__(self, result):
        self.result = result
        self.exception = False
        self.break_now = False


class ExceptionResult(object):
    """Result wrapper for a task whose function raised an exception."""

    def __init__(self):
        self.exception = True
        self.break_now = False


class BreakResult(object):
    """Result wrapper signalling a keyboard interrupt in a worker."""

    def __init__(self):
        self.exception = False
        self.break_now = True


def Worker(fn, work_queue, done_queue, done):
    """Worker to be run in a child process.

    The worker stops on two conditions: 1. when the poison pill "STOP" is
    reached or 2. when the event "done" is set.

    Args:
      fn: Callable applied to each work item as fn(*args).
      work_queue: Queue of argument lists, terminated by "STOP".
      done_queue: Queue receiving one result object per processed item.
      done: Event that, once set, makes the worker stop at its next item.
    """
    try:
        for args in iter(work_queue.get, "STOP"):
            if done.is_set():
                break
            try:
                done_queue.put(NormalResult(fn(*args)))
            except Exception as e:
                # Report the failure but keep the worker alive; the parent
                # still gets exactly one result per item it handed out.
                print(">>> EXCEPTION: %s" % e)
                done_queue.put(ExceptionResult())
    except KeyboardInterrupt:
        done_queue.put(BreakResult())


def _drain_queue(queue):
    """Removes items from "queue" until a non-blocking get reports Empty."""
    try:
        while True:
            queue.get(False)
    except Empty:
        pass


class Pool(object):
    """Distributes tasks to a number of worker processes.

    New tasks can be added dynamically even after the workers have been
    started.
    Requirement: Tasks can only be added from the parent process, e.g. while
    consuming the results generator.
    """

    # Factor to calculate the maximum number of items in the work/done queue.
    # Necessary to not overflow the queue's pipe if a keyboard interrupt
    # happens.
    BUFFER_FACTOR = 4

    def __init__(self, num_workers):
        """Creates the queues and the stop event; starts no processes yet.

        Args:
          num_workers: Number of worker processes imap_unordered will start.
        """
        self.num_workers = num_workers
        self.processes = []
        self.terminated = False

        # Invariant: count >= #work_queue + #done_queue. It is greater when a
        # worker takes an item from the work_queue and before the result is
        # submitted to the done_queue. It is equal when no worker is working,
        # e.g. when all workers have finished, and when no results are
        # processed.
        # Count is only accessed by the parent process. Only the parent
        # process is allowed to remove items from the done_queue and to add
        # items to the work_queue.
        self.count = 0
        self.work_queue = Queue()
        self.done_queue = Queue()
        self.done = Event()

    def imap_unordered(self, fn, gen):
        """Maps function "fn" to items in generator "gen" on the worker
        processes in an arbitrary order.

        The items are expected to be lists of arguments to the function.
        Items whose call raised an exception are skipped. The pool is
        terminated when the iteration ends, normally or otherwise.

        Args:
          fn: Callable invoked in the workers as fn(*item).
          gen: Iterable of argument lists.

        Returns:
          A results iterator.

        Raises:
          KeyboardInterrupt: If a worker reported a keyboard interrupt.
        """
        try:
            gen = iter(gen)
            self.advance = self._advance_more

            for _ in range(self.num_workers):
                p = Process(target=Worker, args=(fn,
                                                 self.work_queue,
                                                 self.done_queue,
                                                 self.done))
                self.processes.append(p)
                p.start()

            self.advance(gen)
            while self.count > 0:
                result = self.done_queue.get()
                self.count -= 1
                if result.exception:
                    # Ignore items with unexpected exceptions.
                    continue
                elif result.break_now:
                    # A keyboard interrupt happened in one of the worker
                    # processes.
                    raise KeyboardInterrupt
                else:
                    yield result.result
                self.advance(gen)
        finally:
            self.terminate()

    def _advance_more(self, gen):
        """Refills the work queue from "gen" up to the buffer limit.

        Switches self.advance to the no-op _advance_empty once "gen" is
        exhausted.
        """
        while self.count < self.num_workers * self.BUFFER_FACTOR:
            try:
                self.work_queue.put(next(gen))
                self.count += 1
            except StopIteration:
                self.advance = self._advance_empty
                break

    def _advance_empty(self, gen):
        """No-op used as self.advance after the generator is exhausted."""
        pass

    def add(self, args):
        """Adds an item to the work queue. Can be called dynamically while
        processing the results from imap_unordered."""
        self.work_queue.put(args)
        self.count += 1

    def terminate(self):
        """Stops and joins all workers, then drains both queues.

        Calls after the first one return immediately.
        """
        if self.terminated:
            return
        self.terminated = True

        # For exceptional tear down set the "done" event to stop the workers
        # before they empty the queue buffer.
        self.done.set()

        for _ in self.processes:
            # During normal tear down the workers block on get(). Feed a
            # poison pill per worker to make them stop.
            self.work_queue.put("STOP")

        for p in self.processes:
            p.join()

        # Drain the queues to prevent failures when queues are garbage
        # collected.
        _drain_queue(self.work_queue)
        _drain_queue(self.done_queue)