#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect its logs."""

import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, fields
from datetime import datetime, timedelta, UTC
from os import getenv
from typing import Any, Optional, Self

import fire
from lavacli.utils import flow_yaml as lava_yaml

from lava.exceptions import (
    MesaCIException,
    MesaCIFatalException,
    MesaCIParseException,
    MesaCIRetriableException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import (
    CONSOLE_LOG,
    GitlabSection,
    LAVAJob,
    LAVAJobDefinition,
    LogFollower,
    LogSectionType,
    call_proxy,
    fatal_err,
    hide_sensitive_data,
    print_log,
    setup_lava_proxy,
)
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS

# Initialize structured logging with a defaultdict; it can be swapped for a
# more sophisticated dict-like data abstraction later.
STRUCTURAL_LOG = defaultdict(list)

try:
    from structured_logger import StructuredLogger
except ImportError as e:
    print_log(
        f"Could not import StructuredLogger library: {e}. "
        "Falling back to defaultdict based structured logger."
    )

# Timeout in seconds used to decide whether the device running the dispatched
# LAVA job has hung, based on the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("DEVICE_HANGING_TIMEOUT_SEC", 5 * 60))

# How many seconds the script should wait before trying a new polling
# iteration to check if the dispatched LAVA job is running or waiting in the
# job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6)
)

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)

CI_JOB_TIMEOUT_SEC = int(getenv("CI_JOB_TIMEOUT", 3600))
# Minimum number of seconds a LAVA job is expected to take; used to decide
# whether there is enough time left in the CI job to start a new attempt.
EXPECTED_JOB_DURATION_SEC = int(getenv("EXPECTED_JOB_DURATION_SEC", 60 * 10))
# CI_JOB_STARTED_AT is given by GitLab CI/CD in the UTC timezone by default.
CI_JOB_STARTED_AT_RAW = getenv("CI_JOB_STARTED_AT", "")
CI_JOB_STARTED_AT: datetime = (
    datetime.fromisoformat(CI_JOB_STARTED_AT_RAW)
    if CI_JOB_STARTED_AT_RAW
    else datetime.now(tz=UTC)
)


def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors in the job metadata.
    If one is found, raise it as a MesaCIRetriableException.
    """
    if "result" not in metadata or metadata["result"] != "fail":
        return
    if "error_type" in metadata:
        error_type: str = metadata["error_type"]
        error_msg: str = metadata.get("error_msg", "")
        full_err_msg: str = error_type if not error_msg else f"{error_type}: {error_msg}"
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # when the job definition is malformed. As we always validate the
            # jobs, only the former is likely to happen, e.g. when some LAVA
            # action times out more often than the job definition allows.
            raise MesaCIRetriableException(
                f"LAVA job {job_id} failed with {full_err_msg} "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
        if error_type:
            raise MesaCIRetriableException(
                f"LAVA job {job_id} failed with error type: {full_err_msg}. Retry."
            )
    if "case" in metadata and metadata["case"] == "validate":
        raise MesaCIRetriableException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )


def raise_lava_error(job) -> None:
    # Look for infrastructure errors in the job results and raise them so the
    # job can be retried.
    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = lava_yaml.load(results_yaml)
    for res in results:
        metadata = res["metadata"]
        raise_exception_from_metadata(metadata, job.job_id)

    # If we reach this far, it means that the job ended without a hwci script
    # result and no LAVA infrastructure problem was found.
    job.status = "fail"


def fetch_logs(job, max_idle_time, log_follower) -> None:
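    """
    Poll the LAVA job for new log lines, parse them and print them.

    Raises MesaCITimeoutError (via is_job_hanging) when the job has produced
    no new output for longer than max_idle_time.
    """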
    is_job_hanging(job, max_idle_time)

    time.sleep(LOG_POLLING_TIME_SEC)
    new_log_lines = fetch_new_log_lines(job)
    parsed_lines = parse_log_lines(job, log_follower, new_log_lines)

    for line in parsed_lines:
        print_log(line)


def is_job_hanging(job, max_idle_time):
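    """
    Raise MesaCITimeoutError when the job has produced no new log output for
    longer than max_idle_time; otherwise return without side effects.
    """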
    # Poll to check for new logs, assuming that a prolonged period of
    # silence means that the device has died and we should try it again.
    if datetime.now(tz=UTC) - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['FG_BOLD_YELLOW']}"
            f"LAVA job {job.job_id} unresponsive for {max_idle_time_min} "
            "minutes; retrying the job."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )


def parse_log_lines(job, log_follower, new_log_lines):
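    """
    Feed the new log lines to the log follower and return the parsed lines,
    marking the job as alive when there was log data and extracting the job
    result once the log reaches the test case or post-processing sections.
    """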
    if log_follower.feed(new_log_lines):
        # If we had non-empty log data, we can assume that the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the scheduler.jobs.logs RPC call returns,
    # the log follower may already have reached the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)
    return parsed_lines


def fetch_new_log_lines(job):
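    """
    Fetch new log lines from the LAVA job, retrying a few times on parse
    errors before raising MesaCIParseException.
    """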
    # The XML-RPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        raise MesaCIParseException("Could not parse the LAVA logs after 5 attempts.")
    return new_log_lines


def submit_job(job):
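    """Submit the job to LAVA, wrapping any failure in a retriable exception."""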
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIRetriableException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err


def wait_for_job_get_started(job, attempt_no):
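    """
    Block until the LAVA job leaves the queue and starts running, cancelling
    it when the time remaining in the GitLab CI job is shorter than the
    expected job duration.
    """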
    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        current_job_duration_sec: int = int(
            (datetime.now(tz=UTC) - CI_JOB_STARTED_AT).total_seconds()
        )
        remaining_time_sec: int = max(0, CI_JOB_TIMEOUT_SEC - current_job_duration_sec)
        if remaining_time_sec < EXPECTED_JOB_DURATION_SEC:
            job.cancel()
            raise MesaCIFatalException(
                f"{CONSOLE_LOG['FG_BOLD_YELLOW']}"
                f"Job {job.job_id} only has {remaining_time_sec} seconds "
                "remaining to run, but it is expected to take at least "
                f"{EXPECTED_JOB_DURATION_SEC} seconds."
                f"{CONSOLE_LOG['RESET']}",
            )
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")


def bootstrap_log_follower(main_test_case, timestamp_relative_to) -> LogFollower:
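    """
    Open the GitLab section for the device boot phase and return a
    LogFollower that starts from it.
    """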
    start_section = GitlabSection(
        id="dut_boot",
        header="Booting hardware device",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
        suppress_end=True,  # init-stage2 prints the end for us
        timestamp_relative_to=timestamp_relative_to,
    )
    print(start_section.start())
    return LogFollower(
        starting_section=start_section,
        main_test_case=main_test_case,
        timestamp_relative_to=timestamp_relative_to,
    )


def follow_job_execution(job, log_follower):
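    """
    Stream the job logs until the job finishes; when the job ends without a
    pass/fail result, look for LAVA infrastructure errors.
    """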
    with log_follower:
        max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start to check the job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa developers expect a simple pass/fail job result. If it is missing,
    # it probably means that a LAVA infrastructure error happened.
    if job.status not in ["pass", "fail"]:
        raise_lava_error(job)

    # LogFollower does some cleanup after the early exit (triggered by the
    # `hwci: pass|fail` regex), so update the phases after the cleanup.
    structural_log_phases(job, log_follower)


def structural_log_phases(job, log_follower):
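    """Record the start and end times of each log section in the job log."""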
    phases: dict[str, Any] = {
        s.header.split(" - ")[0]: {
            k: str(getattr(s, k)) for k in ("start_time", "end_time")
        }
        for s in log_follower.section_history
    }
    job.log["dut_job_phases"] = phases


def print_job_final_status(job, timestamp_relative_to):
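    """
    Print the final LAVA job details in a GitLab section, waiting a bounded
    amount of time for LAVA to finish post-processing the job metadata.
    """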
    job.refresh_log()
    if job.status == "running":
        job.status = "hung"

    colour = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    with GitlabSection(
        "job_data",
        f"Hardware job info for {job.status} job",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
        timestamp_relative_to=timestamp_relative_to,
    ):
        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and wait_post_processing_retries > 0:
            # Wait a little until LAVA finishes processing metadata
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            wait_post_processing_retries -= 1

        if not job.is_post_processed():
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            print_log(
                "Timed out waiting for LAVA post-processing after "
                f"{waited_for_sec} seconds. Printing incomplete information "
                "anyway."
            )

        details: dict[str, str] = job.show()
        for field_name, value in details.items():
            print(f"{field_name:<15}: {value}")
        job.refresh_log()


def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log, main_test_case,
    timestamp_relative_to
) -> Optional[LAVAJob]:
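    """
    Submit and follow the LAVA job, making up to retry_count + 1 attempts
    when retriable Mesa CI exceptions occur. Return the successful job, or
    the last attempted one when every attempt failed.
    """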
    last_failed_job = None
    for attempt_no in range(1, retry_count + 2):
        # Need to fetch the logger value from its object to enable the
        # autosave features, if AutoSaveDict is enabled in the
        # StructuredLogging module.
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
        try:
            job_log["submitter_start_time"] = datetime.now(tz=UTC).isoformat()
            submit_job(job)
            queue_section = GitlabSection(
                id="dut_queue",
                header="Waiting for hardware device to become available",
                type=LogSectionType.LAVA_QUEUE,
                start_collapsed=False,
                timestamp_relative_to=timestamp_relative_to,
            )
            with queue_section:
                wait_for_job_get_started(job, attempt_no)
            log_follower: LogFollower = bootstrap_log_follower(
                main_test_case, timestamp_relative_to
            )
            follow_job_execution(job, log_follower)
            return job

        except (MesaCIException, KeyboardInterrupt) as exception:
            job.handle_exception(exception)

        finally:
            print_job_final_status(job, timestamp_relative_to)
            # If LAVA takes too long to post-process the job, the submitter
            # gives up and proceeds.
            job_log["submitter_end_time"] = datetime.now(tz=UTC).isoformat()
            last_failed_job = job
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )
            if job.exception and not isinstance(job.exception, MesaCIRetriableException):
                break

    return last_failed_job


def retriable_follow_job(
    proxy, job_definition, main_test_case, timestamp_relative_to
) -> LAVAJob:
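    """
    Follow the job with automatic retries and return the last attempted job;
    raise MesaCIRetryError when every attempt failed with an exception.
    """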
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    last_attempted_job = execute_job_with_retries(
        proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"],
        main_test_case, timestamp_relative_to
    )

    if last_attempted_job.exception is not None:
        # The infrastructure failed in all attempts
        raise MesaCIRetryError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_RED']}"
            "Job failed after exceeding the limit of "
            f"{number_of_retries} retries."
            f"{CONSOLE_LOG['RESET']}",
            retry_count=number_of_retries,
            last_job=last_attempted_job,
        )

    return last_attempted_job


@dataclass
class PathResolver:
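    """Resolve every field annotated as pathlib.Path to an absolute path."""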
    def __post_init__(self):
        for field_def in fields(self):
            value = getattr(self, field_def.name)
            if not value:
                continue
            if field_def.type == pathlib.Path:
                value = pathlib.Path(value)
                setattr(self, field_def.name, value.resolve())


@dataclass
class LAVAJobSubmitter(PathResolver):
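    """
    Entry point for submitting a LAVA job and following it to completion.
    Fields map to command-line flags through the python-fire wrapper at the
    bottom of this file.
    """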
    boot_method: str
    device_type: str
    farm: str
    job_timeout_min: int  # The job timeout in minutes
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    first_stage_init: str = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    kernel_external: str = None
    lava_tags: str | tuple[str, ...] = ()  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    structured_log_file: pathlib.Path = None  # Path of the structured LAVA log file
    ssh_client_image: str = None  # x86_64 SSH client image to follow the job's output
    project_name: str = None  # Project name to be used in the job name
    starting_section: str = None  # GitLab section used to start
    job_submitted_at: Optional[str | datetime] = None
    __structured_log_context = contextlib.nullcontext()  # Structured Logger context
    _overlays: dict = field(default_factory=dict, init=False)

    def __post_init__(self) -> Self:
        super().__post_init__()
        # Keep only the part of the mesa job name before the first space,
        # since spaces break the lava-test-case command
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]

        if self.structured_log_file:
            self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()

        if self.job_submitted_at:
            self.job_submitted_at = datetime.fromisoformat(self.job_submitted_at)
        self.proxy = setup_lava_proxy()

        return self

    def append_overlay(
        self, compression: str, name: str, path: str, url: str, format: str = "tar"
    ) -> Self:
        """
        Append an overlay to the LAVA job definition.

        Args:
            compression (str): The compression type of the overlay (e.g., "gz", "xz").
            name (str): The name of the overlay.
            path (str): The path where the overlay should be applied.
            url (str): The URL from where the overlay can be downloaded.
            format (str, optional): The format of the overlay (default is "tar").

        Returns:
            Self: The instance of LAVAJobSubmitter with the overlay appended.
        """
        self._overlays[name] = {
            "compression": compression,
            "format": format,
            "path": path,
            "url": url,
        }
        return self

    def print(self) -> Self:
        """
        Print the dictionary representation of the instance and return the
        instance itself.

        Returns:
            Self: The instance of the class.
        """
        print(self.__dict__)
        return self

    def __prepare_submission(self) -> str:
        # Overwrite the timeout for the test cases with the value provided by
        # the user. The test case running time should be at least 4 times
        # greater than the other sections (boot and setup), so we can safely
        # ignore them. If LAVA fails to stop the job at this stage, it will
        # fall back to the script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition = LAVAJobDefinition(self).generate_lava_job_definition()

        if self.dump_yaml:
            self.dump_job_definition(job_definition)

        validation_job = LAVAJob(self.proxy, job_definition)
        if errors := validation_job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")

        return job_definition

    @classmethod
    def is_under_ci(cls):
        ci_envvar: str = getenv("CI", "false")
        return ci_envvar.lower() == "true"

    def dump_job_definition(self, job_definition) -> None:
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))

    def submit(self) -> None:
        """
        Prepare and submit the LAVA job.
        If `validate_only` is True, validate the job without submitting it.
        If the job finishes with a non-pass status or encounters an exception,
        the program exits with a non-zero return code.
        """
        job_definition: str = self.__prepare_submission()

        if self.validate_only:
            return

        if self.starting_section:
            gl = GitlabSection(
                id=self.starting_section,
                header="Preparing to submit job for scheduling",
                type=LogSectionType.LAVA_SUBMIT,
                start_collapsed=True,
                timestamp_relative_to=self.job_submitted_at,
            )
            gl.start()
            print(gl.end())

        with self.__structured_log_context:
            last_attempt_job = None
            try:
                last_attempt_job = retriable_follow_job(
                    self.proxy,
                    job_definition,
                    f"{self.project_name}_{self.mesa_job_name}",
                    self.job_submitted_at,
                )

            except MesaCIRetryError as retry_exception:
                last_attempt_job = retry_exception.last_job

            except Exception as exception:
                STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
                raise

            finally:
                self.finish_script(last_attempt_job)

    def finish_script(self, last_attempt_job):
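        """
        Record the combined job status and exit code in the structured log,
        then raise SystemExit when no job ran or the job did not pass.
        """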
        if not last_attempt_job:
            # No job was run; something bad happened
            STRUCTURAL_LOG["job_combined_status"] = "script_crash"
            current_exception = str(sys.exc_info()[1])
            STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
            print(f"Interrupting the script. Reason: {current_exception}")
            raise SystemExit(1)

        STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status
        STRUCTURAL_LOG["job_exit_code"] = last_attempt_job.exit_code

        if last_attempt_job.status != "pass":
            raise SystemExit(last_attempt_job.exit_code)


class StructuredLoggerWrapper:
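    """
    Set up the global STRUCTURAL_LOG store, using the StructuredLogger
    library when it is available and falling back to a plain JSON file dump
    otherwise.
    """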
    def __init__(self, submitter: LAVAJobSubmitter) -> None:
        self.__submitter: LAVAJobSubmitter = submitter

    def _init_logger(self):
        STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
        STRUCTURAL_LOG["farm"] = self.__submitter.farm
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
        STRUCTURAL_LOG["job_exit_code"] = None
        STRUCTURAL_LOG["dut_attempt_counter"] = 0

        # Initialize the dut_jobs list to enable appends
        STRUCTURAL_LOG["dut_jobs"] = []

    @contextlib.contextmanager
    def _simple_logger_context(self):
        log_file = pathlib.Path(self.__submitter.structured_log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            # Truncate the file
            log_file.write_text("")
            yield
        finally:
            log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))

    def logger_context(self):
        context = contextlib.nullcontext()
        try:
            global STRUCTURAL_LOG
            # StructuredLogger is only defined when the optional import at
            # the top of this file succeeded; otherwise this raises NameError.
            STRUCTURAL_LOG = StructuredLogger(
                self.__submitter.structured_log_file, truncate=True
            ).data
        except NameError:
            context = self._simple_logger_context()

        self._init_logger()
        return context


if __name__ == "__main__":
    # Given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, it is safe to say we do not
    # need any more buffering.
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)

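    # python-fire turns LAVAJobSubmitter into a CLI: flags fill in the
    # dataclass fields and a trailing command name selects the method to run.
    # A hypothetical invocation (all flag values below are illustrative):
    #
    #   ./lava_job_submitter.py \
    #       --device-type rk3399-gru-kevin \
    #       --farm collabora \
    #       --boot-method depthcharge \
    #       --job-timeout-min 30 \
    #       submit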
    fire.Fire(LAVAJobSubmitter)