• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import re
2import xmlrpc
3from collections import defaultdict
4from datetime import datetime
5from typing import Any, Optional
6
7from lava.exceptions import (
8    MesaCIException,
9    MesaCIKnownIssueException,
10    MesaCIParseException,
11    MesaCITimeoutError,
12)
13from lava.utils import CONSOLE_LOG
14from lava.utils.log_follower import print_log
15from lavacli.utils import flow_yaml as lava_yaml
16
17from .lava_proxy import call_proxy
18
19
class LAVAJob:
    """Client-side handle for a single LAVA job.

    Wraps the LAVA proxy calls needed to validate, submit, poll, fetch logs
    from and cancel one job, and mirrors the job's bookkeeping data (status,
    job id, failure reason, DUT details) into ``self.log``.
    """

    # Maps a job status string to an entry of CONSOLE_LOG
    # (presumably a terminal color escape code -- confirm in lava.utils).
    COLOR_STATUS_MAP: dict[str, str] = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    def __init__(self, proxy, definition, log=None) -> None:
        """Create a job that has not been submitted yet.

        Args:
            proxy: LAVA proxy object exposing ``scheduler.*`` calls.
            definition: job definition passed verbatim to validate/submit.
            log: optional dict receiving the job's structured data. When
                omitted, a fresh ``defaultdict(str)`` is created for this
                instance, so jobs never share a log by accident.
        """
        self._job_id = None
        self.proxy = proxy
        self.definition = definition
        self.last_log_line = 0
        self.last_log_time = None
        self._is_finished = False
        # Must be assigned before `status`, whose setter writes into it.
        self.log: dict[str, Any] = defaultdict(str) if log is None else log
        self.status = "not_submitted"
        self.__exception: Optional[str] = None

    def heartbeat(self) -> None:
        """Record the current time as the last log activity and mark the job
        as running."""
        self.last_log_time: datetime = datetime.now()
        self.status = "running"

    @property
    def status(self) -> str:
        """Current job status string."""
        return self._status

    @status.setter
    def status(self, new_status: str) -> None:
        # Keep the structured log in sync with the status.
        self._status = new_status
        self.log["status"] = self._status

    @property
    def job_id(self) -> Optional[int]:
        """Id assigned on submission, or None before the job is submitted."""
        return self._job_id

    @job_id.setter
    def job_id(self, new_id: int) -> None:
        # Keep the structured log in sync with the job id.
        self._job_id = new_id
        self.log["lava_job_id"] = self._job_id

    @property
    def is_finished(self) -> bool:
        """True once the log endpoint or the console log reported completion."""
        return self._is_finished

    @property
    def exception(self) -> Optional[str]:
        """``repr()`` of the last exception recorded, or None if there is none."""
        return self.__exception

    @exception.setter
    def exception(self, exception: BaseException) -> None:
        # Only the repr is stored, not the exception object itself.
        self.__exception = repr(exception)
        self.log["dut_job_fail_reason"] = self.__exception

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def show(self) -> dict[str, Any]:
        """Return the job details reported by ``scheduler.jobs.show``."""
        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)

    def get_lava_time(self, key, data) -> Optional[str]:
        """Return ``data[key].value``, or None when ``data[key]`` is falsy.

        Raises:
            KeyError: if ``key`` is not present in ``data``.
        """
        # NOTE(review): entries are presumably xmlrpc DateTime objects, whose
        # `.value` is a string -- confirm against the LAVA API.
        return data[key].value if data[key] else None

    def refresh_log(self) -> None:
        """Fetch the job details and copy the DUT-related fields into the log."""
        details = self.show()
        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
        self.log["dut_name"] = details.get("device")
        self.log["dut_state"] = details.get("state")

    def submit(self) -> bool:
        """Submit the job definition and refresh the log.

        Returns:
            bool: True on success, False if a MesaCIException was raised
            while submitting or refreshing the log.
        """
        try:
            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
            self.status = "submitted"
            self.refresh_log()
        except MesaCIException:
            return False
        return True

    def lava_state(self) -> str:
        """Return the ``job_state`` field reported by ``scheduler.job_state``."""
        job_state: dict[str, str] = call_proxy(
            self.proxy.scheduler.job_state, self._job_id
        )
        return job_state["job_state"]

    def cancel(self):
        """Cancel the job on the server; does nothing if no job id is set."""
        if self._job_id:
            # NOTE(review): this call bypasses call_proxy, unlike the other
            # proxy calls in this class -- confirm that is intentional.
            self.proxy.scheduler.jobs.cancel(self._job_id)
            # Only overwrite the status if nothing more specific than
            # "running" has been recorded yet.
            if self.status == "running":
                self.status = "canceled"

    def is_started(self) -> bool:
        """True when the server-side state is past the waiting states."""
        waiting_states = ("Submitted", "Scheduling", "Scheduled")
        return self.lava_state() not in waiting_states

    def is_post_processed(self) -> bool:
        """True when the server-side state is anything other than "Running"."""
        return self.lava_state() != "Running"

    def _load_log_from_data(self, data) -> list[str]:
        """Decode a YAML log payload and advance ``last_log_line``.

        Args:
            data: YAML payload, optionally wrapped in ``xmlrpc.client.Binary``.

        Returns:
            The loaded log entries, or an empty list when there is no new data.
        """
        # A plain `import xmlrpc` does not load the `client` submodule, so
        # import it explicitly instead of relying on another module doing so.
        import xmlrpc.client

        if isinstance(data, xmlrpc.client.Binary):
            # Unwrap the raw payload carried by the Binary object.
            data = data.data

        # When there is no new log data, the YAML is empty.
        loaded_lines = lava_yaml.load(data)
        if not loaded_lines:
            return []

        self.last_log_line += len(loaded_lines)
        return loaded_lines

    def get_logs(self) -> list[str]:
        """Fetch the log entries produced since the previous call.

        Also updates the finished flag from the server response.

        Raises:
            MesaCIParseException: if fetching or decoding the logs fails for
                any reason; the original error is chained as the cause.
        """
        try:
            (finished, data) = call_proxy(
                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
            )
            self._is_finished = finished
            return self._load_log_from_data(data)

        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(self, lava_lines: list[str]) -> list[str]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line.

        When a result line is found, the job is marked finished and its status
        is set to the matched result ("pass" or "fail"); lines after the result
        line are dropped from the returned list.
        """

        last_line = None  # Print all lines. lines[:None] == lines[:]

        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: (pass|fail)", line):
                self._is_finished = True
                self.status = result[1]

                last_line = idx + 1
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]

    def handle_exception(self, exception: BaseException):
        """Log the exception, cancel the job and record a matching status.

        For a KeyboardInterrupt the exception currently being handled is
        re-raised with a bare ``raise``, so this method must be called from
        inside an ``except`` block for that case.
        """
        print_log(exception)
        self.cancel()
        self.exception = exception

        # Give more accurate status depending on exception
        if isinstance(exception, MesaCIKnownIssueException):
            self.status = "canceled"
        elif isinstance(exception, MesaCITimeoutError):
            self.status = "hung"
        elif isinstance(exception, MesaCIException):
            self.status = "failed"
        elif isinstance(exception, KeyboardInterrupt):
            self.status = "interrupted"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            raise
        else:
            self.status = "job_submitter_error"