#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""
Some utilities to analyse logs, create gitlab sections and other quality of life
improvements
"""

import logging
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Union

from lava.exceptions import MesaCITimeoutError
from lava.utils.console_format import CONSOLE_LOG
from lava.utils.gitlab_section import GitlabSection
from lava.utils.lava_farm import LavaFarm, get_lava_farm
from lava.utils.lava_log_hints import LAVALogHints
from lava.utils.log_section import (
    DEFAULT_GITLAB_SECTION_TIMEOUTS,
    FALLBACK_GITLAB_SECTION_TIMEOUT,
    LOG_SECTIONS,
    LogSectionType,
)

# Patterns are compiled once here instead of on every processed log line.
# Kernel message prefix, e.g. "[    1.234567] foo".
_KERNEL_MESSAGE_PATTERN = re.compile(r"\[[\d\s]{5}\.[\d\s]{6}\] +\S{2,}")
# Raw LAVA messages may still carry their trailing "\r\n".
_TRAILING_CRLF_PATTERN = re.compile(r"\r\n$")
# First half of a GitLab section marker whose "\r" was turned into a line break.
_SPLIT_SECTION_LINE_PATTERN = re.compile(
    r"\x1b\[0K(section_\w+):(\d+):([^\s\r]+)$"
)
# Second half of a split GitLab section marker.
_SECTION_SECOND_LINE_PATTERN = re.compile(r"\x1b\[0K([\S ]+)?")


@dataclass
class LogFollower:
    """Follow a stream of LAVA log lines and turn it into printable output.

    The follower tracks the current GitLab section, enforces a per-section
    timeout, filters kernel dumps and DUT feedback noise, and accumulates the
    formatted lines in an internal buffer that callers drain with `flush()`.
    It can be used as a context manager; on exit the open section is closed
    and the remaining buffer is printed.
    """

    # Section the follower starts in; it must already be started (see
    # __post_init__).
    starting_section: Optional[GitlabSection] = None
    _current_section: Optional[GitlabSection] = None
    # Sections that were replaced through the `current_section` setter.
    section_history: list[GitlabSection] = field(default_factory=list, init=False)
    # NOTE(review): the factory returns the module-level mapping itself, so
    # every instance built with the default shares (and could mutate) the same
    # object -- confirm this is intended before copying it.
    timeout_durations: dict[LogSectionType, timedelta] = field(
        default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
    )
    fallback_timeout: timedelta = FALLBACK_GITLAB_SECTION_TIMEOUT
    _buffer: list[str] = field(default_factory=list, init=False)
    log_hints: LAVALogHints = field(init=False)
    # NOTE(review): get_lava_farm() is evaluated once, when the class body is
    # executed (i.e. at import time), not per instance.
    lava_farm: LavaFarm = field(init=False, default=get_lava_farm())
    # Accumulates messages ending in "\r" until a line without it arrives.
    _merge_next_line: str = field(default_factory=str, init=False)

    def __post_init__(self):
        """Install the starting section and prime the section-fix generator."""
        # Make it trigger current_section setter to populate section history
        self.current_section = self.starting_section
        section_is_created = bool(self._current_section)
        section_has_started = bool(
            self._current_section and self._current_section.has_started
        )
        self.log_hints = LAVALogHints(self)
        # Kept as an assert so the raised exception type stays AssertionError.
        assert (
            section_is_created == section_has_started
        ), "Can't follow logs beginning from uninitialized GitLab sections."

        # Initialize fix_lava_gitlab_section_log generator
        self.gl_section_fix_gen = fix_lava_gitlab_section_log()
        # Advance to the first `yield` so that `.send()` can be used right away.
        next(self.gl_section_fix_gen)

    @property
    def current_section(self):
        """The GitLab section currently being followed, or None."""
        return self._current_section

    @current_section.setter
    def current_section(self, new_section: Optional[GitlabSection]) -> None:
        # Archive the section being replaced before switching.
        if old_section := self._current_section:
            self.section_history.append(old_section)
        self._current_section = new_section

    @property
    def phase(self) -> LogSectionType:
        """Type of the current section, or UNKNOWN when there is none."""
        return (
            self._current_section.type
            if self._current_section
            else LogSectionType.UNKNOWN
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup existing buffer if this object gets out from the context"""
        self.clear_current_section()
        last_lines = self.flush()
        for line in last_lines:
            print(line)

    def watchdog(self):
        """Raise MesaCITimeoutError if the current section exceeded its timeout.

        The timeout is looked up in `timeout_durations` by section type and
        falls back to `fallback_timeout`. Does nothing when no section is open.
        """
        if not self._current_section:
            return

        timeout_duration = self.timeout_durations.get(
            self._current_section.type, self.fallback_timeout
        )

        if self._current_section.delta_time() > timeout_duration:
            raise MesaCITimeoutError(
                f"Gitlab Section {self._current_section} has timed out",
                timeout_duration=timeout_duration,
            )

    def clear_current_section(self):
        """End the current section (if still open) and buffer its end marker."""
        if self._current_section and not self._current_section.has_finished:
            self._buffer.append(self._current_section.end())
            self.current_section = None

    def update_section(self, new_section: GitlabSection):
        """Switch to `new_section`, closing the previous one first."""
        # Sections can have redundant regex to find them to mitigate LAVA
        # interleaving kmsg and stderr/stdout issue.
        if self.current_section and self.current_section.id == new_section.id:
            return
        self.clear_current_section()
        self.current_section = new_section
        self._buffer.append(new_section.start())

    def manage_gl_sections(self, line):
        """Start a new GitLab section if `line` matches one of LOG_SECTIONS."""
        msg = line["msg"]
        if isinstance(msg, list):
            logging.debug("Ignoring messages as list. Kernel dumps.")
            return
        if not isinstance(msg, str):
            # Non-text payloads (e.g. result dicts) cannot open a section.
            return

        for log_section in LOG_SECTIONS:
            if new_section := log_section.from_log_line_to_section(line):
                self.update_section(new_section)
                break

    def detect_kernel_dump_line(self, line: dict[str, Union[str, list]]) -> bool:
        """Return True when `line` is a kernel dump or a kernel message.

        String messages that look like kernel messages are printed in bold as
        a side effect.
        """
        msg = line["msg"]

        # line["msg"] can be a list[str] when there is a kernel dump
        if isinstance(msg, list):
            return line["lvl"] == "debug"

        # result level has dict line["msg"]
        if not isinstance(msg, str):
            return False

        # we have a line, check if it is a kernel message
        if _KERNEL_MESSAGE_PATTERN.search(msg):
            print_log(f"{CONSOLE_LOG['BOLD']}{msg}{CONSOLE_LOG['RESET']}")
            return True

        return False

    def remove_trailing_whitespace(self, line: dict[str, str]) -> None:
        r"""
        Removes the trailing `\r\n` from the `msg` value in the log line dictionary.

        String messages are stripped in place. List messages are stripped
        element by element and remain lists, so later stages can still
        recognise them as kernel dumps. Any other payload (e.g. the dict of a
        result line) is left untouched.

        Args:
            line: A dictionary representing a single log line.

        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA. This method removes trailing `\r\n` characters from log lines.
        """
        msg = line.get("msg")
        if not msg:
            return

        # LAVA logs brings raw messages, which includes newlines characters as \r\n.
        if isinstance(msg, str):
            line["msg"] = _TRAILING_CRLF_PATTERN.sub("", msg)
        elif isinstance(msg, list):
            line["msg"] = [
                _TRAILING_CRLF_PATTERN.sub("", message)
                if isinstance(message, str)
                else message
                for message in msg
            ]

    def merge_carriage_return_lines(self, line: dict[str, str]) -> bool:
        r"""
        Merges lines that end with a carriage return character into a single line.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            A boolean indicating whether the current line has been merged with the next line.

        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA.
        """
        msg = line["msg"]
        if not isinstance(msg, str):
            # Only text can be merged; keep any pending text for the next
            # string message.
            return False

        if msg.endswith("\r"):
            self._merge_next_line += msg
            return True

        if self._merge_next_line:
            line["msg"] = self._merge_next_line + msg
            self._merge_next_line = ""

        return False

    def ignore_dut_feedback_lines(self, line: dict[str, str]) -> bool:
        """
        Ignores feedback lines from LAVA.
        If we only receive this level of message for some time, it means that the job is
        misbehaving. E.g Rebooting.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            A boolean indicating whether the current line is a feedback line.
        """
        if line["lvl"] == "feedback" and line["ns"] == "dut":
            return True
        if line["lvl"] == "debug":
            # This message happens after LAVA end receiving the feedback from the DUT
            if line["msg"] == "Listened to connection for namespace 'dut' done":
                return True
        return False

    def feed(self, new_lines: list[dict[str, str]]) -> bool:
        """Input data to be processed by LogFollower instance
        Returns true if the DUT (device under test) seems to be alive.

        Raises MesaCITimeoutError (via `watchdog`) when the current section has
        timed out.
        """

        self.watchdog()

        # No signal of job health in the log
        is_job_healthy = False

        for line in new_lines:
            self.remove_trailing_whitespace(line)

            if self.detect_kernel_dump_line(line):
                continue

            if self.merge_carriage_return_lines(line):
                continue

            if self.ignore_dut_feedback_lines(line):
                continue

            # At least we are fed with a non-kernel dump log, it seems that the
            # job is progressing
            is_job_healthy = True
            self.manage_gl_sections(line)
            if parsed_line := self.parse_lava_line(line):
                self._buffer.append(parsed_line)

        self.log_hints.detect_failure(new_lines)

        return is_job_healthy

    def flush(self) -> list[str]:
        """Return the buffered output lines and reset the buffer."""
        buffer = self._buffer
        self._buffer = []
        return buffer

    def parse_lava_line(self, line) -> Optional[str]:
        """Format a LAVA log line for printing.

        Returns None for the `results`, `feedback` and `debug` levels.
        Warnings and errors are wrapped in red, `input` lines get a "$ "
        prefix. `target` lines go through the GitLab section fixer unless the
        farm is Collabora.
        """
        prefix = ""
        suffix = ""
        level = line["lvl"]

        if level in ("results", "feedback", "debug"):
            return None
        elif level in ("warning", "error"):
            prefix = CONSOLE_LOG["FG_RED"]
            suffix = CONSOLE_LOG["RESET"]
        elif level == "input":
            prefix = "$ "
            suffix = ""
        elif level == "target" and self.lava_farm != LavaFarm.COLLABORA:
            # gl_section_fix_gen will output the stored line if it can't find a
            # match for the first split line
            # So we can recover it and put it back to the buffer
            if recovered_first_line := self.gl_section_fix_gen.send(line):
                self._buffer.append(recovered_first_line)

        return f'{prefix}{line["msg"]}{suffix}'


def fix_lava_gitlab_section_log():
    r"""This function is a temporary solution for the Gitlab section markers
    splitting problem. Gitlab parses the following lines to define a collapsible
    gitlab section in their log:
    - \x1b[0Ksection_start:timestamp:section_id[collapsible=true/false]\r\x1b[0Ksection_header
    - \x1b[0Ksection_end:timestamp:section_id\r\x1b[0K
    There is some problem in message passing between the LAVA dispatcher and the
    device under test (DUT), that replaces \r control characters into \n. When
    this problem is fixed on the LAVA side, one should remove this function.

    This is a generator driven with `.send(line)` after being primed with
    `next()`. Each `send` may mutate `line["msg"]` in place:
    - a line that looks like the first half of a split marker is held back
      and its `msg` is emptied;
    - the following line, if it looks like the second half, gets the held
      text prepended, joined by "\r".
    `send` returns False, except when a held line could not be merged with
    the line that followed it: then the held text is returned so the caller
    can put it back in its buffer. Every sent line is inspected, including the
    one that triggered such a recovery.
    A line still held when the stream ends is never returned.
    """
    held_first_line = None
    recovered = False
    while True:
        line = yield recovered
        recovered = False
        msg = line["msg"]

        if held_first_line is not None:
            first_line, held_first_line = held_first_line, None
            if _SECTION_SECOND_LINE_PATTERN.search(msg):
                line["msg"] = f"{first_line}\r{msg}"
                continue
            # The current line doesn't match with the previous one, send back
            # the latter to give the user the chance to recover it. The current
            # line is still checked below, as it may start a new split marker.
            recovered = first_line

        if _SPLIT_SECTION_LINE_PATTERN.search(msg):
            # Delete the current line and hold this log line stream to be able
            # to possibly merge it with the next line.
            held_first_line = msg
            line["msg"] = ""


def print_log(msg: str, *args) -> None:
    """Print `msg` (and `args`) prefixed with a color reset and the current time."""
    # Reset color from timestamp, since `msg` can tint the terminal color
    print(f"{CONSOLE_LOG['RESET']}{datetime.now()}: {msg}", *args)


def fatal_err(msg, exception=None):
    """Print `msg` in red, then raise `exception` if given, else exit with status 1."""
    colored_msg = f"{CONSOLE_LOG['FG_RED']}"
    print_log(colored_msg, f"{msg}", f"{CONSOLE_LOG['RESET']}")
    if exception:
        raise exception
    sys.exit(1)


def hide_sensitive_data(
    yaml_data: str, start_hide: str = "HIDE_START", end_hide: str = "HIDE_END"
) -> str:
    """Return `yaml_data` without the lines between the hide markers.

    The line containing `start_hide` and every following line are dropped
    until a line containing `end_hide` is found. The line containing
    `end_hide` itself is kept. Line endings are preserved.
    """
    skip_line = False
    dump_data: list[str] = []
    for line in yaml_data.splitlines(True):
        if start_hide in line:
            skip_line = True
        elif end_hide in line:
            skip_line = False

        if skip_line:
            continue

        dump_data.append(line)

    return "".join(dump_data)