#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import random
import shutil
import zipfile
from enum import Enum
from filecmp import cmp
from itertools import tee
from os import makedirs, path, remove
from pathlib import Path
from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable
from urllib import request
from urllib.error import URLError

from runner.logger import Log

_LOGGER = logging.getLogger("runner.utils")

EnumT = TypeVar("EnumT", bound=Enum)


def progress(block_num: int, block_size: int, total_size: int) -> None:
    """
    Report hook for ``urllib.request.urlretrieve``: logs every downloaded block.
    :param block_num: number of blocks transferred so far
    :param block_size: size of a single block
    :param total_size: total size of the downloaded file
    """
    Log.summary(_LOGGER, f"Downloaded block: {block_num} ({block_size}). Total: {total_size}")


def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None:
    """
    Downloads ``{git_url}/{revision}.zip`` to ``/tmp/{name}.zip``, unpacks it into the parent
    directory of target_path and deletes the archive afterwards.
    :param name: base name of the temporary archive file
    :param git_url: URL prefix the archive is fetched from
    :param revision: revision name, used as the archive file name on the server
    :param target_path: expected location of the unpacked tree; removed beforehand if it exists
    :param show_progress: if True every downloaded block is logged through `progress`
    """
    archive_file = path.join(path.sep, 'tmp', f'{name}.zip')
    url_file = f'{git_url}/{revision}.zip'

    Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}")
    try:
        # Passing None as the report hook is the same as omitting it
        request.urlretrieve(url_file, archive_file, progress if show_progress else None)
    except URLError:
        Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.')

    Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}")
    if path.exists(target_path):
        shutil.rmtree(target_path)

    try:
        with zipfile.ZipFile(archive_file) as arch:
            arch.extractall(path.dirname(target_path))
    except (zipfile.BadZipfile, zipfile.LargeZipFile):
        Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file')
    finally:
        # Remove the temporary archive on failure as well, so that a broken download
        # does not stay in /tmp. The existence check keeps a missing archive from
        # masking the original error.
        if path.exists(archive_file):
            remove(archive_file)


ProcessCopy = Callable[[str, str], None]


def generate(name: str, url: str, revision: str, generated_root: Path, *,
             stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False,
             process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str:
    """
    Prepares test files under ``generated_root/<stamp_name>``: downloads the sources if required,
    copies (or transforms) the test subdirectory and marks the result with an empty stamp file.
    If the stamp file already exists and force_download is False nothing is done.
    :param name: name of the test suite, used in temporary paths
    :param url: URL prefix of the archive to download
    :param revision: revision of the archive
    :param generated_root: root folder where the prepared tests are placed
    :param stamp_name: name of the destination folder and stamp file, ``{name}-{revision}`` by default
    :param test_subdir: subdirectory of the unpacked archive that contains tests
    :param show_progress: if True the download progress is logged
    :param process_copy: optional callable(source, destination) used instead of the plain `copy`
    :param force_download: if True the archive is downloaded and processed even if the stamp exists
    :return: path to the folder with the prepared tests
    """
    Log.summary(_LOGGER, "Prepare test files")
    stamp_name = stamp_name or f'{name}-{revision}'
    dest_path = path.join(generated_root, stamp_name)
    makedirs(dest_path, exist_ok=True)
    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')

    if not force_download and path.exists(stamp_file):
        return dest_path

    temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}')

    if force_download or not path.exists(temp_path):
        download(name, url, revision, temp_path, show_progress)

    if path.exists(dest_path):
        shutil.rmtree(dest_path)

    Log.summary(_LOGGER, "Copy and transform test files")
    source_tests = path.join(temp_path, test_subdir)
    if process_copy is not None:
        process_copy(source_tests, dest_path)
    else:
        copy(source_tests, dest_path)

    # A custom process_copy is not obliged to create the destination folder,
    # but the stamp file is placed into it
    makedirs(dest_path, exist_ok=True)

    Log.summary(_LOGGER, f"Create stamp file {stamp_file}")
    with os.fdopen(os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755),
                   'w+', encoding="utf-8") as _:  # Create empty file-marker and close it at once
        pass

    return dest_path


def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None:
    """
    Copies the directory tree source_path to dest_path and copies the permission bits of the root.
    OSError is passed to Log.exception_and_raise.
    :param source_path: directory to copy
    :param dest_path: destination directory
    :param remove_if_exist: if True an existing destination is removed first,
        otherwise the source is copied over the existing destination
    """
    try:
        # Compare normalized absolute paths: a plain `==` treats Path("a") and "a"
        # (or a relative and an absolute spelling) as different, and the source
        # would be removed below as an "existing destination"
        if path.abspath(source_path) == path.abspath(dest_path):
            return
        if remove_if_exist and path.exists(dest_path):
            shutil.rmtree(dest_path)
        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)
        shutil.copymode(source_path, dest_path)
    except OSError as ex:
        Log.exception_and_raise(_LOGGER, str(ex))


def read_file(file_path: Union[Path, str]) -> str:
    """
    Reads the whole file as UTF-8 text.
    :param file_path: path to the file to read
    :return: content of the file
    """
    with os.fdopen(os.open(file_path, os.O_RDONLY, 0o755), "r", encoding='utf8') as f_handle:
        return f_handle.read()


def write_2_file(file_path: Union[Path, str], content: str) -> None:
    """
    Writes content to the file as UTF-8 text.
    If the file exists it is truncated, if it does not exist it is created
    together with missing parent folders.
    :param file_path: path to the file to write
    :param content: text to write
    """
    parent_dir = path.dirname(file_path)
    if parent_dir:
        # dirname is empty for a bare file name and makedirs("") raises FileNotFoundError
        makedirs(parent_dir, exist_ok=True)
    f_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
    with os.fdopen(f_descriptor, mode='w+', encoding="utf-8") as f_handle:
        f_handle.write(content)


def purify(line: str) -> str:
    """
    Removes leading/trailing spaces and newlines and all inner spaces from the line.
    """
    return line.strip(" \n").replace(" ", "")


def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]:
    """
    Searches a member of enum_cls by its value or by its string representation, ignoring case.
    :param item_name: name to search for
    :param enum_cls: enum class to search in
    :return: the first matching member or None if nothing matches
    """
    wanted = item_name.lower()
    for enum_value in enum_cls:
        # str() keeps the comparison working for members with non-string values
        if str(enum_value.value).lower() == wanted or str(enum_value).lower() == wanted:
            return enum_value
    return None


def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
    """
    Wraps the code into the function `repeat_for_jit` followed by a loop that calls it.
    :param code: source code to put into the function body
    :param jit_preheat_repeats: how many times the function is called
    :return: the generated source code
    """
    return f"""
    function repeat_for_jit() {{
        {code}
    }}

    for(let i = 0; i < {jit_preheat_repeats}; i++) {{
        repeat_for_jit(i)
    }}
    """


def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]:
    """
    Iterates over regular files located directly in dirpath (not recursive).
    :param dirpath: directory to list
    :param allowed_ext: accepted extensions in the form returned by os.path.splitext (with the dot)
    :return: pairs (file name, path to the file) for files with an allowed extension
    """
    root = str(dirpath)
    for name in os.listdir(root):
        path_value = path.join(root, name)
        if not path.isfile(path_value):
            continue
        _, ext = path.splitext(path_value)
        if ext in allowed_ext:
            yield name, path_value


def is_type_of(value: Any, type_: str) -> bool:
    """
    Checks whether type_ occurs in the string representation of the value's type.
    A match at position 0 is not counted, so an empty type_ gives False.
    :param value: any object
    :param type_: substring to search for in ``str(type(value))``
    """
    return str(type(value)).find(type_) > 0


def get_platform_binary_name(name: str) -> str:
    """
    Adds the ``.exe`` suffix to the binary name if ``os.name`` starts with "nt".
    :param name: name of the binary without extension
    :return: name of the binary for the current platform
    """
    if os.name.startswith("nt"):
        return name + ".exe"
    return name


def get_group_number(test_id: str, total_groups: int) -> int:
    """
    Calculates the number of a group by test_id
    :param test_id: string value test path or test_id
    :param total_groups: total number of groups
    :return: the number of a group in the range [1, total_groups],
    both boundaries are included.
    """
    # A private generator seeded with test_id gives the same number as seeding
    # the module-level one, but leaves the shared random state untouched
    return random.Random(test_id).randint(1, total_groups)


# from itertools import pairwise when switching to python version >= 3.10
# pylint: disable=invalid-name
def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]:
    """
    Returns successive overlapping pairs: (s0, s1), (s1, s2), ...
    The result is empty if the input has fewer than two items.
    """
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)


def compare_files(files: List[Path]) -> bool:
    """
    Compares every pair of neighbouring files with ``filecmp.cmp``.
    :param files: files to compare
    :return: True if all neighbouring files are reported equal (also for fewer than two files)
    """
    return all(cmp(f1, f2) for f1, f2 in pairwise(files))