#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
# `import multiprocessing` alone does not load the `pool` submodule, so the
# thread pool class is imported explicitly.
from multiprocessing.pool import ThreadPool
import os
import stat
import sys
import re
from functools import lru_cache
import random
from typing import List, Dict, Any, Tuple
import logging

import stress_common
from stress_test import Test, Result
from stress_test262 import Test262StressTest, EXCLUDED_TESTS
from stress_common import SCRIPT_DIR, collect_from


def _sanitize_path(path: str) -> str:
    """Return *path* with every '[' and ']' replaced by '_'."""
    return path.replace("[", "_").replace("]", "_")


def _is_visible_js(name: str) -> bool:
    """Return True for names ending in '.js' that do not start with '.'."""
    return name.endswith('.js') and not name.startswith('.')


class Descriptor:
    """Reads a test source file and extracts data from its header comment.

    The header is the ``/*--- ... ---*/`` comment block; the only field
    parsed from it is the flow-style ``includes: [a.js, b.js]`` list.
    """

    def __init__(self, input_file: str) -> None:
        """Compile the header patterns and load *input_file* into memory.

        Raises:
            OSError: if *input_file* cannot be opened.
            UnicodeDecodeError: if *input_file* is not valid UTF-8.
        """
        self.input_file = input_file
        # Non-greedy so that only the first header block is captured, even
        # if a later comment in the file also ends with '---*/'.
        self.header = re.compile(r"/\*---(?P<header>.+?)---\*/", re.DOTALL)
        # '[^\]]+' stops at the first closing bracket and, unlike '.', also
        # accepts a list that is wrapped over several lines.
        self.includes = re.compile(r"includes:\s+\[(?P<includes>[^\]]+)]")
        self.content = self.get_content()

    # NOTE(review): nothing in this file calls this method on a Descriptor;
    # it may have been intended for Test262StressTestWithRuntime - confirm.
    def get_fail_list_path(self) -> str:
        """Return the path of the fail list JSON file inside SCRIPT_DIR."""
        return os.path.join(SCRIPT_DIR, 'fail_list_harness_with_runtime.json')

    def get_content(self) -> str:
        """Return the full text of the input file, decoded as UTF-8."""
        with open(self.input_file, "r", encoding="utf-8") as file_pointer:
            return file_pointer.read()

    def get_header(self) -> str:
        """Return the header comment including its delimiters, or ''."""
        header_comment = self.header.search(self.content)
        return header_comment.group(0) if header_comment else ""

    def parse_descriptor(self) -> Dict[str, Any]:
        """Parse the header comment.

        Returns:
            An empty dict when the file has no header comment; otherwise a
            dict whose "includes" key holds the list of include file names
            (an empty list when the header declares none).
        """
        result: Dict[str, Any] = {}
        header = self.get_header()
        if not header:
            return result

        match = self.includes.search(header)
        raw_names = match.group("includes").split(",") if match else []
        stripped = (name.strip() for name in raw_names)
        # Drop empty entries (e.g. produced by a trailing comma) so callers
        # never try to open the harness directory itself.
        result["includes"] = [name for name in stripped if name]
        return result


@lru_cache(maxsize=100000)
def get_harness_code(path: str = None) -> str:
    """Cached wrapper around read_harness_code().

    Args:
        path: directory that contains the 'harness' sub-directory. Despite
            the None default, a real path is required.
    """
    return read_harness_code(path)


def read_harness_code(path: str = None) -> str:
    """Return the text of harness/assert.js followed by harness/sta.js.

    Each file's text is followed by a newline.

    Args:
        path: directory that contains the 'harness' sub-directory. Despite
            the None default, a real path is required.

    Raises:
        OSError: if either harness file cannot be opened.
    """
    ajs = os.path.abspath(os.path.join(path, 'harness', 'assert.js'))
    sjs = os.path.abspath(os.path.join(path, 'harness', 'sta.js'))

    # Read as UTF-8, matching how Descriptor reads the test sources.
    with open(ajs, 'r', encoding="utf-8") as f:
        assert_js = f.read()
    with open(sjs, 'r', encoding="utf-8") as f:
        sta_js = f.read()
    return assert_js + '\n' + sta_js + '\n'


@lru_cache(maxsize=10000)
def read_import(src: str) -> str:
    """Return the UTF-8 text of the file *src*; results are cached per path.

    Raises:
        OSError: if *src* cannot be opened.
    """
    with open(src, 'r', encoding="utf-8") as f:
        return f.read()


class Test262StressTestWithRuntime(Test262StressTest):
    """Stress test variant that also executes each test before and after
    the abckit step and compares the two execution results.

    Relies on attributes and methods inherited from Test262StressTest
    (js_dir, timeout, compile_single, run_js_test_single).
    """

    def __init__(self, repeats: int, args) -> None:
        """Store the run configuration.

        Args:
            repeats: repeat count passed to the second JS run; None selects
                the default of 3.
            args: parsed options; must provide `build_dir`. It is also
                forwarded to the base class constructor.
        """
        super().__init__(args)
        self.repeats = 3 if repeats is None else repeats
        self.build_dir = args.build_dir
        logger = stress_common.create_logger()
        logger.debug('Repeats: %s with timeout: %s', self.repeats,
                     self.timeout)

    def compile_single(self, src: str) -> Tuple[str, str, int]:
        """Generate the '.mjs' file for *src* and compile it.

        Returns:
            A tuple of the sanitized source path (without the '.mjs'
            suffix) and the second and third items of the base class
            compile result, which build() treats as the abc path and the
            return code.
        """
        self.prepare_single(src)
        sanitized = _sanitize_path(src)
        compile_result = super().compile_single(sanitized + '.mjs')
        return sanitized, compile_result[1], compile_result[2]

    def run_single(self, test: Test) -> Result:
        """Run one test through abckit and compare JS results around it.

        Returns:
            A Result for `test.source` carrying "0" on success, the parsed
            abckit error when abckit exits with a non-zero code, or a
            message when the JS result differs between the two runs.
        """
        test.abc = _sanitize_path(test.abc)
        stress_abc = test.abc + '.stress.abc'
        mjs_source = test.source + ".mjs"
        original_run = Test(mjs_source, test.abc)
        stressed_run = Test(mjs_source, stress_abc)

        # Baseline: run the untouched abc file once.
        test_result_one = self.run_js_test_single(original_run)

        result = stress_common.run_abckit(self.build_dir, test.source,
                                          test.abc, stress_abc)
        if result.returncode != 0:
            error = stress_common.parse_stdout(result.returncode,
                                               result.stdout)
            return Result(test.source, error)

        # The abckit step passed. A baseline result of -1 is treated as a
        # timeout of the first JS run; in that case the second run is
        # skipped and the test is reported as passed.
        if test_result_one.result == -1:
            return Result(test.source, "0")

        # Run the abckit output with the configured number of repeats.
        test_result_two = self.run_js_test_single(stressed_run, self.repeats)

        if test_result_two.result == 0:
            return Result(test.source, "0")

        if test_result_one.result != test_result_two.result:
            msg = f'JS Test result changed. Was {test_result_one.result}, now {test_result_two.result}'
            return Result(test.source, msg)

        return Result(test.source, "0")

    def prepare_single(self, src: str) -> None:
        """Write '<sanitized src>.mjs': harness code, includes, test source.

        The output consists of the harness code, then every file named in
        the test's `includes` header (each followed by a newline), then
        the test source itself.

        Raises:
            OSError: if the source, an include or the output file cannot
                be opened.
        """
        # Read and parse the source first so that a broken source does not
        # leave a half-written output file behind.
        descriptor = Descriptor(src)
        includes = descriptor.parse_descriptor().get('includes', [])

        # O_TRUNC is required: os.fdopen(fd, 'w') does not truncate, so
        # without it a rerun that writes less data would keep stale bytes
        # from the previous file at the end.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        mode = stat.S_IWUSR | stat.S_IRUSR
        out_fd = os.open(_sanitize_path(src) + ".mjs", flags, mode)
        with os.fdopen(out_fd, 'w', encoding="utf-8") as out:
            out.write(get_harness_code(self.js_dir))
            for include in includes:
                imp = os.path.abspath(
                    os.path.join(self.js_dir, 'harness', include))
                out.write(read_import(imp) + "\n")
            # Reuse the text Descriptor already loaded instead of reading
            # the source a second time with a different encoding.
            out.write(descriptor.content)

    def collect(self) -> List[str]:
        """Return the shuffled list of test paths minus excluded ones.

        Tests are gathered from the 'test' and 'implementation-contributed'
        directories under `js_dir`; a path is dropped when it contains any
        entry of EXCLUDED_TESTS as a substring.
        """
        logger = stress_common.create_logger()
        tests: List[str] = []
        for sub_dir in ('test', 'implementation-contributed'):
            tests.extend(
                collect_from(os.path.join(self.js_dir, sub_dir),
                             _is_visible_js))
        random.shuffle(tests)

        logger.debug('Total tests: %s', len(tests))
        tests = [
            name for name in tests
            if not any(excluded in name for excluded in EXCLUDED_TESTS)
        ]
        logger.debug('Tests after exclude: %s', len(tests))

        return tests

    def build(self) -> List[Test]:
        """Compile all collected tests in a thread pool.

        Returns:
            One Test per source whose compilation returned code 0.
        """
        logger = stress_common.create_logger()
        tests: List[str] = self.collect()

        logger.debug('Running compiler...')
        compiled_tests: List[Test] = []
        with ThreadPool(stress_common.NPROC) as pool:
            for js_path, abc_path, retcode in pool.imap(self.compile_single,
                                                        tests,
                                                        chunksize=20):
                if retcode == 0:
                    compiled_tests.append(Test(js_path, abc_path))

        logger.debug('Tests successfully compiled: %s', len(compiled_tests))
        return compiled_tests