• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import fnmatch
6import hashlib
7import logging
8import posixpath
9import signal
10try:
11  import _thread as thread
12except ImportError:
13  import thread
14import threading
15
16from devil import base_error
17from devil.android import crash_handler
18from devil.android import device_errors
19from devil.android.sdk import version_codes
20from devil.android.tools import device_recovery
21from devil.utils import signal_handler
22from pylib.base import base_test_result
23from pylib.base import test_collection
24from pylib.base import test_exception
25from pylib.base import test_run
26from pylib.local.device import local_device_environment
27
28
# Log message attached to tests that were still pending when the suite was
# killed by SIGTERM (typically a swarming timeout); see stop_tests in
# LocalDeviceTestRun.RunTests.
_SIGTERM_TEST_LOG = (
  '  Suite execution terminated, probably due to swarming timeout.\n'
  '  Your test may not have run.')
32
33
def SubstituteDeviceRoot(device_path, device_root):
  """Resolves a device path specification against a device root directory.

  Args:
    device_path: None/empty (use the root as-is), a plain path string
        (returned unchanged), or a list of path components in which any
        falsy component is replaced by |device_root| before joining.
    device_root: The device root directory to substitute.

  Returns:
    The resolved device path string.
  """
  if not device_path:
    return device_root
  if not isinstance(device_path, list):
    return device_path
  # Falsy components act as placeholders for the device root.
  components = [component or device_root for component in device_path]
  return posixpath.join(*components)
40
41
class TestsTerminated(Exception):
  """Raised to abort in-flight test execution, e.g. on receipt of SIGTERM."""
44
45
class LocalDeviceTestRun(test_run.TestRun):
  """Runs a test suite on locally attached Android devices.

  Responsibilities visible in this class: sorting and sharding tests,
  running them (optionally via a shared TestCollection) across the
  environment's devices, retrying failures for up to |max_tries| attempts,
  optionally recovering devices between tries, and terminating cleanly on
  SIGTERM.
  """

  def __init__(self, env, test_instance):
    super().__init__(env, test_instance)
    # This is intended to be filled by a child class.
    self._installed_packages = []
    env.SetPreferredAbis(test_instance.GetPreferredAbis())

  #override
  def RunTests(self, results, raw_logs_fh=None):
    """Runs all tests, retrying failures until |max_tries| is exhausted.

    Args:
      results: A list; one TestRunResults instance per try is appended to it
          as soon as that try's test names are known, and is updated in place
          as tests complete.
      raw_logs_fh: Unused here; part of the TestRun interface.
    """
    tests = self._GetTests()

    exit_now = threading.Event()

    @local_device_environment.handle_shard_failures
    def run_tests_on_device(dev, tests, results):
      # This is performed here instead of during setup because restarting the
      # device clears app compatibility flags, which will happen if a device
      # needs to be recovered.
      SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev)
      consecutive_device_errors = 0
      for test in tests:
        if not test:
          logging.warning('No tests in shard. Continuing.')
          tests.test_completed()
          continue
        # is_set() replaces the deprecated camelCase isSet() alias, which has
        # been removed from threading.Event in recent Python 3 releases.
        if exit_now.is_set():
          thread.exit()

        result = None
        rerun = None
        try:
          result, rerun = crash_handler.RetryOnSystemCrash(
              lambda d, t=test: self._RunTest(d, t),
              device=dev)
          consecutive_device_errors = 0
          if isinstance(result, base_test_result.BaseTestResult):
            results.AddResult(result)
          elif isinstance(result, list):
            results.AddResults(result)
          else:
            raise Exception(
                'Unexpected result type: %s' % type(result).__name__)
        except device_errors.CommandTimeoutError:
          # Test timeouts don't count as device errors for the purpose
          # of bad device detection.
          consecutive_device_errors = 0

          if isinstance(test, list):
            result_log = ''
            if len(test) > 1:
              result_log = ('The test command timed out when running multiple '
                            'tests including this test. It does not '
                            'necessarily mean this specific test timed out.')
              # Ensure instrumentation tests not batched at env level retries.
              for t in test:
                # |dict| type infers it's an instrumentation test.
                if isinstance(t, dict) and t['annotations']:
                  t['annotations'].pop('Batch', None)

            results.AddResults(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(t),
                    base_test_result.ResultType.TIMEOUT,
                    log=result_log) for t in test)
          else:
            results.AddResult(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(test),
                    base_test_result.ResultType.TIMEOUT))
        except device_errors.DeviceUnreachableError:
          # If the device is no longer reachable then terminate this
          # run_tests_on_device call.
          raise
        except base_error.BaseError:
          # If we get a device error but believe the device is still
          # reachable, attempt to continue using it.
          if isinstance(tests, test_collection.TestCollection):
            rerun = test

          consecutive_device_errors += 1
          if consecutive_device_errors >= 3:
            # We believe the device is still reachable and may still be usable,
            # but if it fails repeatedly, we shouldn't attempt to keep using
            # it.
            logging.error('Repeated failures on device %s. Abandoning.',
                          str(dev))
            raise

          logging.exception(
              'Attempting to continue using device %s despite failure (%d/3).',
              str(dev), consecutive_device_errors)

        finally:
          if isinstance(tests, test_collection.TestCollection):
            if rerun:
              tests.add(rerun)
            tests.test_completed()

      logging.info('Finished running tests on this device.')

    def stop_tests(_signum, _frame):
      # SIGTERM handler: flag all device threads to exit, then abort the
      # current try via TestsTerminated.
      logging.critical('Received SIGTERM. Stopping test execution.')
      exit_now.set()
      raise TestsTerminated()

    try:
      with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests):
        self._env.ResetCurrentTry()
        while self._env.current_try < self._env.max_tries and tests:
          tries = self._env.current_try
          tests = self._SortTests(tests)
          grouped_tests = self._GroupTestsAfterSharding(tests)
          logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries)
          if tries > 0 and self._env.recover_devices:
            if any(d.build_version_sdk == version_codes.LOLLIPOP_MR1
                   for d in self._env.devices):
              logging.info(
                  'Attempting to recover devices due to known issue on L MR1. '
                  'See crbug.com/787056 for details.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
            elif tries + 1 == self._env.max_tries:
              logging.info(
                  'Attempting to recover devices prior to last test attempt.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
          logging.info(
              'Will run %d tests, grouped into %d groups, on %d devices: %s',
              len(tests), len(grouped_tests), len(self._env.devices),
              ', '.join(str(d) for d in self._env.devices))
          for t in tests:
            logging.debug('  %s', t)

          try_results = base_test_result.TestRunResults()
          test_names = (self._GetUniqueTestName(t) for t in tests)
          # Wildcard names can't be pre-registered as NOTRUN since they don't
          # correspond to a single concrete test.
          try_results.AddResults(
              base_test_result.BaseTestResult(
                  t, base_test_result.ResultType.NOTRUN)
              for t in test_names if not t.endswith('*'))

          # As soon as we know the names of the tests, we populate |results|.
          # The tests in try_results will have their results updated by
          # try_results.AddResult() as they are run.
          results.append(try_results)

          try:
            if self._ShouldShardTestsForDevices():
              tc = test_collection.TestCollection(
                  self._CreateShardsForDevices(grouped_tests))
              self._env.parallel_devices.pMap(
                  run_tests_on_device, tc, try_results).pGet(None)
            else:
              self._env.parallel_devices.pMap(run_tests_on_device,
                                              grouped_tests,
                                              try_results).pGet(None)
          except TestsTerminated:
            # Mark everything that never produced a result as TIMEOUT with an
            # explanatory log, then propagate the termination.
            for unknown_result in try_results.GetUnknown():
              try_results.AddResult(
                  base_test_result.BaseTestResult(
                      unknown_result.GetName(),
                      base_test_result.ResultType.TIMEOUT,
                      log=_SIGTERM_TEST_LOG))
            raise

          self._env.IncrementCurrentTry()
          tests = self._GetTestsToRetry(tests, try_results)

          logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries)
          if tests:
            logging.info('%d failed tests remain.', len(tests))
          else:
            logging.info('All tests completed.')
    except TestsTerminated:
      pass

  def _GetTestsToRetry(self, tests, try_results):
    """Returns the subset of |tests| that should be run again.

    A test is a retry candidate when it has no result or its result (or any
    result in its group) is neither PASS nor SKIP; wildcard test names
    (ending in '*') are matched against all result names with fnmatch.
    Candidates are filtered through self._ShouldRetry and extended via
    self._AppendPreTestsForRetry.
    """

    def is_failure_result(test_result):
      if isinstance(test_result, list):
        return any(is_failure_result(r) for r in test_result)
      return (
          test_result is None
          or test_result.GetType() not in (
              base_test_result.ResultType.PASS,
              base_test_result.ResultType.SKIP))

    all_test_results = {r.GetName(): r for r in try_results.GetAll()}

    tests_and_names = ((t, self._GetUniqueTestName(t)) for t in tests)

    tests_and_results = {}
    for test, name in tests_and_names:
      if name.endswith('*'):
        tests_and_results[name] = (test, [
            r for n, r in all_test_results.items() if fnmatch.fnmatch(n, name)
        ])
      else:
        tests_and_results[name] = (test, all_test_results.get(name))

    failed_tests_and_results = ((test, result)
                                for test, result in tests_and_results.values()
                                if is_failure_result(result))

    failed_tests = [
        t for t, r in failed_tests_and_results if self._ShouldRetry(t, r)
    ]
    return self._AppendPreTestsForRetry(failed_tests, tests)

  def _ApplyExternalSharding(self, tests, shard_index, total_shards):
    """Returns the flattened list of tests belonging to |shard_index|.

    Tests are hash-sorted, grouped, partitioned approximately evenly across
    |total_shards|, and the groups of this shard's partition are flattened.

    Raises:
      test_exception.InvalidShardingSettings: If the sharding settings are
          out of range.
    """
    logging.info('Using external sharding settings. This is shard %d/%d',
                 shard_index, total_shards)

    if total_shards < 0 or shard_index < 0 or total_shards <= shard_index:
      raise test_exception.InvalidShardingSettings(shard_index, total_shards)

    sharded_tests = []

    # Sort tests by hash.
    # TODO(crbug.com/40200835): Add sorting logic back to _PartitionTests.
    tests = self._SortTests(tests)

    # Group tests by tests that should run in the same test invocation - either
    # unit tests or batched tests.
    grouped_tests = self._GroupTests(tests)

    # Partition grouped tests approximately evenly across shards.
    partitioned_tests = self._PartitionTests(grouped_tests, total_shards,
                                             float('inf'))
    if len(partitioned_tests) <= shard_index:
      return []
    for t in partitioned_tests[shard_index]:
      if isinstance(t, list):
        sharded_tests.extend(t)
      else:
        sharded_tests.append(t)
    return sharded_tests

  # Sort by hash so we don't put all tests in a slow suite in the same
  # partition.
  def _SortTests(self, tests):
    return sorted(tests,
                  key=lambda t: hashlib.sha256(
                      self._GetUniqueTestName(t[0] if isinstance(t, list) else t
                                              ).encode()).hexdigest())

  # Partition tests evenly into |num_desired_partitions| partitions where
  # possible. However, many constraints make partitioning perfectly impossible.
  # If the max_partition_size isn't large enough, extra partitions may be
  # created (infinite max size should always return precisely the desired
  # number of partitions). Even if the |max_partition_size| is technically large
  # enough to hold all of the tests in |num_desired_partitions|, we attempt to
  # keep test order relatively stable to minimize flakes, so when tests are
  # grouped (eg. batched tests), we cannot perfectly fill all paritions as that
  # would require breaking up groups.
  def _PartitionTests(self, tests, num_desired_partitions, max_partition_size):
    # pylint: disable=no-self-use
    partitions = []

    num_not_yet_allocated = sum(
        [len(test) - 1 for test in tests if self._CountTestsIndividually(test)])
    num_not_yet_allocated += len(tests)

    # Fast linear partition approximation capped by max_partition_size. We
    # cannot round-robin or otherwise re-order tests dynamically because we want
    # test order to remain stable.
    partition_size = min(num_not_yet_allocated // num_desired_partitions,
                         max_partition_size)
    partitions.append([])
    last_partition_size = 0
    for test in tests:
      test_count = len(test) if self._CountTestsIndividually(test) else 1
      # Make a new shard whenever we would overfill the previous one. However,
      # if the size of the test group is larger than the max partition size on
      # its own, just put the group in its own shard instead of splitting up the
      # group.
      # TODO(crbug.com/40200835): Add logic to support PRE_ test recognition but
      # it may hurt performance in most scenarios. Currently all PRE_ tests are
      # partitioned into the last shard. Unless the number of PRE_ tests are
      # larger than the partition size, the PRE_ test may get assigned into a
      # different shard and cause test failure.
      if (last_partition_size + test_count > partition_size
          and last_partition_size > 0):
        num_desired_partitions -= 1
        if num_desired_partitions <= 0:
          # Too many tests for number of partitions, just fill all partitions
          # beyond num_desired_partitions.
          partition_size = max_partition_size
        else:
          # Re-balance remaining partitions.
          partition_size = min(num_not_yet_allocated // num_desired_partitions,
                               max_partition_size)
        partitions.append([])
        partitions[-1].append(test)
        last_partition_size = test_count
      else:
        partitions[-1].append(test)
        last_partition_size += test_count

      num_not_yet_allocated -= test_count

    if not partitions[-1]:
      partitions.pop()
    return partitions

  def _CountTestsIndividually(self, test):
    """Returns True if each test in group |test| should count toward size."""
    # pylint: disable=no-self-use
    if not isinstance(test, list):
      return False
    annotations = test[0]['annotations']
    # UnitTests tests are really fast, so to balance shards better, count
    # UnitTests Batches as single tests.
    return ('Batch' not in annotations
            or annotations['Batch']['value'] != 'UnitTests')

  def _CreateShardsForDevices(self, tests):
    """Creates the per-device shards. Must be implemented by a child class."""
    raise NotImplementedError

  def _GetUniqueTestName(self, test):
    """Returns a unique display name for |test|; identity by default."""
    # pylint: disable=no-self-use
    return test

  def _ShouldRetry(self, test, result):
    """Returns whether a failed |test| should be retried; True by default."""
    # pylint: disable=no-self-use,unused-argument
    return True

  #override
  def GetTestsForListing(self):
    """Returns a sorted, flattened list of all test names for listing."""
    ret = self._GetTests()
    ret = FlattenTestList(ret)
    ret.sort()
    return ret

  def _GetTests(self):
    """Returns the tests to run. Must be implemented by a child class."""
    raise NotImplementedError

  def _GroupTests(self, tests):
    """Groups tests that run in one invocation; no-op by default."""
    # pylint: disable=no-self-use
    return tests

  def _GroupTestsAfterSharding(self, tests):
    """Groups tests within a shard; no-op by default."""
    # pylint: disable=no-self-use
    return tests

  def _AppendPreTestsForRetry(self, failed_tests, tests):
    """Hook to add PRE_ tests to a retry list; no-op by default."""
    # pylint: disable=no-self-use,unused-argument
    return failed_tests

  def _RunTest(self, device, test):
    """Runs one test on one device. Must be implemented by a child class."""
    raise NotImplementedError

  def _ShouldShardTestsForDevices(self):
    """Returns whether to share one test pool across devices. Child class."""
    raise NotImplementedError
400
401
def FlattenTestList(values):
  """Returns a list with all nested lists (shard groupings) expanded."""
  flattened = []
  for entry in values:
    # A list entry is a shard grouping; splice its members in directly.
    flattened.extend(entry if isinstance(entry, list) else [entry])
  return flattened
411
412
def SetAppCompatibilityFlagsIfNecessary(packages, device):
  """Sets app compatibility flags on the given packages and device.

  Args:
    packages: A list of strings containing package names to apply flags to.
    device: A DeviceUtils instance to apply the flags on.
  """
  if device.build_version_sdk < version_codes.R:
    return
  # These flags are necessary to use the legacy storage permissions on R+.
  # See crbug.com/1173699 for more information.
  for flag in ('DEFAULT_SCOPED_STORAGE', 'FORCE_ENABLE_SCOPED_STORAGE'):
    for package in packages:
      device.RunShellCommand(['am', 'compat', 'disable', flag, package])
433
434
class NoTestsError(Exception):
  """Raised when test discovery produces no tests to run."""
437