• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4"""A utility to run functions with timeouts and retries."""
5# pylint: disable=W0702
6
7import logging
8import threading
9import time
10
11from devil.utils import reraiser_thread
12from devil.utils import watchdog_timer
13
14logger = logging.getLogger(__name__)
15
16
class TimeoutRetryThreadGroup(reraiser_thread.ReraiserThreadGroup):
  """A ReraiserThreadGroup that carries a watchdog timer for its timeout."""

  def __init__(self, timeout, threads=None):
    """Initializes the group and its watchdog.

    Args:
      timeout: value passed to watchdog_timer.WatchdogTimer.
      threads: passed through to ReraiserThreadGroup.__init__.
    """
    super(TimeoutRetryThreadGroup, self).__init__(threads)
    self._watcher = watchdog_timer.WatchdogTimer(timeout)

  def GetWatcher(self):
    """Returns the WatchdogTimer created for this group."""
    return self._watcher

  def GetElapsedTime(self):
    """Returns whatever the watchdog reports via GetElapsed()."""
    return self._watcher.GetElapsed()

  def GetRemainingTime(self, required=0, suffix=None):
    """Checks that enough time is left and reports how much remains.

    The result is suitable as the |timeout| argument of async IO operations.

    Args:
      required: least amount of time the caller still needs, e.g. for a
        sleep or an IO operation.
      suffix: optional text appended to the error message on timeout.

    Returns:
      The watchdog's remaining time, or None when the watchdog reports None
      (i.e. the thread never times out).

    Raises:
      reraiser_thread.TimeoutError: if less than |required| time remains.
    """
    time_left = self._watcher.GetRemaining()
    out_of_time = time_left is not None and time_left < required
    if not out_of_time:
      return time_left

    expiry = 'Timeout of %.1f secs expired' % self._watcher.GetTimeout()
    detail = expiry + suffix if suffix else expiry
    raise reraiser_thread.TimeoutError(detail)
54
55
def CurrentTimeoutThreadGroup():
  """Finds the TimeoutRetryThreadGroup associated with the active thread.

  Starts at reraiser_thread.CurrentThreadGroup() and follows each group's
  |blocked_parent_thread_group| link until a TimeoutRetryThreadGroup shows up.

  Returns:
    The first TimeoutRetryThreadGroup found along that chain, or None if the
    chain ends without one.
  """
  group = reraiser_thread.CurrentThreadGroup()
  while True:
    if not group:
      # Ran off the end of the chain: nothing is tracking this thread.
      return None
    if isinstance(group, TimeoutRetryThreadGroup):
      return group
    group = group.blocked_parent_thread_group
68
69
def WaitFor(condition, wait_period=5, max_tries=None):
  """Wait for a condition to become true.

  Repeatedly call the function condition(), with no arguments, until it returns
  a true value.

  If called within a TimeoutRetryThreadGroup, it cooperates nicely with it:
  the elapsed time is included in the log line and, before each sleep, the
  group is asked whether at least |wait_period| seconds remain.

  Args:
    condition: callable with the condition to check. It does not need a
      __name__ attribute (e.g. functools.partial objects are accepted); its
      repr() is used for logging in that case.
    wait_period: number of seconds to wait before retrying to check the
      condition
    max_tries: maximum number of checks to make, the default tries forever
      or until the TimeoutRetryThreadGroup expires.

  Returns:
    The true value returned by the condition, or None if the condition was
    not met after max_tries.

  Raises:
    reraiser_thread.TimeoutError: if the current thread is a
      TimeoutRetryThreadGroup and the timeout expires.
  """
  # Not every callable carries __name__ (functools.partial objects and
  # instances defining __call__ do not), so fall back to repr() instead of
  # failing with AttributeError before the first check.
  condition_name = getattr(condition, '__name__', None)
  if condition_name is None:
    condition_name = repr(condition)
  timeout_thread_group = CurrentTimeoutThreadGroup()
  while max_tries is None or max_tries > 0:
    result = condition()
    if max_tries is not None:
      max_tries -= 1
    msg = ['condition', repr(condition_name), 'met' if result else 'not met']
    if timeout_thread_group:
      # pylint: disable=no-member
      msg.append('(%.1fs)' % timeout_thread_group.GetElapsedTime())
    logger.info(' '.join(msg))
    if result:
      return result
    if timeout_thread_group:
      # Raises TimeoutError if there is not enough time left for the sleep
      # below, rather than sleeping past the deadline.
      # pylint: disable=no-member
      timeout_thread_group.GetRemainingTime(
          wait_period, suffix=' waiting for condition %r' % condition_name)
    # NOTE(review): this also runs after the last permitted try, so exhausting
    # max_tries still costs one more wait_period before returning None.
    if wait_period:
      time.sleep(wait_period)
  return None
113
114
def AlwaysRetry(_exception):
  """Retry predicate that approves a retry for any exception.

  Args:
    _exception: the exception that was raised; ignored.

  Returns:
    True, unconditionally.
  """
  return True
117
118
def Run(func,
        timeout,
        retries,
        args=None,
        kwargs=None,
        desc=None,
        error_log_func=logging.critical,
        retry_if_func=AlwaysRetry):
  """Runs the passed function in a separate thread with timeouts and retries.

  Args:
    func: the function to be wrapped.
    timeout: the timeout in seconds for each try.
    retries: the number of retries.
    args: list of positional args to pass to |func|.
    kwargs: dictionary of keyword args to pass to |func|.
    desc: An optional description of |func| used in logging. If omitted,
      |func.__name__| will be used, or repr(func) when |func| has no
      __name__ (e.g. a functools.partial object).
    error_log_func: Logging function when logging errors.
    retry_if_func: Unary callable that takes an exception and returns
      whether |func| should be retried. Defaults to always retrying.

  Returns:
    The return value of func(*args, **kwargs).

  Raises:
    reraiser_thread.TimeoutError: if a try times out and either all retries
      are used up or |retry_if_func| rejects the retry.
    Exception: any other exception surfaced by a try is re-raised under the
      same conditions.
  """
  if not args:
    args = []
  if not kwargs:
    kwargs = {}
  if not desc:
    # Callables such as functools.partial objects have no __name__; use their
    # repr() instead of failing with AttributeError.
    desc = getattr(func, '__name__', None) or repr(func)

  num_try = 1
  while True:
    thread_name = 'TimeoutThread-%d-for-%s' % (num_try,
                                               threading.current_thread().name)
    child_thread = reraiser_thread.ReraiserThread(
        lambda: func(*args, **kwargs), name=thread_name)
    try:
      # A fresh group (and therefore a fresh watchdog) is created per try.
      thread_group = TimeoutRetryThreadGroup(timeout, threads=[child_thread])
      thread_group.StartAll(will_block=True)
      while True:
        # Join in slices (timeout=60 per JoinAll call) so a progress message
        # can be logged for as long as the child thread is still alive.
        thread_group.JoinAll(
            watcher=thread_group.GetWatcher(),
            timeout=60,
            error_log_func=error_log_func)
        if thread_group.IsAlive():
          logger.info('Still working on %s', desc)
        else:
          return thread_group.GetAllReturnValues()[0]
    except reraiser_thread.TimeoutError as e:
      # Timeouts already get their stacks logged.
      if num_try > retries or not retry_if_func(e):
        raise
    # Do not catch KeyboardInterrupt.
    except Exception as e:  # pylint: disable=broad-except
      if num_try > retries or not retry_if_func(e):
        raise
      error_log_func('(%s) Exception on %s, attempt %d of %d: %r', thread_name,
                     desc, num_try, retries + 1, e)
    num_try += 1
180