• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024-2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
17import fnmatch
18import os
19import re
20import shutil
21from collections import Counter
22from functools import cached_property
23from glob import glob
24from os import path
25from pathlib import Path
26from typing import ClassVar
27
28from runner.chapters import Chapters
29from runner.common_exceptions import FileNotFoundException, InvalidConfiguration, TestNotExistException
30from runner.enum_types.params import TestEnv
31from runner.logger import Log
32from runner.options.options import IOptions
33from runner.options.options_collections import CollectionsOptions
34from runner.suites.preparation_step import CopyStep, CustomGeneratorTestPreparationStep, TestPreparationStep
35from runner.suites.step_utils import StepUtils
36from runner.suites.test_lists import TestLists
37from runner.suites.test_standard_flow import TestStandardFlow
38from runner.utils import correct_path, get_group_number, get_test_id
39
# Module-level logger shared by every method of TestSuite below.
_LOGGER = Log.get_logger(__file__)
41
42
43class TestSuite:
44    CONST_COMMENT: ClassVar[str] = "#"
45    TEST_COMMENT_EXPR = re.compile(r"^\s*(?P<test>[^# ]+)?(\s*#\s*(?P<comment>.+))?", re.MULTILINE)
46
47    def __init__(self, test_env: TestEnv) -> None:
48        self.__test_env = test_env
49        self.__suite_name = test_env.config.test_suite.suite_name
50        self.__work_dir = test_env.work_dir
51        self._list_root = Path(test_env.config.test_suite.list_root)
52
53        self.config = test_env.config
54
55        self._preparation_steps: list[TestPreparationStep] = self.set_preparation_steps()
56        self.__collections_parameters = self.__get_collections_parameters()
57
58        self._explicit_file: Path | None = None
59        self._explicit_list: Path | None = None
60        self._test_lists = TestLists(self._list_root, self.__test_env)
61        self._test_lists.collect_excluded_test_lists(test_name=self.__suite_name)
62        self._test_lists.collect_ignored_test_lists(test_name=self.__suite_name)
63        self.excluded = 0
64        self.ignored_tests: list[Path] = []
65        self.excluded_tests: list[Path] = []
66
67    @property
68    def test_root(self) -> Path:
69        return self.__work_dir.gen
70
71    @staticmethod
72    def __load_list(test_root: Path, test_list_path: Path, prefixes: list[str]) -> tuple[list[Path], list[Path]]:
73        result: list[Path] = []
74        not_found: list[Path] = []
75        if not test_list_path.exists():
76            return result, not_found
77
78        with os.fdopen(os.open(test_list_path, os.O_RDONLY, 0o755), 'r', encoding="utf-8") as file_handler:
79            for line in file_handler:
80                is_found, test_path = TestSuite.__load_line(line, test_root, prefixes)
81                if is_found and test_path is not None:
82                    result.append(test_path)
83                elif not is_found and test_path is not None:
84                    not_found.append(test_path)
85        return result, not_found
86
87    @staticmethod
88    def __load_line(line: str, test_root: Path, prefixes: list[str]) -> tuple[bool, Path | None]:
89        test, _ = TestSuite.__get_test_and_comment_from_line(line.strip(" \n"))
90        if test is None:
91            return False, None
92        test_path = test_root / test
93        if test_path.exists():
94            return True, test_path
95        is_found, prefixed_test_path = TestSuite.__load_line_with_prefix(test_root, prefixes, test)
96        if prefixed_test_path is not None:
97            return is_found, prefixed_test_path
98        return False, test_path
99
100    @staticmethod
101    def __load_line_with_prefix(test_root: Path, prefixes: list[str], test: str) -> tuple[bool, Path | None]:
102        for prefix in prefixes:
103            test_path = test_root / prefix
104            if test_path.is_file() and test_path.name == test:
105                return True, test_path
106            if test_path.is_dir():
107                test_path = test_path / test
108                if test_path.exists():
109                    return True, test_path
110        return False, None
111
112    @staticmethod
113    def __get_test_and_comment_from_line(line: str) -> tuple[str | None, str | None]:
114        line_parts = TestSuite.TEST_COMMENT_EXPR.search(line)
115        if line_parts:
116            return line_parts["test"], line_parts["comment"]
117        return None, None
118
119    @staticmethod
120    def __search_duplicates(original: list[Path], kind: str) -> None:
121        main_counter = Counter([str(test) for test in original])
122        dupes = [test for test, frequency in main_counter.items() if frequency > 1]
123        if len(dupes) > 0:
124            _LOGGER.summary(f"There are {len(dupes)} duplicates in {kind} lists.")
125            for test in dupes:
126                Log.short(_LOGGER, f"\t{test}")
127        elif len(original) > 0:
128            _LOGGER.summary(f"No duplicates found in {kind} lists.")
129
130    @staticmethod
131    def __is_path_excluded(collection: CollectionsOptions, tested_path: Path) -> bool:
132        excluded = [excl for excl in collection.exclude if tested_path.as_posix().endswith(excl)]
133        return len(excluded) > 0
134
135    @cached_property
136    def name(self) -> str:
137        return self.__suite_name
138
139    @cached_property
140    def list_root(self) -> Path:
141        return self._list_root
142
143    @cached_property
144    def explicit_list(self) -> Path | None:
145        return self._explicit_list
146
147    @cached_property
148    def explicit_file(self) -> Path | None:
149        return self._explicit_file
150
151    def process(self, force_generate: bool) -> list[TestStandardFlow]:
152        raw_set = self.__get_raw_set(force_generate)
153        self._explicit_file = self.__set_explicit_file()
154        self._explicit_list = self.__set_explicit_list()
155        executed_set = self.__get_executed_files(raw_set)
156        self.__search_both_excluded_and_ignored_tests()
157        executed_set = self.__get_by_groups(executed_set)
158        tests = self.__create_tests(executed_set)
159        return tests
160
161    def set_preparation_steps(self) -> list[TestPreparationStep]:
162        steps: list[TestPreparationStep] = []
163        for collection in self.config.test_suite.collections:
164            if collection.generator_script is not None or collection.generator_class is not None:
165                steps.append(CustomGeneratorTestPreparationStep(
166                    test_source_path=collection.test_root / collection.name,
167                    test_gen_path=self.test_root / collection.name,
168                    config=self.config,
169                    collection=collection,
170                    extension=self.config.test_suite.extension(collection)
171                ))
172                copy_source_path = self.test_root / collection.name
173                real_source_path = collection.test_root / collection.name
174            else:
175                copy_source_path = collection.test_root / collection.name
176                real_source_path = copy_source_path
177            if not real_source_path.exists():
178                error = (f"Source path '{real_source_path}' does not exist! "
179                         f"Cannot process the collection '{collection.name}'")
180                _LOGGER.default(error)
181                continue
182            extension = self.config.test_suite.extension(collection)
183            with_js = self.config.test_suite.with_js(collection)
184            if (extension == "js" and with_js) or extension != "js":
185                steps.extend(self.__add_copy_steps(collection, copy_source_path, extension))
186        return steps
187
188    def __add_copy_steps(self, collection: CollectionsOptions, copy_source_path: Path, extension: str) \
189            -> list[TestPreparationStep]:
190        steps: list[TestPreparationStep] = []
191        if collection.exclude:
192            for file_path in copy_source_path.iterdir():
193                if self.__is_path_excluded(collection, file_path):
194                    continue
195                steps.append(CopyStep(
196                    test_source_path=file_path,
197                    test_gen_path=self.test_root / collection.name / file_path.name,
198                    config=self.config,
199                    collection=collection,
200                    extension=extension
201                ))
202        else:
203            steps.append(CopyStep(
204                test_source_path=copy_source_path,
205                test_gen_path=self.test_root / collection.name,
206                config=self.config,
207                collection=collection,
208                extension=extension
209            ))
210        return steps
211
212    def __get_raw_set(self, force_generate: bool) -> list[Path]:
213        util = StepUtils()
214        tests: list[Path] = []
215        if not force_generate and util.are_tests_generated(self.test_root):
216            _LOGGER.all(f"Reused earlier generated tests from {self.test_root}")
217            return self.__load_generated_test_files()
218        _LOGGER.all("Generated folder : " + str(self.test_root))
219
220        if self.test_root.exists():
221            _LOGGER.all(f"INFO: {self.test_root.absolute()!s} already exist. WILL BE CLEANED")
222            shutil.rmtree(self.test_root)
223
224        for step in self._preparation_steps:
225            tests.extend(step.transform(force_generate))
226        tests = list(set(tests))
227
228        if len(tests) == 0:
229            raise InvalidConfiguration("No tests loaded to execution")
230
231        util.create_report(self.test_root, tests)
232
233        return tests
234
235    def __load_generated_test_files(self) -> list[Path]:
236        tests = []
237        for collection in self.config.test_suite.collections:
238            extension = self.config.test_suite.extension(collection)
239            glob_expression = path.join(self.test_root, collection.name, f"**/*.{extension}")
240            tests.extend(fnmatch.filter(
241                glob(glob_expression, recursive=True),
242                path.join(self.test_root, self.config.test_suite.filter)
243            ))
244        return [Path(test) for test in set(tests)]
245
246    def __get_explicit_test_path(self, test_id: str) -> Path | None:
247        for collection in self.config.test_suite.collections:
248            if test_id.startswith(collection.name):
249                test_path: Path = correct_path(self.test_root, test_id)
250                if test_path.exists():
251                    return test_path
252                break
253            new_test_id = str(os.path.join(collection.name, test_id))
254            test_path = correct_path(self.test_root, new_test_id)
255            if test_path and test_path.exists():
256                return test_path
257        return None
258
259    def __get_executed_files(self, raw_test_files: list[Path]) -> list[Path]:
260        """
261        Browse the directory, search for files with the specified extension
262        """
263        _LOGGER.summary(f"Loading tests from the directory {self.test_root}")
264        test_files: list[Path] = []
265        if self.explicit_file is not None:
266            test_files.append(self.explicit_file)
267        elif self.explicit_list is not None:
268            test_files.extend(self.__load_tests_from_lists([self.explicit_list]))
269        else:
270            if not self.config.test_suite.test_lists.skip_test_lists:
271                self.excluded_tests = self.__load_excluded_tests()
272                self.ignored_tests = self.__load_ignored_tests()
273            test_files.extend(self.__load_test_files(raw_test_files))
274        return test_files
275
276    def __get_by_groups(self, raw_test_files: list[Path]) -> list[Path]:
277        if self.config.test_suite.groups.quantity > 1:
278            filtered_tests = {test for test in raw_test_files if self.__in_group_number(test)}
279            return list(filtered_tests)
280        return raw_test_files
281
282    def __get_n_group(self) -> int:
283        groups = self.config.test_suite.groups.quantity
284        n_group = self.config.test_suite.groups.number
285        return n_group if n_group <= groups else groups
286
287    def __in_group_number(self, test: Path) -> bool:
288        groups = self.config.test_suite.groups.quantity
289        n_group = self.__get_n_group()
290        return get_group_number(str(test.relative_to(self.test_root)), groups) == n_group
291
292    def __create_test(self, test_file: Path, is_ignored: bool) -> TestStandardFlow:
293        test_id = get_test_id(test_file, self.test_root)
294        coll_name = self.__get_coll_name(test_id)
295        params = self.__collections_parameters.get(coll_name, {}) if coll_name is not None else {}
296        test = TestStandardFlow(
297            test_env=self.__test_env,
298            test_path=test_file,
299            params=IOptions(params),
300            test_id=test_id)
301        test.ignored = is_ignored
302        return test
303
304    def __get_coll_name(self, test_id: str) -> str | None:
305        coll_names = [name for name in self.__collections_parameters if test_id.startswith(name)]
306        if len(coll_names) == 1:
307            return coll_names[0]
308        weights = {len(name): name for name in coll_names}
309        if weights:
310            name = max(weights.keys())
311            return weights[name]
312        return None
313
314    def __create_tests(self, raw_test_files: list[Path]) -> list[TestStandardFlow]:
315        all_tests = {self.__create_test(test, test in self.ignored_tests) for test in raw_test_files}
316        not_tests = {t for t in all_tests if not t.is_valid_test}
317        valid_tests = all_tests - not_tests
318
319        _LOGGER.all(f"Loaded {len(valid_tests)} tests")
320
321        return list(valid_tests)
322
323    def __load_test_files(self, raw_test_files: list[Path]) -> list[Path]:
324        if self.config.test_suite.filter != "*" and self.config.test_suite.groups.chapters:
325            raise InvalidConfiguration(
326                "Incorrect configuration: specify either filter or chapter options"
327            )
328        test_files: list[Path] = raw_test_files
329        excluded: list[Path] = list(self.excluded_tests)[:]
330        if self.config.test_suite.groups.chapters:
331            test_files = self.__filter_by_chapters(self.test_root, test_files)
332        pattern = re.compile(self.config.test_suite.filter.replace(".", r"\.").replace("*", ".*"))
333        return [test for test in test_files if test not in excluded and pattern.search(str(test))]
334
335    def __filter_by_chapters(self, base_folder: Path, files: list[Path]) -> list[Path]:
336        test_files: set[Path] = set()
337        chapters: Chapters = self.__parse_chapters()
338        for chapter in self.config.test_suite.groups.chapters:
339            test_files.update(chapters.filter_by_chapter(chapter, base_folder, files))
340        return list(test_files)
341
342    def __parse_chapters(self) -> Chapters:
343        chapters: Chapters | None = None
344        if path.isfile(self.config.test_suite.groups.chapters_file):
345            chapters = Chapters(self.config.test_suite.groups.chapters_file)
346        else:
347            corrected_chapters_file = correct_path(self.list_root,
348                                                   self.config.test_suite.groups.chapters_file)
349            if path.isfile(corrected_chapters_file):
350                chapters = Chapters(corrected_chapters_file)
351            else:
352                raise FileNotFoundException(
353                    f"Not found either '{self.config.test_suite.groups.chapters_file}' or "
354                    f"'{corrected_chapters_file}'")
355        return chapters
356
357    def __set_explicit_file(self) -> Path | None:
358        explicit_file = self.config.test_suite.test_lists.explicit_file
359        if explicit_file is not None and self.list_root is not None:
360            test_path = self.__get_explicit_test_path(explicit_file)
361            if test_path and test_path.exists():
362                return test_path
363            raise TestNotExistException(f"Test '{explicit_file}' does not exist")
364        return None
365
366    def __set_explicit_list(self) -> Path | None:
367        if self.config.test_suite.test_lists.explicit_list is not None and self.list_root is not None:
368            return correct_path(self.list_root, self.config.test_suite.test_lists.explicit_list)
369        return None
370
371    def __load_tests_from_lists(self, lists: list[Path]) -> list[Path]:
372        tests = []
373        any_not_found = False
374        report = []
375        for list_path in lists:
376            _LOGGER.default(f"Loading tests from the list {list_path}")
377            prefixes: list[str] = []
378            if len(self.config.test_suite.collections) > 1:
379                prefixes = [coll.name for coll in self.config.test_suite.collections]
380            loaded, not_found = self.__load_list(self.test_root, list_path, prefixes)
381            tests.extend(loaded)
382            if not_found:
383                any_not_found = True
384                report.append(f"List '{list_path}': following tests are not found on the file system:")
385                for test in not_found:
386                    report.append(str(test))
387        if any_not_found:
388            _LOGGER.summary("\n".join(report))
389        return tests
390
391    def __load_excluded_tests(self) -> list[Path]:
392        """
393        Read excluded_lists and load list of excluded tests
394        """
395        excluded_tests = self.__load_tests_from_lists(self._test_lists.excluded_lists)
396        self.excluded = len(excluded_tests)
397        self.__search_duplicates(excluded_tests, "excluded")
398        return excluded_tests
399
400    def __load_ignored_tests(self) -> list[Path]:
401        """
402        Read ignored_lists and load list of ignored tests
403        """
404        ignored_tests = self.__load_tests_from_lists(self._test_lists.ignored_lists)
405        self.__search_duplicates(ignored_tests, "ignored")
406        return ignored_tests
407
408    def __search_both_excluded_and_ignored_tests(self) -> None:
409        already_excluded = [test for test in self.ignored_tests if test in self.excluded_tests]
410        if not already_excluded:
411            return
412        _LOGGER.summary(f"Found {len(already_excluded)} tests present both "
413                        "in excluded and ignored test lists.")
414        for test in already_excluded:
415            _LOGGER.all(f"\t{test}")
416            self.ignored_tests.remove(test)
417
418    def __get_collections_parameters(self) -> dict[str, dict[str, list | str]]:
419        result: dict[str, dict[str, list | str]] = {}
420        for collection in self.__test_env.config.test_suite.collections:
421            result[collection.name] = collection.parameters
422        return result
423