# Copyright 2017, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Atest Tradefed test runner class.
"""

from __future__ import print_function
import codecs
import json
import logging
import os
import re
import socket
import subprocess

from functools import partial

# pylint: disable=import-error
import atest_utils
import constants
from event_handler import EventHandler
from test_finders import test_info
from test_runners import test_runner_base

POLL_FREQ_SECS = 10
SOCKET_HOST = '127.0.0.1'
SOCKET_QUEUE_MAX = 1
SOCKET_BUFFER = 4096
# Socket Events of form FIRST_EVENT {JSON_DATA}\nSECOND_EVENT {JSON_DATA}
# EVENT_RE has groups for the name and the data. "." does not match \n.
EVENT_RE = re.compile(r'^(?P<event_name>[A-Z_]+) (?P<json_data>{.*})(?:\n|$)')

# Executables looked up on PATH by get_test_runner_build_reqs(); any that is
# reported missing is added to the build targets.
EXEC_DEPENDENCIES = ('adb', 'aapt')

TRADEFED_EXIT_MSG = ('TradeFed subprocess exited early with exit code=%s.')


class TradeFedExitError(Exception):
    """Raised when TradeFed exits before test run has finished."""


class AtestTradefedTestRunner(test_runner_base.TestRunnerBase):
    """TradeFed Test Runner class."""
    NAME = 'AtestTradefedTestRunner'
    EXECUTABLE = 'atest_tradefed.sh'
    _TF_TEMPLATE = 'template/local_min'
    _RUN_CMD = ('{exe} {template} --template:map '
                'test=atest {args}')
    # Shared by every instance: treat as read-only and copy before modifying.
    _BUILD_REQ = {'tradefed-core'}

    def __init__(self, results_dir, module_info=None, **kwargs):
        """Init stuff for base class.

        Args:
            results_dir: Passed through to the base class constructor.
            module_info: Optional object exposing is_module(); used by
                         get_test_runner_build_reqs().
            **kwargs: Extra keyword args forwarded to the base class.
        """
        super(AtestTradefedTestRunner, self).__init__(results_dir, **kwargs)
        self.module_info = module_info
        self.run_cmd_dict = {'exe': self.EXECUTABLE,
                             'template': self._TF_TEMPLATE,
                             'args': ''}
        self.is_verbose = logging.getLogger().isEnabledFor(logging.DEBUG)
        # May be None when the build-top env var is not set.
        self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)

    def _try_set_gts_authentication_key(self):
        """Set GTS authentication key if it is available or exists.

        Strategy:
            Get APE_API_KEY from os.environ:
                - If APE_API_KEY is already set by user -> do nothing.
            Get the APE_API_KEY from constants:
                - If the key file exists under the build top -> export its
                  full path as APE_API_KEY.
            If APE_API_KEY isn't set and the key file doesn't exist:
                - Log that some GTS tests may fail without authentication.
        """
        if os.environ.get('APE_API_KEY'):
            logging.debug('APE_API_KEY is set by developer.')
            return
        ape_api_key = constants.GTS_GOOGLE_SERVICE_ACCOUNT
        key_path = None
        # Only build the path when both pieces are known; os.path.join()
        # raises on None.
        if ape_api_key and self.root_dir:
            key_path = os.path.join(self.root_dir, ape_api_key)
        if key_path and os.path.exists(key_path):
            logging.debug('Set APE_API_KEY: %s', key_path)
            # Export the path that was actually verified to exist so the
            # value does not depend on the current working directory.
            os.environ['APE_API_KEY'] = key_path
        else:
            logging.debug('APE_API_KEY not set, some GTS tests may fail '
                          'without authentication.')

    def run_tests(self, test_infos, extra_args, reporter):
        """Run the list of test_infos. See base class for more.

        Dispatches to run_tests_raw() when the OLD_OUTPUT_ENV_VAR environment
        variable is set, otherwise to run_tests_pretty().

        Args:
            test_infos: A list of TestInfos.
            extra_args: Dict of extra args to add to test run.
            reporter: An instance of result_report.ResultReporter.

        Returns:
            0 if tests succeed, non-zero otherwise.
        """
        # Set google service key if it's available or found before running
        # tests.
        self._try_set_gts_authentication_key()
        if os.getenv(test_runner_base.OLD_OUTPUT_ENV_VAR):
            return self.run_tests_raw(test_infos, extra_args, reporter)
        return self.run_tests_pretty(test_infos, extra_args, reporter)

    def run_tests_raw(self, test_infos, extra_args, reporter):
        """Run the list of test_infos with TF output sent straight to stdout.

        Args:
            test_infos: A list of TestInfos.
            extra_args: Dict of extra args to add to test run.
            reporter: An instance of result_report.ResultReporter.

        Returns:
            0 if tests succeed, non-zero otherwise.
        """
        iterations = self._generate_iterations(extra_args)
        reporter.register_unsupported_runner(self.NAME)

        ret_code = constants.EXIT_CODE_SUCCESS
        for _ in range(iterations):
            run_cmds = self.generate_run_commands(test_infos, extra_args)
            subproc = self.run(run_cmds[0], output_to_stdout=True)
            ret_code |= self.wait_for_subprocess(subproc)
        return ret_code

    def run_tests_pretty(self, test_infos, extra_args, reporter):
        """Run the list of test_infos, reporting results via socket events.

        For each iteration a local socket server is started, its port is
        passed to the TF command line, and the events received on it are fed
        to an EventHandler.

        Args:
            test_infos: A list of TestInfos.
            extra_args: Dict of extra args to add to test run.
            reporter: An instance of result_report.ResultReporter.

        Returns:
            0 if tests succeed, non-zero otherwise.
        """
        iterations = self._generate_iterations(extra_args)
        ret_code = constants.EXIT_CODE_SUCCESS
        for _ in range(iterations):
            server = self._start_socket_server()
            try:
                run_cmds = self.generate_run_commands(
                    test_infos, extra_args, server.getsockname()[1])
                subproc = self.run(run_cmds[0],
                                   output_to_stdout=self.is_verbose)
                event_handler = EventHandler(reporter, self.NAME)
                self.handle_subprocess(subproc, partial(self._start_monitor,
                                                        server,
                                                        subproc,
                                                        event_handler))
            finally:
                # Always release the listening socket, even if launching or
                # monitoring the subprocess raised.
                server.close()
            ret_code |= self.wait_for_subprocess(subproc)
        return ret_code

    def _start_monitor(self, server, tf_subproc, event_handler):
        """Polling and process event.

        Args:
            server: Socket server object.
            tf_subproc: The tradefed subprocess to poll.
            event_handler: EventHandler object.

        Raises:
            TradeFedExitError: If the subprocess exits before connecting.
        """
        conn, addr = self._exec_with_tf_polling(server.accept, tf_subproc)
        logging.debug('Accepted connection from %s', addr)
        self._process_connection(conn, event_handler)

    def _start_socket_server(self):
        """Start a TCP server.

        Returns:
            A listening socket bound to SOCKET_HOST on an OS-chosen port, with
            its timeout set to POLL_FREQ_SECS.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Port 0 lets the OS pick an open port between 1024 and 65535.
            server.bind((SOCKET_HOST, 0))
            server.listen(SOCKET_QUEUE_MAX)
            server.settimeout(POLL_FREQ_SECS)
        except socket.error:
            # Do not leak the descriptor when setup fails part-way.
            server.close()
            raise
        logging.debug('Socket server started on port %s',
                      server.getsockname()[1])
        return server

    def _exec_with_tf_polling(self, socket_func, tf_subproc):
        """Check for TF subproc exit during blocking socket func.

        Args:
            socket_func: A blocking socket function, e.g. recv(), accept().
            tf_subproc: The tradefed subprocess to poll.

        Returns:
            Whatever socket_func returns once it completes without timing out.

        Raises:
            TradeFedExitError: If tf_subproc has exited when a timeout occurs.
        """
        while True:
            try:
                return socket_func()
            except socket.timeout:
                logging.debug('Polling TF subproc for early exit.')
                if tf_subproc.poll() is not None:
                    logging.debug('TF subproc exited early')
                    raise TradeFedExitError(TRADEFED_EXIT_MSG
                                            % tf_subproc.returncode)

    def _process_connection(self, conn, event_handler):
        """Process a socket connection from TradeFed.

        Expect data of form EVENT_NAME {JSON_DATA}. Multiple events will be
        newline delimited. Need to buffer data in case data exceeds socket
        buffer.

        The connection is closed when the peer stops sending data or when
        processing raises.

        Args:
            conn: A socket connection.
            event_handler: EventHandler object.
        """
        conn.settimeout(None)
        # Incremental decoding keeps a multi-byte character that is split
        # across two recv() calls intact.
        decoder = codecs.getincrementaldecoder('utf-8')()
        buf = ''
        try:
            while True:
                logging.debug('Waiting to receive data')
                data = conn.recv(SOCKET_BUFFER)
                logging.debug('received: %s', data)
                if not data:
                    # client sent empty string, so no more data.
                    break
                if not isinstance(data, str):
                    # recv() returned a non-str payload (bytes on Python 3);
                    # convert it so it can be appended to the text buffer.
                    data = decoder.decode(data)
                buf += data
                # Drain every complete event currently sitting in the buffer.
                while True:
                    match = EVENT_RE.match(buf)
                    if not match:
                        break
                    try:
                        event_data = json.loads(match.group('json_data'))
                    except ValueError:
                        # Json incomplete, wait for more data.
                        break
                    event_name = match.group('event_name')
                    buf = buf[match.end():]
                    event_handler.process_event(event_name, event_data)
        finally:
            conn.close()

    def host_env_check(self):
        """Check that host env has everything we need.

        We actually can assume the host env is fine because we have the same
        requirements that atest has. Update this to check for android env vars
        if that changes.
        """
        pass

    @staticmethod
    def _is_missing_exec(executable):
        """Check if system build executable is available.

        An executable counts as available only when `which` finds it and the
        reported path starts with the ANDROID_BUILD_TOP directory.

        Args:
            executable: Executable we are checking for.
        Returns:
            True if executable is missing, False otherwise.
        """
        try:
            # universal_newlines makes the output text so it can be compared
            # with the (text) environment value below.
            output = subprocess.check_output(['which', executable],
                                             universal_newlines=True)
        except (subprocess.CalledProcessError, OSError):
            # Not on PATH, or `which` itself could not be run.
            return True
        # TODO: Check if there is a clever way to determine if system adb is
        # good enough.
        root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
        if not root_dir:
            # Without a build top nothing can be located under it.
            return True
        return os.path.commonprefix([output, root_dir]) != root_dir

    def get_test_runner_build_reqs(self):
        """Return the build requirements.

        Returns:
            Set of build targets. A new set is returned on every call.
        """
        # Copy so the class-level default is never modified.
        build_req = set(self._BUILD_REQ)
        # Use different base build requirements if google-tf is around.
        if (self.module_info is not None
                and self.module_info.is_module(constants.GTF_MODULE)):
            build_req = {constants.GTF_TARGET}
        # Always add ATest's own TF target.
        build_req.add(constants.ATEST_TF_MODULE)
        # Add adb if we can't find it.
        for executable in EXEC_DEPENDENCIES:
            if self._is_missing_exec(executable):
                build_req.add(executable)
        return build_req

    @staticmethod
    def _parse_extra_args(extra_args):
        """Convert the extra args into something tf can understand.

        Args:
            extra_args: Dict of args

        Returns:
            Tuple of args to append and args not supported.
        """
        args_to_append = []
        args_not_supported = []
        for arg in extra_args:
            if constants.WAIT_FOR_DEBUGGER == arg:
                args_to_append.append('--wait-for-debugger')
                continue
            if constants.DISABLE_INSTALL == arg:
                args_to_append.append('--disable-target-preparers')
                continue
            if constants.SERIAL == arg:
                args_to_append.append('--serial')
                args_to_append.append(extra_args[arg])
                continue
            if constants.DISABLE_TEARDOWN == arg:
                args_to_append.append('--disable-teardown')
                continue
            if constants.HOST == arg:
                args_to_append.append('-n')
                args_to_append.append('--prioritize-host-config')
                args_to_append.append('--skip-host-arch-check')
                continue
            if constants.CUSTOM_ARGS == arg:
                # We might need to sanitize it prior to appending but for now
                # let's just treat it like a simple arg to pass on through.
                args_to_append.extend(extra_args[arg])
                continue
            if constants.ALL_ABI == arg:
                args_to_append.append('--all-abi')
                continue
            if constants.DRY_RUN == arg:
                # Recognised, but nothing is added to the command line.
                continue
            if constants.INSTANT == arg:
                args_to_append.append('--enable-parameterized-modules')
                args_to_append.append('--module-parameter')
                args_to_append.append('instant_app')
                continue
            args_not_supported.append(arg)
        return args_to_append, args_not_supported

    def _generate_metrics_folder(self, extra_args):
        """Generate metrics folder.

        Args:
            extra_args: Dict of extra args.

        Returns:
            Path under results_dir for baseline or new metrics, depending on
            which iteration key is present, or '' when neither is.
        """
        metrics_folder = ''
        if extra_args.get(constants.PRE_PATCH_ITERATIONS):
            metrics_folder = os.path.join(self.results_dir, 'baseline-metrics')
        elif extra_args.get(constants.POST_PATCH_ITERATIONS):
            metrics_folder = os.path.join(self.results_dir, 'new-metrics')
        return metrics_folder

    def _generate_iterations(self, extra_args):
        """Generate iterations.

        Args:
            extra_args: Dict of extra args. The iteration key that is used is
                        removed from this dict.

        Returns:
            The pre- or post-patch iteration count if given, otherwise 1.
        """
        # NOTE(review): pop() mutates the caller's dict, so a later
        # _generate_metrics_folder() call on the same dict no longer sees the
        # key. Confirm this is intended before changing it.
        iterations = 1
        if extra_args.get(constants.PRE_PATCH_ITERATIONS):
            iterations = extra_args.pop(constants.PRE_PATCH_ITERATIONS)
        elif extra_args.get(constants.POST_PATCH_ITERATIONS):
            iterations = extra_args.pop(constants.POST_PATCH_ITERATIONS)
        return iterations

    def generate_run_commands(self, test_infos, extra_args, port=None):
        """Generate a single run command from TestInfos.

        Args:
            test_infos: A set of TestInfo instances.
            extra_args: A Dict of extra args to append.
            port: Optional. An int of the port number to send events to. If
                  None, then subprocess reporter in TF won't try to connect.

        Returns:
            A list that contains the string of atest tradefed run command.
            Only one command is returned.
        """
        args = self._create_test_args(test_infos)
        metrics_folder = self._generate_metrics_folder(extra_args)

        # Create a copy of args as more args could be added to the list.
        test_args = list(args)
        if port:
            test_args.extend(['--subprocess-report-port', str(port)])
        if metrics_folder:
            test_args.extend(['--metrics-folder', metrics_folder])
            logging.info('Saved metrics in: %s', metrics_folder)
        log_level = 'VERBOSE' if self.is_verbose else 'WARN'
        test_args.extend(['--log-level', log_level])

        args_to_add, args_not_supported = self._parse_extra_args(extra_args)

        # TODO(b/122889707) Remove this after finding the root cause.
        env_serial = os.environ.get(constants.ANDROID_SERIAL)
        # Use the env variable ANDROID_SERIAL if it's set by user but only when
        # the target tests are not deviceless tests.
        if (env_serial and '--serial' not in args_to_add
                and '-n' not in args_to_add):
            args_to_add.append("--serial")
            args_to_add.append(env_serial)

        test_args.extend(args_to_add)
        if args_not_supported:
            logging.info('%s does not support the following args %s',
                         self.EXECUTABLE, args_not_supported)

        test_args.extend(atest_utils.get_result_server_args())
        self.run_cmd_dict['args'] = ' '.join(test_args)
        return [self._RUN_CMD.format(**self.run_cmd_dict)]

    def _flatten_test_infos(self, test_infos):
        """Sort and group test_infos by module_name and sort and group filters
        by class name.

            Example of three test_infos in a set:
                Module1, {(classA, {})}
                Module1, {(classB, {Method1})}
                Module1, {(classB, {Method2}}
            Becomes a set with one element:
                Module1, {(ClassA, {}), (ClassB, {Method1, Method2})}
            Where:
                  Each line is a test_info namedtuple
                  {} = Frozenset
                  () = TestFilter namedtuple

        Args:
            test_infos: A set of TestInfo namedtuples.

        Returns:
            A set of TestInfos flattened.
        """
        results = set()
        key = lambda x: x.test_name
        for module, group in atest_utils.sort_and_group(test_infos, key):
            # module is a string, group is a generator of grouped TestInfos.
            # Module Test, so flatten test_infos:
            no_filters = False
            filters = set()
            test_runner = None
            build_targets = set()
            data = {}
            module_args = []
            for test_info_i in group:
                data.update(test_info_i.data)
                # Extend data with constants.TI_MODULE_ARG instead of
                # overwriting.
                module_args.extend(
                    test_info_i.data.get(constants.TI_MODULE_ARG, []))
                test_runner = test_info_i.test_runner
                build_targets |= test_info_i.build_targets
                test_filters = test_info_i.data.get(constants.TI_FILTER)
                if not test_filters or no_filters:
                    # test_info wants whole module run, so hardcode no filters.
                    no_filters = True
                    filters = set()
                    continue
                filters |= test_filters
            if module_args:
                data[constants.TI_MODULE_ARG] = module_args
            data[constants.TI_FILTER] = self._flatten_test_filters(filters)
            results.add(
                test_info.TestInfo(test_name=module,
                                   test_runner=test_runner,
                                   build_targets=build_targets,
                                   data=data))
        return results

    @staticmethod
    def _flatten_test_filters(filters):
        """Sort and group test_filters by class_name.

            Example of three test_filters in a frozenset:
                classA, {}
                classB, {Method1}
                classB, {Method2}
            Becomes a frozenset with these elements:
                classA, {}
                classB, {Method1, Method2}
            Where:
                Each line is a TestFilter namedtuple
                {} = Frozenset

        Args:
            filters: A frozenset of test_filters.

        Returns:
            A frozenset of test_filters flattened.
        """
        results = set()
        key = lambda x: x.class_name
        for class_name, group in atest_utils.sort_and_group(filters, key):
            # class_name is a string, group is a generator of TestFilters
            assert class_name is not None
            methods = set()
            for test_filter in group:
                if not test_filter.methods:
                    # Whole class should be run
                    methods = set()
                    break
                methods |= test_filter.methods
            results.add(test_info.TestFilter(class_name, frozenset(methods)))
        return frozenset(results)

    def _create_test_args(self, test_infos):
        """Compile TF command line args based on the given test infos.

        Args:
            test_infos: A set (or list) of TestInfo instances.

        Returns: A list of TF arguments to run the tests.
        """
        if not test_infos:
            return []
        args = []

        # Only need to check one TestInfo to determine if the tests are
        # configured in TEST_MAPPING. next(iter(...)) works for sets as well
        # as lists, unlike indexing.
        if next(iter(test_infos)).from_test_mapping:
            args.extend(constants.TEST_MAPPING_RESULT_SERVER_ARGS)
        test_infos = self._flatten_test_infos(test_infos)

        for info in test_infos:
            args.extend([constants.TF_INCLUDE_FILTER, info.test_name])
            filters = set()
            for test_filter in info.data.get(constants.TI_FILTER, []):
                filters.update(test_filter.to_set_of_tf_strings())
            for test_filter in filters:
                filter_arg = constants.TF_ATEST_INCLUDE_FILTER_VALUE_FMT.format(
                    test_name=info.test_name, test_filter=test_filter)
                args.extend([constants.TF_ATEST_INCLUDE_FILTER, filter_arg])
            # Each option is indexed as (name, value).
            for option in info.data.get(constants.TI_MODULE_ARG, []):
                if constants.TF_INCLUDE_FILTER_OPTION == option[0]:
                    suite_filter = (
                        constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format(
                            test_name=info.test_name, option_value=option[1]))
                    args.extend([constants.TF_INCLUDE_FILTER, suite_filter])
                elif constants.TF_EXCLUDE_FILTER_OPTION == option[0]:
                    suite_filter = (
                        constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format(
                            test_name=info.test_name, option_value=option[1]))
                    args.extend([constants.TF_EXCLUDE_FILTER, suite_filter])
                else:
                    module_arg = (
                        constants.TF_MODULE_ARG_VALUE_FMT.format(
                            test_name=info.test_name, option_name=option[0],
                            option_value=option[1]))
                    args.extend([constants.TF_MODULE_ARG, module_arg])
        return args