#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import re
import subprocess
import os
from unittest import TestCase
from glob import glob
from typing import Optional

from runner.logger import Log
from runner.options.config import Config
from runner.utils import generate, wrap_with_function
from runner.plugins.work_dir import WorkDir

# Names of the environment variables that supply the Hermes test source.
HERMES_URL = "HERMES_URL"
HERMES_REVISION = "HERMES_REVISION"

_LOGGER = logging.getLogger("runner.hermes.util_hermes")


class UtilHermes:
    """Helper that prepares Hermes JavaScript tests and checks their output.

    It copies ``*.js`` test files into the work directory (optionally wrapping
    them for JIT preheating) and verifies test output with ``FileCheck-14``.
    """

    def __init__(self, config: Config, work_dir: WorkDir) -> None:
        """Read settings from ``config`` and the Hermes environment variables.

        Args:
            config: runner configuration; ``general.show_progress``,
                ``general.force_download``, ``ark.jit.enable`` and
                ``ark.jit.num_repeats`` are read.
            work_dir: work directory descriptor; its ``gen`` attribute is
                used later by :meth:`generate`.

        ``Log.exception_and_raise`` is invoked with ``EnvironmentError`` when
        ``HERMES_URL`` or ``HERMES_REVISION`` is not set (presumably it logs
        and raises -- confirm against ``runner.logger``).
        """
        # Matches a FileCheck directive comment anywhere in a test file
        # (MULTILINE lets ``^`` anchor at every line start).
        self.check_expr = re.compile(r"^\s*//\s?(?:CHECK-NEXT|CHECK-LABEL|CHECK):(.+)", re.MULTILINE)

        self.show_progress = config.general.show_progress
        self.force_download = config.general.force_download
        self.jit = config.ark.jit.enable
        self.jit_preheat_repeats = config.ark.jit.num_repeats

        self.work_dir = work_dir

        self.hermes_url: Optional[str] = os.getenv(HERMES_URL)
        self.hermes_revision: Optional[str] = os.getenv(HERMES_REVISION)
        if self.hermes_url is None:
            Log.exception_and_raise(_LOGGER, f"No {HERMES_URL} environment variable set", EnvironmentError)
        if self.hermes_revision is None:
            Log.exception_and_raise(_LOGGER, f"No {HERMES_REVISION} environment variable set", EnvironmentError)

    def generate(self) -> str:
        """Delegate test generation to ``runner.utils.generate``.

        The stamp name is ``hermes-<revision>``, with a ``-jit-<repeats>``
        suffix when JIT is enabled with more than one preheat repeat.

        Returns:
            Whatever ``runner.utils.generate`` returns (annotated as ``str``;
            presumably the root of the generated tests -- confirm against
            ``runner.utils``).

        Raises:
            AssertionError: if the Hermes URL or revision is ``None``.
        """
        stamp_name = f"hermes-{self.hermes_revision}"
        if self.jit and self.jit_preheat_repeats > 1:
            stamp_name += f"-jit-{self.jit_preheat_repeats}"
        test = TestCase()
        # The explicit ``is not None`` test narrows the Optional types for
        # the call below.
        if self.hermes_url is not None and self.hermes_revision is not None:
            return generate(
                name="hermes",
                stamp_name=stamp_name,
                url=self.hermes_url,
                revision=self.hermes_revision,
                generated_root=self.work_dir.gen,
                test_subdir="test/hermes",
                show_progress=self.show_progress,
                process_copy=self.process_copy,
                force_download=self.force_download
            )
        # At least one value is None here, so one of these assertions fails.
        test.assertFalse(self.hermes_url is None)
        test.assertFalse(self.hermes_revision is None)
        return ""

    def process_copy(self, src_path: str, dst_path: str) -> None:
        """Copy every ``*.js`` file under ``src_path`` into ``dst_path``.

        The directory layout below ``src_path`` is reproduced below
        ``dst_path``; each file is written through :meth:`create_file`.

        Args:
            src_path: root directory searched recursively for ``*.js`` files.
            dst_path: root directory that receives the generated files.
        """
        Log.all(_LOGGER, "Generating tests")

        glob_expression = os.path.join(src_path, "**/*.js")
        for src_file in glob(glob_expression, recursive=True):
            # Rebase on the relative path rather than using str.replace():
            # replace() would also rewrite later occurrences of src_path in
            # the file name and mis-joins when src_path ends with a separator.
            relative_path = os.path.relpath(src_file, src_path)
            dest_file = os.path.join(dst_path, relative_path)
            os.makedirs(os.path.dirname(dest_file), exist_ok=True)
            self.create_file(src_file, dest_file)

    def create_file(self, src_file: str, dest_file: str) -> None:
        """Write ``src_file`` to ``dest_file``, wrapped for JIT if requested.

        When JIT is enabled with more than one preheat repeat, the content is
        passed through ``wrap_with_function`` together with the repeat count;
        otherwise it is copied unchanged.

        Args:
            src_file: path of the UTF-8 source test file.
            dest_file: path of the file to create or overwrite.
        """
        with open(src_file, 'r', encoding="utf-8") as file_pointer:
            input_str = file_pointer.read()

        if self.jit and self.jit_preheat_repeats > 1:
            out_str = wrap_with_function(input_str, self.jit_preheat_repeats)
        else:
            out_str = input_str

        # O_TRUNC is required: os.fdopen(..., 'w') does not truncate an
        # already-open descriptor, so without it a pre-existing longer file
        # would keep stale bytes after the new content.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        with os.fdopen(os.open(dest_file, flags, 0o755), 'w', encoding="utf-8") as output:
            output.write(out_str)

    def run_filecheck(self, test_file: str, actual_output: str) -> bool:
        """Verify ``actual_output`` against the CHECK directives in a test.

        Args:
            test_file: path of the test file that may contain ``// CHECK``,
                ``// CHECK-NEXT`` or ``// CHECK-LABEL`` comments.
            actual_output: text fed to ``FileCheck-14`` on standard input.

        Returns:
            ``True`` when the file contains no CHECK directives or when
            ``FileCheck-14`` exits with code 0; ``False`` on a non-zero exit
            code, a timeout (10 seconds) or any other error while
            communicating with the process.

        Raises:
            AssertionError: if ``actual_output`` is ``None``.
        """
        with open(test_file, 'r', encoding="utf-8") as file_pointer:
            input_str = file_pointer.read()
        # search(), not match(): match() only tries offset 0, so directives
        # preceded by anything else (e.g. a license header) were never seen.
        if not self.check_expr.search(input_str):
            return True

        TestCase().assertTrue(actual_output is not None, "Expected some output to check")
        cmd = ['FileCheck-14', test_file]
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) as process:
            try:
                # communicate(input=...) feeds stdin while draining stdout and
                # stderr, so full pipe buffers cannot deadlock the exchange and
                # the timeout also covers the write.
                process.communicate(input=actual_output.encode('utf-8'), timeout=10)
                return_code = process.returncode
            except subprocess.TimeoutExpired as ex:
                error_message = f"Timeout: {' '.join(cmd)} failed with {ex}"
                _LOGGER.error(error_message)
                process.kill()
                # Finish communication after the kill so the pipes are drained.
                process.communicate()
                return_code = -1
            except Exception as ex:  # pylint: disable=broad-except
                error_message = f"Exception: {' '.join(cmd)} failed with {ex}"
                _LOGGER.error(error_message)
                process.kill()
                return_code = -1

        return return_code == 0