• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 Collabora Limited
4# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
5#
6# SPDX-License-Identifier: MIT
7
8import os
9import xmlrpc.client
10from contextlib import nullcontext as does_not_raise
11from datetime import datetime
12from itertools import islice, repeat
13from pathlib import Path
14from typing import Generator
15from unittest.mock import MagicMock, patch
16
17import pytest
18from lava.exceptions import MesaCIException, MesaCIRetryError
19from lava.lava_job_submitter import (
20    DEVICE_HANGING_TIMEOUT_SEC,
21    NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
22    LAVAJob,
23    LAVAJobSubmitter,
24    bootstrap_log_follower,
25    follow_job_execution,
26    retriable_follow_job,
27)
28from lava.utils import LogSectionType
29
30from .lava.helpers import (
31    generate_n_logs,
32    generate_testsuite_result,
33    jobs_logs_response,
34    mock_lava_signal,
35    mock_logs,
36    section_timeout,
37)
38
# Total follow attempts = the initial try + the allowed retries on timeout.
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
40
41
@pytest.fixture
def mock_proxy_waiting_time(mock_proxy):
    """Factory fixture: build a proxy mock whose job_state polling also
    advances the frozen clock, simulating a job that waits before running.

    The returned callable takes the frozen_time fixture plus any kwargs to
    forward to mock_proxy; ``wait_time`` (default 1) is popped out and used
    as the clock advance.
    """

    def update_mock_proxy(frozen_time, **kwargs):
        wait_time = kwargs.pop("wait_time", 1)
        proxy_mock = mock_proxy(**kwargs)
        proxy_job_state = proxy_mock.scheduler.job_state
        proxy_job_state.return_value = {"job_state": "Running"}
        # NOTE(review): frozen_time.tick() is called eagerly here and its
        # *result* is assigned as side_effect — presumably the intent is to
        # advance the fake clock when job_state is polled; confirm against
        # the freezegun tick() return-value semantics.
        proxy_job_state.side_effect = frozen_time.tick(wait_time)

        return proxy_mock

    return update_mock_proxy
54
55
@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
def ci_environment(request):
    """Parametrize each dependent test to run both under CI and locally."""
    env_patcher = patch.dict(os.environ, request.param)
    env_patcher.start()
    try:
        yield
    finally:
        env_patcher.stop()
60
61
@pytest.fixture
def lava_job_submitter(
    ci_environment,
    tmp_path,
    mock_proxy,
):
    """Yield a LAVAJobSubmitter wired to a mocked LAVA proxy.

    The working directory is switched to *tmp_path* and the structured log
    is written there as ``log.json``.
    """
    os.chdir(tmp_path)
    structured_log_path = Path(tmp_path) / "log.json"

    with patch("lava.lava_job_submitter.setup_lava_proxy") as setup_proxy_mock:
        setup_proxy_mock.return_value = mock_proxy()
        submitter = LAVAJobSubmitter(
            boot_method="test_boot",
            ci_project_dir="test_dir",
            device_type="test_device",
            job_timeout_min=1,
            structured_log_file=structured_log_path,
        )
        yield submitter
80
81
@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
    """Any error raised by the proxy must surface as a MesaCIException."""
    with pytest.raises(MesaCIException):
        faulty_proxy = mock_proxy(side_effect=exception)
        lava_job = LAVAJob(faulty_proxy, '')
        follower = bootstrap_log_follower()
        follow_job_execution(lava_job, follower)
89
90
NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")

# Each scenario value is a 4-tuple consumed by test_retriable_follow_job:
# (log side_effect for the proxy, exception-expectation context,
#  expected final job status, extra kwargs forwarded to mock_proxy).
PROXY_SCENARIOS = {
    "simple pass case": (mock_logs(result="pass"), does_not_raise(), "pass", {}),
    "simple fail case": (mock_logs(result="fail"), does_not_raise(), "fail", {}),
    "simple hung case": (
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * 1000
            },
            result="fail",
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        {},
    ),
    "leftover dump from last job in boot section": (
        (
            mock_lava_signal(LogSectionType.LAVA_BOOT),
            jobs_logs_response(finished=False, msg=None, result="fail"),
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        {},
    ),
    "boot works at last retry": (
        mock_logs(
            messages={
                LogSectionType.LAVA_BOOT: [
                    section_timeout(LogSectionType.LAVA_BOOT) + 1
                ]
                * NUMBER_OF_RETRIES_TIMEOUT_DETECTION
                + [1]
            },
            result="pass",
        ),
        does_not_raise(),
        "pass",
        {},
    ),
    "test case took too long": pytest.param(
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * (NUMBER_OF_MAX_ATTEMPTS + 1)
            },
            result="pass",
        ),
        pytest.raises(MesaCIRetryError),
        "pass",
        {},
    ),
    "timed out more times than retry attempts": (
        generate_n_logs(n=4, tick_fn=9999999),
        pytest.raises(MesaCIRetryError),
        "fail",
        {},
    ),
    "long log case, no silence": (
        mock_logs(
            messages={LogSectionType.TEST_CASE: [1] * (1000)},
            result="pass",
        ),
        does_not_raise(),
        "pass",
        {},
    ),
    "no retries, testsuite succeed": (
        mock_logs(result="pass"),
        does_not_raise(),
        "pass",
        {
            "testsuite_results": [
                generate_testsuite_result(result="pass")
            ]
        },
    ),
    "no retries, but testsuite fails": (
        mock_logs(result="fail"),
        does_not_raise(),
        "fail",
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail")
            ]
        },
    ),
    "no retries, one testsuite fails": (
        generate_n_logs(n=1, tick_fn=0, result="fail"),
        does_not_raise(),
        "fail",
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail"),
                generate_testsuite_result(result="pass")
            ]
        },
    ),
    "very long silence": (
        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
        pytest.raises(MesaCIRetryError),
        "fail",
        {},
    ),
    # If a protocol error happens, _call_proxy will retry without affecting timeouts
    "unstable connection, ProtocolError followed by final message": (
        (NETWORK_EXCEPTION, *list(mock_logs(result="pass"))),
        does_not_raise(),
        "pass",
        {},
    ),
    # After an arbitrary number of retries, _call_proxy should call sys.exit
    "unreachable case, subsequent ProtocolErrors": (
        repeat(NETWORK_EXCEPTION),
        pytest.raises(SystemExit),
        "fail",
        {},
    ),
    # NOTE(review): job_result is False here, not a status string; the status
    # assertion is never reached because MesaCIRetryError is expected to raise.
    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, {}),
}
217
218
@pytest.mark.parametrize(
    "test_log, expectation, job_result, proxy_args",
    PROXY_SCENARIOS.values(),
    ids=PROXY_SCENARIOS.keys(),
)
def test_retriable_follow_job(
    mock_sleep,
    test_log,
    expectation,
    job_result,
    proxy_args,
    mock_proxy,
):
    """Run each proxy scenario through retriable_follow_job, checking both
    the raised exception (if any) and the final job status."""
    with expectation:
        scenario_proxy = mock_proxy(side_effect=test_log, **proxy_args)
        followed_job: LAVAJob = retriable_follow_job(scenario_proxy, "")
        assert followed_job.status == job_result
236
237
# Scenario id -> logs side_effect for the wait-to-start test below.
# NOTE: the parentheses around mock_logs(...) do not make a tuple; the value
# is the generator itself.
WAIT_FOR_JOB_SCENARIOS = {"one log run taking (sec):": (mock_logs(result="pass"))}
239
240
@pytest.mark.parametrize("wait_time", (DEVICE_HANGING_TIMEOUT_SEC * 2,))
@pytest.mark.parametrize(
    "side_effect",
    WAIT_FOR_JOB_SCENARIOS.values(),
    ids=WAIT_FOR_JOB_SCENARIOS.keys(),
)
def test_simulate_a_long_wait_to_start_a_job(
    frozen_time,
    wait_time,
    side_effect,
    mock_proxy_waiting_time,
):
    """A job stuck waiting to start must still end with a pass, and the
    simulated clock must have advanced by at least *wait_time*."""
    submission_started_at = datetime.now()
    waiting_proxy = mock_proxy_waiting_time(
        frozen_time, side_effect=side_effect, wait_time=wait_time
    )
    finished_job: LAVAJob = retriable_follow_job(waiting_proxy, "")

    elapsed = datetime.now() - submission_started_at

    assert finished_job.status == "pass"
    assert elapsed.total_seconds() >= wait_time
266
267
268
# Each scenario: (sequence of raw proxy log responses, expected exception
# context). The `(False, ...)` entries are unfinished-job responses whose
# payload is deliberately malformed.
CORRUPTED_LOG_SCENARIOS = {
    "too much subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],
        pytest.raises((MesaCIRetryError)),
    ),
    "one subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 2 + [jobs_logs_response(True)],
        does_not_raise(),
    ),
}
279
280
@pytest.mark.parametrize(
    "data_sequence, expected_exception",
    CORRUPTED_LOG_SCENARIOS.values(),
    ids=CORRUPTED_LOG_SCENARIOS.keys(),
)
def test_log_corruption(mock_sleep, data_sequence, expected_exception, mock_proxy):
    """Corrupted log payloads must be tolerated up to the retry budget."""
    corrupted_proxy = mock_proxy()
    corrupted_proxy.scheduler.jobs.logs.side_effect = data_sequence
    with expected_exception:
        retriable_follow_job(corrupted_proxy, "")
292
293
# Each scenario: (raw log line as emitted by the device, expected parsed
# job status).
LAVA_RESULT_LOG_SCENARIOS = {
    # the submitter should accept xtrace logs
    "Bash xtrace echo with kmsg interleaving": (
        "echo hwci: mesa: pass[  737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass",
    ),
    # the submitter should accept xtrace logs
    "kmsg result print": (
        "[  737.673352] hwci: mesa: pass",
        "pass",
    ),
    # if the job result echo has a very bad luck, it still can be interleaved
    # with kmsg
    "echo output with kmsg interleaving": (
        "hwci: mesa: pass[  737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass",
    ),
    "fail case": (
        "hwci: mesa: fail",
        "fail",
    ),
}
316
317
@pytest.mark.parametrize(
    "message, expectation",
    LAVA_RESULT_LOG_SCENARIOS.values(),
    ids=LAVA_RESULT_LOG_SCENARIOS.keys(),
)
def test_parse_job_result_from_log(message, expectation, mock_proxy):
    """The result parser must pull the job status out of noisy log lines."""
    parsed_job = LAVAJob(mock_proxy(), "")
    parsed_job.parse_job_result_from_log([message])

    assert parsed_job.status == expectation
328
329
@pytest.mark.slow(
    reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
@pytest.mark.skipif(
    not Path("/tmp/log.yaml").is_file(), reason="Missing /tmp/log.yaml file."
)
def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
    """Replay a real LAVA raw log (/tmp/log.yaml) through the submitter.

    The log is fed to the mocked proxy in randomly-sized chunks, and the
    fake clock is moved to the timestamps found in the log so the run
    reproduces the original job's timing.
    """
    import random

    from lavacli.utils import flow_yaml as lava_yaml

    def time_travel_from_log_chunk(data_chunk):
        # Generator: the first next() jumps the clock to the chunk's first
        # timestamp, the second next() to its last. Exhausts immediately
        # (raising StopIteration on next()) when the chunk is empty.
        if not data_chunk:
            return

        first_log = lava_yaml.load(data_chunk[0])[0]
        first_log_time = first_log["dt"]
        frozen_time.move_to(first_log_time)
        yield

        last_log = lava_yaml.load(data_chunk[-1])[0]
        last_log_time = last_log["dt"]
        frozen_time.move_to(last_log_time)
        yield

    def time_travel_to_test_time():
        # Suppose that the first message timestamp of the entire LAVA job log is
        # the same of from the job submitter execution
        with open("/tmp/log.yaml", "r") as f:
            first_log = f.readline()
            first_log_time = lava_yaml.load(first_log)[0]["dt"]
            frozen_time.move_to(first_log_time)

    def load_lines() -> Generator[tuple[bool, str], None, None]:
        # Yield (finished, payload) pairs mimicking the scheduler.jobs.logs
        # RPC: an empty "[]" poll at the chunk's first timestamp, then the
        # chunk itself at its last timestamp.
        with open("/tmp/log.yaml", "r") as f:
            log_lines = f.readlines()
            serial_message: str = ""
            chunk_start_line = 0
            chunk_end_line = 0
            chunk_max_size = 100
            try:
                while True:
                    chunk_end_line = chunk_start_line + random.randint(1, chunk_max_size)
                    # split the log in chunks of random size
                    log_chunk = list(islice(log_lines, chunk_start_line, chunk_end_line))
                    # islice's stop bound is exclusive, so the next chunk must
                    # begin exactly at chunk_end_line; the previous `+ 1` here
                    # silently dropped one log line per chunk.
                    chunk_start_line = chunk_end_line
                    serial_message = "".join(log_chunk)
                    # time_traveller_gen will make the time travel according to
                    # the timestamp from the message
                    time_traveller_gen = time_travel_from_log_chunk(log_chunk)
                    # Suppose that the first message timestamp is the same of
                    # log fetch RPC call
                    next(time_traveller_gen)
                    yield False, "[]"
                    # Travel to the same datetime of the last fetched log line
                    # in the chunk
                    next(time_traveller_gen)
                    yield False, serial_message
            except StopIteration:
                # Raised by next() on an empty chunk: the whole log was
                # consumed, so report the job as finished.
                yield True, serial_message
                return

    proxy = mock_proxy()

    def reset_logs(*args):
        # Each (re)submission restarts the log stream from the beginning.
        proxy.scheduler.jobs.logs.side_effect = load_lines()

    proxy.scheduler.jobs.submit = reset_logs
    try:
        time_travel_to_test_time()
        start_time = datetime.now()
        retriable_follow_job(proxy, "")
    finally:
        try:
            # If the job fails, maybe there will be no structured log
            print(lava_job_submitter.structured_log_file.read_text())
        finally:
            end_time = datetime.now()
            print("---- Reproduction log stats ----")
            print(f"Start time: {start_time}")
            print(f"End time: {end_time}")
            print(f"Duration: {end_time - start_time}")
413
414
@pytest.mark.parametrize(
    "validate_only,finished_job_status,expected_combined_status,expected_exit_code",
    [
        (True, "pass", None, None),
        (False, "pass", "pass", 0),
        (False, "fail", "fail", 1),
    ],
    ids=[
        "validate_only_no_job_submission",
        "successful_job_submission",
        "failed_job_submission",
    ],
)
def test_job_combined_status(
    lava_job_submitter,
    validate_only,
    finished_job_status,
    expected_combined_status,
    expected_exit_code,
):
    """submit() must record the combined job status in the structural log,
    except in validate-only mode where no job is followed at all."""
    lava_job_submitter.validate_only = validate_only

    follow_patcher = patch("lava.lava_job_submitter.retriable_follow_job")
    prepare_patcher = patch(
        "lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
    )
    with follow_patcher as follow_mock, prepare_patcher as prepare_mock, patch(
        "sys.exit"
    ):
        from lava.lava_job_submitter import STRUCTURAL_LOG

        follow_mock.return_value = MagicMock(status=finished_job_status)
        prepare_mock.return_value = MagicMock(spec=str)
        original_status: str = STRUCTURAL_LOG.get("job_combined_status")

        if validate_only:
            lava_job_submitter.submit()
            follow_mock.assert_not_called()
            assert STRUCTURAL_LOG.get("job_combined_status") == original_status
            return

        try:
            lava_job_submitter.submit()
        except SystemExit as e:
            assert e.code == expected_exit_code

        assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status
465