• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 Collabora Limited
4# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
5#
6# SPDX-License-Identifier: MIT
7
8import os
9import xmlrpc.client
10from contextlib import nullcontext as does_not_raise
11from datetime import UTC, datetime
12from itertools import cycle, islice, repeat
13from pathlib import Path
14from typing import Generator
15from unittest.mock import MagicMock, patch
16
17import pytest
18from lava.exceptions import MesaCIException, MesaCIRetryError, MesaCIFatalException
19from lava.lava_job_submitter import (
20    DEVICE_HANGING_TIMEOUT_SEC,
21    NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
22    LAVAJob,
23    LAVAJobSubmitter,
24    bootstrap_log_follower,
25    follow_job_execution,
26    retriable_follow_job,
27    wait_for_job_get_started,
28)
29from lava.utils import LogSectionType
30
31from .lava.helpers import (
32    generate_n_logs,
33    generate_testsuite_result,
34    jobs_logs_response,
35    mock_lava_signal,
36    mock_logs,
37    section_timeout,
38)
39
# One initial attempt plus the configured number of timeout-detection retries.
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
41
42
@pytest.fixture
def mock_proxy_waiting_time(mock_proxy):
    """Factory fixture: a mocked proxy whose job_state poll advances a frozen clock.

    The returned callable accepts the frozen_time fixture plus any mock_proxy
    kwargs; `wait_time` (default 1) is how many seconds each scheduler poll
    should simulate.
    """

    def update_mock_proxy(frozen_time, **kwargs):
        tick_seconds = kwargs.pop("wait_time", 1)

        def fake_job_state(jid) -> dict[str, str]:
            # Every poll of the scheduler moves the frozen clock forward,
            # simulating a job stuck waiting to be scheduled.
            frozen_time.tick(tick_seconds)
            return {"job_state": "Running"}

        proxy = mock_proxy(**kwargs)
        proxy.scheduler.job_state.side_effect = fake_job_state
        return proxy

    return update_mock_proxy
58
59
@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
def ci_environment(request):
    """Run the dependent test twice: once as if under CI, once as a local run."""
    env_override = request.param
    with patch.dict(os.environ, env_override):
        yield
64
65
@pytest.fixture
def lava_job_submitter(
    ci_environment,
    tmp_path,
    mock_proxy,
):
    """Yield a LAVAJobSubmitter backed by a mocked proxy, logging into tmp_path."""
    os.chdir(tmp_path)
    structured_log = Path(tmp_path) / "log.json"

    with patch("lava.lava_job_submitter.setup_lava_proxy") as mock_setup_lava_proxy:
        mock_setup_lava_proxy.return_value = mock_proxy()
        submitter = LAVAJobSubmitter(
            boot_method="test_boot",
            device_type="test_device",
            farm="test_farm",
            job_timeout_min=1,
            structured_log_file=structured_log,
        )
        yield submitter
84
85
@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
    """Unexpected proxy exceptions must surface to the caller as MesaCIException.

    The mock setup is hoisted out of the pytest.raises block so that a failure
    while building the fixtures cannot be mistaken for the exception under
    test — only follow_job_execution() is expected to raise.
    """
    proxy = mock_proxy(side_effect=exception)
    job = LAVAJob(proxy, "")
    log_follower = bootstrap_log_follower(main_test_case="", timestamp_relative_to=None)
    with pytest.raises(MesaCIException):
        follow_job_execution(job, log_follower)
93
94
# Reusable fake RPC failures for the connection-robustness scenarios below.
NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")

# Each scenario is a 5-tuple consumed by test_retriable_follow_job:
#   (test_log, expectation, job_result, exit_code, proxy_args)
# where test_log feeds the proxy's log side_effect, expectation is the
# exception context the follow call runs under, job_result/exit_code are the
# expected final LAVAJob fields, and proxy_args are extra mock_proxy kwargs.
PROXY_SCENARIOS = {
    "simple pass case": (mock_logs(result="pass", exit_code=0), does_not_raise(), "pass", 0, {}),
    "simple fail case": (mock_logs(result="fail", exit_code=1), does_not_raise(), "fail", 1, {}),
    # Messages arrive slower than the section timeout, so the job looks hung.
    "simple hung case": (
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * 1000
            },
            result="fail",
            exit_code=1,
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        1,
        {},
    ),
    "leftover dump from last job in boot section": (
        (
            mock_lava_signal(LogSectionType.LAVA_BOOT),
            jobs_logs_response(finished=False, msg=None, result="fail", exit_code=1),
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        1,
        {},
    ),
    # Boot times out NUMBER_OF_RETRIES_TIMEOUT_DETECTION times, then succeeds
    # on the final allowed attempt.
    "boot works at last retry": (
        mock_logs(
            messages={
                LogSectionType.LAVA_BOOT: [
                    section_timeout(LogSectionType.LAVA_BOOT) + 1
                ]
                * NUMBER_OF_RETRIES_TIMEOUT_DETECTION
                + [1]
            },
            result="pass",
            exit_code=0,
        ),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    "test case took too long": pytest.param(
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * (NUMBER_OF_MAX_ATTEMPTS + 1)
            },
            result="pass",
            exit_code=0,
        ),
        pytest.raises(MesaCIRetryError),
        "pass",
        0,
        {},
    ),
    "timed out more times than retry attempts": (
        generate_n_logs(n=4, tick_fn=9999999),
        pytest.raises(MesaCIRetryError),
        "fail",
        1,
        {},
    ),
    # A steady stream of quick messages must never be treated as a hang.
    "long log case, no silence": (
        mock_logs(
            messages={LogSectionType.TEST_CASE: [1] * (1000)},
            result="pass",
            exit_code=0,
        ),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    "no retries, testsuite succeed": (
        mock_logs(result="pass", exit_code=0),
        does_not_raise(),
        "pass",
        0,
        {"testsuite_results": [generate_testsuite_result(result="pass", exit_code=0)]},
    ),
    "no retries, but testsuite fails": (
        mock_logs(result="fail", exit_code=1),
        does_not_raise(),
        "fail",
        1,
        {"testsuite_results": [generate_testsuite_result(result="fail", exit_code=1)]},
    ),
    # One failing suite among the results is enough for an overall "fail".
    "no retries, one testsuite fails": (
        mock_logs(result="fail", exit_code=1),
        does_not_raise(),
        "fail",
        1,
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail", exit_code=1),
                generate_testsuite_result(result="pass", exit_code=0),
            ]
        },
    ),
    "very long silence": (
        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
        pytest.raises(MesaCIRetryError),
        "fail",
        1,
        {},
    ),
    # If a protocol error happens, _call_proxy will retry without affecting timeouts
    "unstable connection, ProtocolError followed by final message": (
        (NETWORK_EXCEPTION, *list(mock_logs(result="pass", exit_code=0))),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    # After an arbitrary number of retries, _call_proxy should call sys.exit
    "unreachable case, subsequent ProtocolErrors": (
        repeat(NETWORK_EXCEPTION),
        pytest.raises(SystemExit),
        "fail",
        1,
        {},
    ),
    # NOTE(review): job_result is a bool (False) here, unlike the string used
    # elsewhere; since MesaCIRetryError is expected, the status assertion in
    # the test body is never reached, so the mismatch is harmless.
    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, 1, {}),
}
229
230
@pytest.mark.parametrize(
    "test_log, expectation, job_result, exit_code, proxy_args",
    PROXY_SCENARIOS.values(),
    ids=PROXY_SCENARIOS.keys(),
)
def test_retriable_follow_job(
    mock_sleep,
    test_log,
    expectation,
    job_result,
    exit_code,
    proxy_args,
    mock_proxy,
):
    """Drive retriable_follow_job through each proxy scenario and verify the outcome."""
    with expectation:
        proxy = mock_proxy(side_effect=test_log, **proxy_args)
        finished_job: LAVAJob = retriable_follow_job(proxy, "", "", None)
        assert finished_job.status == job_result
        assert finished_job.exit_code == exit_code
250
251
252
# Single scenario: a normal passing log stream; the interesting knob in the
# test below is the parametrized wait_time, not the log contents.
WAIT_FOR_JOB_SCENARIOS = {"one log run taking (sec):": (mock_logs(result="pass", exit_code=0))}
254
@pytest.mark.parametrize("wait_time", (DEVICE_HANGING_TIMEOUT_SEC * 2,))
@pytest.mark.parametrize(
    "side_effect",
    WAIT_FOR_JOB_SCENARIOS.values(),
    ids=WAIT_FOR_JOB_SCENARIOS.keys(),
)
def test_simulate_a_long_wait_to_start_a_job(
    frozen_time,
    wait_time,
    side_effect,
    mock_proxy_waiting_time,
):
    """A job that takes long to get scheduled must still be followed to success.

    The mocked proxy advances the frozen clock by wait_time on each job_state
    poll, so the (frozen) wall-clock delta proves the wait actually happened.
    """
    started_at = datetime.now(tz=UTC)

    proxy = mock_proxy_waiting_time(
        frozen_time, side_effect=side_effect, wait_time=wait_time
    )
    job: LAVAJob = retriable_follow_job(proxy, "", "", None)

    elapsed = datetime.now(tz=UTC) - started_at

    assert job.status == "pass"
    assert job.exit_code == 0
    assert elapsed.total_seconds() >= wait_time
283
284
# (job_timeout, expectation) pairs: with no time budget left the submitter
# must raise a fatal error instead of queueing a job it can never finish.
LONG_LAVA_QUEUE_SCENARIOS = {
    "no_time_to_run": (0, pytest.raises(MesaCIFatalException)),
    "enough_time_to_run": (9999999999, does_not_raise()),
}
289
290
@pytest.mark.parametrize(
    "job_timeout, expectation",
    LONG_LAVA_QUEUE_SCENARIOS.values(),
    ids=LONG_LAVA_QUEUE_SCENARIOS.keys(),
)
def test_wait_for_job_get_started_no_time_to_run(monkeypatch, job_timeout, expectation):
    """With no CI time budget left, waiting for a job must bail out and cancel it."""
    monkeypatch.setattr("lava.lava_job_submitter.CI_JOB_TIMEOUT_SEC", job_timeout)
    monkeypatch.setattr("lava.lava_job_submitter.CI_JOB_STARTED_AT", datetime.now(tz=UTC))

    job = MagicMock()
    # Make it escape the loop
    job.is_started.side_effect = (False, False, True)

    with expectation as excinfo:
        wait_for_job_get_started(job, 1)

    if excinfo:
        # A fatal bail-out must also cancel the queued LAVA job.
        job.cancel.assert_called_with()
306
307
# (data_sequence, expected_exception) pairs for test_log_corruption. The
# "{'msg': 'Incomplete}" strings are deliberately malformed payloads standing
# in for truncated/corrupted RPC log responses.
CORRUPTED_LOG_SCENARIOS = {
    "too much subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],
        pytest.raises((MesaCIRetryError)),
    ),
    "one subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 2 + [jobs_logs_response(True)],
        does_not_raise(),
    ),
}
318
319
@pytest.mark.parametrize(
    "data_sequence, expected_exception",
    CORRUPTED_LOG_SCENARIOS.values(),
    ids=CORRUPTED_LOG_SCENARIOS.keys(),
)
def test_log_corruption(mock_sleep, data_sequence, expected_exception, mock_proxy):
    """A little corrupted log data is tolerated; too much exhausts the retries."""
    proxy = mock_proxy()
    proxy.scheduler.jobs.logs.side_effect = data_sequence
    with expected_exception:
        retriable_follow_job(proxy, "", "", None)
331
332
# (message, expected_status, expected_exit_code) triples for
# test_parse_job_result_from_log: raw log lines the hwci result parser must
# cope with, including kernel-message (kmsg) interleaving.
LAVA_RESULT_LOG_SCENARIOS = {
    # the submitter should accept xtrace logs
    "Bash xtrace echo with kmsg interleaving": (
        "echo hwci: mesa: pass, exit_code: 0[  737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass", 0,
    ),
    # the submitter should accept xtrace logs
    "kmsg result print": (
        "[  737.673352] hwci: mesa: pass, exit_code: 0",
        "pass", 0,
    ),
    # if the job result echo has a very bad luck, it still can be interleaved
    # with kmsg
    "echo output with kmsg interleaving": (
        "hwci: mesa: pass, exit_code: 0[  737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass", 0,
    ),
    "fail case": (
        "hwci: mesa: fail, exit_code: 1",
        "fail", 1,
    ),
    # fail case with different exit code
    "fail case (exit code 101)": (
        "hwci: mesa: fail, exit_code: 101",
        "fail", 101,
    ),
}
360
361
@pytest.mark.parametrize(
    "message, expected_status, expected_exit_code",
    LAVA_RESULT_LOG_SCENARIOS.values(),
    ids=LAVA_RESULT_LOG_SCENARIOS.keys(),
)
def test_parse_job_result_from_log(message, expected_status, expected_exit_code, mock_proxy):
    """The hwci result line must be parsed even when interleaved with kmsg output."""
    job = LAVAJob(mock_proxy(), "")

    job.parse_job_result_from_log([message])

    assert (job.status, job.exit_code) == (expected_status, expected_exit_code)
373
374
@pytest.mark.slow(
    reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
@pytest.mark.skipif(
    not Path("/tmp/log.yaml").is_file(), reason="Missing /tmp/log.yaml file."
)
def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
    """Replay a real LAVA raw log (/tmp/log.yaml) through the job follower.

    The raw log is fed to the mocked proxy in randomly sized chunks and the
    frozen clock is moved to match the timestamps inside each chunk, emulating
    the pacing of a real job. Mainly useful to reproduce production issues.
    """
    import random

    from lavacli.utils import flow_yaml as lava_yaml

    def time_travel_from_log_chunk(data_chunk):
        # Two-step generator: the first next() moves the clock to the chunk's
        # first log timestamp, the second to its last. Returns immediately
        # (raising StopIteration on next()) when the chunk is empty — that is
        # how load_lines() detects end-of-file.
        if not data_chunk:
            return

        first_log = lava_yaml.load(data_chunk[0])[0]
        first_log_time = first_log["dt"]
        frozen_time.move_to(first_log_time)
        yield

        last_log = lava_yaml.load(data_chunk[-1])[0]
        last_log_time = last_log["dt"]
        frozen_time.move_to(last_log_time)
        yield

    def time_travel_to_test_time():
        # Suppose that the first message timestamp of the entire LAVA job log is
        # the same of from the job submitter execution
        with open("/tmp/log.yaml", "r") as f:
            first_log = f.readline()
            first_log_time = lava_yaml.load(first_log)[0]["dt"]
            frozen_time.move_to(first_log_time)

    def load_lines() -> Generator[tuple[bool, str], None, None]:
        # Yield (finished, serial_message) pairs the way the scheduler.jobs.logs
        # RPC would, feeding the raw file in random-sized chunks.
        with open("/tmp/log.yaml", "r") as f:
            log_lines = f.readlines()
            serial_message: str = ""
            chunk_start_line = 0
            chunk_end_line = 0
            chunk_max_size = 100
            try:
                while True:
                    chunk_end_line = chunk_start_line + random.randint(1, chunk_max_size)
                    # split the log in chunks of random size
                    log_chunk = list(islice(log_lines, chunk_start_line, chunk_end_line))
                    # islice's stop index is exclusive, so the next chunk must
                    # start exactly at chunk_end_line. (The previous `+ 1` here
                    # silently dropped one log line per chunk.)
                    chunk_start_line = chunk_end_line
                    serial_message = "".join(log_chunk)
                    # time_traveller_gen will make the time travel according to
                    # the timestamp from the message
                    time_traveller_gen = time_travel_from_log_chunk(log_chunk)
                    # Suppose that the first message timestamp is the same of
                    # log fetch RPC call
                    next(time_traveller_gen)
                    yield False, "[]"
                    # Travel to the same datetime of the last fetched log line
                    # in the chunk
                    next(time_traveller_gen)
                    yield False, serial_message
            except StopIteration:
                # Raised by next() once a chunk comes back empty (end of file).
                yield True, serial_message
                return

    proxy = mock_proxy()

    def reset_logs(*args):
        # Each (re)submission restarts the replay from the top of the file.
        proxy.scheduler.jobs.logs.side_effect = load_lines()

    proxy.scheduler.jobs.submit = reset_logs
    try:
        time_travel_to_test_time()
        start_time = datetime.now(tz=UTC)
        retriable_follow_job(proxy, "", "", None)
    finally:
        try:
            # If the job fails, maybe there will be no structured log
            print(lava_job_submitter.structured_log_file.read_text())
        finally:
            end_time = datetime.now(tz=UTC)
            print("---- Reproduction log stats ----")
            print(f"Start time: {start_time}")
            print(f"End time: {end_time}")
            print(f"Duration: {end_time - start_time}")
458
459
@pytest.mark.parametrize(
    "validate_only,finished_job_status,job_exit_code,expected_combined_status",
    [
        (True, "pass", None, None,),
        (False, "pass", 0, "pass",),
        (False, "fail", 1, "fail",),
    ],
    ids=[
        "validate_only_no_job_submission",
        "successful_job_submission",
        "failed_job_submission",
    ],
)
def test_job_combined_status(
    mock_proxy,
    lava_job_submitter,
    validate_only,
    finished_job_status,
    job_exit_code,
    expected_combined_status,
):
    """submit() must record the combined job status and exit code in STRUCTURAL_LOG.

    In validate-only mode no job is followed and the structured-log entries
    must be left untouched.
    """
    lava_job_submitter.validate_only = validate_only

    # Patch out the real follow/preparation paths; sys.exit is patched so a
    # failing submission cannot abort the test process here.
    with patch(
        "lava.lava_job_submitter.retriable_follow_job"
    ) as mock_retriable_follow_job, patch(
        "lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
    ) as mock_prepare_submission, patch("sys.exit"):
        # Imported here so STRUCTURAL_LOG is read from the module at test time.
        from lava.lava_job_submitter import STRUCTURAL_LOG

        # The followed job reports the parametrized final status/exit code.
        mock_retriable_follow_job.return_value = MagicMock(
            status=finished_job_status, exit_code=job_exit_code
        )

        mock_job_definition = MagicMock(spec=str)
        mock_prepare_submission.return_value = mock_job_definition
        # Snapshot the pre-submit values to prove validate-only leaves them alone.
        original_status: str = STRUCTURAL_LOG.get("job_combined_status")
        original_exit_code: int = STRUCTURAL_LOG.get("job_exit_code")

        if validate_only:
            lava_job_submitter.submit()
            mock_retriable_follow_job.assert_not_called()
            assert STRUCTURAL_LOG.get("job_combined_status") == original_status
            assert STRUCTURAL_LOG.get("job_exit_code") == original_exit_code
            return

        try:
            lava_job_submitter.submit()

        except SystemExit as e:
            # Defensive: with sys.exit patched this should not trigger, but if
            # it does the code must match the job's exit code.
            assert e.code == job_exit_code

        assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status
        assert STRUCTURAL_LOG["job_exit_code"] == job_exit_code
514
515
# (test_log, expectation, exit_code) triples for test_submission_exit_code:
# the log stream the proxy serves, the exception context submit() runs under,
# and the process exit code expected from a failing submission.
SUBMIT_SCENARIOS = {
    "submit job pass": (cycle(mock_logs(result="pass", exit_code=0)), does_not_raise(), 0),
    "submit job fails": (
        cycle(mock_logs(result="fail", exit_code=1)),
        pytest.raises(SystemExit),
        1,
    ),
    "user interrupts the script": (
        (jobs_logs_response(), KeyboardInterrupt, jobs_logs_response()),
        pytest.raises(SystemExit),
        1,
    ),
    "job finishes without hwci response": (
        (jobs_logs_response(), jobs_logs_response()),
        pytest.raises(SystemExit),
        1,
    ),
}
534
535
@pytest.mark.parametrize(
    "test_log, expectation, exit_code",
    SUBMIT_SCENARIOS.values(),
    ids=SUBMIT_SCENARIOS.keys(),
)
def test_submission_exit_code(
    request, mock_proxy, lava_job_submitter, test_log, expectation, exit_code
):
    """submit() must exit the process with the scenario's expected status code."""
    lava_job_submitter._LAVAJobSubmitter__prepare_submission = MagicMock()
    proxy = mock_proxy(side_effect=test_log)
    lava_job_submitter.proxy = proxy

    with expectation as e:
        lava_job_submitter.submit()

    # The exit-code check must live OUTSIDE the `with` block: inside it, any
    # line after submit() is unreachable whenever SystemExit is raised, and
    # pytest's ExceptionInfo is only populated after the context exits.
    # (e is None for the does_not_raise scenarios.)
    if e is not None:
        assert e.value.code == exit_code
553