from io import StringIO
from typing import TYPE_CHECKING, Any

from ruamel.yaml import YAML

from lava.utils.lava_farm import LavaFarm, get_lava_farm
from lava.utils.ssh_job_definition import (
    generate_docker_test,
    generate_dut_test,
    wrap_boot_action,
    wrap_final_deploy_action,
)
from lava.utils.uart_job_definition import (
    fastboot_boot_action,
    fastboot_deploy_actions,
    tftp_boot_action,
    tftp_deploy_actions,
    qemu_boot_action,
    qemu_deploy_actions,
    uart_test_actions,
)

if TYPE_CHECKING:
    from lava.lava_job_submitter import LAVAJobSubmitter

from .constants import FORCE_UART, JOB_PRIORITY, NUMBER_OF_ATTEMPTS_LAVA_BOOT


class LAVAJobDefinition:
    """
    Builds the YAML payload that is submitted to LAVA for a single job.

    All job parameters are read from the `LAVAJobSubmitter` instance handed
    to the constructor.
    """

    def __init__(self, job_submitter: "LAVAJobSubmitter") -> None:
        self.job_submitter: "LAVAJobSubmitter" = job_submitter
        # NFS args provided by LAVA
        self.lava_nfs_args: str = "root=/dev/nfs rw nfsroot=$NFS_SERVER_IP:$NFS_ROOTFS,tcp,hard,v3 ip=dhcp"
        # extra_nfsroot_args appends to cmdline
        self.extra_nfsroot_args: str = " init=/init rootwait usbcore.quirks=0bda:8153:k"

    def has_ssh_support(self) -> bool:
        """
        Tell whether the job can be followed through an SSH session.

        Returns:
            False whenever FORCE_UART is set; otherwise True only when the
            current farm is Collabora's.
        """
        # Only Collabora's farm supports running a docker container as a LAVA
        # action, which is required to follow the job in an SSH section.
        # The farm is only queried when UART is not being forced.
        return not FORCE_UART and get_lava_farm() == LavaFarm.COLLABORA

    def generate_lava_yaml_payload(self) -> dict[str, Any]:
        """
        Assemble the complete LAVA job description as a dictionary.

        Returns:
            The dictionary produced by `generate_metadata`, extended with an
            "actions" list holding the deploy actions, the boot action and
            the test actions, in that order.
        """
        submitter = self.job_submitter

        rootfs_spec = {
            "url": f"{submitter.rootfs_url}",
            "compression": "zstd",
            "format": "tar",
            "overlays": submitter._overlays,
        }

        payload = self.generate_metadata()
        stage1_steps = self.init_stage1_steps()
        jwt_steps = self.jwt_steps()

        # UART test actions are the default; they are replaced further down
        # when the SSH flow is available.
        tests = uart_test_actions(submitter, stage1_steps, jwt_steps)

        # Boot method -> (deploy actions builder, boot action builder).
        # Anything that is not listed here is handled as tftp.
        builders_by_method = {
            "fastboot": (fastboot_deploy_actions, fastboot_boot_action),
            "qemu-nfs": (qemu_deploy_actions, qemu_boot_action),
        }
        build_deploys, build_boot = builders_by_method.get(
            submitter.boot_method,
            (tftp_deploy_actions, tftp_boot_action),
        )
        deploys = build_deploys(self, rootfs_spec)
        boot = build_boot(submitter)

        if self.has_ssh_support():
            wrap_final_deploy_action(deploys[-1])
            # SSH jobs use namespaces to differentiate between the DUT and the
            # docker container. Every LAVA action needs an explicit namespace
            # when the default one is not being used.
            for deploy in deploys:
                deploy["namespace"] = "dut"
            wrap_boot_action(boot)
            tests = (
                generate_dut_test(submitter, stage1_steps),
                generate_docker_test(submitter, jwt_steps),
            )

        payload["actions"] = (
            [{"deploy": deploy} for deploy in deploys]
            + [{"boot": boot}]
            + [{"test": test} for test in tests]
        )

        return payload

    def generate_lava_job_definition(self) -> str:
        """
        Serialize the job payload to YAML.

        Returns:
            The text obtained by dumping the result of
            `generate_lava_yaml_payload` with ruamel's YAML emitter.
        """
        buffer = StringIO()
        emitter = YAML()
        # Very large width; presumably meant to keep the emitter from
        # wrapping long lines.
        emitter.width = 4096
        emitter.dump(self.generate_lava_yaml_payload(), buffer)
        return buffer.getvalue()

    def consume_lava_tags_args(self, values: dict[str, Any]):
        """
        Store the submitter's `lava_tags` into `values["tags"]`.

        Tuples are taken as they are and strings are split on commas. Any
        other type (python-fire parses --lava-tags without arguments as True)
        sets nothing. Empty entries are dropped, and the "tags" key is
        removed altogether when nothing is left.

        Args:
            values: job metadata dictionary, updated in place.
        """
        raw_tags = self.job_submitter.lava_tags

        # python-fire parses "tag-1,tag2" as str and "tag1,tag2" as tuple
        # even if the -- --separator is something other than '-'
        if isinstance(raw_tags, tuple):
            values["tags"] = raw_tags
        elif isinstance(raw_tags, str):
            # Split string tags by comma, removing any trailing commas
            values["tags"] = raw_tags.rstrip(",").split(",")

        if "tags" not in values:
            return

        # Ensure tags are always a list of non-empty strings, and drop the
        # key when the list ends up empty.
        non_empty = [tag for tag in values["tags"] if tag]
        if non_empty:
            values["tags"] = non_empty
        else:
            del values["tags"]

    def generate_metadata(self) -> dict[str, Any]:
        """
        Build the top-level (non-action) fields of the job.

        Returns:
            A dictionary with job name, device type, visibility, priority,
            context and timeouts, plus "tags" when the submitter provides
            any and the context "arch" for qemu-nfs jobs.
        """
        submitter = self.job_submitter

        action_timeouts = {
            "depthcharge-retry": {
                # Could take between 1 and 1.5 min in slower boots
                "minutes": 4
            },
            "depthcharge-start": {
                # Should take less than 1 min.
                "minutes": 1,
            },
            "depthcharge-action": {
                # This timeout englobes the entire depthcharge timing,
                # including retries
                "minutes": 5 * NUMBER_OF_ATTEMPTS_LAVA_BOOT,
            },
        }

        # General metadata and permissions. Key order is kept stable because
        # it is the order in which the keys are dumped.
        metadata = {
            "job_name": f"{submitter.project_name}: {submitter.pipeline_info}",
            "device_type": submitter.device_type,
            "visibility": {"group": [submitter.visibility_group]},
            "priority": JOB_PRIORITY,
            "context": {"extra_nfsroot_args": self.extra_nfsroot_args},
            "timeouts": {
                "job": {"minutes": submitter.job_timeout_min},
                "actions": action_timeouts,
            },
        }

        self.consume_lava_tags_args(metadata)

        # QEMU lava jobs mandate proper arch value in the context; it is
        # taken from the second ":"-separated field of the mesa job name.
        if submitter.boot_method == "qemu-nfs":
            metadata["context"]["arch"] = submitter.mesa_job_name.split(":")[1]

        return metadata

    def attach_kernel_and_dtb(self, deploy_field):
        """
        Add the kernel image type and the DTB URL to a deploy section.

        Args:
            deploy_field: deploy dictionary, updated in place. It must
                already hold a "kernel" mapping when a kernel image type
                is set on the submitter.
        """
        submitter = self.job_submitter

        image_type = submitter.kernel_image_type
        if image_type:
            deploy_field["kernel"]["type"] = image_type

        if submitter.dtb_filename:
            dtb_url = f"{submitter.kernel_url_prefix}/{submitter.dtb_filename}.dtb"
            deploy_field["dtb"] = {"url": dtb_url}

    def attach_external_modules(self, deploy_field):
        """
        Add the modules tarball entry to a deploy section when the submitter
        has `kernel_external` set.

        Args:
            deploy_field: deploy dictionary, updated in place.
        """
        submitter = self.job_submitter
        if not submitter.kernel_external:
            return

        deploy_field["modules"] = {
            "url": f"{submitter.kernel_url_prefix}/modules.tar.zst",
            "compression": "zstd",
        }

    def jwt_steps(self):
        """
        Build the shell steps that deal with the JWT.

        When the submitter has a JWT file, its content is read here and the
        steps re-create a file at the same path and export that path as
        S3_JWT_FILE in /set-job-env-vars.sh. Otherwise the steps delete the
        S3_RESULTS_UPLOAD lines from /set-job-env-vars.sh.

        Returns:
            A list of shell command strings, always starting with "set -e".
        """
        token_path = self.job_submitter.jwt_file

        if not token_path:
            return [
                "set -e",
                "echo Could not find jwt file, disabling S3 requests...",
                "sed -i '/S3_RESULTS_UPLOAD/d' /set-job-env-vars.sh",
            ]

        # If the JWT file is provided, we will use it to authenticate with the
        # cloud storage provider and will hide it from the job output in
        # Gitlab (hence the HIDE_START / HIDE_END markers around the echo).
        with open(token_path) as token_file:
            token = token_file.read()

        return [
            "set -e",
            "set +x # HIDE_START",
            f'echo -n "{token}" > "{token_path}"',
            "set -x # HIDE_END",
            f'echo "export S3_JWT_FILE={token_path}" >> /set-job-env-vars.sh',
        ]

    def init_stage1_steps(self) -> list[str]:
        """
        Build the shell steps that inline the first stage init script.

        Lines starting with "#" and lines that are blank once trailing
        whitespace is removed are skipped.

        Returns:
            A list of shell command strings, ending with the export of
            CURRENT_SECTION=dut_boot.
        """
        steps = []

        # job execution script:
        #   - inline .gitlab-ci/common/init-stage1.sh
        #   - fetch and unpack per-pipeline build artifacts from build job
        #   - fetch and unpack per-job environment from lava-submit.sh
        #   - exec .gitlab-ci/common/init-stage2.sh
        with open(self.job_submitter.first_stage_init, "r") as init_sh:
            # For vmware farm, patch nameserver as 8.8.8.8 is off limit.
            # This is temporary and will be reverted once the farm is moved.
            is_vmware_job = self.job_submitter.mesa_job_name.startswith("vmware-")

            for raw_line in init_sh:
                stripped = raw_line.rstrip()
                if raw_line.startswith("#") or not stripped:
                    continue
                if is_vmware_job:
                    stripped = stripped.replace("nameserver 8.8.8.8", "nameserver 192.19.189.10")
                steps.append(stripped)

        # We cannot distribute the Adreno 660 shader firmware inside rootfs,
        # since the license isn't bundled inside the repository
        if self.job_submitter.device_type == "sm8350-hdk":
            steps.append(
                "curl -L --retry 4 -f --retry-all-errors --retry-delay 60 "
                "https://github.com/allahjasif1990/hdk888-firmware/raw/main/a660_zap.mbn "
                '-o "/lib/firmware/qcom/sm8350/a660_zap.mbn"'
            )

        steps.append("export CURRENT_SECTION=dut_boot")
        return steps