#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import random
import shutil
from enum import Enum
from filecmp import cmp
from itertools import tee
from os import makedirs, path, remove
from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable
from pathlib import Path
from urllib import request
from urllib.error import URLError
import zipfile

from runner.logger import Log

_LOGGER = logging.getLogger("runner.utils")

EnumT = TypeVar("EnumT", bound=Enum)


def progress(block_num: int, block_size: int, total_size: int) -> None:
    """
    Report hook passed to urlretrieve: logs the current block number,
    the block size and the total size of the download.
    """
    Log.summary(_LOGGER, f"Downloaded block: {block_num} ({block_size}). Total: {total_size}")


def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None:
    """
    Downloads `{git_url}/{revision}.zip` into `/tmp/{name}.zip` and extracts it
    into the parent directory of target_path.
    :param name: base name of the temporary archive file
    :param git_url: base URL the archive is downloaded from
    :param revision: revision name; `{revision}.zip` is appended to git_url
    :param target_path: an existing directory at this path is removed before extracting;
        the archive itself is extracted into the parent directory of this path
    :param show_progress: if True every downloaded block is logged via `progress`
    """
    archive_file = path.join(path.sep, 'tmp', f'{name}.zip')
    url_file = f'{git_url}/{revision}.zip'

    Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}")
    try:
        if show_progress:
            request.urlretrieve(url_file, archive_file, progress)
        else:
            request.urlretrieve(url_file, archive_file)
    except URLError:
        # NOTE(review): Log.exception_and_raise is expected to raise - confirm in runner.logger
        Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.')

    Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}")
    if path.exists(target_path):
        shutil.rmtree(target_path)

    try:
        with zipfile.ZipFile(archive_file) as arch:
            arch.extractall(path.dirname(target_path))
    except (zipfile.BadZipFile, zipfile.LargeZipFile):
        Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file')
    finally:
        # Remove the temporary archive on failure as well, so that
        # a broken download does not stay in the temp directory
        if path.exists(archive_file):
            remove(archive_file)


# Callback copying (and possibly transforming) files: (source_path, dest_path) -> None
ProcessCopy = Callable[[str, str], None]


def generate(name: str, url: str, revision: str, generated_root: Path, *,
             stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False,
             process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str:
    """
    Prepares test files under `generated_root/{stamp_name}`: downloads the archive if needed,
    copies `test_subdir` of the unpacked archive to the destination and creates a stamp file.
    If the stamp file already exists and force_download is False nothing is downloaded or copied.
    :param name: name used for the temporary archive and the temporary folder
    :param url: base URL passed to `download`
    :param revision: revision passed to `download`
    :param generated_root: root folder where the destination folder is created
    :param stamp_name: name of the destination folder and of the stamp file,
        `{name}-{revision}` by default
    :param test_subdir: sub-folder of the unpacked archive to copy
    :param show_progress: passed to `download`
    :param process_copy: optional callback used instead of `copy`
    :param force_download: if True the stamp file and the temporary folder are ignored
    :return: path to the destination folder
    """
    Log.summary(_LOGGER, "Prepare test files")
    stamp_name = f'{name}-{revision}' if not stamp_name else stamp_name
    dest_path = path.join(generated_root, stamp_name)
    makedirs(dest_path, exist_ok=True)
    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')

    if not force_download and path.exists(stamp_file):
        return dest_path

    temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}')

    if force_download or not path.exists(temp_path):
        download(name, url, revision, temp_path, show_progress)

    # Start from a clean destination folder
    if path.exists(dest_path):
        shutil.rmtree(dest_path)

    Log.summary(_LOGGER, "Copy and transform test files")
    if process_copy is not None:
        process_copy(path.join(temp_path, test_subdir), dest_path)
    else:
        copy(path.join(temp_path, test_subdir), dest_path)

    Log.summary(_LOGGER, f"Create stamp file {stamp_file}")
    with os.fdopen(os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755),
                   'w+', encoding="utf-8") as _:  # Create empty file-marker and close it at once
        pass

    return dest_path


def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None:
    """
    Copies the folder source_path to dest_path.
    :param source_path: folder to copy
    :param dest_path: destination folder
    :param remove_if_exist: if True an existing destination is removed before copying,
        otherwise the content is copied over the existing folder
    """
    try:
        # Normalize both values: a Path and a str never compare equal, so without it
        # the destination (being the source itself) would be removed before copying
        if Path(source_path) == Path(dest_path):
            return
        if path.exists(dest_path) and remove_if_exist:
            shutil.rmtree(dest_path)
        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)
    except OSError as ex:
        Log.exception_and_raise(_LOGGER, str(ex))


def read_file(file_path: Union[Path, str]) -> str:
    """
    Reads the whole file as utf8 text
    :param file_path: path to the file to read
    :return: content of the file
    """
    with os.fdopen(os.open(file_path, os.O_RDONLY, 0o755), "r", encoding='utf8') as f_handle:
        text = f_handle.read()
    return text


def write_2_file(file_path: Union[Path, str], content: str) -> None:
    """
    write content to file if file exists it will be truncated. if file does not exist it will be created
    """
    parent_dir = path.dirname(file_path)
    if parent_dir:
        # dirname is empty for a bare file name and makedirs fails on an empty path
        makedirs(parent_dir, exist_ok=True)
    # O_TRUNC is required: fdopen over an already opened descriptor does not truncate the file,
    # so without the flag a shorter content would leave the tail of the previous one
    open_flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC
    with os.fdopen(os.open(file_path, open_flags, 0o755), mode='w+', encoding="utf-8") as f_handle:
        f_handle.write(content)


def purify(line: str) -> str:
    """
    Strips leading and trailing spaces and line feeds and removes all spaces inside the line
    """
    return line.strip(" \n").replace(" ", "")


def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]:
    """
    Searches case-insensitively for the enum member by its value or by its string representation
    :param item_name: string to search for
    :param enum_cls: enum class to search in
    :return: the first matching member or None if nothing matches
    """
    expected = item_name.lower()
    for enum_value in enum_cls:
        # str() lets members with non-string values be compared as well
        if str(enum_value.value).lower() == expected or str(enum_value).lower() == expected:
            return enum_value
    return None


def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
    """
    Wraps the code into the function `repeat_for_jit` followed by the loop
    calling this function jit_preheat_repeats times
    """
    return f"""
    function repeat_for_jit() {{
        {code}
    }}

    for(let i = 0; i < {jit_preheat_repeats}; i++) {{
        repeat_for_jit(i)
    }}
    """


def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]:
    """
    Iterates over regular files located directly in dirpath (not recursively)
    :param dirpath: folder to list
    :param allowed_ext: allowed extensions in the form returned by os.path.splitext (with the leading dot)
    :return: pairs (file name, path to the file) for files with an allowed extension
    """
    root = str(dirpath)
    for name in os.listdir(root):
        path_value = path.join(root, name)
        if not path.isfile(path_value):
            continue
        _, ext = path.splitext(path_value)
        if ext in allowed_ext:
            yield name, path_value


def is_type_of(value: Any, type_: str) -> bool:
    """
    Checks whether type_ occurs in the string representation of the type of value.
    An occurrence at the very beginning of the representation (index 0) is not counted.
    """
    return str(type(value)).find(type_) > 0


def get_platform_binary_name(name: str) -> str:
    """
    Returns the name with the `.exe` suffix if os.name starts with `nt`, otherwise the name unchanged
    """
    if os.name.startswith("nt"):
        return name + ".exe"
    return name


def get_group_number(test_id: str, total_groups: int) -> int:
    """
    Calculates the number of a group by test_id
    :param test_id: string value test path or test_id
    :param total_groups: total number of groups
    :return: the number of a group in the range [1, total_groups],
    both boundaries are included.
    """
    # A private generator returns the same number for the same test_id
    # and does not reseed the module-wide random generator
    return random.Random(test_id).randint(1, total_groups)


# from itertools import pairwise when switching to python version >= 3.10
# pylint: disable=invalid-name
def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]:
    """
    Returns successive overlapping pairs: (s0, s1), (s1, s2), ...
    The result is empty if the iterable has fewer than two items.
    """
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)


def compare_files(files: List[Path]) -> bool:
    """
    Compares every two neighbouring files with filecmp.cmp (with its default arguments)
    :param files: files to compare
    :return: True if all neighbouring files are reported equal or if there are fewer than two files
    """
    return all(cmp(f1, f2) for f1, f2 in pairwise(files))