1# Copyright 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Thread and ThreadGroup that reraise exceptions on the main thread.""" 6# pylint: disable=W0212 7 8import logging 9import sys 10import threading 11import time 12import traceback 13 14from devil import base_error 15from devil.utils import watchdog_timer 16 17 18class TimeoutError(base_error.BaseError): 19 """Module-specific timeout exception.""" 20 def __init__(self, message): 21 super(TimeoutError, self).__init__(message) 22 23 24def LogThreadStack(thread, error_log_func=logging.critical): 25 """Log the stack for the given thread. 26 27 Args: 28 thread: a threading.Thread instance. 29 error_log_func: Logging function when logging errors. 30 """ 31 stack = sys._current_frames()[thread.ident] 32 error_log_func('*' * 80) 33 error_log_func('Stack dump for thread %r', thread.name) 34 error_log_func('*' * 80) 35 for filename, lineno, name, line in traceback.extract_stack(stack): 36 error_log_func('File: "%s", line %d, in %s', filename, lineno, name) 37 if line: 38 error_log_func(' %s', line.strip()) 39 error_log_func('*' * 80) 40 41 42class ReraiserThread(threading.Thread): 43 """Thread class that can reraise exceptions.""" 44 45 def __init__(self, func, args=None, kwargs=None, name=None): 46 """Initialize thread. 47 48 Args: 49 func: callable to call on a new thread. 50 args: list of positional arguments for callable, defaults to empty. 51 kwargs: dictionary of keyword arguments for callable, defaults to empty. 52 name: thread name, defaults to the function name. 53 """ 54 if not name: 55 if hasattr(func, '__name__') and func.__name__ != '<lambda>': 56 name = func.__name__ 57 else: 58 name = 'anonymous' 59 super(ReraiserThread, self).__init__(name=name) 60 if not args: 61 args = [] 62 if not kwargs: 63 kwargs = {} 64 self.daemon = True 65 self._func = func 66 self._args = args 67 self._kwargs = kwargs 68 self._ret = None 69 self._exc_info = None 70 self._thread_group = None 71 72 if sys.version_info < (3,): 73 # pylint: disable=exec-used 74 exec('''def ReraiseIfException(self): 75 """Reraise exception if an exception was raised in the thread.""" 76 if self._exc_info: 77 raise self._exc_info[0], self._exc_info[1], self._exc_info[2]''') 78 else: 79 def ReraiseIfException(self): 80 """Reraise exception if an exception was raised in the thread.""" 81 if self._exc_info: 82 raise self._exc_info[1] 83 84 def GetReturnValue(self): 85 """Reraise exception if present, otherwise get the return value.""" 86 self.ReraiseIfException() 87 return self._ret 88 89 # override 90 def run(self): 91 """Overrides Thread.run() to add support for reraising exceptions.""" 92 try: 93 self._ret = self._func(*self._args, **self._kwargs) 94 except: # pylint: disable=W0702 95 self._exc_info = sys.exc_info() 96 97 98class ReraiserThreadGroup(object): 99 """A group of ReraiserThread objects.""" 100 101 def __init__(self, threads=None): 102 """Initialize thread group. 103 104 Args: 105 threads: a list of ReraiserThread objects; defaults to empty. 106 """ 107 self._threads = [] 108 # Set when a thread from one group has called JoinAll on another. It is used 109 # to detect when a there is a TimeoutRetryThread active that links to the 110 # current thread. 111 self.blocked_parent_thread_group = None 112 if threads: 113 for thread in threads: 114 self.Add(thread) 115 116 def Add(self, thread): 117 """Add a thread to the group. 118 119 Args: 120 thread: a ReraiserThread object. 121 """ 122 assert thread._thread_group is None 123 thread._thread_group = self 124 self._threads.append(thread) 125 126 def StartAll(self, will_block=False): 127 """Start all threads. 128 129 Args: 130 will_block: Whether the calling thread will subsequently block on this 131 thread group. Causes the active ReraiserThreadGroup (if there is one) 132 to be marked as blocking on this thread group. 133 """ 134 if will_block: 135 # Multiple threads blocking on the same outer thread should not happen in 136 # practice. 137 assert not self.blocked_parent_thread_group 138 self.blocked_parent_thread_group = CurrentThreadGroup() 139 for thread in self._threads: 140 thread.start() 141 142 def _JoinAll(self, watcher=None, timeout=None): 143 """Join all threads without stack dumps. 144 145 Reraises exceptions raised by the child threads and supports breaking 146 immediately on exceptions raised on the main thread. 147 148 Args: 149 watcher: Watchdog object providing the thread timeout. If none is 150 provided, the thread will never be timed out. 151 timeout: An optional number of seconds to wait before timing out the join 152 operation. This will not time out the threads. 153 """ 154 if watcher is None: 155 watcher = watchdog_timer.WatchdogTimer(None) 156 alive_threads = self._threads[:] 157 end_time = (time.time() + timeout) if timeout else None 158 try: 159 while alive_threads and (end_time is None or end_time > time.time()): 160 for thread in alive_threads[:]: 161 if watcher.IsTimedOut(): 162 raise TimeoutError('Timed out waiting for %d of %d threads.' % 163 (len(alive_threads), len(self._threads))) 164 # Allow the main thread to periodically check for interrupts. 165 thread.join(0.1) 166 if not thread.isAlive(): 167 alive_threads.remove(thread) 168 # All threads are allowed to complete before reraising exceptions. 169 for thread in self._threads: 170 thread.ReraiseIfException() 171 finally: 172 self.blocked_parent_thread_group = None 173 174 def IsAlive(self): 175 """Check whether any of the threads are still alive. 176 177 Returns: 178 Whether any of the threads are still alive. 179 """ 180 return any(t.isAlive() for t in self._threads) 181 182 def JoinAll(self, watcher=None, timeout=None, 183 error_log_func=logging.critical): 184 """Join all threads. 185 186 Reraises exceptions raised by the child threads and supports breaking 187 immediately on exceptions raised on the main thread. Unfinished threads' 188 stacks will be logged on watchdog timeout. 189 190 Args: 191 watcher: Watchdog object providing the thread timeout. If none is 192 provided, the thread will never be timed out. 193 timeout: An optional number of seconds to wait before timing out the join 194 operation. This will not time out the threads. 195 error_log_func: Logging function when logging errors. 196 """ 197 try: 198 self._JoinAll(watcher, timeout) 199 except TimeoutError: 200 error_log_func('Timed out. Dumping threads.') 201 for thread in (t for t in self._threads if t.isAlive()): 202 LogThreadStack(thread, error_log_func=error_log_func) 203 raise 204 205 def GetAllReturnValues(self, watcher=None): 206 """Get all return values, joining all threads if necessary. 207 208 Args: 209 watcher: same as in |JoinAll|. Only used if threads are alive. 210 """ 211 if any([t.isAlive() for t in self._threads]): 212 self.JoinAll(watcher) 213 return [t.GetReturnValue() for t in self._threads] 214 215 216def CurrentThreadGroup(): 217 """Returns the ReraiserThreadGroup that owns the running thread. 218 219 Returns: 220 The current thread group, otherwise None. 221 """ 222 current_thread = threading.current_thread() 223 if isinstance(current_thread, ReraiserThread): 224 return current_thread._thread_group # pylint: disable=no-member 225 return None 226 227 228def RunAsync(funcs, watcher=None): 229 """Executes the given functions in parallel and returns their results. 230 231 Args: 232 funcs: List of functions to perform on their own threads. 233 watcher: Watchdog object providing timeout, by default waits forever. 234 235 Returns: 236 A list of return values in the order of the given functions. 237 """ 238 thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs) 239 thread_group.StartAll(will_block=True) 240 return thread_group.GetAllReturnValues(watcher=watcher) 241