• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021-2025 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import os
20import random
21import shutil
22import zipfile
23from enum import Enum
24from filecmp import cmp
25from itertools import tee
26from os import makedirs, path, remove
27from pathlib import Path
28from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable
29from urllib import request
30from urllib.error import URLError
31
32from runner.logger import Log
33
# Logger passed to every Log.* call made by the helpers in this module.
_LOGGER = logging.getLogger("runner.utils")

# Type variable bound to Enum; lets enum_from_str return the same enum type it is given.
EnumT = TypeVar("EnumT", bound=Enum)
37
38
def progress(block_num: int, block_size: int, total_size: int) -> None:
    """
    Log one download progress record.
    Passed by download() to urlretrieve as its report callback.
    :param block_num: value reported as the block number
    :param block_size: value reported as the block size
    :param total_size: value reported as the total size
    """
    message = f"Downloaded block: {block_num} ({block_size}). Total: {total_size}"
    Log.summary(_LOGGER, message)
41
42
def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None:
    """
    Download ``<git_url>/<revision>.zip`` into ``/tmp/<name>.zip`` and unpack it
    into the parent directory of ``target_path``.
    An already existing ``target_path`` is removed before extraction.
    The downloaded archive is deleted afterwards, both on success and on failure,
    so a broken or partial archive is never left behind.
    :param name: base name of the temporary archive file
    :param git_url: URL prefix the archive is fetched from
    :param revision: archive name (without ``.zip``) appended to git_url
    :param target_path: directory expected to appear after extraction
    :param show_progress: if True, every downloaded block is logged via progress()
    """
    archive_file = path.join(path.sep, 'tmp', f'{name}.zip')
    url_file = f'{git_url}/{revision}.zip'

    try:
        Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}")
        try:
            if show_progress:
                request.urlretrieve(url_file, archive_file, progress)
            else:
                request.urlretrieve(url_file, archive_file)
        except URLError:
            # NOTE(review): Log.exception_and_raise presumably re-raises - confirm in runner.logger
            Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.')

        Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}")
        if path.exists(target_path):
            shutil.rmtree(target_path)

        try:
            with zipfile.ZipFile(archive_file) as arch:
                # The archive is unpacked next to target_path, not into it
                arch.extractall(path.dirname(target_path))
        except (zipfile.BadZipFile, zipfile.LargeZipFile):
            Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file')
    finally:
        # Clean up the temporary archive on every exit path; it may be missing
        # if the download failed before anything was written.
        if path.exists(archive_file):
            remove(archive_file)
67
68
# Signature of a custom copy callback: takes two strings and returns nothing.
# generate() calls it with (source directory, destination directory).
ProcessCopy = Callable[[str, str], None]
70
71
def generate(name: str, url: str, revision: str, generated_root: Path, *,
             stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False,
             process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str:
    """
    Prepare a directory with test files taken from a downloaded archive.
    The result directory is ``<generated_root>/<stamp_name>``. If it already holds
    the ``<stamp_name>.stamp`` marker and force_download is False, nothing is done.
    Otherwise the sources are taken from ``/tmp/<name>/<name>-<revision>``
    (downloaded first when missing or when force_download is True), the result
    directory is recreated from the ``test_subdir`` of the sources and the marker
    file is created.
    :param name: name of the test suite, used in temporary and default stamp names
    :param url: URL prefix passed to download()
    :param revision: revision passed to download()
    :param generated_root: root folder for generated test files
    :param stamp_name: name of the result folder and marker; ``<name>-<revision>`` if empty
    :param test_subdir: subfolder of the downloaded sources to take tests from
    :param show_progress: passed to download()
    :param process_copy: optional callback used instead of copy()
    :param force_download: download and regenerate even if the marker exists
    :return: path to the result directory
    """
    Log.summary(_LOGGER, "Prepare test files")
    stamp_name = stamp_name or f'{name}-{revision}'
    dest_path = path.join(generated_root, stamp_name)
    makedirs(dest_path, exist_ok=True)
    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')

    up_to_date = not force_download and path.exists(stamp_file)
    if up_to_date:
        return dest_path

    temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}')
    need_download = force_download or not path.exists(temp_path)
    if need_download:
        download(name, url, revision, temp_path, show_progress)

    if path.exists(dest_path):
        shutil.rmtree(dest_path)

    Log.summary(_LOGGER, "Copy and transform test files")
    tests_source = path.join(temp_path, test_subdir)
    copier = copy if process_copy is None else process_copy
    copier(tests_source, dest_path)

    Log.summary(_LOGGER, f"Create stamp file {stamp_file}")
    # Create the empty file-marker and close it at once
    stamp_descriptor = os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755)
    os.fdopen(stamp_descriptor, 'w+', encoding="utf-8").close()

    return dest_path
104
105
def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None:
    """
    Copy the directory tree source_path to dest_path.
    Does nothing when both arguments point to the same location.
    Any OSError is reported through Log.exception_and_raise.
    :param source_path: directory to copy from
    :param dest_path: directory to copy to
    :param remove_if_exist: if True an existing dest_path is removed first,
    otherwise the content is copied over the existing directory
    """
    try:
        # Compare normalized absolute paths: a plain `==` is False for a Path vs a str
        # (or for "dir" vs "./dir") naming the same directory, and the rmtree below
        # would then delete the source itself.
        if path.abspath(source_path) == path.abspath(dest_path):
            return
        if path.exists(dest_path) and remove_if_exist:
            shutil.rmtree(dest_path)
        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)
        shutil.copymode(source_path, dest_path)
    except OSError as ex:
        Log.exception_and_raise(_LOGGER, str(ex))
116
117
def read_file(file_path: Union[Path, str]) -> str:
    """
    Read the whole file as utf8 text.
    :param file_path: path to the file to read
    :return: content of the file
    """
    descriptor = os.open(file_path, os.O_RDONLY, 0o755)
    with os.fdopen(descriptor, "r", encoding='utf8') as source:
        return source.read()
122
123
def write_2_file(file_path: Union[Path, str], content: str) -> None:
    """
    Write content to the file as utf-8 text.
    If the file exists it will be truncated, if it does not exist it will be created.
    Missing parent directories are created as well.
    :param file_path: path to the file to write
    :param content: text to write
    """
    parent_dir = path.dirname(file_path)
    # A bare file name has an empty dirname and makedirs("") raises FileNotFoundError
    if parent_dir:
        makedirs(parent_dir, exist_ok=True)
    f_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
    with os.fdopen(f_descriptor, mode='w+', encoding="utf-8") as f_handle:
        f_handle.write(content)
132
133
def get_opener(mode: int) -> Callable[[Union[str, Path], int], int]:
    """
    Build an opener suitable for the ``opener`` argument of open().
    The returned callable opens the file with os.open, adding the
    O_RDWR, O_APPEND and O_CREAT flags to the requested ones.
    :param mode: permission bits passed to os.open
    :return: callable taking (path, flags) and returning a file descriptor
    """
    forced_flags = os.O_RDWR | os.O_APPEND | os.O_CREAT

    def open_with_mode(path_name: Union[str, Path], flags: int) -> int:
        return os.open(path_name, forced_flags | flags, mode)

    return open_with_mode
138
139
def write_list_to_file(list_for_write: list, file_path: Path, mode: int = 0o644) -> None:
    """
    Write every item of the list to the file, one item per line.
    The file is opened in 'w+' mode through the opener built by get_opener.
    :param list_for_write: items to write; each one is formatted with an f-string
    :param file_path: path to the file to write
    :param mode: permission bits handed to get_opener
    """
    with open(file_path, mode='w+', encoding='utf-8', opener=get_opener(mode)) as target:
        target.writelines(f"{entity}\n" for entity in list_for_write)
145
146
def purify(line: str) -> str:
    """
    Strip spaces and line feeds from both ends of the line,
    then drop every remaining space character.
    :param line: text to clean up
    :return: cleaned text
    """
    trimmed = line.strip(" \n")
    return trimmed.replace(" ", "")
149
150
def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]:
    """
    Find the member of enum_cls matching item_name, ignoring case.
    A member matches if item_name equals either the string form of its value
    or the string form of the member itself.
    Values are converted with str() so enums with non-string values
    do not fail on the comparison.
    :param item_name: text to look up
    :param enum_cls: enum class to search in
    :return: the first matching member or None if nothing matches
    """
    wanted = item_name.lower()
    for member in enum_cls:
        if wanted in (str(member.value).lower(), str(member).lower()):
            return member
    return None
156
157
def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
    """
    Put the code into the body of a ``repeat_for_jit`` function and
    append a loop calling that function jit_preheat_repeats times.
    :param code: text inserted as the function body
    :param jit_preheat_repeats: upper bound of the generated loop counter
    :return: the generated source text
    """
    wrapped = f"""
    function repeat_for_jit() {{
        {code}
    }}

    for(let i = 0; i < {jit_preheat_repeats}; i++) {{
        repeat_for_jit(i)
    }}
    """
    return wrapped
168
169
def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]:
    """
    Iterate over the regular files located directly in dirpath
    whose extension is one of allowed_ext.
    :param dirpath: folder to list (not recursive)
    :param allowed_ext: accepted extensions in the form returned by os.path.splitext
    :return: pairs (file name, path to the file)
    """
    root = str(dirpath)
    for name in os.listdir(root):
        full_path = path.join(root, name)
        extension = path.splitext(full_path)[1]
        if path.isfile(full_path) and extension in allowed_ext:
            yield name, full_path
178
179
def is_type_of(value: Any, type_: str) -> bool:
    """
    Check whether type_ occurs in the text ``str(type(value))``.
    An occurrence found at the very beginning of that text does not count.
    :param value: object whose type is checked
    :param type_: text searched for in the type description
    :return: True if the text is found at index 1 or later
    """
    type_description = str(type(value))
    position = type_description.find(type_)
    return position >= 1
182
183
def get_platform_binary_name(name: str) -> str:
    """
    Build the file name of a binary for the current platform.
    :param name: name of the binary without extension
    :return: name with ".exe" appended when os.name starts with "nt",
    the unchanged name otherwise
    """
    suffix = ".exe" if os.name.startswith("nt") else ""
    return name + suffix
188
189
def get_group_number(test_id: str, total_groups: int) -> int:
    """
    Calculates the number of a group by test_id.
    The same test_id and total_groups always give the same group number.
    :param test_id: string value test path or test_id
    :param total_groups: total number of groups
    :return: the number of a group in the range [1, total_groups],
    both boundaries are included.
    """
    # A private generator seeded with test_id yields the same numbers as seeding
    # the module-level one, but does not reset the random state shared by the
    # rest of the process.
    generator = random.Random(test_id)
    return generator.randint(1, total_groups)
200
201
202# from itertools import pairwise when switching to python version >= 3.10
203# pylint: disable=invalid-name
def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]:
    """
    Iterate over overlapping pairs of neighbouring items: (s0, s1), (s1, s2), ...
    :param iterable: items to combine into pairs
    :return: iterator over the pairs; empty if there are fewer than two items
    """
    leading, trailing = tee(iterable)
    # Shift the second copy by one item so that zip yields neighbours
    next(trailing, None)
    return zip(leading, trailing)
208
209
def compare_files(files: List[Path]) -> bool:
    """
    Compare every file with its neighbour in the list using filecmp.cmp
    with its default arguments.
    :param files: paths to the files to compare
    :return: False as soon as two neighbouring files are reported different,
    True otherwise (including lists with fewer than two files)
    """
    return all(cmp(current, following) for current, following in pairwise(files))
215