# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dataclasses
import json
import logging
import multiprocessing
import os
import queue
import re
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor

from devil.utils import cmd_helper
from py_utils import tempfile_ext
from pylib import constants
from pylib.base import base_test_result
from pylib.base import test_run
from pylib.constants import host_paths
from pylib.results import json_results

# Maximum number of tests packed into one shard/job.
# Chosen after timing test runs of chrome_junit_tests with 7,16,32,
# and 64 workers in threadpool and different classes_per_job.
_MAX_TESTS_PER_JOB = 150

# Result types that mark a test (and therefore its job) as failed.
_FAILURE_TYPES = (
    base_test_result.ResultType.FAIL,
    base_test_result.ResultType.CRASH,
    base_test_result.ResultType.TIMEOUT,
)
37
# RegExp to detect logcat lines, e.g., 'I/AssetManager: not found'.
# Lines arrive prefixed with the ' N| ' shard marker added by this script,
# hence the leading '\d+\| '. The second, optional '\d+\| ' group was written
# as '(:?...)' — a capturing group admitting a stray ':' — where the
# non-capturing group '(?:...)' was clearly intended; fixed here.
_LOGCAT_RE = re.compile(r' ?\d+\| (?:\d+\| )?[A-Z]/[\w\d_-]+:')

# Regex to detect start or failure of tests. Matches
# [ RUN      ] org.ui.ForeignSessionItemViewBinderUnitTest.test_phone[28]
# [ FAILED|CRASHED|TIMEOUT ] org.ui.ForeignBinderUnitTest.test_phone[28] (56 ms)
_TEST_START_RE = re.compile(r'.*\[\s+RUN\s+\]\s(.*)')
_TEST_FAILED_RE = re.compile(r'.*\[\s+(?:FAILED|CRASHED|TIMEOUT)\s+\]')
46
47
@dataclasses.dataclass
class _TestGroup:
  """The set of tests that one shard runs, all sharing one runner config."""
  # Key from the JSON config's 'configs' dict that these tests run under.
  config: str
  # Maps test class name -> list of test method names in that class.
  methods_by_class: dict
52
53
@dataclasses.dataclass
class _Job:
  """One shard's command invocation and bookkeeping."""
  # Zero-based shard index; shown in output prefixes and --shard-filter hints.
  shard_id: int
  # Wrapper-script command line as an argv list (built by _MakeJob).
  cmd: list
  # Seconds to wait for the shard's process before dumping stacks and killing.
  timeout: int
  # Per-job copy of the test config, with 'configs' narrowed to this job's
  # single test group; printed verbatim for --json-config re-runs.
  json_config: dict
  # Path where the shard writes its JSON results file.
  json_results_path: str
62
class LocalMachineJunitTestRun(test_run.TestRun):
  """Runs a JUnit/Robolectric suite in local JVM processes.

  Tests are packed into shards (see GroupTests); each shard is one
  invocation of the suite's generated wrapper script, and shards are run
  in parallel (see RunCommandsAndSerializeOutput).
  """

  # override
  def TestPackage(self):
    return self._test_instance.suite

  # override
  def SetUp(self):
    pass

  def _GetFilterArgs(self):
    """Returns wrapper-script arguments for the configured test filters."""
    ret = []
    for test_filter in self._test_instance.test_filters:
      ret += ['--gtest-filter', test_filter]

    if self._test_instance.package_filter:
      ret += ['--package-filter', self._test_instance.package_filter]
    if self._test_instance.runner_filter:
      ret += ['--runner-filter', self._test_instance.runner_filter]

    return ret

  def _CreatePropertiesJar(self, temp_dir):
    """Writes a properties jar into |temp_dir| and returns its path."""
    # Create properties file for Robolectric test runners so they can find the
    # binary resources.
    properties_jar_path = os.path.join(temp_dir, 'properties.jar')
    resource_apk = self._test_instance.resource_apk
    with zipfile.ZipFile(properties_jar_path, 'w') as z:
      z.writestr('com/android/tools/test_config.properties',
                 'android_resource_apk=%s\n' % resource_apk)
      props = [
          'application = android.app.Application',
          'sdk = 28',
          ('shadows = org.chromium.testing.local.'
           'CustomShadowApplicationPackageManager'),
      ]
      z.writestr('robolectric.properties', '\n'.join(props))
    return properties_jar_path

  def _CreateJvmArgsList(self, for_listing=False, allow_debugging=True):
    """Returns the JVM argument list for one test process.

    Args:
      for_listing: True when the JVM only lists tests; skips coverage args.
      allow_debugging: If False, never attach the JDWP agent even when
        --debug-socket was given.
    """
    # Creates a list of jvm_args (robolectric, code coverage, etc...)
    jvm_args = [
        '-Drobolectric.dependency.dir=%s' %
        self._test_instance.robolectric_runtime_deps_dir,
        '-Ddir.source.root=%s' % constants.DIR_SOURCE_ROOT,
        # Use locally available sdk jars from 'robolectric.dependency.dir'
        '-Drobolectric.offline=true',
        '-Drobolectric.resourcesMode=binary',
        '-Drobolectric.logging=stdout',
        '-Djava.library.path=%s' % self._test_instance.native_libs_dir,
    ]
    if self._test_instance.debug_socket and allow_debugging:
      # suspend=y: the JVM waits for a debugger before running any test.
      jvm_args += [
          '-Dchromium.jdwp_active=true',
          ('-agentlib:jdwp=transport=dt_socket'
           ',server=y,suspend=y,address=%s' % self._test_instance.debug_socket)
      ]

    if self._test_instance.coverage_dir and not for_listing:
      if not os.path.exists(self._test_instance.coverage_dir):
        os.makedirs(self._test_instance.coverage_dir)
      elif not os.path.isdir(self._test_instance.coverage_dir):
        raise Exception('--coverage-dir takes a directory, not file path.')
      # Jacoco supports concurrent processes using the same output file:
      # https://github.com/jacoco/jacoco/blob/6cd3f0bd8e348f8fba7bffec5225407151f1cc91/org.jacoco.agent.rt/src/org/jacoco/agent/rt/internal/output/FileOutput.java#L67
      # So no need to vary the output based on shard number.
      jacoco_coverage_file = os.path.join(self._test_instance.coverage_dir,
                                          '%s.exec' % self._test_instance.suite)
      if self._test_instance.coverage_on_the_fly:
        jacoco_agent_path = os.path.join(host_paths.DIR_SOURCE_ROOT,
                                         'third_party', 'jacoco', 'lib',
                                         'jacocoagent.jar')

        # inclnolocationclasses is false to prevent no class def found error.
        jacoco_args = '-javaagent:{}=destfile={},inclnolocationclasses=false'
        jvm_args.append(
            jacoco_args.format(jacoco_agent_path, jacoco_coverage_file))
      else:
        jvm_args.append('-Djacoco-agent.destfile=%s' % jacoco_coverage_file)

    return jvm_args

  def _ChooseNumWorkers(self, num_jobs):
    """Returns how many shard processes to run concurrently.

    One worker when debugging, else the explicit --shards value, else half
    the CPU count; never more than the number of jobs.
    """
    if self._test_instance.debug_socket:
      num_workers = 1
    elif self._test_instance.shards is not None:
      num_workers = self._test_instance.shards
    else:
      num_workers = max(1, multiprocessing.cpu_count() // 2)
    return min(num_workers, num_jobs)

  @property
  def _wrapper_path(self):
    # Path of the generated wrapper script that launches this suite.
    return os.path.join(constants.GetOutDirectory(), 'bin', 'helper',
                        self._test_instance.suite)

  def _QueryTestJsonConfig(self,
                           temp_dir,
                           allow_debugging=True,
                           enable_shadow_allowlist=False):
    """Runs the wrapper with --list-tests and returns the parsed JSON config.

    Args:
      temp_dir: Directory in which the wrapper writes the JSON config.
      allow_debugging: Passed to _CreateJvmArgsList; False prevents the
        listing JVM from waiting on a debugger.
      enable_shadow_allowlist: If True, pass --shadows-allowlist when the
        test instance provides one.

    Raises:
      subprocess.CalledProcessError: If the wrapper exits non-zero (e.g. the
        filters matched no tests).
    """
    json_config_path = os.path.join(temp_dir, 'main_test_config.json')
    cmd = [self._wrapper_path]
    # Allow debugging of test listing when run as:
    # "--wait-for-java-debugger --list-tests"
    jvm_args = self._CreateJvmArgsList(for_listing=True,
                                       allow_debugging=allow_debugging)
    if jvm_args:
      cmd += ['--jvm-args', '"%s"' % ' '.join(jvm_args)]
    cmd += ['--classpath', self._CreatePropertiesJar(temp_dir)]
    cmd += ['--list-tests', '--json-config', json_config_path]
    if enable_shadow_allowlist and self._test_instance.shadows_allowlist:
      cmd += ['--shadows-allowlist', self._test_instance.shadows_allowlist]
    cmd += self._GetFilterArgs()
    subprocess.run(cmd, check=True)
    with open(json_config_path) as f:
      return json.load(f)

  def _MakeJob(self, shard_id, temp_dir, test_group, properties_jar_path,
               json_config):
    """Builds the _Job (command line + timeout) for one shard.

    Writes a per-job JSON config into |temp_dir| that narrows 'configs'
    down to just this shard's test group.
    """
    json_results_path = os.path.join(temp_dir, f'results{shard_id}.json')
    job_json_config_path = os.path.join(temp_dir, f'config{shard_id}.json')
    job_json_config = json_config.copy()
    job_json_config['configs'] = {
        test_group.config: test_group.methods_by_class
    }
    with open(job_json_config_path, 'w') as f:
      json.dump(job_json_config, f)

    cmd = [self._wrapper_path]
    # NOTE(review): the --jvm-args value is wrapped in literal quotes;
    # presumably the wrapper script strips them — confirm before changing.
    cmd += ['--jvm-args', '"%s"' % ' '.join(self._CreateJvmArgsList())]
    cmd += ['--classpath', properties_jar_path]
    cmd += ['--json-results', json_results_path]
    cmd += ['--json-config', job_json_config_path]

    if self._test_instance.debug_socket:
      # Effectively no timeout while a debugger may be attached.
      timeout = 999999
    else:
      # 20 seconds for process init,
      # 5 seconds per class,
      # 3 seconds per method.
      num_classes = len(test_group.methods_by_class)
      num_tests = sum(len(x) for x in test_group.methods_by_class.values())
      timeout = 20 + 5 * num_classes + num_tests * 3
    return _Job(shard_id=shard_id,
                cmd=cmd,
                timeout=timeout,
                json_config=job_json_config,
                json_results_path=json_results_path)

  # override
  def GetTestsForListing(self):
    """Returns the sorted list of 'Class.method' test names."""
    with tempfile_ext.NamedTemporaryDirectory() as temp_dir:
      json_config = self._QueryTestJsonConfig(temp_dir)
      ret = []
      for config in json_config['configs'].values():
        for class_name, methods in config.items():
          ret.extend(f'{class_name}.{method}' for method in methods)
      ret.sort()
      return ret

  # override
  def RunTests(self, results, raw_logs_fh=None):
    with tempfile_ext.NamedTemporaryDirectory() as temp_dir:
      self._RunTestsInternal(temp_dir, results, raw_logs_fh)

  def _RunTestsInternal(self, temp_dir, results, raw_logs_fh):
    """Runs all shards, streams their output, and collects results.

    Args:
      temp_dir: Scratch directory for per-shard config/result files.
      results: List that receives a base_test_result.TestRunResults.
      raw_logs_fh: Optional file handle that receives every raw output line.
    """
    if self._test_instance.json_config:
      with open(self._test_instance.json_config) as f:
        json_config = json.load(f)
    else:
      # TODO(crbug.com/40878339): This step can take 3-4 seconds for
      # chrome_junit_tests.
      try:
        json_config = self._QueryTestJsonConfig(temp_dir,
                                                allow_debugging=False,
                                                enable_shadow_allowlist=True)
      except subprocess.CalledProcessError:
        results.append(_MakeUnknownFailureResult('Filter matched no tests'))
        return
    test_groups = GroupTests(json_config, _MAX_TESTS_PER_JOB)

    shard_list = list(range(len(test_groups)))
    shard_filter = self._test_instance.shard_filter
    if shard_filter:
      shard_list = [x for x in shard_list if x in shard_filter]

    if not shard_list:
      results.append(_MakeUnknownFailureResult('Invalid shard filter'))
      return

    num_workers = self._ChooseNumWorkers(len(shard_list))
    if shard_filter:
      logging.warning('Running test shards: %s using %s concurrent process(es)',
                      ', '.join(str(x) for x in shard_list), num_workers)
    else:
      logging.warning(
          'Running tests with %d shard(s) using %s concurrent process(es).',
          len(shard_list), num_workers)

    properties_jar_path = self._CreatePropertiesJar(temp_dir)
    jobs = [
        self._MakeJob(i, temp_dir, test_groups[i], properties_jar_path,
                      json_config) for i in shard_list
    ]

    # Logcat-looking lines are suppressed unless INFO logging is enabled.
    show_logcat = logging.getLogger().isEnabledFor(logging.INFO)
    num_omitted_lines = 0
    # Maps test name (as printed on its '[ RUN ]' line) -> captured output
    # between that line and its FAILED/CRASHED/TIMEOUT line.
    failed_test_logs = {}
    log_lines = []
    current_test = None
    for line in RunCommandsAndSerializeOutput(jobs, num_workers):
      if raw_logs_fh:
        raw_logs_fh.write(line)
      if show_logcat or not _LOGCAT_RE.match(line):
        sys.stdout.write(line)
      else:
        num_omitted_lines += 1

      # Collect log data between a test starting and the test failing.
      # There can be info after a test fails and before the next test starts
      # that we discard.
      test_start_match = _TEST_START_RE.match(line)
      if test_start_match:
        current_test = test_start_match.group(1)
        log_lines = [line]
      elif _TEST_FAILED_RE.match(line) and current_test:
        log_lines.append(line)
        failed_test_logs[current_test] = ''.join(log_lines)
        current_test = None
      else:
        log_lines.append(line)

    if num_omitted_lines > 0:
      logging.critical('%d log lines omitted.', num_omitted_lines)
    sys.stdout.flush()

    results_list = []
    failed_jobs = []
    try:
      for job in jobs:
        with open(job.json_results_path, 'r') as f:
          parsed_results = json_results.ParseResultsFromJson(
              json.loads(f.read()))
        has_failed = False
        for r in parsed_results:
          if r.GetType() in _FAILURE_TYPES:
            has_failed = True
            # Result names use 'Class#method' while captured logs are keyed
            # by 'Class.method', hence the replace().
            r.SetLog(failed_test_logs.get(r.GetName().replace('#', '.'), ''))

        results_list += parsed_results
        if has_failed:
          failed_jobs.append(job)
    except IOError:
      # In the case of a failure in the JUnit or Robolectric test runner
      # the output json file may never be written.
      results_list = [
          base_test_result.BaseTestResult('Test Runner Failure',
                                          base_test_result.ResultType.UNKNOWN)
      ]

    if failed_jobs:
      # Print copy/pasteable instructions for re-running just the failures.
      for job in failed_jobs:
        print(f'To re-run failed shard {job.shard_id}, use --json-config '
              'config.json, where config.json contains:')
        print(json.dumps(job.json_config, indent=2))
        print()

      print(
          f'To re-run the {len(failed_jobs)} failed shard(s), use: '
          f'--shards {num_workers} --shard-filter',
          ','.join(str(j.shard_id) for j in failed_jobs))

    test_run_results = base_test_result.TestRunResults()
    test_run_results.AddResults(results_list)
    results.append(test_run_results)

  # override
  def TearDown(self):
    pass
341
342
def GroupTests(json_config, max_per_job):
  """Groups tests that will be run on each shard.

  Classes sharing a runner config are packed into a group until the group
  holds at least max_per_job tests; a class is never split across groups.

  Args:
    json_config: The result from _QueryTestJsonConfig().
    max_per_job: Stop adding tests to a group once this limit has been passed.

  Return:
    Returns a list of _TestGroup.
  """
  groups = []
  for config, methods_by_class in json_config['configs'].items():
    pending = {}
    test_count = 0
    for class_name, methods in methods_by_class.items():
      # Per-class overhead makes splitting a single class across shards
      # wasteful, so whole classes are assigned (unless configs differ).
      pending[class_name] = methods
      test_count += len(methods)
      if test_count >= max_per_job:
        groups.append(_TestGroup(config, pending))
        pending = {}
        test_count = 0

    if pending:
      groups.append(_TestGroup(config, pending))

  # Schedule the groups with the most classes first so that long shards are
  # not started right at the end.
  groups.sort(key=lambda g: len(g.methods_by_class), reverse=True)
  return groups
374
375
def _MakeUnknownFailureResult(message):
  """Returns a TestRunResults holding a single UNKNOWN result for |message|."""
  run_results = base_test_result.TestRunResults()
  run_results.AddResults([
      base_test_result.BaseTestResult(message,
                                      base_test_result.ResultType.UNKNOWN)
  ])
  return run_results
384
385
def _DumpJavaStacks(pid):
  """Returns a thread dump of the JVM process |pid| via the JDK's jcmd."""
  jcmd_path = os.path.join(constants.JAVA_HOME, 'bin', 'jcmd')
  result = subprocess.run([jcmd_path, str(pid), 'Thread.print'],
                          check=False,
                          stdout=subprocess.PIPE,
                          encoding='utf8')
  # On failure, still include whatever jcmd printed to aid debugging.
  if result.returncode:
    return 'Failed to dump stacks\n' + result.stdout
  return result.stdout
396
397
def RunCommandsAndSerializeOutput(jobs, num_workers):
  """Runs multiple commands in parallel and yields serialized output lines.

  The first job's output is streamed live; every other job's output is
  buffered in a temp file and replayed once that job completes, so lines
  from different shards are never interleaved.

  Args:
    jobs: Non-empty list of _Job instances.
    num_workers: Maximum number of jobs run at the same time.

  Raises:
    TimeoutError: If timeout is exceeded.

  Yields:
    Command output.
  """
  assert jobs
  temp_files = [None]  # First shard is streamed directly to stdout.
  for _ in range(len(jobs) - 1):
    temp_files.append(tempfile.TemporaryFile(mode='w+t', encoding='utf-8'))

  yield '\n'
  yield f'Shard {jobs[0].shard_id} output:\n'

  # Maps job index -> Java thread dump captured when that job timed out.
  timeout_dumps = {}

  def run_proc(idx):
    # Launches jobs[idx]. Returns the Popen for the first job (streamed by
    # the caller); for all others, waits for completion here.
    if idx == 0:
      s_out = subprocess.PIPE
      s_err = subprocess.STDOUT
    else:
      s_out = temp_files[idx]
      s_err = temp_files[idx]

    job = jobs[idx]
    # NOTE(review): _Job defines no 'env' field, so this getattr is None
    # unless a caller attaches one dynamically — confirm before relying on it.
    proc = cmd_helper.Popen(job.cmd, stdout=s_out, stderr=s_err,
                            env=getattr(job, 'env', None))
    # Need to return process so that output can be displayed on stdout
    # in real time.
    if idx == 0:
      return proc

    try:
      proc.wait(timeout=job.timeout)
    except subprocess.TimeoutExpired:
      # Capture stacks before killing so the hang site is visible.
      timeout_dumps[idx] = _DumpJavaStacks(proc.pid)
      proc.kill()

    # Not needed, but keeps pylint happy.
    return None

  with ThreadPoolExecutor(max_workers=num_workers) as pool:
    futures = [pool.submit(run_proc, idx=i) for i in range(len(jobs))]

    yield from _StreamFirstShardOutput(jobs[0], futures[0].result())

    for i, job in enumerate(jobs[1:], 1):
      shard_id = job.shard_id
      # Shouldn't cause timeout as run_proc terminates the process with
      # a proc.wait().
      futures[i].result()
      f = temp_files[i]
      yield '\n'
      yield f'Shard {shard_id} output:\n'
      f.seek(0)
      for line in f.readlines():
        yield f'{shard_id:2}| {line}'
      f.close()

  # Output stacks
  if timeout_dumps:
    yield '\n'
    yield ('=' * 80) + '\n'
    yield '\nOne or more shards timed out.\n'
    yield ('=' * 80) + '\n'
    for i, dump in sorted(timeout_dumps.items()):
      job = jobs[i]
      yield f'Shard {job.shard_id} timed out after {job.timeout} seconds.\n'
      yield 'Thread dump:\n'
      yield dump
      yield '\n'

    raise cmd_helper.TimeoutError('Junit shards timed out.')
474
475
def _StreamFirstShardOutput(job, shard_proc):
  """Yields shard 0's stdout lines live, honoring the job's timeout.

  A pump thread moves lines from |shard_proc| into a queue; this generator
  drains the queue until the stream ends (None sentinel) or |job.timeout|
  seconds elapse, then yields whatever remains queued.
  """
  shard_id = job.shard_id
  line_queue = queue.Queue()

  def _pump():
    # Forward every stdout line, then signal end-of-stream with None.
    for out_line in shard_proc.stdout:
      line_queue.put(out_line)
    line_queue.put(None)

  pump_thread = threading.Thread(target=_pump)
  pump_thread.start()
  deadline = time.time() + job.timeout

  # Stream lines until the pump exits, the sentinel arrives, or the
  # deadline passes.
  while pump_thread.is_alive():
    try:
      line = line_queue.get(timeout=max(0, deadline - time.time()))
    except queue.Empty:
      if time.time() > deadline:
        break
      continue
    if line is None:
      break
    yield f'{shard_id:2}| {line}'

  # Emit any remaining output from a timed-out first shard.
  pump_thread.join()
  while not line_queue.empty():
    line = line_queue.get()
    if line:
      yield f'{shard_id:2}| {line}'
507