• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9import logging
10import sys
11import threading
12import time
13from autotest_lib.client.common_lib import error
14import six
15from six.moves import range
16
17
18class BaseStressor(threading.Thread):
19    """
20    Implements common functionality for *Stressor classes.
21
22    @var stressor: callable which performs a single stress event.
23    """
24    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
25        """
26        Initialize the ControlledStressor.
27
28        @param stressor: callable which performs a single stress event.
29        @param on_exit: callable which will be called when the thread finishes.
30        @param escalate_exceptions: whether to escalate exceptions to the parent
31            thread; defaults to True.
32        """
33        super(BaseStressor, self).__init__()
34        self.daemon = True
35        self.stressor = stressor
36        self.on_exit = on_exit
37        self._escalate_exceptions = escalate_exceptions
38        self._exc_info = None
39
40
41    def start(self, start_condition=None, start_timeout_secs=None):
42        """
43        Creates a new thread which will call the run() method.
44
45        Optionally takes a wait condition before the stressor loop. Returns
46        immediately.
47
48        @param start_condition: the new thread will wait until this optional
49            callable returns True before running the stressor.
50        @param start_timeout_secs: how long to wait for |start_condition| to
51            become True, or None to wait forever.
52        """
53        self._start_condition = start_condition
54        self._start_timeout_secs = start_timeout_secs
55        super(BaseStressor, self).start()
56
57
58    def run(self):
59        """
60        Wait for |_start_condition|, and then start the stressor loop.
61
62        Overloaded from threading.Thread. This is run in a separate thread when
63        start() is called.
64        """
65        try:
66            self._wait_for_start_condition()
67            self._loop_stressor()
68        except Exception as e:
69            if self._escalate_exceptions:
70                self._exc_info = sys.exc_info()
71            raise  # Terminates this thread. Caller continues to run.
72        finally:
73            if self.on_exit:
74              self.on_exit()
75
76
77    def _wait_for_start_condition(self):
78        """
79        Loop until _start_condition() returns True, or _start_timeout_secs
80        have elapsed.
81
82        @raise error.TestFail if we time out waiting for the start condition
83        """
84        if self._start_condition is None:
85            return
86
87        elapsed_secs = 0
88        while not self._start_condition():
89            if (self._start_timeout_secs and
90                    elapsed_secs >= self._start_timeout_secs):
91                raise error.TestFail('start condition did not become true '
92                                     'within %d seconds' %
93                                     self._start_timeout_secs)
94            time.sleep(1)
95            elapsed_secs += 1
96
97
98    def _loop_stressor(self):
99        """
100        Apply stressor in a loop.
101
102        Overloaded by the particular *Stressor.
103        """
104        raise NotImplementedError
105
106
107    def reraise(self):
108        """
109        Reraise an exception raised in the thread's stress loop.
110
111        This is a No-op if no exception was raised.
112        """
113        if self._exc_info:
114            exc_info = self._exc_info
115            self._exc_info = None
116            six.reraise(exc_info[0], exc_info[1], exc_info[2])
117
118
119class ControlledStressor(BaseStressor):
120    """
121    Run a stressor in loop on a separate thread.
122
123    Creates a new thread and calls |stressor| in a loop until stop() is called.
124    """
125    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
126        """
127        Initialize the ControlledStressor.
128
129        @param stressor: callable which performs a single stress event.
130        @param on_exit: callable which will be called when the thread finishes.
131        @param escalate_exceptions: whether to escalate exceptions to the parent
132            thread when stop() is called; defaults to True.
133        """
134        self._complete = threading.Event()
135        super(ControlledStressor, self).__init__(stressor, on_exit,
136                                                 escalate_exceptions)
137
138
139    def _loop_stressor(self):
140        """Overloaded from parent."""
141        iteration_num = 0
142        while not self._complete.is_set():
143            iteration_num += 1
144            logging.info('Stressor iteration: %d', iteration_num)
145            self.stressor()
146
147
148    def start(self, start_condition=None, start_timeout_secs=None):
149        """Start applying the stressor.
150
151        Overloaded from parent.
152
153        @param start_condition: the new thread will wait to until this optional
154            callable returns True before running the stressor.
155        @param start_timeout_secs: how long to wait for |start_condition| to
156            become True, or None to wait forever.
157        """
158        self._complete.clear()
159        super(ControlledStressor, self).start(start_condition,
160                                              start_timeout_secs)
161
162
163    def stop(self, timeout=45):
164        """
165        Stop applying the stressor.
166
167        @param timeout: maximum time to wait for a single run of the stressor to
168            complete, defaults to 45 seconds.
169        """
170        self._complete.set()
171        self.join(timeout)
172        self.reraise()
173
174
175class CountedStressor(BaseStressor):
176    """
177    Run a stressor in a loop on a separate thread a given number of times.
178
179    Creates a new thread and calls |stressor| in a loop |iterations| times. The
180    calling thread can use wait() to block until the loop completes. If the
181    stressor thread terminates with an exception, wait() will propagate that
182    exception to the thread that called wait().
183    """
184    def _loop_stressor(self):
185        """Overloaded from parent."""
186        for iteration_num in range(1, self._iterations + 1):
187            logging.info('Stressor iteration: %d of %d',
188                         iteration_num, self._iterations)
189            self.stressor()
190
191
192    def start(self, iterations, start_condition=None, start_timeout_secs=None):
193        """
194        Apply the stressor a given number of times.
195
196        Overloaded from parent.
197
198        @param iterations: number of times to apply the stressor.
199        @param start_condition: the new thread will wait to until this optional
200            callable returns True before running the stressor.
201        @param start_timeout_secs: how long to wait for |start_condition| to
202            become True, or None to wait forever.
203        """
204        self._iterations = iterations
205        super(CountedStressor, self).start(start_condition, start_timeout_secs)
206
207
208    def wait(self, timeout=None):
209        """Wait until the stressor completes.
210
211        @param timeout: maximum time for the thread to complete, by default
212            never times out.
213        """
214        self.join(timeout)
215        self.reraise()
216