• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 Collabora Limited
4# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
5#
6# SPDX-License-Identifier: MIT
7
8"""
9Some utilities to analyse logs, create gitlab sections and other quality of life
10improvements
11"""
12
13import logging
14import re
15import sys
16from dataclasses import dataclass, field
17from datetime import datetime, timedelta, UTC
18from typing import Optional, Union
19
20from lava.exceptions import MesaCITimeoutError
21from lava.utils.console_format import CONSOLE_LOG
22from lava.utils.gitlab_section import GitlabSection
23from lava.utils.lava_farm import LavaFarm, get_lava_farm
24from lava.utils.lava_log_hints import LAVALogHints
25from lava.utils.log_section import (
26    DEFAULT_GITLAB_SECTION_TIMEOUTS,
27    FALLBACK_GITLAB_SECTION_TIMEOUT,
28    LOG_SECTIONS,
29    LogSectionType,
30)
31
32
@dataclass
class LogFollower:
    """Consume LAVA log lines and turn them into GitLab-friendly console output.

    Log lines (dictionaries with at least ``lvl`` and ``msg`` keys) are handed
    in through `feed()`. The follower opens/closes GitLab sections according to
    `LOG_SECTIONS`, enforces a per-section timeout and accumulates the printable
    text in an internal buffer that `flush()` drains.

    The object can be used as a context manager: leaving the context closes the
    section that is still open and prints whatever is left in the buffer.
    """

    # Patterns used for every log line; compiled once instead of per call.
    _KERNEL_MSG_RE = re.compile(r"\[[\d\s]{5}\.[\d\s]{6}\] +\S{2,}")
    _TRAILING_CRLF_RE = re.compile(r"\r\n$")

    starting_section: Optional[GitlabSection] = None
    main_test_case: Optional[str] = None
    timestamp_relative_to: Optional[datetime] = None
    _current_section: Optional[GitlabSection] = None
    section_history: list[GitlabSection] = field(default_factory=list, init=False)
    # NOTE: the factory hands out the module-level dict itself (no copy), so
    # every instance relying on the default shares the same mapping.
    timeout_durations: dict[LogSectionType, timedelta] = field(
        default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
    )
    fallback_timeout: timedelta = FALLBACK_GITLAB_SECTION_TIMEOUT
    _buffer: list[str] = field(default_factory=list, init=False)
    log_hints: LAVALogHints = field(init=False)
    # NOTE: get_lava_farm() is evaluated once, when the class body is executed.
    lava_farm: LavaFarm = field(init=False, default=get_lava_farm())
    # Text of lines ending in "\r" that is waiting to be glued to the next line
    _merge_next_line: str = field(default_factory=str, init=False)

    def __post_init__(self):
        # Make it trigger current_section setter to populate section history
        self.current_section = self.starting_section
        section_is_created = bool(self._current_section)
        section_has_started = bool(
            self._current_section and self._current_section.has_started
        )
        self.log_hints = LAVALogHints(self)
        # A starting section is only acceptable when it has already started.
        assert (
            section_is_created == section_has_started
        ), "Can't follow logs beginning from uninitialized GitLab sections."

        # Initialize fix_lava_gitlab_section_log generator
        self.gl_section_fix_gen = fix_lava_gitlab_section_log()
        # Prime the generator so that it is ready to receive `send()` calls
        next(self.gl_section_fix_gen)

    @property
    def current_section(self):
        """The GitLab section currently being followed, or None."""
        return self._current_section

    @current_section.setter
    def current_section(self, new_section: Optional[GitlabSection]) -> None:
        # The section being replaced (if any) is kept in the history
        if old_section := self._current_section:
            self.section_history.append(old_section)
        self._current_section = new_section

    @property
    def phase(self) -> LogSectionType:
        """Type of the current section, or LogSectionType.UNKNOWN without one."""
        return (
            self._current_section.type
            if self._current_section
            else LogSectionType.UNKNOWN
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup existing buffer if this object gets out from the context"""
        self.clear_current_section()
        last_lines = self.flush()
        for line in last_lines:
            print(line)

    def watchdog(self):
        """Check the current section against its timeout.

        Raises:
            MesaCITimeoutError: when the current section's `delta_time()` is
                greater than the timeout registered for its type in
                `timeout_durations` (or `fallback_timeout` when the type has no
                entry).
        """
        if not self._current_section:
            return

        timeout_duration = self.timeout_durations.get(
            self._current_section.type, self.fallback_timeout
        )

        if self._current_section.delta_time() > timeout_duration:
            raise MesaCITimeoutError(
                f"Gitlab Section {self._current_section} has timed out",
                timeout_duration=timeout_duration,
            )

    def clear_current_section(self):
        """End the current section, if it is still open, and forget it.

        The text returned by the section's `end()` is appended to the buffer.
        """
        if self._current_section and not self._current_section.has_finished:
            self._buffer.append(self._current_section.end())
            self.current_section = None

    def update_section(self, new_section: GitlabSection):
        """Switch to `new_section`, closing the current one first.

        Nothing happens when `new_section` has the same id as the current
        section.
        """
        # Sections can have redundant regex to find them to mitigate LAVA
        # interleaving kmsg and stderr/stdout issue.
        if self.current_section and self.current_section.id == new_section.id:
            return
        self.clear_current_section()
        self.current_section = new_section
        self._buffer.append(new_section.start())

    def manage_gl_sections(self, line):
        """Open a new GitLab section when `line` matches one of LOG_SECTIONS.

        Only the first matching entry of LOG_SECTIONS is considered.
        """
        if isinstance(line["msg"], list):
            logging.debug("Ignoring messages as list. Kernel dumps.")
            return

        for log_section in LOG_SECTIONS:
            if new_section := log_section.from_log_line_to_section(
                line, self.main_test_case, self.timestamp_relative_to
            ):
                self.update_section(new_section)
                break

    def detect_kernel_dump_line(self, line: dict[str, Union[str, list]]) -> bool:
        """Tell whether `line` is a kernel message.

        String messages that look like a kernel log entry are printed in bold
        through `print_log` as a side effect.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            True when the line is considered a kernel message, False otherwise.
        """
        # line["msg"] can be a list[str] when there is a kernel dump
        if isinstance(line["msg"], list):
            return line["lvl"] == "debug"

        # result level has dict line["msg"]
        if not isinstance(line["msg"], str):
            return False

        # we have a line, check if it is a kernel message
        if self._KERNEL_MSG_RE.search(line["msg"]):
            print_log(f"{CONSOLE_LOG['BOLD']}{line['msg']}{CONSOLE_LOG['RESET']}")
            return True

        return False

    def remove_trailing_whitespace(self, line: dict[str, Union[str, list]]) -> None:
        """
        Removes trailing whitespace from the end of the `msg` value in the log line dictionary.

        The dictionary is modified in place; nothing is returned.

        Args:
            line: A dictionary representing a single log line.

        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA. This method removes trailing `\\r\\n` characters from log lines.
        """
        msg = line.get("msg")
        if not msg:
            return

        messages = [msg] if isinstance(msg, str) else msg

        # NOTE(review): when `msg` is not a string, every iteration overwrites
        # line["msg"], so only the last (stripped) item survives and the value
        # becomes a plain string. The following steps of `feed()` may depend on
        # that - confirm before changing it.
        for message in messages:
            # LAVA logs brings raw messages, which includes newlines characters as \r\n.
            line["msg"] = self._TRAILING_CRLF_RE.sub("", message)

    def merge_carriage_return_lines(self, line: dict[str, str]) -> bool:
        """
        Merges lines that end with a carriage return character into a single line.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            A boolean indicating whether the current line has been merged with the next line.

        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA.
        """
        if line["msg"].endswith("\r"):
            # Hold the text back; it is prepended to the next line that does
            # not end with a carriage return.
            self._merge_next_line += line["msg"]
            return True

        if self._merge_next_line:
            line["msg"] = self._merge_next_line + line["msg"]
            self._merge_next_line = ""

        return False

    def ignore_dut_feedback_lines(self, line: dict[str, str]) -> bool:
        """
        Ignores feedback lines from LAVA.
        If we only receive this level of message for some time, it means that the job is
        misbehaving. E.g Rebooting.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            A boolean indicating whether the current line is a feedback line.
        """
        if line["lvl"] == "feedback" and line["ns"] == "dut":
            return True
        if line["lvl"] == "debug":
            # This message happens after LAVA end receiving the feedback from the DUT
            if line["msg"] == "Listened to connection for namespace 'dut' done":
                return True
        return False

    def feed(self, new_lines: list[dict[str, str]]) -> bool:
        """Input data to be processed by LogFollower instance
        Returns true if the DUT (device under test) seems to be alive.

        Raises:
            MesaCITimeoutError: propagated from `watchdog()` when the current
                section has timed out.
        """

        self.watchdog()

        # No signal of job health in the log
        is_job_healthy = False

        for line in new_lines:
            self.remove_trailing_whitespace(line)

            if self.detect_kernel_dump_line(line):
                continue

            if self.merge_carriage_return_lines(line):
                continue

            if self.ignore_dut_feedback_lines(line):
                continue

            # At least we are fed with a non-kernel dump log, it seems that the
            # job is progressing
            is_job_healthy = True
            self.manage_gl_sections(line)
            if parsed_line := self.parse_lava_line(line):
                self._buffer.append(parsed_line)

        self.log_hints.detect_failure(new_lines)

        return is_job_healthy

    def flush(self) -> list[str]:
        """Return the buffered output lines and start a fresh, empty buffer."""
        buffer = self._buffer
        self._buffer = []
        return buffer

    def parse_lava_line(self, line) -> Optional[str]:
        """Format a LAVA log line for console output.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            The text to print, or None for the "results", "feedback" and
            "debug" levels, which are not printed.
        """
        prefix = ""
        suffix = ""

        if line["lvl"] in ["results", "feedback", "debug"]:
            return None
        elif line["lvl"] in ["warning", "error"]:
            prefix = CONSOLE_LOG["FG_BOLD_RED"]
            suffix = CONSOLE_LOG["RESET"]
        elif line["lvl"] == "input":
            prefix = "$ "
            suffix = ""
        elif line["lvl"] == "target" and self.lava_farm != LavaFarm.COLLABORA:
            # gl_section_fix_gen will output the stored line if it can't find a
            # match for the first split line
            # So we can recover it and put it back to the buffer
            if recovered_first_line := self.gl_section_fix_gen.send(line):
                self._buffer.append(recovered_first_line)

        return f'{prefix}{line["msg"]}{suffix}'
272
def fix_lava_gitlab_section_log():
    r"""This function is a temporary solution for the Gitlab section markers
    splitting problem. Gitlab parses the following lines to define a collapsible
    gitlab section in their log:
    - \x1b[0Ksection_start:timestamp:section_id[collapsible=true/false]\r\x1b[0Ksection_header
    - \x1b[0Ksection_end:timestamp:section_id\r\x1b[0K
    There is some problem in message passing between the LAVA dispatcher and the
    device under test (DUT), that replaces \r control characters into \n. When
    this problem is fixed on the LAVA side, one should remove this function.

    This is a generator meant to be driven with ``send()``: prime it once with
    ``next()``, then send every log line dictionary (with a string ``msg``).
    The dictionaries are modified in place:

    - when a line looks like the first half of a split marker, its ``msg`` is
      emptied and the text is held back;
    - when the following line looks like the second half, its ``msg`` becomes
      ``first_half + "\r" + second_half``.

    Yields:
        False in the regular case, or the held-back first half (a string) when
        the line that followed it was not a second half, so that the caller
        can put it back into its output.
    """
    # The patterns never change: compile them once, outside of the loop.
    split_line_pattern = re.compile(r"\x1b\[0K(section_\w+):(\d+):([^\s\r]+)$")
    second_line_pattern = re.compile(r"\x1b\[0K([\S ]+)?")

    # Answer given to the caller by the next `yield` at the top of the loop.
    recovered = False
    while True:
        # Hand back the pending answer and wait for a new line in the same
        # `yield`, so that no line sent by the caller is ever thrown away.
        line = yield recovered
        recovered = False

        if not split_line_pattern.search(line["msg"]):
            continue

        first_line = line["msg"]
        # Delete the current line and hold this log line stream to be able to
        # possibly merge it with the next line.
        line["msg"] = ""
        line = yield False

        # This code reached when we detect a possible first split line
        if second_line_pattern.search(line["msg"]):
            line["msg"] = f"{first_line}\r{line['msg']}"
        else:
            # The current line doesn't match with the previous one, send back the
            # latter to give the user the chance to recover it.
            recovered = first_line
306
307
308
def print_log(msg: str, *args) -> None:
    """Print `msg` prefixed with the current UTC time (HH:MM:SS.mmm).

    Any extra positional arguments are forwarded to `print` after the
    timestamped message.
    """
    now = datetime.now(tz=UTC)
    millis = now.microsecond // 1000
    stamp = f"{now:%H:%M:%S}.{millis:03}"
    # Reset color from timestamp, since `msg` can tint the terminal color
    print(f"{CONSOLE_LOG['RESET']}{stamp}: {msg}", *args)
314
315
def fatal_err(msg, exception=None):
    """Report `msg` in bold red and abort.

    Args:
        msg: The message to print through `print_log`.
        exception: When given (and truthy), it is raised after printing.

    Raises:
        SystemExit: with status 1, via `sys.exit`, when no exception is given.
    """
    red = f"{CONSOLE_LOG['FG_BOLD_RED']}"
    reset = f"{CONSOLE_LOG['RESET']}"
    print_log(red, f"{msg}", reset)
    if not exception:
        sys.exit(1)
    raise exception
322
323
def hide_sensitive_data(yaml_data: str, start_hide: str = "HIDE_START", end_hide: str = "HIDE_END") -> str:
    """Drop the lines of `yaml_data` that lie inside a hidden region.

    A hidden region starts at a line containing `start_hide` (that line is
    dropped too) and lasts until a line containing `end_hide`; the latter line
    is kept. A line containing both markers counts as a start line.

    Args:
        yaml_data: The text to filter; line endings are preserved.
        start_hide: Marker that opens a hidden region.
        end_hide: Marker that closes a hidden region.

    Returns:
        The text made of the lines that were not hidden.
    """

    def visible_lines():
        hiding = False
        for raw_line in yaml_data.splitlines(keepends=True):
            if start_hide in raw_line:
                hiding = True
            elif end_hide in raw_line:
                hiding = False

            if not hiding:
                yield raw_line

    return "".join(visible_lines())
339