1import importlib 2import os 3import re 4from itertools import chain 5from pathlib import Path 6from typing import Any, Iterable, Literal 7from unittest import mock 8 9import lava.utils.constants 10import pytest 11from lava.lava_job_submitter import LAVAJobSubmitter 12from lava.utils.lava_job_definition import LAVAJobDefinition 13from ruamel.yaml import YAML 14 15 16def flatten(iterable: Iterable[Iterable[Any]]) -> list[Any]: 17 return list(chain.from_iterable(iterable)) 18 19 20# mock shell file 21@pytest.fixture(scope="session") 22def shell_file(tmp_path_factory): 23 def create_shell_file(content: str = "# test"): 24 shell_file = tmp_path_factory.mktemp("data") / "shell_file.sh" 25 shell_file.write_text(content) 26 return shell_file 27 28 return create_shell_file 29 30 31# fn to load the data file from $CWD/data using pathlib 32def load_data_file(filename): 33 return Path(__file__).parent.parent / "data" / filename 34 35 36def load_yaml_file(filename) -> dict: 37 with open(load_data_file(filename)) as f: 38 return YAML().load(f) 39 40 41def job_submitter_factory(mode: Literal["UBOOT", "FASTBOOT"], shell_file): 42 if mode == "UBOOT": 43 boot_method = "u-boot" 44 device_type = "my_uboot_device_type" 45 elif mode == "FASTBOOT": 46 boot_method = "fastboot" 47 device_type = "my_fastboot_device_type" 48 49 job_timeout_min = 10 50 mesa_job_name = "dut test" 51 pipeline_info = "my_pipeline_info" 52 project_name = "test-project" 53 visibility_group = "my_visibility_group" 54 55 return LAVAJobSubmitter( 56 boot_method=boot_method, 57 device_type=device_type, 58 farm="test_farm", 59 dtb_filename="my_dtb_filename", 60 first_stage_init=shell_file, 61 job_timeout_min=job_timeout_min, 62 mesa_job_name=mesa_job_name, 63 pipeline_info=pipeline_info, 64 visibility_group=visibility_group, 65 project_name=project_name, 66 ) 67 68 69@pytest.fixture 70def clear_env_vars(autouse=True): 71 with mock.patch.dict(os.environ) as environ: 72 # Remove all LAVA-related environment variables to make the test more robust 73 # and deterministic, once a envvar is capable of overriding the default value 74 for key in environ: 75 if any(kw in key for kw in ("LAVA_", "CI_", "JOB_", "RUNNER_", "DEVICE_")): 76 del environ[key] 77 # reload lava.utils.constants to update the JOB_PRIORITY value 78 importlib.reload(lava.utils.constants) 79 importlib.reload(lava.utils.lava_job_definition) 80 yield 81 82 83@pytest.fixture 84def mock_collabora_farm(clear_env_vars, monkeypatch): 85 # Mock a Collabora farm-like device runner tag to enable SSH execution 86 monkeypatch.setenv("RUNNER_TAG", "mesa-ci-1234-lava-collabora") 87 88 89@pytest.mark.parametrize("force_uart", [True, False], ids=["SSH", "UART"]) 90@pytest.mark.parametrize("mode", ["UBOOT", "FASTBOOT"]) 91@mock.patch("lava.lava_job_submitter.setup_lava_proxy") 92def test_generate_lava_job_definition_sanity( 93 mock_lava_proxy, 94 force_uart, 95 mode, 96 shell_file, 97 mock_collabora_farm, 98 monkeypatch, 99 mock_proxy, 100): 101 monkeypatch.setattr(lava.utils.lava_job_definition, "FORCE_UART", force_uart) 102 # Do not actually connect to the LAVA server 103 mock_lava_proxy.return_value = mock_proxy 104 105 init_script_content = f"echo test {mode}" 106 job_submitter = job_submitter_factory(mode, shell_file(init_script_content)) 107 job_definition = LAVAJobDefinition(job_submitter).generate_lava_job_definition() 108 109 # Load the YAML output and check that it contains the expected keys and values 110 yaml = YAML() 111 job_dict = yaml.load(job_definition) 112 yaml.dump(job_dict, Path(f"/tmp/{mode}_force_uart={force_uart}_job_definition.yaml")) 113 assert job_dict["device_type"] == job_submitter.device_type 114 assert job_dict["visibility"]["group"] == [job_submitter.visibility_group] 115 assert job_dict["timeouts"]["job"]["minutes"] == job_submitter.job_timeout_min 116 assert job_dict["context"]["extra_nfsroot_args"] 117 assert job_dict["timeouts"]["actions"] 118 119 assert len(job_dict["actions"]) == 3 if mode == "UART" else 5 120 121 last_test_action = job_dict["actions"][-1]["test"] 122 # TODO: Remove hardcoded "mesa" test name, as this submitter is being used by other projects 123 first_test_name = last_test_action["definitions"][0]["name"] 124 is_running_ssh = "ssh" in first_test_name 125 # if force_uart, is_ssh must be False. If is_ssh, force_uart must be False. Both can be False 126 assert not (is_running_ssh and force_uart) 127 assert last_test_action["failure_retry"] == 3 if is_running_ssh else 1 128 129 run_steps = "".join(last_test_action["definitions"][0]["repository"]["run"]["steps"]) 130 # Check for project name in lava-test-case 131 assert re.search(rf"lava.?\S*.test.case.*{job_submitter.project_name}", run_steps) 132 133 action_names = flatten(j.keys() for j in job_dict["actions"]) 134 if is_running_ssh: 135 assert action_names == ( 136 [ 137 "deploy", 138 "boot", 139 "test", # DUT: SSH server 140 "test", # Docker: SSH client 141 ] 142 if mode == "UBOOT" 143 else [ 144 "deploy", # NFS 145 "deploy", # Image generation 146 "deploy", # Image deployment 147 "boot", 148 "test", # DUT: SSH server 149 "test", # Docker: SSH client 150 ] 151 ) 152 test_action_server = job_dict["actions"][-2]["test"] 153 # SSH server in the DUT 154 assert test_action_server["namespace"] == "dut" 155 # SSH client via docker 156 assert last_test_action["namespace"] == "container" 157 158 boot_action = next(a["boot"] for a in job_dict["actions"] if "boot" in a) 159 assert boot_action["namespace"] == "dut" 160 161 # SSH server bootstrapping 162 assert "dropbear" in "".join(boot_action["auto_login"]["login_commands"]) 163 return 164 165 # ---- Not SSH job 166 assert action_names == ( 167 [ 168 "deploy", 169 "boot", 170 "test", 171 ] 172 if mode == "UBOOT" 173 else [ 174 "deploy", # NFS 175 "deploy", # Image generation 176 "deploy", # Image deployment 177 "boot", 178 "test", 179 ] 180 ) 181 assert init_script_content in run_steps 182 183 184# use yaml files from tests/data/ to test the job definition generation 185@pytest.mark.parametrize("force_uart", [False, True], ids=["SSH", "UART"]) 186@pytest.mark.parametrize("mode", ["UBOOT", "FASTBOOT"]) 187@mock.patch("lava.lava_job_submitter.setup_lava_proxy") 188def test_lava_job_definition( 189 mock_lava_proxy, 190 mode, 191 force_uart, 192 shell_file, 193 mock_collabora_farm, 194 mock_proxy, 195 monkeypatch, 196): 197 monkeypatch.setattr(lava.utils.lava_job_definition, "FORCE_UART", force_uart) 198 # Do not actually connect to the LAVA server 199 mock_lava_proxy.return_value = mock_proxy 200 201 yaml = YAML() 202 yaml.default_flow_style = False 203 204 # Load the YAML output and check that it contains the expected keys and values 205 expected_job_dict = load_yaml_file(f"{mode}_force_uart={force_uart}_job_definition.yaml") 206 207 init_script_content = f"echo test {mode}" 208 job_submitter = job_submitter_factory(mode, shell_file(init_script_content)) 209 job_definition = LAVAJobDefinition(job_submitter).generate_lava_job_definition() 210 211 job_dict = yaml.load(job_definition) 212 213 # Uncomment the following to update the expected YAML files 214 # yaml.dump(job_dict, Path(f"../../data/{mode}_force_uart={force_uart}_job_definition.yaml")) 215 216 # Check that the generated job definition matches the expected one 217 assert job_dict == expected_job_dict 218