• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""
5Copyright (c) 2023-2024 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: run regress test case
19"""
20import argparse
21import dataclasses
22import datetime
23import json
24import logging
25import multiprocessing
26import os
27import platform
28import re
29import shutil
30import stat
31import subprocess
32import sys
33from abc import ABC
34from typing import Optional, List, Type, Dict, Set, Tuple
35from os.path import dirname, join
36from pathlib import Path
37import xml.etree.cElementTree as XTree
38from enum import Enum, auto
39
40from regress_test_config import RegressTestConfig
41
42ENV_PATTERN = re.compile(r"//\s+Environment Variables:(.*)")
43
44
def init_log_file(args):
    """Configure the root logger to write to ``args.out_log``.

    Records are emitted at INFO level using the format string taken from
    ``RegressTestConfig.DEFAULT_LOG_FORMAT``.
    """
    log_settings = {
        "filename": args.out_log,
        "format": RegressTestConfig.DEFAULT_LOG_FORMAT,
        "level": logging.INFO,
    }
    logging.basicConfig(**log_settings)
47
48
def parse_args():
    """Declare the runner's command-line options and parse the process arguments.

    The options are registered in the order of ``option_table`` so the
    generated ``--help`` text keeps its layout.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``.
    """
    option_table = [
        # --- test selection ---
        (('--test-dir',), dict(metavar='DIR', help='Directory to test ')),
        (('--test-file',), dict(metavar='FILE', help='File to test')),
        (('--test-list',), dict(metavar='FILE', dest="test_list", default=None,
                                help='File with list of tests to run')),
        (('--ignore-list',), dict(metavar='FILE', dest="ignore_list", default=None,
                                  help='File with known failed tests list')),
        # --- execution limits ---
        (('--timeout',), dict(default=RegressTestConfig.DEFAULT_TIMEOUT, type=int,
                              help='Set a custom test timeout in seconds !!!\n')),
        (('--processes',), dict(default=RegressTestConfig.DEFAULT_PROCESSES, type=int,
                                help='set number of processes to use. Default value: 1\n')),
        # --- tools and modes ---
        (('--merge-abc-binary',), dict(help="merge-abc's binary tool")),
        (('--ark-tool',), dict(help="ark's binary tool")),
        (('--ark-aot-tool',), dict(help="ark_aot's binary tool")),
        (('--ark-aot',), dict(default=False, action='store_true', help="runs in ark-aot mode")),
        (('--run-pgo',), dict(default=False, action='store_true', help="runs in pgo mode")),
        (('--enable-litecg',), dict(default=False, action='store_true', help="runs in litecg mode")),
        (('--ark-frontend-binary',), dict(help="ark frontend conversion binary tool")),
        (('--stub-path',), dict(help="stub file for run in AOT modes")),
        # --- environment and output locations ---
        (('--LD_LIBRARY_PATH', '--libs-dir'), dict(dest='ld_library_path', default=None,
                                                   help='LD_LIBRARY_PATH')),
        (('--icu-path',), dict(dest='icu_path', help='icu-data-path')),
        (('--out-dir',), dict(default=None, help='target out dir')),
        (('--force-clone',), dict(action="store_true", default=False,
                                  help='Force to clone tests folder')),
    ]

    parser = argparse.ArgumentParser()
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()
88
89
def check_ark_frontend_binary(args) -> bool:
    """Tell whether the mandatory ``ark_frontend_binary`` option was given.

    Prints a hint and returns ``False`` when the value is ``None``; any other
    value is accepted here.
    """
    is_present = args.ark_frontend_binary is not None
    if not is_present:
        print('ark_frontend_binary is required, please add this parameter')
    return is_present
95
96
def check_frontend_library(args) -> bool:
    """Resolve ``args.ark_frontend_binary`` to the absolute path of an existing file.

    Two candidates are considered, in this order: the value joined to the
    current working directory, then the value joined to
    ``RegressTestConfig.TEST_TOOL_FILE_DIR``. The first existing candidate
    replaces ``args.ark_frontend_binary`` (as an absolute path).

    Returns:
        ``True`` when a candidate exists; ``False`` after printing a message
        when neither does.
    """
    cwd_candidate = os.path.join(str(os.getcwd()), str(args.ark_frontend_binary))
    tool_dir_candidate = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_frontend_binary)

    if os.path.exists(cwd_candidate):
        chosen = cwd_candidate
    elif os.path.exists(tool_dir_candidate):
        chosen = tool_dir_candidate
    else:
        print('entered ark_frontend_binary does not exist. please confirm')
        return False

    args.ark_frontend_binary = os.path.abspath(chosen)
    return True
108
109
def check_ark_tool(args) -> bool:
    """Validate the ``ark_tool`` option and turn it into an absolute path.

    The option is mandatory. When given, it is searched relative to the current
    working directory first and ``RegressTestConfig.TEST_TOOL_FILE_DIR`` second;
    the first existing candidate replaces ``args.ark_tool``.

    Returns:
        ``True`` on success; ``False`` (with a printed message) when the option
        is missing or no candidate exists.
    """
    working_dir = str(os.getcwd())
    if args.ark_tool is None:
        print('ark_tool is required, please add this parameter')
        return False

    candidates = (
        os.path.join(working_dir, str(args.ark_tool)),
        os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_tool),
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            args.ark_tool = os.path.abspath(candidate)
            return True

    print('entered ark_tool does not exist. please confirm')
    return False
125
126
def check_ark_aot(args) -> bool:
    """Validate the AOT-related options and resolve ``args.ark_aot_tool``.

    Without ``--ark-aot`` the only rule is that ``--run-pgo`` must not be set.
    With ``--ark-aot`` the AOT compiler must be named via ``--ark-aot-tool``;
    it is searched relative to the current working directory first and
    ``RegressTestConfig.TEST_TOOL_FILE_DIR`` second, and the first existing
    candidate replaces ``args.ark_aot_tool`` as an absolute path.

    Returns:
        ``True`` when the options are consistent and (in AOT mode) the tool
        exists; ``False`` after printing the reason otherwise.
    """
    if not args.ark_aot:
        if args.run_pgo:
            print('pgo mode cannot be used without aot')
            return False
        return True

    if args.ark_aot_tool is None:
        # os.path.join() below raises TypeError on None, so report the missing
        # option as a usage error instead of crashing.
        print('ark_aot_tool is required in ark-aot mode, please add this parameter')
        return False

    current_ark_aot_tool = os.path.join(str(os.getcwd()), str(args.ark_aot_tool))
    test_tool_ark_aot_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_aot_tool)
    for candidate in (current_ark_aot_tool, test_tool_ark_aot_tool):
        if os.path.exists(candidate):
            args.ark_aot_tool = os.path.abspath(candidate)
            return True

    print(f'entered ark_aot_tool "{args.ark_aot_tool}" does not exist. Please check')
    return False
142
143
def check_stub_path(args) -> bool:
    """Check the optional ``stub_path`` option and make it absolute.

    An unset (falsy) value is accepted as-is. Otherwise the path, taken
    relative to the current working directory, must exist; it is then stored
    back on ``args`` as an absolute path.

    Returns:
        ``True`` when the option is unset or points to an existing path,
        ``False`` after printing a message otherwise.
    """
    if not args.stub_path:
        return True

    candidate = os.path.join(str(os.getcwd()), str(args.stub_path))
    if os.path.exists(candidate):
        args.stub_path = os.path.abspath(args.stub_path)
        return True

    print(f'entered stub-file "{args.stub_path}" does not exist. Please check')
    return False
153
154
def is_ignore_file_present(ignore_path: str) -> bool:
    """Return whether ``ignore_path`` exists, printing a message when it does not."""
    found = os.path.exists(ignore_path)
    if not found:
        print(f"Cannot find ignore list '{ignore_path}'")
    return found
160
161
def check_ignore_list(args) -> bool:
    """Validate the optional ``ignore_list`` option.

    A relative value is rewritten on ``args`` to be relative to
    ``RegressTestConfig.TEST_TOOL_FILE_DIR``; an absolute value is used
    unchanged. The resulting file must exist.

    Returns:
        ``True`` when the option is unset or the file exists, ``False``
        otherwise.
    """
    if not args.ignore_list:
        return True

    if not os.path.isabs(args.ignore_list):
        args.ignore_list = str(os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ignore_list))
    return is_ignore_file_present(args.ignore_list)
169
170
def check_args(args):
    """Validate the parsed options and fill in defaults and derived paths.

    The individual checks run in a fixed order and stop at the first failure.
    On success the function normalises ``ld_library_path``, ``icu_path`` and
    ``out_dir`` and adds the derived attributes ``regress_out_dir``,
    ``out_result``, ``junit_report``, ``out_log`` and ``test_case_out_dir``.

    Returns:
        ``True`` when every check passed, ``False`` otherwise.
    """
    validators = (
        check_ark_frontend_binary,
        check_frontend_library,
        check_ark_tool,
        check_ark_aot,
        check_stub_path,
        check_ignore_list,
    )
    # all() short-circuits, so later checks are skipped after the first failure.
    if not all(validator(args) for validator in validators):
        return False

    if args.ld_library_path is None:
        args.ld_library_path = RegressTestConfig.DEFAULT_LIBS_DIR
    else:
        entries = args.ld_library_path.split(":")
        working_dir = str(os.getcwd())
        args.ld_library_path = ":".join(
            [os.path.abspath(os.path.join(working_dir, str(entry))) for entry in entries])

    if args.icu_path is None:
        args.icu_path = RegressTestConfig.ICU_PATH

    if args.out_dir is None:
        out_dir = RegressTestConfig.PROJECT_BASE_OUT_DIR
    else:
        out_dir = os.path.abspath(os.path.join(RegressTestConfig.CURRENT_PATH, args.out_dir))
    # The output directory is always stored with a trailing slash.
    if not out_dir.endswith("/"):
        out_dir = f"{out_dir}/"
    args.out_dir = out_dir

    regress_out_dir = os.path.join(args.out_dir, "regresstest")
    args.regress_out_dir = regress_out_dir
    args.out_result = os.path.join(regress_out_dir, 'result.txt')
    args.junit_report = os.path.join(regress_out_dir, 'report.xml')
    args.out_log = os.path.join(regress_out_dir, 'test.log')
    args.test_case_out_dir = os.path.join(regress_out_dir, RegressTestConfig.REGRESS_GIT_REPO)
    return True
203
204
def remove_dir(path):
    """Recursively delete ``path`` if it exists; do nothing otherwise."""
    if not os.path.exists(path):
        return
    shutil.rmtree(path)
208
209
def output(msg):
    """Print ``msg`` to stdout and record it in the log at INFO level."""
    text = str(msg)
    print(text)
    logging.info(text)
213
214
def out_put_std(ret_code, cmds, msg):
    """Emit ``msg`` through ``output()``, decorated according to ``ret_code``.

    A return code of 0 emits ``msg`` unchanged. A few known codes append a
    fixed explanation after the command; every other code is reported as an
    unknown error. Nothing is emitted when the resulting text is empty.
    """
    known_suffixes = {
        -6: 'Aborted (core dumped)',
        -4: 'Aborted (core dumped)',
        -11: 'Segmentation fault (core dumped)',
        255: '(uncaught error)',
    }
    if ret_code == 0:
        text = msg
    elif ret_code in known_suffixes:
        text = f'{msg}:{cmds}\n{known_suffixes[ret_code]}'
    else:
        text = f'{cmds}:{msg}: Unknown Error: {str(ret_code)}'

    if text != '':
        output(str(text))
226
227
@dataclasses.dataclass
class StepResult:
    """Outcome of a single step (one executed command) for one test."""
    step_name: str  # name of the step that produced this result
    is_passed: bool = False  # only True means success
    command: List[str] = dataclasses.field(default_factory=list)  # executed command line
    return_code: int = -1
    stdout: Optional[str] = None  # set only when the command printed something
    stderr: Optional[str] = None  # error text, if any was captured
    fileinfo: Optional[str] = None  # text of the filesInfo file, when one was written

    def report(self) -> str:
        """Render the result as a multi-line, tab-indented text block."""
        command_line = " ".join(map(str, self.command))
        captured_out = self.stdout or ''
        captured_err = self.stderr or ''
        lines: List[str] = [
            f"{self.step_name}:",
            f"\tCommand: {command_line}",
            f"\treturn code={self.return_code}",
            f"\toutput='{captured_out}'",
            f"\terrors='{captured_err}'",
        ]
        if self.fileinfo:
            lines.append(f"\tFileInfo:\n{self.fileinfo}")
        return "\n".join(lines)
251
252
@dataclasses.dataclass
class TestReport:
    """Accumulated state of one test as it moves through the pipeline steps."""
    src_path: str  # full path of the test source file
    test_id: str = ""  # identifier path beginning with "regresstest"
    out_path: str = ""  # directory that receives the intermediate files
    passed: bool = False  # stays False until a step succeeds
    is_skipped: bool = False  # test was found in the skip (excluded) list
    is_ignored: bool = False  # test was found in the ignore list
    steps: List[StepResult] = dataclasses.field(default_factory=list)  # one entry per executed step

    def report(self) -> str:
        """Return the test id followed by every step report, or "" if ``steps`` is None."""
        if self.steps is None:
            return ""
        header = f"{self.test_id}:"
        return "\n".join([header, *(f"\t{step.report()}" for step in self.steps)])
270
271
class RegressTestStep(ABC):
    """Common base of the pipeline steps; keeps the step name and its duration."""
    step_obj: Optional['RegressTestStep'] = None

    def __init__(self, args, name):
        """Announce the step on stdout and store ``args`` and ``name``."""
        print(f"--- Start step {name} ---")
        self.name: str = name
        self.args = args
        self.__start: Optional[datetime.datetime] = None
        self.__end: Optional[datetime.datetime] = None
        self.__duration: Optional[datetime.timedelta] = None

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Placeholder entry point; this base implementation does nothing."""

    def get_duration(self) -> datetime.timedelta:
        """Return the measured duration, or exit with status 1 if none was recorded."""
        elapsed = self.__duration
        if elapsed is not None:
            return elapsed
        print(f"Step {self.name} not started or not completed")
        sys.exit(1)

    def _start(self):
        """Remember the current time as the start of the step."""
        self.__start = datetime.datetime.now()

    def _end(self):
        """Remember the current time as the end of the step and compute the duration."""
        finished_at = datetime.datetime.now()
        self.__end = finished_at
        self.__duration = finished_at - self.__start
299
300
class RegressTestRepoPrepare(RegressTestStep):
    """Pipeline step that fetches the regress test repository and lists the tests to run."""

    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Repo preparation")
        self.test_list: List[str] = self.read_test_list(args.test_list)

    @staticmethod
    def read_test_list(test_list_name: Optional[str]) -> List[str]:
        """Read the ``--test-list`` file, resolved relative to this script's directory.

        Returns:
            The file content split on newlines, or an empty list when no file
            name was given. Exits with status 1 if the file does not exist.
        """
        if test_list_name is None:
            return []
        filename = join(dirname(__file__), test_list_name)
        if not Path(filename).exists():
            print(f"File {filename} set as --test-list value cannot be found")
            # sys.exit() rather than the interactive-only exit() helper from ``site``.
            sys.exit(1)
        with open(filename, 'r') as stream:
            return stream.read().split("\n")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Prepare the repository and append one ``TestReport`` per non-skipped test.

        Args:
            args: parsed and validated command-line namespace.
            test_reports: list to extend; a new list is created when ``None``.

        Returns:
            The list of reports for every discovered test not in the skip list.
        """
        repo = RegressTestRepoPrepare(args)
        RegressTestRepoPrepare.step_obj = repo
        repo._start()

        # NOTE(review): the clean/pull/checkout sequence runs before the clone
        # check in get_test_case() — confirm this order is intended for a
        # first run where the test directory is not a git checkout yet.
        repo.run_regress_test_prepare()
        repo.prepare_clean_data()
        repo.get_test_case()
        test_list = repo.get_regress_test_files()
        # A set gives constant-time membership tests for the loop below.
        skip_set = set(repo.get_skip_test_cases())
        if test_reports is None:
            test_reports = []
        for test in test_list:
            shorten = Utils.get_inside_path(test)
            if shorten in skip_set:
                continue
            test_id = f"regresstest/ark-regress/{shorten}"
            test_reports.append(TestReport(src_path=test, test_id=test_id))

        repo._end()
        return test_reports

    @staticmethod
    def git_checkout(checkout_options, check_out_dir=None):
        """Run ``git checkout <checkout_options>`` in ``check_out_dir``.

        ``check_out_dir=None`` means the working directory at call time
        (``subprocess`` inherits it), not the one captured at import time.

        Returns:
            ``True`` when git exits with status 0, ``False`` otherwise.
        """
        ret = subprocess.call(['git', 'checkout', checkout_options], cwd=check_out_dir)
        if ret:
            print(f"\n error: git checkout '{checkout_options}' failed.")
            return False
        return True

    @staticmethod
    def git_pull(check_out_dir=None):
        """Run ``git pull --rebase`` in ``check_out_dir``; the exit status is ignored."""
        subprocess.call(['git', 'pull', '--rebase'], cwd=check_out_dir)

    @staticmethod
    def git_clean(clean_dir=None):
        """Discard working-tree changes with ``git checkout -- .``; the exit status is ignored."""
        subprocess.call(['git', 'checkout', '--', '.'], cwd=clean_dir)

    @staticmethod
    def git_clone(git_url, code_dir):
        """Clone ``git_url`` into ``code_dir``, retrying on failure.

        Up to ``RegressTestConfig.DEFAULT_RETRIES`` attempts are made.

        Returns:
            ``True`` on the first successful clone. Exits with status 1 when
            every attempt failed.
        """
        cmds = ['git', 'clone', git_url, code_dir]
        retries = RegressTestConfig.DEFAULT_RETRIES
        while retries > 0:
            if subprocess.call(cmds) == 0:
                return True
            # Decrement first so the message shows the attempts that are really left.
            retries -= 1
            print(f"\n Error: Cloning '{git_url}' failed. Retry remaining '{retries}' times")
        sys.exit(1)

    @staticmethod
    def get_skip_test_cases() -> List[str]:
        """Return the entries of the skip list file ``RegressTestConfig.SKIP_LIST_FILE``."""
        return Utils.read_skip_list(RegressTestConfig.SKIP_LIST_FILE)

    def get_test_case(self):
        """Clone the test repository and check out the pinned hash if no ``.git`` dir is present.

        Returns:
            The checkout result after a fresh clone, ``True`` when a clone
            already exists.
        """
        if not os.path.isdir(os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, '.git')):
            self.git_clone(RegressTestConfig.REGRESS_GIT_URL, RegressTestConfig.REGRESS_TEST_CASE_DIR)
            return self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
        return True

    def prepare_clean_data(self):
        """Reset local changes, pull, and check out the pinned hash in the test directory."""
        self.git_clean(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        self.git_pull(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)

    def run_regress_test_prepare(self):
        """Create the output and test directories (wiping them on ``--force-clone``) and start logging."""
        if self.args.force_clone:
            remove_dir(self.args.regress_out_dir)
            remove_dir(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        os.makedirs(self.args.regress_out_dir, exist_ok=True)
        os.makedirs(RegressTestConfig.REGRESS_TEST_CASE_DIR, exist_ok=True)
        init_log_file(self.args)

    def get_regress_test_files(self) -> List[str]:
        """Collect the test files selected by ``--test-file`` / ``--test-dir``.

        ``--test-file`` wins and yields exactly that one path. Otherwise the
        selected directory (or the whole test repository) is walked and every
        ``.js`` / ``.mjs`` file is returned; directories whose path contains
        ``.git`` are skipped.
        """
        if self.args.test_file:
            return [str(os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_file))]

        if self.args.test_dir:
            search_root = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_dir)
        else:
            search_root = RegressTestConfig.REGRESS_TEST_CASE_DIR

        result: List[str] = []
        for dir_path, _, filenames in os.walk(search_root):
            if ".git" in dir_path:
                continue
            result.extend(
                str(os.path.join(dir_path, filename))
                for filename in filenames
                if filename.endswith((".js", ".mjs")))
        return result
417
418
419class RegressTestCompile(RegressTestStep):
420    def __init__(self, args, test_reports: List[TestReport]):
421        RegressTestStep.__init__(self, args, "Regress test compilation")
422        self.out_dir = args.out_dir
423        self.test_reports = test_reports
424        for test in self.test_reports:
425            test.out_path = os.path.dirname(os.path.join(self.out_dir, test.test_id))
426
427    @staticmethod
428    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
429        if test_reports is None:
430            output("No tests loaded")
431            exit(-1)
432        test_prepare = RegressTestCompile(args, test_reports)
433        RegressTestCompile.step_obj = test_prepare
434        test_prepare._start()
435        test_reports = test_prepare.gen_abc_files()
436        test_prepare._end()
437        return test_reports
438
439    @staticmethod
440    def create_files_info(test_report: TestReport) -> Tuple[str, str]:
441        src_files_info = [
442            RegressTestConfig.REGRESS_TEST_TOOL_DIR,
443            test_report.src_path
444        ]
445        file_info_content: List[str] = []
446        file_info_path = str(os.path.join(
447            test_report.out_path,
448            f"{Utils.get_file_only_name(test_report.src_path)}-filesInfo.txt"))
449        os.makedirs(test_report.out_path, exist_ok=True)
450        with os.fdopen(
451                os.open(file_info_path, flags=os.O_RDWR | os.O_CREAT, mode=stat.S_IRUSR | stat.S_IWUSR),
452                mode="w+", encoding="utf-8"
453        ) as fp:
454            for src_file_info in src_files_info:
455                line = f"{src_file_info};{Utils.get_file_only_name(src_file_info)};esm;xxx;yyy\n"
456                file_info_content.append(line)
457                fp.write(line)
458        return file_info_path, "\n".join(file_info_content)
459
460    def gen_abc_files(self) -> List[TestReport]:
461        with multiprocessing.Pool(processes=self.args.processes) as pool:
462            results = pool.imap_unordered(self.gen_abc_file, self.test_reports)
463            results = list(results)
464            pool.close()
465            pool.join()
466
467        return results
468
469    def gen_abc_file(self, test_report: TestReport) -> Optional[TestReport]:
470        if test_report.src_path == RegressTestConfig.REGRESS_TEST_TOOL_DIR:
471            return None
472        file_info_path, file_info_content = self.create_files_info(test_report)
473        out_file = change_extension(test_report.src_path, '.out')
474        expect_file_exists = os.path.exists(out_file)
475        output_file = change_extension(
476            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
477            ".abc")
478        command = [
479            self.args.ark_frontend_binary,
480            f"@{file_info_path}",
481            "--merge-abc",
482            "--module",
483            f'--output={output_file}'
484        ]
485        step_result = StepResult(self.name, command=command, fileinfo=file_info_content)
486        Utils.exec_command(command, test_report.test_id, step_result, self.args.timeout)
487        step_result.is_passed = step_result.return_code == 0
488        test_report.steps.append(step_result)
489        test_report.passed = step_result.is_passed
490        if expect_file_exists:
491            out_file_path = os.path.join(test_report.out_path, change_extension(test_report.test_id, '.out'))
492            shutil.copy(str(out_file), str(out_file_path))
493        return test_report
494
495
class RegressTestPgo(RegressTestStep):
    """Pipeline step that runs each compiled test once to produce its ``.ap`` profile."""

    def __init__(self, args):
        super().__init__(args, "Regress Test PGO ")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Execute the profiling run for all reports and return the updated reports."""
        step = RegressTestPgo(args)
        RegressTestPgo.step_obj = step
        step._start()
        updated_reports = step.generate_aps(test_reports)
        step._end()
        return updated_reports

    def get_test_ap_cmd(self, test_report: TestReport) -> List[str]:
        """Build the ark command line that writes the profile for ``test_report``.

        Also exports ``LD_LIBRARY_PATH`` from the parsed arguments into the
        process environment.
        """
        abc_name = Utils.get_file_name(test_report.test_id)
        abc_file = change_extension(os.path.join(test_report.out_path, abc_name), ".abc")
        ap_file = change_extension(abc_file, ".ap")
        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)
        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        return [
            self.args.ark_tool,
            "--log-level=info",
            f"--icu-data-path={self.args.icu_path}",
            "--enable-pgo-profiler=true",
            "--compiler-opt-inlining=true",
            f"--compiler-pgo-profiler-path={ap_file}",
            "--asm-interpreter=true",
            f"--entry-point={entry_point}",
            f"{abc_file}",
        ]

    def generate_ap(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
        """Run the profiling command for one test; missing or failed reports pass through untouched."""
        if test_report is None:
            return test_report
        if not test_report.passed:
            return test_report

        command = self.get_test_ap_cmd(test_report)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout)
        succeeded = step.return_code == 0
        step.is_passed = succeeded
        test_report.steps.append(step)
        test_report.passed = succeeded
        return test_report

    def generate_aps(self, test_reports: List[TestReport]) -> List[TestReport]:
        """Profile all tests in a process pool; results arrive in completion order."""
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            collected = list(pool.imap_unordered(self.generate_ap, test_reports))
            pool.close()
            pool.join()

        return collected
548
549
class Utils:
    """Stateless helpers shared by the steps: path handling, command execution, skip list."""
    ark_regress = "ark-regress"  # default marker used by get_inside_path()

    @staticmethod
    def get_file_only_name(full_file_name: str) -> str:
        """Return the last path component without its final extension."""
        _, file_name = os.path.split(full_file_name)
        only_name, _ = os.path.splitext(file_name)
        return only_name

    @staticmethod
    def get_file_name(full_file_name: str) -> str:
        """Return the last path component, extension included."""
        _, file_name = os.path.split(full_file_name)
        return file_name

    @staticmethod
    def mk_dst_dir(file, src_dir, dist_dir):
        """Create the directory of ``file`` with ``src_dir`` replaced by ``dist_dir``.

        Only the part of ``file`` starting at the last occurrence of
        ``src_dir`` is considered.
        """
        idx = file.rfind(src_dir)
        fpath, _ = os.path.split(file[idx:])
        fpath = fpath.replace(src_dir, dist_dir)
        os.makedirs(fpath, exist_ok=True)

    @staticmethod
    def get_inside_path(file_path: str, marker: Optional[str] = None) -> str:
        """Return the part of ``file_path`` after ``marker`` and the one character following it.

        ``marker`` defaults to ``Utils.ark_regress``. When the marker does not
        occur, ``file_path`` is returned unchanged.
        """
        if marker is None:
            marker = Utils.ark_regress
        index = file_path.find(marker)
        if index > -1:
            return file_path[index + len(marker) + 1:]
        return file_path

    @staticmethod
    def _kill_process_group(process) -> None:
        """Forcefully stop ``process`` and, where supported, its whole process group."""
        import signal  # local import: only needed on the timeout path

        if hasattr(os, "killpg"):
            try:
                # The child was started with start_new_session=True, so it
                # leads its own process group and the group id equals its pid.
                os.killpg(process.pid, signal.SIGKILL)
                return
            except OSError:
                pass  # group already gone or not signalable: fall back below
        process.kill()

    @staticmethod
    def exec_command(cmd_args, test_id: str, step_result: StepResult, timeout=RegressTestConfig.DEFAULT_TIMEOUT):
        """Run ``cmd_args`` and store exit code, stdout and stderr in ``step_result``.

        Args:
            cmd_args: command line as a sequence of arguments.
            test_id: test identifier, used only in the logged message.
            step_result: result object updated in place.
            timeout: seconds to wait before the command is killed.

        On timeout, or any other failure to run the command, the return code
        is set to 1 and ``step_result.stderr`` carries the explanation. A
        summary is always passed to ``out_put_std``.
        """
        code_format = 'gbk' if platform.system() == "Windows" else 'utf-8'
        # NOTE(review): ``code`` stays 0 for every command that ran to
        # completion, whatever its exit status — confirm that out_put_std is
        # not meant to receive the child's real return code.
        code = 0
        cmd_string = "\n\t".join([str(arg).strip() for arg in cmd_args if arg is not None])
        msg = [
            f"TEST: {test_id}",
            f"Run command:\n{cmd_string}",
            f"Env: {os.environ.get('LD_LIBRARY_PATH')}"
        ]
        try:
            with subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True,
                                  start_new_session=True) as process:
                try:
                    output_res, errs = process.communicate(timeout=timeout)
                except subprocess.TimeoutExpired:
                    # The child must be killed before leaving the ``with``
                    # block: Popen.__exit__ waits for the child, so an outer
                    # handler would only run after a hung test ended by itself.
                    Utils._kill_process_group(process)
                    process.communicate()  # reap the child and drain the pipes
                    raise
                step_result.return_code = process.poll()
                stderr = errs.decode(code_format, 'ignore').strip()
                stdout = output_res.decode(code_format, 'ignore').strip()
                if stderr:
                    step_result.stderr = stderr
                if stdout:
                    step_result.stdout = stdout
        except subprocess.TimeoutExpired:
            code = 1
            timeout_msg = f"Timeout: timed out after {timeout} seconds"
            msg.append(timeout_msg)
            step_result.return_code = code
            step_result.stderr = timeout_msg
        except Exception as exc:
            # Best-effort boundary: any failure to launch or read the command
            # is recorded on the step instead of aborting the worker.
            code = 1
            error_msg = f"unknown error: {exc}"
            msg.append(error_msg)
            step_result.return_code = code
            step_result.stderr = error_msg
        out_put_std(code, cmd_args, "\n".join(msg))

    @staticmethod
    def read_skip_list(skip_list_path: str) -> List[str]:
        """Load the JSON skip list and return the concatenated ``files`` entries of all its items."""
        skip_tests_list = []
        with os.fdopen(os.open(skip_list_path, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
            json_data = json.load(file_object)
            for key in json_data:
                skip_tests_list.extend(key["files"])
        return skip_tests_list
628
629
class RegressTestAot(RegressTestStep):
    """Pipeline step that runs the AOT compiler on each compiled test."""

    def __init__(self, args):
        super().__init__(args, "Regress Test AOT mode")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """AOT-compile all reports and return the updated reports."""
        step = RegressTestAot(args)
        RegressTestAot.step_obj = step
        step._start()
        updated_reports = step.compile_aots(test_reports)
        step._end()
        return updated_reports

    def get_test_aot_cmd(self, test_report: TestReport) -> List[str]:
        """Build the AOT compiler command line for ``test_report``.

        PGO and litecg options are added only when the matching flags are set
        on the parsed arguments. Also exports ``LD_LIBRARY_PATH`` into the
        process environment.
        """
        abc_name = Utils.get_file_name(test_report.test_id)
        abc_file = change_extension(os.path.join(test_report.out_path, abc_name), ".abc")
        ap_file = change_extension(abc_file, ".ap")
        aot_file = change_extension(abc_file, "")
        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        aot_cmd = [self.args.ark_aot_tool, f"--aot-file={aot_file}"]
        if self.args.run_pgo:
            aot_cmd += [
                "--compiler-opt-loop-peeling=true",
                "--compiler-fast-compile=false",
                "--compiler-opt-track-field=true",
                "--compiler-opt-inlining=true",
                "--compiler-max-inline-bytecodes=45",
                "--compiler-opt-level=2",
                f"--compiler-pgo-profiler-path={ap_file}",
            ]
        if self.args.enable_litecg:
            aot_cmd.append("--compiler-enable-litecg=true")
        # The input file always comes last.
        aot_cmd.append(f"{abc_file}")
        return aot_cmd

    def compile_aot(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
        """AOT-compile one test; missing or failed reports pass through untouched."""
        if test_report is None:
            return test_report
        if not test_report.passed:
            return test_report

        command = self.get_test_aot_cmd(test_report)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout)
        succeeded = step.return_code == 0
        step.is_passed = succeeded
        test_report.steps.append(step)
        test_report.passed = succeeded
        return test_report

    def compile_aots(self, test_reports: List[TestReport]) -> List[TestReport]:
        """AOT-compile all tests in a process pool; results arrive in completion order."""
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            collected = list(pool.imap_unordered(self.compile_aot, test_reports))
            pool.close()
            pool.join()

        return collected
696
697
class RegressOption(Enum):
    """Names of the per-test option groups; members are looked up by name from the options file."""
    NO_FORCE_GC = 1  # group consulted for the --enable-force-gc flag
    ELEMENTS_KIND = 2  # group consulted for the --enable-elements-kind flag
701
702
def get_regress_groups():
    """Load the option groups from ``RegressTestConfig.REGRESS_TEST_OPTIONS``.

    Returns:
        A dict mapping each ``RegressOption`` (looked up by the entry's
        ``name``) to that entry's ``files`` value.
    """
    descriptor = os.open(RegressTestConfig.REGRESS_TEST_OPTIONS, os.O_RDONLY, stat.S_IRUSR)
    with os.fdopen(descriptor, "r") as options_file:
        entries = json.load(options_file)
    return {RegressOption[entry["name"]]: entry["files"] for entry in entries}
709
710
def get_test_options(test, test_groups):
    """Return the two per-test runtime flags derived from group membership.

    Force-GC is enabled unless ``test`` is listed in the ``NO_FORCE_GC``
    group; elements-kind is enabled only when ``test`` is listed in the
    ``ELEMENTS_KIND`` group.
    """
    force_gc = test not in test_groups[RegressOption.NO_FORCE_GC]
    elements_kind = test in test_groups[RegressOption.ELEMENTS_KIND]
    return [
        "--enable-force-gc=" + ("true" if force_gc else "false"),
        "--enable-elements-kind=" + ("true" if elements_kind else "false"),
    ]
722
723
class RegressTestRun(RegressTestStep):
    """Pipeline step that executes the compiled regress tests with the ark tool.

    Every test report that is still marked as passed is run in a worker
    process; the outcome is recorded as a new ``StepResult`` appended to the
    report, and the report's ``passed`` flag is updated accordingly.
    """

    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Regress Test Run ")
        # Option groups consulted by get_test_options() for per-test flags.
        self.test_groups = get_regress_groups()

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Run this step over *test_reports* and return the resulting reports.

        :param args: parsed command-line arguments
        :param test_reports: reports produced by the previous steps; ``None``
            is treated as an empty list
        :return: reports collected from the worker pool
        """
        runner = RegressTestRun(args)
        RegressTestRun.step_obj = runner
        runner._start()
        # The signature allows None, which the worker pool cannot iterate.
        test_reports = runner.run_test_case_dir(test_reports or [])
        runner._end()
        return test_reports

    def run_test_case_dir(self, test_reports: List[TestReport]) -> List[TestReport]:
        """Run all reports through a process pool.

        Results are gathered with ``imap_unordered``, so they come back in
        completion order rather than input order, and may contain ``None``
        entries (see :meth:`run_test_case`).
        """
        with multiprocessing.Pool(processes=self.args.processes, initializer=init_worker,
                                  initargs=(self.args,)) as pool:
            results = list(pool.imap_unordered(self.run_test_case, test_reports))
            pool.close()
            pool.join()

        return results

    def run_test_case(self, test_report: TestReport) -> Optional[TestReport]:
        """Run a single test inside a worker process.

        :return: the report unchanged when it is ``None``, already failed, or
            no worker arguments are available; ``None`` for the test tool
            file itself; otherwise the report updated with the run result
        """
        # Use the arguments stored by init_worker() in this worker process.
        self.args = worker_wrapper_args
        if self.args is None or test_report is None or not test_report.passed:
            return test_report
        if test_report.src_path.endswith(RegressTestConfig.TEST_TOOL_FILE_JS_NAME):
            return None

        abc_file = change_extension(
            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
            ".abc")
        # The AOT file path is the abc path with its extension removed.
        aot_file = change_extension(abc_file, "")
        expect_file = change_extension(abc_file, ".out")
        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)

        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        # test environ LC_ALL/TZ
        set_test_environ(test_report.src_path)
        test_name = test_report.test_id.replace('regresstest/ark-regress/', '')

        command = [
            self.args.ark_tool,
            f"--icu-data-path={self.args.icu_path}",
            f"--entry-point={entry_point}",
        ]
        if self.args.ark_aot:
            command.append(f"--stub-file={self.args.stub_path}")
            command.append(f"--aot-file={aot_file}")
        command += get_test_options(test_name, self.test_groups)
        command.append(abc_file)

        return self.run_test_case_file(command, test_report, expect_file)

    def run_test_case_file(self, command, test_report: TestReport, expect_file) -> TestReport:
        """Pick the checking strategy: compare with *expect_file* when it exists,
        otherwise judge the run by exit code and stderr only."""
        if os.path.exists(expect_file):
            return self.run_test_case_with_expect(command, test_report, expect_file)
        return self.run_test_case_with_assert(command, test_report)

    def run_test_case_with_expect(self, command, test_report: TestReport, expect_file) -> TestReport:
        """Run *command* and compare its output with the expectation file.

        The run passes when the exit code is 0 (or 255 for a test whose source
        path contains ``/fail/``) and the stripped expectation equals either
        the stdout or the stderr recorded in the step result.
        """
        expect_output_str = read_expect_file(expect_file, test_report.src_path)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout)
        ret_code = step.return_code
        err_str = step.stderr
        out_str = step.stdout

        expected = expect_output_str.strip()
        # Tests under a "/fail/" directory may exit with 255. The path checked
        # is the test source path (previously an undefined name was used here,
        # raising NameError for every run that exited with 255).
        exit_code_ok = ret_code == 0 or (ret_code == 255 and "/fail/" in test_report.src_path)
        output_ok = out_str == expected or err_str == expected

        if exit_code_ok and output_ok:
            result_message = f'PASS {test_report.test_id} \n'
            out_put_std(ret_code, command, result_message)
            step.is_passed = True
        else:
            msg = f'FAIL: {test_report.test_id} \nexpect: [{expect_output_str}]\nbut got: [{err_str}]'
            out_put_std(ret_code, command, msg)
            result_message = f'{msg} \n Command: {command} \n'
            out_put_std(ret_code, command, result_message)
            step.is_passed = False

        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report

    def run_test_case_with_assert(self, command, test_report: TestReport) -> TestReport:
        """Run *command* for a test that has no expectation file.

        The run fails on a non-zero exit code, or when stderr is non-empty and
        does not contain the "[ecmascript] Stack overflow" marker.
        """
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout)
        ret_code = step.return_code
        err_str = step.stderr

        if ret_code != 0 or (err_str and "[ecmascript] Stack overflow" not in err_str):
            result_message = f'FAIL: {test_report.test_id} \nerr: {str(err_str)}'
            out_put_std(ret_code, command, result_message)
            step.is_passed = False
        else:
            result_message = f'PASS {test_report.test_id} \n'
            out_put_std(ret_code, command, result_message)
            step.is_passed = True

        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report
828
829
class Stats:
    """Aggregate test reports into counters, a plain-text result file and a
    JUnit-style XML report."""

    def __init__(self, args, test_reports: List[TestReport]):
        self.args = args
        self.pass_count = 0
        self.fail_count = 0
        self.test_reports = test_reports
        # Failed reports grouped by the name of the step that ran last.
        self.errors: Dict[str, List[TestReport]] = {}

    def read_ignore_list(self) -> Optional[Set[str]]:
        """Read the known-failures list.

        :return: ``None`` when no ignore list was given, otherwise the set of
            stripped lines that do not start with ``#``
        """
        if self.args.ignore_list is None:
            return None
        # The list is only read, so open it read-only; a read/write open
        # fails on files the user may not modify.
        with os.fdopen(os.open(self.args.ignore_list, os.O_RDONLY), "r") as file_object:
            return {line.strip() for line in file_object
                    if not line.strip().startswith('#')}

    def get_new_failures(self) -> Optional[List[TestReport]]:
        """Return failed reports that are not in the ignore list.

        :return: ``None`` when no ignore list was given, otherwise the failed
            reports (with at least one step) whose id is not ignored
        """
        ignore_list = self.read_ignore_list()
        if ignore_list is None:
            return None
        return [
            test_report
            for test_report in self.test_reports
            if test_report and not test_report.passed and test_report.steps
            and test_report.test_id not in ignore_list
        ]

    def statistics(self):
        """Count passes and failures, write one PASS/FAIL line per test to the
        result file and write the JUnit XML report."""
        root = XTree.Element("testsuite")
        root.set("name", "Regression")

        # The context manager closes the result file even if a report raises.
        with open_write_file(self.args.out_result, False) as result_file:
            for test_report in self.test_reports:
                if test_report is None:
                    continue
                testcase = XTree.SubElement(root, "testcase")
                testcase.set("name", f"{test_report.test_id}")
                if test_report.passed:
                    write_result_file(f"PASS: {test_report.test_id}", result_file)
                    self.pass_count += 1
                    continue

                self.fail_count += 1
                write_result_file(f"FAIL: {test_report.test_id}", result_file)
                # A failed report without any recorded step would raise
                # IndexError on steps[-1]; group it under a placeholder name.
                if test_report.steps:
                    step_name = test_report.steps[-1].step_name
                else:
                    step_name = "Unknown step"
                self.errors.setdefault(step_name, []).append(test_report)
                # NOTE(review): ElementTree escapes markup in text, so the CDATA
                # wrapper ends up as literal text in the XML - confirm intended.
                XTree.SubElement(testcase, "failure").text = f"<![CDATA[{test_report.report()}]]>"

        root.set("tests", f"{self.pass_count + self.fail_count}")
        root.set("failures", f"{self.fail_count}")

        tree = XTree.ElementTree(root)
        tree.write(self.args.junit_report, xml_declaration=True, encoding="UTF-8")

    def print_result(self, args, steps):
        """Print per-step durations and the summary counters; the summary is
        also appended to the result file.

        :param args: parsed command-line arguments (``out_result`` is used)
        :param steps: step classes whose ``step_obj`` holds the executed step
        """
        summary_duration = datetime.timedelta()
        with open_write_file(args.out_result, True) as result_file:
            for step in steps:
                print(f"Step {step.step_obj.name} - duration {step.step_obj.get_duration()}")
                summary_duration += step.step_obj.get_duration()
            summary_lines = (
                f'\npass count: {self.pass_count}',
                f'fail count: {self.fail_count}',
                f'total count: {self.fail_count + self.pass_count}',
                f'total used time is: {str(summary_duration)}',
            )
            for msg in summary_lines:
                write_result_file(msg, result_file)
                print(msg)

    def print_failed_tests(self):
        """Print how many tests failed in each step."""
        print("=== Failed tests ===")
        for key, values in self.errors.items():
            print(f"{key}: {len(values)} tests")
910
911
def change_extension(path, new_ext: str):
    """Return *path* with its extension replaced by *new_ext*.

    When *path* has no extension, *new_ext* is simply appended. Passing an
    empty *new_ext* strips the extension.
    """
    stem, current_ext = os.path.splitext(path)
    prefix = stem if current_ext else path
    return prefix + new_ext
919
920
def get_files_by_ext(start_dir, suffix):
    """Recursively collect the paths of all files under *start_dir* whose
    name ends with *suffix*, in ``os.walk`` order."""
    return [
        os.path.join(folder, entry)
        for folder, _subdirs, entries in os.walk(start_dir)
        for entry in entries
        if entry.endswith(suffix)
    ]
928
929
def read_expect_file(expect_file, test_case_file):
    """Load the expected output of a test.

    Lines starting with ``#`` are dropped, and every ``*%(basename)s``
    placeholder is replaced by the test file path below
    ``RegressTestConfig.REGRESS_BASE_TEST_DIR``.

    :param expect_file: path of the file holding the expected output
    :param test_case_file: path of the test source file
    :return: expected output with the placeholder substituted
    """
    # The file is only read, so open it read-only; requesting write access
    # fails needlessly on read-only expectation files.
    with os.fdopen(os.open(expect_file, os.O_RDONLY), "r") as file_object:
        expect_output = ''.join(
            line for line in file_object if not line.strip().startswith('#'))

    # Make the test path relative so os.path.join keeps the base directory.
    relative_case = test_case_file.lstrip("/").replace('regresstest/', '')
    test_file_path = os.path.join(RegressTestConfig.REGRESS_BASE_TEST_DIR, relative_case)
    return expect_output.replace('*%(basename)s', test_file_path)
941
942
def open_write_file(file_path, append):
    """Open *file_path* for writing with owner-only read/write permissions.

    :param file_path: file to open; created when it does not exist
    :param append: when true, writes are appended to the existing content;
        otherwise the existing content is discarded
    :return: text file object opened in ``w+`` mode (caller must close it)
    """
    flags = os.O_RDWR | os.O_CREAT
    # os.fdopen() never truncates an already opened descriptor, so overwrite
    # mode has to request truncation from os.open() itself; without it, stale
    # bytes of a longer previous file survive after the new content.
    flags |= os.O_APPEND if append else os.O_TRUNC
    file_descriptor = os.open(file_path, flags, stat.S_IRUSR | stat.S_IWUSR)
    return os.fdopen(file_descriptor, "w+")
951
952
def open_result_excel(file_path):
    """Open *file_path* in append mode, creating it with owner-only
    read/write permissions when missing, and return the file object."""
    open_flags = os.O_RDWR | os.O_CREAT | os.O_APPEND
    permissions = stat.S_IRUSR | stat.S_IWUSR
    return os.fdopen(os.open(file_path, open_flags, permissions), "w+")
957
958
def get_file_source(file):
    """Return the whole content of *file*, decoded as ISO-8859-1."""
    with open(file, mode="r", encoding='ISO-8859-1') as source_file:
        content = source_file.read()
    return content
962
963
def set_test_environ(case):
    """Apply the ``TZ`` / ``LC_ALL`` settings requested by a test file.

    Both variables are first removed from ``os.environ``. When the test
    source contains a ``// Environment Variables: VAR=value ...`` line, every
    pair on it is inspected and ``TZ`` / ``LC_ALL`` are set accordingly.

    :param case: path of the test source file; nothing is set when it does
        not exist
    """
    # Start clean so the settings of a previous test do not leak into this one.
    os.environ.pop('LC_ALL', None)
    os.environ.pop('TZ', None)
    if not os.path.exists(case):
        return
    env_match = ENV_PATTERN.search(get_file_source(case))
    if not env_match:
        return
    # Walk over every pair: a stray ``break`` used to stop after the first
    # one, so a line declaring both TZ and LC_ALL applied only one of them.
    for env_pair in env_match.group(1).split():
        # partition() tolerates values containing '='; tokens without '='
        # are skipped instead of raising ValueError.
        var, separator, value = env_pair.partition('=')
        if not separator:
            continue
        if 'TZ' in var:
            os.environ['TZ'] = value
        if 'LC_ALL' in var:
            os.environ['LC_ALL'] = value
982
983
# pylint: disable=invalid-name,global-statement
# Arguments shared with the code running in the current process; stays None
# until init_worker() has been called in that process.
worker_wrapper_args = None


def init_worker(args):
    """Store *args* in the module-level ``worker_wrapper_args``.

    Used as the ``initializer`` of the multiprocessing pool created in
    ``RegressTestRun.run_test_case_dir``, which passes its ``args`` through
    ``initargs``; ``RegressTestRun.run_test_case`` then reads the stored value.
    """
    global worker_wrapper_args
    worker_wrapper_args = args
991
992
def write_result_file(msg: str, result_file):
    """Write *msg* followed by a newline to the open *result_file*."""
    line = "{}\n".format(msg)
    result_file.write(line)
995
996
def main(args):
    """Run the regress test pipeline and report the outcome.

    :param args: parsed command-line arguments
    :return: 1 when the arguments are rejected by ``check_args``; 0 when no
        ignore list was given; otherwise the number of failures that are not
        covered by the ignore list
    """
    if not check_args(args):
        return 1
    print("\nStart regresstest........")

    # Assemble the pipeline; the PGO and AOT steps are only used in AOT mode.
    pipeline: List[Type[RegressTestStep]] = [RegressTestRepoPrepare, RegressTestCompile]
    if args.ark_aot:
        if args.run_pgo:
            pipeline.append(RegressTestPgo)
        pipeline.append(RegressTestAot)
    pipeline.append(RegressTestRun)

    # Each step receives the reports of the previous one.
    reports: List[TestReport] = []
    for stage in pipeline:
        reports = stage.run(args, reports)

    stats = Stats(args, reports)
    stats.statistics()
    stats.print_result(args, pipeline)
    stats.print_failed_tests()

    new_failures = stats.get_new_failures()
    if new_failures is None:
        return 0

    failure_count = len(new_failures)
    if failure_count == 0:
        output("No new failures have been found")
        return failure_count

    report_lines = [f"Found {failure_count} new failures:"]
    report_lines.extend(f"\t{failure.test_id}" for failure in new_failures)
    output("\n".join(report_lines))
    return failure_count
1030
1031
if __name__ == "__main__":
    # main() may return the number of new failures. POSIX keeps only the low
    # 8 bits of an exit status, so e.g. 256 failures would be reported as
    # success (0); cap the value to keep every failing run non-zero.
    sys.exit(min(main(parse_args()), 255))
1034