#!/usr/bin/env python3
# -- coding: utf-8 --
#
# Copyright (c) 2024-2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import importlib
import os
import platform
import random
import re
import shutil
import subprocess
import threading
import zipfile
from collections.abc import Callable, Iterator
from concurrent.futures import Future
from dataclasses import dataclass
from filecmp import cmp
from itertools import pairwise
from os import makedirs, path, remove
from pathlib import Path
from types import ModuleType
from typing import Any
from urllib import request
from urllib.error import URLError

import yaml

from runner.common_exceptions import (
    DownloadException,
    FileNotFoundException,
    InvalidConfiguration,
    UnzipException,
    YamlException,
)
from runner.enum_types.base_enum import EnumT
from runner.enum_types.configuration_kind import ArchitectureKind, OSKind
from runner.logger import Log

_LOGGER = Log.get_logger(__file__)


def progress(block_num: int, block_size: int, total_size: int) -> None:
    """Report hook for ``urlretrieve``: log the current block number, block size and total size."""
    _LOGGER.summary(f"Downloaded block: {block_num} ({block_size}). Total: {total_size}")


def download(name: str, *, git_url: str, revision: str, download_root: Path, target_path: Path,
             show_progress: bool = False) -> None:
    """
    Download ``{git_url}/{revision}.zip`` and unpack it next to ``target_path``.

    The archive is stored as ``{download_root}/{name}.zip``, extracted into the parent
    directory of ``target_path`` and removed after a successful extraction.
    An already existing ``target_path`` is deleted before the extraction.

    :param name: base name of the temporary archive file
    :param git_url: URL prefix the archive is downloaded from
    :param revision: revision, used as the archive name on the remote side
    :param download_root: folder where the archive is temporarily stored
    :param target_path: folder expected to appear after the extraction
        (presumably the archive's top-level folder is named like it - TODO confirm)
    :param show_progress: if True every downloaded block is logged through ``progress``
    :raises DownloadException: if the download fails with ``URLError``
    :raises UnzipException: if the archive is broken or requires unsupported ZIP64
    """
    archive_file = path.join(download_root, f'{name}.zip')
    url_file = f'{git_url}/{revision}.zip'

    _LOGGER.summary(f"Downloading from {url_file} to {archive_file}")
    try:
        if show_progress:
            request.urlretrieve(url_file, archive_file, progress)
        else:
            request.urlretrieve(url_file, archive_file)
    except URLError as exc:
        raise DownloadException(f'Downloading {url_file} file failed.') from exc

    _LOGGER.summary(f"Extracting archive {archive_file} to {target_path}")
    if path.exists(target_path):
        shutil.rmtree(target_path)

    try:
        with zipfile.ZipFile(archive_file) as arch:
            arch.extractall(path.dirname(target_path))
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as exc:
        raise UnzipException(f'Failed to unzip {archive_file} file') from exc

    remove(archive_file)


ProcessCopy = Callable[[Path, Path], None]


def download_and_generate(name: str, url: str, revision: str, download_root: Path, generated_root: Path, *,
                          stamp_name: str | None = None, test_subdir: str = "test", show_progress: bool = False,
                          process_copy: ProcessCopy | None = None, force_download: bool = False) -> Path:
    """
    Make sure the test files of the given revision are present under ``generated_root``.

    The result folder is ``generated_root / stamp_name``. A ``{stamp_name}.stamp`` file inside it
    marks a completed preparation: if it exists and ``force_download`` is False, nothing is done.
    Otherwise the sources are downloaded into ``download_root / f'{name}-{revision}'`` (unless that
    folder already exists and no forced download is requested), the result folder is recreated
    from ``test_subdir`` of the downloaded tree and the stamp file is written.

    :param name: name of the downloaded collection
    :param url: URL prefix passed to ``download`` as ``git_url``
    :param revision: revision to download
    :param download_root: folder for downloaded sources
    :param generated_root: folder where the prepared test files are placed
    :param stamp_name: name of the result folder and of the stamp file; ``{name}-{revision}`` if empty
    :param test_subdir: sub-folder of the downloaded tree that holds the tests
    :param show_progress: forwarded to ``download``
    :param process_copy: optional callable ``(source, destination)`` used instead of the plain ``copy``
    :param force_download: if True the stamp file and the already downloaded folder are ignored
    :return: path to the folder with the prepared test files
    """
    _LOGGER.summary("Prepare test files")
    stamp_name = f'{name}-{revision}' if not stamp_name else stamp_name
    dest_path = generated_root / stamp_name
    makedirs(dest_path, exist_ok=True)
    stamp_file = path.join(dest_path, f'{stamp_name}.stamp')

    if not force_download and path.exists(stamp_file):
        return dest_path

    temp_path = download_root / f'{name}-{revision}'

    if force_download or not path.exists(temp_path):
        download(
            name=name,
            git_url=url,
            revision=revision,
            download_root=download_root,
            target_path=temp_path,
            show_progress=show_progress
        )

    # Start from a clean destination: the copy step below is expected to create it again
    if dest_path.exists():
        shutil.rmtree(dest_path)

    _LOGGER.summary("Copy and transform test files")
    if process_copy is not None:
        process_copy(temp_path / test_subdir, dest_path)
    else:
        copy(temp_path / test_subdir, dest_path)

    _LOGGER.summary(f"Create stamp file {stamp_file}")
    with os.fdopen(os.open(stamp_file, os.O_RDWR | os.O_CREAT, 0o755),
                   'w+', encoding="utf-8") as _:  # Create empty file-marker and close it at once
        pass

    return dest_path


def copy(source_path: Path, dest_path: Path, remove_if_exist: bool = True) -> None:
    """
    Copy a file or a whole directory tree from ``source_path`` to ``dest_path``.

    :param source_path: existing file or folder to copy
    :param dest_path: destination; nothing is done if it is equal to ``source_path``
    :param remove_if_exist: if True an existing destination is deleted first,
        otherwise a directory copy is merged into the existing destination folder
    :raises FileNotFoundException: if ``source_path`` does not exist
    """
    if not source_path.exists():
        error = f"Source path {source_path} does not exist! Cannot process this collection"
        raise FileNotFoundException(error)
    if source_path == dest_path:
        return
    if dest_path.exists() and remove_if_exist:
        if dest_path.is_file():
            dest_path.unlink(missing_ok=True)
        else:
            shutil.rmtree(dest_path)
    if source_path.is_file():
        shutil.copy(source_path, dest_path)
    else:
        shutil.copytree(source_path, dest_path, dirs_exist_ok=not remove_if_exist)


def read_file(file_path: Path | str) -> str:
    """Return the whole content of ``file_path`` decoded as UTF-8."""
    with os.fdopen(os.open(file_path, os.O_RDONLY, 0o755), "r", encoding='utf8') as f_handle:
        text = f_handle.read()
    return text


def write_2_file(file_path: Path | str, content: str) -> None:
    """
    Write ``content`` to ``file_path`` in UTF-8.

    If the file exists it is truncated, if it does not exist it is created.
    Missing parent folders are created as well.
    """
    parent_dir = path.dirname(file_path)
    # A bare file name has an empty dirname, and makedirs("") raises FileNotFoundError
    if parent_dir:
        makedirs(parent_dir, exist_ok=True)
    with open(file_path, mode='w+', encoding="utf-8") as f_handle:
        f_handle.write(content)


def purify(line: str) -> str:
    """Strip leading/trailing spaces and newlines and remove every space inside ``line``."""
    return line.strip(" \n").replace(" ", "")


def get_all_enum_values(enum_cls: type[EnumT], *, delim: str = ", ", quotes: str = "'") -> str:
    """
    Join the values of all members of ``enum_cls`` into one string.

    :param enum_cls: enumeration class to iterate over
    :param delim: separator placed between the values
    :param quotes: string put before and after every value
    """
    return delim.join(f"{quotes}{enum_value.value}{quotes}" for enum_value in enum_cls)


def wrap_with_function(code: str, jit_preheat_repeats: int) -> str:
    """
    Wrap ``code`` into the function ``repeat_for_jit`` followed by a loop calling it
    ``jit_preheat_repeats`` times.
    """
    # NOTE(review): the whitespace inside this template is part of the returned string -
    # confirm the inner indentation against the upstream file.
    return f"""
    function repeat_for_jit() {{
        {code}
    }}

    for(let i = 0; i < {jit_preheat_repeats}; i++) {{
        repeat_for_jit(i)
    }}
    """


def iter_files(dirpath: Path | str, allowed_ext: list[str]) -> Iterator[tuple[str, str]]:
    """
    Iterate over regular files located directly in ``dirpath`` (no recursion).

    :param dirpath: folder to list
    :param allowed_ext: accepted extensions in the ``os.path.splitext`` form (with the leading dot)
    :return: pairs ``(file name, joined path)`` for every file with an accepted extension
    """
    dirpath_gen = ((name, path.join(str(dirpath), name)) for name in os.listdir(str(dirpath)))
    for name, path_value in dirpath_gen:
        if not path.isfile(path_value):
            continue
        _, ext = path.splitext(path_value)
        if ext in allowed_ext:
            yield name, path_value


def is_type_of(value: Any, type_: str) -> bool:  # type: ignore[explicit-any]
    """Return True if ``type_`` occurs in ``str(type(value))`` at a position greater than 0."""
    return str(type(value)).find(type_) > 0


def get_platform_binary_name(name: str) -> str:
    """Append ``.exe`` to ``name`` when ``os.name`` starts with ``nt``, otherwise return it unchanged."""
    if os.name.startswith("nt"):
        return name + ".exe"
    return name


def get_group_number(test_id: str, total_groups: int) -> int:
    """
    Calculates the number of a group by test_id
    :param test_id: string value test path or test_id
    :param total_groups: total number of groups
    :return: the number of a group in the range [1, total_groups],
    both boundaries are included.
    """
    # A private generator seeded with test_id yields the same number as seeding the
    # module-level generator would, but leaves the shared random state untouched.
    return random.Random(test_id).randint(1, total_groups)


def compare_files(files: list[Path]) -> bool:
    """Return True if every two neighbouring files in ``files`` are equal according to ``filecmp.cmp``."""
    return all(cmp(file1, file2) for file1, file2 in pairwise(files))


def is_build_instrumented(binary: str, line: str, threshold: int = 0) -> bool:
    """
    Run ``nm -an`` on ``binary`` and count the matches of ``line`` in its output.

    :param binary: path to the binary, substituted into the shell command as is
    :param line: regular expression searched in the ``nm`` output
    :param threshold: minimal number of matches required
    :return: True if the number of matches is not less than ``threshold``
    """
    # NOTE(review): with the default threshold of 0 the result is always True,
    # and the exit status of nm is ignored - confirm this is intended.
    check_instrumented = f"nm -an {binary}"

    _LOGGER.all(f"Run: {check_instrumented}")
    _, output = subprocess.getstatusoutput(check_instrumented)

    return len(re.findall(line, output)) >= threshold


def indent(times: int) -> str:
    """Return a string consisting of ``times`` spaces."""
    return " " * times


def has_macro(prop_value: str) -> bool:
    """Return True if the string form of ``prop_value`` contains at least one ``${...}`` macro."""
    pattern = r"\$\{([^\}]+)\}"
    match = re.search(pattern, str(prop_value))
    return match is not None


def get_all_macros(prop_value: str) -> list[str]:
    """Return the names (the text between ``${`` and ``}``) of all macros found in ``prop_value``."""
    pattern = r"\$\{([^\}]+)\}"
    return [match.group(1) for match in re.finditer(pattern, prop_value)]


def replace_macro(prop_value: str, macro: str, replacing_value: str) -> str:
    """
    Replace every ``${macro}`` occurrence in ``prop_value`` with ``replacing_value``.

    :param prop_value: string that may contain macros
    :param macro: macro name without the ``${`` and ``}`` decoration
    :param replacing_value: text inserted instead of the macro, taken literally
    :return: the updated string, or ``prop_value`` itself if it does not contain ``macro``
    """
    if macro not in prop_value:
        return prop_value
    # The macro name is plain text, so escape it: otherwise '.' in names like
    # 'parameters.x' would match any character.
    pattern = r"\$\{" + re.escape(macro) + r"\}"
    # A callable replacement is inserted as is; a plain string would be parsed as a
    # template, so backslashes (e.g. in Windows paths) would be mangled or rejected.
    return re.sub(pattern, lambda _: replacing_value, prop_value, count=0, flags=re.IGNORECASE)


def expand_file_name(arg: str | None) -> str | None:
    """
    Expand ``~`` and make ``arg`` absolute.

    ``None`` and values starting with ``${`` (unresolved macros) are returned unchanged.
    """
    if arg is None or arg.startswith("${"):
        return arg
    return path.abspath(path.expanduser(arg))


def is_directory(arg: str, create_if_not_exist: bool = False) -> str:
    """
    Check that ``arg`` points to an existing directory and return its expanded path.

    :param arg: path to check
    :param create_if_not_exist: if True a missing directory is created instead of raising
    :raises InvalidConfiguration: if the directory does not exist and is not to be created
    """
    expanded: str | None = expand_file_name(arg)
    if expanded is None:
        raise InvalidConfiguration(f"The directory {arg} does not exist")
    if not path.isdir(expanded):
        if create_if_not_exist:
            # exist_ok tolerates the directory being created concurrently after the check
            makedirs(expanded, exist_ok=True)
        else:
            raise InvalidConfiguration(f"The directory {arg} does not exist")

    return str(expanded)


@dataclass
class UiUpdater:
    """
    Logs the title and then a dot every ``timeout`` seconds while the watched future is running.
    """
    # Text logged once when the updater is started
    title: str
    # Interval in seconds between two progress dots
    timeout: int = 2

    def start(self, future: Future) -> None:
        """Log the title and begin reporting the progress of ``future``."""
        _LOGGER.default(f"{self.title}. ")
        self.__in_progress(future)

    def __in_progress(self, future: Future) -> None:
        """Log one progress dot and re-arm the timer if ``future`` is still running."""
        _LOGGER.default(". ")
        # NOTE(review): running() is False for a future that is still queued, so the
        # reporting stops at once in that case - confirm whether done() should be checked.
        if future.running():
            self.__timer(future)

    def __timer(self, future: Future) -> None:
        """Schedule the next progress report in ``timeout`` seconds."""
        retimer = threading.Timer(self.timeout, self.__in_progress, args=(future,))
        retimer.start()


def make_dir_if_not_exist(arg: str) -> str:
    """Create the directory ``arg`` (with parents) if it is missing and return its absolute path."""
    absolute = path.abspath(arg)
    if not path.isdir(absolute):
        # exist_ok tolerates the directory being created concurrently after the check
        makedirs(arg, exist_ok=True)

    return str(absolute)


def is_file(arg: str) -> str:
    """
    Check that ``arg`` points to an existing file and return its expanded path.

    :raises InvalidConfiguration: if the file does not exist
    """
    expanded: str | None = expand_file_name(arg)
    if expanded is None or not path.isfile(expanded):
        raise InvalidConfiguration(f"The file {arg} does not exist")

    return str(expanded)


def check_int(value: str, value_name: str, *, is_zero_allowed: bool = False) -> int:
    """
    Convert ``value`` to int and check that it is positive (or zero, if allowed).

    :param value: value to convert
    :param value_name: name of the value used in the error message
    :param is_zero_allowed: if True zero is accepted as well
    :raises InvalidConfiguration: if the converted value is negative, or zero when zero is not allowed
    """
    ivalue = int(value)
    if ivalue < 0 or (not is_zero_allowed and ivalue == 0):
        raise InvalidConfiguration(f"{value} is an invalid {value_name} value")

    return ivalue


def convert_minus(line: str) -> str:
    """Replace every ``-`` in ``line`` with ``_``."""
    return line.replace("-", "_")


def convert_underscore(line: str) -> str:
    """Replace every ``_`` in ``line`` with ``-``."""
    return line.replace("_", "-")


def remove_prefix(line: str, prefix: str) -> str:
    """Strip prefix from string."""
    return line[len(prefix):] if line.startswith(prefix) else line


def pretty_divider(length: int = 100) -> str:
    """Return a line of ``length`` ``=`` characters."""
    return '=' * length


def get_class_by_name(clazz: str) -> type:
    """
    Import the module part of the dotted name ``clazz`` and return the attribute named by its last part.

    ``clazz`` is expected to contain at least one dot: everything before the last dot is
    the module path, everything after it is the class name.
    """
    class_path, _, class_name = clazz.rpartition(".")
    class_module_runner: ModuleType = importlib.import_module(class_path)
    class_obj: type = getattr(class_module_runner, class_name)
    return class_obj


def get_config_folder() -> Path:
    """Return the ``cfg`` folder located two levels above this file."""
    urunner_folder = Path(__file__).parent.parent
    return urunner_folder.joinpath("cfg")


def get_config_workflow_folder() -> Path:
    """Return the ``workflows`` sub-folder of the config folder."""
    return get_config_folder().joinpath("workflows")


def get_config_test_suite_folder() -> Path:
    """Return the ``test-suites`` sub-folder of the config folder."""
    return get_config_folder().joinpath("test-suites")


def load_config(config_path: str | None) -> dict[str, str | dict]:
    """
    Load a YAML config and check that it declares the mandatory key ``type``.

    :param config_path: path to the YAML file; with ``None`` nothing is loaded, so the
        check for the ``type`` key fails and ``YamlException`` is raised
    :return: the parsed config
    :raises YamlException: if the file is not valid YAML, is not a mapping (e.g. is empty)
        or does not contain the key ``type``
    """
    cfg_type_key = "type"
    config_from_file = {}
    if config_path is not None:
        with open(config_path, encoding="utf-8") as stream:
            try:
                config_from_file = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                message = f"{exc}, {yaml.YAMLError}"
                raise YamlException(message) from exc
    # safe_load returns None for an empty document and may return a list or a scalar,
    # none of which can hold the key: report them the same way as a missing key.
    if not isinstance(config_from_file, dict) or cfg_type_key not in config_from_file:
        error = f"Cannot detect type of config '{config_path}'. Have you specified key '{cfg_type_key}'?"
        raise YamlException(error)
    return config_from_file


def to_bool(value: Any) -> bool:  # type: ignore[explicit-any]
    """
    Convert ``value`` to bool.

    Booleans are returned as is; the strings ``true`` and ``false`` are accepted in any letter case.

    :raises InvalidConfiguration: for any other value
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, str) and str(value).lower() in ("true", "false"):
        return str(value).lower() == "true"
    raise InvalidConfiguration(f"'{value}' cannot be converted to 'bool'")


def extract_parameter_name(param_name: str) -> str:
    """
    Return ``name`` if ``param_name`` has the exact form ``${parameters.name}``,
    otherwise return ``param_name`` unchanged.
    """
    pattern = r"^\$\{parameters\.([^\.\}]+)\}$"
    match = re.match(pattern, param_name)
    if match:
        return match.group(1)
    return param_name


def correct_path(root: str | Path, test_list: str | Path) -> Path:
    """
    Resolve ``test_list``: if it exists as given its absolute path is returned,
    otherwise it is joined to ``root``.
    """
    if isinstance(test_list, str):
        test_list = Path(test_list)
    if isinstance(root, str):
        root = Path(root)
    return test_list.absolute() if test_list.exists() else root.joinpath(test_list)


def get_test_id(file: Path, start_directory: Path) -> str:
    """Return the path of ``file`` relative to ``start_directory`` as a string."""
    relpath = file.relative_to(start_directory)
    return str(relpath)


def prepend_list(pre_list: list, post_list: list) -> list:
    """Return a new list with the items of ``pre_list`` followed by the items of ``post_list``."""
    result = pre_list[:]
    result.extend(post_list)
    return result


def detect_architecture() -> ArchitectureKind:
    """
    Detect the architecture of the host from ``platform.machine()``.

    :return: ``ArchitectureKind.ARM64`` for ``aarch64``/``arm64`` machines, otherwise ``ArchitectureKind.AMD64``
    """
    arch = platform.machine().lower()
    # Linux reports 64-bit ARM as 'aarch64', while macOS and Windows report 'arm64'
    if arch in ("aarch64", "arm64"):
        return ArchitectureKind.ARM64
    return ArchitectureKind.AMD64


def detect_operating_system() -> OSKind:
    """
    Detect the operating system of the host from ``platform.system()``.

    :return: ``OSKind.LIN`` for Linux, ``OSKind.WIN`` for Windows, ``OSKind.MAC`` for anything else
    """
    system = platform.system().lower()
    if system == "linux":
        return OSKind.LIN
    if system == "windows":
        return OSKind.WIN
    return OSKind.MAC