1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import math 7import threading 8 9import common 10from autotest_lib.client.common_lib import env 11from autotest_lib.client.common_lib import error 12from autotest_lib.client.common_lib import utils 13from autotest_lib.client.common_lib.cros import retry 14from autotest_lib.frontend.afe.json_rpc import proxy 15from autotest_lib.server import frontend 16try: 17 from chromite.lib import retry_util 18 from chromite.lib import timeout_util 19except ImportError: 20 logging.warn('Unable to import chromite.') 21 retry_util = None 22 timeout_util = None 23 24try: 25 from chromite.lib import metrics 26except ImportError: 27 logging.warn('Unable to import metrics from chromite.') 28 metrics = utils.metrics_mock 29 30 31def convert_timeout_to_retry(backoff, timeout_min, delay_sec): 32 """Compute the number of retry attempts for use with chromite.retry_util. 33 34 @param backoff: The exponential backoff factor. 35 @param timeout_min: The maximum amount of time (in minutes) to sleep. 36 @param delay_sec: The amount to sleep (in seconds) between each attempt. 37 38 @return: The number of retry attempts in the case of exponential backoff. 39 """ 40 # Estimate the max_retry in the case of exponential backoff: 41 # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r) 42 # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) ) 43 # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff)) 44 # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff) 45 total_sleep = timeout_min * 60 46 numerator = math.log10(1-(total_sleep/delay_sec)*(1-backoff)) 47 denominator = math.log10(backoff) 48 return int(math.ceil(numerator/denominator)) 49 50 51class RetryingAFE(frontend.AFE): 52 """Wrapper around frontend.AFE that retries all RPCs. 53 54 Timeout for retries and delay between retries are configurable. 55 """ 56 def __init__(self, timeout_min=30, delay_sec=10, **dargs): 57 """Constructor 58 59 @param timeout_min: timeout in minutes until giving up. 60 @param delay_sec: pre-jittered delay between retries in seconds. 61 """ 62 self.timeout_min = timeout_min 63 self.delay_sec = delay_sec 64 super(RetryingAFE, self).__init__(**dargs) 65 66 67 def set_timeout(self, timeout_min): 68 """Set timeout minutes for the AFE server. 69 70 @param timeout_min: The timeout minutes for AFE server. 71 """ 72 self.timeout_min = timeout_min 73 74 75 def run(self, call, **dargs): 76 """Method for running RPC call. 77 78 @param call: A string RPC call. 79 @param dargs: the parameters of the RPC call. 80 """ 81 if retry_util is None: 82 raise ImportError('Unable to import chromite. Please consider to ' 83 'run build_externals to build site packages.') 84 # exc_retry: We retry if this exception is raised. 85 # blacklist: Exceptions that we raise immediately if caught. 86 exc_retry = Exception 87 blacklist = (ImportError, error.RPCException, proxy.JSONRPCException, 88 timeout_util.TimeoutError, error.ControlFileNotFound) 89 backoff = 2 90 max_retry = convert_timeout_to_retry(backoff, self.timeout_min, 91 self.delay_sec) 92 93 def _run(self, call, **dargs): 94 return super(RetryingAFE, self).run(call, **dargs) 95 96 def handler(exc): 97 """Check if exc is an exc_retry or if it's blacklisted. 98 99 @param exc: An exception. 100 101 @return: True if exc is an exc_retry and is not 102 blacklisted. False otherwise. 103 """ 104 is_exc_to_check = isinstance(exc, exc_retry) 105 is_blacklisted = isinstance(exc, blacklist) 106 return is_exc_to_check and not is_blacklisted 107 108 # If the call is not in main thread, signal can't be used to abort the 109 # call. In that case, use a basic retry which does not enforce timeout 110 # if the process hangs. 111 @retry.retry(Exception, timeout_min=self.timeout_min, 112 delay_sec=self.delay_sec, 113 blacklist=[ImportError, error.RPCException, 114 proxy.ValidationError]) 115 def _run_in_child_thread(self, call, **dargs): 116 return super(RetryingAFE, self).run(call, **dargs) 117 118 if isinstance(threading.current_thread(), threading._MainThread): 119 # Set the keyword argument for GenericRetry 120 dargs['sleep'] = self.delay_sec 121 dargs['backoff_factor'] = backoff 122 # timeout_util.Timeout fundamentally relies on sigalrm, and doesn't 123 # work at all in wsgi environment (just emits logs spam). So, don't 124 # use it in wsgi. 125 try: 126 if env.IN_MOD_WSGI: 127 return retry_util.GenericRetry(handler, max_retry, _run, 128 self, call, **dargs) 129 with timeout_util.Timeout(self.timeout_min * 60): 130 return retry_util.GenericRetry(handler, max_retry, _run, 131 self, call, **dargs) 132 except timeout_util.TimeoutError: 133 c = metrics.Counter( 134 'chromeos/autotest/retrying_afe/retry_timeout') 135 # Reserve field job_details for future use. 136 f = {'destination_server': self.server.split(':')[0], 137 'call': call, 138 'job_details': ''} 139 c.increment(fields=f) 140 raise 141 else: 142 return _run_in_child_thread(self, call, **dargs) 143 144 145class RetryingTKO(frontend.TKO): 146 """Wrapper around frontend.TKO that retries all RPCs. 147 148 Timeout for retries and delay between retries are configurable. 149 """ 150 def __init__(self, timeout_min=30, delay_sec=10, **dargs): 151 """Constructor 152 153 @param timeout_min: timeout in minutes until giving up. 154 @param delay_sec: pre-jittered delay between retries in seconds. 155 """ 156 self.timeout_min = timeout_min 157 self.delay_sec = delay_sec 158 super(RetryingTKO, self).__init__(**dargs) 159 160 161 def run(self, call, **dargs): 162 """Method for running RPC call. 163 164 @param call: A string RPC call. 165 @param dargs: the parameters of the RPC call. 166 """ 167 @retry.retry(Exception, timeout_min=self.timeout_min, 168 delay_sec=self.delay_sec, 169 blacklist=[ImportError, error.RPCException, 170 proxy.ValidationError]) 171 def _run(self, call, **dargs): 172 return super(RetryingTKO, self).run(call, **dargs) 173 return _run(self, call, **dargs) 174