1# Copyright 2023 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import collections 6import contextlib 7import dataclasses 8import json 9import logging 10import os 11import re 12import subprocess 13import sys 14import time 15from typing import Optional 16 17from devil.android import logcat_monitor 18from devil.android.tools import script_common 19from devil.android.tools import webview_app 20from pylib.base import base_test_result 21from pylib.base import test_exception 22from pylib.base import test_run 23from pylib.local.machine import local_machine_junit_test_run as junitrun 24from pylib.symbols import stack_symbolizer 25 26 27_FAILURE_TYPES = ( 28 base_test_result.ResultType.FAIL, 29 base_test_result.ResultType.CRASH, 30 base_test_result.ResultType.TIMEOUT, 31) 32 33_TEST_START_RE = re.compile(r'.*=+ (\S+) STARTED: .*=+') 34_TEST_END_RE = re.compile(r'.*=+ (\S+) ENDED: .*=+') 35_JSON_RESULTS_RE = re.compile( 36 r'.*D/LUCIResultReporter: JSON result for LUCI: (.*\.json)\b') 37 38LOGCAT_FILTERS = [ 39 'chromium:v', 40 'cr_*:v', 41 'DEBUG:I', 42 'StrictMode:D', 43] 44 45@dataclasses.dataclass 46class _Job: 47 shard_id: int 48 cmd: str 49 timeout: int 50 env: Optional[dict] = None 51 52 53class LocalMachineHostsideTestRun(test_run.TestRun): 54 def __init__(self, env, test_instance): 55 super().__init__(env, test_instance) 56 self.webview_context = None 57 self.device = None 58 self.test_name_to_logcat = collections.defaultdict(collections.deque) 59 60 # override 61 def TestPackage(self): 62 return self._test_instance.suite 63 64 # override 65 def GetTestsForListing(self): 66 return self._GetTests() 67 68 # override 69 def SetUp(self): 70 if self._test_instance.use_webview_provider: 71 self.device = script_common.GetDevices( 72 requested_devices=None, 73 denylist_file=None)[0] 74 for apk in self._test_instance.additional_apks: 75 self.device.Install(apk) 76 self.webview_context = webview_app.UseWebViewProvider( 77 self.device, 78 self._test_instance.use_webview_provider) 79 # Pylint is not smart enough to realize that this field has 80 # an __enter__ method, and will complain loudly. 81 # pylint: disable=no-member 82 self.webview_context.__enter__() 83 # pylint: enable=no-member 84 85 @staticmethod 86 def _ApplyExternalSharding(tests, shard_index, total_shards): 87 logging.info('Using external sharding settings. This is shard %d/%d', 88 shard_index, total_shards) 89 90 if total_shards < 0 or shard_index < 0 or total_shards <= shard_index: 91 raise test_exception.InvalidShardingSettings(shard_index, total_shards) 92 93 return tests[shard_index::total_shards] 94 95 def _MakeModeArgs(self): 96 if self._test_instance.instant_mode: 97 mode_args = [ 98 '--module-parameter', 99 'INSTANT_APP', 100 ] 101 else: 102 mode_args = [ 103 '--exclude-filter', 104 f'{self.TestPackage()}[instant]', 105 ] 106 return mode_args 107 108 def _MakeEnv(self): 109 return dict( 110 os.environ, 111 PATH=':'.join([ 112 os.getenv('PATH'), 113 self._test_instance.aapt_path, 114 self._test_instance.adb_path]), 115 CTS_ROOT=os.path.join( 116 os.path.dirname(self._test_instance.tradefed_executable), 117 os.pardir, 118 os.pardir 119 ) 120 ) 121 122 def _GetTests(self): 123 filter_args = [] 124 # use passed in filters to list all filters tests 125 for combined_filter in self._test_instance.test_filters: 126 pattern_groups = combined_filter.split('-') 127 negative_pattern = pattern_groups[1] if len(pattern_groups) > 1 else '' 128 positive_pattern = pattern_groups[0] 129 if negative_pattern: 130 for exclude_filter in negative_pattern.split(':'): 131 filter_args.extend([ 132 '--exclude-filter', 133 self.TestPackage() 134 + '[instant]' * self._test_instance.instant_mode 135 + ' ' + '#'.join(exclude_filter.rsplit('.', 1)), 136 ]) 137 if positive_pattern: 138 for include_filter in positive_pattern.split(':'): 139 filter_args.extend([ 140 '--include-filter', 141 self.TestPackage() 142 + '[instant]' * self._test_instance.instant_mode 143 + ' ' + '#'.join(include_filter.rsplit('.', 1)), 144 ]) 145 146 cmd = [ 147 self._test_instance.tradefed_executable, 148 'run', 149 'commandAndExit', 150 'cts', 151 '-m', 152 self.TestPackage(), 153 ] + self._MakeModeArgs() + filter_args + ['--collect-tests-only'] 154 155 logging.info('Getting tests from cts-tradefed using --collect_tests_only.') 156 output = subprocess.check_output( 157 cmd, 158 timeout=600, 159 env=self._MakeEnv(), 160 universal_newlines=True, 161 ) 162 163 tests = set() 164 for line in output.splitlines(): 165 if test_start_match := _TEST_START_RE.match(line): 166 tests.add(test_start_match.group(1)) 167 tests = self._ApplyExternalSharding( 168 sorted(tests), 169 self._test_instance.external_shard_index, 170 self._test_instance.total_external_shards) 171 return tests 172 173 def _MakeJob(self): 174 filter_args = [] 175 # only include current shard tests 176 for test in self._GetTests(): 177 filter_args.extend([ 178 '--include-filter', 179 self.TestPackage() + '[instant]' * self._test_instance.instant_mode 180 + ' ' + test, 181 ]) 182 183 cmd = [ 184 self._test_instance.tradefed_executable, 185 'run', 186 'commandAndExit', 187 'cts', 188 '-m', 189 self.TestPackage(), 190 ] + self._MakeModeArgs() + filter_args + [ 191 '--retry-strategy', 192 'RETRY_ANY_FAILURE', 193 '--max-testcase-run-count', 194 str(self._test_instance.max_tries), 195 '--template:map', 196 'reporters=../../build/android/pylib/local/machine/' 197 'local_machine_hostside_tradefed_config.xml', 198 ] 199 200 return _Job( 201 # Note: the shard_id here is not to be confused with the external shard 202 # index (set by swarming). This is a local shard index as we are reusing 203 # more general-purpose code that allows for having multiple subprocesses 204 # run tests locally in parallel. We are not doing that here, so sticking 205 # to a fixed shard_id. 206 shard_id=0, 207 cmd=cmd, 208 timeout=600, 209 env=self._MakeEnv() 210 ) 211 212 # override 213 def RunTests(self, results, raw_logs_fh=None): 214 job = self._MakeJob() 215 216 per_test_logs = {} 217 log_lines = [] 218 current_test = None 219 json_results_path = None 220 archive_logcat = None 221 for line in junitrun.RunCommandsAndSerializeOutput([job], 1): 222 if raw_logs_fh: 223 raw_logs_fh.write(line) 224 sys.stdout.write(line) 225 226 # Collect log data between a test starting and the test failing. 227 # There can be info after a test fails and before the next test starts 228 # that we discard. 229 if test_start_match := _TEST_START_RE.match(line): 230 current_test = test_start_match.group(1) 231 log_lines = [line] 232 if archive_logcat is not None: 233 archive_logcat.__exit__(None, None, None) 234 archive_logcat = self._ArchiveLogcat(self.device, current_test) 235 # Pylint is not smart enough to realize that this field has 236 # an __enter__ method, and will complain loudly. 237 # pylint: disable=no-member 238 archive_logcat.__enter__() 239 # pylint: enable=no-member 240 else: 241 log_lines.append(line) 242 if _TEST_END_RE.match(line) and current_test: 243 per_test_logs[current_test] = ''.join(log_lines) 244 archive_logcat.__exit__(None, None, None) 245 archive_logcat = None 246 current_test = None 247 elif json_results_path_match := _JSON_RESULTS_RE.match(line): 248 json_results_path = json_results_path_match.group(1) 249 250 sys.stdout.flush() 251 if archive_logcat is not None: 252 # Pylint is not smart enough to realize that this field has 253 # an __exit__ method, and will complain loudly. 254 # pylint: disable=no-member 255 archive_logcat.__exit__(None, None, None) 256 # pylint: enable=no-member 257 258 result_list = [] 259 if json_results_path: 260 with open(json_results_path, 'r') as f: 261 json_results = json.load(f) 262 parsed_results = _ParseResultsFromJson(json_results) 263 for r in parsed_results: 264 if r.GetType() in _FAILURE_TYPES: 265 r.SetLog(per_test_logs.get(r.GetName(), '')) 266 attempt_counter = collections.Counter( 267 result.GetName() for result in parsed_results 268 ) 269 for result in parsed_results: 270 test_name = result.GetName() 271 logcat_deque = self.test_name_to_logcat[test_name] 272 if attempt_counter[test_name] == len(logcat_deque): 273 # Set logcat link in FIFO order in case of multiple test attempts 274 result.SetLink('logcat', logcat_deque.popleft()) 275 attempt_counter[test_name] -= 1 276 277 result_list += parsed_results 278 else: 279 # In the case of a failure in the test runner 280 # the output json file may never be written. 281 result_list = [ 282 base_test_result.BaseTestResult( 283 'Test Runner Failure', 284 base_test_result.ResultType.UNKNOWN) 285 ] 286 287 test_run_results = base_test_result.TestRunResults() 288 test_run_results.AddResults(result_list) 289 results.append(test_run_results) 290 291 # override 292 def TearDown(self): 293 if self._test_instance.use_webview_provider: 294 # Pylint is not smart enough to realize that this field has 295 # an __exit__ method, and will complain loudly. 296 # pylint: disable=no-member 297 self.webview_context.__exit__(*sys.exc_info()) 298 # pylint: enable=no-member 299 300 @contextlib.contextmanager 301 def _ArchiveLogcat(self, device, test_name): 302 stream_name = 'logcat_%s_shard%s_%s_%s' % ( 303 test_name.replace('#', '.'), self._test_instance.external_shard_index, 304 time.strftime('%Y%m%dT%H%M%S-UTC', time.gmtime()), device.serial) 305 306 logcat_file = None 307 logmon = None 308 try: 309 with self._env.output_manager.ArchivedTempfile(stream_name, 310 'logcat') as logcat_file: 311 symbolizer = stack_symbolizer.PassThroughSymbolizerPool( 312 device.product_cpu_abi) 313 with symbolizer: 314 with logcat_monitor.LogcatMonitor( 315 device.adb, 316 filter_specs=LOGCAT_FILTERS, 317 output_file=logcat_file.name, 318 check_error=False) as logmon: 319 yield logcat_file 320 finally: 321 if logmon: 322 logmon.Close() 323 if logcat_file and logcat_file.Link(): 324 logging.critical('Logcat saved to %s', logcat_file.Link()) 325 self.test_name_to_logcat[test_name].append(logcat_file.Link()) 326 327def _ParseResultsFromJson(json_results): 328 result_list = [] 329 result_list.extend([ 330 base_test_result.BaseTestResult( 331 tr['testId'], 332 getattr( 333 base_test_result.ResultType, 334 tr['status'], 335 base_test_result.ResultType.UNKNOWN 336 ), 337 duration=int(tr['duration'] * 1000), 338 failure_reason=tr.get('failureReason'), 339 ) 340 for tr in json_results['tr'] 341 ]) 342 return result_list 343