• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import re
20import subprocess
21import os
22from unittest import TestCase
23from glob import glob
24from typing import Optional
25
26from runner.logger import Log
27from runner.options.config import Config
28from runner.utils import generate, wrap_with_function
29from runner.plugins.work_dir import WorkDir
30
31HERMES_URL = "HERMES_URL"
32HERMES_REVISION = "HERMES_REVISION"
33
34_LOGGER = logging.getLogger("runner.hermes.util_hermes")
35
36
37class UtilHermes:
38    def __init__(self, config: Config, work_dir: WorkDir) -> None:
39        self.check_expr = re.compile(r"^\s*//\s?(?:CHECK-NEXT|CHECK-LABEL|CHECK):(.+)", re.MULTILINE)
40
41        self.show_progress = config.general.show_progress
42        self.force_download = config.general.force_download
43        self.jit = config.ark.jit.enable
44        self.jit_preheat_repeats = config.ark.jit.num_repeats
45
46        self.work_dir = work_dir
47
48        self.hermes_url: Optional[str] = os.getenv(HERMES_URL)
49        self.hermes_revision: Optional[str] = os.getenv(HERMES_REVISION)
50        if self.hermes_url is None:
51            Log.exception_and_raise(_LOGGER, f"No {HERMES_URL} environment variable set", EnvironmentError)
52        if self.hermes_revision is None:
53            Log.exception_and_raise(_LOGGER, f"No {HERMES_REVISION} environment variable set", EnvironmentError)
54
55    def generate(self) -> str:
56        stamp_name = f"hermes-{self.hermes_revision}"
57        if self.jit and self.jit_preheat_repeats > 1:
58            stamp_name += f"-jit-{self.jit_preheat_repeats}"
59        test = TestCase()
60        if self.hermes_url is not None and self.hermes_revision is not None:
61            return generate(
62                name="hermes",
63                stamp_name=stamp_name,
64                url=self.hermes_url,
65                revision=self.hermes_revision,
66                generated_root=self.work_dir.gen,
67                test_subdir="test/hermes",
68                show_progress=self.show_progress,
69                process_copy=self.process_copy,
70                force_download=self.force_download
71            )
72        test.assertFalse(self.hermes_url is None)
73        test.assertFalse(self.hermes_revision is None)
74        return ""
75
76    def process_copy(self, src_path: str, dst_path: str) -> None:
77        Log.all(_LOGGER, "Generating tests")
78
79        glob_expression = os.path.join(src_path, "**/*.js")
80        files = glob(glob_expression, recursive=True)
81
82        for src_file in files:
83            dest_file = src_file.replace(src_path, dst_path)
84            os.makedirs(os.path.dirname(dest_file), exist_ok=True)
85            self.create_file(src_file, dest_file)
86
87    def create_file(self, src_file: str, dest_file: str) -> None:
88        with os.fdopen(os.open(src_file, os.O_RDONLY, 0o755), 'r', encoding="utf-8") as file_pointer:
89            input_str = file_pointer.read()
90
91        if self.jit and self.jit_preheat_repeats > 1:
92            out_str = wrap_with_function(input_str, self.jit_preheat_repeats)
93        else:
94            out_str = input_str
95
96        with os.fdopen(os.open(dest_file, os.O_RDWR | os.O_CREAT, 0o755), 'w', encoding="utf-8") as output:
97            output.write(out_str)
98
99    def run_filecheck(self, test_file: str, actual_output: str) -> bool:
100        with open(test_file, 'r', encoding="utf-8") as file_pointer:
101            input_str = file_pointer.read()
102        if not re.match(self.check_expr, input_str):
103            return True
104
105        TestCase().assertTrue(actual_output is not None, "Expected some output to check")
106        cmd = ['FileCheck-14', test_file]
107        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) as process:
108            try:
109                if process.stdin is None:
110                    Log.exception_and_raise(_LOGGER, f"Unexpected error on checking {test_file}")
111                process.stdin.write(actual_output.encode('utf-8'))
112                process.communicate(timeout=10)
113                return_code = process.returncode
114            except subprocess.TimeoutExpired as ex:
115                error_message = f"Timeout: {' '.join(cmd)} failed with {ex}"
116                _LOGGER.error(error_message)
117                process.kill()
118                return_code = -1
119            except Exception as ex:  # pylint: disable=broad-except
120                error_message = f"Exception{' '.join(cmd)} failed with {ex}"
121                _LOGGER.error(error_message)
122                process.kill()
123                return_code = -1
124
125        return return_code == 0
126