• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import os
20import random
21import shutil
22import zipfile
23from enum import Enum
24from filecmp import cmp
25from itertools import tee
26from os import makedirs, path, remove
27from pathlib import Path
28from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable
29from urllib import request
30from urllib.error import URLError
31
32from runner.logger import Log
33
# Module-level logger: every Log.* call in this file reports through it
_LOGGER = logging.getLogger("runner.utils")

# Type variable restricted to Enum subclasses; lets enum_from_str be typed generically
EnumT = TypeVar("EnumT", bound=Enum)
37
38
def progress(block_num: int, block_size: int, total_size: int) -> None:
    """
    Log one progress record of a running download.
    Passed by download() to request.urlretrieve as its report hook
    :param block_num: block counter supplied by the hook caller
    :param block_size: block size supplied by the hook caller
    :param total_size: total size supplied by the hook caller
    """
    message = f"Downloaded block: {block_num} ({block_size}). Total: {total_size}"
    Log.summary(_LOGGER, message)
41
42
def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None:
    """
    Download the `{git_url}/{revision}.zip` archive and unpack it into the parent
    directory of target_path.

    The archive is kept temporarily as `/tmp/{name}.zip` and is removed before returning,
    also when downloading or unpacking fails. An existing target_path is deleted
    before the archive is unpacked.
    :param name: base name of the temporary archive file
    :param git_url: URL prefix the archive is downloaded from
    :param revision: name of the archive (without .zip) under git_url
    :param target_path: path whose parent directory receives the unpacked content
    :param show_progress: if True every downloaded block is reported through progress()
    """
    archive_file = path.join(path.sep, 'tmp', f'{name}.zip')
    url_file = f'{git_url}/{revision}.zip'
    # None is urlretrieve's default report hook, so this equals not passing the argument
    report_hook = progress if show_progress else None

    try:
        Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}")
        try:
            request.urlretrieve(url_file, archive_file, report_hook)
        except URLError:
            Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.')

        Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}")
        if path.exists(target_path):
            shutil.rmtree(target_path)

        try:
            with zipfile.ZipFile(archive_file) as arch:
                arch.extractall(path.dirname(target_path))
        except (zipfile.BadZipfile, zipfile.LargeZipFile):
            Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file')
    finally:
        # An interrupted download can leave a partial archive and a failed unpacking
        # leaves the complete one: drop the temporary file on every exit path
        if path.exists(archive_file):
            remove(archive_file)
67
68
# Signature of the custom copy step accepted by generate(): it is called with
# two string paths (source directory, destination directory) and returns nothing
ProcessCopy = Callable[[str, str], None]
70
71
def generate(name: str, url: str, revision: str, generated_root: Path, *,
             stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False,
             process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str:
    """
    Prepare test files of the given revision under generated_root.

    The result directory is `generated_root/stamp_name`. A `.stamp` file inside it marks
    a finished preparation: when it exists and force_download is False nothing is redone.
    Otherwise the sources are taken from `/tmp/{name}/{name}-{revision}` (downloaded first
    if that path is missing or force_download is True) and their test_subdir is copied
    into the freshly recreated result directory.
    :param name: name of the test suite, used in temporary and default stamp names
    :param url: URL prefix passed to download()
    :param revision: revision passed to download()
    :param generated_root: root directory for the prepared files
    :param stamp_name: name of the result directory and stamp, `{name}-{revision}` if empty
    :param test_subdir: subdirectory of the downloaded sources that holds the tests
    :param show_progress: passed to download()
    :param process_copy: custom copy step used instead of copy() when provided
    :param force_download: download and regenerate even if the stamp file exists
    :return: path of the directory with the prepared files
    """
    Log.summary(_LOGGER, "Prepare test files")
    if not stamp_name:
        stamp_name = f'{name}-{revision}'
    dest_path = path.join(generated_root, stamp_name)
    makedirs(dest_path, exist_ok=True)
    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')

    if path.exists(stamp_file) and not force_download:
        return dest_path

    temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}')
    download_required = force_download or not path.exists(temp_path)
    if download_required:
        download(name, url, revision, temp_path, show_progress)

    if path.exists(dest_path):
        shutil.rmtree(dest_path)

    Log.summary(_LOGGER, "Copy and transform test files")
    copy_step = copy if process_copy is None else process_copy
    copy_step(path.join(temp_path, test_subdir), dest_path)

    Log.summary(_LOGGER, f"Create stamp file {stamp_file}")
    stamp_descriptor = os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755)
    with os.fdopen(stamp_descriptor, 'w+', encoding="utf-8"):
        pass  # The stamp is an empty file-marker: create it and close it at once

    return dest_path
104
105
def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None:
    """
    Recursively copy the source_path directory to dest_path.
    Nothing is done when both paths point to the same location.
    Any OSError is reported through Log.exception_and_raise
    :param source_path: directory to copy from
    :param dest_path: directory to copy to
    :param remove_if_exist: if True an existing dest_path is deleted before copying,
    if False the content is copied over the existing directory
    """
    try:
        # Compare absolute locations: a plain `==` never matches a Path against a str,
        # so the same directory passed in both forms would be deleted by rmtree below
        if path.abspath(source_path) == path.abspath(dest_path):
            return
        if path.exists(dest_path) and remove_if_exist:
            shutil.rmtree(dest_path)
        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)
        shutil.copymode(source_path, dest_path)
    except OSError as ex:
        Log.exception_and_raise(_LOGGER, str(ex))
116
117
def read_file(file_path: Union[Path, str]) -> str:
    """
    Read the whole file as UTF-8 text
    :param file_path: path to the file to read
    :return: full content of the file
    """
    descriptor = os.open(file_path, os.O_RDONLY, 0o755)
    with os.fdopen(descriptor, "r", encoding='utf8') as source:
        return source.read()
122
123
def write_2_file(file_path: Union[Path, str], content: str) -> None:
    """
    Write content to the file as UTF-8 text. If the file exists it is truncated,
    if it does not exist it is created together with its missing parent directories
    :param file_path: path to the file to write
    :param content: text to store in the file
    """
    parent_dir = path.dirname(file_path)
    # dirname is empty for a bare file name, and makedirs('') raises FileNotFoundError
    if parent_dir:
        makedirs(parent_dir, exist_ok=True)
    f_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
    with os.fdopen(f_descriptor, mode='w+', encoding="utf-8") as f_handle:
        f_handle.write(content)
132
133
def purify(line: str) -> str:
    """
    Trim spaces and line feeds from both ends of the line and drop every remaining space
    :param line: text to clean up
    :return: the text without spaces; inner line feeds and other whitespace are kept
    """
    trimmed = line.strip(" \n")
    return "".join(trimmed.split(" "))
136
137
def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]:
    """
    Find the member of enum_cls that corresponds to item_name, ignoring the case.
    item_name matches a member if it equals either the member value or str(member)
    :param item_name: textual representation to look for
    :param enum_cls: enumeration class to search in
    :return: the first matching member or None if nothing matches
    """
    wanted = item_name.lower()
    for enum_value in enum_cls:
        # str() keeps the lookup working for members whose value is not a string
        if wanted in (str(enum_value.value).lower(), str(enum_value).lower()):
            return enum_value
    return None
143
144
def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
    """
    Wrap the code into the `repeat_for_jit` function followed by a loop that calls it
    :param code: source text to place into the function body
    :param jit_preheat_repeats: number of loop iterations, i.e. calls of the function
    :return: text of the wrapper with the code embedded
    """
    # The wrapper starts with a line break and ends with a four-space indented tail
    wrapper_lines = (
        "",
        "    function repeat_for_jit() {",
        f"        {code}",
        "    }",
        "",
        f"    for(let i = 0; i < {jit_preheat_repeats}; i++) {{",
        "        repeat_for_jit(i)",
        "    }",
        "    ",
    )
    return "\n".join(wrapper_lines)
155
156
def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]:
    """
    Iterate over the regular files located directly in dirpath (no recursion)
    whose extension is one of allowed_ext
    :param dirpath: directory to list
    :param allowed_ext: accepted extensions including the leading dot, e.g. ".js"
    :return: pairs (file name, path to the file) in the order of os.listdir
    """
    root = str(dirpath)
    for entry_name in os.listdir(root):
        entry_path = path.join(root, entry_name)
        if path.isfile(entry_path) and path.splitext(entry_path)[1] in allowed_ext:
            yield entry_name, entry_path
165
166
def is_type_of(value: Any, type_: str) -> bool:
    """
    Check whether the textual form of the value's type, e.g. "<class 'int'>", contains type_
    :param value: object whose type is inspected
    :param type_: substring to look for
    :return: True if the first occurrence of type_ starts after the first character,
    False if type_ is absent or is found at the very beginning
    """
    type_repr = str(type(value))
    first_position = type_repr.find(type_)
    return first_position >= 1
169
170
def get_platform_binary_name(name: str) -> str:
    """
    Build the file name of a binary for the current platform
    :param name: name of the binary without extension
    :return: name with ".exe" appended if os.name starts with "nt", otherwise the name as is
    """
    suffix = ".exe" if os.name.startswith("nt") else ""
    return name + suffix
175
176
def get_group_number(test_id: str, total_groups: int) -> int:
    """
    Calculates the number of a group by test_id
    The same test_id and total_groups always give the same group number
    :param test_id: string value test path or test_id
    :param total_groups: total number of groups
    :return: the number of a group in the range [1, total_groups],
    both boundaries are included.
    """
    # A private generator seeded with test_id yields the same numbers as seeding the
    # module-level one, but does not reset the random state shared by the whole process
    return random.Random(test_id).randint(1, total_groups)
187
188
189# from itertools import pairwise when switching to python version >= 3.10
190# pylint: disable=invalid-name
def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]:
    """
    Iterate over successive overlapping pairs: (s0, s1), (s1, s2), (s2, s3), ...
    :param iterable: source of the elements
    :return: iterator of neighbouring pairs, empty if there are fewer than two elements
    """
    leading, trailing = tee(iterable)
    # Shift the second copy one element forward; the default avoids StopIteration on empty input
    next(trailing, None)
    return zip(leading, trailing)
195
196
def compare_files(files: List[Path]) -> bool:
    """
    Check that every two neighbouring files of the list are reported equal by filecmp.cmp
    :param files: paths of the files to compare
    :return: False as soon as a pair of neighbours differs, True otherwise
    (also for a list with fewer than two files)
    """
    return all(cmp(previous, following) for previous, following in pairwise(files))
202