import re
import xmlrpc.client
from collections import defaultdict
from datetime import datetime
from typing import Any, Optional

from lava.exceptions import (
    MesaCIException,
    MesaCIKnownIssueException,
    MesaCIParseException,
    MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils.log_follower import print_log
from lavacli.utils import flow_yaml as lava_yaml

from .lava_proxy import call_proxy


class LAVAJob:
    """Handle for a single LAVA job driven through an XML-RPC ``proxy``.

    Job metadata (status, job id, LAVA timestamps, device name/state and the
    failure reason) is mirrored into the ``log`` mapping as it changes.
    """

    # Console color for each job status string; keys match the values that
    # ``status`` may take after a job ends.
    COLOR_STATUS_MAP: dict[str, str] = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    # Result line printed by the hwci script when it finishes; group 1 is the
    # outcome. Compiled once instead of on every log line.
    _HWCI_RESULT_RE = re.compile(r"hwci: mesa: (pass|fail)")

    def __init__(
        self, proxy, definition, log: Optional[dict[str, Any]] = None
    ) -> None:
        """Create a job handle. Nothing is sent to LAVA until ``submit``.

        Args:
            proxy: XML-RPC proxy exposing the LAVA ``scheduler`` API.
            definition: Job definition passed to ``validate`` and ``submit``.
            log: Mapping that receives job metadata. When omitted, a new
                ``defaultdict(str)`` is created for this instance.
        """
        self._job_id: Optional[int] = None
        self.proxy = proxy
        self.definition = definition
        # Offset of the next log line to request from LAVA.
        self.last_log_line = 0
        self.last_log_time: Optional[datetime] = None
        self._is_finished = False
        # Build the default per instance: a default-argument dict would be
        # evaluated once and shared by every LAVAJob created without `log`.
        self.log: dict[str, Any] = defaultdict(str) if log is None else log
        self.status = "not_submitted"
        self.__exception: Optional[str] = None

    def heartbeat(self) -> None:
        """Stamp ``last_log_time`` with the current time and mark the job running."""
        self.last_log_time = datetime.now()
        self.status = "running"

    @property
    def status(self) -> str:
        """Current job status string."""
        return self._status

    @status.setter
    def status(self, new_status: str) -> None:
        # Every status change is mirrored into the log.
        self._status = new_status
        self.log["status"] = self._status

    @property
    def job_id(self) -> Optional[int]:
        """LAVA job id, or None before a successful submission."""
        return self._job_id

    @job_id.setter
    def job_id(self, new_id: int) -> None:
        self._job_id = new_id
        self.log["lava_job_id"] = self._job_id

    @property
    def is_finished(self) -> bool:
        """True once LAVA reports the log as finished or the hwci result line is seen."""
        return self._is_finished

    @property
    def exception(self) -> Optional[str]:
        """``repr`` of the last handled exception, or None if there was none."""
        return self.__exception

    @exception.setter
    def exception(self, exception: BaseException) -> None:
        self.__exception = repr(exception)
        self.log["dut_job_fail_reason"] = self.__exception

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def show(self) -> dict[str, str]:
        """Return LAVA's job details (``scheduler.jobs.show``) for this job."""
        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)

    def get_lava_time(self, key, data) -> Optional[str]:
        """Return ``data[key].value``, or None when the key is missing or falsy.

        NOTE(review): ``data[key]`` is presumably an ``xmlrpc.client.DateTime``
        whose ``.value`` is its string form; confirm against the LAVA API.
        """
        time_value = data.get(key)
        return time_value.value if time_value else None

    def refresh_log(self) -> None:
        """Copy timestamps and device information from ``show()`` into the log."""
        details = self.show()
        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
        self.log["dut_name"] = details.get("device")
        self.log["dut_state"] = details.get("state")

    def submit(self) -> bool:
        """Submit the job definition to LAVA.

        Returns:
            True on success. False if a ``MesaCIException`` was raised while
            submitting or while refreshing the log afterwards.
        """
        try:
            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
            self.status = "submitted"
            self.refresh_log()
        except MesaCIException:
            return False
        return True

    def lava_state(self) -> str:
        """Return the ``job_state`` field from LAVA's ``scheduler.job_state``."""
        job_state: dict[str, str] = call_proxy(
            self.proxy.scheduler.job_state, self._job_id
        )
        return job_state["job_state"]

    def cancel(self) -> None:
        """Ask LAVA to cancel the job, if it was submitted.

        The status becomes "canceled" only if it was "running". Any other
        status was set deliberately and is kept.
        """
        if self._job_id:
            self.proxy.scheduler.jobs.cancel(self._job_id)
            # If we don't have yet set another job's status, let's update it
            # with canceled one
            if self.status == "running":
                self.status = "canceled"

    def is_started(self) -> bool:
        """True once LAVA's state has left the submission/scheduling phases."""
        waiting_states = ("Submitted", "Scheduling", "Scheduled")
        return self.lava_state() not in waiting_states

    def is_post_processed(self) -> bool:
        """True when LAVA's state is anything other than "Running"."""
        return self.lava_state() != "Running"

    def _load_log_from_data(self, data) -> list[str]:
        """Decode a chunk of LAVA log data and advance ``last_log_line``.

        Args:
            data: YAML log payload, raw or wrapped in ``xmlrpc.client.Binary``.

        Returns:
            The decoded log entries, or an empty list when there is no new data.
        """
        if isinstance(data, xmlrpc.client.Binary):
            # Unwrap the raw payload carried by the XML-RPC Binary wrapper.
            data = data.data
        # When there is no new log data, the YAML is empty and loads to a
        # falsy value.
        lines: list[str] = lava_yaml.load(data) or []
        self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        """Fetch log entries produced since the last call.

        Also updates ``is_finished`` with LAVA's "finished" flag.

        Raises:
            MesaCIParseException: wraps any error raised while fetching or
                decoding the logs.
        """
        try:
            (finished, data) = call_proxy(
                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
            )
            self._is_finished = finished
            return self._load_log_from_data(data)

        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(self, lava_lines: list[str]) -> list[str]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line.

        If a result line is found, ``is_finished`` becomes True and ``status``
        becomes "pass" or "fail". Otherwise every line is returned unchanged.
        """
        last_line = None  # Print all lines. lines[:None] == lines[:]

        for idx, line in enumerate(lava_lines):
            if result := self._HWCI_RESULT_RE.search(line):
                self._is_finished = True
                self.status = result[1]

                last_line = idx + 1
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]

    def handle_exception(self, exception: BaseException) -> None:
        """Log ``exception``, cancel the job, and set a matching status.

        Raises:
            KeyboardInterrupt: re-raised after marking the job "interrupted".
        """
        print_log(exception)
        self.cancel()
        self.exception = exception

        # Give more accurate status depending on exception
        if isinstance(exception, MesaCIKnownIssueException):
            self.status = "canceled"
        elif isinstance(exception, MesaCITimeoutError):
            self.status = "hung"
        elif isinstance(exception, MesaCIException):
            self.status = "failed"
        elif isinstance(exception, KeyboardInterrupt):
            self.status = "interrupted"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            # Raise the given object explicitly. A bare `raise` only works while
            # an except clause is active; outside one it fails with
            # "RuntimeError: No active exception to reraise".
            raise exception
        else:
            self.status = "job_submitter_error"