• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Thread and ThreadGroup that reraise exceptions on the main thread."""
6# pylint: disable=W0212
7
8import logging
9import sys
10import threading
11import time
12import traceback
13
14from devil import base_error
15from devil.utils import watchdog_timer
16
17
18class TimeoutError(base_error.BaseError):
19  """Module-specific timeout exception."""
20  def __init__(self, message):
21    super(TimeoutError, self).__init__(message)
22
23
24def LogThreadStack(thread, error_log_func=logging.critical):
25  """Log the stack for the given thread.
26
27  Args:
28    thread: a threading.Thread instance.
29    error_log_func: Logging function when logging errors.
30  """
31  stack = sys._current_frames()[thread.ident]
32  error_log_func('*' * 80)
33  error_log_func('Stack dump for thread %r', thread.name)
34  error_log_func('*' * 80)
35  for filename, lineno, name, line in traceback.extract_stack(stack):
36    error_log_func('File: "%s", line %d, in %s', filename, lineno, name)
37    if line:
38      error_log_func('  %s', line.strip())
39  error_log_func('*' * 80)
40
41
42class ReraiserThread(threading.Thread):
43  """Thread class that can reraise exceptions."""
44
45  def __init__(self, func, args=None, kwargs=None, name=None):
46    """Initialize thread.
47
48    Args:
49      func: callable to call on a new thread.
50      args: list of positional arguments for callable, defaults to empty.
51      kwargs: dictionary of keyword arguments for callable, defaults to empty.
52      name: thread name, defaults to the function name.
53    """
54    if not name:
55      if hasattr(func, '__name__') and func.__name__ != '<lambda>':
56        name = func.__name__
57      else:
58        name = 'anonymous'
59    super(ReraiserThread, self).__init__(name=name)
60    if not args:
61      args = []
62    if not kwargs:
63      kwargs = {}
64    self.daemon = True
65    self._func = func
66    self._args = args
67    self._kwargs = kwargs
68    self._ret = None
69    self._exc_info = None
70    self._thread_group = None
71
72  if sys.version_info < (3,):
73    # pylint: disable=exec-used
74    exec('''def ReraiseIfException(self):
75  """Reraise exception if an exception was raised in the thread."""
76  if self._exc_info:
77    raise self._exc_info[0], self._exc_info[1], self._exc_info[2]''')
78  else:
79    def ReraiseIfException(self):
80      """Reraise exception if an exception was raised in the thread."""
81      if self._exc_info:
82        raise self._exc_info[1]
83
84  def GetReturnValue(self):
85    """Reraise exception if present, otherwise get the return value."""
86    self.ReraiseIfException()
87    return self._ret
88
89  # override
90  def run(self):
91    """Overrides Thread.run() to add support for reraising exceptions."""
92    try:
93      self._ret = self._func(*self._args, **self._kwargs)
94    except:  # pylint: disable=W0702
95      self._exc_info = sys.exc_info()
96
97
98class ReraiserThreadGroup(object):
99  """A group of ReraiserThread objects."""
100
101  def __init__(self, threads=None):
102    """Initialize thread group.
103
104    Args:
105      threads: a list of ReraiserThread objects; defaults to empty.
106    """
107    self._threads = []
108    # Set when a thread from one group has called JoinAll on another. It is used
109    # to detect when a there is a TimeoutRetryThread active that links to the
110    # current thread.
111    self.blocked_parent_thread_group = None
112    if threads:
113      for thread in threads:
114        self.Add(thread)
115
116  def Add(self, thread):
117    """Add a thread to the group.
118
119    Args:
120      thread: a ReraiserThread object.
121    """
122    assert thread._thread_group is None
123    thread._thread_group = self
124    self._threads.append(thread)
125
126  def StartAll(self, will_block=False):
127    """Start all threads.
128
129    Args:
130      will_block: Whether the calling thread will subsequently block on this
131        thread group. Causes the active ReraiserThreadGroup (if there is one)
132        to be marked as blocking on this thread group.
133    """
134    if will_block:
135      # Multiple threads blocking on the same outer thread should not happen in
136      # practice.
137      assert not self.blocked_parent_thread_group
138      self.blocked_parent_thread_group = CurrentThreadGroup()
139    for thread in self._threads:
140      thread.start()
141
142  def _JoinAll(self, watcher=None, timeout=None):
143    """Join all threads without stack dumps.
144
145    Reraises exceptions raised by the child threads and supports breaking
146    immediately on exceptions raised on the main thread.
147
148    Args:
149      watcher: Watchdog object providing the thread timeout. If none is
150          provided, the thread will never be timed out.
151      timeout: An optional number of seconds to wait before timing out the join
152          operation. This will not time out the threads.
153    """
154    if watcher is None:
155      watcher = watchdog_timer.WatchdogTimer(None)
156    alive_threads = self._threads[:]
157    end_time = (time.time() + timeout) if timeout else None
158    try:
159      while alive_threads and (end_time is None or end_time > time.time()):
160        for thread in alive_threads[:]:
161          if watcher.IsTimedOut():
162            raise TimeoutError('Timed out waiting for %d of %d threads.' %
163                               (len(alive_threads), len(self._threads)))
164          # Allow the main thread to periodically check for interrupts.
165          thread.join(0.1)
166          if not thread.isAlive():
167            alive_threads.remove(thread)
168      # All threads are allowed to complete before reraising exceptions.
169      for thread in self._threads:
170        thread.ReraiseIfException()
171    finally:
172      self.blocked_parent_thread_group = None
173
174  def IsAlive(self):
175    """Check whether any of the threads are still alive.
176
177    Returns:
178      Whether any of the threads are still alive.
179    """
180    return any(t.isAlive() for t in self._threads)
181
182  def JoinAll(self, watcher=None, timeout=None,
183              error_log_func=logging.critical):
184    """Join all threads.
185
186    Reraises exceptions raised by the child threads and supports breaking
187    immediately on exceptions raised on the main thread. Unfinished threads'
188    stacks will be logged on watchdog timeout.
189
190    Args:
191      watcher: Watchdog object providing the thread timeout. If none is
192          provided, the thread will never be timed out.
193      timeout: An optional number of seconds to wait before timing out the join
194          operation. This will not time out the threads.
195      error_log_func: Logging function when logging errors.
196    """
197    try:
198      self._JoinAll(watcher, timeout)
199    except TimeoutError:
200      error_log_func('Timed out. Dumping threads.')
201      for thread in (t for t in self._threads if t.isAlive()):
202        LogThreadStack(thread, error_log_func=error_log_func)
203      raise
204
205  def GetAllReturnValues(self, watcher=None):
206    """Get all return values, joining all threads if necessary.
207
208    Args:
209      watcher: same as in |JoinAll|. Only used if threads are alive.
210    """
211    if any([t.isAlive() for t in self._threads]):
212      self.JoinAll(watcher)
213    return [t.GetReturnValue() for t in self._threads]
214
215
216def CurrentThreadGroup():
217  """Returns the ReraiserThreadGroup that owns the running thread.
218
219  Returns:
220    The current thread group, otherwise None.
221  """
222  current_thread = threading.current_thread()
223  if isinstance(current_thread, ReraiserThread):
224    return current_thread._thread_group  # pylint: disable=no-member
225  return None
226
227
228def RunAsync(funcs, watcher=None):
229  """Executes the given functions in parallel and returns their results.
230
231  Args:
232    funcs: List of functions to perform on their own threads.
233    watcher: Watchdog object providing timeout, by default waits forever.
234
235  Returns:
236    A list of return values in the order of the given functions.
237  """
238  thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs)
239  thread_group.StartAll(will_block=True)
240  return thread_group.GetAllReturnValues(watcher=watcher)
241