# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A utility to run functions with timeouts and retries."""
# pylint: disable=W0702

import logging
import threading
import time

from devil.utils import reraiser_thread
from devil.utils import watchdog_timer

logger = logging.getLogger(__name__)


class TimeoutRetryThreadGroup(reraiser_thread.ReraiserThreadGroup):
  """A ReraiserThreadGroup that carries a WatchdogTimer for |timeout|.

  The watchdog is created when the group is constructed and is consulted by
  helpers such as WaitFor() (via CurrentTimeoutThreadGroup()) so that code
  running inside the group can cooperate with its deadline.
  """

  def __init__(self, timeout, threads=None):
    super(TimeoutRetryThreadGroup, self).__init__(threads)
    self._watcher = watchdog_timer.WatchdogTimer(timeout)

  def GetWatcher(self):
    """Returns the watchdog keeping track of this thread's time."""
    return self._watcher

  def GetElapsedTime(self):
    """Returns the elapsed time as reported by this group's watchdog."""
    return self._watcher.GetElapsed()

  def GetRemainingTime(self, required=0, suffix=None):
    """Get the remaining time before the thread times out.

    Useful to send as the |timeout| parameter of async IO operations.

    Args:
      required: minimum amount of time that will be required to complete, e.g.,
        some sleep or IO operation.
      suffix: optional text appended verbatim to the timeout error message
        (include a leading space, e.g. ' waiting for condition ...').

    Returns:
      The number of seconds remaining before the thread times out, or None
      if the thread never times out.

    Raises:
      reraiser_thread.TimeoutError if the remaining time is less than the
      required time.
    """
    remaining = self._watcher.GetRemaining()
    if remaining is not None and remaining < required:
      msg = 'Timeout of %.1f secs expired' % self._watcher.GetTimeout()
      if suffix:
        msg += suffix
      raise reraiser_thread.TimeoutError(msg)
    return remaining


def CurrentTimeoutThreadGroup():
  """Returns the thread group that owns or is blocked on the active thread.

  Starting from the current thread's group, this walks up the chain of
  |blocked_parent_thread_group| links and returns the first
  TimeoutRetryThreadGroup found.

  Returns:
    Returns None if no TimeoutRetryThreadGroup is tracking the current thread.
  """
  thread_group = reraiser_thread.CurrentThreadGroup()
  while thread_group:
    if isinstance(thread_group, TimeoutRetryThreadGroup):
      return thread_group
    thread_group = thread_group.blocked_parent_thread_group
  return None


def WaitFor(condition, wait_period=5, max_tries=None):
  """Wait for a condition to become true.

  Repeatedly call the function condition(), with no arguments, until it returns
  a true value.

  If called within a TimeoutRetryThreadGroup, it cooperates nicely with it.

  Args:
    condition: function with the condition to check
    wait_period: number of seconds to wait before retrying to check the
      condition
    max_tries: maximum number of checks to make, the default tries forever
      or until the TimeoutRetryThreadGroup expires.

  Returns:
    The true value returned by the condition, or None if the condition was
    not met after max_tries. No sleep happens after the final allowed check.

  Raises:
    reraiser_thread.TimeoutError: if the current thread is a
      TimeoutRetryThreadGroup and the timeout expires.
  """
  # Callables such as functools.partial objects have no __name__; fall back to
  # their repr so that logging and timeout messages still work.
  condition_name = getattr(condition, '__name__', repr(condition))
  timeout_thread_group = CurrentTimeoutThreadGroup()
  while max_tries is None or max_tries > 0:
    result = condition()
    if max_tries is not None:
      max_tries -= 1
    msg = ['condition', repr(condition_name), 'met' if result else 'not met']
    if timeout_thread_group:
      # pylint: disable=no-member
      msg.append('(%.1fs)' % timeout_thread_group.GetElapsedTime())
    logger.info(' '.join(msg))
    if result:
      return result
    if max_tries is not None and max_tries <= 0:
      # That was the last allowed check; no further check would follow, so
      # don't sleep (or demand time for a sleep) before giving up.
      break
    if timeout_thread_group:
      # Raises TimeoutError if there isn't enough time left for another sleep.
      # pylint: disable=no-member
      timeout_thread_group.GetRemainingTime(
          wait_period, suffix=' waiting for condition %r' % condition_name)
    if wait_period:
      time.sleep(wait_period)
  return None


def AlwaysRetry(_exception):
  """Default retry predicate for Run(): retries on every exception."""
  return True


def Run(func,
        timeout,
        retries,
        args=None,
        kwargs=None,
        desc=None,
        error_log_func=logging.critical,
        retry_if_func=AlwaysRetry):
  """Runs the passed function in a separate thread with timeouts and retries.

  Args:
    func: the function to be wrapped.
    timeout: the timeout in seconds for each try.
    retries: the number of retries.
    args: list of positional args to pass to |func|.
    kwargs: dictionary of keyword args to pass to |func|.
    desc: An optional description of |func| used in logging. If omitted,
      |func.__name__| will be used.
    error_log_func: Logging function when logging errors.
    retry_if_func: Unary callable that takes an exception and returns
      whether |func| should be retried. Defaults to always retrying.

  Returns:
    The return value of func(*args, **kwargs).

  Raises:
    The exception raised by the last try (including
    reraiser_thread.TimeoutError if that try timed out), once |retries|
    retries have been used or |retry_if_func| returns False for it.
  """
  if not args:
    args = []
  if not kwargs:
    kwargs = {}
  if not desc:
    desc = func.__name__

  num_try = 1
  while True:
    # The attempt number and the calling thread's name are embedded in the
    # child thread's name so log lines can be attributed to a specific try.
    thread_name = 'TimeoutThread-%d-for-%s' % (num_try,
                                               threading.current_thread().name)
    child_thread = reraiser_thread.ReraiserThread(
        lambda: func(*args, **kwargs), name=thread_name)
    try:
      thread_group = TimeoutRetryThreadGroup(timeout, threads=[child_thread])
      thread_group.StartAll(will_block=True)
      while True:
        # JoinAll is given a 60s timeout, presumably so a progress message is
        # logged periodically while the watcher enforces the real deadline.
        thread_group.JoinAll(
            watcher=thread_group.GetWatcher(),
            timeout=60,
            error_log_func=error_log_func)
        if thread_group.IsAlive():
          logger.info('Still working on %s', desc)
        else:
          return thread_group.GetAllReturnValues()[0]
    except reraiser_thread.TimeoutError as e:
      # Timeouts already get their stacks logged.
      if num_try > retries or not retry_if_func(e):
        raise
    # Do not catch KeyboardInterrupt.
    except Exception as e:  # pylint: disable=broad-except
      if num_try > retries or not retry_if_func(e):
        raise
      error_log_func('(%s) Exception on %s, attempt %d of %d: %r', thread_name,
                     desc, num_try, retries + 1, e)
    num_try += 1