#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021-2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import random
import shutil
import zipfile
from enum import Enum
from filecmp import cmp
from itertools import tee
from os import makedirs, path, remove
from pathlib import Path
from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable
from urllib import request
from urllib.error import URLError

from runner.logger import Log

_LOGGER = logging.getLogger("runner.utils")

EnumT = TypeVar("EnumT", bound=Enum)


def progress(block_num: int, block_size: int, total_size: int) -> None:
    """
    Report hook passed to ``urlretrieve``: logs the current download position.
    :param block_num: number of the block reported by the downloader
    :param block_size: size of one block
    :param total_size: total size reported by the downloader
    """
    Log.summary(_LOGGER, f"Downloaded block: {block_num} ({block_size}). Total: {total_size}")


def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None:
    """
    Downloads ``{git_url}/{revision}.zip`` into ``/tmp/{name}.zip`` and extracts it
    into the parent directory of target_path. An already existing target_path is
    removed before extraction; the archive is removed after extraction.
    :param name: base name of the temporary archive file
    :param git_url: URL prefix the archive is downloaded from
    :param revision: archive name (without the .zip suffix) appended to git_url
    :param target_path: path expected to appear after extraction
    :param show_progress: if True every downloaded block is logged via progress()
    """
    archive_file = path.join(path.sep, 'tmp', f'{name}.zip')
    url_file = f'{git_url}/{revision}.zip'

    Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}")
    try:
        if show_progress:
            request.urlretrieve(url_file, archive_file, progress)
        else:
            request.urlretrieve(url_file, archive_file)
    except URLError:
        # NOTE(review): presumably logs the message and re-raises - confirm in runner.logger
        Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.')

    Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}")
    if path.exists(target_path):
        shutil.rmtree(target_path)

    try:
        with zipfile.ZipFile(archive_file) as arch:
            # The archive is unpacked next to target_path, not into it
            arch.extractall(path.dirname(target_path))
    except (zipfile.BadZipfile, zipfile.LargeZipFile):
        Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file')

    remove(archive_file)


ProcessCopy = Callable[[str, str], None]


def generate(name: str, url: str, revision: str, generated_root: Path, *,
             stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False,
             process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str:
    """
    Prepares test files under ``generated_root/stamp_name`` and marks the result
    with an empty ``{stamp_name}.stamp`` file. If the stamp file already exists
    and force_download is False nothing is done.
    :param name: name used for the temporary archive and the temporary folder
    :param url: URL prefix passed to download()
    :param revision: revision passed to download()
    :param generated_root: root folder where the destination folder is created
    :param stamp_name: name of the destination folder and of the stamp file,
        ``{name}-{revision}`` if not specified
    :param test_subdir: subfolder of the downloaded tree that is copied
    :param show_progress: passed to download()
    :param process_copy: optional callable(source, destination) used instead of copy()
    :param force_download: download and regenerate even if stamp/temporary data exist
    :return: path to the destination folder
    """
    Log.summary(_LOGGER, "Prepare test files")
    stamp_name = f'{name}-{revision}' if not stamp_name else stamp_name
    dest_path = path.join(generated_root, stamp_name)
    makedirs(dest_path, exist_ok=True)
    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')

    if not force_download and path.exists(stamp_file):
        return dest_path

    temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}')

    if force_download or not path.exists(temp_path):
        download(name, url, revision, temp_path, show_progress)

    # Drop the (possibly stale) destination so that the copy starts from scratch
    if path.exists(dest_path):
        shutil.rmtree(dest_path)

    Log.summary(_LOGGER, "Copy and transform test files")
    if process_copy is not None:
        process_copy(path.join(temp_path, test_subdir), dest_path)
    else:
        copy(path.join(temp_path, test_subdir), dest_path)

    Log.summary(_LOGGER, f"Create stamp file {stamp_file}")
    with os.fdopen(os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755),
                   'w+', encoding="utf-8") as _:  # Create empty file-marker and close it at once
        pass

    return dest_path


def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None:
    """
    Copies the directory tree source_path to dest_path and then copies the
    permission bits of source_path to dest_path.
    :param source_path: folder to copy
    :param dest_path: destination folder
    :param remove_if_exist: if True an existing dest_path is removed first,
        otherwise the tree is copied over the existing folder
    """
    try:
        # Compare as Path objects: a str and a Path (or "dir" and "dir/") naming the same
        # location must be recognized as equal, otherwise the source itself would be removed below
        if Path(source_path) == Path(dest_path):
            return
        if path.exists(dest_path) and remove_if_exist:
            shutil.rmtree(dest_path)
        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)
        shutil.copymode(source_path, dest_path)
    except OSError as ex:
        Log.exception_and_raise(_LOGGER, str(ex))


def read_file(file_path: Union[Path, str]) -> str:
    """
    Reads the whole file as utf8 text.
    :param file_path: path to the file to read
    :return: content of the file
    """
    with os.fdopen(os.open(file_path, os.O_RDONLY, 0o755), "r", encoding='utf8') as f_handle:
        text = f_handle.read()
    return text


def write_2_file(file_path: Union[Path, str], content: str) -> None:
    """
    Writes content to the file. If the file exists it will be truncated,
    if the file does not exist it will be created.
    Missing parent folders are created as well.
    :param file_path: path to the file to write
    :param content: text to write
    """
    parent_dir = path.dirname(file_path)
    # A bare file name has an empty dirname, and makedirs('') raises FileNotFoundError
    if parent_dir:
        makedirs(parent_dir, exist_ok=True)
    f_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
    with os.fdopen(f_descriptor, mode='w+', encoding="utf-8") as f_handle:
        f_handle.write(content)


def get_opener(mode: int) -> Callable[[Union[str, Path], int], int]:
    """
    Builds an opener suitable for the ``opener`` argument of open().
    :param mode: permission bits passed to os.open
    :return: callable(path_name, flags) returning a file descriptor opened with
        O_RDWR | O_APPEND | O_CREAT added to the requested flags
    """
    def opener(path_name: Union[str, Path], flags: int) -> int:
        return os.open(path_name, os.O_RDWR | os.O_APPEND | os.O_CREAT | flags, mode)
    return opener


def write_list_to_file(list_for_write: list, file_path: Path, mode: int = 0o644) -> None:
    """
    Writes every item of the list to the file, one item per line.
    :param list_for_write: items to write, each one is formatted with an f-string
    :param file_path: path to the file to write
    :param mode: permission bits passed to the opener built by get_opener()
    """
    custom_opener = get_opener(mode)
    with open(file_path, mode='w+', encoding='utf-8', opener=custom_opener) as file:
        for entity in list_for_write:
            file.write(f"{entity}\n")


def purify(line: str) -> str:
    """
    Strips leading/trailing spaces and line feeds and removes all inner spaces.
    :param line: line to clean
    :return: the line without spaces
    """
    return line.strip(" \n").replace(" ", "")


def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]:
    """
    Looks for the enum member whose value or string representation is equal
    to item_name, the comparison is case-insensitive.
    :param item_name: text to look for
    :param enum_cls: enum class where the member is searched
    :return: the first matching member or None if nothing matches
    """
    wanted = item_name.lower()
    for enum_value in enum_cls:
        # str() keeps the lookup working for enums with non-string values (e.g. int)
        if str(enum_value.value).lower() == wanted or str(enum_value).lower() == wanted:
            return enum_value
    return None


def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
    """
    Wraps the code into the function repeat_for_jit and appends a loop
    calling that function jit_preheat_repeats times.
    :param code: source code placed into the function body
    :param jit_preheat_repeats: number of iterations of the generated loop
    :return: the generated source text
    """
    return f"""
    function repeat_for_jit() {{
        {code}
    }}

    for(let i = 0; i < {jit_preheat_repeats}; i++) {{
        repeat_for_jit(i)
    }}
    """


def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]:
    """
    Iterates over regular files located directly in dirpath (no recursion).
    :param dirpath: folder to list
    :param allowed_ext: accepted extensions as returned by os.path.splitext,
        i.e. including the leading dot
    :return: pairs (file name, path to the file) for files with an accepted extension
    """
    dirpath_gen = ((name, path.join(str(dirpath), name)) for name in os.listdir(str(dirpath)))
    for name, path_value in dirpath_gen:
        if not path.isfile(path_value):
            continue
        _, ext = path.splitext(path_value)
        if ext in allowed_ext:
            yield name, path_value


def is_type_of(value: Any, type_: str) -> bool:
    """
    Checks whether type_ occurs in ``str(type(value))`` at a position greater than 0.
    :param value: value whose type is checked
    :param type_: text to look for in the string representation of the type
    :return: True if the text is found, otherwise False
    """
    return str(type(value)).find(type_) > 0


def get_platform_binary_name(name: str) -> str:
    """
    :param name: base name of a binary
    :return: name with the ".exe" suffix if os.name starts with "nt", otherwise name as is
    """
    if os.name.startswith("nt"):
        return name + ".exe"
    return name


def get_group_number(test_id: str, total_groups: int) -> int:
    """
    Calculates the number of a group by test_id
    :param test_id: string value test path or test_id
    :param total_groups: total number of groups
    :return: the number of a group in the range [1, total_groups],
    both boundaries are included.
    """
    # A private generator seeded with test_id is used instead of random.seed():
    # the state of the module-level generator shared by the whole process stays untouched
    return random.Random(test_id).randint(1, total_groups)


# from itertools import pairwise when switching to python version >= 3.10
# pylint: disable=invalid-name
def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]:
    """
    :param iterable: sequence of items
    :return: iterator over pairs of neighbour items: (s0, s1), (s1, s2), ...
    """
    a, b = tee(iterable)
    next(b, None)  # shift the second iterator by one item
    return zip(a, b)


def compare_files(files: List[Path]) -> bool:
    """
    Compares every pair of neighbour files with filecmp.cmp (default arguments).
    :param files: files to compare
    :return: True if all neighbour pairs are reported as equal (also for
        an empty or one-item list), otherwise False
    """
    return all(cmp(f1, f2) for f1, f2 in pairwise(files))