1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9import logging 10import sys 11import threading 12import time 13from autotest_lib.client.common_lib import error 14import six 15from six.moves import range 16 17 18class BaseStressor(threading.Thread): 19 """ 20 Implements common functionality for *Stressor classes. 21 22 @var stressor: callable which performs a single stress event. 23 """ 24 def __init__(self, stressor, on_exit=None, escalate_exceptions=True): 25 """ 26 Initialize the ControlledStressor. 27 28 @param stressor: callable which performs a single stress event. 29 @param on_exit: callable which will be called when the thread finishes. 30 @param escalate_exceptions: whether to escalate exceptions to the parent 31 thread; defaults to True. 32 """ 33 super(BaseStressor, self).__init__() 34 self.daemon = True 35 self.stressor = stressor 36 self.on_exit = on_exit 37 self._escalate_exceptions = escalate_exceptions 38 self._exc_info = None 39 40 41 def start(self, start_condition=None, start_timeout_secs=None): 42 """ 43 Creates a new thread which will call the run() method. 44 45 Optionally takes a wait condition before the stressor loop. Returns 46 immediately. 47 48 @param start_condition: the new thread will wait until this optional 49 callable returns True before running the stressor. 50 @param start_timeout_secs: how long to wait for |start_condition| to 51 become True, or None to wait forever. 52 """ 53 self._start_condition = start_condition 54 self._start_timeout_secs = start_timeout_secs 55 super(BaseStressor, self).start() 56 57 58 def run(self): 59 """ 60 Wait for |_start_condition|, and then start the stressor loop. 61 62 Overloaded from threading.Thread. This is run in a separate thread when 63 start() is called. 64 """ 65 try: 66 self._wait_for_start_condition() 67 self._loop_stressor() 68 except Exception as e: 69 if self._escalate_exceptions: 70 self._exc_info = sys.exc_info() 71 raise # Terminates this thread. Caller continues to run. 72 finally: 73 if self.on_exit: 74 self.on_exit() 75 76 77 def _wait_for_start_condition(self): 78 """ 79 Loop until _start_condition() returns True, or _start_timeout_secs 80 have elapsed. 81 82 @raise error.TestFail if we time out waiting for the start condition 83 """ 84 if self._start_condition is None: 85 return 86 87 elapsed_secs = 0 88 while not self._start_condition(): 89 if (self._start_timeout_secs and 90 elapsed_secs >= self._start_timeout_secs): 91 raise error.TestFail('start condition did not become true ' 92 'within %d seconds' % 93 self._start_timeout_secs) 94 time.sleep(1) 95 elapsed_secs += 1 96 97 98 def _loop_stressor(self): 99 """ 100 Apply stressor in a loop. 101 102 Overloaded by the particular *Stressor. 103 """ 104 raise NotImplementedError 105 106 107 def reraise(self): 108 """ 109 Reraise an exception raised in the thread's stress loop. 110 111 This is a No-op if no exception was raised. 112 """ 113 if self._exc_info: 114 exc_info = self._exc_info 115 self._exc_info = None 116 six.reraise(exc_info[0], exc_info[1], exc_info[2]) 117 118 119class ControlledStressor(BaseStressor): 120 """ 121 Run a stressor in loop on a separate thread. 122 123 Creates a new thread and calls |stressor| in a loop until stop() is called. 124 """ 125 def __init__(self, stressor, on_exit=None, escalate_exceptions=True): 126 """ 127 Initialize the ControlledStressor. 128 129 @param stressor: callable which performs a single stress event. 130 @param on_exit: callable which will be called when the thread finishes. 131 @param escalate_exceptions: whether to escalate exceptions to the parent 132 thread when stop() is called; defaults to True. 133 """ 134 self._complete = threading.Event() 135 super(ControlledStressor, self).__init__(stressor, on_exit, 136 escalate_exceptions) 137 138 139 def _loop_stressor(self): 140 """Overloaded from parent.""" 141 iteration_num = 0 142 while not self._complete.is_set(): 143 iteration_num += 1 144 logging.info('Stressor iteration: %d', iteration_num) 145 self.stressor() 146 147 148 def start(self, start_condition=None, start_timeout_secs=None): 149 """Start applying the stressor. 150 151 Overloaded from parent. 152 153 @param start_condition: the new thread will wait to until this optional 154 callable returns True before running the stressor. 155 @param start_timeout_secs: how long to wait for |start_condition| to 156 become True, or None to wait forever. 157 """ 158 self._complete.clear() 159 super(ControlledStressor, self).start(start_condition, 160 start_timeout_secs) 161 162 163 def stop(self, timeout=45): 164 """ 165 Stop applying the stressor. 166 167 @param timeout: maximum time to wait for a single run of the stressor to 168 complete, defaults to 45 seconds. 169 """ 170 self._complete.set() 171 self.join(timeout) 172 self.reraise() 173 174 175class CountedStressor(BaseStressor): 176 """ 177 Run a stressor in a loop on a separate thread a given number of times. 178 179 Creates a new thread and calls |stressor| in a loop |iterations| times. The 180 calling thread can use wait() to block until the loop completes. If the 181 stressor thread terminates with an exception, wait() will propagate that 182 exception to the thread that called wait(). 183 """ 184 def _loop_stressor(self): 185 """Overloaded from parent.""" 186 for iteration_num in range(1, self._iterations + 1): 187 logging.info('Stressor iteration: %d of %d', 188 iteration_num, self._iterations) 189 self.stressor() 190 191 192 def start(self, iterations, start_condition=None, start_timeout_secs=None): 193 """ 194 Apply the stressor a given number of times. 195 196 Overloaded from parent. 197 198 @param iterations: number of times to apply the stressor. 199 @param start_condition: the new thread will wait to until this optional 200 callable returns True before running the stressor. 201 @param start_timeout_secs: how long to wait for |start_condition| to 202 become True, or None to wait forever. 203 """ 204 self._iterations = iterations 205 super(CountedStressor, self).start(start_condition, start_timeout_secs) 206 207 208 def wait(self, timeout=None): 209 """Wait until the stressor completes. 210 211 @param timeout: maximum time for the thread to complete, by default 212 never times out. 213 """ 214 self.join(timeout) 215 self.reraise() 216