• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""
5Copyright (c) 2023-2024 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: run regress test case
19"""
20import argparse
21import dataclasses
22import datetime
23import json
24import logging
25import multiprocessing
26import os
27import platform
28import re
29import shutil
30import stat
31import subprocess
32import sys
33from abc import ABC
34from typing import Optional, List, Type, Dict, Set, Tuple, Callable
35from os.path import dirname, join
36from pathlib import Path
37import xml.etree.cElementTree as XTree
38from enum import Enum, auto
39
40from regress_test_config import RegressTestConfig
41
42ENV_PATTERN = re.compile(r"//\s+Environment Variables:(.*)")
43
44
45def init_log_file(args):
46    logging.basicConfig(filename=args.out_log, format=RegressTestConfig.DEFAULT_LOG_FORMAT, level=logging.INFO)
47
48
49def parse_args():
50    parser = argparse.ArgumentParser()
51    parser.add_argument('--test-dir', metavar='DIR',
52                        help='Directory to test ')
53    parser.add_argument('--test-file', metavar='FILE',
54                        help='File to test')
55    parser.add_argument('--test-list', metavar='FILE', dest="test_list", default=None,
56                        help='File with list of tests to run')
57    parser.add_argument('--ignore-list', metavar='FILE', dest="ignore_list", default=None,
58                        help='File with known failed tests list')
59    parser.add_argument('--timeout', default=RegressTestConfig.DEFAULT_TIMEOUT, type=int,
60                        help='Set a custom test timeout in seconds !!!\n')
61    parser.add_argument('--processes', default=RegressTestConfig.DEFAULT_PROCESSES, type=int,
62                        help='set number of processes to use. Default value: 1\n')
63    parser.add_argument('--stub-path',
64                        help="stub file for run in AOT modes")
65    parser.add_argument('--LD_LIBRARY_PATH', '--libs-dir',
66                        dest='ld_library_path', default=None, help='LD_LIBRARY_PATH')
67    parser.add_argument('--icu-path',
68                        dest='icu_path', help='icu-data-path')
69    parser.add_argument('--out-dir',
70                        default=None, help='target out dir')
71    parser = parse_args_run_mode(parser)
72    return parser.parse_args()
73
74
75def parse_args_run_mode(parser):
76    parser.add_argument('--merge-abc-binary',
77                        help="merge-abc's binary tool")
78    parser.add_argument('--ark-tool',
79                        help="ark's binary tool")
80    parser.add_argument('--ark-aot-tool',
81                        help="ark_aot's binary tool")
82    parser.add_argument('--ark-aot', default=False, action='store_true',
83                        help="runs in ark-aot mode")
84    parser.add_argument('--run-pgo', default=False, action='store_true',
85                        help="runs in pgo mode")
86    parser.add_argument('--enable-litecg', default=False, action='store_true',
87                        help="runs in litecg mode")
88    parser.add_argument('--ark-frontend-binary',
89                        help="ark frontend conversion binary tool")
90    parser.add_argument('--force-clone', action="store_true",
91                        default=False, help='Force to clone tests folder')
92    parser.add_argument('--ark-arch',
93                        default=RegressTestConfig.DEFAULT_ARK_ARCH,
94                        required=False,
95                        nargs='?', choices=RegressTestConfig.ARK_ARCH_LIST, type=str)
96    parser.add_argument('--ark-arch-root',
97                        default=RegressTestConfig.DEFAULT_ARK_ARCH,
98                        required=False,
99                        help="the root path for qemu-aarch64 or qemu-arm")
100    parser.add_argument('--disable-force-gc', action='store_true',
101                        help="Run regress tests with close force-gc")
102    parser.add_argument('--multi-context', action='store_true',
103                        help="Run regress tests with multi context")
104    parser.add_argument('--compiler-opt-track-field', default=False, action='store',
105                        dest="compiler_opt_track_field",
106                        help='enable compiler opt track field. Default value: False\n')
107    return parser
108
109
110def check_ark_frontend_binary(args) -> bool:
111    if args.ark_frontend_binary is None:
112        output('ark_frontend_binary is required, please add this parameter')
113        return False
114    return True
115
116
117def check_frontend_library(args) -> bool:
118    current_dir = str(os.getcwd())
119    current_frontend_binary = os.path.join(current_dir, str(args.ark_frontend_binary))
120    test_tool_frontend_binary = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_frontend_binary)
121    if not os.path.exists(current_frontend_binary) and not os.path.exists(test_tool_frontend_binary):
122        output('entered ark_frontend_binary does not exist. please confirm')
123        return False
124    args.ark_frontend_binary = current_frontend_binary if os.path.exists(
125        current_frontend_binary) else test_tool_frontend_binary
126    args.ark_frontend_binary = os.path.abspath(args.ark_frontend_binary)
127    return True
128
129
130def check_ark_tool(args) -> bool:
131    current_dir = str(os.getcwd())
132    if args.ark_tool is None:
133        output('ark_tool is required, please add this parameter')
134        return False
135
136    current_ark_tool = os.path.join(current_dir, str(args.ark_tool))
137    test_tool_ark_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_tool)
138    if not os.path.exists(current_ark_tool) and not os.path.exists(test_tool_ark_tool):
139        output('entered ark_tool does not exist. please confirm')
140        return False
141
142    args.ark_tool = current_ark_tool if os.path.exists(current_ark_tool) else test_tool_ark_tool
143    args.ark_tool = os.path.abspath(args.ark_tool)
144    return True
145
146
147def check_ark_aot(args) -> bool:
148    if args.ark_aot:
149        current_dir = str(os.getcwd())
150        current_ark_aot_tool = os.path.join(current_dir, str(args.ark_aot_tool))
151        test_tool_ark_aot_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_aot_tool)
152        if not os.path.exists(current_ark_aot_tool) and not os.path.exists(test_tool_ark_aot_tool):
153            output(f'entered ark_aot_tool "{args.ark_aot_tool}" does not exist. Please check')
154            return False
155        args.ark_aot_tool = current_ark_aot_tool if os.path.exists(current_ark_aot_tool) else test_tool_ark_aot_tool
156        args.ark_aot_tool = os.path.abspath(args.ark_aot_tool)
157        return True
158    if args.run_pgo and not args.ark_aot:
159        output('pgo mode cannot be used without aot')
160        return False
161    return True
162
163
164def check_stub_path(args) -> bool:
165    if args.stub_path:
166        current_dir = str(os.getcwd())
167        stub_path = os.path.join(current_dir, str(args.stub_path))
168        if not os.path.exists(stub_path):
169            output(f'entered stub-file "{args.stub_path}" does not exist. Please check')
170            return False
171        args.stub_path = os.path.abspath(args.stub_path)
172    return True
173
174
175def is_ignore_file_present(ignore_path: str) -> bool:
176    if os.path.exists(ignore_path):
177        return True
178    output(f"Cannot find ignore list '{ignore_path}'")
179    return False
180
181
182def check_ignore_list(args) -> bool:
183    if args.ignore_list:
184        if os.path.isabs(args.ignore_list):
185            return is_ignore_file_present(args.ignore_list)
186        args.ignore_list = str(os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ignore_list))
187        return is_ignore_file_present(args.ignore_list)
188    return True
189
190
191def check_args(args):
192    result = check_ark_frontend_binary(args)
193    result = result and check_frontend_library(args)
194    result = result and check_ark_tool(args)
195    result = result and check_ark_aot(args)
196    result = result and check_stub_path(args)
197    result = result and check_ignore_list(args)
198
199    if not result:
200        return False
201
202    if args.ld_library_path is not None:
203        libs = args.ld_library_path.split(":")
204        current_dir = str(os.getcwd())
205        libs = [os.path.abspath(os.path.join(current_dir, str(lib))) for lib in libs]
206        args.ld_library_path = ":".join(libs)
207    else:
208        args.ld_library_path = RegressTestConfig.DEFAULT_LIBS_DIR
209    if args.icu_path is None:
210        args.icu_path = RegressTestConfig.ICU_PATH
211    if args.out_dir is None:
212        args.out_dir = RegressTestConfig.PROJECT_BASE_OUT_DIR
213    else:
214        args.out_dir = os.path.abspath(os.path.join(RegressTestConfig.CURRENT_PATH, args.out_dir))
215    if not args.out_dir.endswith("/"):
216        args.out_dir = f"{args.out_dir}/"
217    args.regress_out_dir = os.path.join(args.out_dir, "regresstest")
218    args.out_result = os.path.join(args.regress_out_dir, 'result.txt')
219    args.junit_report = os.path.join(args.regress_out_dir, 'report.xml')
220    args.out_log = os.path.join(args.regress_out_dir, 'test.log')
221    args.test_case_out_dir = os.path.join(args.regress_out_dir, RegressTestConfig.REGRESS_GIT_REPO)
222    return True
223
224
225def remove_dir(path):
226    if os.path.exists(path):
227        shutil.rmtree(path)
228
229
230def output(msg):
231    print(str(msg))
232    logging.info(str(msg))
233
234
235def output_debug(msg):
236    logging.debug(str(msg))
237
238
239def get_extra_error_message(ret_code: int) -> str:
240    error_messages = {
241        0: '',
242        -6: 'Aborted (core dumped)',
243        -4: 'Aborted (core dumped)',
244        -11: 'Segmentation fault (core dumped)',
245        255: '(uncaught error)'
246    }
247    error_message = error_messages.get(ret_code, f'Unknown Error: {str(ret_code)}')
248    return error_message
249
250
251@dataclasses.dataclass
252class StepResult:
253    step_name: str  # a copy of the step name
254    is_passed: bool = False  # True if passed, any other state is False
255    command: List[str] = dataclasses.field(default_factory=list)  # command to run
256    return_code: int = -1
257    stdout: Optional[str] = None  # present only if there is some output
258    stderr: Optional[str] = None  # can be present only if is_passed == False
259    fileinfo: Optional[str] = None  # content of fileinfo file if present
260
261    def report(self) -> str:
262        stdout = self.stdout if self.stdout else ''
263        stderr = self.stderr if self.stderr else ''
264        cmd = " ".join([str(cmd) for cmd in self.command])
265        result: List[str] = [
266            f"{self.step_name}:",
267            f"\tCommand: {cmd}",
268            f"\treturn code={self.return_code}",
269            f"\toutput='{stdout}'",
270            f"\terrors='{stderr}'"]
271        if self.fileinfo:
272            result.append(f"\tFileInfo:\n{self.fileinfo}")
273        return "\n".join(result)
274
275
276@dataclasses.dataclass
277class TestReport:
278    src_path: str  # full path to the source test
279    test_id: str = ""  # path starting from regresstest
280    out_path: str = ""  # full path to intermediate files up to folder
281    passed: bool = False  # False if the test has not started or failed
282    is_skipped: bool = False  # True if the test has found in the skipped (excluded) list
283    is_ignored: bool = False  # True if the test has found in the ignored list
284    steps: List[StepResult] = dataclasses.field(default_factory=list)  # list of results
285
286    def report(self) -> str:
287        result: List[str] = [f"{self.test_id}:"]
288        if self.steps is None:
289            return ""
290        for step in self.steps:
291            result.append(f"\t{step.report()}")
292        return "\n".join(result)
293
294
295class RegressTestStep(ABC):
296    step_obj: Optional['RegressTestStep'] = None
297
298    def __init__(self, args, name):
299        output(f"--- Start step {name} ---")
300        self.args = args
301        self.__start: Optional[datetime.datetime] = None
302        self.__end: Optional[datetime.datetime] = None
303        self.__duration: Optional[datetime.timedelta] = None
304        self.name: str = name
305
306    @staticmethod
307    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
308        pass
309
310    def get_duration(self) -> datetime.timedelta:
311        if self.__duration is None:
312            output(f"Step {self.name} not started or not completed")
313            sys.exit(1)
314        return self.__duration
315
316    def _start(self):
317        self.__start = datetime.datetime.now()
318
319    def _end(self):
320        self.__end = datetime.datetime.now()
321        self.__duration = self.__end - self.__start
322
323
324class RegressTestRepoPrepare(RegressTestStep):
325    def __init__(self, args):
326        RegressTestStep.__init__(self, args, "Repo preparation")
327        self.test_list: List[str] = self.read_test_list(args.test_list)
328
329    @staticmethod
330    def read_test_list(test_list_name: Optional[str]) -> List[str]:
331        if test_list_name is None:
332            return []
333        filename = join(dirname(__file__), test_list_name)
334        if not Path(filename).exists():
335            output(f"File {filename} set as --test-list value cannot be found")
336            exit(1)
337        with open(filename, 'r') as stream:
338            return stream.read().split("\n")
339
340    @staticmethod
341    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
342        repo = RegressTestRepoPrepare(args)
343        RegressTestRepoPrepare.step_obj = repo
344        repo._start()
345
346        repo.run_regress_test_prepare()
347        repo.prepare_clean_data()
348        repo.get_test_case()
349        test_list = repo.get_regress_test_files()
350        skip_list = repo.get_skip_test_cases()
351        if test_reports is None:
352            test_reports = []
353        for test in test_list:
354            shorten = Utils.get_inside_path(test)
355            test_id = f"regresstest/ark-regress/{shorten}"
356            if shorten not in skip_list:
357                report = TestReport(src_path=test, test_id=test_id)
358                test_reports.append(report)
359
360        repo._end()
361        return test_reports
362
363    @staticmethod
364    def git_checkout(checkout_options, check_out_dir=os.getcwd()):
365        cmds = ['git', 'checkout', checkout_options]
366        result = True
367        with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
368            ret = proc.wait()
369            if ret:
370                output(f"\n error: git checkout '{checkout_options}' failed.")
371                result = False
372        return result
373
374    @staticmethod
375    def git_pull(check_out_dir=os.getcwd()):
376        cmds = ['git', 'pull', '--rebase']
377        with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
378            proc.wait()
379
380    @staticmethod
381    def git_clean(clean_dir=os.getcwd()):
382        cmds = ['git', 'checkout', '--', '.']
383        with subprocess.Popen(cmds, cwd=clean_dir) as proc:
384            proc.wait()
385
386    @staticmethod
387    def git_clone(git_url, code_dir):
388        cmds = ['git', 'clone', git_url, code_dir]
389        retries = RegressTestConfig.DEFAULT_RETRIES
390        while retries > 0:
391            with subprocess.Popen(cmds) as proc:
392                ret = proc.wait()
393                if ret:
394                    output(f"\n Error: Cloning '{git_url}' failed. Retry remaining '{retries}' times")
395                    retries -= 1
396                else:
397                    return True
398        sys.exit(1)
399
400    @staticmethod
401    def get_skip_test_cases() -> List[str]:
402        return Utils.read_skip_list(RegressTestConfig.SKIP_LIST_FILE)
403
404    def get_test_case(self):
405        if not os.path.isdir(os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, '.git')):
406            self.git_clone(RegressTestConfig.REGRESS_GIT_URL, RegressTestConfig.REGRESS_TEST_CASE_DIR)
407            return self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
408        return True
409
410    def prepare_clean_data(self):
411        self.git_clean(RegressTestConfig.REGRESS_TEST_CASE_DIR)
412        self.git_pull(RegressTestConfig.REGRESS_TEST_CASE_DIR)
413        self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
414
415    def run_regress_test_prepare(self):
416        if self.args.force_clone:
417            remove_dir(self.args.regress_out_dir)
418            remove_dir(RegressTestConfig.REGRESS_TEST_CASE_DIR)
419        os.makedirs(self.args.regress_out_dir, exist_ok=True)
420        os.makedirs(RegressTestConfig.REGRESS_TEST_CASE_DIR, exist_ok=True)
421        init_log_file(self.args)
422
423    def get_regress_test_files(self) -> List[str]:
424        result: List[str] = []
425        if self.args.test_file is not None and len(self.args.test_file) > 0:
426            test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_file)
427            result.append(str(test_file_list))
428            return result
429        elif self.args.test_dir is not None and len(self.args.test_dir) > 0:
430            test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_dir)
431        else:
432            test_file_list = RegressTestConfig.REGRESS_TEST_CASE_DIR
433        for dir_path, path, filenames in os.walk(test_file_list):
434            if dir_path.find(".git") != -1:
435                continue
436            for filename in filenames:
437                if filename.endswith(".js") or filename.endswith(".mjs"):
438                    result.append(str(os.path.join(dir_path, filename)))
439        return result
440
441
442class RegressTestCompile(RegressTestStep):
443    def __init__(self, args, test_reports: List[TestReport]):
444        RegressTestStep.__init__(self, args, "Regress test compilation")
445        self.out_dir = args.out_dir
446        self.test_reports = test_reports
447        for test in self.test_reports:
448            test.out_path = os.path.dirname(os.path.join(self.out_dir, test.test_id))
449
450    @staticmethod
451    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
452        if test_reports is None:
453            output("No tests loaded")
454            exit(-1)
455        test_prepare = RegressTestCompile(args, test_reports)
456        RegressTestCompile.step_obj = test_prepare
457        test_prepare._start()
458        test_reports = test_prepare.gen_abc_files()
459        test_prepare._end()
460        return test_reports
461
462    @staticmethod
463    def create_files_info(test_report: TestReport) -> Tuple[str, str]:
464        src_files_info = [
465            RegressTestConfig.REGRESS_TEST_TOOL_DIR,
466            test_report.src_path
467        ]
468        file_info_content: List[str] = []
469        file_info_path = str(os.path.join(
470            test_report.out_path,
471            f"{Utils.get_file_only_name(test_report.src_path)}-filesInfo.txt"))
472        os.makedirs(test_report.out_path, exist_ok=True)
473        with os.fdopen(
474                os.open(file_info_path, flags=os.O_RDWR | os.O_CREAT, mode=stat.S_IRUSR | stat.S_IWUSR),
475                mode="w+", encoding="utf-8"
476        ) as fp:
477            for src_file_info in src_files_info:
478                line = f"{src_file_info};{Utils.get_file_only_name(src_file_info)};esm;xxx;yyy\n"
479                file_info_content.append(line)
480                fp.write(line)
481        return file_info_path, "\n".join(file_info_content)
482
483    def gen_abc_files(self) -> List[TestReport]:
484        with multiprocessing.Pool(processes=self.args.processes) as pool:
485            results = pool.imap_unordered(self.gen_abc_file, self.test_reports)
486            results = list(results)
487            pool.close()
488            pool.join()
489
490        return results
491
492    def gen_abc_file(self, test_report: TestReport) -> Optional[TestReport]:
493        if test_report.src_path == RegressTestConfig.REGRESS_TEST_TOOL_DIR:
494            return None
495        file_info_path, file_info_content = self.create_files_info(test_report)
496        out_file = change_extension(test_report.src_path, '.out')
497        expect_file_exists = os.path.exists(out_file)
498        output_file = change_extension(
499            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
500            ".abc")
501        command = [
502            self.args.ark_frontend_binary,
503            f"@{file_info_path}",
504            "--merge-abc",
505            "--module",
506            f'--output={output_file}'
507        ]
508        step_result = StepResult(self.name, command=command, fileinfo=file_info_content)
509        Utils.exec_command(command, test_report.test_id, step_result, self.args.timeout,
510                           lambda rt, _, _2: get_extra_error_message(rt))
511        test_report.steps.append(step_result)
512        test_report.passed = step_result.is_passed
513        if expect_file_exists:
514            out_file_path = os.path.join(test_report.out_path, change_extension(test_report.test_id, '.out'))
515            shutil.copy(str(out_file), str(out_file_path))
516        return test_report
517
518
519class RegressTestPgo(RegressTestStep):
520    def __init__(self, args):
521        RegressTestStep.__init__(self, args, "Regress Test PGO ")
522
523    @staticmethod
524    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
525        pgo = RegressTestPgo(args)
526        RegressTestPgo.step_obj = pgo
527        pgo._start()
528        test_reports = pgo.generate_aps(test_reports)
529        pgo._end()
530        return test_reports
531
532    def get_test_ap_cmd(self, test_report: TestReport) -> List[str]:
533        abc_file = change_extension(
534            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
535            ".abc")
536        ap_file = change_extension(abc_file, ".ap")
537        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)
538        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
539        gen_ap_cmd = []
540        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
541            qemu_tool = "qemu-aarch64"
542            gen_ap_cmd = [
543                qemu_tool,
544                "-L",
545                self.args.ark_arch_root
546            ]
547        gen_ap_cmd.append(self.args.ark_tool)
548        gen_ap_cmd.append("--log-level=info")
549        gen_ap_cmd.append(f"--icu-data-path={self.args.icu_path}")
550        gen_ap_cmd.append("--compiler-target-triple=aarch64-unknown-linux-gn")
551        gen_ap_cmd.append("--enable-pgo-profiler=true")
552        gen_ap_cmd.append("--compiler-opt-inlining=true")
553        gen_ap_cmd.append(f"--compiler-pgo-profiler-path={ap_file}")
554        gen_ap_cmd.append("--asm-interpreter=true")
555        gen_ap_cmd.append(f"--entry-point={entry_point}")
556        gen_ap_cmd.append(f"{abc_file}")
557        return gen_ap_cmd
558
559    def generate_ap(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
560        if test_report is None or not test_report.passed:
561            return test_report
562        command = self.get_test_ap_cmd(test_report)
563        step = StepResult(self.name, command=command)
564        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
565                           lambda rt, _, _2: get_extra_error_message(rt))
566        test_report.steps.append(step)
567        test_report.passed = step.is_passed
568        return test_report
569
570    def generate_aps(self, test_reports: List[TestReport]) -> List[TestReport]:
571        with multiprocessing.Pool(processes=self.args.processes) as pool:
572            results = pool.imap_unordered(self.generate_ap, test_reports)
573            results = list(results)
574            pool.close()
575            pool.join()
576
577        return results
578
579
580class Utils:
581    ark_regress = "ark-regress"
582
583    @staticmethod
584    def get_file_only_name(full_file_name: str) -> str:
585        _, file_name = os.path.split(full_file_name)
586        only_name, _ = os.path.splitext(file_name)
587        return only_name
588
589    @staticmethod
590    def get_file_name(full_file_name: str) -> str:
591        _, file_name = os.path.split(full_file_name)
592        return file_name
593
594    @staticmethod
595    def mk_dst_dir(file, src_dir, dist_dir):
596        idx = file.rfind(src_dir)
597        fpath, _ = os.path.split(file[idx:])
598        fpath = fpath.replace(src_dir, dist_dir)
599        os.makedirs(fpath, exist_ok=True)
600
601    @staticmethod
602    def get_inside_path(file_path: str, marker: Optional[str] = None) -> str:
603        if marker is None:
604            marker = Utils.ark_regress
605        index = file_path.find(marker)
606        if index > -1:
607            return file_path[index + len(marker) + 1:]
608        return file_path
609
610    @staticmethod
611    def exec_command(cmd_args, test_id: str, step_result: StepResult, timeout=RegressTestConfig.DEFAULT_TIMEOUT,
612                     get_extra_error_msg: Optional[Callable[[int, str, str], str]] = None) -> None:
613        code_format = 'utf-8'
614        if platform.system() == "Windows":
615            code_format = 'gbk'
616        cmd_string = "\n\t".join([str(arg).strip() for arg in cmd_args if arg is not None])
617        msg_cmd = "\n".join([
618            f"Run command:\n{cmd_string}",
619            f"Env: {os.environ.get('LD_LIBRARY_PATH')}"
620        ])
621        msg_result = f"TEST ({step_result.step_name.strip()}): {test_id}"
622        try:
623            with subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True,
624                                  start_new_session=True) as process:
625                output_res, errs = process.communicate(timeout=timeout)
626                ret_code = process.poll()
627                step_result.return_code = ret_code
628                stderr = str(errs.decode(code_format, 'ignore').strip())
629                stdout = str(output_res.decode(code_format, 'ignore').strip())
630                extra_message = get_extra_error_msg(ret_code, stdout, stderr) if get_extra_error_msg is not None else ""
631                step_result.stderr = f"{extra_message + '. ' if extra_message else '' }{stderr if stderr else ''}"
632                step_result.stdout = stdout
633                if ret_code == 0:
634                    msg_result = f"{msg_result} PASSED"
635                    step_result.is_passed = True
636                else:
637                    msg_result = f"{msg_result} FAILED"
638        except subprocess.TimeoutExpired:
639            process.kill()
640            process.terminate()
641            step_result.return_code = -1
642            step_result.stderr = f"Timeout: timed out after {timeout} seconds"
643            msg_result = f"{msg_result} FAILED"
644        except Exception as exc:
645            step_result.return_code = -1
646            step_result.stderr = f"Unknown error: {exc}"
647            msg_result = f"{msg_result} FAILED"
648        if step_result.is_passed:
649            output(msg_result)
650            output_debug(msg_cmd)
651        else:
652            output(f"{msg_result}\n{step_result.stderr}\n{msg_cmd}")
653
654    @staticmethod
655    def read_skip_list(skip_list_path: str) -> List[str]:
656        skip_tests_list = []
657        with os.fdopen(os.open(skip_list_path, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
658            json_data = json.load(file_object)
659            for key in json_data:
660                skip_tests_list.extend(key["files"])
661        return skip_tests_list
662
663    @staticmethod
664    def read_file_as_str(file_name: str) -> str:
665        with os.fdopen(os.open(file_name, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
666            content = [line.strip() for line in file_object.readlines()]
667        return "\n".join(content)
668
669
670class RegressTestAot(RegressTestStep):
671    def __init__(self, args):
672        RegressTestStep.__init__(self, args, "Regress Test AOT mode")
673
674    @staticmethod
675    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
676        aot = RegressTestAot(args)
677        RegressTestAot.step_obj = aot
678        aot._start()
679        test_reports = aot.compile_aots(test_reports)
680        aot._end()
681        return test_reports
682
683    def get_test_aot_cmd(self, test_report: TestReport) -> List[str]:
684        abc_file = change_extension(
685            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
686            ".abc")
687        ap_file = change_extension(abc_file, ".ap")
688        aot_file = change_extension(abc_file, "")
689        compiler_opt_track_field = self.args.compiler_opt_track_field
690        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
691        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
692            aot_cmd = [
693                "qemu-aarch64",
694                "-L",
695                self.args.ark_arch_root,
696                self.args.ark_aot_tool,
697                "--compiler-target-triple=aarch64-unknown-linux-gnu",
698                f"--aot-file={aot_file}"
699            ]
700        else:
701            aot_cmd = [
702            self.args.ark_aot_tool,
703            f"--aot-file={aot_file}",
704        ]
705        pgo = [
706            "--compiler-opt-loop-peeling=true",
707            "--compiler-fast-compile=false",
708            f"--compiler-opt-track-field={compiler_opt_track_field}",
709            "--compiler-opt-inlining=true",
710            "--compiler-max-inline-bytecodes=45",
711            "--compiler-opt-level=2",
712            f"--compiler-pgo-profiler-path={ap_file}",
713        ]
714        litecg = [
715            "--compiler-enable-litecg=true",
716        ]
717        aot_cmd_tail = [
718            f"{abc_file}",
719        ]
720
721        if self.args.run_pgo:
722            aot_cmd.extend(pgo)
723        if self.args.enable_litecg:
724            aot_cmd.extend(litecg)
725        aot_cmd.extend(aot_cmd_tail)
726        return aot_cmd
727
728    def compile_aot(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
729        if test_report is None or not test_report.passed:
730            return test_report
731        command = self.get_test_aot_cmd(test_report)
732        step = StepResult(self.name, command=command)
733        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
734                           lambda rt, _, _2: get_extra_error_message(rt))
735        test_report.steps.append(step)
736        test_report.passed = step.is_passed
737        return test_report
738
739    def compile_aots(self, test_reports: List[TestReport]) -> List[TestReport]:
740        with multiprocessing.Pool(processes=self.args.processes) as pool:
741            results = pool.imap_unordered(self.compile_aot, test_reports)
742            results = list(results)
743            pool.close()
744            pool.join()
745
746        return results
747
748
749class RegressOption(Enum):
750    NO_FORCE_GC = auto()
751    ELEMENTS_KIND = auto()
752
753
754def get_regress_groups() -> Dict[RegressOption, List[str]]:
755    groups = {}
756    with os.fdopen(os.open(RegressTestConfig.REGRESS_TEST_OPTIONS, os.O_RDONLY, stat.S_IRUSR), "r") as file:
757        for group in json.load(file):
758            groups[RegressOption[group["name"]]] = group["files"]
759    return groups
760
761
762def get_test_options(test: str, test_groups: Dict[RegressOption, List[str]], regress_option: RegressOption) -> List[str]:
763    opt_values: Dict[RegressOption, str] = {
764        RegressOption.NO_FORCE_GC: "--enable-force-gc=",
765        RegressOption.ELEMENTS_KIND: "--enable-elements-kind="
766    }
767
768    def match(opt: RegressOption) -> bool:
769        return test in test_groups.get(opt, [])
770
771    def to_flag(b: bool) -> str:
772        return str(b).lower()
773
774    try:
775        return [opt_values.get(regress_option) + to_flag(not match(regress_option))]
776    except KeyError:
777        return []
778
779
780class RegressTestRun(RegressTestStep):
781    def __init__(self, args):
782        RegressTestStep.__init__(self, args, "Regress Test Run ")
783        self.test_groups: Dict[RegressOption, List[str]] = get_regress_groups()
784
785    @staticmethod
786    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
787        runner = RegressTestRun(args)
788        RegressTestRun.step_obj = runner
789        runner._start()
790        test_reports = runner.run_test_case_dir(test_reports)
791        runner._end()
792        return test_reports
793
794    @staticmethod
795    def extra_check_with_expect(ret_code: int, test_report: TestReport, expect_file, stdout: str, stderr: str) -> str:
796        expect_output_str = read_expect_file(expect_file, test_report.src_path)
797        test_case_file = Utils.read_file_as_str(test_report.src_path)
798        if stdout == expect_output_str.strip() or stderr == expect_output_str.strip():
799            if ret_code == 0 or (ret_code == 255 and "/fail/" in test_case_file):
800                return ""
801            else:
802                return get_extra_error_message(ret_code)
803        msg = f'expect: [{expect_output_str}]\nbut got: [{stderr}]'
804        return msg
805
806    @staticmethod
807    def extra_check_with_assert(ret_code: int, stderr: Optional[str]) -> str:
808        if ret_code == 0 and stderr and "[ecmascript] Stack overflow" not in stderr:
809            return str(stderr)
810        return get_extra_error_message(ret_code)
811
812    def run_test_case_dir(self, test_reports: List[TestReport]) -> List[TestReport]:
813        with multiprocessing.Pool(processes=self.args.processes, initializer=init_worker,
814                                  initargs=(self.args,)) as pool:
815            results = pool.imap_unordered(self.run_test_case, test_reports)
816            results = list(results)
817            pool.close()
818            pool.join()
819
820        return results
821
822    def run_test_case(self, test_report: TestReport) -> Optional[TestReport]:
823        self.args = worker_wrapper_args
824        if self.args is None or test_report is None or not test_report.passed:
825            return test_report
826        if test_report.src_path.endswith(RegressTestConfig.TEST_TOOL_FILE_JS_NAME):
827            return None
828
829        abc_file = change_extension(
830            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
831            ".abc")
832        aot_file = change_extension(abc_file, "")
833        expect_file = change_extension(abc_file, ".out")
834        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)
835
836        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
837
838        # test environ LC_ALL/TZ
839        test_name = test_report.test_id.replace('regresstest/ark-regress/', '')
840        set_test_environ(test_report.src_path)
841        command = []
842        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
843            qemu_tool = "qemu-aarch64"
844            qemu_arg1 = "-L"
845            qemu_arg2 = self.args.ark_arch_root
846            command = [qemu_tool, qemu_arg1, qemu_arg2]
847        command.append(self.args.ark_tool)
848        command.append(f"--icu-data-path={self.args.icu_path}")
849        command.append(f"--entry-point={entry_point}")
850        if self.args.ark_aot:
851            command.append(f"--stub-file={self.args.stub_path}")
852            command.append(f"--aot-file={aot_file}")
853        if self.args.multi_context:
854            command.append("--multi-context=true")
855        if self.args.disable_force_gc:
856            command.append("--enable-force-gc=false")
857        else:
858            command.extend(get_test_options(test_name, self.test_groups, RegressOption.NO_FORCE_GC))
859        command.extend(get_test_options(test_name, self.test_groups, RegressOption.ELEMENTS_KIND))
860        command.append(abc_file)
861
862        return self.run_test_case_file(command, test_report, expect_file)
863
864    def run_test_case_file(self, command, test_report: TestReport, expect_file) -> TestReport:
865        expect_file_exits = os.path.exists(expect_file)
866        step = StepResult(self.name, command=command)
867        if expect_file_exits:
868            self.run_test_case_with_expect(command, step, test_report, expect_file)
869        else:
870            self.run_test_case_with_assert(command, step, test_report)
871        test_report.steps.append(step)
872        test_report.passed = step.is_passed
873        return test_report
874
875    def run_test_case_with_expect(self, command, step: StepResult, test_report: TestReport, expect_file) -> None:
876        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
877                           lambda rt, out, err: self.extra_check_with_expect(rt, test_report, expect_file, out, err))
878
879    def run_test_case_with_assert(self, command, step: StepResult, test_report: TestReport) -> None:
880        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
881                           lambda rt, _, err: self.extra_check_with_assert(rt, err))
882
883
884class Stats:
885    def __init__(self, args, test_reports: List[TestReport]):
886        self.args = args
887        self.pass_count = 0
888        self.fail_count = 0
889        self.test_reports = test_reports
890        self.errors: Dict[str, List[TestReport]] = {}
891
892    def read_ignore_list(self) -> Optional[Set[str]]:
893        if self.args.ignore_list is None:
894            return None
895        with os.fdopen(os.open(self.args.ignore_list, os.O_RDWR, stat.S_IRUSR), "r+") as file_object:
896            lines = file_object.readlines()
897            lines = [line.strip() for line in lines if not line.strip().startswith('#')]
898        return set(lines)
899
900    def get_new_failures(self) -> Optional[List[TestReport]]:
901        ignore_list = self.read_ignore_list()
902        if ignore_list is None:
903            return None
904        new_failures: List[TestReport] = []
905        for test_report in self.test_reports:
906            if test_report and not test_report.passed and test_report.steps:
907                if test_report.test_id not in ignore_list:
908                    new_failures.append(test_report)
909        return new_failures
910
911    def statistics(self):
912        root = XTree.Element("testsuite")
913        root.set("name", "Regression")
914
915        result_file = open_write_file(self.args.out_result, False)
916        for test_report in self.test_reports:
917            if test_report is None:
918                continue
919            testcase = XTree.SubElement(root, "testcase")
920            testcase.set("name", f"{test_report.test_id}")
921            if test_report.passed:
922                write_result_file(f"PASS: {test_report.test_id}", result_file)
923                self.pass_count += 1
924            else:
925                self.fail_count += 1
926                write_result_file(f"FAIL: {test_report.test_id}", result_file)
927                failed_step = test_report.steps[-1]
928                if failed_step.step_name not in self.errors:
929                    self.errors[failed_step.step_name] = []
930                self.errors[failed_step.step_name].append(test_report)
931                XTree.SubElement(testcase, "failure").text = f"<![CDATA[{test_report.report()}]]>"
932
933        root.set("tests", f"{self.pass_count + self.fail_count}")
934        root.set("failures", f"{self.fail_count}")
935
936        tree = XTree.ElementTree(root)
937        tree.write(self.args.junit_report, xml_declaration=True, encoding="UTF-8")
938        result_file.close()
939
940    def print_result(self, args, steps):
941        result_file = open_write_file(args.out_result, True)
942        summary_duration = datetime.timedelta()
943        for step in steps:
944            output(f"Step {step.step_obj.name} - duration {step.step_obj.get_duration()}")
945            summary_duration += step.step_obj.get_duration()
946        msg = f'\npass count: {self.pass_count}'
947        write_result_file(msg, result_file)
948        output(msg)
949        msg = f'fail count: {self.fail_count}'
950        write_result_file(msg, result_file)
951        output(msg)
952        msg = f'total count: {self.fail_count + self.pass_count}'
953        write_result_file(msg, result_file)
954        output(msg)
955        msg = f'total used time is: {str(summary_duration)}'
956        write_result_file(msg, result_file)
957        output(msg)
958        result_file.close()
959
960    def print_failed_tests(self):
961        output("=== Failed tests ===")
962        for key, values in self.errors.items():
963            output(f"{key}: {len(values)} tests")
964
965
966def change_extension(path, new_ext: str):
967    base_path, ext = os.path.splitext(path)
968    if ext:
969        new_path = base_path + new_ext
970    else:
971        new_path = path + new_ext
972    return new_path
973
974
975def get_files_by_ext(start_dir, suffix):
976    result = []
977    for dir_path, dir_names, filenames in os.walk(start_dir):
978        for filename in filenames:
979            if filename.endswith(suffix):
980                result.append(os.path.join(dir_path, filename))
981    return result
982
983
984def read_expect_file(expect_file, test_case_file):
985    with os.fdopen(os.open(expect_file, os.O_RDWR, stat.S_IRUSR), "r+") as file_object:
986        lines = file_object.readlines()
987        lines = [line for line in lines if not line.strip().startswith('#')]
988        expect_output = ''.join(lines)
989        if test_case_file.startswith("/"):
990            test_case_file = test_case_file.lstrip("/")
991        expect_file = test_case_file.replace('regresstest/', '')
992        test_file_path = os.path.join(RegressTestConfig.REGRESS_BASE_TEST_DIR, expect_file)
993        expect_output_str = expect_output.replace('*%(basename)s', test_file_path)
994    return expect_output_str
995
996
997def open_write_file(file_path, append):
998    if append:
999        args = os.O_RDWR | os.O_CREAT | os.O_APPEND
1000    else:
1001        args = os.O_RDWR | os.O_CREAT
1002    file_descriptor = os.open(file_path, args, stat.S_IRUSR | stat.S_IWUSR)
1003    file_object = os.fdopen(file_descriptor, "w+")
1004    return file_object
1005
1006
1007def open_result_excel(file_path):
1008    file_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_APPEND, stat.S_IRUSR | stat.S_IWUSR)
1009    file_object = os.fdopen(file_descriptor, "w+")
1010    return file_object
1011
1012
1013def get_file_source(file):
1014    with open(file, encoding='ISO-8859-1') as f:
1015        return f.read()
1016
1017
1018def set_test_environ(case):
1019    # intl environ LC_ALL
1020    if 'LC_ALL' in os.environ:
1021        del os.environ['LC_ALL']
1022    if 'TZ' in os.environ:
1023        del os.environ['TZ']
1024    if not os.path.exists(case):
1025        return
1026    source = get_file_source(case)
1027    env_match = ENV_PATTERN.search(source)
1028    if env_match:
1029        for env_pair in env_match.group(1).strip().split():
1030            var, value = env_pair.split('=')
1031            if var.find('TZ') >= 0:
1032                os.environ['TZ'] = value
1033            if var.find('LC_ALL') >= 0:
1034                os.environ['LC_ALL'] = value
1035            break
1036
1037
1038# pylint: disable=invalid-name,global-statement
1039worker_wrapper_args = None
1040
1041
1042def init_worker(args):
1043    global worker_wrapper_args
1044    worker_wrapper_args = args
1045
1046
1047def write_result_file(msg: str, result_file):
1048    result_file.write(f'{msg}\n')
1049
1050
1051def main(args):
1052    if not check_args(args):
1053        return 1
1054    output("\nStart regresstest........")
1055    steps: List[Type[RegressTestStep]] = [
1056        RegressTestRepoPrepare,
1057        RegressTestCompile,
1058    ]
1059    if args.ark_aot:
1060        if args.run_pgo:
1061            steps.append(RegressTestPgo)
1062        steps.append(RegressTestAot)
1063    steps.append(RegressTestRun)
1064
1065    test_reports: List[TestReport] = []
1066    for step in steps:
1067        test_reports = step.run(args, test_reports)
1068
1069    stats = Stats(args, test_reports)
1070    stats.statistics()
1071    stats.print_result(args, steps)
1072    stats.print_failed_tests()
1073    new_failures = stats.get_new_failures()
1074    if new_failures is None:
1075        return 0
1076    if len(new_failures) > 0:
1077        msg = [f"Found {len(new_failures)} new failures:"]
1078        for failure in new_failures:
1079            msg.append(f"\t{failure.test_id}")
1080        output("\n".join(msg))
1081    else:
1082        output("No new failures have been found")
1083    return len(new_failures)
1084
1085
1086if __name__ == "__main__":
1087    sys.exit(main(parse_args()))
1088