• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Dispatches tests, either sharding or replicating them.
6
7Performs the following steps:
8* Create a test collection factory, using the given tests
9  - If sharding: test collection factory returns the same shared test collection
10    to all test runners
11  - If replciating: test collection factory returns a unique test collection to
12    each test runner, with the same set of tests in each.
13* Create a test runner for each device.
14* Run each test runner in its own thread, grabbing tests from the test
15  collection until there are no tests left.
16"""
17
18import logging
19import threading
20
21from pylib import android_commands
22from pylib import constants
23from pylib.base import base_test_result
24from pylib.device import device_errors
25from pylib.utils import reraiser_thread
26from pylib.utils import watchdog_timer
27
28
29DEFAULT_TIMEOUT = 7 * 60  # seven minutes
30
31
32class _ThreadSafeCounter(object):
33  """A threadsafe counter."""
34
35  def __init__(self):
36    self._lock = threading.Lock()
37    self._value = 0
38
39  def GetAndIncrement(self):
40    """Get the current value and increment it atomically.
41
42    Returns:
43      The value before incrementing.
44    """
45    with self._lock:
46      pre_increment = self._value
47      self._value += 1
48      return pre_increment
49
50
51class _Test(object):
52  """Holds a test with additional metadata."""
53
54  def __init__(self, test, tries=0):
55    """Initializes the _Test object.
56
57    Args:
58      test: The test.
59      tries: Number of tries so far.
60    """
61    self.test = test
62    self.tries = tries
63
64
65class _TestCollection(object):
66  """A threadsafe collection of tests.
67
68  Args:
69    tests: List of tests to put in the collection.
70  """
71
72  def __init__(self, tests=None):
73    if not tests:
74      tests = []
75    self._lock = threading.Lock()
76    self._tests = []
77    self._tests_in_progress = 0
78    # Used to signal that an item is available or all items have been handled.
79    self._item_available_or_all_done = threading.Event()
80    for t in tests:
81      self.add(t)
82
83  def _pop(self):
84    """Pop a test from the collection.
85
86    Waits until a test is available or all tests have been handled.
87
88    Returns:
89      A test or None if all tests have been handled.
90    """
91    while True:
92      # Wait for a test to be available or all tests to have been handled.
93      self._item_available_or_all_done.wait()
94      with self._lock:
95        # Check which of the two conditions triggered the signal.
96        if self._tests_in_progress == 0:
97          return None
98        try:
99          return self._tests.pop(0)
100        except IndexError:
101          # Another thread beat us to the available test, wait again.
102          self._item_available_or_all_done.clear()
103
104  def add(self, test):
105    """Add an test to the collection.
106
107    Args:
108      test: A test to add.
109    """
110    with self._lock:
111      self._tests.append(test)
112      self._item_available_or_all_done.set()
113      self._tests_in_progress += 1
114
115  def test_completed(self):
116    """Indicate that a test has been fully handled."""
117    with self._lock:
118      self._tests_in_progress -= 1
119      if self._tests_in_progress == 0:
120        # All tests have been handled, signal all waiting threads.
121        self._item_available_or_all_done.set()
122
123  def __iter__(self):
124    """Iterate through tests in the collection until all have been handled."""
125    while True:
126      r = self._pop()
127      if r is None:
128        break
129      yield r
130
131  def __len__(self):
132    """Return the number of tests currently in the collection."""
133    return len(self._tests)
134
135  def test_names(self):
136    """Return a list of the names of the tests currently in the collection."""
137    with self._lock:
138      return list(t.test for t in self._tests)
139
140
141def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
142                       num_retries, tag_results_with_device=False):
143  """Runs tests from the test_collection until empty using the given runner.
144
145  Adds TestRunResults objects to the out_results list and may add tests to the
146  out_retry list.
147
148  Args:
149    runner: A TestRunner object used to run the tests.
150    test_collection: A _TestCollection from which to get _Test objects to run.
151    out_results: A list to add TestRunResults to.
152    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
153    num_retries: Number of retries for a test.
154    tag_results_with_device: If True, appends the name of the device on which
155        the test was run to the test name. Used when replicating to identify
156        which device ran each copy of the test, and to ensure each copy of the
157        test is recorded separately.
158  """
159
160  def TagTestRunResults(test_run_results):
161    """Tags all results with the last 4 digits of the device id.
162
163    Used when replicating tests to distinguish the same tests run on different
164    devices. We use a set to store test results, so the hash (generated from
165    name and tag) must be unique to be considered different results.
166    """
167    new_test_run_results = base_test_result.TestRunResults()
168    for test_result in test_run_results.GetAll():
169      test_result.SetName('%s_%s' % (runner.device_serial[-4:],
170                                     test_result.GetName()))
171      new_test_run_results.AddResult(test_result)
172    return new_test_run_results
173
174  for test in test_collection:
175    watcher.Reset()
176    try:
177      if runner.device_serial not in android_commands.GetAttachedDevices():
178        # Device is unresponsive, stop handling tests on this device.
179        msg = 'Device %s is unresponsive.' % runner.device_serial
180        logging.warning(msg)
181        raise device_errors.DeviceUnreachableError(msg)
182      result, retry = runner.RunTest(test.test)
183      if tag_results_with_device:
184        result = TagTestRunResults(result)
185      test.tries += 1
186      if retry and test.tries <= num_retries:
187        # Retry non-passing results, only record passing results.
188        pass_results = base_test_result.TestRunResults()
189        pass_results.AddResults(result.GetPass())
190        out_results.append(pass_results)
191        logging.warning('Will retry test, try #%s.' % test.tries)
192        test_collection.add(_Test(test=retry, tries=test.tries))
193      else:
194        # All tests passed or retry limit reached. Either way, record results.
195        out_results.append(result)
196    except:
197      # An unhandleable exception, ensure tests get run by another device and
198      # reraise this exception on the main thread.
199      test_collection.add(test)
200      raise
201    finally:
202      # Retries count as separate tasks so always mark the popped test as done.
203      test_collection.test_completed()
204
205
206def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
207  """Creates a test runner for each device and calls SetUp() in parallel.
208
209  Note: if a device is unresponsive the corresponding TestRunner will not be
210    added to out_runners.
211
212  Args:
213    runner_factory: Callable that takes a device and index and returns a
214      TestRunner object.
215    device: The device serial number to set up.
216    out_runners: List to add the successfully set up TestRunner object.
217    threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
218  """
219  try:
220    index = threadsafe_counter.GetAndIncrement()
221    logging.warning('Creating shard %s for device %s.', index, device)
222    runner = runner_factory(device, index)
223    runner.SetUp()
224    out_runners.append(runner)
225  except (device_errors.DeviceUnreachableError,
226          # TODO(jbudorick) Remove this once the underlying implementations
227          #                 for the above are switched or wrapped.
228          android_commands.errors.DeviceUnresponsiveError) as e:
229    logging.warning('Failed to create shard for %s: [%s]', device, e)
230
231
232def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
233                 tag_results_with_device=False):
234  """Run all tests using the given TestRunners.
235
236  Args:
237    runners: A list of TestRunner objects.
238    test_collection_factory: A callable to generate a _TestCollection object for
239        each test runner.
240    num_retries: Number of retries for a test.
241    timeout: Watchdog timeout in seconds.
242    tag_results_with_device: If True, appends the name of the device on which
243        the test was run to the test name. Used when replicating to identify
244        which device ran each copy of the test, and to ensure each copy of the
245        test is recorded separately.
246
247  Returns:
248    A tuple of (TestRunResults object, exit code)
249  """
250  logging.warning('Running tests with %s test runners.' % (len(runners)))
251  results = []
252  exit_code = 0
253  run_results = base_test_result.TestRunResults()
254  watcher = watchdog_timer.WatchdogTimer(timeout)
255  test_collections = [test_collection_factory() for _ in runners]
256
257  threads = [
258      reraiser_thread.ReraiserThread(
259          _RunTestsFromQueue,
260          [r, tc, results, watcher, num_retries, tag_results_with_device],
261          name=r.device_serial[-4:])
262      for r, tc in zip(runners, test_collections)]
263
264  workers = reraiser_thread.ReraiserThreadGroup(threads)
265  workers.StartAll()
266
267  # Catch DeviceUnreachableErrors and set a warning exit code
268  try:
269    workers.JoinAll(watcher)
270  except (device_errors.DeviceUnreachableError,
271          # TODO(jbudorick) Remove this once the underlying implementations
272          #                 for the above are switched or wrapped.
273          android_commands.errors.DeviceUnresponsiveError) as e:
274    logging.error(e)
275    exit_code = constants.WARNING_EXIT_CODE
276
277  if not all((len(tc) == 0 for tc in test_collections)):
278    logging.error('Only ran %d tests (all devices are likely offline).' %
279                  len(results))
280    for tc in test_collections:
281      run_results.AddResults(base_test_result.BaseTestResult(
282          t, base_test_result.ResultType.UNKNOWN) for t in tc.test_names())
283
284  for r in results:
285    run_results.AddTestRunResults(r)
286  if not run_results.DidRunPass():
287    exit_code = constants.ERROR_EXIT_CODE
288  return (run_results, exit_code)
289
290
291def _CreateRunners(runner_factory, devices, timeout=None):
292  """Creates a test runner for each device and calls SetUp() in parallel.
293
294  Note: if a device is unresponsive the corresponding TestRunner will not be
295    included in the returned list.
296
297  Args:
298    runner_factory: Callable that takes a device and index and returns a
299      TestRunner object.
300    devices: List of device serial numbers as strings.
301    timeout: Watchdog timeout in seconds, defaults to the default timeout.
302
303  Returns:
304    A list of TestRunner objects.
305  """
306  logging.warning('Creating %s test runners.' % len(devices))
307  runners = []
308  counter = _ThreadSafeCounter()
309  threads = reraiser_thread.ReraiserThreadGroup(
310      [reraiser_thread.ReraiserThread(_SetUp,
311                                      [runner_factory, d, runners, counter],
312                                      name=d[-4:])
313       for d in devices])
314  threads.StartAll()
315  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
316  return runners
317
318
319def _TearDownRunners(runners, timeout=None):
320  """Calls TearDown() for each test runner in parallel.
321
322  Args:
323    runners: A list of TestRunner objects.
324    timeout: Watchdog timeout in seconds, defaults to the default timeout.
325  """
326  threads = reraiser_thread.ReraiserThreadGroup(
327      [reraiser_thread.ReraiserThread(r.TearDown, name=r.device_serial[-4:])
328       for r in runners])
329  threads.StartAll()
330  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
331
332
333def ApplyMaxPerRun(tests, max_per_run):
334  """Rearrange the tests so that no group contains more than max_per_run tests.
335
336  Args:
337    tests:
338    max_per_run:
339
340  Returns:
341    A list of tests with no more than max_per_run per run.
342  """
343  tests_expanded = []
344  for test_group in tests:
345    if type(test_group) != str:
346      # Do not split test objects which are not strings.
347      tests_expanded.append(test_group)
348    else:
349      test_split = test_group.split(':')
350      for i in range(0, len(test_split), max_per_run):
351        tests_expanded.append(':'.join(test_split[i:i+max_per_run]))
352  return tests_expanded
353
354
355def RunTests(tests, runner_factory, devices, shard=True,
356             test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
357             num_retries=2, max_per_run=256):
358  """Run all tests on attached devices, retrying tests that don't pass.
359
360  Args:
361    tests: List of tests to run.
362    runner_factory: Callable that takes a device and index and returns a
363        TestRunner object.
364    devices: List of attached devices.
365    shard: True if we should shard, False if we should replicate tests.
366      - Sharding tests will distribute tests across all test runners through a
367        shared test collection.
368      - Replicating tests will copy all tests to each test runner through a
369        unique test collection for each test runner.
370    test_timeout: Watchdog timeout in seconds for running tests.
371    setup_timeout: Watchdog timeout in seconds for creating and cleaning up
372        test runners.
373    num_retries: Number of retries for a test.
374    max_per_run: Maximum number of tests to run in any group.
375
376  Returns:
377    A tuple of (base_test_result.TestRunResults object, exit code).
378  """
379  if not tests:
380    logging.critical('No tests to run.')
381    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
382
383  tests_expanded = ApplyMaxPerRun(tests, max_per_run)
384  if shard:
385    # Generate a shared _TestCollection object for all test runners, so they
386    # draw from a common pool of tests.
387    shared_test_collection = _TestCollection([_Test(t) for t in tests_expanded])
388    test_collection_factory = lambda: shared_test_collection
389    tag_results_with_device = False
390    log_string = 'sharded across devices'
391  else:
392    # Generate a unique _TestCollection object for each test runner, but use
393    # the same set of tests.
394    test_collection_factory = lambda: _TestCollection(
395        [_Test(t) for t in tests_expanded])
396    tag_results_with_device = True
397    log_string = 'replicated on each device'
398
399  logging.info('Will run %d tests (%s): %s',
400               len(tests_expanded), log_string, str(tests_expanded))
401  runners = _CreateRunners(runner_factory, devices, setup_timeout)
402  try:
403    return _RunAllTests(runners, test_collection_factory,
404                        num_retries, test_timeout, tag_results_with_device)
405  finally:
406    try:
407      _TearDownRunners(runners, setup_timeout)
408    except (device_errors.DeviceUnreachableError,
409            # TODO(jbudorick) Remove this once the underlying implementations
410            #                 for the above are switched or wrapped.
411            android_commands.errors.DeviceUnresponsiveError) as e:
412      logging.warning('Device unresponsive during TearDown: [%s]', e)
413    except Exception as e:
414      logging.error('Unexpected exception caught during TearDown: %s' % str(e))
415