#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it, and collect the log back."""

import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from os import environ, getenv, path
from typing import Any, Optional

import fire
from lavacli.utils import flow_yaml as lava_yaml

from lava.exceptions import (
    MesaCIException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import (
    CONSOLE_LOG,
    GitlabSection,
    LAVAJob,
    LAVAJobDefinition,
    LogFollower,
    LogSectionType,
    call_proxy,
    fatal_err,
    hide_sensitive_data,
    print_log,
    setup_lava_proxy,
)
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS

# Initialize structured logging with a defaultdict; it can be swapped for a
# more sophisticated dict-like data abstraction later.
STRUCTURAL_LOG = defaultdict(list)

try:
    from ci.structured_logger import StructuredLogger
except ImportError as e:
    print_log(
        f"Could not import StructuredLogger library: {e}. "
        "Falling back to a defaultdict-based structured logger."
    )

# Timeout in seconds to decide whether the device from the dispatched LAVA job
# has hung due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("DEVICE_HANGING_TIMEOUT_SEC", 5 * 60))

# How many seconds the script should wait before trying a new polling iteration
# to check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6)
)

# How many seconds to wait between LAVA log-fetching RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)

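# Example (illustrative): each of the knobs above can be tuned from the
# environment before invoking this script, e.g.
#   export DEVICE_HANGING_TIMEOUT_SEC=600
#   export LAVA_LOG_POLLING_TIME_SEC=10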

def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors from the job metadata.
    If it finds an error, raise it as a MesaCIException.
    """
    if "result" not in metadata or metadata["result"] != "fail":
        return
    if "error_type" in metadata:
        error_type = metadata["error_type"]
        if error_type == "Infrastructure":
            raise MesaCIException(
                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
            )
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # when the job definition is malformed. As we always validate the
            # jobs, only the former is likely to happen, e.g. when some LAVA
            # action times out more often than the job definition allows.
            raise MesaCIException(
                f"LAVA job {job_id} failed with JobError "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
    if "case" in metadata and metadata["case"] == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )

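# For reference, an illustrative shape of the result metadata inspected by
# raise_exception_from_metadata() (only the keys read above; the values are
# hypothetical):
#   {"result": "fail", "error_type": "Infrastructure", "case": "validate"}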

def raise_lava_error(job) -> None:
    # Look for infrastructure errors and raise them so the caller can retry.
    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = lava_yaml.load(results_yaml)
    for res in results:
        metadata = res["metadata"]
        raise_exception_from_metadata(metadata, job.job_id)

    # If we reach this far, it means that the job ended without the hwci script
    # result and no LAVA infrastructure problem was found.
    job.status = "fail"


def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
    ):
        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and wait_post_processing_retries > 0:
            # Wait a little until LAVA finishes processing metadata
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            wait_post_processing_retries -= 1

        if not job.is_post_processed():
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            print_log(
                f"Waited for {waited_for_sec} seconds "
                "for LAVA to post-process the job, but it hasn't finished yet. "
                "Dumping its info anyway."
            )

        details: dict[str, str] = job.show()
        for field, value in details.items():
            print(f"{field:<15}: {value}")
        job.refresh_log()


def fetch_logs(job, max_idle_time, log_follower) -> None:
    """Run one polling iteration: detect hangs, then fetch, parse and print new log lines."""
    is_job_hanging(job, max_idle_time)

    time.sleep(LOG_POLLING_TIME_SEC)
    new_log_lines = fetch_new_log_lines(job)
    parsed_lines = parse_log_lines(job, log_follower, new_log_lines)

    for line in parsed_lines:
        print_log(line)


def is_job_hanging(job, max_idle_time):
    # Poll to check for new logs, assuming that a prolonged period of
    # silence means that the device has died and we should try it again.
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} has not responded for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )


def parse_log_lines(job, log_follower, new_log_lines):
    if log_follower.feed(new_log_lines):
        # If we got non-empty log data, we can be sure that the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the scheduler.jobs.logs RPC returns, it may
    # already have reached the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)
    return parsed_lines


def fetch_new_log_lines(job):
    # The XML-RPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error; the
    # for/else only raises if every attempt fails.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        raise MesaCIParseException
    return new_log_lines


def submit_job(job):
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err


def wait_for_job_get_started(job):
    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")


def bootstrap_log_follower() -> LogFollower:
    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    return LogFollower(starting_section=gl)


def follow_job_execution(job, log_follower):
    with log_follower:
        max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start checking the job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa developers expect a simple pass/fail job result. If that is not what
    # we got, it probably means a LAVA infrastructure error happened.
    if job.status not in ["pass", "fail"]:
        raise_lava_error(job)

    # LogFollower does some cleanup after the early exit (triggered by the
    # `hwci: pass|fail` regex), so update the phases again after the cleanup.
    structural_log_phases(job, log_follower)


def structural_log_phases(job, log_follower):
    phases: dict[str, Any] = {
        s.header.split(" - ")[0]: {
            k: str(getattr(s, k)) for k in ("start_time", "end_time")
        }
        for s in log_follower.section_history
    }
    job.log["dut_job_phases"] = phases

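# Illustrative shape of the "dut_job_phases" entry produced above (the section
# header name and the timestamps are hypothetical):
#   {"dut_boot": {"start_time": "2023-01-01 00:00:00", "end_time": "..."}}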

def print_job_final_status(job):
    # A job still flagged as "running" at this point never reached a final
    # state, so treat it as hung.
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )

    job.refresh_log()
    show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")


def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
    last_failed_job = None
    for attempt_no in range(1, retry_count + 2):
        # Take the log entry from the jobs_log object itself to keep the
        # autosave feature working, if AutoSaveDict is enabled in the
        # StructuredLogging module.
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
        try:
            job_log["submitter_start_time"] = datetime.now().isoformat()
            submit_job(job)
            wait_for_job_get_started(job)
            log_follower: LogFollower = bootstrap_log_follower()
            follow_job_execution(job, log_follower)
            return job

        except (MesaCIException, KeyboardInterrupt) as exception:
            job.handle_exception(exception)

        finally:
            print_job_final_status(job)
            # If LAVA takes too long to post-process the job, the submitter
            # gives up and proceeds.
            job_log["submitter_end_time"] = datetime.now().isoformat()
            last_failed_job = job
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )

    return last_failed_job


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    last_attempted_job = execute_job_with_retries(
        proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
    )

    if last_attempted_job.exception is not None:
        # The infrastructure failed in all attempts
        raise MesaCIRetryError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_RED']}"
            "Job failed after exceeding the maximum of "
            f"{number_of_retries} retries."
            f"{CONSOLE_LOG['RESET']}",
            retry_count=number_of_retries,
            last_job=last_attempted_job,
        )

    return last_attempted_job

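# Note: execute_job_with_retries() makes retry_count + 1 attempts in total, so
# the default NUMBER_OF_RETRIES_TIMEOUT_DETECTION of 2 allows up to three runs
# of the same job definition.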

@dataclass
class PathResolver:
    """Resolve every field annotated as pathlib.Path into an absolute path."""

    def __post_init__(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if not value:
                continue
            if field.type == pathlib.Path:
                value = pathlib.Path(value)
                setattr(self, field.name, value.resolve())

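# Illustrative use of PathResolver (hypothetical subclass): any pathlib.Path
# field set to a relative string becomes an absolute, resolved Path right
# after construction, e.g.
#   @dataclass
#   class DummySubmitter(PathResolver):
#       log_file: pathlib.Path = None
#   DummySubmitter(log_file="results/log.json").log_file  # -> absolute Path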

@dataclass
class LAVAJobSubmitter(PathResolver):
    boot_method: str
    ci_project_dir: str
    device_type: str
    job_timeout_min: int  # The job timeout in minutes
    build_url: str = None
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    first_stage_init: str = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    kernel_external: str = None
    lava_tags: str = ""  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url_prefix: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    job_rootfs_overlay_url: str = None
    structured_log_file: pathlib.Path = None  # Log file path with structured LAVA log
    ssh_client_image: str = None  # x86_64 SSH client image to follow the job's output
    project_name: str = None  # Project name to be used in the job name
    __structured_log_context = contextlib.nullcontext()  # Structured Logger context

    def __post_init__(self) -> None:
        super().__post_init__()
        # Keep only the first word of the mesa job name; spaces break the
        # lava-test-case command.
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]

        # The proxy is needed for every submission, with or without structured
        # logging, so set it up before the early return below.
        self.proxy = setup_lava_proxy()

        if not self.structured_log_file:
            return

        self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()

    def __prepare_submission(self) -> str:
        # Overwrite the timeout for the test cases with the value provided by
        # the user. The test case running time should be at least 4 times
        # greater than the other sections (boot and setup), so we can safely
        # ignore them. If LAVA fails to stop the job at this stage, it will
        # fall back to the script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition = LAVAJobDefinition(self).generate_lava_job_definition()

        if self.dump_yaml:
            self.dump_job_definition(job_definition)

        validation_job = LAVAJob(self.proxy, job_definition)
        if errors := validation_job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")
        print_log("LAVA job definition validated successfully")

        return job_definition

    @classmethod
    def is_under_ci(cls):
        ci_envvar: str = getenv("CI", "false")
        return ci_envvar.lower() == "true"

    def dump_job_definition(self, job_definition) -> None:
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))

    def submit(self) -> None:
        """
        Prepare and submit the LAVA job.
        If `validate_only` is True, it validates the job without submitting it.
        If the job finishes with a non-pass status or encounters an exception,
        the program exits with a non-zero return code.
        """
        job_definition: str = self.__prepare_submission()

        if self.validate_only:
            return

        with self.__structured_log_context:
            last_attempt_job = None
            try:
                last_attempt_job = retriable_follow_job(self.proxy, job_definition)

            except MesaCIRetryError as retry_exception:
                last_attempt_job = retry_exception.last_job

            except Exception as exception:
                STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
                raise

            finally:
                self.finish_script(last_attempt_job)

    def print_log_artifact_url(self):
        relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
        full_path = f"$ARTIFACTS_BASE_URL/{relative_log_path}"
        artifact_url = path.expandvars(full_path)

        print_log(f"Structured logging data available at: {artifact_url}")

    def finish_script(self, last_attempt_job):
        if self.is_under_ci() and self.structured_log_file:
            self.print_log_artifact_url()

        if not last_attempt_job:
            # No job was run, something bad happened
            STRUCTURAL_LOG["job_combined_status"] = "script_crash"
            current_exception = str(sys.exc_info()[0])
            STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
            raise SystemExit(1)

        STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status

        if last_attempt_job.status != "pass":
            raise SystemExit(1)


class StructuredLoggerWrapper:
    def __init__(self, submitter: LAVAJobSubmitter) -> None:
        self.__submitter: LAVAJobSubmitter = submitter

    def _init_logger(self):
        STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
        STRUCTURAL_LOG["dut_attempt_counter"] = 0

        # Initialize the dut_jobs list to enable appends
        STRUCTURAL_LOG["dut_jobs"] = []

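    # Illustrative top-level shape of STRUCTURAL_LOG right after _init_logger()
    # runs (the device type value is hypothetical):
    #   {"fixed_tags": "", "dut_job_type": "rk3399-gru-kevin",
    #    "job_combined_fail_reason": None, "job_combined_status": "not_submitted",
    #    "dut_attempt_counter": 0, "dut_jobs": []}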
    @contextlib.contextmanager
    def _simple_logger_context(self):
        log_file = pathlib.Path(self.__submitter.structured_log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            # Truncate the file
            log_file.write_text("")
            yield
        finally:
            log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))

    def logger_context(self):
        context = contextlib.nullcontext()
        try:
            global STRUCTURAL_LOG
            STRUCTURAL_LOG = StructuredLogger(
                self.__submitter.structured_log_file, truncate=True
            ).data
        except NameError:
            # StructuredLogger could not be imported at module load time, so
            # fall back to the simple JSON-dumping context manager.
            context = self._simple_logger_context()

        self._init_logger()
        return context

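# Illustrative invocation via python-fire: each flag maps to a LAVAJobSubmitter
# field and the trailing "submit" selects the method to run (values below are
# hypothetical):
#   ./lava_job_submitter.py \
#       --device_type rk3399-gru-kevin \
#       --boot_method depthcharge \
#       --job_timeout_min 30 \
#       --ci_project_dir "$CI_PROJECT_DIR" \
#       --structured_log_file results/lava_job_detail.json \
#       submit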
if __name__ == "__main__":
    # Given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, it is safe to say we don't need
    # any more buffering.
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
    # The LAVA farm reports datetimes in UTC, so run the script in UTC as well.
    # Setting environ here does not affect the system time; the os.environ
    # change only lives as long as the script does.
    environ["TZ"] = "UTC"
    time.tzset()

    fire.Fire(LAVAJobSubmitter)