• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import math
7import threading
8
9import common
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib.cros import retry
12from autotest_lib.frontend.afe.json_rpc import proxy
13from autotest_lib.server import frontend
14try:
15    from chromite.lib import retry_util
16    from chromite.lib import timeout_util
17except ImportError:
18    logging.warn('Unable to import chromite.')
19    retry_util = None
20    timeout_util = None
21
22
def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
    """Compute the number of retry attempts for use with chromite.retry_util.

    The result is the smallest number of attempts whose cumulative sleep
    time (starting at delay_sec and multiplied by backoff after each
    attempt) reaches timeout_min minutes.

    @param backoff: The exponential backoff factor. A factor of 1 means the
                    delay stays constant between attempts.
    @param timeout_min: The maximum amount of time (in minutes) to sleep.
    @param delay_sec: The amount to sleep (in seconds) between each attempt.

    @return: The number of retry attempts in the case of exponential backoff.

    @raises ZeroDivisionError: If delay_sec is 0.
    """
    total_sleep = timeout_min * 60
    # Force true division: with integer arguments Python 2 would floor the
    # quotient and underestimate the number of attempts.
    sleep_ratio = float(total_sleep) / delay_sec

    if backoff == 1:
        # The closed form below degenerates to 0/0 for a factor of 1. With a
        # constant delay the total sleep is simply delay_sec * max_retry.
        return int(math.ceil(sleep_ratio))

    # Estimate the max_retry in the case of exponential backoff:
    # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r)
    # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) )
    # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff))
    # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff)
    # The ratio of two logarithms is independent of the base, so log10 is
    # interchangeable with ln here.
    numerator = math.log10(1 - sleep_ratio * (1 - backoff))
    denominator = math.log10(backoff)
    return int(math.ceil(numerator / denominator))
41
42
class RetryingAFE(frontend.AFE):
    """frontend.AFE subclass whose run() retries failed RPCs.

    Both the overall retry timeout and the delay between attempts can be
    configured per instance.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Constructor

        @param timeout_min: timeout in minutes until giving up.
        @param delay_sec: pre-jittered delay between retries in seconds.
        @param dargs: remaining keyword arguments, forwarded unchanged to
                      the frontend.AFE constructor.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingAFE, self).__init__(**dargs)


    def set_timeout(self, timeout_min):
        """Replace the retry timeout used by subsequent calls to run().

        @param timeout_min: The timeout minutes for AFE server.
        """
        self.timeout_min = timeout_min


    def run(self, call, **dargs):
        """Invoke frontend.AFE.run, retrying when it raises.

        In the main thread the call goes through chromite's GenericRetry
        inside a timeout_util.Timeout context; in any other thread it goes
        through the retry.retry decorator instead.

        @param call: forwarded as-is to frontend.AFE.run.
        @param dargs: keyword arguments forwarded to frontend.AFE.run.

        @return: whatever frontend.AFE.run returns.

        @raises ImportError: if chromite could not be imported.
        """
        if retry_util is None:
            raise ImportError('Unable to import chromite. Please consider to '
                              'run build_externals to build site packages.')

        # retriable: exception type that triggers another attempt.
        # fatal: exception types that are never retried, even though they
        #        are instances of retriable.
        retriable = Exception
        fatal = (ImportError, error.RPCException, proxy.JSONRPCException,
                 timeout_util.TimeoutError)
        backoff_factor = 2
        attempts = convert_timeout_to_retry(backoff_factor, self.timeout_min,
                                            self.delay_sec)

        def _run(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        def handler(exc):
            """Tell GenericRetry whether exc warrants another attempt.

            @param exc: An exception.

            @return: True if exc is a retriable exception that is not one
                     of the fatal types. False otherwise.
            """
            return isinstance(exc, retriable) and not isinstance(exc, fatal)

        # Outside the main thread a signal can't be used to abort the call,
        # so fall back to a basic retry that does not enforce the timeout
        # if the process hangs.
        basic_retry = retry.retry(Exception, timeout_min=self.timeout_min,
                                  delay_sec=self.delay_sec,
                                  blacklist=[ImportError, error.RPCException,
                                             proxy.ValidationError])

        @basic_retry
        def _run_in_child_thread(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        in_main_thread = isinstance(threading.current_thread(),
                                    threading._MainThread)
        if not in_main_thread:
            return _run_in_child_thread(self, call, **dargs)

        # These two keyword arguments are meant for GenericRetry itself.
        dargs['sleep'] = self.delay_sec
        dargs['backoff_factor'] = backoff_factor
        with timeout_util.Timeout(self.timeout_min * 60):
            return retry_util.GenericRetry(handler, attempts, _run,
                                           self, call, **dargs)
114
115
class RetryingTKO(frontend.TKO):
    """frontend.TKO subclass whose run() retries failed RPCs.

    Both the overall retry timeout and the delay between attempts can be
    configured per instance.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Constructor

        @param timeout_min: timeout in minutes until giving up.
        @param delay_sec: pre-jittered delay between retries in seconds.
        @param dargs: remaining keyword arguments, forwarded unchanged to
                      the frontend.TKO constructor.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingTKO, self).__init__(**dargs)


    def run(self, call, **dargs):
        """Invoke frontend.TKO.run through the retry.retry decorator.

        @param call: forwarded as-is to frontend.TKO.run.
        @param dargs: keyword arguments forwarded to frontend.TKO.run.

        @return: the result of the decorated frontend.TKO.run call.
        """
        # Built per call so the instance's current timeout_min and delay_sec
        # are the ones applied.
        retry_policy = retry.retry(Exception, timeout_min=self.timeout_min,
                                   delay_sec=self.delay_sec,
                                   blacklist=[ImportError, error.RPCException,
                                              proxy.ValidationError])

        @retry_policy
        def _run(self, call, **dargs):
            return super(RetryingTKO, self).run(call, **dargs)

        return _run(self, call, **dargs)
140