• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import re
2import xmlrpc
3from collections import defaultdict
4from datetime import datetime, UTC
5from typing import Any, Optional
6
7from lava.exceptions import (
8    MesaCIException,
9    MesaCIRetriableException,
10    MesaCIKnownIssueException,
11    MesaCIParseException,
12    MesaCITimeoutError,
13)
14from lava.utils import CONSOLE_LOG
15from lava.utils.log_follower import print_log
16from lavacli.utils import flow_yaml as lava_yaml
17
18from .lava_proxy import call_proxy
19
20
class LAVAJob:
    """Client-side handle for a single LAVA job.

    Wraps the LAVA proxy calls needed to validate, submit, poll, fetch logs
    from and cancel a job, and mirrors the interesting pieces of state
    (status, exit code, job id, failure reason, DUT details) into ``log``.
    """

    # Console color used for each job status.
    # NOTE(review): "failed", "interrupted" and "job_submitter_error", which
    # handle_exception() may set, have no entry here -- confirm the consumers
    # of this map tolerate missing keys.
    COLOR_STATUS_MAP: dict[str, str] = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_BOLD_YELLOW"],
        "fail": CONSOLE_LOG["FG_BOLD_RED"],
        "canceled": CONSOLE_LOG["FG_BOLD_MAGENTA"],
    }

    # Console line that reports the final result of the job.
    _RESULT_LINE_RE = re.compile(r"hwci: mesa: (pass|fail), exit_code: (\d+)")

    def __init__(self, proxy, definition, log: Optional[dict[str, Any]] = None) -> None:
        """Create a job that has not been submitted yet.

        Args:
            proxy: LAVA proxy object used for every remote call.
            definition: the job definition handed to validate/submit.
            log: mapping that receives the structured job information. When
                omitted, a fresh ``defaultdict(str)`` is created for this
                instance.
        """
        self._job_id = None
        self.proxy = proxy
        self.definition = definition
        self.last_log_line = 0
        self.last_log_time: Optional[datetime] = None
        self._is_finished = False
        # A default argument is evaluated only once, so a mutable default
        # would be shared (and overwritten) by every job created without an
        # explicit log. Build the fallback per instance instead.
        self.log: dict[str, Any] = defaultdict(str) if log is None else log
        # The status setter writes into self.log, so it must come after it.
        self.status = "not_submitted"
        # Set the default exit code to 1 because we should set it to 0 only if the job has passed.
        # If it fails or if it is interrupted, the exit code should be set to a non-zero value to
        # make the GitLab job fail.
        self._exit_code: int = 1
        self.__exception: Optional[Exception] = None

    def heartbeat(self) -> None:
        """Record the current UTC time as the last activity and mark the job as running."""
        self.last_log_time = datetime.now(tz=UTC)
        self.status = "running"

    @property
    def status(self) -> str:
        """Current job status string."""
        return self._status

    @status.setter
    def status(self, new_status: str) -> None:
        self._status = new_status
        self.log["status"] = self._status

    @property
    def exit_code(self) -> int:
        """Exit code of the job; stays at 1 until a result line sets it."""
        return self._exit_code

    @exit_code.setter
    def exit_code(self, code: int) -> None:
        self._exit_code = code
        self.log["exit_code"] = self._exit_code

    @property
    def job_id(self) -> Optional[int]:
        """Job id returned by the submit call, or None before submission."""
        return self._job_id

    @job_id.setter
    def job_id(self, new_id: int) -> None:
        self._job_id = new_id
        self.log["lava_job_id"] = self._job_id

    @property
    def is_finished(self) -> bool:
        """True once the log endpoint or a result line reported completion."""
        return self._is_finished

    @property
    def exception(self) -> Optional[Exception]:
        """Exception recorded for this job, if any."""
        return self.__exception

    @exception.setter
    def exception(self, exception: Exception) -> None:
        self.__exception = exception
        self.log["dut_job_fail_reason"] = repr(self.__exception)

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def show(self) -> dict[str, str]:
        """Return the job details reported by ``scheduler.jobs.show``."""
        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)

    def get_lava_time(self, key, data) -> Optional[str]:
        """Return ``data[key].value``, or None when the entry is missing or falsy.

        Args:
            key: name of the timestamp field.
            data: mapping of job details, as returned by show().
        """
        timestamp = data.get(key)
        return timestamp.value if timestamp else None

    def refresh_log(self) -> None:
        """Copy the DUT timestamps, device name and state from show() into the log."""
        details = self.show()
        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
        self.log["dut_name"] = details.get("device")
        self.log["dut_state"] = details.get("state")

    def submit(self) -> bool:
        """Submit the job definition and refresh the structured log.

        Returns:
            bool: True on success, False if a MesaCIException was raised
            while submitting or refreshing the log.
        """
        try:
            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
            self.status = "submitted"
            self.refresh_log()
        except MesaCIException:
            return False
        return True

    def lava_state(self) -> str:
        """Return the ``job_state`` field reported by ``scheduler.job_state``."""
        job_state: dict[str, str] = call_proxy(
            self.proxy.scheduler.job_state, self._job_id
        )
        return job_state["job_state"]

    def cancel(self) -> None:
        """Cancel the job on the server; does nothing if there is no job id yet."""
        if self._job_id:
            self.proxy.scheduler.jobs.cancel(self._job_id)
            # If we don't have yet set another job's status, let's update it
            # with canceled one
            if self.status == "running":
                self.status = "canceled"

    def is_started(self) -> bool:
        """True when the LAVA state is past the submitted/scheduling states."""
        waiting_states = ("Submitted", "Scheduling", "Scheduled")
        return self.lava_state() not in waiting_states

    def is_post_processed(self) -> bool:
        """True when the LAVA state is anything other than "Running"."""
        return self.lava_state() != "Running"

    def _load_log_from_data(self, data) -> list[str]:
        """Parse a chunk of log data and advance ``last_log_line``.

        Args:
            data: YAML payload, either raw or wrapped in xmlrpc.client.Binary.

        Returns:
            list[str]: the parsed entries, or an empty list when there is
            nothing new.
        """
        # A plain `import xmlrpc` does not load the `client` submodule, so
        # import it here instead of relying on another module having done so.
        import xmlrpc.client

        if isinstance(data, xmlrpc.client.Binary):
            # We are dealing with xmlrpc.client.Binary
            # Let's extract the data
            data = data.data
        # When there is no new log data, the YAML is empty
        loaded_lines = lava_yaml.load(data)
        if not loaded_lines:
            return []
        self.last_log_line += len(loaded_lines)
        return loaded_lines

    def get_logs(self) -> list[str]:
        """Fetch the log entries produced since ``last_log_line``.

        Also stores the "finished" flag returned with the logs.

        Raises:
            MesaCIParseException: if fetching or parsing the logs fails for
                any reason; the original error is chained as the cause.
        """
        try:
            (finished, data) = call_proxy(
                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
            )
            self._is_finished = finished
            return self._load_log_from_data(data)

        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(self, lava_lines: list[str]) -> list[str]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line.

        When a result line is found, the job is marked as finished and its
        status and exit code are taken from that line; the result line itself
        and everything after it are left out of the returned list.
        """
        for idx, line in enumerate(lava_lines):
            result = self._RESULT_LINE_RE.search(line)
            if result is None:
                continue

            self._is_finished = True
            self.status = result.group(1)
            self.exit_code = int(result.group(2))
            # We reached the log end here. hwci script has finished.
            return lava_lines[:idx]

        # No result line yet: hand back every line.
        return lava_lines[:]

    def handle_exception(self, exception: Exception) -> None:
        """Record a failure, cancel the job and derive its final status.

        The exception is printed and stored, the exit code is forced to 1 and
        the status is chosen from the exception type.

        Raises:
            KeyboardInterrupt, MesaCIException: re-raised after being recorded
                when the exception is one of these and not one of the more
                specific types handled before them.
        """
        print_log(exception)
        self.cancel()
        self.exception = exception

        # Set the exit code to nonzero value
        self.exit_code = 1

        # Give more accurate status depending on exception
        if isinstance(exception, MesaCIKnownIssueException):
            self.status = "canceled"
        elif isinstance(exception, MesaCITimeoutError):
            self.status = "hung"
        elif isinstance(exception, MesaCIRetriableException):
            self.status = "failed"
        elif isinstance(exception, (KeyboardInterrupt, MesaCIException)):
            self.status = "interrupted"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            # Raise the recorded exception explicitly: a bare `raise` only
            # works while an exception is being handled and would turn into a
            # RuntimeError if this method were called from anywhere else.
            raise exception
        else:
            self.status = "job_submitter_error"
209