• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2022-2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
17
18import argparse
19import logging
20import multiprocessing
21import os
22import re
23import shutil
24import subprocess
25import sys
26from abc import ABC, abstractmethod
27from glob import glob
28from pathlib import Path
29from typing import List
30
31from runner import utils
32from runner.logger import Log
33from runner.options.config import Config
34from runner.plugins.ets.ets_suites import EtsSuites
35from runner.plugins.ets.ets_templates.ets_templates_generator import EtsTemplatesGenerator
36from runner.plugins.ets.ets_templates.test_metadata import get_metadata
37from runner.plugins.ets.stdlib_templates.stdlib_templates_generator import StdlibTemplatesGenerator
38from runner.generators.ets_func_tests.ets_func_test_template_generator import EtsFuncTestsCodeGenerator
39from runner.plugins.ets.utils.exceptions import InvalidFileFormatException, InvalidFileStructureException, \
40    UnknownTemplateException
41from runner.plugins.ets.utils.file_structure import walk_test_subdirs
42from runner.utils import read_file, write_2_file
43
# Module-level logger used by the preparation steps and the stand-alone entry point below.
_LOGGER = logging.getLogger("runner.plugins.ets.generate")
45
46
class TestPreparationStep(ABC):
    """
    Abstract base for one stage of test preparation.

    A step is bound to a source directory, a directory for prepared tests and
    the runner configuration. Both directories are exposed through read-only
    properties; concrete steps implement ``transform``.
    """

    def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config) -> None:
        self.config = config
        self.__test_gen_path = test_gen_path
        self.__test_source_path = test_source_path

    @property
    def test_source_path(self) -> Path:
        """Source directory given at construction time."""
        return self.__test_source_path

    @property
    def test_gen_path(self) -> Path:
        """Target directory given at construction time."""
        return self.__test_gen_path

    @abstractmethod
    def transform(self, force_generated: bool) -> List[str]:
        """Perform the step and return a list of strings (file paths in the steps defined in this module)."""
64
65
class CtsTestPreparationStep(TestPreparationStep):
    """Preparation step that delegates test generation to ``EtsTemplatesGenerator``."""

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.CTS.value}' test suite"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Generate tests from the source directory into the generation directory.

        ``force_generated`` is accepted for interface compatibility and is not
        used by this step.
        """
        source_dir, target_dir = self.test_source_path, self.test_gen_path
        generator = EtsTemplatesGenerator(source_dir, target_dir)
        generated_tests = generator.generate()
        return generated_tests
73
74
class CustomGeneratorTestPreparationStep(TestPreparationStep):
    """
    Preparation step that runs the external generator configured in
    ``config.custom.generator`` and collects the files it produced.
    """

    def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config, extension: str) -> None:
        super().__init__(test_source_path, test_gen_path, config)
        # File extension (without the leading dot) of the tests to collect after generation.
        self.extension = extension

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.CUSTOM.value} - {self.config.custom.suite_name}' test suite"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Run the custom generator and return the generated test files.

        :param force_generated: when true, an existing generation directory is
            removed before the generator is started.
        :return: paths of all ``*.<extension>`` files found recursively under
            the generation directory.

        A timeout (300 sec) or an unexpected error while waiting for the
        generator is reported through ``Log.exception_and_raise``. A non-zero
        exit code is logged together with the captured output, but it does not
        abort the step.
        """
        if self.test_gen_path.exists() and force_generated:
            shutil.rmtree(self.test_gen_path)
        # parents=True: the generation directory may be nested below folders
        # that do not exist yet; without it mkdir raises FileNotFoundError.
        self.test_gen_path.mkdir(parents=True, exist_ok=True)
        cmd = [
            self.config.custom.generator,
            "--source", self.test_source_path,
            "--target", self.test_gen_path
        ]
        cmd.extend(self.config.custom.generator_options)
        timeout = 300
        with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                errors='ignore',
        ) as process:
            try:
                stdout, stderr = process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                Log.exception_and_raise(_LOGGER, f"Generation failed by timeout after {timeout} sec")
            except Exception as ex:  # pylint: disable=broad-except
                Log.exception_and_raise(_LOGGER, f"Generation failed by unknown reason: {ex}")
            else:
                # Do not lose the diagnostics of a failed generator run.
                if process.returncode != 0:
                    Log.all(_LOGGER,
                            f"Custom generator finished with return code {process.returncode}\n"
                            f"{stdout}{stderr}")

        glob_expression = os.path.join(self.test_gen_path, f"**/*.{self.extension}")
        return glob(glob_expression, recursive=True)
115
116
class FuncTestPreparationStep(TestPreparationStep):
    """Preparation step that renders templates with ``StdlibTemplatesGenerator``."""

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.FUNC.value}' test suite"

    @staticmethod
    def generate_template_tests(template_root_path: Path, test_gen_path: Path) -> List[str]:
        """
        Render the templates of every test subdirectory under ``template_root_path``.

        The output directory of each subdirectory is ``test_gen_path`` joined with the
        subdirectory path relative to the template root. Returns the concatenated
        results of the renderer, or an empty list (after logging an error) when
        ``template_root_path`` is not a directory.
        """
        if not template_root_path.is_dir():
            Log.all(_LOGGER, f"ERROR: {str(template_root_path.absolute())} must be a directory")
            return []

        renderer = StdlibTemplatesGenerator(template_root_path)
        rendered: List[str] = []
        for subdir in walk_test_subdirs(template_root_path):
            target_dir = test_gen_path / subdir.path.relative_to(template_root_path)
            rendered += renderer.render_and_write_templates(template_root_path, subdir.path, target_dir)
        return rendered

    def transform(self, force_generated: bool) -> List[str]:
        """Render templates from the source directory; ``force_generated`` is not used."""
        source_dir = self.test_source_path
        target_dir = self.test_gen_path
        return self.generate_template_tests(source_dir, target_dir)
141
142
class FrontendFuncTestPreparationStep(TestPreparationStep):
    """Preparation step that renders templates with ``EtsFuncTestsCodeGenerator``."""

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.FUNC.value}' test suite"

    @staticmethod
    def generate_template_tests(template_root_path: Path, test_gen_path: Path) -> List[str]:
        """
        Render the templates of every test subdirectory under ``template_root_path``.

        Each subdirectory is rendered into ``test_gen_path`` joined with its path
        relative to the template root. Returns the concatenated results of the
        renderer, or an empty list (after logging an error) when
        ``template_root_path`` is not a directory.
        """
        if not template_root_path.is_dir():
            Log.all(_LOGGER, f"ERROR: {str(template_root_path.absolute())} must be a directory")
            return []

        rendered: List[str] = []
        renderer = EtsFuncTestsCodeGenerator(template_root_path)
        for subdir in walk_test_subdirs(template_root_path):
            target_dir = test_gen_path / subdir.path.relative_to(template_root_path)
            rendered += renderer.render_and_write_templates(template_root_path, subdir.path, target_dir)
        return rendered

    def transform(self, force_generated: bool) -> List[str]:
        """Render templates from the source directory; ``force_generated`` is not used."""
        source_dir = self.test_source_path
        target_dir = self.test_gen_path
        return self.generate_template_tests(source_dir, target_dir)
169
170
class ESCheckedTestPreparationStep(TestPreparationStep):
    """Preparation step that runs the ``generate-es-checked`` tool (``main.rb``) over YAML configs."""

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.ESCHECKED.value}' test suite"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Run the generator for every ``*.yaml`` file found recursively in the source
        directory and return all ``*.ets`` files found recursively in the generation
        directory afterwards.

        A non-zero return code of the generator is logged and reported through
        ``Log.exception_and_raise`` together with the captured output.
        ``force_generated`` is not used by this step.
        """
        yaml_pattern = os.path.join(self.test_source_path, "**/*.yaml")
        confs = glob(yaml_pattern, recursive=True)

        generator_root = Path(self.config.general.static_core_root).joinpath(
            "tests", "tests-u-runner", "tools", "generate-es-checked"
        )
        generator_executable = generator_root / "main.rb"
        ts_node = f'npx:--prefix:{generator_root}:ts-node:-P:{generator_root / "tsconfig.json"}'

        command = [
            generator_executable,
            '--proc', str(self.config.general.processes),
            '--out', self.test_gen_path,
            '--tmp', self.test_gen_path / 'tmp',
            '--ts-node', ts_node,
        ]
        command.extend(confs)

        res = subprocess.run(
            command,
            capture_output=True,
            encoding=sys.stdout.encoding,
            check=False,
            errors='replace',
        )
        if res.returncode != 0:
            Log.default(_LOGGER,
                        'Failed to run es cross-validator, please, make sure that '
                        'all required tools are installed (see tests-u-runner/readme.md#ets-es-checked-dependencies)')
            Log.exception_and_raise(_LOGGER, f"invalid return code {res.returncode}\n" + res.stdout + res.stderr)

        ets_pattern = os.path.join(self.test_gen_path, "**/*.ets")
        return glob(ets_pattern, recursive=True)
208
209
class CopyStep(TestPreparationStep):
    """Preparation step that copies the source directory into the generation directory."""

    def __str__(self) -> str:
        return "Test preparation step for any ets test suite: copying"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Copy the sources via ``utils.copy`` (with ``remove_if_exist=False``) and return
        every ``*.ets`` file found recursively in the generation directory.
        ``force_generated`` is not used by this step.
        """
        target_dir = self.test_gen_path
        utils.copy(self.test_source_path, target_dir, remove_if_exist=False)
        ets_pattern = os.path.join(self.test_gen_path, "**/*.ets")
        copied_tests = glob(ets_pattern, recursive=True)
        return copied_tests
218
219
class JitStep(TestPreparationStep):
    """
    Preparation step that rewrites ``*.ets`` tests for JIT testing.

    In every eligible test the original ``main`` function is renamed to
    ``main_run`` and a new ``main`` is appended that calls ``main_run``
    ``num_repeats`` times in a loop.

    The signature regular expression only recognises at most one parameter and
    parameter/return types consisting of word characters (``\\w+``); tests whose
    ``main`` does not match are left untouched.
    """
    __main_pattern = r"\bfunction\s+(?P<main>main)\b"
    __param_pattern = r"\s*\(\s*(?P<param>(?P<param_name>\w+)(\s*:\s*\w+))?\s*\)"
    __return_pattern = r"\s*(:\s*(?P<return_type>\w+)\b)?"
    __throws_pattern = r"\s*(?P<throws>throws)?"
    __indent = "    "

    def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config, num_repeats: int = 0):
        super().__init__(test_source_path, test_gen_path, config)
        # Number of iterations of the generated loop around ``main_run``.
        self.num_repeats = num_repeats
        self.main_regexp = re.compile(
            f"{self.__main_pattern}{self.__param_pattern}{self.__return_pattern}{self.__throws_pattern}",
            re.MULTILINE
        )

    def __str__(self) -> str:
        return "Test preparation step for any ets test suite: transforming for JIT testing"

    @staticmethod
    def _rename_main(match: re.Match) -> str:
        """
        Return the matched signature with only the function name replaced by ``main_run``.

        Replacing by position (instead of ``str.replace``) keeps parameter names and
        types that merely contain the substring "main" intact.
        """
        signature = match.group(0)
        offset = match.start()
        name_start = match.start("main") - offset
        name_end = match.end("main") - offset
        return f"{signature[:name_start]}main_run{signature[name_end:]}"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Transform every ``*.ets`` file found recursively in the generation directory.

        The files are processed by a process pool sized by ``config.general.processes``;
        the returned paths come in arbitrary order. ``force_generated`` is not used.
        """
        glob_expression = os.path.join(self.test_gen_path, "**/*.ets")
        tests = glob(glob_expression, recursive=True)
        with multiprocessing.Pool(processes=self.config.general.processes) as pool:
            run_tests = pool.imap_unordered(self.jit_transform_one_test, tests, chunksize=self.config.general.chunksize)
            # Wait for all workers before the context manager terminates the pool.
            pool.close()
            pool.join()

        return list(run_tests)

    def jit_transform_one_test(self, test_path: str) -> str:
        """
        Rewrite one test file in place and return its path.

        The file is left unchanged when its metadata carries one of the tags
        ``not_a_test``, ``compile_only`` or ``no_warmup``, or when no ``main``
        signature is recognised.
        """
        tags = get_metadata(Path(test_path)).tags
        if tags.not_a_test or tags.compile_only or tags.no_warmup:
            return test_path

        original = read_file(test_path)
        match = self.main_regexp.search(original)
        if match is None:
            return test_path

        is_int_main = match.group("return_type") == "int"
        return_type = ": int" if is_int_main else ""
        throws = "throws " if match.group("throws") else ""
        param_line = match.group("param") or ""
        param_name = match.group("param_name") or ""

        # New entry point: repeats the original body and, for an int main,
        # accumulates and returns the sum of the results.
        tail = [f"\nfunction main({param_line}){return_type} {throws}{{"]
        if is_int_main:
            tail.append(f"{self.__indent}let result = 0")
        tail.append(f"{self.__indent}for(let i = 0; i < {self.num_repeats}; i++) {{")
        if is_int_main:
            tail.append(f"{self.__indent * 2}result += main_run({param_name})")
        else:
            tail.append(f"{self.__indent * 2}main_run({param_name})")
        tail.append(f"{self.__indent}}}")
        if is_int_main:
            tail.append(f"{self.__indent}return result;")
        tail.append("}")

        result = self.main_regexp.sub(self._rename_main, original)
        result += "\n".join(tail)
        write_2_file(test_path, result)
        return test_path
284
285
def command_line_parser() -> argparse.Namespace:
    """
    Parse the command line of the stand-alone template generator.

    Both options are required and their values are converted to ``Path``:
    ``-t/--templates-dir`` (stored as ``templates_dir``) and
    ``-o/--output-dir`` (stored as ``output_dir``).
    """
    option_specs = (
        (("-t", "--templates-dir"), "templates_dir",
         "Path to a root directory that contains test templates and parameters"),
        (("-o", "--output-dir"), "output_dir",
         "Path to output directory. Output directory and all"
         " subdirectories are created automatically"),
    )
    cli = argparse.ArgumentParser(description="Generator test runner")
    for flags, destination, description in option_specs:
        cli.add_argument(*flags, type=Path, dest=destination, help=description, required=True)
    return cli.parse_args()
296
297
def main() -> None:
    """
    Entry point of the stand-alone generator.

    Renders the templates given on the command line and exits with status 1
    when nothing was generated or when one of the known template exceptions is
    raised; logs "Finished" otherwise.
    """
    args = command_line_parser()

    try:
        generated = FuncTestPreparationStep.generate_template_tests(args.templates_dir, args.output_dir)
    except (InvalidFileFormatException, InvalidFileStructureException) as malformed:
        Log.all(_LOGGER, f'Error:  {malformed.message}')
        sys.exit(1)
    except UnknownTemplateException as failure:
        Log.all(_LOGGER, f"{failure.filepath}: exception while processing template:")
        Log.all(_LOGGER, f"\t {repr(failure.exception)}")
        sys.exit(1)

    # An empty result also covers the "templates dir is not a directory" case.
    if not generated:
        sys.exit(1)
    Log.all(_LOGGER, "Finished")
315
316
# Run the stand-alone generator only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
319