• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Thread and ThreadGroup that reraise exceptions on the main thread."""
6# pylint: disable=W0212
7
8import logging
9import sys
10import threading
11import traceback
12
13from pylib.utils import watchdog_timer
14
15
16class TimeoutError(Exception):
17  """Module-specific timeout exception."""
18  pass
19
20
21def LogThreadStack(thread):
22  """Log the stack for the given thread.
23
24  Args:
25    thread: a threading.Thread instance.
26  """
27  stack = sys._current_frames()[thread.ident]
28  logging.critical('*' * 80)
29  logging.critical('Stack dump for thread \'%s\'', thread.name)
30  logging.critical('*' * 80)
31  for filename, lineno, name, line in traceback.extract_stack(stack):
32    logging.critical('File: "%s", line %d, in %s', filename, lineno, name)
33    if line:
34      logging.critical('  %s', line.strip())
35  logging.critical('*' * 80)
36
37
38class ReraiserThread(threading.Thread):
39  """Thread class that can reraise exceptions."""
40
41  def __init__(self, func, args=None, kwargs=None, name=None):
42    """Initialize thread.
43
44    Args:
45      func: callable to call on a new thread.
46      args: list of positional arguments for callable, defaults to empty.
47      kwargs: dictionary of keyword arguments for callable, defaults to empty.
48      name: thread name, defaults to Thread-N.
49    """
50    super(ReraiserThread, self).__init__(name=name)
51    if not args:
52      args = []
53    if not kwargs:
54      kwargs = {}
55    self.daemon = True
56    self._func = func
57    self._args = args
58    self._kwargs = kwargs
59    self._ret = None
60    self._exc_info = None
61
62  def ReraiseIfException(self):
63    """Reraise exception if an exception was raised in the thread."""
64    if self._exc_info:
65      raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
66
67  def GetReturnValue(self):
68    """Reraise exception if present, otherwise get the return value."""
69    self.ReraiseIfException()
70    return self._ret
71
72  #override
73  def run(self):
74    """Overrides Thread.run() to add support for reraising exceptions."""
75    try:
76      self._ret = self._func(*self._args, **self._kwargs)
77    except:
78      self._exc_info = sys.exc_info()
79      raise
80
81
82class ReraiserThreadGroup(object):
83  """A group of ReraiserThread objects."""
84
85  def __init__(self, threads=None):
86    """Initialize thread group.
87
88    Args:
89      threads: a list of ReraiserThread objects; defaults to empty.
90    """
91    if not threads:
92      threads = []
93    self._threads = threads
94
95  def Add(self, thread):
96    """Add a thread to the group.
97
98    Args:
99      thread: a ReraiserThread object.
100    """
101    self._threads.append(thread)
102
103  def StartAll(self):
104    """Start all threads."""
105    for thread in self._threads:
106      thread.start()
107
108  def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)):
109    """Join all threads without stack dumps.
110
111    Reraises exceptions raised by the child threads and supports breaking
112    immediately on exceptions raised on the main thread.
113
114    Args:
115      watcher: Watchdog object providing timeout, by default waits forever.
116    """
117    alive_threads = self._threads[:]
118    while alive_threads:
119      for thread in alive_threads[:]:
120        if watcher.IsTimedOut():
121          raise TimeoutError('Timed out waiting for %d of %d threads.' %
122                             (len(alive_threads), len(self._threads)))
123        # Allow the main thread to periodically check for interrupts.
124        thread.join(0.1)
125        if not thread.isAlive():
126          alive_threads.remove(thread)
127    # All threads are allowed to complete before reraising exceptions.
128    for thread in self._threads:
129      thread.ReraiseIfException()
130
131  def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)):
132    """Join all threads.
133
134    Reraises exceptions raised by the child threads and supports breaking
135    immediately on exceptions raised on the main thread. Unfinished threads'
136    stacks will be logged on watchdog timeout.
137
138    Args:
139      watcher: Watchdog object providing timeout, by default waits forever.
140    """
141    try:
142      self._JoinAll(watcher)
143    except TimeoutError:
144      for thread in (t for t in self._threads if t.isAlive()):
145        LogThreadStack(thread)
146      raise
147
148  def GetAllReturnValues(self, watcher=watchdog_timer.WatchdogTimer(None)):
149    """Get all return values, joining all threads if necessary.
150
151    Args:
152      watcher: same as in |JoinAll|. Only used if threads are alive.
153    """
154    if any([t.isAlive() for t in self._threads]):
155      self.JoinAll(watcher)
156    return [t.GetReturnValue() for t in self._threads]
157
158