# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A utility to run functions with timeouts and retries."""
# pylint: disable=W0702

import logging
import threading
import time

from devil.utils import reraiser_thread
from devil.utils import watchdog_timer

logger = logging.getLogger(__name__)


def _GetCallableName(func):
  """Returns a printable name for |func|, for use in log messages.

  Not every callable defines |__name__| (e.g. functools.partial objects or
  instances of classes that implement |__call__|), so fall back to repr()
  instead of raising AttributeError.

  Args:
    func: any callable.

  Returns:
    |func.__name__| when it exists, otherwise |repr(func)|.
  """
  try:
    return func.__name__
  except AttributeError:
    return repr(func)


class TimeoutRetryThreadGroup(reraiser_thread.ReraiserThreadGroup):
  """A ReraiserThreadGroup paired with a watchdog timer for its timeout."""

  def __init__(self, timeout, threads=None):
    """Creates the thread group and its watchdog timer.

    Args:
      timeout: the timeout handed to watchdog_timer.WatchdogTimer.
      threads: optional threads, passed through to ReraiserThreadGroup.
    """
    super(TimeoutRetryThreadGroup, self).__init__(threads)
    self._watcher = watchdog_timer.WatchdogTimer(timeout)

  def GetWatcher(self):
    """Returns the watchdog keeping track of this thread's time."""
    return self._watcher

  def GetElapsedTime(self):
    """Returns the elapsed time as reported by the watchdog timer."""
    return self._watcher.GetElapsed()

  def GetRemainingTime(self, required=0, suffix=None):
    """Get the remaining time before the thread times out.

    Useful to send as the |timeout| parameter of async IO operations.

    Args:
      required: minimum amount of time that will be required to complete, e.g.,
        some sleep or IO operation.
      suffix: optional text appended to the error message shown if timing out.

    Returns:
      The number of seconds remaining before the thread times out, or None
      if the thread never times out.

    Raises:
      reraiser_thread.TimeoutError if the remaining time is less than the
        required time.
    """
    remaining = self._watcher.GetRemaining()
    if remaining is not None and remaining < required:
      msg = 'Timeout of %.1f secs expired' % self._watcher.GetTimeout()
      if suffix:
        msg += suffix
      raise reraiser_thread.TimeoutError(msg)
    return remaining


def CurrentTimeoutThreadGroup():
  """Returns the thread group that owns or is blocked on the active thread.

  Returns:
    Returns None if no TimeoutRetryThreadGroup is tracking the current thread.
  """
  thread_group = reraiser_thread.CurrentThreadGroup()
  # Walk up the chain of blocked parent groups until a timeout-aware group is
  # found or the chain ends.
  while thread_group:
    if isinstance(thread_group, TimeoutRetryThreadGroup):
      return thread_group
    thread_group = thread_group.blocked_parent_thread_group
  return None


def WaitFor(condition, wait_period=5, max_tries=None):
  """Wait for a condition to become true.

  Repeatedly call the function condition(), with no arguments, until it returns
  a true value.

  If called within a TimeoutRetryThreadGroup, it cooperates nicely with it.

  Args:
    condition: function with the condition to check. Any callable is
      accepted; callables without a |__name__| are logged by their repr.
    wait_period: number of seconds to wait before retrying to check the
      condition
    max_tries: maximum number of checks to make, the default tries forever
      or until the TimeoutRetryThreadGroup expires.

  Returns:
    The true value returned by the condition, or None if the condition was
      not met after max_tries.

  Raises:
    reraiser_thread.TimeoutError: if the current thread is a
      TimeoutRetryThreadGroup and the timeout expires.
  """
  condition_name = _GetCallableName(condition)
  timeout_thread_group = CurrentTimeoutThreadGroup()
  while max_tries is None or max_tries > 0:
    result = condition()
    if max_tries is not None:
      max_tries -= 1
    msg = ['condition', repr(condition_name), 'met' if result else 'not met']
    if timeout_thread_group:
      # pylint: disable=no-member
      msg.append('(%.1fs)' % timeout_thread_group.GetElapsedTime())
    logger.info(' '.join(msg))
    if result:
      return result
    if timeout_thread_group:
      # Raises TimeoutError up front when there is not enough time left to
      # sleep for another |wait_period|.
      # pylint: disable=no-member
      timeout_thread_group.GetRemainingTime(
          wait_period, suffix=' waiting for condition %r' % condition_name)
    if wait_period:
      time.sleep(wait_period)
  return None


def AlwaysRetry(_exception):
  """Default |retry_if_func| for Run: retry regardless of the exception."""
  return True


def Run(func, timeout, retries, args=None, kwargs=None, desc=None,
        error_log_func=logging.critical, retry_if_func=AlwaysRetry):
  """Runs the passed function in a separate thread with timeouts and retries.

  Args:
    func: the function to be wrapped.
    timeout: the timeout in seconds for each try.
    retries: the number of retries.
    args: list of positional args to pass to |func|.
    kwargs: dictionary of keyword args to pass to |func|.
    desc: An optional description of |func| used in logging. If omitted,
      |func.__name__| will be used (or |repr(func)| when |func| has no
      |__name__|, e.g. a functools.partial object).
    error_log_func: Logging function when logging errors.
    retry_if_func: Unary callable that takes an exception and returns
      whether |func| should be retried. Defaults to always retrying.

  Returns:
    The return value of func(*args, **kwargs).

  Raises:
    The exception from the last attempt (including
    reraiser_thread.TimeoutError) once |retries| retries have been used up
    or |retry_if_func| returns a false value for it.
  """
  if not args:
    args = []
  if not kwargs:
    kwargs = {}
  if not desc:
    desc = _GetCallableName(func)

  num_try = 1
  while True:
    thread_name = 'TimeoutThread-%d-for-%s' % (
        num_try, threading.current_thread().name)
    child_thread = reraiser_thread.ReraiserThread(
        lambda: func(*args, **kwargs), name=thread_name)
    try:
      # A fresh group (and therefore a fresh watchdog timer) is created for
      # every attempt, so |timeout| applies per try.
      thread_group = TimeoutRetryThreadGroup(timeout, threads=[child_thread])
      thread_group.StartAll(will_block=True)
      while True:
        # JoinAll is given a 60 second timeout; each time it returns with the
        # thread still alive, a progress message is logged and the join is
        # repeated.
        thread_group.JoinAll(watcher=thread_group.GetWatcher(), timeout=60,
                             error_log_func=error_log_func)
        if thread_group.IsAlive():
          logger.info('Still working on %s', desc)
        else:
          return thread_group.GetAllReturnValues()[0]
    except reraiser_thread.TimeoutError as e:
      # Timeouts already get their stacks logged.
      if num_try > retries or not retry_if_func(e):
        raise
    # Do not catch KeyboardInterrupt.
    except Exception as e:  # pylint: disable=broad-except
      if num_try > retries or not retry_if_func(e):
        raise
      error_log_func(
          '(%s) Exception on %s, attempt %d of %d: %r',
          thread_name, desc, num_try, retries + 1, e)
    num_try += 1