1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2021-2023 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17# This file does only contain a selection of the most common options. For a 18# full list see the documentation: 19# http://www.sphinx-doc.org/en/master/config 20 21import logging 22import os 23import random 24import shutil 25from enum import Enum 26from filecmp import cmp 27from itertools import tee 28from os import makedirs, path, remove 29from typing import TypeVar, Callable, Optional, Type, Union, Any, List, Iterator, Tuple, Iterable 30from pathlib import Path 31from urllib import request 32from urllib.error import URLError 33import zipfile 34 35from runner.logger import Log 36 37_LOGGER = logging.getLogger("runner.utils") 38 39EnumT = TypeVar("EnumT", bound=Enum) 40 41 42def progress(block_num: int, block_size: int, total_size: int) -> None: 43 Log.summary(_LOGGER, f"Downloaded block: {block_num} ({block_size}). Total: {total_size}") 44 45 46def download(name: str, git_url: str, revision: str, target_path: str, show_progress: bool = False) -> None: 47 archive_file = path.join(path.sep, 'tmp', f'{name}.zip') 48 url_file = f'{git_url}/{revision}.zip' 49 50 Log.summary(_LOGGER, f"Downloading from {url_file} to {archive_file}") 51 try: 52 if show_progress: 53 request.urlretrieve(url_file, archive_file, progress) 54 else: 55 request.urlretrieve(url_file, archive_file) 56 except URLError: 57 Log.exception_and_raise(_LOGGER, f'Downloading {url_file} file failed.') 58 59 Log.summary(_LOGGER, f"Extracting archive {archive_file} to {target_path}") 60 if path.exists(target_path): 61 shutil.rmtree(target_path) 62 63 try: 64 with zipfile.ZipFile(archive_file) as arch: 65 arch.extractall(path.dirname(target_path)) 66 except (zipfile.BadZipfile, zipfile.LargeZipFile): 67 Log.exception_and_raise(_LOGGER, f'Failed to unzip {archive_file} file') 68 69 remove(archive_file) 70 71 72ProcessCopy = Callable[[str, str], None] 73 74 75def generate(name: str, url: str, revision: str, generated_root: Path, *, 76 stamp_name: Optional[str] = None, test_subdir: str = "test", show_progress: bool = False, 77 process_copy: Optional[ProcessCopy] = None, force_download: bool = False) -> str: 78 Log.summary(_LOGGER, "Prepare test files") 79 stamp_name = f'{name}-{revision}' if not stamp_name else stamp_name 80 dest_path = path.join(generated_root, stamp_name) 81 makedirs(dest_path, exist_ok=True) 82 stamp_file = path.join(dest_path, f'{stamp_name}.stamp') 83 84 if not force_download and path.exists(stamp_file): 85 return dest_path 86 87 temp_path = path.join(path.sep, 'tmp', name, f'{name}-{revision}') 88 89 if force_download or not path.exists(temp_path): 90 download(name, url, revision, temp_path, show_progress) 91 92 if path.exists(dest_path): 93 shutil.rmtree(dest_path) 94 95 Log.summary(_LOGGER, "Copy and transform test files") 96 if process_copy is not None: 97 process_copy(path.join(temp_path, test_subdir), dest_path) 98 else: 99 copy(path.join(temp_path, test_subdir), dest_path) 100 101 Log.summary(_LOGGER, f"Create stamp file {stamp_file}") 102 with open(stamp_file, 'w+', encoding="utf-8") as _: # Create empty file-marker and close it at once 103 pass 104 105 return dest_path 106 107 108def copy(source_path: Union[Path, str], dest_path: Union[Path, str], remove_if_exist: bool = True) -> None: 109 try: 110 if path.exists(dest_path) and remove_if_exist: 111 shutil.rmtree(dest_path) 112 shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist) 113 except OSError as ex: 114 Log.exception_and_raise(_LOGGER, str(ex)) 115 116 117def read_file(file_path: Union[Path, str]) -> str: 118 with open(file_path, "r", encoding='utf8') as f_handle: 119 text = f_handle.read() 120 return text 121 122 123def write_2_file(file_path: Union[Path, str], content: str) -> None: 124 """ 125 write content to file if file exists it will be truncated. if file does not exist it wil be created 126 """ 127 makedirs(path.dirname(file_path), exist_ok=True) 128 with open(file_path, mode='w+', encoding="utf-8") as f_handle: 129 f_handle.write(content) 130 131 132def purify(line: str) -> str: 133 return line.strip(" \n").replace(" ", "") 134 135 136def enum_from_str(item_name: str, enum_cls: Type[EnumT]) -> Optional[EnumT]: 137 for enum_value in enum_cls: 138 if enum_value.value.lower() == item_name.lower() or str(enum_value).lower() == item_name.lower(): 139 return enum_cls(enum_value) 140 return None 141 142 143def wrap_with_function(code: str, jit_preheat_repeats: int) -> str: 144 return f""" 145 function repeat_for_jit() {{ 146 {code} 147 }} 148 149 for(let i = 0; i < {jit_preheat_repeats}; i++) {{ 150 repeat_for_jit(i) 151 }} 152 """ 153 154 155def iter_files(dirpath: Union[Path, str], allowed_ext: List[str]) -> Iterator[Tuple[str, str]]: 156 dirpath_gen = ((name, path.join(str(dirpath), name)) for name in os.listdir(str(dirpath))) 157 for name, path_value in dirpath_gen: 158 if not path.isfile(path_value): 159 continue 160 _, ext = path.splitext(path_value) 161 if ext in allowed_ext: 162 yield name, path_value 163 164 165def is_type_of(value: Any, type_: str) -> bool: 166 return str(type(value)).find(type_) > 0 167 168 169def get_platform_binary_name(name: str) -> str: 170 if os.name.startswith("nt"): 171 return name + ".exe" 172 return name 173 174 175def get_group_number(test_id: str, total_groups: int) -> int: 176 """ 177 Calculates the number of a group by test_id 178 :param test_id: string value test path or test_id 179 :param total_groups: total number of groups 180 :return: the number of a group in the range [1, total_groups], 181 both boundaries are included. 182 """ 183 random.seed(test_id) 184 return random.randint(1, total_groups) 185 186 187# from itertools import pairwise when switching to python version >= 3.10 188# pylint: disable=invalid-name 189def pairwise(iterable: Iterable[Path]) -> Iterator[Tuple[Path, Path]]: 190 a, b = tee(iterable) 191 next(b, None) 192 return zip(a, b) 193 194 195def compare_files(files: List[Path]) -> bool: 196 for f1, f2 in pairwise(files): 197 if not cmp(f1, f2): 198 return False 199 return True 200