• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2014 the V8 project authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from Queue import Empty
7from multiprocessing import Event, Process, Queue
8import traceback
9
10
class NormalResult():
  """Outcome of a successful worker invocation of fn."""

  def __init__(self, result):
    # Neither an exception nor a keyboard interrupt occurred.
    self.exception = False
    self.break_now = False
    # The value computed by fn.
    self.result = result
16
17
class ExceptionResult():
  """Marker result: the worker's call to fn raised an exception."""

  def __init__(self):
    self.break_now = False
    self.exception = True
22
23
class BreakResult():
  """Marker result: a KeyboardInterrupt occurred in a worker process."""

  def __init__(self):
    self.break_now = True
    self.exception = False
28
29
class MaybeResult():
  """Item yielded to the results consumer: a heartbeat or a real value."""

  def __init__(self, heartbeat, value):
    self.value = value
    self.heartbeat = heartbeat

  @staticmethod
  def create_heartbeat():
    """Builds a heartbeat marker carrying no value."""
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    """Wraps a real computed value."""
    return MaybeResult(False, value)
42
43
def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.

  Pulls argument lists from work_queue, applies fn and puts a wrapped result
  on done_queue. The worker stops on two conditions: 1. when the poison pill
  "STOP" is reached or 2. when the event "done" is set.

  Args:
    fn: Function applied to each argument list pulled from work_queue.
    work_queue: Queue of argument lists; "STOP" terminates the worker.
    done_queue: Queue receiving NormalResult/ExceptionResult/BreakResult.
    done: Event checked before each task for early termination.
    process_context_fn: Optional function building a per-process context
        object passed to fn as keyword argument "process_context".
    process_context_args: Arguments for process_context_fn; the context is
        only built when this is not None.
  """
  try:
    kwargs = {}
    # Build the optional per-process context exactly once, up front.
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except Exception as e:
        # NOTE: "except E as e" (not the Python-2-only "except E, e") works
        # on Python 2.6+ and 3. Report the failure but keep serving tasks:
        # one failing item must not kill the worker.
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())
64
65
class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=30):
    """Args:
      num_workers: Number of child worker processes to spawn.
      heartbeat_timeout: Seconds to wait on the done queue before yielding a
          heartbeat from imap_unordered.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
    self.heartbeat_timeout = heartbeat_timeout

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
    the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.
    """
    try:
      gen = iter(gen)
      self.advance = self._advance_more

      # Spawn the workers; they block on work_queue.get() until fed below.
      # range (not the Python-2-only xrange) works on Python 2 and 3.
      for w in range(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done,
                                         process_context_fn,
                                         process_context_args))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
        self.count -= 1
        if result.exception:
          # Ignore items with unexpected exceptions.
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield MaybeResult.create_result(result.result)
        self.advance(gen)
    finally:
      # Tear down workers on normal exhaustion as well as on error/interrupt.
      self.terminate()

  def _advance_more(self, gen):
    # Keep the work queue filled up to the buffer limit; switch to the no-op
    # advancer once the generator is exhausted.
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        # next(gen) (not the Python-2-only gen.next()) works on 2.6+ and 3.
        self.work_queue.put(next(gen))
        self.count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    # No-op advancer installed after "gen" has been exhausted.
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    """Stops all workers and drains both queues. Idempotent."""
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    for p in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when queues are garbage collected.
    # Catch only the expected Empty (was a bare "except:" that would also
    # have hidden real errors such as KeyboardInterrupt).
    try:
      while True:
        self.work_queue.get(False)
    except Empty:
      pass
    try:
      while True:
        self.done_queue.get(False)
    except Empty:
      pass
192