# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import sys
import threading
import time
from autotest_lib.client.common_lib import error


class BaseStressor(threading.Thread):
    """
    Implements common functionality for *Stressor classes.

    @var stressor: callable which performs a single stress event.
    """
    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
        """
        Initialize the BaseStressor.

        @param stressor: callable which performs a single stress event.
        @param on_exit: callable which will be called when the thread finishes.
        @param escalate_exceptions: whether to escalate exceptions to the parent
            thread; defaults to True.
        """
        super(BaseStressor, self).__init__()
        self.daemon = True
        self.stressor = stressor
        self.on_exit = on_exit
        self._escalate_exceptions = escalate_exceptions
        self._exc_info = None
        # Defaults so that run() does not depend on start() having been called
        # first; start() overwrites both.
        self._start_condition = None
        self._start_timeout_secs = None


    def start(self, start_condition=None, start_timeout_secs=None):
        """
        Creates a new thread which will call the run() method.

        Optionally takes a wait condition before the stressor loop. Returns
        immediately.

        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._start_condition = start_condition
        self._start_timeout_secs = start_timeout_secs
        super(BaseStressor, self).start()


    def run(self):
        """
        Wait for |_start_condition|, and then start the stressor loop.

        Overloaded from threading.Thread. This is run in a separate thread when
        start() is called. Any exception is recorded (when escalation is
        enabled) so that reraise() can surface it in the calling thread, and
        |on_exit| is always invoked last.
        """
        try:
            self._wait_for_start_condition()
            self._loop_stressor()
        except Exception:
            if self._escalate_exceptions:
                self._exc_info = sys.exc_info()
            raise  # Terminates this thread. Caller continues to run.
        finally:
            if self.on_exit:
                self.on_exit()


    def _wait_for_start_condition(self):
        """
        Loop until _start_condition() returns True, or _start_timeout_secs
        have elapsed.

        The condition is polled once per second. A falsy timeout (None or 0)
        means wait forever.

        @raise error.TestFail if we time out waiting for the start condition
        """
        if self._start_condition is None:
            return

        # Elapsed time is counted in one-second sleeps, not read from a clock.
        elapsed_secs = 0
        while not self._start_condition():
            if (self._start_timeout_secs and
                    elapsed_secs >= self._start_timeout_secs):
                raise error.TestFail('start condition did not become true '
                                     'within %d seconds' %
                                     self._start_timeout_secs)
            time.sleep(1)
            elapsed_secs += 1


    def _loop_stressor(self):
        """
        Apply stressor in a loop.

        Overloaded by the particular *Stressor.
        """
        raise NotImplementedError


    def reraise(self):
        """
        Reraise an exception raised in the thread's stress loop.

        The stored exception is cleared before it is raised, so a second call
        is a no-op. This is a No-op if no exception was raised.
        """
        if not self._exc_info:
            return

        exc_type, exc_value, exc_tb = self._exc_info
        self._exc_info = None
        if sys.version_info[0] >= 3:
            raise exc_value.with_traceback(exc_tb)
        # The three-expression raise is a SyntaxError on Python 3, so it is
        # kept in a string that only a Python 2 interpreter ever executes.
        exec('raise exc_type, exc_value, exc_tb')


class ControlledStressor(BaseStressor):
    """
    Run a stressor in loop on a separate thread.

    Creates a new thread and calls |stressor| in a loop until stop() is called.
    """
    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
        """
        Initialize the ControlledStressor.

        @param stressor: callable which performs a single stress event.
        @param on_exit: callable which will be called when the thread finishes.
        @param escalate_exceptions: whether to escalate exceptions to the parent
            thread when stop() is called; defaults to True.
        """
        # Set by stop() to ask the loop to finish after the current iteration.
        self._complete = threading.Event()
        super(ControlledStressor, self).__init__(stressor, on_exit,
                                                 escalate_exceptions)


    def _loop_stressor(self):
        """Overloaded from parent."""
        iteration_num = 0
        while not self._complete.is_set():
            iteration_num += 1
            logging.info('Stressor iteration: %d', iteration_num)
            self.stressor()


    def start(self, start_condition=None, start_timeout_secs=None):
        """Start applying the stressor.

        Overloaded from parent.

        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._complete.clear()
        super(ControlledStressor, self).start(start_condition,
                                              start_timeout_secs)


    def stop(self, timeout=45):
        """
        Stop applying the stressor.

        Signals the loop to finish, waits for the thread, and re-raises any
        exception recorded by the stressor thread.

        @param timeout: maximum time to wait for a single run of the stressor to
            complete, defaults to 45 seconds.
        """
        self._complete.set()
        self.join(timeout)
        if self.is_alive():
            # join() gives no indication that it timed out; make it visible.
            logging.warning('Stressor thread still running after %s seconds',
                            timeout)
        self.reraise()


class CountedStressor(BaseStressor):
    """
    Run a stressor in a loop on a separate thread a given number of times.

    Creates a new thread and calls |stressor| in a loop |iterations| times. The
    calling thread can use wait() to block until the loop completes. If the
    stressor thread terminates with an exception, wait() will propagate that
    exception to the thread that called wait().
    """
    # Overwritten per instance by start(); the default keeps run() well-defined
    # if it is reached before start() has stored a count.
    _iterations = 0


    def _loop_stressor(self):
        """Overloaded from parent."""
        # A plain counter avoids xrange, which does not exist on Python 3.
        total = self._iterations
        iteration_num = 0
        while iteration_num < total:
            iteration_num += 1
            logging.info('Stressor iteration: %d of %d',
                         iteration_num, total)
            self.stressor()


    def start(self, iterations, start_condition=None, start_timeout_secs=None):
        """
        Apply the stressor a given number of times.

        Overloaded from parent.

        @param iterations: number of times to apply the stressor.
        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._iterations = iterations
        super(CountedStressor, self).start(start_condition, start_timeout_secs)


    def wait(self, timeout=None):
        """Wait until the stressor completes.

        Re-raises any exception recorded by the stressor thread.

        @param timeout: maximum time for the thread to complete, by default
            never times out.
        """
        self.join(timeout)
        if self.is_alive():
            # Only reachable when a timeout was given and it expired.
            logging.warning('Stressor thread still running after %s seconds',
                            timeout)
        self.reraise()