#!/usr/bin/env python3
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from contextlib import contextmanager
from multiprocessing import Process, Queue
import os
import signal
import time
import traceback

try:
  from queue import Empty  # Python 3
except ImportError:
  from Queue import Empty  # Python 2

from . import command
from . import utils


def setup_testing():
  """For testing only: Use threading under the hood instead of multiprocessing
  to make coverage work.
  """
  global Queue
  global Process
  del Queue
  del Process
  try:
    from queue import Queue  # Python 3
  except ImportError:
    from Queue import Queue  # Python 2

  from threading import Thread as Process
  # Monkeypatch threading Queue to look like multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None
  # Monkeypatch os.kill and add fake pid property on Thread.
  os.kill = lambda *args: None
  Process.pid = property(lambda self: None)

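# A minimal sketch of how test code might use setup_testing(): it must run
# before a Pool is constructed, so that the rebound Queue and Process globals
# are picked up:
#
#   setup_testing()
#   pool = Pool(num_workers=2)  # Now backed by threads instead of processes.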

class NormalResult():
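  """Wraps the return value of a successfully executed task."""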
  def __init__(self, result):
    self.result = result
    self.exception = None

class ExceptionResult():
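  """Wraps an exception raised while executing a task on a worker."""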
  def __init__(self, exception):
    self.exception = exception


class MaybeResult():
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)

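# Callers of Pool.imap_unordered() (defined below) tell the two MaybeResult
# flavors apart via the heartbeat flag. A minimal sketch, where handle_result
# is a hypothetical callback:
#
#   for maybe in pool.imap_unordered(fn, gen):
#     if maybe.heartbeat:
#       continue  # Runner is alive but still waiting for a result.
#     handle_result(maybe.value)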

def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops when the poison pill "STOP" is reached.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except command.AbortException:
        # SIGINT, SIGTERM or internal hard timeout.
        break
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult(e))
    # When we reach here on normal tear down, all items have already been
    # pulled from the done_queue, so this call has no effect. On fast abort,
    # it's possible that a fast worker left items on the done_queue in memory
    # that will never be pulled. This call purges those to avoid a deadlock.
    done_queue.cancel_join_thread()
  except KeyboardInterrupt:
    assert False, 'Unreachable'

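# The queue protocol between Pool and Worker, summarized: the parent puts
# argument lists on work_queue, one "STOP" sentinel per worker ends the loop
# above, and every processed item yields exactly one NormalResult or
# ExceptionResult on done_queue.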

@contextmanager
def without_sig():
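  """Context manager that ignores SIGINT and SIGTERM for its duration and
  restores the previous handlers on exit. Used below so that spawned child
  processes don't inherit handlers that capture these signals.
  """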
  int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
  try:
    yield
  finally:
    signal.signal(signal.SIGINT, int_handler)
    signal.signal(signal.SIGTERM, term_handler)


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

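  # A minimal usage sketch; run_test and on_result are hypothetical helpers:
  #
  #   pool = Pool(num_workers=8)
  #   for maybe in pool.imap_unordered(run_test, ([t] for t in tests)):
  #     if not maybe.heartbeat:
  #       on_result(maybe.value)
  #       pool.add([another_test])  # Enqueue follow-up work dynamically.
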
  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4
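  # For example, with num_workers=8 the cap allows at most
  # 8 * BUFFER_FACTOR = 32 items in flight at once (see _advance_more).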

  def __init__(self, num_workers, heartbeat_timeout=1, notify_fun=None):
    """
    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
          the timeout is reached, a heartbeat is signalled and the timeout is
          reset.
      notify_fun: Callable invoked to signal events such as termination. The
          event name is passed as a string.
124    """
125    self.num_workers = num_workers
126    self.processes = []
127    self.terminated = False
128    self.abort_now = False
129
130    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
131    # when a worker takes an item from the work_queue and before the result is
132    # submitted to the done_queue. It is equal when no worker is working,
133    # e.g. when all workers have finished, and when no results are processed.
134    # Count is only accessed by the parent process. Only the parent process is
135    # allowed to remove items from the done_queue and to add items to the
136    # work_queue.
137    self.processing_count = 0
138    self.heartbeat_timeout = heartbeat_timeout
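    # If no notify_fun is given, notifications default to a no-op.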
    self.notify = notify_fun or (lambda x: x)

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
149    """Maps function "fn" to items in generator "gen" on the worker processes
150    in an arbitrary order. The items are expected to be lists of arguments to
151    the function. Returns a results iterator. A result value of type
152    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
153    that the runner is still waiting for the result to be computed, or it wraps
154    the real result.
155
156    Args:
157      process_context_fn: Function executed once by each worker. Expected to
158          return a process-context object. If present, this object is passed
159          as additional argument to each call to fn.
160      process_context_args: List of arguments for the invocation of
161          process_context_fn. All arguments will be pickled and sent beyond the
162          process boundary.
163    """
164    if self.terminated:
165      return
166    try:
167      internal_error = False
168      gen = iter(gen)
169      self.advance = self._advance_more
170
171      # Disable sigint and sigterm to prevent subprocesses from capturing the
172      # signals.
173      with without_sig():
174        for w in range(self.num_workers):
175          p = Process(target=Worker, args=(fn,
176                                          self.work_queue,
177                                          self.done_queue,
178                                          process_context_fn,
179                                          process_context_args))
180          p.start()
181          self.processes.append(p)
182
183      self.advance(gen)
184      while self.processing_count > 0:
185        while True:
186          try:
187            # Read from result queue in a responsive fashion. If available,
188            # this will return a normal result immediately or a heartbeat on
189            # heartbeat timeout (default 1 second).
190            result = self._get_result_from_queue()
191          except:
192            # TODO(machenbach): Handle a few known types of internal errors
193            # gracefully, e.g. missing test files.
194            internal_error = True
195            continue
196          finally:
197            if self.abort_now:
198              # SIGINT, SIGTERM or internal hard timeout.
199              return
200
201          yield result
202          break
203
204        self.advance(gen)
205    except KeyboardInterrupt:
206      assert False, 'Unreachable'
207    except Exception as e:
208      traceback.print_exc()
209      print(">>> EXCEPTION: %s" % e)
210    finally:
211      self._terminate()
212
213    if internal_error:
214      raise Exception("Internal error in a worker process.")
215
216  def _advance_more(self, gen):
217    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
218      try:
219        self.work_queue.put(next(gen))
220        self.processing_count += 1
221      except StopIteration:
222        self.advance = self._advance_empty
223        break
224
225  def _advance_empty(self, gen):
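    """No-op stand-in for _advance_more once the generator is exhausted."""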
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True

  def _terminate_processes(self):
    for p in self.processes:
      if utils.IsWindows():
        command.taskkill_windows(p, verbose=True, force=False)
      else:
        os.kill(p.pid, signal.SIGTERM)

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain any remaining test items from the work queue.
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    if self.abort_now:
      self._terminate_processes()

    self.notify("Joining workers")
    for p in self.processes:
      p.join()

    # Drain the queues to prevent stderr chatter when queues are garbage
    # collected.
    self.notify("Draining queues")
    try:
      while True: self.work_queue.get(False)
    except:
      pass
    try:
      while True: self.done_queue.get(False)
    except:
      pass

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within heartbeat timeout,
        a heartbeat result otherwise.
    Raises:
        Exception: If an exception occurred when processing the task on the
            worker side, it is re-raised here.
301    """
302    while True:
303      try:
304        result = self.done_queue.get(timeout=self.heartbeat_timeout)
305        self.processing_count -= 1
306        if result.exception:
307          raise result.exception
308        return MaybeResult.create_result(result.result)
309      except Empty:
310        return MaybeResult.create_heartbeat()
311