# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Runs JUnit/Robolectric tests on the local machine, sharded across
multiple wrapper-script processes."""

import dataclasses
import json
import logging
import multiprocessing
import os
import queue
import re
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor

from devil.utils import cmd_helper
from py_utils import tempfile_ext
from pylib import constants
from pylib.base import base_test_result
from pylib.base import test_run
from pylib.constants import host_paths
from pylib.results import json_results

# Chosen after timing test runs of chrome_junit_tests with 7,16,32,
# and 64 workers in threadpool and different classes_per_job.
_MAX_TESTS_PER_JOB = 150

# Result types that mark a test (and hence its job) as failed.
_FAILURE_TYPES = (
    base_test_result.ResultType.FAIL,
    base_test_result.ResultType.CRASH,
    base_test_result.ResultType.TIMEOUT,
)

# RegExp to detect logcat lines, e.g., 'I/AssetManager: not found'.
_LOGCAT_RE = re.compile(r' ?\d+\| (:?\d+\| )?[A-Z]/[\w\d_-]+:')

# Regex to detect start or failure of tests. Matches
# [ RUN      ] org.ui.ForeignSessionItemViewBinderUnitTest.test_phone[28]
# [ FAILED|CRASHED|TIMEOUT ] org.ui.ForeignBinderUnitTest.test_phone[28] (56 ms)
_TEST_START_RE = re.compile(r'.*\[\s+RUN\s+\]\s(.*)')
_TEST_FAILED_RE = re.compile(r'.*\[\s+(?:FAILED|CRASHED|TIMEOUT)\s+\]')


@dataclasses.dataclass
class _TestGroup:
  # The runner config name this group belongs to, and the dict of
  # {class_name: [method_name, ...]} that the group will run.
  config: str
  methods_by_class: dict


@dataclasses.dataclass
class _Job:
  # One wrapper-script invocation (one shard).
  shard_id: int
  # Argv list for the wrapper script (was mis-annotated as str; _MakeJob
  # always assigns a list of strings).
  cmd: list
  timeout: int
  json_config: dict
  json_results_path: str


class LocalMachineJunitTestRun(test_run.TestRun):
  """Test run that shards JUnit tests across local subprocesses."""

  # override
  def TestPackage(self):
    return self._test_instance.suite

  # override
  def SetUp(self):
    pass

  def _GetFilterArgs(self):
    """Returns wrapper-script arguments for the configured test filters."""
    ret = []
    for test_filter in self._test_instance.test_filters:
      ret += ['--gtest-filter', test_filter]

    if self._test_instance.package_filter:
      ret += ['--package-filter', self._test_instance.package_filter]
    if self._test_instance.runner_filter:
      ret += ['--runner-filter', self._test_instance.runner_filter]

    return ret

  def _CreatePropertiesJar(self, temp_dir):
    # Create properties file for Robolectric test runners so they can find the
    # binary resources.
    properties_jar_path = os.path.join(temp_dir, 'properties.jar')
    resource_apk = self._test_instance.resource_apk
    with zipfile.ZipFile(properties_jar_path, 'w') as z:
      z.writestr('com/android/tools/test_config.properties',
                 'android_resource_apk=%s\n' % resource_apk)
      props = [
          'application = android.app.Application',
          'sdk = 28',
          ('shadows = org.chromium.testing.local.'
           'CustomShadowApplicationPackageManager'),
      ]
      z.writestr('robolectric.properties', '\n'.join(props))
    return properties_jar_path

  def _CreateJvmArgsList(self, for_listing=False, allow_debugging=True):
    """Creates the list of JVM args (robolectric, code coverage, etc...).

    Args:
      for_listing: True when only listing tests; skips coverage agent setup.
      allow_debugging: False to suppress the JDWP agent even when a debug
          socket is configured.
    """
    jvm_args = [
        '-Drobolectric.dependency.dir=%s' %
        self._test_instance.robolectric_runtime_deps_dir,
        '-Ddir.source.root=%s' % constants.DIR_SOURCE_ROOT,
        # Use locally available sdk jars from 'robolectric.dependency.dir'
        '-Drobolectric.offline=true',
        '-Drobolectric.resourcesMode=binary',
        '-Drobolectric.logging=stdout',
        '-Djava.library.path=%s' % self._test_instance.native_libs_dir,
    ]
    if self._test_instance.debug_socket and allow_debugging:
      jvm_args += [
          '-Dchromium.jdwp_active=true',
          ('-agentlib:jdwp=transport=dt_socket'
           ',server=y,suspend=y,address=%s' % self._test_instance.debug_socket)
      ]

    if self._test_instance.coverage_dir and not for_listing:
      if not os.path.exists(self._test_instance.coverage_dir):
        os.makedirs(self._test_instance.coverage_dir)
      elif not os.path.isdir(self._test_instance.coverage_dir):
        raise Exception('--coverage-dir takes a directory, not file path.')
      # Jacoco supports concurrent processes using the same output file:
      # https://github.com/jacoco/jacoco/blob/6cd3f0bd8e348f8fba7bffec5225407151f1cc91/org.jacoco.agent.rt/src/org/jacoco/agent/rt/internal/output/FileOutput.java#L67
      # So no need to vary the output based on shard number.
      jacoco_coverage_file = os.path.join(self._test_instance.coverage_dir,
                                          '%s.exec' % self._test_instance.suite)
      if self._test_instance.coverage_on_the_fly:
        jacoco_agent_path = os.path.join(host_paths.DIR_SOURCE_ROOT,
                                         'third_party', 'jacoco', 'lib',
                                         'jacocoagent.jar')

        # inclnolocationclasses is false to prevent no class def found error.
        jacoco_args = '-javaagent:{}=destfile={},inclnolocationclasses=false'
        jvm_args.append(
            jacoco_args.format(jacoco_agent_path, jacoco_coverage_file))
      else:
        jvm_args.append('-Djacoco-agent.destfile=%s' % jacoco_coverage_file)

    return jvm_args

  def _ChooseNumWorkers(self, num_jobs):
    """Picks the concurrency level: 1 when debugging, else the explicit
    --shards value, else half the CPU count (capped at num_jobs)."""
    if self._test_instance.debug_socket:
      num_workers = 1
    elif self._test_instance.shards is not None:
      num_workers = self._test_instance.shards
    else:
      num_workers = max(1, multiprocessing.cpu_count() // 2)
    return min(num_workers, num_jobs)

  @property
  def _wrapper_path(self):
    # Path to the generated wrapper script for this suite.
    return os.path.join(constants.GetOutDirectory(), 'bin', 'helper',
                        self._test_instance.suite)

  def _QueryTestJsonConfig(self,
                           temp_dir,
                           allow_debugging=True,
                           enable_shadow_allowlist=False):
    """Runs the wrapper with --list-tests to produce the test json config.

    Raises:
      subprocess.CalledProcessError: If the wrapper exits non-zero (e.g. the
          filters matched no tests).
    """
    json_config_path = os.path.join(temp_dir, 'main_test_config.json')
    cmd = [self._wrapper_path]
    # Allow debugging of test listing when run as:
    # "--wait-for-java-debugger --list-tests"
    jvm_args = self._CreateJvmArgsList(for_listing=True,
                                       allow_debugging=allow_debugging)
    if jvm_args:
      cmd += ['--jvm-args', '"%s"' % ' '.join(jvm_args)]
    cmd += ['--classpath', self._CreatePropertiesJar(temp_dir)]
    cmd += ['--list-tests', '--json-config', json_config_path]
    if enable_shadow_allowlist and self._test_instance.shadows_allowlist:
      cmd += ['--shadows-allowlist', self._test_instance.shadows_allowlist]
    cmd += self._GetFilterArgs()
    subprocess.run(cmd, check=True)
    with open(json_config_path) as f:
      return json.load(f)

  def _MakeJob(self, shard_id, temp_dir, test_group, properties_jar_path,
               json_config):
    """Builds the _Job (command line, per-shard config file, timeout) for
    one test group."""
    json_results_path = os.path.join(temp_dir, f'results{shard_id}.json')
    job_json_config_path = os.path.join(temp_dir, f'config{shard_id}.json')
    # Shallow copy is enough: only the 'configs' key is replaced.
    job_json_config = json_config.copy()
    job_json_config['configs'] = {
        test_group.config: test_group.methods_by_class
    }
    with open(job_json_config_path, 'w') as f:
      json.dump(job_json_config, f)

    cmd = [self._wrapper_path]
    cmd += ['--jvm-args', '"%s"' % ' '.join(self._CreateJvmArgsList())]
    cmd += ['--classpath', properties_jar_path]
    cmd += ['--json-results', json_results_path]
    cmd += ['--json-config', job_json_config_path]

    if self._test_instance.debug_socket:
      # Effectively no timeout while a human is attached with a debugger.
      timeout = 999999
    else:
      # 20 seconds for process init,
      # 5 seconds per class,
      # 3 seconds per method.
      num_classes = len(test_group.methods_by_class)
      num_tests = sum(len(x) for x in test_group.methods_by_class.values())
      timeout = 20 + 5 * num_classes + num_tests * 3
    return _Job(shard_id=shard_id,
                cmd=cmd,
                timeout=timeout,
                json_config=job_json_config,
                json_results_path=json_results_path)

  # override
  def GetTestsForListing(self):
    with tempfile_ext.NamedTemporaryDirectory() as temp_dir:
      json_config = self._QueryTestJsonConfig(temp_dir)
      ret = []
      for config in json_config['configs'].values():
        for class_name, methods in config.items():
          ret.extend(f'{class_name}.{method}' for method in methods)
      ret.sort()
      return ret

  # override
  def RunTests(self, results, raw_logs_fh=None):
    with tempfile_ext.NamedTemporaryDirectory() as temp_dir:
      self._RunTestsInternal(temp_dir, results, raw_logs_fh)

  def _RunTestsInternal(self, temp_dir, results, raw_logs_fh):
    """Groups tests into jobs, runs them concurrently, and collects results.

    Appends a TestRunResults instance to |results|.
    """
    if self._test_instance.json_config:
      with open(self._test_instance.json_config) as f:
        json_config = json.load(f)
    else:
      # TODO(crbug.com/40878339): This step can take 3-4 seconds for
      # chrome_junit_tests.
      try:
        json_config = self._QueryTestJsonConfig(temp_dir,
                                                allow_debugging=False,
                                                enable_shadow_allowlist=True)
      except subprocess.CalledProcessError:
        results.append(_MakeUnknownFailureResult('Filter matched no tests'))
        return
    test_groups = GroupTests(json_config, _MAX_TESTS_PER_JOB)

    shard_list = list(range(len(test_groups)))
    shard_filter = self._test_instance.shard_filter
    if shard_filter:
      shard_list = [x for x in shard_list if x in shard_filter]

    if not shard_list:
      results.append(_MakeUnknownFailureResult('Invalid shard filter'))
      return

    num_workers = self._ChooseNumWorkers(len(shard_list))
    if shard_filter:
      logging.warning('Running test shards: %s using %s concurrent process(es)',
                      ', '.join(str(x) for x in shard_list), num_workers)
    else:
      logging.warning(
          'Running tests with %d shard(s) using %s concurrent process(es).',
          len(shard_list), num_workers)

    properties_jar_path = self._CreatePropertiesJar(temp_dir)
    jobs = [
        self._MakeJob(i, temp_dir, test_groups[i], properties_jar_path,
                      json_config) for i in shard_list
    ]

    show_logcat = logging.getLogger().isEnabledFor(logging.INFO)
    num_omitted_lines = 0
    failed_test_logs = {}
    log_lines = []
    current_test = None
    for line in RunCommandsAndSerializeOutput(jobs, num_workers):
      if raw_logs_fh:
        raw_logs_fh.write(line)
      if show_logcat or not _LOGCAT_RE.match(line):
        sys.stdout.write(line)
      else:
        num_omitted_lines += 1

      # Collect log data between a test starting and the test failing.
      # There can be info after a test fails and before the next test starts
      # that we discard.
      test_start_match = _TEST_START_RE.match(line)
      if test_start_match:
        current_test = test_start_match.group(1)
        log_lines = [line]
      elif _TEST_FAILED_RE.match(line) and current_test:
        log_lines.append(line)
        failed_test_logs[current_test] = ''.join(log_lines)
        current_test = None
      else:
        log_lines.append(line)

    if num_omitted_lines > 0:
      logging.critical('%d log lines omitted.', num_omitted_lines)
    sys.stdout.flush()

    results_list = []
    failed_jobs = []
    try:
      for job in jobs:
        with open(job.json_results_path, 'r') as f:
          parsed_results = json_results.ParseResultsFromJson(
              json.loads(f.read()))
        has_failed = False
        for r in parsed_results:
          if r.GetType() in _FAILURE_TYPES:
            has_failed = True
            # Result names use '#' between class and method; the captured
            # logs were keyed with '.'.
            r.SetLog(failed_test_logs.get(r.GetName().replace('#', '.'), ''))

        results_list += parsed_results
        if has_failed:
          failed_jobs.append(job)
    except IOError:
      # In the case of a failure in the JUnit or Robolectric test runner
      # the output json file may never be written.
      results_list = [
          base_test_result.BaseTestResult('Test Runner Failure',
                                          base_test_result.ResultType.UNKNOWN)
      ]

    if failed_jobs:
      for job in failed_jobs:
        print(f'To re-run failed shard {job.shard_id}, use --json-config '
              'config.json, where config.json contains:')
        print(json.dumps(job.json_config, indent=2))
        print()

      print(
          f'To re-run the {len(failed_jobs)} failed shard(s), use: '
          f'--shards {num_workers} --shard-filter',
          ','.join(str(j.shard_id) for j in failed_jobs))

    test_run_results = base_test_result.TestRunResults()
    test_run_results.AddResults(results_list)
    results.append(test_run_results)

  # override
  def TearDown(self):
    pass


def GroupTests(json_config, max_per_job):
  """Groups tests that will be run on each shard.

  Args:
    json_config: The result from _QueryTestJsonConfig().
    max_per_job: Stop adding tests to a group once this limit has been passed.

  Return:
    Returns a list of _TestGroup.
  """
  ret = []
  for config, methods_by_class in json_config['configs'].items():
    size = 0
    group = {}
    for class_name, methods in methods_by_class.items():
      # There is some per-class overhead, so do not split tests from one class
      # across multiple shards (unless configs differ).
      group[class_name] = methods
      size += len(methods)
      if size >= max_per_job:
        ret.append(_TestGroup(config, group))
        group = {}
        size = 0

    if group:
      ret.append(_TestGroup(config, group))

  # Put largest shards first to prevent long shards from being scheduled right
  # at the end.
  ret.sort(key=lambda x: -len(x.methods_by_class))
  return ret


def _MakeUnknownFailureResult(message):
  """Wraps |message| as a single UNKNOWN result in a TestRunResults."""
  results_list = [
      base_test_result.BaseTestResult(message,
                                      base_test_result.ResultType.UNKNOWN)
  ]
  test_run_results = base_test_result.TestRunResults()
  test_run_results.AddResults(results_list)
  return test_run_results


def _DumpJavaStacks(pid):
  """Returns the Java thread dump of |pid| via jcmd (best-effort)."""
  jcmd = os.path.join(constants.JAVA_HOME, 'bin', 'jcmd')
  cmd = [jcmd, str(pid), 'Thread.print']
  result = subprocess.run(cmd,
                          check=False,
                          stdout=subprocess.PIPE,
                          encoding='utf8')
  if result.returncode:
    return 'Failed to dump stacks\n' + result.stdout
  return result.stdout


def RunCommandsAndSerializeOutput(jobs, num_workers):
  """Runs multiple commands in parallel and yields serialized output lines.

  Raises:
    TimeoutError: If timeout is exceeded.

  Yields:
    Command output.
  """
  assert jobs
  temp_files = [None]  # First shard is streamed directly to stdout.
  for _ in range(len(jobs) - 1):
    temp_files.append(tempfile.TemporaryFile(mode='w+t', encoding='utf-8'))

  yield '\n'
  yield f'Shard {jobs[0].shard_id} output:\n'

  # idx -> thread dump text for shards that timed out.
  timeout_dumps = {}

  def run_proc(idx):
    if idx == 0:
      s_out = subprocess.PIPE
      s_err = subprocess.STDOUT
    else:
      s_out = temp_files[idx]
      s_err = temp_files[idx]

    job = jobs[idx]
    # NOTE(review): _Job defines no 'env' field, so getattr always returns
    # None here — presumably kept for forward compatibility; confirm.
    proc = cmd_helper.Popen(job.cmd, stdout=s_out, stderr=s_err,
                            env=getattr(job, 'env', None))
    # Need to return process so that output can be displayed on stdout
    # in real time.
    if idx == 0:
      return proc

    try:
      proc.wait(timeout=job.timeout)
    except subprocess.TimeoutExpired:
      timeout_dumps[idx] = _DumpJavaStacks(proc.pid)
      proc.kill()

    # Not needed, but keeps pylint happy.
    return None

  with ThreadPoolExecutor(max_workers=num_workers) as pool:
    futures = [pool.submit(run_proc, idx=i) for i in range(len(jobs))]

    yield from _StreamFirstShardOutput(jobs[0], futures[0].result())

    for i, job in enumerate(jobs[1:], 1):
      shard_id = job.shard_id
      # Shouldn't cause timeout as run_proc terminates the process with
      # a proc.wait().
      futures[i].result()
      f = temp_files[i]
      yield '\n'
      yield f'Shard {shard_id} output:\n'
      f.seek(0)
      for line in f.readlines():
        yield f'{shard_id:2}| {line}'
      f.close()

  # Output stacks
  if timeout_dumps:
    yield '\n'
    yield ('=' * 80) + '\n'
    yield '\nOne or more shards timed out.\n'
    yield ('=' * 80) + '\n'
    for i, dump in sorted(timeout_dumps.items()):
      job = jobs[i]
      yield f'Shard {job.shard_id} timed out after {job.timeout} seconds.\n'
      yield 'Thread dump:\n'
      yield dump
      yield '\n'

    raise cmd_helper.TimeoutError('Junit shards timed out.')


def _StreamFirstShardOutput(job, shard_proc):
  """Yields shard 0's stdout lines live, honoring the job timeout."""
  shard_id = job.shard_id
  # The following will be run from a thread to pump Shard 0 results, allowing
  # live output while allowing timeout.
  shard_queue = queue.Queue()

  def pump_stream_to_queue():
    for line in shard_proc.stdout:
      shard_queue.put(line)
    shard_queue.put(None)

  shard_0_pump = threading.Thread(target=pump_stream_to_queue)
  shard_0_pump.start()
  deadline = time.time() + job.timeout
  # Print the first process until timeout or completion.
  while shard_0_pump.is_alive():
    try:
      line = shard_queue.get(timeout=max(0, deadline - time.time()))
      if line is None:
        break
      yield f'{shard_id:2}| {line}'
    except queue.Empty:
      if time.time() > deadline:
        break

  # Output any remaining output from a timed-out first shard.
  shard_0_pump.join()
  while not shard_queue.empty():
    line = shard_queue.get()
    if line:
      yield f'{shard_id:2}| {line}'