• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from contextlib import nullcontext as does_not_raise
2from datetime import UTC, datetime
3from io import StringIO
4from itertools import cycle
5from typing import Any, Callable, Generator, Iterable, Optional, Tuple, Union
6
7from freezegun import freeze_time
8from lava.utils.log_section import (
9    DEFAULT_GITLAB_SECTION_TIMEOUTS,
10    FALLBACK_GITLAB_SECTION_TIMEOUT,
11    LogSectionType,
12)
13from lavacli.utils import flow_yaml as lava_yaml
14
15
def yaml_dump(data: dict[str, Any]) -> str:
    """Serialize *data* with the ``lava_yaml`` dumper and return the text.

    The dumper writes into an in-memory text buffer whose whole content is
    returned. Note that ``jobs_logs_response`` in this file also passes a
    list of messages here, so the annotation is narrower than actual usage.
    """
    buffer = StringIO()
    lava_yaml.dump(data, buffer)
    serialized = buffer.getvalue()
    return serialized
20
21
def section_timeout(section_type: LogSectionType) -> int:
    """Return the timeout for *section_type* as a whole number of seconds.

    The value is looked up in ``DEFAULT_GITLAB_SECTION_TIMEOUTS``; section
    types without an entry use ``FALLBACK_GITLAB_SECTION_TIMEOUT``.
    """
    timeout = DEFAULT_GITLAB_SECTION_TIMEOUTS.get(
        section_type, FALLBACK_GITLAB_SECTION_TIMEOUT
    )
    seconds = timeout.total_seconds()
    # total_seconds() yields a float; callers get a truncated int.
    return int(seconds)
28
29
def create_lava_yaml_msg(
    dt: Callable = datetime.now, msg="test", lvl="target"
) -> dict[str, str]:
    """Build a single log-message dict with ``dt``, ``msg`` and ``lvl`` keys.

    Args:
        dt: Zero-argument callable producing the timestamp; its result is
            stringified with ``str``.
        msg: Message text stored under ``"msg"``.
        lvl: Log level stored under ``"lvl"``.
    """
    timestamp = str(dt())
    message = dict(dt=timestamp, msg=msg, lvl=lvl)
    return message
34
35
def generate_testsuite_result(
    name="test-mesa-ci", result="pass", exit_code=0, metadata_extra=None, extra=None
):
    """Build a test-suite result dict.

    Args:
        name: Value stored under the top-level ``"name"`` key.
        result: Value stored under ``metadata["result"]``.
        exit_code: Value stored under ``metadata["exit_code"]``.
        metadata_extra: Optional mapping merged into ``"metadata"``; its keys
            override ``result``/``exit_code`` on collision.
        extra: Optional mapping merged into the top-level dict; its keys
            override ``metadata``/``name`` on collision.

    Returns:
        A new dict; the mappings passed in are not modified.
    """
    metadata = {"result": result, "exit_code": exit_code, **(metadata_extra or {})}
    # `extra` used to be accepted and then silently discarded; merge it at the
    # top level, mirroring how `metadata_extra` is merged into "metadata".
    return {"metadata": metadata, "name": name, **(extra or {})}
44
45
def jobs_logs_response(
    finished=False, msg=None, lvl="target", result=None, exit_code=None
) -> Tuple[bool, str]:
    """Build a ``(finished, yaml_text)`` pair imitating a job-logs reply.

    When *msg* is ``None`` a single timestamped message is generated: a
    ``hwci: mesa: <result>, exit_code: <exit_code>`` line at level
    ``"target"`` if *result* is truthy, otherwise ``"New message"`` at level
    *lvl*. When *msg* is given (even an empty list) it is serialized as-is
    and the generated message is not used.
    """
    timestamp = str(datetime.now(tz=UTC))
    if result:
        # The result line is always reported on the "target" level.
        generated = {
            "dt": timestamp,
            "msg": f"hwci: mesa: {result}, exit_code: {exit_code}",
            "lvl": "target",
        }
    else:
        generated = {"dt": timestamp, "msg": "New message", "lvl": lvl}

    payload = msg if msg is not None else [generated]
    return finished, yaml_dump(payload)
57
58
def section_aware_message_generator(
    messages: dict[LogSectionType, Iterable[int]],
    result: Optional[str] = None,
    exit_code: Optional[int] = None,
) -> Iterable[tuple[dict, Iterable[int]]]:
    """Yield ``(message, delays)`` pairs for every ``LogSectionType`` member.

    For each section the mock signal message is yielded together with the
    delays registered for it in *messages* (``[1]`` when absent). If *result*
    is truthy, an extra ``hwci: mesa: ...`` result message is yielded right
    after the ``TEST_CASE`` section signal, with that section's delays.
    """
    fallback_delays = [1]
    result_section = LogSectionType.TEST_CASE

    for section in LogSectionType:
        delays = messages.get(section, fallback_delays)
        yield mock_lava_signal(section), delays

        in_result_section = section == result_section
        if result and in_result_section:
            # The job only counts as finished when the result `echo` shows up
            # inside the expected section.
            result_line = f"hwci: mesa: {result}, exit_code: {exit_code}"
            yield create_lava_yaml_msg(msg=result_line), delays
76
77
def message_generator():
    """Yield the mock signal message of each ``LogSectionType`` member, in enum order."""
    # The generator expression keeps message creation lazy, one per request.
    yield from (mock_lava_signal(section) for section in LogSectionType)
81
82
def level_generator():
    """Endlessly yield the known log levels in a fixed, repeating order."""
    # Covers every known level by default.
    known_levels = ("results", "feedback", "warning", "error", "debug", "target")
    while True:
        yield from known_levels
86
87
def generate_n_logs(
    n=1,
    tick_fn: Union[Generator, Iterable[int], int] = 1,
    level_fn=level_generator,
    result="pass",
    exit_code=0,
):
    """Simulate a log partitioned in n components.

    Endlessly yields ``jobs_logs_response`` pairs in rounds of *n*: ``n - 1``
    unfinished responses with an empty message list, then one finished
    response carrying the *result* / *exit_code* line. Frozen time is advanced
    before every response.

    Args:
        n: Number of responses per round.
        tick_fn: Time step in seconds; a generator, an iterable of ints or a
            single int. Only the first value it produces is used.
        level_fn: Zero-argument callable returning an iterator of log levels.
        result: Result string reported by the finishing response.
        exit_code: Exit code reported by the finishing response.
    """
    level_gen = level_fn()
    # Reuse the module's shared normalisation instead of duplicating it here.
    tick_gen = to_iterable(tick_fn)

    with freeze_time(datetime.now(tz=UTC)) as time_travel:
        # The step is read once and reused for every response.
        tick_sec: int = next(tick_gen)
        while True:
            # Simulate a scenario where the target job is waiting for being started
            for _ in range(n - 1):
                level: str = next(level_gen)

                time_travel.tick(tick_sec)
                # msg=[] makes the response carry no log lines at all.
                yield jobs_logs_response(finished=False, msg=[], lvl=level)

            time_travel.tick(tick_sec)
            yield jobs_logs_response(finished=True, result=result, exit_code=exit_code)
117
118
def to_iterable(tick_fn):
    """Normalise *tick_fn* into an iterator of tick values.

    A generator is returned unchanged, any other iterable is repeated
    endlessly, and a non-iterable value is repeated endlessly as a single
    item.
    """
    if isinstance(tick_fn, Generator):
        return tick_fn

    # Wrap a lone value in a 1-tuple so both remaining cases can be cycled.
    values = tick_fn if isinstance(tick_fn, Iterable) else (tick_fn,)
    return cycle(values)
126
127
def mock_logs(messages=None, result=None, exit_code=None):
    """Yield unfinished log responses for a complete simulated run.

    Every ``(message, delays)`` pair from ``section_aware_message_generator``
    is replayed once per delay: the message is yielded as a one-item log
    batch, then frozen time is advanced by that delay.
    """
    section_delays = {} if messages is None else messages

    with freeze_time(datetime.now(tz=UTC)) as frozen_clock:
        # Walk through all sections produced by the section-aware generator.
        run = section_aware_message_generator(section_delays, result, exit_code)
        for entry, delays in run:
            for seconds in delays:
                yield jobs_logs_response(finished=False, msg=[entry])
                frozen_clock.tick(seconds)
137
138
def mock_lava_signal(type: LogSectionType) -> dict[str, str]:
    """Return the mock log message that stands for section *type*.

    ``TEST_CASE``, ``TEST_SUITE`` and ``LAVA_POST_PROCESSING`` map to
    dedicated marker messages; every other section gets the default message
    from ``create_lava_yaml_msg()``.
    """
    marker_specs = (
        (LogSectionType.TEST_CASE, "<STARTTC> case", "debug"),
        (LogSectionType.TEST_SUITE, "<STARTRUN> suite", "debug"),
        (LogSectionType.LAVA_POST_PROCESSING, "<LAVA_SIGNAL_ENDTC case>", "target"),
    )
    # All candidate messages are built up front, followed by the default one.
    markers = {
        section: create_lava_yaml_msg(msg=text, lvl=level)
        for section, text, level in marker_specs
    }
    fallback = create_lava_yaml_msg()
    return markers.get(type, fallback)
151