1#!/usr/bin/env python3 2# 3# Copyright 2023, The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Module to facilitate integration test within the build and test environment. 18 19This module provides utilities for running tests in both build and test 20environments, managing environment variables, and snapshotting the workspace for 21restoration later. 22""" 23 24import argparse 25import atexit 26import concurrent.futures 27import copy 28import datetime 29import functools 30import itertools 31import logging 32import multiprocessing 33import os 34import pathlib 35import shutil 36import subprocess 37import sys 38import tarfile 39import tempfile 40import time 41import traceback 42from typing import Any, Callable, Iterator 43import unittest 44import zipfile 45 46from snapshot import Snapshot 47 48# Env key for the storage tar path. 49SNAPSHOT_STORAGE_TAR_KEY = 'SNAPSHOT_STORAGE_TAR_PATH' 50 51# Env key for the repo root 52ANDROID_BUILD_TOP_KEY = 'ANDROID_BUILD_TOP' 53 54 55class IntegrationTestConfiguration: 56 """Internal class to store integration test configuration.""" 57 58 device_serial: str = None 59 is_build_env: bool = False 60 is_test_env: bool = False 61 snapshot_storage_path: pathlib.Path = None 62 snapshot_storage_tar_path: pathlib.Path = None 63 workspace_path: pathlib.Path = None 64 is_tar_snapshot: bool = False 65 66 67class StepInput: 68 """Input information for a build/test step.""" 69 70 def __init__(self, env, repo_root, config, objs): 71 self._env = env 72 self._repo_root = repo_root 73 self._config = config 74 self._objs = objs 75 76 def get_device_serial_args_or_empty(self) -> str: 77 """Gets command arguments for device serial. May return empty string.""" 78 # TODO: b/336839543 - Remove this method when we deprecate the support to 79 # run the integration test directly through 'python **.py' command. 80 if self._config.device_serial: 81 return ' -s ' + self._config.device_serial 82 if ANDROID_BUILD_TOP_KEY not in os.environ and self._config.is_test_env: 83 # Likely in test lab environment, where connected devices can are 84 # allocated to other tests. In this case we must explicitly set device 85 # serials in any atest calls . 86 raise RuntimeError('Device serial is required but not set') 87 # Empty is allowed because it allows tradefed to decide which device to 88 # select in local run. 89 return '' 90 91 def get_device_serial(self) -> str: 92 """Returns the serial of the connected device. Throws if not set.""" 93 if not self._config.device_serial: 94 raise RuntimeError('Device serial is not set') 95 return self._config.device_serial 96 97 def get_env(self): 98 """Get environment variables.""" 99 return self._env 100 101 def get_repo_root(self) -> str: 102 """Get repo root directory.""" 103 return self._repo_root 104 105 def get_obj(self, name: str) -> Any: 106 """Get an object saved in previous snapshot.""" 107 return self._objs.get(name, None) 108 109 def get_config(self) -> IntegrationTestConfiguration: 110 """Get the integration test configuration.""" 111 return self._config 112 113 114class StepOutput: 115 """Output information generated from a build step.""" 116 117 def __init__(self): 118 self._snapshot_include_paths: list[str] = [] 119 self._snapshot_exclude_paths: list[str] = [] 120 self._snapshot_env_keys: list[str] = [] 121 self._snapshot_objs: dict[str, Any] = {} 122 123 def add_snapshot_include_paths(self, paths: list[str]) -> None: 124 """Add paths to include in snapshot artifacts.""" 125 self._snapshot_include_paths.extend(paths) 126 127 def set_snapshot_include_paths(self, paths: list[str]) -> None: 128 """Set the snapshot include paths. 129 130 Note that the default include paths will be removed. 131 Use add_snapshot_include_paths if that's not intended. 132 133 Args: 134 paths: The new list of paths to include for snapshot. 135 """ 136 self._snapshot_include_paths.clear() 137 self._snapshot_include_paths.extend(paths) 138 139 def add_snapshot_exclude_paths(self, paths: list[str]) -> None: 140 """Add paths to exclude from snapshot artifacts.""" 141 self._snapshot_exclude_paths.extend(paths) 142 143 def add_snapshot_env_keys(self, keys: list[str]) -> None: 144 """Add environment variable keys for snapshot.""" 145 self._snapshot_env_keys.extend(keys) 146 147 def add_snapshot_obj(self, name: str, obj: Any): 148 """Add objects to save in snapshot.""" 149 self._snapshot_objs[name] = obj 150 151 def get_snapshot_include_paths(self): 152 """Returns the stored snapshot include path list.""" 153 return self._snapshot_include_paths 154 155 def get_snapshot_exclude_paths(self): 156 """Returns the stored snapshot exclude path list.""" 157 return self._snapshot_exclude_paths 158 159 def get_snapshot_env_keys(self): 160 """Returns the stored snapshot env key list.""" 161 return self._snapshot_env_keys 162 163 def get_snapshot_objs(self): 164 """Returns the stored snapshot object dictionary.""" 165 return self._snapshot_objs 166 167 168class SplitBuildTestScript: 169 """Utility for running integration test in build and test environment.""" 170 171 def __init__(self, name: str, config: IntegrationTestConfiguration) -> None: 172 self._config = config 173 self._id: str = name 174 self._snapshot: Snapshot = Snapshot(self._config.snapshot_storage_path) 175 self._has_already_run: bool = False 176 self._steps: list[self._Step] = [] 177 self._snapshot_restore_exclude_paths: list[str] = [] 178 179 def get_config(self) -> IntegrationTestConfiguration: 180 return self._config 181 182 def add_build_step(self, step_func: Callable[StepInput, StepOutput]): 183 """Add a build step. 184 185 Args: 186 step_func: A function that takes a StepInput object and returns a 187 StepOutput object. 188 189 Raises: 190 RuntimeError: Unexpected step orders detected. 191 """ 192 if self._steps and isinstance(self._steps[-1], self._BuildStep): 193 raise RuntimeError( 194 'Two adjacent build steps are unnecessary. Combine them.' 195 ) 196 self._steps.append(self._BuildStep(step_func)) 197 198 def add_test_step(self, step_func: Callable[StepInput, None]): 199 """Add a test step. 200 201 Args: 202 step_func: A function that takes a StepInput object. 203 204 Raises: 205 RuntimeError: Unexpected step orders detected. 206 """ 207 if not self._steps or isinstance(self._steps[-1], self._TestStep): 208 raise RuntimeError('A build step is required before a test step.') 209 self._steps.append(self._TestStep(step_func)) 210 211 def _exception_to_dict(self, exception: Exception): 212 """Converts an exception object to a dictionary to be saved by json.""" 213 return { 214 'type': exception.__class__.__name__, 215 'message': str(exception), 216 'traceback': ''.join(traceback.format_tb(exception.__traceback__)), 217 } 218 219 def _dict_to_exception(self, exception_dict: dict[str, str]): 220 """Converts a dictionary to an exception object.""" 221 return RuntimeError( 222 'The last build step raised an exception:\n' 223 f'{exception_dict["type"]}: {exception_dict["message"]}\n' 224 'Traceback (from saved snapshot):\n' 225 f'{exception_dict["traceback"]}' 226 ) 227 228 def run(self): 229 """Run the steps added previously. 230 231 This function cannot be executed more than once. 232 Raises: 233 RuntimeError: When attempted to run the script multiple times. 234 """ 235 if self._has_already_run: 236 raise RuntimeError(f'Script {self.name} has already run.') 237 self._has_already_run = True 238 239 build_step_exception_key = '_internal_build_step_exception' 240 241 for index, step in enumerate(self._steps): 242 if isinstance(step, self._BuildStep) and self.get_config().is_build_env: 243 env = os.environ 244 step_in = StepInput( 245 env, 246 self._get_repo_root(os.environ), 247 self.get_config(), 248 {}, 249 ) 250 last_exception = None 251 try: 252 step_out = step.get_step_func()(step_in) 253 # pylint: disable=broad-exception-caught 254 except Exception as e: 255 last_exception = e 256 step_out = StepOutput() 257 step_out.add_snapshot_obj( 258 build_step_exception_key, self._exception_to_dict(e) 259 ) 260 261 self._take_snapshot( 262 self._get_repo_root(os.environ), 263 self._id + '_' + str(index // 2), 264 step_out, 265 env, 266 ) 267 268 if last_exception: 269 raise last_exception 270 271 if isinstance(step, self._TestStep) and self.get_config().is_test_env: 272 env, objs = self._restore_snapshot(self._id + '_' + str(index // 2)) 273 274 if build_step_exception_key in objs: 275 raise self._dict_to_exception(objs[build_step_exception_key]) 276 277 step_in = StepInput( 278 env, 279 self._get_repo_root(env), 280 self.get_config(), 281 objs, 282 ) 283 step.get_step_func()(step_in) 284 285 def add_snapshot_restore_exclude_paths(self, paths: list[str]) -> None: 286 """Add paths to ignore during snapshot directory restore.""" 287 self._snapshot_restore_exclude_paths.extend(paths) 288 289 def _take_snapshot( 290 self, 291 repo_root: str, 292 name: str, 293 step_out: StepOutput, 294 env: dict[str, str], 295 ) -> None: 296 """Take a snapshot of the repository and environment.""" 297 self._snapshot.take_snapshot( 298 name, 299 repo_root, 300 include_paths=step_out.get_snapshot_include_paths(), 301 exclude_paths=step_out.get_snapshot_exclude_paths(), 302 env_keys=step_out.get_snapshot_env_keys(), 303 env=env, 304 objs=step_out.get_snapshot_objs(), 305 ) 306 307 def _restore_snapshot(self, name: str) -> None: 308 """Restore the repository and environment from a snapshot.""" 309 return self._snapshot.restore_snapshot( 310 name, 311 self.get_config().workspace_path.as_posix(), 312 exclude_paths=self._snapshot_restore_exclude_paths, 313 ) 314 315 def _get_repo_root(self, env) -> str: 316 """Get repo root directory.""" 317 if self.get_config().is_build_env: 318 return os.environ[ANDROID_BUILD_TOP_KEY] 319 return env[ANDROID_BUILD_TOP_KEY] 320 321 class _Step: 322 """Parent class to build step and test step for typing declaration.""" 323 324 class _BuildStep(_Step): 325 326 def __init__(self, step_func: Callable[StepInput, StepOutput]): 327 self._step_func = step_func 328 329 def get_step_func(self) -> Callable[StepInput, StepOutput]: 330 """Returns the stored step function for build.""" 331 return self._step_func 332 333 class _TestStep(_Step): 334 335 def __init__(self, step_func: Callable[StepInput, None]): 336 self._step_func = step_func 337 338 def get_step_func(self) -> Callable[StepInput, None]: 339 """Returns the stored step function for test.""" 340 return self._step_func 341 342 343class SplitBuildTestTestCase(unittest.TestCase): 344 """Base test case class for split build-test scripting tests.""" 345 346 # Internal config to be injected to the test case from main. 347 _config: IntegrationTestConfiguration = None 348 349 @classmethod 350 def set_config(cls, config: IntegrationTestConfiguration) -> None: 351 cls._config = config 352 353 @classmethod 354 def get_config(cls) -> IntegrationTestConfiguration: 355 return cls._config 356 357 def create_split_build_test_script( 358 self, name: str = None 359 ) -> SplitBuildTestScript: 360 """Return an instance of SplitBuildTestScript with the given name. 361 362 Args: 363 name: The name of the script. The name will be used to store snapshots 364 and it's recommended to set the name to test id such as self.id(). 365 Defaults to the test id if not set. 366 """ 367 if not name: 368 name = self.id() 369 main_module_name = '__main__' 370 if name.startswith(main_module_name): 371 script_name = pathlib.Path(sys.modules[main_module_name].__file__).stem 372 name = name.replace(main_module_name, script_name) 373 return SplitBuildTestScript(name, self.get_config()) 374 375 376class _FileCompressor: 377 """Class for compressing and decompressing files.""" 378 379 def compress_all_sub_files(self, root_path: pathlib.Path) -> None: 380 """Compresses all files in the given directory and subdirectories. 381 382 Args: 383 root_path: The path to the root directory. 384 """ 385 cpu_count = multiprocessing.cpu_count() 386 with concurrent.futures.ThreadPoolExecutor( 387 max_workers=cpu_count 388 ) as executor: 389 for file_path in root_path.rglob('*'): 390 if file_path.is_file(): 391 executor.submit(self.compress_file, file_path) 392 393 def compress_file(self, file_path: pathlib.Path) -> None: 394 """Compresses a single file to zip. 395 396 Args: 397 file_path: The path to the file to compress. 398 """ 399 with zipfile.ZipFile( 400 file_path.with_suffix('.zip'), 'w', zipfile.ZIP_DEFLATED 401 ) as zip_file: 402 zip_file.write(file_path, arcname=file_path.name) 403 file_path.unlink() 404 405 def decompress_all_sub_files(self, root_path: pathlib.Path) -> None: 406 """Decompresses all compressed sub files in the given directory. 407 408 Args: 409 root_path: The path to the root directory. 410 """ 411 cpu_count = multiprocessing.cpu_count() 412 with concurrent.futures.ThreadPoolExecutor( 413 max_workers=cpu_count 414 ) as executor: 415 for file_path in root_path.rglob('*.zip'): 416 executor.submit(self.decompress_file, file_path) 417 418 def decompress_file(self, file_path: pathlib.Path) -> None: 419 """Decompresses a single zip file. 420 421 Args: 422 file_path: The path to the compressed file. 423 """ 424 with zipfile.ZipFile(file_path, 'r') as zip_file: 425 zip_file.extractall(file_path.parent) 426 file_path.unlink() 427 428 429class ParallelTestRunner(unittest.TextTestRunner): 430 """A class that holds the logic of parallel test execution. 431 432 Test methods wrapped by decorators defined in this class will be pre-executed 433 at the beginning of the test run in parallel and have the results cached when 434 the test runner is also this class. Available decorators: `run_in_parallel` 435 for runnint test method in parallel during both build and test env, 436 `run_in_parallel_in_build_env` for parallel run in build env only, and 437 `run_in_parallel_in_test_env` for parallel run in test env only. 438 """ 439 440 _RUN_IN_PARALLEL = 'run_in_parallel' 441 _RUN_IN_PARALLEL_IN_BUILD_ENV = 'run_in_parallel_in_build_env' 442 _RUN_IN_PARALLEL_IN_TEST_ENV = 'run_in_parallel_in_test_env' 443 _DECORATOR_NAME = 'decorator_name' 444 445 @classmethod 446 def _cache_first( 447 cls, func: Callable[[Any], Any], decorator_name: str 448 ) -> Callable[[Any], Any]: 449 """Cache a function's first call result and consumes it in the next call. 450 451 This decorator is similar to the built-in `functools.cache` decorator except 452 that this decorator caches the first call's run result and emit it in the 453 next run of the function, regardless of the function's input argument value 454 changes. Caching only the first call of the test ensures test retries emit 455 fresh results. 456 457 Args: 458 func: The function to cache. 459 decorator_name: The name of the decorator. 460 461 Returns: 462 The wrapped function with queue caching ability. 463 """ 464 setattr(func, cls._DECORATOR_NAME, decorator_name) 465 466 class _ResultCache: 467 result = None 468 is_to_be_cached = False 469 470 result_cache = _ResultCache() 471 472 @functools.wraps(func) 473 def _wrapped(*args, only_set_next_run_caching=False, **kwargs): 474 if only_set_next_run_caching: 475 result_cache.is_to_be_cached = True 476 return 477 478 def _get_fresh_call_result(): 479 try: 480 return (func(*args, **kwargs), None) 481 # pylint: disable-next=broad-exception-caught 482 except Exception as e: 483 return (None, e) 484 485 if result_cache.is_to_be_cached: 486 result = _get_fresh_call_result() 487 result_cache.result = result 488 result_cache.is_to_be_cached = False 489 elif result_cache.result: 490 result = result_cache.result 491 result_cache.result = None 492 else: 493 result = _get_fresh_call_result() 494 if result[1]: 495 raise result[1] 496 return result[0] 497 498 return _wrapped 499 500 @classmethod 501 def run_in_parallel(cls, func: Callable[[Any], Any]) -> Callable[[Any], Any]: 502 """Hint that a test method can run in parallel.""" 503 return cls._cache_first(func, cls.run_in_parallel.__name__) 504 505 @classmethod 506 def run_in_parallel_in_build_env( 507 cls, func: Callable[[Any], Any] 508 ) -> Callable[[Any], Any]: 509 """Hint that a test method can run in parallel in build env only.""" 510 return cls._cache_first(func, cls.run_in_parallel_in_build_env.__name__) 511 512 @classmethod 513 def run_in_parallel_in_test_env( 514 cls, func: Callable[[Any], Any] 515 ) -> Callable[[Any], Any]: 516 """Hint that a test method can run in parallel in test env only.""" 517 return cls._cache_first(func, cls.run_in_parallel_in_test_env.__name__) 518 519 @classmethod 520 def setup_parallel(cls, func: Callable[[Any], Any]) -> Callable[[Any], Any]: 521 """Hint that a method is for setting up a parallel run.""" 522 return cls._cache_first(func, cls.setup_parallel.__name__) 523 524 @classmethod 525 def setup_parallel_in_build_env( 526 cls, func: Callable[[Any], Any] 527 ) -> Callable[[Any], Any]: 528 """Hint that a method is for setting up a parallel run in build env only.""" 529 return cls._cache_first(func, cls.setup_parallel_in_build_env.__name__) 530 531 @classmethod 532 def setup_parallel_in_test_env( 533 cls, func: Callable[[Any], Any] 534 ) -> Callable[[Any], Any]: 535 """Hint that a method is for setting up a parallel run in test env only.""" 536 return cls._cache_first(func, cls.setup_parallel_in_test_env.__name__) 537 538 def run(self, test): 539 """Executes parallel tests first and then non-parallel tests.""" 540 for test_suite in test: 541 self._pre_execute_parallel_tests(test_suite) 542 return super().run(test) 543 544 @staticmethod 545 def _get_test_function(test: unittest.TestCase) -> Callable[Any, Any]: 546 """Gets the test function from a TestCase class wrapped by unittest.""" 547 return getattr(test, test.id().split('.')[-1]) 548 549 @classmethod 550 def _get_parallel_setups( 551 cls, test_suite: unittest.TestSuite 552 ) -> set[Callable[None, Any]]: 553 """Returns a set of functions to be executed as setup for parallel run.""" 554 test_cls = None 555 for test_case in test_suite: 556 test_cls = test_case.__class__ 557 break 558 if not test_cls: 559 return set() 560 561 result = set() 562 update_result = lambda decorator: result.update( 563 filter( 564 lambda func: callable(func) 565 and decorator.__name__ == getattr(func, cls._DECORATOR_NAME, None), 566 map(functools.partial(getattr, test_cls), dir(test_cls)), 567 ) 568 ) 569 update_result(cls.setup_parallel) 570 if test_cls.get_config().is_build_env: 571 update_result(cls.setup_parallel_in_build_env) 572 if test_cls.get_config().is_test_env: 573 update_result(cls.setup_parallel_in_test_env) 574 return result 575 576 @classmethod 577 def _get_parallel_tests( 578 cls, test_suite: unittest.TestSuite 579 ) -> Iterator[unittest.TestCase]: 580 """Returns a list of test cases to be run in parallel from a test suite.""" 581 and_combine = lambda *funcs: functools.reduce( 582 lambda accu, func: lambda item: accu(item) and func(item), funcs 583 ) 584 or_combine = lambda *funcs: functools.reduce( 585 lambda accu, func: lambda item: accu(item) or func(item), funcs 586 ) 587 is_decorated = lambda decorator, test: decorator.__name__ == getattr( 588 cls._get_test_function(test), 589 cls._DECORATOR_NAME, 590 None, 591 ) 592 is_parallel = functools.partial(is_decorated, cls.run_in_parallel) 593 is_parallel_in_build = functools.partial( 594 is_decorated, cls.run_in_parallel_in_build_env 595 ) 596 is_parallel_in_test = functools.partial( 597 is_decorated, cls.run_in_parallel_in_test_env 598 ) 599 is_in_build_env = lambda test: test.get_config().is_build_env 600 is_in_test_env = lambda test: test.get_config().is_test_env 601 combined_filter = or_combine( 602 and_combine(is_parallel_in_build, is_in_build_env), 603 and_combine(is_parallel_in_test, is_in_test_env), 604 is_parallel, 605 ) 606 return filter(combined_filter, test_suite) 607 608 @classmethod 609 def _pre_execute_parallel_tests(cls, test_suite: unittest.TestSuite) -> None: 610 """Pre-execute parallel tests in the test suite.""" 611 for setup_func in cls._get_parallel_setups(test_suite): 612 logging.info('Setting up parallel tests with function %s', setup_func) 613 setup_func() 614 with concurrent.futures.ThreadPoolExecutor( 615 max_workers=multiprocessing.cpu_count() 616 ) as executor: 617 618 def _execute_test(test): 619 # We can't directly call test.run because the function would either not 620 # know that it's being pre-executed or not know whether it's being 621 # executed by this test runner. We can't call the test function directly 622 # because setup and teardown would be missed. We can't set properties 623 # of the test function here because the test function has already been 624 # wrapped by unittest. The only way we can let the test function know 625 # that it needs to cache the next run is to call the function with a 626 # parameter first before calling the run method. 627 cls._get_test_function(test).__func__(only_set_next_run_caching=True) 628 return executor.submit(test.run) 629 630 for class_name, class_group in itertools.groupby( 631 cls._get_parallel_tests(test_suite), 632 lambda obj: f'{obj.__class__.__module__}.{obj.__class__}', 633 ): 634 test_group = list(class_group) 635 logging.info( 636 'Pre-executing %s of %s tests in parallel...', 637 len(test_group), 638 class_name, 639 ) 640 641 list(concurrent.futures.as_completed(map(_execute_test, test_group))) 642 643 644def _configure_logging(verbose: bool, log_file_dir_path: pathlib.Path): 645 """Configure the logger. 646 647 Args: 648 verbose: If true display DEBUG level logs on console. 649 log_file_dir_path: A directory which stores the log file. 650 """ 651 log_file = log_file_dir_path.joinpath('asuite_integration_tests.log') 652 if log_file.exists(): 653 timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S') 654 log_file = log_file_dir_path.joinpath( 655 f'asuite_integration_tests_{timestamp}.log' 656 ) 657 log_file.parent.mkdir(parents=True, exist_ok=True) 658 659 atexit.register(lambda: print('Logs are saved to %s' % log_file)) 660 661 log_format = '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s' 662 date_format = '%Y-%m-%d %H:%M:%S' 663 logging.basicConfig( 664 filename=log_file.as_posix(), 665 level=logging.DEBUG, 666 format=log_format, 667 datefmt=date_format, 668 ) 669 console = logging.StreamHandler() 670 console.name = 'console' 671 console.setLevel(logging.INFO) 672 if verbose: 673 console.setLevel(logging.DEBUG) 674 console.setFormatter(logging.Formatter(log_format)) 675 logging.getLogger('').addHandler(console) 676 677 678def _parse_known_args( 679 argv: list[str], 680 argparser_update_func: Callable[argparse.ArgumentParser, None] = None, 681) -> tuple[argparse.Namespace, list[str]]: 682 """Parse command line args and check required args being provided.""" 683 684 description = """A script to build and/or run the Asuite integration tests. 685Usage examples: 686 python <script_path>: Runs both the build and test steps. 687 python <script_path> -b -t: Runs both the build and test steps. 688 python <script_path> -b: Runs only the build steps. 689 python <script_path> -t: Runs only the test steps. 690""" 691 692 parser = argparse.ArgumentParser( 693 add_help=True, 694 description=description, 695 formatter_class=argparse.RawDescriptionHelpFormatter, 696 ) 697 698 parser.add_argument( 699 '-b', 700 '--build', 701 action='store_true', 702 default=False, 703 help=( 704 'Run build steps. Can be set to true together with the test option.' 705 ' If both build and test are unset, will run both steps.' 706 ), 707 ) 708 parser.add_argument( 709 '-t', 710 '--test', 711 action='store_true', 712 default=False, 713 help=( 714 'Run test steps. Can be set to true together with the build option.' 715 ' If both build and test are unset, will run both steps.' 716 ), 717 ) 718 parser.add_argument( 719 '--tar_snapshot', 720 action='store_true', 721 default=False, 722 help=( 723 'Whether to tar and untar the snapshot storage into/from a single' 724 ' file.' 725 ), 726 ) 727 parser.add_argument( 728 '-v', 729 '--verbose', 730 action='store_true', 731 default=False, 732 help='Whether to set log level to verbose.', 733 ) 734 735 # The below flags are passed in by the TF Python test runner. 736 parser.add_argument( 737 '-s', 738 '--serial', 739 help=( 740 'The device serial. Required in test mode when ANDROID_BUILD_TOP is' 741 ' not set.' 742 ), 743 ) 744 parser.add_argument( 745 '--test-output-file', 746 help=( 747 'The file in which to store the unit test results. This option is' 748 ' usually set by TradeFed when running the script with python and' 749 ' is optional during manual script execution.' 750 ), 751 ) 752 753 if argparser_update_func: 754 argparser_update_func(parser) 755 756 return parser.parse_known_args(argv) 757 758 759def _run_test( 760 config: IntegrationTestConfiguration, 761 argv: list[str], 762 test_output_file_path: str = None, 763) -> None: 764 """Execute integration tests with given test configuration.""" 765 766 compressor = _FileCompressor() 767 768 def cleanup() -> None: 769 if config.workspace_path.exists(): 770 shutil.rmtree(config.workspace_path) 771 if config.snapshot_storage_path.exists(): 772 shutil.rmtree(config.snapshot_storage_path) 773 774 if config.is_test_env and config.is_tar_snapshot: 775 if not config.snapshot_storage_tar_path.exists(): 776 raise EnvironmentError( 777 f'Snapshot tar {config.snapshot_storage_tar_path} does not' 778 ' exist. Have you run the build mode with --tar_snapshot' 779 ' option enabled?' 780 ) 781 with tarfile.open(config.snapshot_storage_tar_path, 'r') as tar: 782 tar.extractall(config.snapshot_storage_path.parent.as_posix()) 783 784 logging.info( 785 'Decompressing the snapshot storage with %s threads...', 786 multiprocessing.cpu_count(), 787 ) 788 start_time = time.time() 789 compressor.decompress_all_sub_files(config.snapshot_storage_path) 790 logging.info( 791 'Decompression finished in {:.2f} seconds'.format( 792 time.time() - start_time 793 ) 794 ) 795 796 atexit.register(cleanup) 797 798 def unittest_main(stream=None): 799 # Note that we use a type and not an instance for 'testRunner' 800 # since TestProgram forwards its constructor arguments when creating 801 # an instance of the runner type. Not doing so would require us to 802 # make sure that the parameters passed to TestProgram are aligned 803 # with those for creating a runner instance. 804 class TestRunner(ParallelTestRunner): 805 """Writes test results to the TF-provided file.""" 806 807 def __init__(self, *args: Any, **kwargs: Any) -> None: 808 super().__init__(stream=stream, *args, **kwargs) 809 810 class TestLoader(unittest.TestLoader): 811 """Injects the test configuration to the test classes.""" 812 813 def loadTestsFromTestCase(self, *args, **kwargs): 814 test_suite = super().loadTestsFromTestCase(*args, **kwargs) 815 for test in test_suite: 816 test.__class__.set_config(config) 817 break 818 return test_suite 819 820 # Setting verbosity is required to generate output that the TradeFed 821 # test runner can parse. 822 unittest.main( 823 testRunner=TestRunner, 824 verbosity=3, 825 argv=argv, 826 testLoader=TestLoader(), 827 exit=config.is_test_env, 828 ) 829 830 if test_output_file_path: 831 pathlib.Path(test_output_file_path).parent.mkdir(exist_ok=True) 832 833 with open(test_output_file_path, 'w', encoding='utf-8') as test_output_file: 834 unittest_main(stream=test_output_file) 835 else: 836 unittest_main(stream=None) 837 838 if config.is_build_env and config.is_tar_snapshot: 839 logging.info( 840 'Compressing the snapshot storage with %s threads...', 841 multiprocessing.cpu_count(), 842 ) 843 start_time = time.time() 844 compressor.compress_all_sub_files(config.snapshot_storage_path) 845 logging.info( 846 'Compression finished in {:.2f} seconds'.format( 847 time.time() - start_time 848 ) 849 ) 850 851 with tarfile.open(config.snapshot_storage_tar_path, 'w') as tar: 852 tar.add( 853 config.snapshot_storage_path, 854 arcname=config.snapshot_storage_path.name, 855 ) 856 cleanup() 857 858 859def main( 860 argv: list[str] = None, 861 make_before_build: list[str] = None, 862 argparser_update_func: Callable[argparse.ArgumentParser, None] = None, 863 config_update_function: Callable[ 864 [IntegrationTestConfiguration, argparse.Namespace], None 865 ] = None, 866) -> None: 867 """Main method to start the integration tests. 868 869 Args: 870 argv: A list of arguments to parse. 871 make_before_build: A list of targets to make before running build steps. 872 argparser_update_func: A function that takes an ArgumentParser object and 873 updates it. 874 config_update_function: A function that takes a 875 IntegrationTestConfiguration config and the parsed args to updates the 876 config. 877 878 Raises: 879 EnvironmentError: When some environment variables are missing. 880 """ 881 if not argv: 882 argv = sys.argv 883 if make_before_build is None: 884 make_before_build = [] 885 886 args, unittest_argv = _parse_known_args(argv, argparser_update_func) 887 888 snapshot_storage_dir_name = 'snapshot_storage' 889 snapshot_storage_tar_name = 'snapshot.tar' 890 891 integration_test_out_path = pathlib.Path( 892 tempfile.gettempdir(), 893 'asuite_integration_tests_%s' 894 % pathlib.Path('~').expanduser().name.replace(' ', '_'), 895 ) 896 897 if SNAPSHOT_STORAGE_TAR_KEY in os.environ: 898 snapshot_storage_tar_path = pathlib.Path( 899 os.environ[SNAPSHOT_STORAGE_TAR_KEY] 900 ) 901 snapshot_storage_tar_path.parent.mkdir(parents=True, exist_ok=True) 902 else: 903 snapshot_storage_tar_path = integration_test_out_path.joinpath( 904 snapshot_storage_tar_name 905 ) 906 907 _configure_logging(args.verbose, snapshot_storage_tar_path.parent) 908 909 logging.debug('The os environ is: %s', os.environ) 910 911 # When the build or test is unset, assume it's a local run for both build 912 # and test steps. 913 is_build_test_unset = not args.build and not args.test 914 config = IntegrationTestConfiguration() 915 config.is_build_env = args.build or is_build_test_unset 916 config.is_test_env = args.test or is_build_test_unset 917 config.device_serial = args.serial 918 config.snapshot_storage_path = integration_test_out_path.joinpath( 919 snapshot_storage_dir_name 920 ) 921 config.snapshot_storage_tar_path = snapshot_storage_tar_path 922 config.workspace_path = integration_test_out_path.joinpath('workspace') 923 config.is_tar_snapshot = args.tar_snapshot 924 925 if config_update_function: 926 config_update_function(config, args) 927 928 if config.is_build_env: 929 if ANDROID_BUILD_TOP_KEY not in os.environ: 930 raise EnvironmentError( 931 f'Environment variable {ANDROID_BUILD_TOP_KEY} is required to' 932 ' build the integration test.' 933 ) 934 935 repo_root = os.environ[ANDROID_BUILD_TOP_KEY] 936 937 total, used, free = shutil.disk_usage(repo_root) 938 logging.debug( 939 'Disk usage: Total: {:.2f} GB, Used: {:.2f} GB, Free: {:.2f} GB'.format( 940 total / (1024**3), used / (1024**3), free / (1024**3) 941 ) 942 ) 943 944 if 'OUT_DIR' in os.environ: 945 out_dir = os.environ['OUT_DIR'] 946 if os.path.isabs(out_dir) and not pathlib.Path(out_dir).is_relative_to( 947 repo_root 948 ): 949 raise EnvironmentError( 950 f'$OUT_DIR {out_dir} not relative to the repo root' 951 f' {repo_root} is not supported yet.' 952 ) 953 elif 'HOST_OUT' in os.environ: 954 out_dir = ( 955 pathlib.Path(os.environ['HOST_OUT']).relative_to(repo_root).parts[0] 956 ) 957 else: 958 out_dir = 'out' 959 os.environ['OUT_DIR'] = out_dir 960 961 for target in make_before_build: 962 logging.info( 963 'Building the %s target before integration test run.', target 964 ) 965 subprocess.check_call( 966 f'build/soong/soong_ui.bash --make-mode {target}'.split(), 967 cwd=repo_root, 968 ) 969 970 if config.is_build_env ^ config.is_test_env: 971 _run_test(config, unittest_argv, args.test_output_file) 972 return 973 974 build_config = copy.deepcopy(config) 975 build_config.is_test_env = False 976 977 test_config = copy.deepcopy(config) 978 test_config.is_build_env = False 979 980 _run_test(build_config, unittest_argv, args.test_output_file) 981 _run_test(test_config, unittest_argv, args.test_output_file) 982