1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2022-2025 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import argparse 19import logging 20import multiprocessing 21import os 22import re 23import shutil 24import subprocess 25import sys 26from abc import ABC, abstractmethod 27from glob import glob 28from pathlib import Path 29from typing import List 30 31from runner import utils 32from runner.logger import Log 33from runner.options.config import Config 34from runner.plugins.ets.ets_suites import EtsSuites 35from runner.plugins.ets.ets_templates.ets_templates_generator import EtsTemplatesGenerator 36from runner.plugins.ets.ets_templates.test_metadata import get_metadata 37from runner.plugins.ets.stdlib_templates.stdlib_templates_generator import StdlibTemplatesGenerator 38from runner.generators.ets_func_tests.ets_func_test_template_generator import EtsFuncTestsCodeGenerator 39from runner.plugins.ets.utils.exceptions import InvalidFileFormatException, InvalidFileStructureException, \ 40 UnknownTemplateException 41from runner.plugins.ets.utils.file_structure import walk_test_subdirs 42from runner.utils import read_file, write_2_file 43 44_LOGGER = logging.getLogger("runner.plugins.ets.generate") 45 46 47class TestPreparationStep(ABC): 48 def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config) -> None: 49 self.__test_source_path = test_source_path 50 self.__test_gen_path = test_gen_path 51 self.config = config 52 53 @property 54 def test_source_path(self) -> Path: 55 return self.__test_source_path 56 57 @property 58 def test_gen_path(self) -> Path: 59 return self.__test_gen_path 60 61 @abstractmethod 62 def transform(self, force_generated: bool) -> List[str]: 63 pass 64 65 66class CtsTestPreparationStep(TestPreparationStep): 67 def __str__(self) -> str: 68 return f"Test Generator for '{EtsSuites.CTS.value}' test suite" 69 70 def transform(self, force_generated: bool) -> List[str]: 71 ets_templates_generator = EtsTemplatesGenerator(self.test_source_path, self.test_gen_path) 72 return ets_templates_generator.generate() 73 74 75class CustomGeneratorTestPreparationStep(TestPreparationStep): 76 def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config, extension: str) -> None: 77 super().__init__(test_source_path, test_gen_path, config) 78 self.extension = extension 79 80 def __str__(self) -> str: 81 return f"Test Generator for '{EtsSuites.CUSTOM.value} - {self.config.custom.suite_name}' test suite" 82 83 def transform(self, force_generated: bool) -> List[str]: 84 # call of the custom generator 85 if self.test_gen_path.exists() and force_generated: 86 shutil.rmtree(self.test_gen_path) 87 self.test_gen_path.mkdir(exist_ok=True) 88 cmd = [ 89 self.config.custom.generator, 90 "--source", self.test_source_path, 91 "--target", self.test_gen_path 92 ] 93 cmd.extend(self.config.custom.generator_options) 94 timeout = 300 95 result: List[str] = [] 96 with subprocess.Popen( 97 cmd, 98 stdout=subprocess.PIPE, 99 stderr=subprocess.PIPE, 100 encoding='utf-8', 101 errors='ignore', 102 ) as process: 103 try: 104 process.communicate(timeout=timeout) 105 except subprocess.TimeoutExpired: 106 process.kill() 107 Log.exception_and_raise(_LOGGER, f"Generation failed by timeout after {timeout} sec") 108 except Exception as ex: # pylint: disable=broad-except 109 Log.exception_and_raise(_LOGGER, f"Generation failed by unknown reason: {ex}") 110 finally: 111 glob_expression = os.path.join(self.test_gen_path, f"**/*.{self.extension}") 112 result.extend(glob(glob_expression, recursive=True)) 113 114 return result 115 116 117class FuncTestPreparationStep(TestPreparationStep): 118 def __str__(self) -> str: 119 return f"Test Generator for '{EtsSuites.FUNC.value}' test suite" 120 121 @staticmethod 122 def generate_template_tests(template_root_path: Path, test_gen_path: Path) -> List[str]: 123 """ 124 Renders all templates and saves them. 125 """ 126 if not template_root_path.is_dir(): 127 Log.all(_LOGGER, f"ERROR: {str(template_root_path.absolute())} must be a directory") 128 return [] 129 130 ets_render = StdlibTemplatesGenerator(template_root_path) 131 generated_tests = [] 132 for dir_name in walk_test_subdirs(template_root_path): 133 dir_outpath = test_gen_path / dir_name.path.relative_to(template_root_path) 134 generated_tests_tmp = ets_render.render_and_write_templates(template_root_path, dir_name.path, dir_outpath) 135 generated_tests.extend(generated_tests_tmp) 136 137 return generated_tests 138 139 def transform(self, force_generated: bool) -> List[str]: 140 return self.generate_template_tests(self.test_source_path, self.test_gen_path) 141 142 143class FrontendFuncTestPreparationStep(TestPreparationStep): 144 def __str__(self) -> str: 145 return f"Test Generator for '{EtsSuites.FUNC.value}' test suite" 146 147 @staticmethod 148 def generate_template_tests(template_root_path: Path, test_gen_path: Path) -> List[str]: 149 """ 150 Renders all templates and saves them. 151 """ 152 if not template_root_path.is_dir(): 153 Log.all(_LOGGER, f"ERROR: {str(template_root_path.absolute())} must be a directory") 154 return [] 155 156 generated_tests = [] 157 ets_func_test_render = EtsFuncTestsCodeGenerator(template_root_path) 158 for dir_name in walk_test_subdirs(template_root_path): 159 dir_outpath = test_gen_path / dir_name.path.relative_to(template_root_path) 160 generated_tests_tmp = ets_func_test_render.render_and_write_templates(template_root_path, 161 dir_name.path, 162 dir_outpath) 163 generated_tests.extend(generated_tests_tmp) 164 165 return generated_tests 166 167 def transform(self, force_generated: bool) -> List[str]: 168 return self.generate_template_tests(self.test_source_path, self.test_gen_path) 169 170 171class ESCheckedTestPreparationStep(TestPreparationStep): 172 def __str__(self) -> str: 173 return f"Test Generator for '{EtsSuites.ESCHECKED.value}' test suite" 174 175 def transform(self, force_generated: bool) -> List[str]: 176 confs = list(glob(os.path.join(self.test_source_path, "**/*.yaml"), recursive=True)) 177 generator_root = (Path(self.config.general.static_core_root) / 178 "tests" / 179 "tests-u-runner" / 180 "tools" / 181 "generate-es-checked") 182 generator_executable = generator_root / "main.rb" 183 res = subprocess.run( 184 [ 185 generator_executable, 186 '--proc', 187 str(self.config.general.processes), 188 '--out', 189 self.test_gen_path, 190 '--tmp', 191 self.test_gen_path / 'tmp', 192 '--ts-node', 193 f'npx:--prefix:{generator_root}:ts-node:-P:{generator_root / "tsconfig.json"}', 194 *confs 195 ], 196 capture_output=True, 197 encoding=sys.stdout.encoding, 198 check=False, 199 errors='replace', 200 ) 201 if res.returncode != 0: 202 Log.default(_LOGGER, 203 'Failed to run es cross-validator, please, make sure that ' 204 'all required tools are installed (see tests-u-runner/readme.md#ets-es-checked-dependencies)') 205 Log.exception_and_raise(_LOGGER, f"invalid return code {res.returncode}\n" + res.stdout + res.stderr) 206 glob_expression = os.path.join(self.test_gen_path, "**/*.ets") 207 return list(glob(glob_expression, recursive=True)) 208 209 210class CopyStep(TestPreparationStep): 211 def __str__(self) -> str: 212 return "Test preparation step for any ets test suite: copying" 213 214 def transform(self, force_generated: bool) -> List[str]: 215 utils.copy(self.test_source_path, self.test_gen_path, remove_if_exist=False) 216 glob_expression = os.path.join(self.test_gen_path, "**/*.ets") 217 return list(glob(glob_expression, recursive=True)) 218 219 220class JitStep(TestPreparationStep): 221 __main_pattern = r"\bfunction\s+(?P<main>main)\b" 222 __param_pattern = r"\s*\(\s*(?P<param>(?P<param_name>\w+)(\s*:\s*\w+))?\s*\)" 223 __return_pattern = r"\s*(:\s*(?P<return_type>\w+)\b)?" 224 __throws_pattern = r"\s*(?P<throws>throws)?" 225 __indent = " " 226 227 def __init__(self, test_source_path: Path, test_gen_path: Path, config: Config, num_repeats: int = 0): 228 super().__init__(test_source_path, test_gen_path, config) 229 self.num_repeats = num_repeats 230 self.main_regexp = re.compile( 231 f"{self.__main_pattern}{self.__param_pattern}{self.__return_pattern}{self.__throws_pattern}", 232 re.MULTILINE 233 ) 234 235 def __str__(self) -> str: 236 return "Test preparation step for any ets test suite: transforming for JIT testing" 237 238 def transform(self, force_generated: bool) -> List[str]: 239 glob_expression = os.path.join(self.test_gen_path, "**/*.ets") 240 tests = list(glob(glob_expression, recursive=True)) 241 with multiprocessing.Pool(processes=self.config.general.processes) as pool: 242 run_tests = pool.imap_unordered(self.jit_transform_one_test, tests, chunksize=self.config.general.chunksize) 243 pool.close() 244 pool.join() 245 246 return list(run_tests) 247 248 def jit_transform_one_test(self, test_path: str) -> str: 249 metadata = get_metadata(Path(test_path)) 250 is_convert = (not metadata.tags.not_a_test and 251 not metadata.tags.compile_only and 252 not metadata.tags.no_warmup) 253 if not is_convert: 254 return test_path 255 256 original = read_file(test_path) 257 match = self.main_regexp.search(original) 258 if match is None: 259 return test_path 260 261 is_int_main = match.group("return_type") == "int" 262 return_type = ": int" if is_int_main else "" 263 throws = "throws " if match.group("throws") else "" 264 param_line = param if (param := match.group("param")) is not None else "" 265 param_name = param if (param := match.group("param_name")) is not None else "" 266 267 tail = [f"\nfunction main({param_line}){return_type} {throws}{{"] 268 if is_int_main: 269 tail.append(f"{self.__indent}let result = 0") 270 tail.append(f"{self.__indent}for(let i = 0; i < {self.num_repeats}; i++) {{") 271 if is_int_main: 272 tail.append(f"{self.__indent * 2}result += main_run({param_name})") 273 else: 274 tail.append(f"{self.__indent * 2}main_run({param_name})") 275 tail.append(f"{self.__indent}}}") 276 if is_int_main: 277 tail.append(f"{self.__indent}return result;") 278 tail.append("}") 279 280 result = self.main_regexp.sub(lambda arg: arg.group(0).replace("main", "main_run"), original) 281 result += "\n".join(tail) 282 write_2_file(test_path, result) 283 return test_path 284 285 286def command_line_parser() -> argparse.Namespace: 287 parser = argparse.ArgumentParser(description="Generator test runner") 288 parser.add_argument("-t", "--templates-dir", type=Path, dest='templates_dir', 289 help="Path to a root directory that contains test templates and parameters", 290 required=True) 291 parser.add_argument("-o", "--output-dir", type=Path, dest='output_dir', 292 help="Path to output directory. Output directory and all" + 293 " subdirectories are created automatically", 294 required=True) 295 return parser.parse_args() 296 297 298def main() -> None: 299 args = command_line_parser() 300 301 try: 302 if not FuncTestPreparationStep.generate_template_tests(args.templates_dir, args.output_dir): 303 sys.exit(1) 304 except InvalidFileFormatException as inv_format_exp: 305 Log.all(_LOGGER, f'Error: {inv_format_exp.message}') 306 sys.exit(1) 307 except InvalidFileStructureException as inv_fs_exp: 308 Log.all(_LOGGER, f'Error: {inv_fs_exp.message}') 309 sys.exit(1) 310 except UnknownTemplateException as unknown_template_exp: 311 Log.all(_LOGGER, f"{unknown_template_exp.filepath}: exception while processing template:") 312 Log.all(_LOGGER, f"\t {repr(unknown_template_exp.exception)}") 313 sys.exit(1) 314 Log.all(_LOGGER, "Finished") 315 316 317if __name__ == "__main__": 318 main() 319