#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Tests for the LAVA job submitter: retry logic, log following, result
parsing, and the submitter's combined-status bookkeeping."""

import os
import xmlrpc.client
from contextlib import nullcontext as does_not_raise
from datetime import datetime
from itertools import islice, repeat
from pathlib import Path
from typing import Generator
from unittest.mock import MagicMock, patch

import pytest
from lava.exceptions import MesaCIException, MesaCIRetryError
from lava.lava_job_submitter import (
    DEVICE_HANGING_TIMEOUT_SEC,
    NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
    LAVAJob,
    LAVAJobSubmitter,
    bootstrap_log_follower,
    follow_job_execution,
    retriable_follow_job,
)
from lava.utils import LogSectionType

from .lava.helpers import (
    generate_n_logs,
    generate_testsuite_result,
    jobs_logs_response,
    mock_lava_signal,
    mock_logs,
    section_timeout,
)

# Total attempts = the initial try plus the allowed timeout-detection retries.
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1


@pytest.fixture
def mock_proxy_waiting_time(mock_proxy):
    """Return a factory that builds a mock proxy whose job_state query also
    advances the frozen clock, simulating a job that waits before running."""

    def update_mock_proxy(frozen_time, **kwargs):
        wait_time = kwargs.pop("wait_time", 1)
        proxy_mock = mock_proxy(**kwargs)
        proxy_job_state = proxy_mock.scheduler.job_state
        proxy_job_state.return_value = {"job_state": "Running"}
        # NOTE(review): assumes frozen_time.tick(wait_time) yields a callable/
        # iterable suitable as a Mock side_effect — provided by the freezing
        # fixture; confirm against the frozen_time implementation.
        proxy_job_state.side_effect = frozen_time.tick(wait_time)

        return proxy_mock

    return update_mock_proxy


@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
def ci_environment(request):
    """Run the test once with CI=true and once with CI=false in the env."""
    with patch.dict(os.environ, request.param):
        yield


@pytest.fixture
def lava_job_submitter(
    ci_environment,
    tmp_path,
    mock_proxy,
):
    """Yield a LAVAJobSubmitter wired to a mock proxy, with its structured
    log written under a per-test temporary directory."""
    os.chdir(tmp_path)
    tmp_file = Path(tmp_path) / "log.json"

    with patch("lava.lava_job_submitter.setup_lava_proxy") as mock_setup_lava_proxy:
        mock_setup_lava_proxy.return_value = mock_proxy()
        yield LAVAJobSubmitter(
            boot_method="test_boot",
            ci_project_dir="test_dir",
            device_type="test_device",
            job_timeout_min=1,
            structured_log_file=tmp_file,
        )


@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
    """Any proxy-side exception must surface as a MesaCIException."""
    with pytest.raises(MesaCIException):
        proxy = mock_proxy(side_effect=exception)
        job = LAVAJob(proxy, '')
        log_follower = bootstrap_log_follower()
        follow_job_execution(job, log_follower)


NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")

# Each scenario: (log side_effect, raise-expectation, expected job status,
# extra kwargs for mock_proxy).
PROXY_SCENARIOS = {
    "simple pass case": (mock_logs(result="pass"), does_not_raise(), "pass", {}),
    "simple fail case": (mock_logs(result="fail"), does_not_raise(), "fail", {}),
    "simple hung case": (
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * 1000
            },
            result="fail",
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        {},
    ),
    "leftover dump from last job in boot section": (
        (
            mock_lava_signal(LogSectionType.LAVA_BOOT),
            jobs_logs_response(finished=False, msg=None, result="fail"),
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        {},
    ),
    # Boot times out on every retry except the very last one, which succeeds.
    "boot works at last retry": (
        mock_logs(
            messages={
                LogSectionType.LAVA_BOOT: [
                    section_timeout(LogSectionType.LAVA_BOOT) + 1
                ]
                * NUMBER_OF_RETRIES_TIMEOUT_DETECTION
                + [1]
            },
            result="pass",
        ),
        does_not_raise(),
        "pass",
        {},
    ),
    "test case took too long": pytest.param(
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * (NUMBER_OF_MAX_ATTEMPTS + 1)
            },
            result="pass",
        ),
        pytest.raises(MesaCIRetryError),
        "pass",
        {},
    ),
    "timed out more times than retry attempts": (
        generate_n_logs(n=4, tick_fn=9999999),
        pytest.raises(MesaCIRetryError),
        "fail",
        {},
    ),
    "long log case, no silence": (
        mock_logs(
            messages={LogSectionType.TEST_CASE: [1] * (1000)},
            result="pass",
        ),
        does_not_raise(),
        "pass",
        {},
    ),
    "no retries, testsuite succeed": (
        mock_logs(result="pass"),
        does_not_raise(),
        "pass",
        {
            "testsuite_results": [
                generate_testsuite_result(result="pass")
            ]
        },
    ),
    "no retries, but testsuite fails": (
        mock_logs(result="fail"),
        does_not_raise(),
        "fail",
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail")
            ]
        },
    ),
    # A single failing testsuite makes the whole job fail, even if another
    # testsuite passed.
    "no retries, one testsuite fails": (
        generate_n_logs(n=1, tick_fn=0, result="fail"),
        does_not_raise(),
        "fail",
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail"),
                generate_testsuite_result(result="pass")
            ]
        },
    ),
    "very long silence": (
        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
        pytest.raises(MesaCIRetryError),
        "fail",
        {},
    ),
    # If a protocol error happens, _call_proxy will retry without affecting timeouts
    "unstable connection, ProtocolError followed by final message": (
        (NETWORK_EXCEPTION, *list(mock_logs(result="pass"))),
        does_not_raise(),
        "pass",
        {},
    ),
    # After an arbitrary number of retries, _call_proxy should call sys.exit
    "unreachable case, subsequent ProtocolErrors": (
        repeat(NETWORK_EXCEPTION),
        pytest.raises(SystemExit),
        "fail",
        {},
    ),
    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, {}),
}


@pytest.mark.parametrize(
    "test_log, expectation, job_result, proxy_args",
    PROXY_SCENARIOS.values(),
    ids=PROXY_SCENARIOS.keys(),
)
def test_retriable_follow_job(
    mock_sleep,
    test_log,
    expectation,
    job_result,
    proxy_args,
    mock_proxy,
):
    """Drive retriable_follow_job through each PROXY_SCENARIOS entry and check
    both the raised-exception expectation and the final job status."""
    with expectation:
        proxy = mock_proxy(side_effect=test_log, **proxy_args)
        job: LAVAJob = retriable_follow_job(proxy, "")
        assert job_result == job.status


WAIT_FOR_JOB_SCENARIOS = {"one log run taking (sec):": (mock_logs(result="pass"))}


@pytest.mark.parametrize("wait_time", (DEVICE_HANGING_TIMEOUT_SEC * 2,))
@pytest.mark.parametrize(
    "side_effect",
    WAIT_FOR_JOB_SCENARIOS.values(),
    ids=WAIT_FOR_JOB_SCENARIOS.keys(),
)
def test_simulate_a_long_wait_to_start_a_job(
    frozen_time,
    wait_time,
    side_effect,
    mock_proxy_waiting_time,
):
    """A job that idles longer than the device-hang timeout before starting
    must still be followed to completion (no premature hang detection)."""
    start_time = datetime.now()
    job: LAVAJob = retriable_follow_job(
        mock_proxy_waiting_time(
            frozen_time, side_effect=side_effect, wait_time=wait_time
        ),
        "",
    )

    end_time = datetime.now()
    delta_time = end_time - start_time

    assert job.status == "pass"
    # The frozen clock was ticked by wait_time per job_state poll, so the
    # measured (frozen) duration must cover at least one full wait.
    assert delta_time.total_seconds() >= wait_time


CORRUPTED_LOG_SCENARIOS = {
    # Many corrupted chunks in a row exhaust the retries...
    "too much subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],
        pytest.raises((MesaCIRetryError)),
    ),
    # ...but a couple of corrupted chunks are tolerated.
    "one subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 2 + [jobs_logs_response(True)],
        does_not_raise(),
    ),
}


@pytest.mark.parametrize(
    "data_sequence, expected_exception",
    CORRUPTED_LOG_SCENARIOS.values(),
    ids=CORRUPTED_LOG_SCENARIOS.keys(),
)
def test_log_corruption(mock_sleep, data_sequence, expected_exception, mock_proxy):
    """Corrupted log payloads must be retried, erroring out only when they
    keep arriving beyond the retry budget."""
    proxy_mock = mock_proxy()
    proxy_logs_mock = proxy_mock.scheduler.jobs.logs
    proxy_logs_mock.side_effect = data_sequence
    with expected_exception:
        retriable_follow_job(proxy_mock, "")


LAVA_RESULT_LOG_SCENARIOS = {
    # the submitter should accept xtrace logs
    "Bash xtrace echo with kmsg interleaving": (
        "echo hwci: mesa: pass[ 737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass",
    ),
    # the submitter should accept xtrace logs
    "kmsg result print": (
        "[ 737.673352] hwci: mesa: pass",
        "pass",
    ),
    # if the job result echo has a very bad luck, it still can be interleaved
    # with kmsg
    "echo output with kmsg interleaving": (
        "hwci: mesa: pass[ 737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass",
    ),
    "fail case": (
        "hwci: mesa: fail",
        "fail",
    ),
}


@pytest.mark.parametrize(
    "message, expectation",
    LAVA_RESULT_LOG_SCENARIOS.values(),
    ids=LAVA_RESULT_LOG_SCENARIOS.keys(),
)
def test_parse_job_result_from_log(message, expectation, mock_proxy):
    """The 'hwci: mesa: <result>' marker must be extracted even when
    interleaved with kernel messages."""
    job = LAVAJob(mock_proxy(), "")
    job.parse_job_result_from_log([message])

    assert job.status == expectation


@pytest.mark.slow(
    reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
@pytest.mark.skipif(
    not Path("/tmp/log.yaml").is_file(), reason="Missing /tmp/log.yaml file."
)
def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
    """Replay a real LAVA YAML log (from /tmp/log.yaml) in random-sized
    chunks, time-travelling the frozen clock to each chunk's timestamps."""
    import random

    from lavacli.utils import flow_yaml as lava_yaml

    def time_travel_from_log_chunk(data_chunk):
        # Two-step generator: first next() jumps to the chunk's first
        # timestamp, second next() jumps to its last.
        if not data_chunk:
            return

        first_log = lava_yaml.load(data_chunk[0])[0]
        first_log_time = first_log["dt"]
        frozen_time.move_to(first_log_time)
        yield

        last_log = lava_yaml.load(data_chunk[-1])[0]
        last_log_time = last_log["dt"]
        frozen_time.move_to(last_log_time)
        yield

    def time_travel_to_test_time():
        # Suppose that the first message timestamp of the entire LAVA job log is
        # the same of from the job submitter execution
        with open("/tmp/log.yaml", "r") as f:
            first_log = f.readline()
            first_log_time = lava_yaml.load(first_log)[0]["dt"]
            frozen_time.move_to(first_log_time)

    def load_lines() -> Generator[tuple[bool, str], None, None]:
        with open("/tmp/log.yaml", "r") as f:
            log_lines = f.readlines()
            serial_message: str = ""
            chunk_start_line = 0
            chunk_end_line = 0
            chunk_max_size = 100
            try:
                while True:
                    chunk_end_line = chunk_start_line + random.randint(1, chunk_max_size)
                    # split the log in chunks of random size
                    log_chunk = list(islice(log_lines, chunk_start_line, chunk_end_line))
                    # islice's end index is exclusive, so the next chunk starts
                    # exactly at chunk_end_line; starting at chunk_end_line + 1
                    # would silently drop one log line per chunk.
                    chunk_start_line = chunk_end_line
                    serial_message = "".join(log_chunk)
                    # time_traveller_gen will make the time trave according to the timestamp from
                    # the message
                    time_traveller_gen = time_travel_from_log_chunk(log_chunk)
                    # Suppose that the first message timestamp is the same of
                    # log fetch RPC call
                    next(time_traveller_gen)
                    yield False, "[]"
                    # Travel to the same datetime of the last fetched log line
                    # in the chunk
                    next(time_traveller_gen)
                    yield False, serial_message
            except StopIteration:
                # Raised by next() on an exhausted (empty-chunk) generator:
                # the log is fully consumed, emit the final chunk as finished.
                yield True, serial_message
                return

    proxy = mock_proxy()

    def reset_logs(*args):
        # Each (re)submission restarts the replay from the beginning.
        proxy.scheduler.jobs.logs.side_effect = load_lines()

    proxy.scheduler.jobs.submit = reset_logs
    try:
        time_travel_to_test_time()
        start_time = datetime.now()
        retriable_follow_job(proxy, "")
    finally:
        try:
            # If the job fails, maybe there will be no structured log
            print(lava_job_submitter.structured_log_file.read_text())
        finally:
            end_time = datetime.now()
            print("---- Reproduction log stats ----")
            print(f"Start time: {start_time}")
            print(f"End time: {end_time}")
            print(f"Duration: {end_time - start_time}")


@pytest.mark.parametrize(
    "validate_only,finished_job_status,expected_combined_status,expected_exit_code",
    [
        (True, "pass", None, None),
        (False, "pass", "pass", 0),
        (False, "fail", "fail", 1),
    ],
    ids=[
        "validate_only_no_job_submission",
        "successful_job_submission",
        "failed_job_submission",
    ],
)
def test_job_combined_status(
    lava_job_submitter,
    validate_only,
    finished_job_status,
    expected_combined_status,
    expected_exit_code,
):
    """submit() must record the combined job status in STRUCTURAL_LOG (and
    leave it untouched in validate-only mode)."""
    lava_job_submitter.validate_only = validate_only

    with patch(
        "lava.lava_job_submitter.retriable_follow_job"
    ) as mock_retriable_follow_job, patch(
        "lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
    ) as mock_prepare_submission, patch(
        "sys.exit"
    ):
        from lava.lava_job_submitter import STRUCTURAL_LOG

        mock_retriable_follow_job.return_value = MagicMock(status=finished_job_status)

        mock_job_definition = MagicMock(spec=str)
        mock_prepare_submission.return_value = mock_job_definition
        # dict.get may return None when the key is absent, so no str annotation
        original_status = STRUCTURAL_LOG.get("job_combined_status")

        if validate_only:
            lava_job_submitter.submit()
            mock_retriable_follow_job.assert_not_called()
            assert STRUCTURAL_LOG.get("job_combined_status") == original_status
            return

        try:
            lava_job_submitter.submit()

        except SystemExit as e:
            assert e.code == expected_exit_code

        assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status