1# Copyright 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Thread and ThreadGroup that reraise exceptions on the main thread.""" 6# pylint: disable=W0212 7 8import logging 9import sys 10import threading 11import traceback 12 13from pylib.utils import watchdog_timer 14 15 16class TimeoutError(Exception): 17 """Module-specific timeout exception.""" 18 pass 19 20 21def LogThreadStack(thread): 22 """Log the stack for the given thread. 23 24 Args: 25 thread: a threading.Thread instance. 26 """ 27 stack = sys._current_frames()[thread.ident] 28 logging.critical('*' * 80) 29 logging.critical('Stack dump for thread \'%s\'', thread.name) 30 logging.critical('*' * 80) 31 for filename, lineno, name, line in traceback.extract_stack(stack): 32 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) 33 if line: 34 logging.critical(' %s', line.strip()) 35 logging.critical('*' * 80) 36 37 38class ReraiserThread(threading.Thread): 39 """Thread class that can reraise exceptions.""" 40 41 def __init__(self, func, args=None, kwargs=None, name=None): 42 """Initialize thread. 43 44 Args: 45 func: callable to call on a new thread. 46 args: list of positional arguments for callable, defaults to empty. 47 kwargs: dictionary of keyword arguments for callable, defaults to empty. 48 name: thread name, defaults to Thread-N. 49 """ 50 super(ReraiserThread, self).__init__(name=name) 51 if not args: 52 args = [] 53 if not kwargs: 54 kwargs = {} 55 self.daemon = True 56 self._func = func 57 self._args = args 58 self._kwargs = kwargs 59 self._ret = None 60 self._exc_info = None 61 62 def ReraiseIfException(self): 63 """Reraise exception if an exception was raised in the thread.""" 64 if self._exc_info: 65 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] 66 67 def GetReturnValue(self): 68 """Reraise exception if present, otherwise get the return value.""" 69 self.ReraiseIfException() 70 return self._ret 71 72 #override 73 def run(self): 74 """Overrides Thread.run() to add support for reraising exceptions.""" 75 try: 76 self._ret = self._func(*self._args, **self._kwargs) 77 except: 78 self._exc_info = sys.exc_info() 79 raise 80 81 82class ReraiserThreadGroup(object): 83 """A group of ReraiserThread objects.""" 84 85 def __init__(self, threads=None): 86 """Initialize thread group. 87 88 Args: 89 threads: a list of ReraiserThread objects; defaults to empty. 90 """ 91 if not threads: 92 threads = [] 93 self._threads = threads 94 95 def Add(self, thread): 96 """Add a thread to the group. 97 98 Args: 99 thread: a ReraiserThread object. 100 """ 101 self._threads.append(thread) 102 103 def StartAll(self): 104 """Start all threads.""" 105 for thread in self._threads: 106 thread.start() 107 108 def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): 109 """Join all threads without stack dumps. 110 111 Reraises exceptions raised by the child threads and supports breaking 112 immediately on exceptions raised on the main thread. 113 114 Args: 115 watcher: Watchdog object providing timeout, by default waits forever. 116 """ 117 alive_threads = self._threads[:] 118 while alive_threads: 119 for thread in alive_threads[:]: 120 if watcher.IsTimedOut(): 121 raise TimeoutError('Timed out waiting for %d of %d threads.' % 122 (len(alive_threads), len(self._threads))) 123 # Allow the main thread to periodically check for interrupts. 124 thread.join(0.1) 125 if not thread.isAlive(): 126 alive_threads.remove(thread) 127 # All threads are allowed to complete before reraising exceptions. 128 for thread in self._threads: 129 thread.ReraiseIfException() 130 131 def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): 132 """Join all threads. 133 134 Reraises exceptions raised by the child threads and supports breaking 135 immediately on exceptions raised on the main thread. Unfinished threads' 136 stacks will be logged on watchdog timeout. 137 138 Args: 139 watcher: Watchdog object providing timeout, by default waits forever. 140 """ 141 try: 142 self._JoinAll(watcher) 143 except TimeoutError: 144 for thread in (t for t in self._threads if t.isAlive()): 145 LogThreadStack(thread) 146 raise 147 148 def GetAllReturnValues(self, watcher=watchdog_timer.WatchdogTimer(None)): 149 """Get all return values, joining all threads if necessary. 150 151 Args: 152 watcher: same as in |JoinAll|. Only used if threads are alive. 153 """ 154 if any([t.isAlive() for t in self._threads]): 155 self.JoinAll(watcher) 156 return [t.GetReturnValue() for t in self._threads] 157 158