import re
import xmlrpc
import xmlrpc.client  # explicit: ``import xmlrpc`` alone does not load the ``client`` submodule
from collections import defaultdict
from datetime import datetime, UTC
from typing import Any, Optional

from lava.exceptions import (
    MesaCIException,
    MesaCIRetriableException,
    MesaCIKnownIssueException,
    MesaCIParseException,
    MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils.log_follower import print_log
from lavacli.utils import flow_yaml as lava_yaml

from .lava_proxy import call_proxy


class LAVAJob:
    """Track one LAVA job driven through an XML-RPC style ``proxy``.

    The job's status, exit code, job id and failure reason are exposed as
    properties; every write to one of them is mirrored into ``self.log`` so
    the mapping always reflects the latest known state.
    """

    # Console color associated with a job status.
    # NOTE(review): ``handle_exception`` may set the status to "failed", which
    # has no entry here (only "fail" does) -- confirm consumers cope with that.
    COLOR_STATUS_MAP: dict[str, str] = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_BOLD_YELLOW"],
        "fail": CONSOLE_LOG["FG_BOLD_RED"],
        "canceled": CONSOLE_LOG["FG_BOLD_MAGENTA"],
    }

    def __init__(
        self, proxy, definition, log: Optional[dict[str, Any]] = None
    ) -> None:
        """Create a job that has not been submitted yet.

        Args:
            proxy: object exposing the ``scheduler`` calls used by this class.
            definition: job definition passed to the validate/submit calls.
            log: mapping that receives the job's state. When omitted, a fresh
                ``defaultdict(str)`` is created for this instance, so that
                separate jobs never share (and overwrite) one log.
        """
        self._job_id = None
        self.proxy = proxy
        self.definition = definition
        self.last_log_line = 0
        self.last_log_time: Optional[datetime] = None
        self._is_finished = False
        # ``self.log`` must exist before ``self.status`` is assigned below,
        # because the status setter writes into it.
        self.log: dict[str, Any] = defaultdict(str) if log is None else log
        self.status = "not_submitted"
        # Set the default exit code to 1 because we should set it to 0 only if the job has passed.
        # If it fails or if it is interrupted, the exit code should be set to a non-zero value to
        # make the GitLab job fail.
        self._exit_code: int = 1
        self.__exception: Optional[Exception] = None

    def heartbeat(self) -> None:
        """Record the current UTC time as the last activity and mark the job running."""
        self.last_log_time = datetime.now(tz=UTC)
        self.status = "running"

    @property
    def status(self) -> str:
        """Current job status string."""
        return self._status

    @status.setter
    def status(self, new_status: str) -> None:
        self._status = new_status
        self.log["status"] = self._status

    @property
    def exit_code(self) -> int:
        """Exit code to report for this job (1 until set otherwise)."""
        return self._exit_code

    @exit_code.setter
    def exit_code(self, code: int) -> None:
        self._exit_code = code
        self.log["exit_code"] = self._exit_code

    @property
    def job_id(self) -> Optional[int]:
        """Job id assigned on submission, or ``None`` before that."""
        return self._job_id

    @job_id.setter
    def job_id(self, new_id: int) -> None:
        self._job_id = new_id
        self.log["lava_job_id"] = self._job_id

    @property
    def is_finished(self) -> bool:
        """Whether the job has been flagged as finished."""
        return self._is_finished

    @property
    def exception(self) -> Optional[Exception]:
        """Exception recorded by ``handle_exception``, if any."""
        return self.__exception

    @exception.setter
    def exception(self, exception: Exception) -> None:
        self.__exception = exception
        self.log["dut_job_fail_reason"] = repr(self.__exception)

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def show(self) -> dict[str, str]:
        """Return the scheduler's details for this job."""
        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)

    def get_lava_time(self, key, data) -> Optional[str]:
        """Return ``data[key].value``, or ``None`` when ``data[key]`` is falsy.

        Raises:
            KeyError: if ``key`` is absent from ``data``.
        """
        entry = data[key]
        return entry.value if entry else None

    def refresh_log(self) -> None:
        """Copy the job's timestamps, device name and state into ``self.log``."""
        details = self.show()
        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
        self.log["dut_name"] = details.get("device")
        self.log["dut_state"] = details.get("state")

    def submit(self) -> bool:
        """Submit the job definition and refresh the log.

        Returns:
            ``True`` on success, ``False`` if a ``MesaCIException`` was raised
            while submitting or refreshing the log.
        """
        try:
            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
            self.status = "submitted"
            self.refresh_log()
        except MesaCIException:
            return False
        return True

    def lava_state(self) -> str:
        """Return the ``job_state`` field reported by the scheduler for this job."""
        job_state: dict[str, str] = call_proxy(
            self.proxy.scheduler.job_state, self._job_id
        )
        return job_state["job_state"]

    def cancel(self):
        """Cancel the job on the scheduler, if it has an id."""
        if self._job_id:
            self.proxy.scheduler.jobs.cancel(self._job_id)
            # Only a job still marked "running" becomes "canceled"; any other
            # status that was already set is kept as it is.
            if self.status == "running":
                self.status = "canceled"

    def is_started(self) -> bool:
        """Return ``True`` once the job has left the waiting states."""
        waiting_states = ("Submitted", "Scheduling", "Scheduled")
        return self.lava_state() not in waiting_states

    def is_post_processed(self) -> bool:
        """Return ``True`` when the scheduler no longer reports "Running"."""
        return self.lava_state() != "Running"

    def _load_log_from_data(self, data) -> list[str]:
        """Decode a YAML log payload and advance ``last_log_line``.

        Args:
            data: YAML document, possibly wrapped in ``xmlrpc.client.Binary``.

        Returns:
            The loaded log lines; an empty list when the YAML is empty.
        """
        lines: list[str] = []
        if isinstance(data, xmlrpc.client.Binary):
            # We are dealing with xmlrpc.client.Binary
            # Let's extract the data
            data = data.data
        # When there is no new log data, the YAML is empty
        if loaded_lines := lava_yaml.load(data):
            lines = loaded_lines
            self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        """Fetch the log lines produced since ``last_log_line``.

        Also updates the finished flag from the scheduler's answer.

        Raises:
            MesaCIParseException: if fetching or decoding the logs fails for
                any reason; the original error is chained as the cause.
        """
        try:
            (finished, data) = call_proxy(
                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
            )
            self._is_finished = finished
            return self._load_log_from_data(data)

        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(self, lava_lines: list[str]) -> list[str]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line.

        When a result line is found, the job is flagged as finished and its
        status and exit code are taken from that line. The result line itself
        and everything after it are left out of the returned list.
        """

        last_line = None  # Print all lines. lines[:None] == lines[:]

        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: (pass|fail), exit_code: (\d+)", line):
                self._is_finished = True
                self.status = result.group(1)
                self.exit_code = int(result.group(2))

                last_line = idx
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]

    def handle_exception(self, exception: Exception):
        """Record a failure: log it, cancel the job and derive the final status.

        Args:
            exception: the error that interrupted the job.

        Raises:
            The given ``exception`` itself, when it is a ``KeyboardInterrupt``
            or a ``MesaCIException`` not matched by an earlier branch.
        """
        print_log(exception)
        self.cancel()
        self.exception = exception

        # Set the exit code to nonzero value
        self.exit_code = 1

        # Give more accurate status depending on exception.
        # The order of the checks is significant: the first match wins.
        if isinstance(exception, MesaCIKnownIssueException):
            self.status = "canceled"
        elif isinstance(exception, MesaCITimeoutError):
            self.status = "hung"
        elif isinstance(exception, MesaCIRetriableException):
            self.status = "failed"
        elif isinstance(exception, (KeyboardInterrupt, MesaCIException)):
            self.status = "interrupted"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            # Raise the object explicitly: a bare ``raise`` only works while an
            # exception is being handled and fails with RuntimeError otherwise.
            raise exception
        else:
            self.status = "job_submitter_error"