• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import multiprocessing
18import os
19import stat
20import sys
21import re
22from functools import lru_cache
23import random
24from typing import List, Dict, Any, Tuple
25import logging
26
27import stress_common
28from stress_test import Test, Result
29from stress_test262 import Test262StressTest, EXCLUDED_TESTS
30from stress_common import SCRIPT_DIR, collect_from
31
32
33class Descriptor:
34
35    def __init__(self, input_file: str) -> None:
36        self.input_file = input_file
37        self.header = re.compile(r"/\*---(?P<header>.+)---\*/", re.DOTALL)
38        self.includes = re.compile(r"includes:\s+\[(?P<includes>.+)]")
39        self.content = self.get_content()
40
41    def get_fail_list_path(self) -> str:
42        return os.path.join(SCRIPT_DIR, 'fail_list_harness_with_runtime.json')
43
44    def get_content(self) -> str:
45        with open(self.input_file, "r", encoding="utf-8") as file_pointer:
46            input_str = file_pointer.read()
47        return input_str
48
49    def get_header(self) -> str:
50        header_comment = self.header.search(self.content)
51        return header_comment.group(0) if header_comment else ""
52
53    def parse_descriptor(self) -> Dict[str, Any]:
54        header = self.get_header()
55        result: Dict[str, Any] = {}
56
57        if len(header) == 0:
58            return result
59
60        includes = []
61        match = self.includes.search(header)
62        includes += [
63            incl.strip() for incl in match.group("includes").split(",")
64        ] if match else []
65
66        result["includes"] = includes
67        return result
68
69
70@lru_cache(maxsize=100000)
71def get_harness_code(path: str = None) -> str:
72    return read_harness_code(path)
73
74
75def read_harness_code(path: str = None) -> str:
76    ajs = os.path.abspath(os.path.join(path, 'harness', 'assert.js'))
77    sjs = os.path.abspath(os.path.join(path, 'harness', 'sta.js'))
78
79    with open(ajs, 'r') as f:
80        assert_js = f.read()
81    with open(sjs, 'r') as f:
82        sta_js = f.read()
83    return assert_js + '\n' + sta_js + '\n'
84
85
86@lru_cache(maxsize=10000)
87def read_import(src: str) -> str:
88    with open(src, 'r') as f:
89        code = f.read()
90    return code
91
92
93class Test262StressTestWithRuntime(Test262StressTest):
94
95    def __init__(self, repeats: int, args) -> None:
96        super().__init__(args)
97        self.repeats = 3 if repeats is None else repeats
98        self.build_dir = args.build_dir
99        logger = stress_common.create_logger()
100        logger.debug('Repeats: %s with timeout: %s', self.repeats,
101                     self.timeout)
102
103    def compile_single(self, src: str) -> Tuple[str, str, int]:
104        self.prepare_single(src)
105        src = src.replace("[", "_").replace("]", "_")
106        cr = super().compile_single(src + '.mjs')
107        return src, cr[1], cr[2]
108
109    def run_single(self, test: Test) -> Result:
110        test.abc = test.abc.replace("[", "_").replace("]", "_")
111        stress_abc = test.abc + '.stress.abc'
112        r1p = Test(test.source + ".mjs", test.abc)
113        r2p = Test(test.source + ".mjs", stress_abc)
114
115        test_result_one = self.run_js_test_single(r1p)  # Run test once
116
117        result = stress_common.run_abckit(self.build_dir, test.source,
118                                          test.abc, stress_abc)
119
120        if result.returncode != 0:
121            error = stress_common.parse_stdout(result.returncode,
122                                               result.stdout)
123            return Result(test.source, error)
124        # Stress test passed
125
126        if test_result_one.result == -1:  # First attempt JS Test failed with timeout. This bypass next test
127            return Result(test.source, "0")
128
129        test_result_two = self.run_js_test_single(
130            r2p, self.repeats)  # Run test with defined repeats
131
132        if test_result_two.result == 0:
133            return Result(test.source, "0")
134
135        if test_result_one.result != test_result_two.result:
136            msg = f'JS Test result changed. Was {test_result_one.result}, now {test_result_two.result}'
137            return Result(test.source, msg)
138
139        return Result(test.source, "0")
140
141    def prepare_single(self, src: str) -> None:
142        flags = os.O_WRONLY | os.O_CREAT
143        mode = stat.S_IWUSR | stat.S_IRUSR
144        with os.fdopen(
145                os.open(
146                    src.replace("[", "_").replace("]", "_") + ".mjs", flags,
147                    mode), 'w') as out:
148            with os.fdopen(os.open(src, os.O_RDONLY), 'r') as sf:
149                out.write(get_harness_code(self.js_dir))
150                for include in Descriptor(src).parse_descriptor().get(
151                        'includes', []):
152                    imp = os.path.abspath(
153                        os.path.join(self.js_dir, 'harness', include))
154                    out.write(read_import(imp) + "\n")
155                out.write(sf.read())
156
157    def collect(self) -> List[str]:
158        logger = stress_common.create_logger()
159        tests: List[str] = []
160        sp = os.path.join(self.js_dir, 'test')
161        tests.extend(
162            collect_from(
163                sp, lambda name: name.endswith('.js') and not name.startswith(
164                    '.')))
165        sp = os.path.join(self.js_dir, 'implementation-contributed')
166        tests.extend(
167            collect_from(
168                sp, lambda name: name.endswith('.js') and not name.startswith(
169                    '.')))
170        random.shuffle(tests)
171
172        logger.debug('Total tests: %s', len(tests))
173        for excluded in EXCLUDED_TESTS:
174            tests = list(filter(lambda name: excluded not in name, tests))
175        logger.debug('Tests after exclude: %s', len(tests))
176
177        return tests
178
179    def build(self) -> List[Test]:
180        logger = stress_common.create_logger()
181        tests: List[str] = self.collect()
182
183        logger.debug('Running compiler...')
184        compiled_tests: List[Test] = []
185        counter = 0
186        with multiprocessing.pool.ThreadPool(stress_common.NPROC) as pool:
187            for js_path, abc_path, retcode in pool.imap(self.compile_single,
188                                                        tests,
189                                                        chunksize=20):
190                if retcode == 0:
191                    compiled_tests.append(Test(js_path, abc_path))
192                counter += 1
193
194        logger.debug('Tests successfully compiled: %s', len(compiled_tests))
195        return compiled_tests
196