import importlib
import os
import re
from itertools import chain
from pathlib import Path
from typing import Any, Iterable, Literal
from unittest import mock

import lava.utils.constants
import pytest
from lava.lava_job_submitter import LAVAJobSubmitter
from lava.utils.lava_job_definition import LAVAJobDefinition
from ruamel.yaml import YAML


def flatten(iterable: Iterable[Iterable[Any]]) -> list[Any]:
    """Flatten one level of nesting into a single list."""
    return list(chain.from_iterable(iterable))


# mock shell file
@pytest.fixture(scope="session")
def shell_file(tmp_path_factory):
    """Factory fixture: write *content* to a temporary shell script and return its path."""

    def create_shell_file(content: str = "# test"):
        shell_file = tmp_path_factory.mktemp("data") / "shell_file.sh"
        shell_file.write_text(content)
        return shell_file

    return create_shell_file


# fn to load the data file from $CWD/data using pathlib
def load_data_file(filename):
    """Return the path of *filename* inside the tests' data directory."""
    return Path(__file__).parent.parent / "data" / filename


def load_yaml_file(filename) -> dict:
    """Load and parse a YAML document from the tests' data directory."""
    with open(load_data_file(filename)) as f:
        return YAML().load(f)


def job_submitter_factory(mode: Literal["UBOOT", "FASTBOOT"], shell_file):
    """Build a LAVAJobSubmitter configured for the given boot *mode*.

    :param mode: boot flow to exercise, "UBOOT" or "FASTBOOT"
    :param shell_file: path to the first-stage init script
    :raises ValueError: if *mode* is not one of the supported values
    """
    if mode == "UBOOT":
        boot_method = "u-boot"
        device_type = "my_uboot_device_type"
    elif mode == "FASTBOOT":
        boot_method = "fastboot"
        device_type = "my_fastboot_device_type"
    else:
        # Fail loudly on an unsupported mode instead of raising a confusing
        # NameError on the unbound `boot_method` below.
        raise ValueError(f"Unsupported mode: {mode}")

    job_timeout_min = 10
    mesa_job_name = "dut test"
    pipeline_info = "my_pipeline_info"
    project_name = "test-project"
    visibility_group = "my_visibility_group"

    return LAVAJobSubmitter(
        boot_method=boot_method,
        ci_project_dir="/ci/project/dir",
        device_type=device_type,
        dtb_filename="my_dtb_filename",
        first_stage_init=shell_file,
        job_timeout_min=job_timeout_min,
        mesa_job_name=mesa_job_name,
        pipeline_info=pipeline_info,
        visibility_group=visibility_group,
        project_name=project_name,
    )


# BUG FIX: `autouse=True` was a *function parameter* in the original
# (`def clear_env_vars(autouse=True)`), which pytest ignores entirely --
# the fixture was never auto-applied. It belongs in the decorator.
@pytest.fixture(autouse=True)
def clear_env_vars():
    """Strip CI/LAVA environment variables and reload the affected modules."""
    with mock.patch.dict(os.environ) as environ:
        # Remove all LAVA-related environment variables to make the test more
        # robust and deterministic, once an envvar is capable of overriding
        # the default value.
        # BUG FIX: iterate over a snapshot -- deleting from the mapping while
        # iterating it directly raises RuntimeError in Python 3.
        for key in list(environ):
            if any(kw in key for kw in ("LAVA_", "CI_", "JOB_", "RUNNER_", "DEVICE_")):
                del environ[key]
        # reload lava.utils.constants to update the JOB_PRIORITY value
        importlib.reload(lava.utils.constants)
        importlib.reload(lava.utils.lava_job_definition)
        yield


@pytest.fixture
def mock_collabora_farm(clear_env_vars, monkeypatch):
    # Mock a Collabora farm-like device runner tag to enable SSH execution
    monkeypatch.setenv("RUNNER_TAG", "mesa-ci-1234-lava-collabora")


@pytest.mark.parametrize("force_uart", [True, False], ids=["SSH", "UART"])
@pytest.mark.parametrize("mode", ["UBOOT", "FASTBOOT"])
def test_generate_lava_job_definition_sanity(
    force_uart, mode, shell_file, mock_collabora_farm, monkeypatch
):
    """Sanity-check the generated LAVA job definition for every boot/console combo."""
    monkeypatch.setattr(lava.utils.lava_job_definition, "FORCE_UART", force_uart)

    init_script_content = f"echo test {mode}"
    job_submitter = job_submitter_factory(mode, shell_file(init_script_content))
    job_definition = LAVAJobDefinition(job_submitter).generate_lava_job_definition()

    # Load the YAML output and check that it contains the expected keys and values
    yaml = YAML()
    job_dict = yaml.load(job_definition)
    yaml.dump(job_dict, Path(f"/tmp/{mode}_force_uart={force_uart}_job_definition.yaml"))
    assert job_dict["device_type"] == job_submitter.device_type
    assert job_dict["visibility"]["group"] == [job_submitter.visibility_group]
    assert job_dict["timeouts"]["job"]["minutes"] == job_submitter.job_timeout_min
    assert job_dict["context"]["extra_nfsroot_args"]
    assert job_dict["timeouts"]["actions"]

    last_test_action = job_dict["actions"][-1]["test"]
    # TODO: Remove hardcoded "mesa" test name, as this submitter is being used by other projects
    first_test_name = last_test_action["definitions"][0]["name"]
    is_running_ssh = "ssh" in first_test_name
    # if force_uart, is_ssh must be False. If is_ssh, force_uart must be False. Both can be False
    assert not (is_running_ssh and force_uart)

    # BUG FIX: the original used unparenthesized conditional expressions
    # (`assert x == 3 if cond else 5`), which parse as
    # `assert ((x == 3) if cond else 5)` -- the else branch asserted a truthy
    # constant and always passed. It also compared `mode` against "UART",
    # a value `mode` never takes (it is "UBOOT"/"FASTBOOT").
    # Per the expected action lists below: u-boot jobs have 3 base actions,
    # fastboot jobs 5, and SSH jobs add one extra test action (SSH client).
    base_action_count = 3 if mode == "UBOOT" else 5
    assert len(job_dict["actions"]) == base_action_count + (1 if is_running_ssh else 0)
    assert last_test_action["failure_retry"] == (3 if is_running_ssh else 1)

    run_steps = "".join(last_test_action["definitions"][0]["repository"]["run"]["steps"])
    # Check for project name in lava-test-case
    assert re.search(rf"lava.?\S*.test.case.*{job_submitter.project_name}", run_steps)

    action_names = flatten(j.keys() for j in job_dict["actions"])
    if is_running_ssh:
        assert action_names == (
            [
                "deploy",
                "boot",
                "test",  # DUT: SSH server
                "test",  # Docker: SSH client
            ]
            if mode == "UBOOT"
            else [
                "deploy",  # NFS
                "deploy",  # Image generation
                "deploy",  # Image deployment
                "boot",
                "test",  # DUT: SSH server
                "test",  # Docker: SSH client
            ]
        )
        test_action_server = job_dict["actions"][-2]["test"]
        # SSH server in the DUT
        assert test_action_server["namespace"] == "dut"
        # SSH client via docker
        assert last_test_action["namespace"] == "container"

        boot_action = next(a["boot"] for a in job_dict["actions"] if "boot" in a)
        assert boot_action["namespace"] == "dut"

        # SSH server bootstrapping
        assert "dropbear" in "".join(boot_action["auto_login"]["login_commands"])
        return

    # ---- Not SSH job
    assert action_names == (
        [
            "deploy",
            "boot",
            "test",
        ]
        if mode == "UBOOT"
        else [
            "deploy",  # NFS
            "deploy",  # Image generation
            "deploy",  # Image deployment
            "boot",
            "test",
        ]
    )
    assert init_script_content in run_steps


# use yaml files from tests/data/ to test the job definition generation
@pytest.mark.parametrize("force_uart", [False, True], ids=["SSH", "UART"])
@pytest.mark.parametrize("mode", ["UBOOT", "FASTBOOT"])
def test_lava_job_definition(mode, force_uart, shell_file, mock_collabora_farm, monkeypatch):
    """Compare the generated job definition against the golden YAML files in tests/data."""
    monkeypatch.setattr(lava.utils.lava_job_definition, "FORCE_UART", force_uart)

    yaml = YAML()
    yaml.default_flow_style = False

    # Load the YAML output and check that it contains the expected keys and values
    expected_job_dict = load_yaml_file(f"{mode}_force_uart={force_uart}_job_definition.yaml")

    init_script_content = f"echo test {mode}"
    job_submitter = job_submitter_factory(mode, shell_file(init_script_content))
    job_definition = LAVAJobDefinition(job_submitter).generate_lava_job_definition()

    job_dict = yaml.load(job_definition)

    # Uncomment the following to update the expected YAML files
    # yaml.dump(job_dict, Path(f"../../data/{mode}_force_uart={force_uart}_job_definition.yaml"))

    # Check that the generated job definition matches the expected one
    assert job_dict == expected_job_dict