# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Thread and ThreadGroup that reraise exceptions on the main thread."""
# pylint: disable=W0212

import logging
import sys
import threading
import time
import traceback

from devil import base_error
from devil.utils import watchdog_timer


class TimeoutError(base_error.BaseError):
  """Module-specific timeout exception.

  Note: within this module the name shadows the Python 3 builtin of the same
  name; callers catch it as reraiser_thread.TimeoutError.
  """

  def __init__(self, message):
    super(TimeoutError, self).__init__(message)


def LogThreadStack(thread, error_log_func=logging.critical):
  """Log the stack for the given thread.

  If the thread has no live frame (it was never started, or it finished
  before the stack could be captured), a single line saying so is logged
  instead of a stack dump.

  Args:
    thread: a threading.Thread instance.
    error_log_func: Logging function when logging errors.
  """
  # Use .get(): the thread may exit between the caller's liveness check and
  # this lookup, and an unstarted thread has ident None. Raising KeyError here
  # would mask the error the caller is in the middle of reporting.
  stack = sys._current_frames().get(thread.ident)
  error_log_func('*' * 80)
  error_log_func('Stack dump for thread %r', thread.name)
  error_log_func('*' * 80)
  if stack is None:
    error_log_func('No stack available; thread is not running.')
  else:
    for filename, lineno, name, line in traceback.extract_stack(stack):
      error_log_func('File: "%s", line %d, in %s', filename, lineno, name)
      if line:
        error_log_func('  %s', line.strip())
  error_log_func('*' * 80)


class ReraiserThread(threading.Thread):
  """Thread class that can reraise exceptions."""

  def __init__(self, func, args=None, kwargs=None, name=None):
    """Initialize thread.

    Args:
      func: callable to call on a new thread.
      args: list of positional arguments for callable, defaults to empty.
      kwargs: dictionary of keyword arguments for callable, defaults to empty.
      name: thread name, defaults to the function name.
    """
    if not name:
      if hasattr(func, '__name__') and func.__name__ != '<lambda>':
        name = func.__name__
      else:
        name = 'anonymous'
    super(ReraiserThread, self).__init__(name=name)
    if not args:
      args = []
    if not kwargs:
      kwargs = {}
    self.daemon = True
    self._func = func
    self._args = args
    self._kwargs = kwargs
    # Return value of |func|; stays None until run() completes successfully.
    self._ret = None
    # sys.exc_info() triple captured by run() if |func| raised, else None.
    self._exc_info = None
    # Owning ReraiserThreadGroup; set by ReraiserThreadGroup.Add().
    self._thread_group = None

  if sys.version_info < (3, ):
    # The three-argument raise statement is a syntax error on Python 3, so the
    # Python 2 variant has to be compiled from a string at class-creation time.
    # pylint: disable=exec-used
    exec ('''def ReraiseIfException(self):
  """Reraise exception if an exception was raised in the thread."""
  if self._exc_info:
    raise self._exc_info[0], self._exc_info[1], self._exc_info[2]''')
  else:

    def ReraiseIfException(self):
      """Reraise exception if an exception was raised in the thread."""
      if self._exc_info:
        raise self._exc_info[1]

  def GetReturnValue(self):
    """Reraise exception if present, otherwise get the return value."""
    self.ReraiseIfException()
    return self._ret

  # override
  def run(self):
    """Overrides Thread.run() to add support for reraising exceptions."""
    try:
      self._ret = self._func(*self._args, **self._kwargs)
    except:  # pylint: disable=W0702
      # Deliberately catch everything: the exception is stored so that it can
      # be reraised later via ReraiseIfException().
      self._exc_info = sys.exc_info()


class ReraiserThreadGroup(object):
  """A group of ReraiserThread objects."""

  def __init__(self, threads=None):
    """Initialize thread group.

    Args:
      threads: a list of ReraiserThread objects; defaults to empty.
    """
    self._threads = []
    # Set when a thread from one group has called JoinAll on another. It is used
    # to detect when there is a TimeoutRetryThread active that links to the
    # current thread.
    self.blocked_parent_thread_group = None
    if threads:
      for thread in threads:
        self.Add(thread)

  def Add(self, thread):
    """Add a thread to the group.

    Args:
      thread: a ReraiserThread object.
    """
    assert thread._thread_group is None
    thread._thread_group = self
    self._threads.append(thread)

  def StartAll(self, will_block=False):
    """Start all threads.

    Args:
      will_block: Whether the calling thread will subsequently block on this
        thread group. Causes the active ReraiserThreadGroup (if there is one)
        to be marked as blocking on this thread group.
    """
    if will_block:
      # Multiple threads blocking on the same outer thread should not happen in
      # practice.
      assert not self.blocked_parent_thread_group
      self.blocked_parent_thread_group = CurrentThreadGroup()
    for thread in self._threads:
      thread.start()

  def _JoinAll(self, watcher=None, timeout=None):
    """Join all threads without stack dumps.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread.

    Args:
      watcher: Watchdog object providing the thread timeout. If none is
        provided, the thread will never be timed out.
      timeout: An optional number of seconds to wait before timing out the join
        operation. This will not time out the threads. A falsy value (None or
        0) means no join deadline is applied.

    Raises:
      TimeoutError: if |watcher| reports a timeout while threads are still
        alive.
    """
    if watcher is None:
      watcher = watchdog_timer.WatchdogTimer(None)
    alive_threads = self._threads[:]
    end_time = (time.time() + timeout) if timeout else None
    try:
      while alive_threads and (end_time is None or end_time > time.time()):
        # Iterate over a copy since finished threads are removed in the loop.
        for thread in alive_threads[:]:
          if watcher.IsTimedOut():
            raise TimeoutError('Timed out waiting for %d of %d threads.' %
                               (len(alive_threads), len(self._threads)))
          # Allow the main thread to periodically check for interrupts.
          thread.join(0.1)
          # is_alive() rather than isAlive(): the camelCase alias was removed
          # in Python 3.9.
          if not thread.is_alive():
            alive_threads.remove(thread)
      # All threads are allowed to complete before reraising exceptions.
      for thread in self._threads:
        thread.ReraiseIfException()
    finally:
      self.blocked_parent_thread_group = None

  def IsAlive(self):
    """Check whether any of the threads are still alive.

    Returns:
      Whether any of the threads are still alive.
    """
    return any(t.is_alive() for t in self._threads)

  def JoinAll(self, watcher=None, timeout=None,
              error_log_func=logging.critical):
    """Join all threads.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread. Unfinished threads'
    stacks will be logged on watchdog timeout.

    Args:
      watcher: Watchdog object providing the thread timeout. If none is
        provided, the thread will never be timed out.
      timeout: An optional number of seconds to wait before timing out the join
        operation. This will not time out the threads.
      error_log_func: Logging function when logging errors.

    Raises:
      TimeoutError: if |watcher| reports a timeout; reraised after the stacks
        of the still-running threads have been logged.
    """
    try:
      self._JoinAll(watcher, timeout)
    except TimeoutError:
      error_log_func('Timed out. Dumping threads.')
      for thread in (t for t in self._threads if t.is_alive()):
        LogThreadStack(thread, error_log_func=error_log_func)
      raise

  def GetAllReturnValues(self, watcher=None):
    """Get all return values, joining all threads if necessary.

    Args:
      watcher: same as in |JoinAll|. Only used if threads are alive.

    Returns:
      A list with each thread's return value, in the order the threads were
      added to the group.
    """
    if self.IsAlive():
      self.JoinAll(watcher)
    return [t.GetReturnValue() for t in self._threads]


def CurrentThreadGroup():
  """Returns the ReraiserThreadGroup that owns the running thread.

  Returns:
    The current thread group, otherwise None.
  """
  current_thread = threading.current_thread()
  if isinstance(current_thread, ReraiserThread):
    return current_thread._thread_group  # pylint: disable=no-member
  return None


def RunAsync(funcs, watcher=None):
  """Executes the given functions in parallel and returns their results.

  Args:
    funcs: List of functions to perform on their own threads.
    watcher: Watchdog object providing timeout, by default waits forever.

  Returns:
    A list of return values in the order of the given functions.
  """
  thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs)
  thread_group.StartAll(will_block=True)
  return thread_group.GetAllReturnValues(watcher=watcher)