#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Tests for the LAVA job submitter: retry logic, log following, timeouts,
result parsing and process exit codes."""

import os
import xmlrpc.client
from contextlib import nullcontext as does_not_raise
from datetime import UTC, datetime
from itertools import cycle, islice, repeat
from pathlib import Path
from typing import Generator
from unittest.mock import MagicMock, patch

import pytest
from lava.exceptions import MesaCIException, MesaCIFatalException, MesaCIRetryError
from lava.lava_job_submitter import (
    DEVICE_HANGING_TIMEOUT_SEC,
    NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
    LAVAJob,
    LAVAJobSubmitter,
    bootstrap_log_follower,
    follow_job_execution,
    retriable_follow_job,
    wait_for_job_get_started,
)
from lava.utils import LogSectionType

from .lava.helpers import (
    generate_n_logs,
    generate_testsuite_result,
    jobs_logs_response,
    mock_lava_signal,
    mock_logs,
    section_timeout,
)

# One initial attempt plus the configured number of timeout-detection retries.
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1


@pytest.fixture
def mock_proxy_waiting_time(mock_proxy):
    """Wrap `mock_proxy` so every `scheduler.job_state` poll advances the
    frozen clock by `wait_time` seconds, simulating a job stuck in queue."""

    def update_mock_proxy(frozen_time, **kwargs):
        def mock_job_state(jid) -> dict[str, str]:
            # Each poll of the job state costs `wait_time` virtual seconds.
            frozen_time.tick(wait_time)
            return {"job_state": "Running"}

        wait_time = kwargs.pop("wait_time", 1)
        proxy_mock = mock_proxy(**kwargs)
        proxy_job_state = proxy_mock.scheduler.job_state
        proxy_job_state.side_effect = mock_job_state

        return proxy_mock

    return update_mock_proxy


@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
def ci_environment(request):
    """Run the test under both CI and local-run environment variants."""
    with patch.dict(os.environ, request.param):
        yield


@pytest.fixture
def lava_job_submitter(
    ci_environment,
    tmp_path,
    mock_proxy,
):
    """Yield a LAVAJobSubmitter wired to a mocked proxy, chdir'ed into a
    temporary directory so the structured log lands in tmp_path."""
    os.chdir(tmp_path)
    tmp_file = Path(tmp_path) / "log.json"

    with patch("lava.lava_job_submitter.setup_lava_proxy") as mock_setup_lava_proxy:
        mock_setup_lava_proxy.return_value = mock_proxy()
        yield LAVAJobSubmitter(
            boot_method="test_boot",
            device_type="test_device",
            farm="test_farm",
            job_timeout_min=1,
            structured_log_file=tmp_file,
        )


@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
    """Arbitrary proxy exceptions must surface as MesaCIException subclasses."""
    with pytest.raises(MesaCIException):
        proxy = mock_proxy(side_effect=exception)
        job = LAVAJob(proxy, "")
        log_follower = bootstrap_log_follower(main_test_case="", timestamp_relative_to=None)
        follow_job_execution(job, log_follower)


NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")

# Each scenario: (log side effect, raise expectation, job result, exit code, proxy kwargs)
PROXY_SCENARIOS = {
    "simple pass case": (mock_logs(result="pass", exit_code=0), does_not_raise(), "pass", 0, {}),
    "simple fail case": (mock_logs(result="fail", exit_code=1), does_not_raise(), "fail", 1, {}),
    "simple hung case": (
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * 1000
            },
            result="fail",
            exit_code=1,
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        1,
        {},
    ),
    "leftover dump from last job in boot section": (
        (
            mock_lava_signal(LogSectionType.LAVA_BOOT),
            jobs_logs_response(finished=False, msg=None, result="fail", exit_code=1),
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        1,
        {},
    ),
    "boot works at last retry": (
        mock_logs(
            messages={
                LogSectionType.LAVA_BOOT: [
                    section_timeout(LogSectionType.LAVA_BOOT) + 1
                ]
                * NUMBER_OF_RETRIES_TIMEOUT_DETECTION
                + [1]
            },
            result="pass",
            exit_code=0,
        ),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    "test case took too long": pytest.param(
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * (NUMBER_OF_MAX_ATTEMPTS + 1)
            },
            result="pass",
            exit_code=0,
        ),
        pytest.raises(MesaCIRetryError),
        "pass",
        0,
        {},
    ),
    "timed out more times than retry attempts": (
        generate_n_logs(n=4, tick_fn=9999999),
        pytest.raises(MesaCIRetryError),
        "fail",
        1,
        {},
    ),
    "long log case, no silence": (
        mock_logs(
            messages={LogSectionType.TEST_CASE: [1] * (1000)},
            result="pass",
            exit_code=0,
        ),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    "no retries, testsuite succeed": (
        mock_logs(result="pass", exit_code=0),
        does_not_raise(),
        "pass",
        0,
        {"testsuite_results": [generate_testsuite_result(result="pass", exit_code=0)]},
    ),
    "no retries, but testsuite fails": (
        mock_logs(result="fail", exit_code=1),
        does_not_raise(),
        "fail",
        1,
        {"testsuite_results": [generate_testsuite_result(result="fail", exit_code=1)]},
    ),
    "no retries, one testsuite fails": (
        mock_logs(result="fail", exit_code=1),
        does_not_raise(),
        "fail",
        1,
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail", exit_code=1),
                generate_testsuite_result(result="pass", exit_code=0),
            ]
        },
    ),
    "very long silence": (
        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
        pytest.raises(MesaCIRetryError),
        "fail",
        1,
        {},
    ),
    # If a protocol error happens, _call_proxy will retry without affecting timeouts
    "unstable connection, ProtocolError followed by final message": (
        (NETWORK_EXCEPTION, *list(mock_logs(result="pass", exit_code=0))),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    # After an arbitrary number of retries, _call_proxy should call sys.exit
    "unreachable case, subsequent ProtocolErrors": (
        repeat(NETWORK_EXCEPTION),
        pytest.raises(SystemExit),
        "fail",
        1,
        {},
    ),
    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, 1, {}),
}


@pytest.mark.parametrize(
    "test_log, expectation, job_result, exit_code, proxy_args",
    PROXY_SCENARIOS.values(),
    ids=PROXY_SCENARIOS.keys(),
)
def test_retriable_follow_job(
    mock_sleep,
    test_log,
    expectation,
    job_result,
    exit_code,
    proxy_args,
    mock_proxy,
):
    """Drive retriable_follow_job through each proxy scenario and check the
    final job status/exit code (or the expected exception)."""
    with expectation:
        proxy = mock_proxy(side_effect=test_log, **proxy_args)
        job: LAVAJob = retriable_follow_job(proxy, "", "", None)
        assert job_result == job.status
        assert exit_code == job.exit_code


WAIT_FOR_JOB_SCENARIOS = {"one log run taking (sec):": (mock_logs(result="pass", exit_code=0))}


@pytest.mark.parametrize("wait_time", (DEVICE_HANGING_TIMEOUT_SEC * 2,))
@pytest.mark.parametrize(
    "side_effect",
    WAIT_FOR_JOB_SCENARIOS.values(),
    ids=WAIT_FOR_JOB_SCENARIOS.keys(),
)
def test_simulate_a_long_wait_to_start_a_job(
    frozen_time,
    wait_time,
    side_effect,
    mock_proxy_waiting_time,
):
    """A job stuck in the queue longer than the hang timeout must not be
    flagged as hung: the submitter should wait it out and pass."""
    start_time = datetime.now(tz=UTC)
    job: LAVAJob = retriable_follow_job(
        mock_proxy_waiting_time(
            frozen_time, side_effect=side_effect, wait_time=wait_time
        ),
        "",
        "",
        None,
    )

    end_time = datetime.now(tz=UTC)
    delta_time = end_time - start_time

    assert job.status == "pass"
    assert job.exit_code == 0
    assert delta_time.total_seconds() >= wait_time


LONG_LAVA_QUEUE_SCENARIOS = {
    "no_time_to_run": (0, pytest.raises(MesaCIFatalException)),
    "enough_time_to_run": (9999999999, does_not_raise()),
}


@pytest.mark.parametrize(
    "job_timeout, expectation",
    LONG_LAVA_QUEUE_SCENARIOS.values(),
    ids=LONG_LAVA_QUEUE_SCENARIOS.keys(),
)
def test_wait_for_job_get_started_no_time_to_run(monkeypatch, job_timeout, expectation):
    """If the CI job deadline leaves no time to run, the LAVA job must be
    cancelled and a fatal exception raised."""
    monkeypatch.setattr("lava.lava_job_submitter.CI_JOB_TIMEOUT_SEC", job_timeout)
    monkeypatch.setattr("lava.lava_job_submitter.CI_JOB_STARTED_AT", datetime.now(tz=UTC))
    job = MagicMock()
    # Make it escape the loop
    job.is_started.side_effect = (False, False, True)
    with expectation as e:
        wait_for_job_get_started(job, 1)
    if e:
        job.cancel.assert_called_with()


CORRUPTED_LOG_SCENARIOS = {
    "too much subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],
        pytest.raises(MesaCIRetryError),
    ),
    "one subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 2 + [jobs_logs_response(True)],
        does_not_raise(),
    ),
}


@pytest.mark.parametrize(
    "data_sequence, expected_exception",
    CORRUPTED_LOG_SCENARIOS.values(),
    ids=CORRUPTED_LOG_SCENARIOS.keys(),
)
def test_log_corruption(mock_sleep, data_sequence, expected_exception, mock_proxy):
    """A few corrupted log chunks are tolerated; a long run of them must
    exhaust the retries and raise MesaCIRetryError."""
    proxy_mock = mock_proxy()
    proxy_logs_mock = proxy_mock.scheduler.jobs.logs
    proxy_logs_mock.side_effect = data_sequence
    with expected_exception:
        retriable_follow_job(proxy_mock, "", "", None)


LAVA_RESULT_LOG_SCENARIOS = {
    # the submitter should accept xtrace logs
    "Bash xtrace echo with kmsg interleaving": (
        "echo hwci: mesa: pass, exit_code: 0[ 737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass", 0,
    ),
    # the submitter should accept xtrace logs
    "kmsg result print": (
        "[ 737.673352] hwci: mesa: pass, exit_code: 0",
        "pass", 0,
    ),
    # if the job result echo has a very bad luck, it still can be interleaved
    # with kmsg
    "echo output with kmsg interleaving": (
        "hwci: mesa: pass, exit_code: 0[ 737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass", 0,
    ),
    "fail case": (
        "hwci: mesa: fail, exit_code: 1",
        "fail", 1,
    ),
    # fail case with different exit code
    "fail case (exit code 101)": (
        "hwci: mesa: fail, exit_code: 101",
        "fail", 101,
    ),
}


@pytest.mark.parametrize(
    "message, expected_status, expected_exit_code",
    LAVA_RESULT_LOG_SCENARIOS.values(),
    ids=LAVA_RESULT_LOG_SCENARIOS.keys(),
)
def test_parse_job_result_from_log(message, expected_status, expected_exit_code, mock_proxy):
    """The hwci result line must be parsed even when interleaved with kmsg."""
    job = LAVAJob(mock_proxy(), "")
    job.parse_job_result_from_log([message])

    assert job.status == expected_status
    assert job.exit_code == expected_exit_code


@pytest.mark.slow(
    reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
@pytest.mark.skipif(
    not Path("/tmp/log.yaml").is_file(), reason="Missing /tmp/log.yaml file."
)
def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
    """Replay a real LAVA log file (/tmp/log.yaml) through the submitter,
    time-travelling to each chunk's timestamps to reproduce the run."""
    import random

    from lavacli.utils import flow_yaml as lava_yaml

    def time_travel_from_log_chunk(data_chunk):
        # Two-step generator: first yield lands on the chunk's first
        # timestamp, second yield on its last one.
        if not data_chunk:
            return

        first_log = lava_yaml.load(data_chunk[0])[0]
        first_log_time = first_log["dt"]
        frozen_time.move_to(first_log_time)
        yield

        last_log = lava_yaml.load(data_chunk[-1])[0]
        last_log_time = last_log["dt"]
        frozen_time.move_to(last_log_time)
        yield

    def time_travel_to_test_time():
        # Suppose that the first message timestamp of the entire LAVA job log is
        # the same of from the job submitter execution
        with open("/tmp/log.yaml", "r") as f:
            first_log = f.readline()
            first_log_time = lava_yaml.load(first_log)[0]["dt"]
            frozen_time.move_to(first_log_time)

    def load_lines() -> Generator[tuple[bool, str], None, None]:
        with open("/tmp/log.yaml", "r") as f:
            log_lines = f.readlines()
            serial_message: str = ""
            chunk_start_line = 0
            chunk_end_line = 0
            chunk_max_size = 100
            try:
                while True:
                    chunk_end_line = chunk_start_line + random.randint(1, chunk_max_size)
                    # split the log in chunks of random size
                    log_chunk = list(islice(log_lines, chunk_start_line, chunk_end_line))
                    chunk_start_line = chunk_end_line + 1
                    serial_message = "".join(log_chunk)
                    # time_traveller_gen will make the time travel according to
                    # the timestamp from the message
                    time_traveller_gen = time_travel_from_log_chunk(log_chunk)
                    # Suppose that the first message timestamp is the same of
                    # log fetch RPC call
                    next(time_traveller_gen)
                    yield False, "[]"
                    # Travel to the same datetime of the last fetched log line
                    # in the chunk
                    next(time_traveller_gen)
                    yield False, serial_message
            except StopIteration:
                # Raised by next() on an exhausted (empty-chunk) generator:
                # the log is fully consumed, emit the final chunk.
                yield True, serial_message
                return

    proxy = mock_proxy()

    def reset_logs(*args):
        proxy.scheduler.jobs.logs.side_effect = load_lines()

    proxy.scheduler.jobs.submit = reset_logs
    # Pre-assign start_time so the `finally` block cannot hit a NameError
    # (and mask the real failure) if the setup inside `try` raises.
    start_time = datetime.now(tz=UTC)
    try:
        time_travel_to_test_time()
        start_time = datetime.now(tz=UTC)
        retriable_follow_job(proxy, "", "", None)
    finally:
        try:
            # If the job fails, maybe there will be no structured log
            print(lava_job_submitter.structured_log_file.read_text())
        finally:
            end_time = datetime.now(tz=UTC)
            print("---- Reproduction log stats ----")
            print(f"Start time: {start_time}")
            print(f"End time: {end_time}")
            print(f"Duration: {end_time - start_time}")


@pytest.mark.parametrize(
    "validate_only,finished_job_status,job_exit_code,expected_combined_status",
    [
        (True, "pass", None, None,),
        (False, "pass", 0, "pass",),
        (False, "fail", 1, "fail",),
    ],
    ids=[
        "validate_only_no_job_submission",
        "successful_job_submission",
        "failed_job_submission",
    ],
)
def test_job_combined_status(
    mock_proxy,
    lava_job_submitter,
    validate_only,
    finished_job_status,
    job_exit_code,
    expected_combined_status,
):
    """STRUCTURAL_LOG must record the combined status/exit code after a real
    submission, and stay untouched in validate-only mode."""
    lava_job_submitter.validate_only = validate_only

    with patch(
        "lava.lava_job_submitter.retriable_follow_job"
    ) as mock_retriable_follow_job, patch(
        "lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
    ) as mock_prepare_submission, patch("sys.exit"):
        from lava.lava_job_submitter import STRUCTURAL_LOG

        mock_retriable_follow_job.return_value = MagicMock(
            status=finished_job_status, exit_code=job_exit_code
        )

        mock_job_definition = MagicMock(spec=str)
        mock_prepare_submission.return_value = mock_job_definition
        original_status: str = STRUCTURAL_LOG.get("job_combined_status")
        original_exit_code: int = STRUCTURAL_LOG.get("job_exit_code")

        if validate_only:
            lava_job_submitter.submit()
            mock_retriable_follow_job.assert_not_called()
            assert STRUCTURAL_LOG.get("job_combined_status") == original_status
            assert STRUCTURAL_LOG.get("job_exit_code") == original_exit_code
            return

        try:
            lava_job_submitter.submit()

        except SystemExit as e:
            assert e.code == job_exit_code

        assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status
        assert STRUCTURAL_LOG["job_exit_code"] == job_exit_code


SUBMIT_SCENARIOS = {
    "submit job pass": (cycle(mock_logs(result="pass", exit_code=0)), does_not_raise(), 0),
    "submit job fails": (
        cycle(mock_logs(result="fail", exit_code=1)),
        pytest.raises(SystemExit),
        1,
    ),
    "user interrupts the script": (
        (jobs_logs_response(), KeyboardInterrupt, jobs_logs_response()),
        pytest.raises(SystemExit),
        1,
    ),
    "job finishes without hwci response": (
        (jobs_logs_response(), jobs_logs_response()),
        pytest.raises(SystemExit),
        1,
    ),
}


@pytest.mark.parametrize(
    "test_log, expectation, exit_code",
    SUBMIT_SCENARIOS.values(),
    ids=SUBMIT_SCENARIOS.keys(),
)
def test_submission_exit_code(
    request, mock_proxy, lava_job_submitter, test_log, expectation, exit_code
):
    """submit() must exit the process with the job's exit code on failure."""
    lava_job_submitter._LAVAJobSubmitter__prepare_submission = MagicMock()
    proxy = mock_proxy(side_effect=test_log)
    lava_job_submitter.proxy = proxy

    with expectation as e:
        lava_job_submitter.submit()
    # If the job fails, there should be a SystemExit exception
    if e:
        assert e.value.code == exit_code