• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2022-2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import argparse
19import logging
20import multiprocessing
21import os
22import re
23import shutil
24import subprocess
25import sys
26from abc import ABC, abstractmethod
27from glob import glob
28from pathlib import Path
29from typing import List
30
31from runner import utils
32from runner.logger import Log
33from runner.options.config import Config
34from runner.plugins.ets.ets_suites import EtsSuites
35from runner.plugins.ets.ets_templates.ets_templates_generator import EtsTemplatesGenerator
36from runner.plugins.ets.ets_templates.test_metadata import get_metadata
37from runner.plugins.ets.stdlib_templates.stdlib_templates_generator import StdlibTemplatesGenerator
38from runner.plugins.ets.utils.exceptions import InvalidFileFormatException, InvalidFileStructureException, \
39    UnknownTemplateException
40from runner.plugins.ets.utils.file_structure import walk_test_subdirs
41from runner.utils import read_file, write_2_file
42
# Module-level logger that is handed to the Log helper calls throughout this file.
_LOGGER = logging.getLogger("runner.plugins.ets.generate")
44
45
class TestPreparationStep(ABC):
    """
    Abstract base of a single test preparation step.

    A step is bound to a source directory, a target directory for prepared
    tests and the runner configuration. Concrete steps implement `transform`.
    """

    def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config) -> None:
        # The configuration is a plain public attribute; both paths are kept in
        # name-mangled attributes and exposed read-only through the properties below.
        self.config = config
        self.__test_gen_path = test_gen_path
        self.__test_source_path = test_source_path

    @property
    def test_gen_path(self) -> Path:
        """Target path exactly as it was passed to the constructor."""
        return self.__test_gen_path

    @property
    def test_source_path(self) -> Path:
        """Source path exactly as it was passed to the constructor."""
        return self.__test_source_path

    @abstractmethod
    def transform(self, force_generated: bool) -> List[str]:
        """
        Perform the preparation step.

        :param force_generated: flag supplied by the caller; how (and whether)
            it is used is up to the concrete step
        :return: list of strings produced by the step (the glob-based steps in
            this file return paths of the prepared test files)
        """
63
64
class CtsTestPreparationStep(TestPreparationStep):
    """Preparation step that produces the CTS suite tests with EtsTemplatesGenerator."""

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.CTS.value}' test suite"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Run the ETS templates generator from the source path into the target path.

        :param force_generated: not used by this step
        :return: whatever `EtsTemplatesGenerator.generate()` returns
        """
        generator = EtsTemplatesGenerator(self.test_source_path, self.test_gen_path)
        generated = generator.generate()
        return generated
72
73
class CustomGeneratorTestPreparationStep(TestPreparationStep):
    """
    Preparation step that delegates test generation to the external generator
    configured in `config.custom.generator`.
    """

    def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config, extension: str) -> None:
        super().__init__(test_source_path, test_gen_path, config)
        # Extension (without the leading dot) of the files collected after the generation.
        self.extension = extension

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.CUSTOM.value} - {self.config.custom.suite_name}' test suite"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Call the custom generator and collect the files it has produced.

        The generator is started as
        `<generator> --source <test_source_path> --target <test_gen_path> <generator_options...>`
        and is given 300 seconds to finish.

        :param force_generated: when True an already existing target directory
            is removed before the generator is started
        :return: paths of all `*.<extension>` files found recursively under the target directory
        :raises: whatever `Log.exception_and_raise` raises when the generator
            times out or the communication with it fails
        """
        if self.test_gen_path.exists() and force_generated:
            shutil.rmtree(self.test_gen_path)
        # parents=True: the target directory may be nested in folders that do not exist yet.
        self.test_gen_path.mkdir(parents=True, exist_ok=True)
        cmd = [self.config.custom.generator,
               "--source", self.test_source_path,
               "--target", self.test_gen_path]
        cmd.extend(self.config.custom.generator_options)
        timeout = 300
        with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                errors='ignore',
        ) as process:
            try:
                stdout, stderr = process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                Log.exception_and_raise(_LOGGER, f"Generation failed by timeout after {timeout} sec")
            except Exception as ex:  # pylint: disable=broad-except
                Log.exception_and_raise(_LOGGER, f"Generation failed by unknown reason: {ex}")
            else:
                # A failed generator used to go unnoticed: report its exit code and output,
                # but still return whatever it managed to generate.
                if process.returncode != 0:
                    Log.all(_LOGGER,
                            f"Custom generator finished with return code {process.returncode}\n{stdout}{stderr}")

        glob_expression = os.path.join(self.test_gen_path, f"**/*.{self.extension}")
        return glob(glob_expression, recursive=True)
112
113
class FuncTestPreparationStep(TestPreparationStep):
    """Preparation step that renders the FUNC suite tests with StdlibTemplatesGenerator."""

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.FUNC.value}' test suite"

    @staticmethod
    def generate_template_tests(template_root_path: Path, test_gen_path: Path) -> List[str]:
        """
        Renders all templates and saves them.

        Every directory yielded by `walk_test_subdirs` is rendered into the
        directory with the same relative path under `test_gen_path`.

        :param template_root_path: root directory of the templates
        :param test_gen_path: root directory for the rendered tests
        :return: accumulated results of `render_and_write_templates`; an empty
            list if `template_root_path` is not a directory
        """
        if template_root_path.is_dir():
            renderer = StdlibTemplatesGenerator(template_root_path)
            rendered: List[str] = []
            for subdir in walk_test_subdirs(template_root_path):
                # Mirror the template directory layout in the output tree.
                target_dir = test_gen_path / subdir.path.relative_to(template_root_path)
                rendered.extend(renderer.render_and_write_templates(template_root_path, subdir.path, target_dir))
            return rendered

        Log.all(_LOGGER, f"ERROR: {str(template_root_path.absolute())} must be a directory")
        return []

    def transform(self, force_generated: bool) -> List[str]:
        """
        Render the templates from the source path into the target path.

        :param force_generated: not used by this step
        """
        return self.generate_template_tests(self.test_source_path, self.test_gen_path)
137
138
class ESCheckedTestPreparationStep(TestPreparationStep):
    """
    Preparation step that generates the ESCHECKED suite tests by running the
    `generate-es-checked/main.rb` tool over all yaml configs of the source folder.
    """

    def __str__(self) -> str:
        return f"Test Generator for '{EtsSuites.ESCHECKED.value}' test suite"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Run the es-checked generator and collect the generated tests.

        :param force_generated: not used by this step
        :return: paths of all `*.sts` files found recursively under the target directory
        :raises: whatever `Log.exception_and_raise` raises when the generator
            returns a non-zero exit code
        """
        confs = list(glob(os.path.join(self.test_source_path, "**/*.yaml"), recursive=True))
        generator_root = Path(self.config.general.static_core_root) / \
                         "tests" / \
                         "tests-u-runner" / \
                         "tools" / \
                         "generate-es-checked"
        generator_executable = generator_root / "main.rb"
        res = subprocess.run(
            [
                generator_executable,
                '--out',
                self.test_gen_path,
                '--tmp',
                self.test_gen_path / 'tmp',
                '--ts-node',
                f'npx:--prefix:{generator_root}:ts-node:-P:{generator_root / "tsconfig.json"}',
                *confs
            ],
            capture_output=True,
            encoding=sys.stdout.encoding,
            check=False,
            errors='replace',
        )
        if res.returncode != 0:
            # Adjacent literals are concatenated verbatim, so the separating space
            # has to be a part of the first literal ("...make sure that all...").
            Log.default(_LOGGER,
                        'Failed to run es cross-validator, please, make sure that '
                        'all required tools are installed (see tests-u-runner/readme.md#ets-es-checked-dependencies)')
            Log.exception_and_raise(_LOGGER, f"invalid return code {res.returncode}\n" + res.stdout + res.stderr)
        glob_expression = os.path.join(self.test_gen_path, "**/*.sts")
        return list(glob(glob_expression, recursive=True))
174
175
class CopyStep(TestPreparationStep):
    """Preparation step that copies the source tests into the target directory."""

    def __str__(self) -> str:
        return "Test preparation step for any ets test suite: copying"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Copy the source path to the target path and list the tests found there.

        :param force_generated: not used by this step
        :return: paths of all `*.sts` files found recursively under the target directory
        """
        utils.copy(self.test_source_path, self.test_gen_path, remove_if_exist=False)
        sts_pattern = os.path.join(self.test_gen_path, "**/*.sts")
        copied_tests = glob(sts_pattern, recursive=True)
        return copied_tests
184
185
class JitStep(TestPreparationStep):
    """
    Preparation step that rewrites tests for JIT testing.

    In every suitable `*.sts` test the original `main` function is renamed to
    `main_run` and a new `main` is appended that calls `main_run` in a loop
    `num_repeats` times.
    """
    __main_pattern = r"\bfunction\s+(?P<main>main)\b"
    __param_pattern = r"\s*\(\s*(?P<param>(?P<param_name>\w+)(\s*:\s*\w+))?\s*\)"
    __return_pattern = r"\s*(:\s*(?P<return_type>\w+)\b)?"
    __throws_pattern = r"\s*(?P<throws>throws)?"
    __indent = "    "

    def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config, num_repeats: int = 0):
        super().__init__(test_source_path, test_gen_path, config)
        # Number of iterations of the loop in the generated `main` wrapper.
        self.num_repeats = num_repeats
        # Matches the signature of `main`: name, optional single parameter,
        # optional return type and optional `throws` mark.
        self.main_regexp = re.compile(
            f"{self.__main_pattern}{self.__param_pattern}{self.__return_pattern}{self.__throws_pattern}",
            re.MULTILINE
        )

    def __str__(self) -> str:
        return "Test preparation step for any ets test suite: transforming for JIT testing"

    def transform(self, force_generated: bool) -> List[str]:
        """
        Transform all `*.sts` files found recursively under the target directory.

        The files are processed in a pool of `config.general.processes` workers.

        :param force_generated: not used by this step
        :return: paths of the processed tests, in the order the workers have finished
        """
        glob_expression = os.path.join(self.test_gen_path, "**/*.sts")
        tests = list(glob(glob_expression, recursive=True))
        with multiprocessing.Pool(processes=self.config.general.processes) as pool:
            run_tests = pool.imap_unordered(self.jit_transform_one_test, tests, chunksize=self.config.general.chunksize)
            pool.close()
            pool.join()
            # Read the results while the pool object is still alive instead of
            # touching the iterator after the pool has been terminated.
            return list(run_tests)

    def jit_transform_one_test(self, test_path: str) -> str:
        """
        Transform a single test file in place.

        The file is left untouched when its metadata has any of the tags
        `not_a_test`, `compile_only`, `no_warmup`, or when no `main` function
        is found in it.

        :param test_path: path to the test file
        :return: the same `test_path`
        """
        tags = get_metadata(Path(test_path)).tags
        if tags.not_a_test or tags.compile_only or tags.no_warmup:
            return test_path

        original = read_file(test_path)
        match = self.main_regexp.search(original)
        if match is None:
            return test_path

        # Every match starts with "function <spaces> main", so its first "main" is the
        # function name. Limiting the replacement to one occurrence keeps parameter
        # names and types that contain "main" (e.g. `mainArg`, `domain`) intact.
        result = self.main_regexp.sub(lambda arg: arg.group(0).replace("main", "main_run", 1), original)
        result += self._build_main_wrapper(match)
        write_2_file(test_path, result)
        return test_path

    def _build_main_wrapper(self, match: "re.Match[str]") -> str:
        """Build the text of the new `main` that calls `main_run` in a loop `num_repeats` times."""
        is_int_main = match.group("return_type") == "int"
        return_type = ": int" if is_int_main else ""
        throws = "throws " if match.group("throws") else ""
        # The parameter is optional: both groups are None for `main()`.
        param_line = match.group("param") or ""
        param_name = match.group("param_name") or ""

        tail = [f"\nfunction main({param_line}){return_type} {throws}{{"]
        if is_int_main:
            tail.append(f"{self.__indent}let result = 0")
        tail.append(f"{self.__indent}for(let i = 0; i < {self.num_repeats}; i++) {{")
        if is_int_main:
            # An int `main` accumulates the results of all runs and returns the sum.
            tail.append(f"{self.__indent * 2}result += main_run({param_name})")
        else:
            tail.append(f"{self.__indent * 2}main_run({param_name})")
        tail.append(f"{self.__indent}}}")
        if is_int_main:
            tail.append(f"{self.__indent}return result;")
        tail.append("}")
        return "\n".join(tail)
250
251
def command_line_parser() -> argparse.Namespace:
    """
    Parse the command line of the generator.

    Both options are required and are converted to `Path`:
    `-t/--templates-dir` (stored as `templates_dir`) and
    `-o/--output-dir` (stored as `output_dir`).

    :return: namespace with the parsed options
    """
    # (flags, destination, help text) of every supported option
    options = (
        (("-t", "--templates-dir"), "templates_dir",
         "Path to a root directory that contains test templates and parameters"),
        (("-o", "--output-dir"), "output_dir",
         "Path to output directory. Output directory and all"
         " subdirectories are created automatically"),
    )
    parser = argparse.ArgumentParser(description="Generator test runner")
    for flags, destination, help_text in options:
        parser.add_argument(*flags, type=Path, dest=destination, help=help_text, required=True)
    return parser.parse_args()
262
263
def main() -> None:
    """
    Entry point of the script: render the templates given on the command line.

    Exits with code 1 when nothing has been generated or when one of the known
    template exceptions is raised; logs "Finished" otherwise.
    """
    args = command_line_parser()

    try:
        generated = FuncTestPreparationStep.generate_template_tests(args.templates_dir, args.output_dir)
        if not generated:
            sys.exit(1)
    except (InvalidFileFormatException, InvalidFileStructureException) as invalid_input_exp:
        # Both exceptions are reported in the same way.
        Log.all(_LOGGER, f'Error:  {invalid_input_exp.message}')
        sys.exit(1)
    except UnknownTemplateException as template_exp:
        Log.all(_LOGGER, f"{template_exp.filepath}: exception while processing template:")
        Log.all(_LOGGER, f"\t {repr(template_exp.exception)}")
        sys.exit(1)
    Log.all(_LOGGER, "Finished")
281
282
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()
285