#!/usr/bin/env python3 # # Copyright (C) 2022 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """The script to upload generated artifacts from build server to CAS.""" import argparse import copy import dataclasses import glob import json import logging import os import shutil import subprocess import tempfile import time import re @dataclasses.dataclass class ArtifactConfig: """Configuration of an artifact to be uploaded to CAS. Attributes: source_path: path to the artifact that relative to the root of source code. unzip: true if the artifact should be unzipped and uploaded as a directory. chunk: true if the artifact should be uploaded with chunking. chunk_fallback: true if a regular version (no chunking) of the artifact should be uploaded. exclude_filters: a list of regular expressions for files that are excluded from uploading. """ source_path: str unzip: bool chunk: bool = False chunk_fallback: bool = False exclude_filters: list[str] = dataclasses.field(default_factory=list) @dataclasses.dataclass class CasInfo: """Basic information of CAS server and client. Attributes: cas_instance: the instance name of CAS service. cas_service: the address of CAS service. client_path: path to the CAS uploader client. version: version of the CAS uploader client, in turple format. """ cas_instance: str cas_service: str client_path: str client_version: tuple @dataclasses.dataclass class UploadResult: """Result of uploading a single artifact with CAS client. Attributes: digest: root digest of the artifact. content_details: detail information of all uploaded files inside the uploaded artifact. """ digest: str content_details: list[dict[str,any]] CAS_UPLOADER_PREBUILT_PATH = 'tools/tradefederation/prebuilts/' CAS_UPLOADER_PATH = 'tools/content_addressed_storage/prebuilts/' CAS_UPLOADER_BIN = 'casuploader' UPLOADER_TIMEOUT_SECS = 600 # 10 minutes AVG_CHUNK_SIZE_IN_KB = 128 DIGESTS_PATH = 'cas_digests.json' LOG_PATH = 'logs/cas_uploader.log' CONTENT_DETAILS_PATH = 'logs/cas_content_details.json' CHUNKED_ARTIFACT_NAME_PREFIX = "_chunked_" CHUNKED_DIR_ARTIFACT_NAME_PREFIX = "_chunked_dir_" # Configurations of artifacts will be uploaded to CAS. # TODO(b/298890453) Add artifacts after this script is attached to build process. # If configs share files, chunking enabled artifacts should come first. ARTIFACTS = [ # test_suite targets ArtifactConfig('android-catbox.zip', True), ArtifactConfig('android-csuite.zip', True), ArtifactConfig('android-cts.zip', True, exclude_filters=['android-cts/jdk/.*']), ArtifactConfig('android-gcatbox.zip', True), ArtifactConfig('android-gts.zip', True, exclude_filters=['android-gts/jdk/.*']), ArtifactConfig('android-mcts.zip', True), ArtifactConfig('android-mts.zip', True, exclude_filters=['android-mts/jdk/.*']), ArtifactConfig('android-pts.zip', True, exclude_filters=['android-pts/jdk/.*']), ArtifactConfig('android-sts.zip', True), ArtifactConfig('android-vts.zip', True), ArtifactConfig('art-host-tests.zip', True), ArtifactConfig('bazel-test-suite.zip', True), ArtifactConfig('host-unit-tests.zip', True), ArtifactConfig('general-tests.zip', True), ArtifactConfig('general-tests_configs.zip', True), ArtifactConfig('general-tests_host-shared-libs.zip', True), ArtifactConfig('tradefed.zip', True), ArtifactConfig('google-tradefed.zip', True), ArtifactConfig('robolectric-tests.zip', True), ArtifactConfig('ravenwood-tests.zip', True), ArtifactConfig('test_mappings.zip', True), # Mainline artifacts ArtifactConfig('*.apex', False), ArtifactConfig('*.apk', False), # Device target artifacts ArtifactConfig('androidTest.zip', True), ArtifactConfig('device-tests.zip', True), ArtifactConfig('device-tests_configs.zip', True), ArtifactConfig('device-tests_host-shared-libs.zip', True), ArtifactConfig('performance-tests.zip', True), ArtifactConfig('device-platinum-tests.zip', True), ArtifactConfig('device-platinum-tests_configs.zip', True), ArtifactConfig('device-platinum-tests_host-shared-libs.zip', True), ArtifactConfig('device-pixel-tests.zip', True), ArtifactConfig('device-pixel-tests_configs.zip', True), ArtifactConfig('device-pixel-tests_host-shared-libs.zip', True), ArtifactConfig('*-tests-*zip', True), ArtifactConfig('*-continuous_instrumentation_tests-*zip', True), ArtifactConfig('*-continuous_instrumentation_metric_tests-*zip', True), ArtifactConfig('*-continuous_native_tests-*zip', True), ArtifactConfig('cvd-host_package.tar.gz', False), ArtifactConfig('bootloader.img', False), ArtifactConfig('radio.img', False), ArtifactConfig('*-target_files-*.zip', True), ArtifactConfig('*-img-*zip', True, True, True) ] # Artifacts will be uploaded if the config name is set in arguments `--experiment_artifacts`. # These configs are usually used to upload artifacts in partial branches/targets for experiment # purpose. # A sample entry: # "device_image_target_files": ArtifactConfig('*-target_files-*.zip', True) EXPERIMENT_ARTIFACT_CONFIGS = { "device_image_proguard_dict": ArtifactConfig('*-proguard-dict-*.zip', False, True, True), } def _init_cas_info() -> CasInfo: client_path = _get_client() return CasInfo( _get_env_var('RBE_instance', check=True), _get_env_var('RBE_service', check=True), client_path, _get_client_version(client_path) ) def _get_client() -> str: if CAS_UPLOADER_PREBUILT_PATH in os.path.abspath(__file__): return _get_prebuilt_client() bin_path = os.path.join(CAS_UPLOADER_PATH, CAS_UPLOADER_BIN) if os.path.isfile(bin_path): logging.info('Using client at %s', bin_path) return bin_path return _get_prebuilt_client() def _get_prebuilt_client() -> str: client = glob.glob(CAS_UPLOADER_PREBUILT_PATH + '**/' + CAS_UPLOADER_BIN, recursive=True) if not client: raise ValueError('Could not find casuploader binary') logging.info('Using client at %s', client[0]) return client[0] def _get_client_version(client_path: str) -> int: """Get the version of CAS client in turple format.""" version_output = '' try: version_output = subprocess.check_output([client_path, '-version']).decode('utf-8').strip() matched = re.findall(r'version: (\d+\.\d+)', version_output) if not matched: logging.warning('Failed to parse CAS client version. Output: %s', version_output) return (0, 0) version = tuple(map(int, matched[0].split('.'))) logging.info('CAS client version is %s', version) return version # pylint: disable=broad-exception-caught except Exception as e: # pylint: enable=broad-exception-caught logging.warning('Failed to get CAS client version. Output: %s. Error %s', version_output, e) return (0, 0) def _get_env_var(key: str, default=None, check=False): value = os.environ.get(key, default) if check and not value: raise ValueError(f'Error: the environment variable {key} is not set') return value def _parse_additional_artifacts(args) -> list[ArtifactConfig]: additional_artifacts = [] for config in args.experiment_artifacts: if config not in EXPERIMENT_ARTIFACT_CONFIGS: logging.warning('Ignore invalid experiment_artifacts: %s', config) else: additional_artifacts.append(EXPERIMENT_ARTIFACT_CONFIGS[config]) logging.info( 'Added experiment artifact from arguments %s', EXPERIMENT_ARTIFACT_CONFIGS[config].source_path, ) return additional_artifacts def _upload( cas_info: CasInfo, artifact: ArtifactConfig, working_dir: str, log_file: str, ) -> str: """Upload the artifact to CAS by casuploader binary. Args: cas_info: the basic CAS server information. artifact: the artifact to be uploaded to CAS. working_dir: the directory for intermediate files. log_file: the file where to add the upload logs. Returns: the digest of the uploaded artifact, formatted as "/". returns None if artifact upload fails. """ # `-dump-file-details` only supports on cas uploader V1.0 or later. dump_file_details = cas_info.client_version >= (1, 0) if not dump_file_details: logging.warning('-dump-file-details is not enabled') with tempfile.NamedTemporaryFile(mode='w+') as digest_file, tempfile.NamedTemporaryFile( mode='w+') as content_details_file: logging.info( 'Uploading %s to CAS instance %s', artifact.source_path, cas_info.cas_instance ) cmd = [ cas_info.client_path, '-cas-instance', cas_info.cas_instance, '-cas-addr', cas_info.cas_service, '-dump-digest', digest_file.name, '-use-adc', ] cmd = cmd + _path_for_artifact(artifact, working_dir) if artifact.chunk: cmd = cmd + ['-chunk', '-avg-chunk-size', str(AVG_CHUNK_SIZE_IN_KB)] for exclude_filter in artifact.exclude_filters: cmd = cmd + ['-exclude-filters', exclude_filter] if dump_file_details: cmd = cmd + ['-dump-file-details', content_details_file.name] try: logging.info('Running command: %s', cmd) with open(log_file, 'a', encoding='utf8') as outfile: subprocess.run( cmd, check=True, text=True, stdout=outfile, stderr=subprocess.STDOUT, encoding='utf-8', timeout=UPLOADER_TIMEOUT_SECS ) except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: logging.warning( 'Failed to upload %s to CAS instance %s. Skip.\nError message: %s\nLog: %s', artifact.source_path, cas_info.cas_instance, e, e.stdout, ) return None except subprocess.SubprocessError as e: logging.warning('Failed to upload %s to CAS instance %s. Skip.\n. Error %s', artifact.source_path, cas_info.cas_instance, e) return None # Read digest of the root directory or file from dumped digest file. digest = digest_file.read() if digest: logging.info('Uploaded %s to CAS. Digest: %s', artifact.source_path, digest) else: logging.warning( 'No digest is dumped for file %s, the uploading may fail.', artifact.source_path) return None content_details = None if dump_file_details: try: content_details = json.loads(content_details_file.read()) except json.JSONDecodeError as e: logging.warning('Failed to parse uploaded content details: %s', e) return UploadResult(digest, content_details) def _path_for_artifact(artifact: ArtifactConfig, working_dir: str) -> [str]: if artifact.unzip: return ['-zip-path', artifact.source_path] if artifact.chunk: return ['-file-path', artifact.source_path] # TODO(b/250643926) This is a workaround to handle non-directory files. tmp_dir = tempfile.mkdtemp(dir=working_dir) target_path = os.path.join(tmp_dir, os.path.basename(artifact.source_path)) shutil.copy(artifact.source_path, target_path) return ['-dir-path', tmp_dir] def _output_results( cas_info: CasInfo, output_dir: str, digests: dict[str, str], content_details: list[dict[str, any]], ): digests_output = { 'cas_instance': cas_info.cas_instance, 'cas_service': cas_info.cas_service, 'client_version': '.'.join(map(str, cas_info.client_version)), 'files': digests, } output_path = os.path.join(output_dir, DIGESTS_PATH) with open(output_path, 'w', encoding='utf8') as writer: writer.write(json.dumps(digests_output, sort_keys=True, indent=2)) logging.info('Output digests to %s', output_path) output_path = os.path.join(output_dir, CONTENT_DETAILS_PATH) with open(output_path, 'w', encoding='utf8') as writer: writer.write(json.dumps(content_details, sort_keys=True, indent=2)) logging.info('Output uploaded content details to %s', output_path) def _upload_all_artifacts(cas_info: CasInfo, all_artifacts: ArtifactConfig, dist_dir: str, working_dir: str, log_file:str): file_digests = {} content_details = [] skip_files = [] _add_fallback_artifacts(all_artifacts) for artifact in all_artifacts: source_path = artifact.source_path for f in glob.glob(dist_dir + '/**/' + source_path, recursive=True): start = time.time() basename = os.path.basename(f) name = _artifact_name(basename, artifact.chunk, artifact.unzip) # Avoid redundant upload if multiple ArtifactConfigs share files. if name in file_digests or name in skip_files: continue artifact.source_path = f result = _upload(cas_info, artifact, working_dir, log_file) if result and result.digest: file_digests[name] = result.digest if artifact.chunk and (not artifact.chunk_fallback or artifact.unzip): # Skip the regular version even it matches other configs. skip_files.append(basename) else: logging.warning( 'Skip to save the digest of file %s, the uploading may fail', name ) if result and result.content_details: content_details.append({"artifact": name, "details": result.content_details}) else: logging.warning('Skip to save the content details of file %s', name) logging.info( 'Elapsed time of uploading %s: %d seconds\n\n', artifact.source_path, time.time() - start, ) _output_results( cas_info, dist_dir, file_digests, content_details, ) def _add_fallback_artifacts(artifacts: list[ArtifactConfig]): """Add a fallback artifact if chunking is enabled for an artifact. For unzip artifacts, the fallback is the zipped chunked version. For the rest, the fallback is the standard version (not chunked). """ for artifact in artifacts: if artifact.chunk and artifact.chunk_fallback: fallback_artifact = copy.copy(artifact) if artifact.unzip: fallback_artifact.unzip = False else: fallback_artifact.chunk = False artifacts.append(fallback_artifact) def _artifact_name(basename: str, chunk: bool, unzip: bool) -> str: if not chunk: return basename if unzip: return CHUNKED_DIR_ARTIFACT_NAME_PREFIX + basename return CHUNKED_ARTIFACT_NAME_PREFIX + basename def main(): """Uploads the specified artifacts to CAS.""" parser = argparse.ArgumentParser() parser.add_argument( '--experiment_artifacts', required=False, action='append', default=[], help='Name of configuration which artifact to upload', ) args = parser.parse_args() dist_dir = _get_env_var('DIST_DIR', check=True) log_file = os.path.join(dist_dir, LOG_PATH) print('content_uploader.py will export logs to:', log_file) logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', filename=log_file, ) logging.info('Environment variables of running server: %s', os.environ) additional_artifacts = _parse_additional_artifacts(args) cas_info = _init_cas_info() with tempfile.TemporaryDirectory() as working_dir: logging.info('The working dir is %s', working_dir) start = time.time() _upload_all_artifacts(cas_info, ARTIFACTS + additional_artifacts, dist_dir, working_dir, log_file) logging.info('Total time of uploading build artifacts to CAS: %d seconds', time.time() - start) if __name__ == '__main__': main()