• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 Collabora Limited
4# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
5#
6# SPDX-License-Identifier: MIT
7
8"""
9Some utilities to analyse logs, create gitlab sections and other quality of life
10improvements
11"""
12
13import logging
14import re
15import sys
16from dataclasses import dataclass, field
17from datetime import datetime, timedelta
18from typing import Optional, Union
19
20from lava.exceptions import MesaCITimeoutError
21from lava.utils.console_format import CONSOLE_LOG
22from lava.utils.gitlab_section import GitlabSection
23from lava.utils.lava_farm import LavaFarm, get_lava_farm
24from lava.utils.lava_log_hints import LAVALogHints
25from lava.utils.log_section import (
26    DEFAULT_GITLAB_SECTION_TIMEOUTS,
27    FALLBACK_GITLAB_SECTION_TIMEOUT,
28    LOG_SECTIONS,
29    LogSectionType,
30)
31
32
33@dataclass
34class LogFollower:
35    starting_section: Optional[GitlabSection] = None
36    _current_section: Optional[GitlabSection] = None
37    section_history: list[GitlabSection] = field(default_factory=list, init=False)
38    timeout_durations: dict[LogSectionType, timedelta] = field(
39        default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
40    )
41    fallback_timeout: timedelta = FALLBACK_GITLAB_SECTION_TIMEOUT
42    _buffer: list[str] = field(default_factory=list, init=False)
43    log_hints: LAVALogHints = field(init=False)
44    lava_farm: LavaFarm = field(init=False, default=get_lava_farm())
45    _merge_next_line: str = field(default_factory=str, init=False)
46
47    def __post_init__(self):
48        # Make it trigger current_section setter to populate section history
49        self.current_section = self.starting_section
50        section_is_created = bool(self._current_section)
51        section_has_started = bool(
52            self._current_section and self._current_section.has_started
53        )
54        self.log_hints = LAVALogHints(self)
55        assert (
56            section_is_created == section_has_started
57        ), "Can't follow logs beginning from uninitialized GitLab sections."
58
59        # Initialize fix_lava_gitlab_section_log generator
60        self.gl_section_fix_gen = fix_lava_gitlab_section_log()
61        next(self.gl_section_fix_gen)
62
63    @property
64    def current_section(self):
65        return self._current_section
66
67    @current_section.setter
68    def current_section(self, new_section: GitlabSection) -> None:
69        if old_section := self._current_section:
70            self.section_history.append(old_section)
71        self._current_section = new_section
72
73    @property
74    def phase(self) -> LogSectionType:
75        return (
76            self._current_section.type
77            if self._current_section
78            else LogSectionType.UNKNOWN
79        )
80
81    def __enter__(self):
82        return self
83
84    def __exit__(self, exc_type, exc_val, exc_tb):
85        """Cleanup existing buffer if this object gets out from the context"""
86        self.clear_current_section()
87        last_lines = self.flush()
88        for line in last_lines:
89            print(line)
90
91    def watchdog(self):
92        if not self._current_section:
93            return
94
95        timeout_duration = self.timeout_durations.get(
96            self._current_section.type, self.fallback_timeout
97        )
98
99        if self._current_section.delta_time() > timeout_duration:
100            raise MesaCITimeoutError(
101                f"Gitlab Section {self._current_section} has timed out",
102                timeout_duration=timeout_duration,
103            )
104
105    def clear_current_section(self):
106        if self._current_section and not self._current_section.has_finished:
107            self._buffer.append(self._current_section.end())
108            self.current_section = None
109
110    def update_section(self, new_section: GitlabSection):
111        # Sections can have redundant regex to find them to mitigate LAVA
112        # interleaving kmsg and stderr/stdout issue.
113        if self.current_section and self.current_section.id == new_section.id:
114            return
115        self.clear_current_section()
116        self.current_section = new_section
117        self._buffer.append(new_section.start())
118
119    def manage_gl_sections(self, line):
120        if isinstance(line["msg"], list):
121            logging.debug("Ignoring messages as list. Kernel dumps.")
122            return
123
124        for log_section in LOG_SECTIONS:
125            if new_section := log_section.from_log_line_to_section(line):
126                self.update_section(new_section)
127                break
128
129    def detect_kernel_dump_line(self, line: dict[str, Union[str, list]]) -> bool:
130        # line["msg"] can be a list[str] when there is a kernel dump
131        if isinstance(line["msg"], list):
132            return line["lvl"] == "debug"
133
134        # result level has dict line["msg"]
135        if not isinstance(line["msg"], str):
136            return False
137
138        # we have a line, check if it is a kernel message
139        if re.search(r"\[[\d\s]{5}\.[\d\s]{6}\] +\S{2,}", line["msg"]):
140            print_log(f"{CONSOLE_LOG['BOLD']}{line['msg']}{CONSOLE_LOG['RESET']}")
141            return True
142
143        return False
144
145    def remove_trailing_whitespace(self, line: dict[str, str]) -> None:
146        """
147        Removes trailing whitespace from the end of the `msg` value in the log line dictionary.
148
149        Args:
150            line: A dictionary representing a single log line.
151
152        Note:
153            LAVA treats carriage return characters as a line break, so each carriage return in an output console
154            is mapped to a console line in LAVA. This method removes trailing `\r\n` characters from log lines.
155        """
156        msg: Optional[str] = line.get("msg")
157        if not msg:
158            return False
159
160        messages = [msg] if isinstance(msg, str) else msg
161
162        for message in messages:
163            # LAVA logs brings raw messages, which includes newlines characters as \r\n.
164            line["msg"]: str = re.sub(r"\r\n$", "", message)
165
166    def merge_carriage_return_lines(self, line: dict[str, str]) -> bool:
167        """
168        Merges lines that end with a carriage return character into a single line.
169
170        Args:
171            line: A dictionary representing a single log line.
172
173        Returns:
174            A boolean indicating whether the current line has been merged with the next line.
175
176        Note:
177            LAVA treats carriage return characters as a line break, so each carriage return in an output console
178            is mapped to a console line in LAVA.
179        """
180        if line["msg"].endswith("\r"):
181            self._merge_next_line += line["msg"]
182            return True
183
184        if self._merge_next_line:
185            line["msg"] = self._merge_next_line + line["msg"]
186            self._merge_next_line = ""
187
188        return False
189
190    def ignore_dut_feedback_lines(self, line: dict[str, str]) -> bool:
191        """
192        Ignores feedback lines from LAVA.
193        If we only receive this level of message for some time, it means that the job is
194        misbehaving. E.g Rebooting.
195
196        Args:
197            line: A dictionary representing a single log line.
198
199        Returns:
200            A boolean indicating whether the current line is a feedback line.
201        """
202        if line["lvl"] == "feedback" and line["ns"] == "dut":
203            return True
204        if line["lvl"] == "debug":
205            # This message happens after LAVA end receiving the feedback from the DUT
206            if line["msg"] == "Listened to connection for namespace 'dut' done":
207                return True
208        return False
209
210    def feed(self, new_lines: list[dict[str, str]]) -> bool:
211        """Input data to be processed by LogFollower instance
212        Returns true if the DUT (device under test) seems to be alive.
213        """
214
215        self.watchdog()
216
217        # No signal of job health in the log
218        is_job_healthy = False
219
220        for line in new_lines:
221            self.remove_trailing_whitespace(line)
222
223            if self.detect_kernel_dump_line(line):
224                continue
225
226            if self.merge_carriage_return_lines(line):
227                continue
228
229            if self.ignore_dut_feedback_lines(line):
230                continue
231
232            # At least we are fed with a non-kernel dump log, it seems that the
233            # job is progressing
234            is_job_healthy = True
235            self.manage_gl_sections(line)
236            if parsed_line := self.parse_lava_line(line):
237                self._buffer.append(parsed_line)
238
239        self.log_hints.detect_failure(new_lines)
240
241        return is_job_healthy
242
243    def flush(self) -> list[str]:
244        buffer = self._buffer
245        self._buffer = []
246        return buffer
247
248    def parse_lava_line(self, line) -> Optional[str]:
249        prefix = ""
250        suffix = ""
251
252        if line["lvl"] in ["results", "feedback", "debug"]:
253            return
254        elif line["lvl"] in ["warning", "error"]:
255            prefix = CONSOLE_LOG["FG_RED"]
256            suffix = CONSOLE_LOG["RESET"]
257        elif line["lvl"] == "input":
258            prefix = "$ "
259            suffix = ""
260        elif line["lvl"] == "target" and self.lava_farm != LavaFarm.COLLABORA:
261            # gl_section_fix_gen will output the stored line if it can't find a
262            # match for the first split line
263            # So we can recover it and put it back to the buffer
264            if recovered_first_line := self.gl_section_fix_gen.send(line):
265                self._buffer.append(recovered_first_line)
266
267        return f'{prefix}{line["msg"]}{suffix}'
268
269def fix_lava_gitlab_section_log():
270    """This function is a temporary solution for the Gitlab section markers
271    splitting problem. Gitlab parses the following lines to define a collapsible
272    gitlab section in their log:
273    - \x1b[0Ksection_start:timestamp:section_id[collapsible=true/false]\r\x1b[0Ksection_header
274    - \x1b[0Ksection_end:timestamp:section_id\r\x1b[0K
275    There is some problem in message passing between the LAVA dispatcher and the
276    device under test (DUT), that replaces \r control characters into \n. When
277    this problem is fixed on the LAVA side, one should remove this function.
278    """
279    while True:
280        line = yield False
281        first_line = None
282        split_line_pattern = re.compile(r"\x1b\[0K(section_\w+):(\d+):([^\s\r]+)$")
283        second_line_pattern = re.compile(r"\x1b\[0K([\S ]+)?")
284
285        if not re.search(split_line_pattern, line["msg"]):
286            continue
287
288        first_line = line["msg"]
289        # Delete the current line and hold this log line stream to be able to
290        # possibly merge it with the next line.
291        line["msg"] = ""
292        line = yield False
293
294        # This code reached when we detect a possible first split line
295        if re.search(second_line_pattern, line["msg"]):
296            assert first_line
297            line["msg"] = f"{first_line}\r{line['msg']}"
298        else:
299            # The current line doesn't match with the previous one, send back the
300            # latter to give the user the chance to recover it.
301            yield first_line
302
303
304
305def print_log(msg: str, *args) -> None:
306    # Reset color from timestamp, since `msg` can tint the terminal color
307    print(f"{CONSOLE_LOG['RESET']}{datetime.now()}: {msg}", *args)
308
309
310def fatal_err(msg, exception=None):
311    colored_msg = f"{CONSOLE_LOG['FG_RED']}"
312    print_log(colored_msg, f"{msg}", f"{CONSOLE_LOG['RESET']}")
313    if exception:
314        raise exception
315    sys.exit(1)
316
317
318def hide_sensitive_data(yaml_data: str, start_hide: str = "HIDE_START", end_hide: str = "HIDE_END") -> str:
319    skip_line = False
320    dump_data: list[str] = []
321    for line in yaml_data.splitlines(True):
322        if start_hide in line:
323            skip_line = True
324        elif end_hide in line:
325            skip_line = False
326
327        if skip_line:
328            continue
329
330        dump_data.append(line)
331
332    return "".join(dump_data)
333