1# Copyright 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Thread and ThreadGroup that reraise exceptions on the main thread.""" 6# pylint: disable=W0212 7 8import logging 9import sys 10import threading 11import traceback 12 13from pylib.utils import watchdog_timer 14 15 16class TimeoutError(Exception): 17 """Module-specific timeout exception.""" 18 pass 19 20 21def LogThreadStack(thread): 22 """Log the stack for the given thread. 23 24 Args: 25 thread: a threading.Thread instance. 26 """ 27 stack = sys._current_frames()[thread.ident] 28 logging.critical('*' * 80) 29 logging.critical('Stack dump for thread \'%s\'', thread.name) 30 logging.critical('*' * 80) 31 for filename, lineno, name, line in traceback.extract_stack(stack): 32 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) 33 if line: 34 logging.critical(' %s', line.strip()) 35 logging.critical('*' * 80) 36 37 38class ReraiserThread(threading.Thread): 39 """Thread class that can reraise exceptions.""" 40 41 def __init__(self, func, args=None, kwargs=None, name=None): 42 """Initialize thread. 43 44 Args: 45 func: callable to call on a new thread. 46 args: list of positional arguments for callable, defaults to empty. 47 kwargs: dictionary of keyword arguments for callable, defaults to empty. 48 name: thread name, defaults to Thread-N. 49 """ 50 super(ReraiserThread, self).__init__(name=name) 51 if not args: 52 args = [] 53 if not kwargs: 54 kwargs = {} 55 self.daemon = True 56 self._func = func 57 self._args = args 58 self._kwargs = kwargs 59 self._ret = None 60 self._exc_info = None 61 62 def ReraiseIfException(self): 63 """Reraise exception if an exception was raised in the thread.""" 64 if self._exc_info: 65 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] 66 67 def GetReturnValue(self): 68 """Reraise exception if present, otherwise get the return value.""" 69 self.ReraiseIfException() 70 return self._ret 71 72 #override 73 def run(self): 74 """Overrides Thread.run() to add support for reraising exceptions.""" 75 try: 76 self._ret = self._func(*self._args, **self._kwargs) 77 except: # pylint: disable=W0702 78 self._exc_info = sys.exc_info() 79 80 81class ReraiserThreadGroup(object): 82 """A group of ReraiserThread objects.""" 83 84 def __init__(self, threads=None): 85 """Initialize thread group. 86 87 Args: 88 threads: a list of ReraiserThread objects; defaults to empty. 89 """ 90 if not threads: 91 threads = [] 92 self._threads = threads 93 94 def Add(self, thread): 95 """Add a thread to the group. 96 97 Args: 98 thread: a ReraiserThread object. 99 """ 100 self._threads.append(thread) 101 102 def StartAll(self): 103 """Start all threads.""" 104 for thread in self._threads: 105 thread.start() 106 107 def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): 108 """Join all threads without stack dumps. 109 110 Reraises exceptions raised by the child threads and supports breaking 111 immediately on exceptions raised on the main thread. 112 113 Args: 114 watcher: Watchdog object providing timeout, by default waits forever. 115 """ 116 alive_threads = self._threads[:] 117 while alive_threads: 118 for thread in alive_threads[:]: 119 if watcher.IsTimedOut(): 120 raise TimeoutError('Timed out waiting for %d of %d threads.' % 121 (len(alive_threads), len(self._threads))) 122 # Allow the main thread to periodically check for interrupts. 123 thread.join(0.1) 124 if not thread.isAlive(): 125 alive_threads.remove(thread) 126 # All threads are allowed to complete before reraising exceptions. 127 for thread in self._threads: 128 thread.ReraiseIfException() 129 130 def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): 131 """Join all threads. 132 133 Reraises exceptions raised by the child threads and supports breaking 134 immediately on exceptions raised on the main thread. Unfinished threads' 135 stacks will be logged on watchdog timeout. 136 137 Args: 138 watcher: Watchdog object providing timeout, by default waits forever. 139 """ 140 try: 141 self._JoinAll(watcher) 142 except TimeoutError: 143 for thread in (t for t in self._threads if t.isAlive()): 144 LogThreadStack(thread) 145 raise 146 147 def GetAllReturnValues(self, watcher=watchdog_timer.WatchdogTimer(None)): 148 """Get all return values, joining all threads if necessary. 149 150 Args: 151 watcher: same as in |JoinAll|. Only used if threads are alive. 152 """ 153 if any([t.isAlive() for t in self._threads]): 154 self.JoinAll(watcher) 155 return [t.GetReturnValue() for t in self._threads] 156 157