• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import os
20import random
21import shutil
22from enum import Enum
23from filecmp import cmp
24from itertools import tee
25from os import makedirs, path, remove
26from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable
27from pathlib import Path
28from urllib import request
29from urllib.error import URLError
30import zipfile
31
32from runner.logger import Log
33
34_LOGGER = logging.getLogger("runner.utils")
35
36EnumT = TypeVar("EnumT", bound=Enum)
37
38
39def progress(block_num: int, block_size: int, total_size: int) -> None:
40    Log.summary(_LOGGER, f"Downloaded block: {block_num} ({block_size}). Total: {total_size}")
41
42
43def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None:
44    archive_file = path.join(path.sep, 'tmp', f'{name}.zip')
45    url_file = f'{git_url}/{revision}.zip'
46
47    Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}")
48    try:
49        if show_progress:
50            request.urlretrieve(url_file, archive_file, progress)
51        else:
52            request.urlretrieve(url_file, archive_file)
53    except URLError:
54        Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.')
55
56    Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}")
57    if path.exists(target_path):
58        shutil.rmtree(target_path)
59
60    try:
61        with zipfile.ZipFile(archive_file) as arch:
62            arch.extractall(path.dirname(target_path))
63    except (zipfile.BadZipfile, zipfile.LargeZipFile):
64        Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file')
65
66    remove(archive_file)
67
68
69ProcessCopy = Callable[[str, str], None]
70
71
72def generate(name: str, url: str, revision: str, generated_root: Path, *,
73             stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False,
74             process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str:
75    Log.summary(_LOGGER, "Prepare test files")
76    stamp_name = f'{name}-{revision}' if not stamp_name else stamp_name
77    dest_path = path.join(generated_root, stamp_name)
78    makedirs(dest_path, exist_ok=True)
79    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')
80
81    if not force_download and path.exists(stamp_file):
82        return dest_path
83
84    temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}')
85
86    if force_download or not path.exists(temp_path):
87        download(name, url, revision, temp_path, show_progress)
88
89    if path.exists(dest_path):
90        shutil.rmtree(dest_path)
91
92    Log.summary(_LOGGER, "Copy and transform test files")
93    if process_copy is not None:
94        process_copy(path.join(temp_path, test_subdir), dest_path)
95    else:
96        copy(path.join(temp_path, test_subdir), dest_path)
97
98    Log.summary(_LOGGER, f"Create stamp file {stamp_file}")
99    with os.fdopen(os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755),
100                   'w+', encoding="utf-8") as _:  # Create empty file-marker and close it at once
101        pass
102
103    return dest_path
104
105
106def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None:
107    try:
108        if source_path == dest_path:
109            return
110        if path.exists(dest_path) and remove_if_exist:
111            shutil.rmtree(dest_path)
112        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)
113    except OSError as ex:
114        Log.exception_and_raise(_LOGGER, str(ex))
115
116
117def read_file(file_path: Union[Path, str]) -> str:
118    with os.fdopen(os.open(file_path, os.O_RDONLY, 0o755), "r", encoding='utf8') as f_handle:
119        text = f_handle.read()
120    return text
121
122
123def write_2_file(file_path: Union[Path, str], content: str) -> None:
124    """
125    write content to file if file exists it will be truncated. if file does not exist it wil be created
126    """
127    makedirs(path.dirname(file_path), exist_ok=True)
128    with os.fdopen(os.open(file_path, os.O_RDWR | os.O_CREAT, 0o755), mode='w+', encoding="utf-8") as f_handle:
129        f_handle.write(content)
130
131
132def purify(line: str) -> str:
133    return line.strip(" \n").replace(" ", "")
134
135
136def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]:
137    for enum_value in enum_cls:
138        if enum_value.value.lower() == item_name.lower() or str(enum_value).lower() == item_name.lower():
139            return enum_cls(enum_value)
140    return None
141
142
143def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
144    return f"""
145    function repeat_for_jit() {{
146        {code}
147    }}
148
149    for(let i = 0; i < {jit_preheat_repeats}; i++) {{
150        repeat_for_jit(i)
151    }}
152    """
153
154
155def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]:
156    dirpath_gen = ((name, path.join(str(dirpath), name)) for name in os.listdir(str(dirpath)))
157    for name, path_value in dirpath_gen:
158        if not path.isfile(path_value):
159            continue
160        _, ext = path.splitext(path_value)
161        if ext in allowed_ext:
162            yield name, path_value
163
164
165def is_type_of(value: Any, type_: str) -> bool:
166    return str(type(value)).find(type_) > 0
167
168
169def get_platform_binary_name(name: str) -> str:
170    if os.name.startswith("nt"):
171        return name + ".exe"
172    return name
173
174
175def get_group_number(test_id: str, total_groups: int) -> int:
176    """
177    Calculates the number of a group by test_id
178    :param test_id: string value test path or test_id
179    :param total_groups: total number of groups
180    :return: the number of a group in the range [1, total_groups],
181    both boundaries are included.
182    """
183    random.seed(test_id)
184    return random.randint(1, total_groups)
185
186
187# from itertools import pairwise when switching to python version >= 3.10
188# pylint: disable=invalid-name
189def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]:
190    a, b = tee(iterable)
191    next(b, None)
192    return zip(a, b)
193
194
195def compare_files(files: List[Path]) -> bool:
196    for f1, f2 in pairwise(files):
197        if not cmp(f1, f2):
198            return False
199    return True
200