# Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Utility functions that process cuttlefish images.""" import collections import glob import logging import os import posixpath as remote_path import re import subprocess import tempfile from acloud import errors from acloud.create import create_common from acloud.internal import constants from acloud.internal.lib import ota_tools from acloud.internal.lib import ssh from acloud.internal.lib import utils from acloud.public import report logger = logging.getLogger(__name__) # Local build artifacts to be uploaded. _ARTIFACT_FILES = ["*.img", "bootloader", "kernel"] # The boot image name pattern corresponds to the use cases: # - In a cuttlefish build environment, ANDROID_PRODUCT_OUT conatins boot.img # and boot-debug.img. The former is the default boot image. The latter is not # useful for cuttlefish. # - In an officially released GKI (Generic Kernel Image) package, the image # name is boot-<kernel version>.img. _BOOT_IMAGE_NAME_PATTERN = r"boot(-[\d.]+)?\.img" _VENDOR_BOOT_IMAGE_NAME = "vendor_boot.img" _KERNEL_IMAGE_NAMES = ("kernel", "bzImage", "Image") _INITRAMFS_IMAGE_NAME = "initramfs.img" _VENDOR_IMAGE_NAMES = ("vendor.img", "vendor_dlkm.img", "odm.img", "odm_dlkm.img") VendorImagePaths = collections.namedtuple( "VendorImagePaths", ["vendor", "vendor_dlkm", "odm", "odm_dlkm"]) # The relative path to the base directory containing cuttelfish images, tools, # and runtime files. On a GCE instance, the directory is the SSH user's HOME. GCE_BASE_DIR = "." _REMOTE_HOST_BASE_DIR_FORMAT = "acloud_cf_%(num)d" # Relative paths in a base directory. _REMOTE_IMAGE_DIR = "acloud_image" _REMOTE_BOOT_IMAGE_PATH = remote_path.join(_REMOTE_IMAGE_DIR, "boot.img") _REMOTE_VENDOR_BOOT_IMAGE_PATH = remote_path.join( _REMOTE_IMAGE_DIR, _VENDOR_BOOT_IMAGE_NAME) _REMOTE_VBMETA_IMAGE_PATH = remote_path.join(_REMOTE_IMAGE_DIR, "vbmeta.img") _REMOTE_KERNEL_IMAGE_PATH = remote_path.join( _REMOTE_IMAGE_DIR, _KERNEL_IMAGE_NAMES[0]) _REMOTE_INITRAMFS_IMAGE_PATH = remote_path.join( _REMOTE_IMAGE_DIR, _INITRAMFS_IMAGE_NAME) _REMOTE_SUPER_IMAGE_DIR = remote_path.join(_REMOTE_IMAGE_DIR, "super_image_dir") # Remote host instance name _REMOTE_HOST_INSTANCE_NAME_FORMAT = ( constants.INSTANCE_TYPE_HOST + "-%(ip_addr)s-%(num)d-%(build_id)s-%(build_target)s") _REMOTE_HOST_INSTANCE_NAME_PATTERN = re.compile( constants.INSTANCE_TYPE_HOST + r"-(?P<ip_addr>[\d.]+)-(?P<num>\d+)-.+") # launch_cvd arguments. _DATA_POLICY_CREATE_IF_MISSING = "create_if_missing" _DATA_POLICY_ALWAYS_CREATE = "always_create" _NUM_AVDS_ARG = "-num_instances=%(num_AVD)s" AGREEMENT_PROMPT_ARG = "-report_anonymous_usage_stats=y" UNDEFOK_ARG = "-undefok=report_anonymous_usage_stats,config" # Connect the OpenWrt device via console file. _ENABLE_CONSOLE_ARG = "-console=true" # WebRTC args _WEBRTC_ID = "--webrtc_device_id=%(instance)s" _WEBRTC_ARGS = ["--start_webrtc", "--vm_manager=crosvm"] _VNC_ARGS = ["--start_vnc_server=true"] # Cuttlefish runtime directory is specified by `-instance_dir <runtime_dir>`. # Cuttlefish tools may create a symbolic link at the specified path. # The actual location of the runtime directory depends on the version: # # In Android 10, the directory is `<runtime_dir>`. # # In Android 11 and 12, the directory is `<runtime_dir>.<num>`. # `<runtime_dir>` is a symbolic link to the first device's directory. # # In the latest version, if `--instance-dir <runtime_dir>` is specified, the # directory is `<runtime_dir>/instances/cvd-<num>`. # `<runtime_dir>_runtime` and `<runtime_dir>.<num>` are symbolic links. # # If `--instance-dir <runtime_dir>` is not specified, the directory is # `~/cuttlefish/instances/cvd-<num>`. # `~/cuttlefish_runtime` and `~/cuttelfish_runtime.<num>` are symbolic links. _LOCAL_LOG_DIR_FORMAT = os.path.join( "%(runtime_dir)s", "instances", "cvd-%(num)d", "logs") # Relative paths in a base directory. _REMOTE_RUNTIME_DIR_FORMAT = remote_path.join( "cuttlefish", "instances", "cvd-%(num)d") _REMOTE_LEGACY_RUNTIME_DIR_FORMAT = "cuttlefish_runtime.%(num)d" HOST_KERNEL_LOG = report.LogFile( "/var/log/kern.log", constants.LOG_TYPE_KERNEL_LOG, "host_kernel.log") # Contents of the target_files archive. _DOWNLOAD_MIX_IMAGE_NAME = "{build_target}-target_files-{build_id}.zip" _TARGET_FILES_META_DIR_NAME = "META" _TARGET_FILES_IMAGES_DIR_NAME = "IMAGES" _MISC_INFO_FILE_NAME = "misc_info.txt" # ARM flavor build target pattern. _ARM_TARGET_PATTERN = "arm" def GetAdbPorts(base_instance_num, num_avds_per_instance): """Get ADB ports of cuttlefish. Args: base_instance_num: An integer or None, the instance number of the first device. num_avds_per_instance: An integer or None, the number of devices. Returns: The port numbers as a list of integers. """ return [constants.CF_ADB_PORT + (base_instance_num or 1) - 1 + index for index in range(num_avds_per_instance or 1)] def GetFastbootPorts(base_instance_num, num_avds_per_instance): """Get Fastboot ports of cuttlefish. Args: base_instance_num: An integer or None, the instance number of the first device. num_avds_per_instance: An integer or None, the number of devices. Returns: The port numbers as a list of integers. """ return [constants.CF_FASTBOOT_PORT + (base_instance_num or 1) - 1 + index for index in range(num_avds_per_instance or 1)] def GetVncPorts(base_instance_num, num_avds_per_instance): """Get VNC ports of cuttlefish. Args: base_instance_num: An integer or None, the instance number of the first device. num_avds_per_instance: An integer or None, the number of devices. Returns: The port numbers as a list of integers. """ return [constants.CF_VNC_PORT + (base_instance_num or 1) - 1 + index for index in range(num_avds_per_instance or 1)] def _UploadImageZip(ssh_obj, remote_dir, image_zip): """Upload an image zip to a remote host and a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. image_zip: The path to the image zip. """ remote_cmd = f"/usr/bin/install_zip.sh {remote_dir} < {image_zip}" logger.debug("remote_cmd:\n %s", remote_cmd) ssh_obj.Run(remote_cmd) def _UploadImageDir(ssh_obj, remote_dir, image_dir): """Upload an image directory to a remote host or a GCE instance. The images are compressed for faster upload. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. image_dir: The directory containing the files to be uploaded. """ try: images_path = os.path.join(image_dir, "required_images") with open(images_path, "r", encoding="utf-8") as images: artifact_files = images.read().splitlines() except IOError: # Older builds may not have a required_images file. In this case # we fall back to *.img. artifact_files = [] for file_name in _ARTIFACT_FILES: artifact_files.extend( os.path.basename(image) for image in glob.glob( os.path.join(image_dir, file_name))) # Upload android-info.txt to parse config value. artifact_files.append(constants.ANDROID_INFO_FILE) cmd = (f"tar -cf - --lzop -S -C {image_dir} {' '.join(artifact_files)} | " f"{ssh_obj.GetBaseCmd(constants.SSH_BIN)} -- " f"tar -xf - --lzop -S -C {remote_dir}") logger.debug("cmd:\n %s", cmd) ssh.ShellCmdWithRetry(cmd) def _UploadCvdHostPackage(ssh_obj, remote_dir, cvd_host_package): """Upload a CVD host package to a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. cvd_host_package: The path to the CVD host package. """ if cvd_host_package.endswith(".tar.gz"): remote_cmd = f"tar -xzf - -C {remote_dir} < {cvd_host_package}" logger.debug("remote_cmd:\n %s", remote_cmd) ssh_obj.Run(remote_cmd) else: cmd = (f"tar -cf - --lzop -S -C {cvd_host_package} . | " f"{ssh_obj.GetBaseCmd(constants.SSH_BIN)} -- " f"tar -xf - --lzop -S -C {remote_dir}") logger.debug("cmd:\n %s", cmd) ssh.ShellCmdWithRetry(cmd) @utils.TimeExecute(function_description="Processing and uploading local images") def UploadArtifacts(ssh_obj, remote_dir, image_path, cvd_host_package): """Upload images and a CVD host package to a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. image_path: A string, the path to the image zip built by `m dist` or the directory containing the images built by `m`. cvd_host_package: A string, the path to the CVD host package in gzip. """ if os.path.isdir(image_path): _UploadImageDir(ssh_obj, remote_dir, image_path) else: _UploadImageZip(ssh_obj, remote_dir, image_path) _UploadCvdHostPackage(ssh_obj, remote_dir, cvd_host_package) def FindBootImages(search_path): """Find boot and vendor_boot images in a path. Args: search_path: A path to an image file or an image directory. Returns: The boot image path and the vendor_boot image path. Each value can be None if the path doesn't exist. Raises: errors.GetLocalImageError if search_path contains more than one boot image or the file format is not correct. """ boot_image_path = create_common.FindBootImage(search_path, raise_error=False) vendor_boot_image_path = os.path.join(search_path, _VENDOR_BOOT_IMAGE_NAME) if not os.path.isfile(vendor_boot_image_path): vendor_boot_image_path = None return boot_image_path, vendor_boot_image_path def FindKernelImages(search_path): """Find kernel and initramfs images in a path. Args: search_path: A path to an image directory. Returns: The kernel image path and the initramfs image path. Each value can be None if the path doesn't exist. """ paths = [os.path.join(search_path, name) for name in _KERNEL_IMAGE_NAMES] kernel_image_path = next((path for path in paths if os.path.isfile(path)), None) initramfs_image_path = os.path.join(search_path, _INITRAMFS_IMAGE_NAME) if not os.path.isfile(initramfs_image_path): initramfs_image_path = None return kernel_image_path, initramfs_image_path @utils.TimeExecute(function_description="Uploading local kernel images.") def _UploadKernelImages(ssh_obj, remote_dir, search_path): """Find and upload kernel or boot images to a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. search_path: A path to an image file or an image directory. Returns: A list of strings, the launch_cvd arguments including the remote paths. Raises: errors.GetLocalImageError if search_path does not contain kernel images. """ # Assume that the caller cleaned up the remote home directory. ssh_obj.Run("mkdir -p " + remote_path.join(remote_dir, _REMOTE_IMAGE_DIR)) kernel_image_path, initramfs_image_path = FindKernelImages(search_path) if kernel_image_path and initramfs_image_path: remote_kernel_image_path = remote_path.join( remote_dir, _REMOTE_KERNEL_IMAGE_PATH) remote_initramfs_image_path = remote_path.join( remote_dir, _REMOTE_INITRAMFS_IMAGE_PATH) ssh_obj.ScpPushFile(kernel_image_path, remote_kernel_image_path) ssh_obj.ScpPushFile(initramfs_image_path, remote_initramfs_image_path) return ["-kernel_path", remote_kernel_image_path, "-initramfs_path", remote_initramfs_image_path] boot_image_path, vendor_boot_image_path = FindBootImages(search_path) if boot_image_path: remote_boot_image_path = remote_path.join( remote_dir, _REMOTE_BOOT_IMAGE_PATH) ssh_obj.ScpPushFile(boot_image_path, remote_boot_image_path) launch_cvd_args = ["-boot_image", remote_boot_image_path] if vendor_boot_image_path: remote_vendor_boot_image_path = remote_path.join( remote_dir, _REMOTE_VENDOR_BOOT_IMAGE_PATH) ssh_obj.ScpPushFile(vendor_boot_image_path, remote_vendor_boot_image_path) launch_cvd_args.extend(["-vendor_boot_image", remote_vendor_boot_image_path]) return launch_cvd_args raise errors.GetLocalImageError( f"{search_path} is not a boot image or a directory containing images.") @utils.TimeExecute(function_description="Uploading disabled vbmeta image.") def _UploadDisabledVbmetaImage(ssh_obj, remote_dir, local_tool_dirs): """Upload disabled vbmeta image to a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. local_tool_dirs: A list of local directories containing tools. Returns: A list of strings, the launch_cvd arguments including the remote paths. Raises: CheckPathError if local_tool_dirs do not contain OTA tools. """ # Assume that the caller cleaned up the remote home directory. ssh_obj.Run("mkdir -p " + remote_path.join(remote_dir, _REMOTE_IMAGE_DIR)) remote_vbmeta_image_path = remote_path.join(remote_dir, _REMOTE_VBMETA_IMAGE_PATH) with tempfile.NamedTemporaryFile(prefix="vbmeta", suffix=".img") as temp_file: tool_dirs = local_tool_dirs + create_common.GetNonEmptyEnvVars( constants.ENV_ANDROID_SOONG_HOST_OUT, constants.ENV_ANDROID_HOST_OUT) ota = ota_tools.FindOtaTools(tool_dirs) ota.MakeDisabledVbmetaImage(temp_file.name) ssh_obj.ScpPushFile(temp_file.name, remote_vbmeta_image_path) return ["-vbmeta_image", remote_vbmeta_image_path] def UploadExtraImages(ssh_obj, remote_dir, avd_spec): """Find and upload the images specified in avd_spec. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. avd_spec: An AvdSpec object containing extra image paths. Returns: A list of strings, the launch_cvd arguments including the remote paths. Raises: errors.GetLocalImageError if any specified image path does not exist. """ extra_img_args = [] if avd_spec.local_kernel_image: extra_img_args += _UploadKernelImages(ssh_obj, remote_dir, avd_spec.local_kernel_image) if avd_spec.local_vendor_image: extra_img_args += _UploadDisabledVbmetaImage(ssh_obj, remote_dir, avd_spec.local_tool_dirs) return extra_img_args @utils.TimeExecute(function_description="Uploading local super image") def UploadSuperImage(ssh_obj, remote_dir, super_image_path): """Upload a super image to a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. super_image_path: Path to the super image file. Returns: A list of strings, the launch_cvd arguments including the remote paths. """ # Assume that the caller cleaned up the remote home directory. super_image_stem = os.path.basename(super_image_path) remote_super_image_dir = remote_path.join( remote_dir, _REMOTE_SUPER_IMAGE_DIR) remote_super_image_path = remote_path.join( remote_super_image_dir, super_image_stem) ssh_obj.Run(f"mkdir -p {remote_super_image_dir}") cmd = (f"tar -cf - --lzop -S -C {os.path.dirname(super_image_path)} " f"{super_image_stem} | " f"{ssh_obj.GetBaseCmd(constants.SSH_BIN)} -- " f"tar -xf - --lzop -S -C {remote_super_image_dir}") ssh.ShellCmdWithRetry(cmd) launch_cvd_args = ["-super_image", remote_super_image_path] return launch_cvd_args def CleanUpRemoteCvd(ssh_obj, remote_dir, raise_error): """Call stop_cvd and delete the files on a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. raise_error: Whether to raise an error if the remote instance is not running. Raises: subprocess.CalledProcessError if any command fails. """ home = remote_path.join("$HOME", remote_dir) stop_cvd_path = remote_path.join(remote_dir, "bin", "stop_cvd") stop_cvd_cmd = f"'HOME={home} {stop_cvd_path}'" if raise_error: ssh_obj.Run(stop_cvd_cmd) else: try: ssh_obj.Run(stop_cvd_cmd, retry=0) except Exception as e: logger.debug( "Failed to stop_cvd (possibly no running device): %s", e) # This command deletes all files except hidden files under HOME. # It does not raise an error if no files can be deleted. ssh_obj.Run(f"'rm -rf {remote_path.join(remote_dir, '*')}'") def GetRemoteHostBaseDir(base_instance_num): """Get remote base directory by instance number. Args: base_instance_num: Integer or None, the instance number of the device. Returns: The remote base directory. """ return _REMOTE_HOST_BASE_DIR_FORMAT % {"num": base_instance_num or 1} def FormatRemoteHostInstanceName(ip_addr, base_instance_num, build_id, build_target): """Convert an IP address and build info to an instance name. Args: ip_addr: String, the IP address of the remote host. base_instance_num: Integer or None, the instance number of the device. build_id: String, the build id. build_target: String, the build target, e.g., aosp_cf_x86_64_phone. Return: String, the instance name. """ return _REMOTE_HOST_INSTANCE_NAME_FORMAT % { "ip_addr": ip_addr, "num": base_instance_num or 1, "build_id": build_id, "build_target": build_target} def ParseRemoteHostAddress(instance_name): """Parse IP address from a remote host instance name. Args: instance_name: String, the instance name. Returns: The IP address and the base directory as strings. None if the name does not represent a remote host instance. """ match = _REMOTE_HOST_INSTANCE_NAME_PATTERN.fullmatch(instance_name) if match: return (match.group("ip_addr"), GetRemoteHostBaseDir(int(match.group("num")))) return None # pylint:disable=too-many-branches def GetLaunchCvdArgs(avd_spec, config=None): """Get launch_cvd arguments for remote instances. Args: avd_spec: An AVDSpec instance. config: A string, the name of the predefined hardware config. e.g., "auto", "phone", and "tv". Returns: A list of strings, arguments of launch_cvd. """ launch_cvd_args = [] blank_data_disk_size_gb = avd_spec.cfg.extra_data_disk_size_gb if blank_data_disk_size_gb and blank_data_disk_size_gb > 0: launch_cvd_args.append( "-data_policy=" + _DATA_POLICY_CREATE_IF_MISSING) launch_cvd_args.append( "-blank_data_image_mb=" + str(blank_data_disk_size_gb * 1024)) if config: launch_cvd_args.append("-config=" + config) if avd_spec.hw_customize or not config: launch_cvd_args.append( "-x_res=" + avd_spec.hw_property[constants.HW_X_RES]) launch_cvd_args.append( "-y_res=" + avd_spec.hw_property[constants.HW_Y_RES]) launch_cvd_args.append( "-dpi=" + avd_spec.hw_property[constants.HW_ALIAS_DPI]) if constants.HW_ALIAS_DISK in avd_spec.hw_property: launch_cvd_args.append( "-data_policy=" + _DATA_POLICY_ALWAYS_CREATE) launch_cvd_args.append( "-blank_data_image_mb=" + avd_spec.hw_property[constants.HW_ALIAS_DISK]) if constants.HW_ALIAS_CPUS in avd_spec.hw_property: launch_cvd_args.append( "-cpus=" + str(avd_spec.hw_property[constants.HW_ALIAS_CPUS])) if constants.HW_ALIAS_MEMORY in avd_spec.hw_property: launch_cvd_args.append( "-memory_mb=" + str(avd_spec.hw_property[constants.HW_ALIAS_MEMORY])) if avd_spec.connect_webrtc: launch_cvd_args.extend(_WEBRTC_ARGS) if avd_spec.webrtc_device_id: launch_cvd_args.append( _WEBRTC_ID % {"instance": avd_spec.webrtc_device_id}) if avd_spec.connect_vnc: launch_cvd_args.extend(_VNC_ARGS) if avd_spec.openwrt: launch_cvd_args.append(_ENABLE_CONSOLE_ARG) if avd_spec.num_avds_per_instance > 1: launch_cvd_args.append( _NUM_AVDS_ARG % {"num_AVD": avd_spec.num_avds_per_instance}) if avd_spec.base_instance_num: launch_cvd_args.append( "--base-instance-num=" + str(avd_spec.base_instance_num)) if avd_spec.launch_args: launch_cvd_args.append(avd_spec.launch_args) launch_cvd_args.append(UNDEFOK_ARG) launch_cvd_args.append(AGREEMENT_PROMPT_ARG) return launch_cvd_args def _GetRemoteRuntimeDirs(ssh_obj, remote_dir, base_instance_num, num_avds_per_instance): """Get cuttlefish runtime directories on a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. base_instance_num: An integer, the instance number of the first device. num_avds_per_instance: An integer, the number of devices. Returns: A list of strings, the paths to the runtime directories. """ runtime_dir = remote_path.join( remote_dir, _REMOTE_RUNTIME_DIR_FORMAT % {"num": base_instance_num}) try: ssh_obj.Run(f"test -d {runtime_dir}", retry=0) return [remote_path.join(remote_dir, _REMOTE_RUNTIME_DIR_FORMAT % {"num": base_instance_num + num}) for num in range(num_avds_per_instance)] except subprocess.CalledProcessError: logger.debug("%s is not the runtime directory.", runtime_dir) legacy_runtime_dirs = [ remote_path.join(remote_dir, constants.REMOTE_LOG_FOLDER)] legacy_runtime_dirs.extend( remote_path.join(remote_dir, _REMOTE_LEGACY_RUNTIME_DIR_FORMAT % {"num": base_instance_num + num}) for num in range(1, num_avds_per_instance)) return legacy_runtime_dirs def GetRemoteFetcherConfigJson(remote_dir): """Get the config created by fetch_cvd on a remote host or a GCE instance. Args: remote_dir: The remote base directory. Returns: An object of report.LogFile. """ return report.LogFile(remote_path.join(remote_dir, "fetcher_config.json"), constants.LOG_TYPE_CUTTLEFISH_LOG) def _GetRemoteTombstone(runtime_dir, name_suffix): """Get log object for tombstones in a remote cuttlefish runtime directory. Args: runtime_dir: The path to the remote cuttlefish runtime directory. name_suffix: The string appended to the log name. It is used to distinguish log files found in different runtime_dirs. Returns: A report.LogFile object. """ return report.LogFile(remote_path.join(runtime_dir, "tombstones"), constants.LOG_TYPE_DIR, "tombstones-zip" + name_suffix) def _GetLogType(file_name): """Determine log type by file name. Args: file_name: A file name. Returns: A string, one of the log types defined in constants. None if the file is not a log file. """ if file_name == "kernel.log": return constants.LOG_TYPE_KERNEL_LOG if file_name == "logcat": return constants.LOG_TYPE_LOGCAT if file_name.endswith(".log") or file_name == "cuttlefish_config.json": return constants.LOG_TYPE_CUTTLEFISH_LOG return None def FindRemoteLogs(ssh_obj, remote_dir, base_instance_num, num_avds_per_instance): """Find log objects on a remote host or a GCE instance. Args: ssh_obj: An Ssh object. remote_dir: The remote base directory. base_instance_num: An integer or None, the instance number of the first device. num_avds_per_instance: An integer or None, the number of devices. Returns: A list of report.LogFile objects. """ runtime_dirs = _GetRemoteRuntimeDirs( ssh_obj, remote_dir, (base_instance_num or 1), (num_avds_per_instance or 1)) logs = [] for log_path in utils.FindRemoteFiles(ssh_obj, runtime_dirs): file_name = remote_path.basename(log_path) log_type = _GetLogType(file_name) if not log_type: continue base, ext = remote_path.splitext(file_name) # The index of the runtime_dir containing log_path. index_str = "" for index, runtime_dir in enumerate(runtime_dirs): if log_path.startswith(runtime_dir + remote_path.sep): index_str = "." + str(index) if index else "" log_name = ("full_gce_logcat" + index_str if file_name == "logcat" else base + index_str + ext) logs.append(report.LogFile(log_path, log_type, log_name)) logs.extend(_GetRemoteTombstone(runtime_dir, ("." + str(index) if index else "")) for index, runtime_dir in enumerate(runtime_dirs)) return logs def FindLocalLogs(runtime_dir, instance_num): """Find log objects in a local runtime directory. Args: runtime_dir: A string, the runtime directory path. instance_num: An integer, the instance number. Returns: A list of report.LogFile. """ log_dir = _LOCAL_LOG_DIR_FORMAT % {"runtime_dir": runtime_dir, "num": instance_num} if not os.path.isdir(log_dir): log_dir = runtime_dir logs = [] for parent_dir, _, file_names in os.walk(log_dir, followlinks=False): for file_name in file_names: log_path = os.path.join(parent_dir, file_name) log_type = _GetLogType(file_name) if os.path.islink(log_path) or not log_type: continue logs.append(report.LogFile(log_path, log_type)) return logs def GetRemoteBuildInfoDict(avd_spec): """Convert remote build infos to a dictionary for reporting. Args: avd_spec: An AvdSpec object containing the build infos. Returns: A dict containing the build infos. """ build_info_dict = { key: val for key, val in avd_spec.remote_image.items() if val} # kernel_target has a default value. If the user provides kernel_build_id # or kernel_branch, then convert kernel build info. if (avd_spec.kernel_build_info.get(constants.BUILD_ID) or avd_spec.kernel_build_info.get(constants.BUILD_BRANCH)): build_info_dict.update( {"kernel_" + key: val for key, val in avd_spec.kernel_build_info.items() if val} ) build_info_dict.update( {"system_" + key: val for key, val in avd_spec.system_build_info.items() if val} ) build_info_dict.update( {"bootloader_" + key: val for key, val in avd_spec.bootloader_build_info.items() if val} ) return build_info_dict def GetMixBuildTargetFilename(build_target, build_id): """Get the mix build target filename. Args: build_id: String, Build id, e.g. "2263051", "P2804227" build_target: String, the build target, e.g. cf_x86_phone-userdebug Returns: String, a file name, e.g. "cf_x86_phone-target_files-2263051.zip" """ return _DOWNLOAD_MIX_IMAGE_NAME.format( build_target=build_target.split('-')[0], build_id=build_id) def FindMiscInfo(image_dir): """Find misc info in build output dir or extracted target files. Args: image_dir: The directory to search for misc info. Returns: image_dir if the directory structure looks like an output directory in build environment. image_dir/META if it looks like extracted target files. Raises: errors.CheckPathError if this function cannot find misc info. """ misc_info_path = os.path.join(image_dir, _MISC_INFO_FILE_NAME) if os.path.isfile(misc_info_path): return misc_info_path misc_info_path = os.path.join(image_dir, _TARGET_FILES_META_DIR_NAME, _MISC_INFO_FILE_NAME) if os.path.isfile(misc_info_path): return misc_info_path raise errors.CheckPathError( f"Cannot find {_MISC_INFO_FILE_NAME} in {image_dir}. The " f"directory is expected to be an extracted target files zip or " f"{constants.ENV_ANDROID_PRODUCT_OUT}.") def FindImageDir(image_dir): """Find images in build output dir or extracted target files. Args: image_dir: The directory to search for images. Returns: image_dir if the directory structure looks like an output directory in build environment. image_dir/IMAGES if it looks like extracted target files. Raises: errors.GetLocalImageError if this function cannot find any image. """ if glob.glob(os.path.join(image_dir, "*.img")): return image_dir subdir = os.path.join(image_dir, _TARGET_FILES_IMAGES_DIR_NAME) if glob.glob(os.path.join(subdir, "*.img")): return subdir raise errors.GetLocalImageError( "Cannot find images in %s." % image_dir) def IsArmImage(image): """Check if the image is built for ARM. Args: image: Image meta info. Returns: A boolean, whether the image is for ARM. """ return _ARM_TARGET_PATTERN in image.get("build_target", "") def FindVendorImages(image_dir): """Find vendor, vendor_dlkm, odm, and odm_dlkm image in build output dir. Args: image_dir: The directory to search for images. Returns: An object of VendorImagePaths. Raises: errors.GetLocalImageError if this function cannot find images. """ image_paths = [] for image_name in _VENDOR_IMAGE_NAMES: image_path = os.path.join(image_dir, image_name) if not os.path.isfile(image_path): raise errors.GetLocalImageError( f"Cannot find {image_path} in {image_dir}.") image_paths.append(image_path) return VendorImagePaths(*image_paths)