• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Thread and ThreadGroup that reraise exceptions on the main thread."""
6# pylint: disable=W0212
7
8import logging
9import sys
10import threading
11import traceback
12
13from pylib.utils import watchdog_timer
14
15
class TimeoutError(Exception):
  """Raised by this module when waiting on threads exceeds the time limit."""
19
20
def LogThreadStack(thread):
  """Log the current stack of the given thread at CRITICAL level.

  The dump is framed by rows of asterisks and contains one 'File: ...' entry
  per stack frame, followed by the source line when it is available.

  If the thread has no live frame (it was never started, or it has already
  finished), a single explanatory line is logged instead of raising KeyError.

  Args:
    thread: a threading.Thread instance.
  """
  # sys._current_frames() maps thread idents to their topmost frame. A thread
  # that has not started has ident None, and a thread that already exited has
  # no entry, so look the frame up without assuming the key exists.
  stack = sys._current_frames().get(thread.ident)
  logging.critical('*' * 80)
  logging.critical('Stack dump for thread \'%s\'', thread.name)
  logging.critical('*' * 80)
  if stack is None:
    logging.critical('No stack available: thread is not running.')
  else:
    for filename, lineno, name, line in traceback.extract_stack(stack):
      logging.critical('File: "%s", line %d, in %s', filename, lineno, name)
      if line:
        logging.critical('  %s', line.strip())
  logging.critical('*' * 80)
36
37
class ReraiserThread(threading.Thread):
  """Thread class that can reraise exceptions.

  The wrapped callable's return value, or the exception it raised, is stored
  on the thread object so that another thread can retrieve it afterwards via
  GetReturnValue() / ReraiseIfException().
  """

  def __init__(self, func, args=None, kwargs=None, name=None):
    """Initialize thread.

    Args:
      func: callable to call on a new thread.
      args: list of positional arguments for callable, defaults to empty.
      kwargs: dictionary of keyword arguments for callable, defaults to empty.
      name: thread name, defaults to Thread-N.
    """
    super(ReraiserThread, self).__init__(name=name)
    if not args:
      args = []
    if not kwargs:
      kwargs = {}
    self.daemon = True
    self._func = func
    self._args = args
    self._kwargs = kwargs
    # Result of func, or None if it has not finished or it raised.
    self._ret = None
    # sys.exc_info() triple captured in run(), or None if nothing was raised.
    self._exc_info = None

  def ReraiseIfException(self):
    """Reraise exception if an exception was raised in the thread.

    The exception is raised with the traceback that was captured in the
    thread. Does nothing if the callable did not raise (or has not run yet).
    """
    if not self._exc_info:
      return
    exc_type, exc_value, exc_tb = self._exc_info
    if sys.version_info[0] >= 3:
      raise exc_value.with_traceback(exc_tb)
    # Python 2: the three-expression raise statement is a SyntaxError on
    # Python 3, so it is kept inside a string and only compiled when reached.
    exec('raise exc_type, exc_value, exc_tb', {},  # pylint: disable=W0122
         {'exc_type': exc_type, 'exc_value': exc_value, 'exc_tb': exc_tb})

  def GetReturnValue(self):
    """Reraise exception if present, otherwise get the return value."""
    self.ReraiseIfException()
    return self._ret

  #override
  def run(self):
    """Overrides Thread.run() to add support for reraising exceptions."""
    try:
      self._ret = self._func(*self._args, **self._kwargs)
    except: # pylint: disable=W0702
      # Deliberately catches everything: the exception is handed to whoever
      # calls ReraiseIfException() instead of being lost with the thread.
      self._exc_info = sys.exc_info()
79
80
class ReraiserThreadGroup(object):
  """A group of ReraiserThread objects."""

  def __init__(self, threads=None):
    """Initialize thread group.

    Args:
      threads: a list of ReraiserThread objects; defaults to empty.
    """
    if not threads:
      threads = []
    self._threads = threads

  def Add(self, thread):
    """Add a thread to the group.

    Args:
      thread: a ReraiserThread object.
    """
    self._threads.append(thread)

  def StartAll(self):
    """Start all threads."""
    for thread in self._threads:
      thread.start()

  def _JoinAll(self, watcher=None):
    """Join all threads without stack dumps.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread.

    Args:
      watcher: Watchdog object providing timeout (must offer IsTimedOut()).
          When None, a new watchdog_timer.WatchdogTimer(None) is created for
          this call; that default is intended to wait forever.

    Raises:
      TimeoutError: if the watcher reports a timeout while threads are
          still being waited on.
    """
    # Build the default per call rather than in the signature, where a single
    # instance would be created at class-definition time and shared by all
    # calls.
    if watcher is None:
      watcher = watchdog_timer.WatchdogTimer(None)
    alive_threads = self._threads[:]
    while alive_threads:
      # Iterate over a copy because finished threads are removed in the loop.
      for thread in alive_threads[:]:
        if watcher.IsTimedOut():
          raise TimeoutError('Timed out waiting for %d of %d threads.' %
                             (len(alive_threads), len(self._threads)))
        # Allow the main thread to periodically check for interrupts.
        thread.join(0.1)
        # is_alive() exists on Python 2.6+ and 3; the camelCase isAlive()
        # alias was removed in Python 3.9.
        if not thread.is_alive():
          alive_threads.remove(thread)
    # All threads are allowed to complete before reraising exceptions.
    for thread in self._threads:
      thread.ReraiseIfException()

  def JoinAll(self, watcher=None):
    """Join all threads.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread. Unfinished threads'
    stacks will be logged on watchdog timeout.

    Args:
      watcher: Watchdog object providing timeout; when None, a default
          watchdog intended to wait forever is used.

    Raises:
      TimeoutError: if the watcher reports a timeout; reraised after the
          stacks of the still-running threads have been logged.
    """
    try:
      self._JoinAll(watcher)
    except TimeoutError:
      for thread in (t for t in self._threads if t.is_alive()):
        LogThreadStack(thread)
      raise

  def GetAllReturnValues(self, watcher=None):
    """Get all return values, joining all threads if necessary.

    Args:
      watcher: same as in |JoinAll|. Only used if threads are alive.

    Returns:
      A list with each thread's GetReturnValue() result, in group order.
    """
    if any(t.is_alive() for t in self._threads):
      self.JoinAll(watcher)
    return [t.GetReturnValue() for t in self._threads]
156
157