1#!/usr/bin/env python3 2# -- coding: utf-8 -- 3# 4# Copyright (c) 2024-2025 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17import fnmatch 18import os 19import re 20import shutil 21from collections import Counter 22from functools import cached_property 23from glob import glob 24from os import path 25from pathlib import Path 26from typing import ClassVar 27 28from runner.chapters import Chapters 29from runner.common_exceptions import FileNotFoundException, InvalidConfiguration, TestNotExistException 30from runner.enum_types.params import TestEnv 31from runner.logger import Log 32from runner.options.options import IOptions 33from runner.options.options_collections import CollectionsOptions 34from runner.suites.preparation_step import CopyStep, CustomGeneratorTestPreparationStep, TestPreparationStep 35from runner.suites.step_utils import StepUtils 36from runner.suites.test_lists import TestLists 37from runner.suites.test_standard_flow import TestStandardFlow 38from runner.utils import correct_path, get_group_number, get_test_id 39 40_LOGGER = Log.get_logger(__file__) 41 42 43class TestSuite: 44 CONST_COMMENT: ClassVar[str] = "#" 45 TEST_COMMENT_EXPR = re.compile(r"^\s*(?P<test>[^# ]+)?(\s*#\s*(?P<comment>.+))?", re.MULTILINE) 46 47 def __init__(self, test_env: TestEnv) -> None: 48 self.__test_env = test_env 49 self.__suite_name = test_env.config.test_suite.suite_name 50 self.__work_dir = test_env.work_dir 51 self._list_root = Path(test_env.config.test_suite.list_root) 52 53 self.config = test_env.config 54 55 self._preparation_steps: list[TestPreparationStep] = self.set_preparation_steps() 56 self.__collections_parameters = self.__get_collections_parameters() 57 58 self._explicit_file: Path | None = None 59 self._explicit_list: Path | None = None 60 self._test_lists = TestLists(self._list_root, self.__test_env) 61 self._test_lists.collect_excluded_test_lists(test_name=self.__suite_name) 62 self._test_lists.collect_ignored_test_lists(test_name=self.__suite_name) 63 self.excluded = 0 64 self.ignored_tests: list[Path] = [] 65 self.excluded_tests: list[Path] = [] 66 67 @property 68 def test_root(self) -> Path: 69 return self.__work_dir.gen 70 71 @staticmethod 72 def __load_list(test_root: Path, test_list_path: Path, prefixes: list[str]) -> tuple[list[Path], list[Path]]: 73 result: list[Path] = [] 74 not_found: list[Path] = [] 75 if not test_list_path.exists(): 76 return result, not_found 77 78 with os.fdopen(os.open(test_list_path, os.O_RDONLY, 0o755), 'r', encoding="utf-8") as file_handler: 79 for line in file_handler: 80 is_found, test_path = TestSuite.__load_line(line, test_root, prefixes) 81 if is_found and test_path is not None: 82 result.append(test_path) 83 elif not is_found and test_path is not None: 84 not_found.append(test_path) 85 return result, not_found 86 87 @staticmethod 88 def __load_line(line: str, test_root: Path, prefixes: list[str]) -> tuple[bool, Path | None]: 89 test, _ = TestSuite.__get_test_and_comment_from_line(line.strip(" \n")) 90 if test is None: 91 return False, None 92 test_path = test_root / test 93 if test_path.exists(): 94 return True, test_path 95 is_found, prefixed_test_path = TestSuite.__load_line_with_prefix(test_root, prefixes, test) 96 if prefixed_test_path is not None: 97 return is_found, prefixed_test_path 98 return False, test_path 99 100 @staticmethod 101 def __load_line_with_prefix(test_root: Path, prefixes: list[str], test: str) -> tuple[bool, Path | None]: 102 for prefix in prefixes: 103 test_path = test_root / prefix 104 if test_path.is_file() and test_path.name == test: 105 return True, test_path 106 if test_path.is_dir(): 107 test_path = test_path / test 108 if test_path.exists(): 109 return True, test_path 110 return False, None 111 112 @staticmethod 113 def __get_test_and_comment_from_line(line: str) -> tuple[str | None, str | None]: 114 line_parts = TestSuite.TEST_COMMENT_EXPR.search(line) 115 if line_parts: 116 return line_parts["test"], line_parts["comment"] 117 return None, None 118 119 @staticmethod 120 def __search_duplicates(original: list[Path], kind: str) -> None: 121 main_counter = Counter([str(test) for test in original]) 122 dupes = [test for test, frequency in main_counter.items() if frequency > 1] 123 if len(dupes) > 0: 124 _LOGGER.summary(f"There are {len(dupes)} duplicates in {kind} lists.") 125 for test in dupes: 126 Log.short(_LOGGER, f"\t{test}") 127 elif len(original) > 0: 128 _LOGGER.summary(f"No duplicates found in {kind} lists.") 129 130 @staticmethod 131 def __is_path_excluded(collection: CollectionsOptions, tested_path: Path) -> bool: 132 excluded = [excl for excl in collection.exclude if tested_path.as_posix().endswith(excl)] 133 return len(excluded) > 0 134 135 @cached_property 136 def name(self) -> str: 137 return self.__suite_name 138 139 @cached_property 140 def list_root(self) -> Path: 141 return self._list_root 142 143 @cached_property 144 def explicit_list(self) -> Path | None: 145 return self._explicit_list 146 147 @cached_property 148 def explicit_file(self) -> Path | None: 149 return self._explicit_file 150 151 def process(self, force_generate: bool) -> list[TestStandardFlow]: 152 raw_set = self.__get_raw_set(force_generate) 153 self._explicit_file = self.__set_explicit_file() 154 self._explicit_list = self.__set_explicit_list() 155 executed_set = self.__get_executed_files(raw_set) 156 self.__search_both_excluded_and_ignored_tests() 157 executed_set = self.__get_by_groups(executed_set) 158 tests = self.__create_tests(executed_set) 159 return tests 160 161 def set_preparation_steps(self) -> list[TestPreparationStep]: 162 steps: list[TestPreparationStep] = [] 163 for collection in self.config.test_suite.collections: 164 if collection.generator_script is not None or collection.generator_class is not None: 165 steps.append(CustomGeneratorTestPreparationStep( 166 test_source_path=collection.test_root / collection.name, 167 test_gen_path=self.test_root / collection.name, 168 config=self.config, 169 collection=collection, 170 extension=self.config.test_suite.extension(collection) 171 )) 172 copy_source_path = self.test_root / collection.name 173 real_source_path = collection.test_root / collection.name 174 else: 175 copy_source_path = collection.test_root / collection.name 176 real_source_path = copy_source_path 177 if not real_source_path.exists(): 178 error = (f"Source path '{real_source_path}' does not exist! " 179 f"Cannot process the collection '{collection.name}'") 180 _LOGGER.default(error) 181 continue 182 extension = self.config.test_suite.extension(collection) 183 with_js = self.config.test_suite.with_js(collection) 184 if (extension == "js" and with_js) or extension != "js": 185 steps.extend(self.__add_copy_steps(collection, copy_source_path, extension)) 186 return steps 187 188 def __add_copy_steps(self, collection: CollectionsOptions, copy_source_path: Path, extension: str) \ 189 -> list[TestPreparationStep]: 190 steps: list[TestPreparationStep] = [] 191 if collection.exclude: 192 for file_path in copy_source_path.iterdir(): 193 if self.__is_path_excluded(collection, file_path): 194 continue 195 steps.append(CopyStep( 196 test_source_path=file_path, 197 test_gen_path=self.test_root / collection.name / file_path.name, 198 config=self.config, 199 collection=collection, 200 extension=extension 201 )) 202 else: 203 steps.append(CopyStep( 204 test_source_path=copy_source_path, 205 test_gen_path=self.test_root / collection.name, 206 config=self.config, 207 collection=collection, 208 extension=extension 209 )) 210 return steps 211 212 def __get_raw_set(self, force_generate: bool) -> list[Path]: 213 util = StepUtils() 214 tests: list[Path] = [] 215 if not force_generate and util.are_tests_generated(self.test_root): 216 _LOGGER.all(f"Reused earlier generated tests from {self.test_root}") 217 return self.__load_generated_test_files() 218 _LOGGER.all("Generated folder : " + str(self.test_root)) 219 220 if self.test_root.exists(): 221 _LOGGER.all(f"INFO: {self.test_root.absolute()!s} already exist. WILL BE CLEANED") 222 shutil.rmtree(self.test_root) 223 224 for step in self._preparation_steps: 225 tests.extend(step.transform(force_generate)) 226 tests = list(set(tests)) 227 228 if len(tests) == 0: 229 raise InvalidConfiguration("No tests loaded to execution") 230 231 util.create_report(self.test_root, tests) 232 233 return tests 234 235 def __load_generated_test_files(self) -> list[Path]: 236 tests = [] 237 for collection in self.config.test_suite.collections: 238 extension = self.config.test_suite.extension(collection) 239 glob_expression = path.join(self.test_root, collection.name, f"**/*.{extension}") 240 tests.extend(fnmatch.filter( 241 glob(glob_expression, recursive=True), 242 path.join(self.test_root, self.config.test_suite.filter) 243 )) 244 return [Path(test) for test in set(tests)] 245 246 def __get_explicit_test_path(self, test_id: str) -> Path | None: 247 for collection in self.config.test_suite.collections: 248 if test_id.startswith(collection.name): 249 test_path: Path = correct_path(self.test_root, test_id) 250 if test_path.exists(): 251 return test_path 252 break 253 new_test_id = str(os.path.join(collection.name, test_id)) 254 test_path = correct_path(self.test_root, new_test_id) 255 if test_path and test_path.exists(): 256 return test_path 257 return None 258 259 def __get_executed_files(self, raw_test_files: list[Path]) -> list[Path]: 260 """ 261 Browse the directory, search for files with the specified extension 262 """ 263 _LOGGER.summary(f"Loading tests from the directory {self.test_root}") 264 test_files: list[Path] = [] 265 if self.explicit_file is not None: 266 test_files.append(self.explicit_file) 267 elif self.explicit_list is not None: 268 test_files.extend(self.__load_tests_from_lists([self.explicit_list])) 269 else: 270 if not self.config.test_suite.test_lists.skip_test_lists: 271 self.excluded_tests = self.__load_excluded_tests() 272 self.ignored_tests = self.__load_ignored_tests() 273 test_files.extend(self.__load_test_files(raw_test_files)) 274 return test_files 275 276 def __get_by_groups(self, raw_test_files: list[Path]) -> list[Path]: 277 if self.config.test_suite.groups.quantity > 1: 278 filtered_tests = {test for test in raw_test_files if self.__in_group_number(test)} 279 return list(filtered_tests) 280 return raw_test_files 281 282 def __get_n_group(self) -> int: 283 groups = self.config.test_suite.groups.quantity 284 n_group = self.config.test_suite.groups.number 285 return n_group if n_group <= groups else groups 286 287 def __in_group_number(self, test: Path) -> bool: 288 groups = self.config.test_suite.groups.quantity 289 n_group = self.__get_n_group() 290 return get_group_number(str(test.relative_to(self.test_root)), groups) == n_group 291 292 def __create_test(self, test_file: Path, is_ignored: bool) -> TestStandardFlow: 293 test_id = get_test_id(test_file, self.test_root) 294 coll_name = self.__get_coll_name(test_id) 295 params = self.__collections_parameters.get(coll_name, {}) if coll_name is not None else {} 296 test = TestStandardFlow( 297 test_env=self.__test_env, 298 test_path=test_file, 299 params=IOptions(params), 300 test_id=test_id) 301 test.ignored = is_ignored 302 return test 303 304 def __get_coll_name(self, test_id: str) -> str | None: 305 coll_names = [name for name in self.__collections_parameters if test_id.startswith(name)] 306 if len(coll_names) == 1: 307 return coll_names[0] 308 weights = {len(name): name for name in coll_names} 309 if weights: 310 name = max(weights.keys()) 311 return weights[name] 312 return None 313 314 def __create_tests(self, raw_test_files: list[Path]) -> list[TestStandardFlow]: 315 all_tests = {self.__create_test(test, test in self.ignored_tests) for test in raw_test_files} 316 not_tests = {t for t in all_tests if not t.is_valid_test} 317 valid_tests = all_tests - not_tests 318 319 _LOGGER.all(f"Loaded {len(valid_tests)} tests") 320 321 return list(valid_tests) 322 323 def __load_test_files(self, raw_test_files: list[Path]) -> list[Path]: 324 if self.config.test_suite.filter != "*" and self.config.test_suite.groups.chapters: 325 raise InvalidConfiguration( 326 "Incorrect configuration: specify either filter or chapter options" 327 ) 328 test_files: list[Path] = raw_test_files 329 excluded: list[Path] = list(self.excluded_tests)[:] 330 if self.config.test_suite.groups.chapters: 331 test_files = self.__filter_by_chapters(self.test_root, test_files) 332 pattern = re.compile(self.config.test_suite.filter.replace(".", r"\.").replace("*", ".*")) 333 return [test for test in test_files if test not in excluded and pattern.search(str(test))] 334 335 def __filter_by_chapters(self, base_folder: Path, files: list[Path]) -> list[Path]: 336 test_files: set[Path] = set() 337 chapters: Chapters = self.__parse_chapters() 338 for chapter in self.config.test_suite.groups.chapters: 339 test_files.update(chapters.filter_by_chapter(chapter, base_folder, files)) 340 return list(test_files) 341 342 def __parse_chapters(self) -> Chapters: 343 chapters: Chapters | None = None 344 if path.isfile(self.config.test_suite.groups.chapters_file): 345 chapters = Chapters(self.config.test_suite.groups.chapters_file) 346 else: 347 corrected_chapters_file = correct_path(self.list_root, 348 self.config.test_suite.groups.chapters_file) 349 if path.isfile(corrected_chapters_file): 350 chapters = Chapters(corrected_chapters_file) 351 else: 352 raise FileNotFoundException( 353 f"Not found either '{self.config.test_suite.groups.chapters_file}' or " 354 f"'{corrected_chapters_file}'") 355 return chapters 356 357 def __set_explicit_file(self) -> Path | None: 358 explicit_file = self.config.test_suite.test_lists.explicit_file 359 if explicit_file is not None and self.list_root is not None: 360 test_path = self.__get_explicit_test_path(explicit_file) 361 if test_path and test_path.exists(): 362 return test_path 363 raise TestNotExistException(f"Test '{explicit_file}' does not exist") 364 return None 365 366 def __set_explicit_list(self) -> Path | None: 367 if self.config.test_suite.test_lists.explicit_list is not None and self.list_root is not None: 368 return correct_path(self.list_root, self.config.test_suite.test_lists.explicit_list) 369 return None 370 371 def __load_tests_from_lists(self, lists: list[Path]) -> list[Path]: 372 tests = [] 373 any_not_found = False 374 report = [] 375 for list_path in lists: 376 _LOGGER.default(f"Loading tests from the list {list_path}") 377 prefixes: list[str] = [] 378 if len(self.config.test_suite.collections) > 1: 379 prefixes = [coll.name for coll in self.config.test_suite.collections] 380 loaded, not_found = self.__load_list(self.test_root, list_path, prefixes) 381 tests.extend(loaded) 382 if not_found: 383 any_not_found = True 384 report.append(f"List '{list_path}': following tests are not found on the file system:") 385 for test in not_found: 386 report.append(str(test)) 387 if any_not_found: 388 _LOGGER.summary("\n".join(report)) 389 return tests 390 391 def __load_excluded_tests(self) -> list[Path]: 392 """ 393 Read excluded_lists and load list of excluded tests 394 """ 395 excluded_tests = self.__load_tests_from_lists(self._test_lists.excluded_lists) 396 self.excluded = len(excluded_tests) 397 self.__search_duplicates(excluded_tests, "excluded") 398 return excluded_tests 399 400 def __load_ignored_tests(self) -> list[Path]: 401 """ 402 Read ignored_lists and load list of ignored tests 403 """ 404 ignored_tests = self.__load_tests_from_lists(self._test_lists.ignored_lists) 405 self.__search_duplicates(ignored_tests, "ignored") 406 return ignored_tests 407 408 def __search_both_excluded_and_ignored_tests(self) -> None: 409 already_excluded = [test for test in self.ignored_tests if test in self.excluded_tests] 410 if not already_excluded: 411 return 412 _LOGGER.summary(f"Found {len(already_excluded)} tests present both " 413 "in excluded and ignored test lists.") 414 for test in already_excluded: 415 _LOGGER.all(f"\t{test}") 416 self.ignored_tests.remove(test) 417 418 def __get_collections_parameters(self) -> dict[str, dict[str, list | str]]: 419 result: dict[str, dict[str, list | str]] = {} 420 for collection in self.__test_env.config.test_suite.collections: 421 result[collection.name] = collection.parameters 422 return result 423