#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Compile testcases to .abc, disassemble them to .pa and compare with the expected .pa files.

The testcase list (``--testcase-list``) holds one ``<relative path> <module flag>``
entry per line; blank lines and lines starting with ``#`` are ignored.
"""

import argparse
import os
import platform
import re
import shutil
import signal
import subprocess
import sys


DEFAULT_TIMEOUT = 60000

# Directory (below --output-dir) that receives every generated artifact.
# The spelling is part of the on-disk layout and must not be "corrected".
_OUT_SUBDIR = 'disassember_tests'

# Fixed messages for return codes that mean "killed by signal N" (-N).
_SIGNAL_MESSAGES = {
    -6: "Aborted (core dumped)",
    -4: "Illegal instruction (core dumped)",
    -11: "Segmentation fault (core dumped)",
}

# Matches `import './x'`, `from './x'`, `import('./x')` and the double-quoted
# variants. Group 3 / group 4 hold the specifier for single / double quotes.
# The specifier is matched non-greedily so that a later quote on the same line
# is not swallowed into it.
_IMPORT_SPECIFIER_RE = re.compile(
    r"""(import|from)(?:\s*)\(?('(\.\/.*?)'|"(\.\/.*?)")\)?""")

# Matches ES-module-only syntax: `export/import {...}` or `*`,
# `export <declaration>`, and side-effect imports (`import 'x'`).
_MODULE_SYNTAX_RE = re.compile(
    r"((?:export|import)\s+(?:{[\s\S]+}|\*))|"
    r"(export\s+(?:let|const|var|function|class|default))|"
    r"""(import\s+['"].+['"])""")


def parse_args():
    """Parse the command line; all three options are required."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--testcase-list', help='the file path of testcase list', required=True)
    parser.add_argument(
        '--build-dir', help='the build dir of es2abc', required=True)
    parser.add_argument(
        '--output-dir', help='the output path', required=True)
    return parser.parse_args()


def create_dir(out_dir, mode=0o775):
    """Create ``out_dir`` (and parents) if it is an absolute path that does not exist yet.

    Relative paths are ignored, as before. ``exist_ok`` closes the window between
    the existence check and the creation.
    """
    if os.path.isabs(out_dir) and not os.path.exists(out_dir):
        os.makedirs(out_dir, mode, exist_ok=True)


def output(retcode, msg):
    """Report the outcome of a command.

    A zero ``retcode`` prints a non-empty ``msg`` to stdout. Return codes -6, -4
    and -11 write a fixed signal description to stderr. Any other code writes
    ``msg`` to stderr, or an "Unknown Error" line when ``msg`` is empty.
    """
    if retcode == 0:
        if msg != '':
            print(str(msg))
        return

    signal_message = _SIGNAL_MESSAGES.get(retcode)
    if signal_message is not None:
        sys.stderr.write(signal_message)
    elif msg != '':
        sys.stderr.write(str(msg))
    else:
        sys.stderr.write("Unknown Error: " + str(retcode))


def _first_failure(current, new):
    """Return ``current`` if it already records a failure, otherwise ``new``."""
    return current if current else new


def _strip_extension(file_name):
    """Return ``file_name`` without its last extension."""
    return os.path.splitext(file_name)[0]


def _testcase_out_dir(output_dir, testcase):
    """Return the output directory that mirrors the testcase's relative directory."""
    return os.path.join(output_dir, _OUT_SUBDIR, os.path.dirname(testcase))


def _read_testcase_lines(testcase_list):
    """Yield the stripped, non-empty, non-comment lines of the testcase list."""
    with open(testcase_list, encoding='utf-8') as testcase_file:
        for line in testcase_file:
            tc = line.strip()
            if tc and not tc.startswith('#'):
                yield tc


def _parse_testcase_line(tc):
    """Split a testcase entry into ``(testcase, module_flag)``.

    Any run of whitespace separates the two fields.

    Raises:
        ValueError: if the entry does not consist of exactly two fields.
    """
    fields = tc.split()
    if len(fields) != 2:
        raise ValueError(
            f"Malformed testcase entry (expected '<path> <module flag>'): '{tc}'")
    return fields[0], fields[1]


def _kill_process_tree(proc):
    """Kill a timed-out child (and its process group where supported), then reap it."""
    try:
        if hasattr(os, 'killpg'):
            # The child was started with start_new_session=True, so it leads its
            # own process group and the group id equals its pid.
            os.killpg(proc.pid, signal.SIGKILL)
        else:
            proc.kill()
    except OSError:
        # The process exited between the timeout and the kill.
        pass
    # Drain the pipes and collect the exit status so no zombie is left behind.
    proc.communicate()


def run_command(cmd_args, timeout=DEFAULT_TIMEOUT):
    """Run ``cmd_args`` and report its result through :func:`output`.

    Args:
        cmd_args: the command and its arguments as a list of strings.
        timeout: passed to ``Popen.communicate`` (seconds).

    Returns:
        0 on success; 1 when the command is empty, cannot be started, times out,
        or writes anything to stderr; otherwise the command's own return code.
    """
    if not cmd_args:
        output(1, "Empty command: nothing to run")
        return 1

    cmd_string = " ".join(cmd_args)
    code_format = 'gbk' if platform.system() == "Windows" else 'utf-8'

    try:
        proc = subprocess.Popen(cmd_args,
                                stderr=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                close_fds=True,
                                start_new_session=True)
    except (OSError, ValueError) as err:
        # e.g. the binary does not exist or is not executable.
        output(1, f"{cmd_string}: unknown error: {str(err)}")
        return 1

    try:
        (output_res, errs) = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        _kill_process_tree(proc)
        code = 1
        msg = f"Timeout: '{cmd_string}' timed out after {str(timeout)} seconds"
    except Exception as err:
        code = 1
        msg = f"{cmd_string}: unknown error: {str(err)}"
    else:
        err_text = errs.decode(code_format, 'ignore')
        if err_text != '':
            # Anything on stderr counts as a failure, whatever the exit status.
            output(1, err_text)
            return 1

        ret_code = proc.returncode
        # NOTE(review): an exit status of 1 with empty stderr is treated as
        # success here; kept as in the original -- confirm this is intended.
        if ret_code and ret_code != 1:
            code = ret_code
            msg = f"Command {cmd_string}: \n"
            msg += f"error: {err_text}"
        else:
            code = 0
            msg = str(output_res.decode(code_format, 'ignore'))

    output(code, msg)
    return code


def search_dependency(file, directory):
    """Return the path of the first file named ``file`` below ``directory``.

    Returns the string "FILE_NOT_FOUND" when no such file exists.
    """
    for root, _, file_names in os.walk(directory, topdown=True):
        if file in file_names:
            return os.path.join(root, file)
    return "FILE_NOT_FOUND"


def collect_dependencies(tc_file, search_dir, traversed_dependencies):
    """Recursively collect the files that ``tc_file`` imports via ``./`` specifiers.

    Args:
        tc_file: path of the source file to scan.
        search_dir: directory tree in which imported file names are looked up.
        traversed_dependencies: files already visited on the current import
            chain; ``tc_file`` is appended to it.

    Returns:
        A list of dependency paths; it may contain duplicates. Specifiers that
        cannot be resolved below ``search_dir`` are skipped.
    """
    dependencies = []
    traversed_dependencies.append(tc_file)
    with open(tc_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # dict.fromkeys de-duplicates while keeping first-seen order, so the result
    # is deterministic from run to run.
    raw_specifiers = dict.fromkeys(
        match.group(3) or match.group(4)
        for match in _IMPORT_SPECIFIER_RE.finditer(content))

    own_name = os.path.basename(tc_file)
    for raw_specifier in raw_specifiers:
        # The pattern guarantees a leading "./"; drop exactly that prefix.
        specifier = raw_specifier[2:]
        if specifier == own_name:
            continue

        dependency = search_dependency(specifier, search_dir)
        if dependency == "FILE_NOT_FOUND":
            continue

        if dependency not in traversed_dependencies:
            # Each branch gets its own copy of the visited list.
            dependencies.extend(collect_dependencies(dependency, search_dir,
                                                     list(set(traversed_dependencies))))
        dependencies.append(dependency)

    return dependencies


def check_compile_mode(file):
    """Return True when ``file`` has to be compiled as an ES module.

    That is the case when its content uses import/export syntax, contains the
    text "flags: [module]", or when its path contains "/module/".
    """
    with open(file, 'r', encoding='utf-8') as check_file:
        content_file = check_file.read()

    if _MODULE_SYNTAX_RE.search(content_file):
        return True

    return "flags: [module]" in content_file or "/module/" in file


def gen_merged_abc(tc_name_pre, build_dir, output_dir, dependencies):
    """Merge the protoBin files of a module testcase into one .abc with merge_abc.

    Args:
        tc_name_pre: testcase name without extension; names the protoBin
            directory and the merged .abc file.
        build_dir: directory containing the ``merge_abc`` binary.
        output_dir: root output directory.
        dependencies: dependency paths of the testcase.

    Returns:
        The return code of the merge command, or 1 when none of the
        dependencies lives in a "/module/" directory.
    """
    if not any("/module/" in dependency for dependency in dependencies):
        output(1, f"No module dependency to merge for '{tc_name_pre}'")
        return 1

    proto_bin_suffix = "protoBin"
    merge_abc_binary = os.path.join(build_dir, 'merge_abc')
    out_merged_abc_name = f"{tc_name_pre}.abc"

    # Every module dependency shares the same directories, so the command is
    # built once instead of once per dependency.
    out_dependency_dir = os.path.join(output_dir, _OUT_SUBDIR, 'module')
    out_merged_dependency_proto_dir = os.path.join(out_dependency_dir, tc_name_pre)
    create_dir(out_merged_dependency_proto_dir)

    cmd = [merge_abc_binary, '--input', out_merged_dependency_proto_dir, '--suffix', proto_bin_suffix,
           '--outputFilePath', out_dependency_dir, '--output', out_merged_abc_name]
    return run_command(cmd)


def _compile_dependency(frontend_binary, dependency, output_dir, tc_name_pre):
    """Compile one dependency: .abc for dynamic-import, .protoBin for module."""
    if "/dynamic-import/" in dependency:
        is_dynamic_import = True
        out_dependency_abc_dir = os.path.join(output_dir, _OUT_SUBDIR, 'dynamic-import')
    elif "/module/" in dependency:
        is_dynamic_import = False
        out_dependency_abc_dir = os.path.join(output_dir, _OUT_SUBDIR, 'module')
    else:
        output(1, f"Unsupported dependency location: {dependency}")
        return 1

    # generate output dependent-abc directory and output dependent-protobin directory
    create_dir(out_dependency_abc_dir)
    out_dependency_proto_dir = os.path.join(out_dependency_abc_dir, tc_name_pre)
    create_dir(out_dependency_proto_dir)

    dependency_name_pre = _strip_extension(os.path.basename(dependency))
    if is_dynamic_import:
        out_dependency_aparted_abc_file = os.path.join(out_dependency_abc_dir, dependency_name_pre + '.abc')
        cmd = [frontend_binary, dependency, '--output', out_dependency_aparted_abc_file]
    else:
        out_dependency_proto_file = os.path.join(out_dependency_proto_dir, dependency_name_pre + '.protoBin')
        cmd = [frontend_binary, dependency, '--outputProto', out_dependency_proto_file, '--merge-abc']

    if check_compile_mode(dependency):
        cmd.append('--module')

    return run_command(cmd)


def gen_module_and_dynamicimport_abc(args, tc, tc_dir, out_dir):
    """Compile a module / dynamic-import testcase together with its dependencies.

    Args:
        args: parsed command line (``build_dir`` and ``output_dir`` are used).
        tc: the testcase entry, ``<relative path> <module flag>``.
        tc_dir: directory containing the testcases; also searched for dependencies.
        out_dir: output directory of the testcase itself.

    Returns:
        0 when every command succeeded, otherwise the first non-zero return code.
    """
    retcode = 0
    frontend_binary = os.path.join(args.build_dir, 'es2abc')

    testcase, module_flag = _parse_testcase_line(tc)
    tc_name_pre = _strip_extension(os.path.basename(testcase))
    tc_file = os.path.join(tc_dir, testcase)
    out_file = os.path.join(out_dir, tc_name_pre + '.abc')

    dependencies = list(dict.fromkeys(collect_dependencies(tc_file, tc_dir, [])))
    for dependency in dependencies:
        status = _compile_dependency(frontend_binary, dependency, args.output_dir, tc_name_pre)
        retcode = _first_failure(retcode, status)

    # generate self-protobin for module, generate self-abc otherwise
    is_merged_module = module_flag == '1' and "/module/" in tc_file
    if is_merged_module and dependencies:
        out_self_proto_file = os.path.join(out_dir, tc_name_pre, tc_name_pre + '.protoBin')
        create_dir(os.path.dirname(out_self_proto_file))
        cmd = [frontend_binary, tc_file, '--outputProto', out_self_proto_file, '--merge-abc']
    elif is_merged_module:
        cmd = [frontend_binary, tc_file, '--output', out_file, '--merge-abc']
    else:
        # Plain compile; also the fallback for flag/directory combinations that
        # previously left the command empty.
        cmd = [frontend_binary, tc_file, '--output', out_file]

    if module_flag == '1':
        cmd.append('--module')

    retcode = _first_failure(retcode, run_command(cmd))

    # generate merged-abc for module
    if is_merged_module and dependencies:
        status = gen_merged_abc(tc_name_pre, args.build_dir, args.output_dir, dependencies)
        retcode = _first_failure(retcode, status)

    return retcode


def gen_abc(args):
    """Compile every listed testcase to .abc below ``<output_dir>/disassember_tests``.

    Any previous content of that directory is removed first.

    Returns:
        0 when every testcase compiled, otherwise the first non-zero return code.
    """
    retcode = 0
    frontend_binary = os.path.join(args.build_dir, 'es2abc')
    tc_dir = os.path.dirname(args.testcase_list)
    out_dir_disassember_test = os.path.join(args.output_dir, _OUT_SUBDIR)

    if os.path.exists(out_dir_disassember_test):
        shutil.rmtree(out_dir_disassember_test, ignore_errors=True)

    for tc in _read_testcase_lines(args.testcase_list):
        testcase, module_flag = _parse_testcase_line(tc)
        tc_name_pre = _strip_extension(os.path.basename(testcase))
        tc_file = os.path.join(tc_dir, testcase)
        out_dir = _testcase_out_dir(args.output_dir, testcase)
        create_dir(out_dir)
        out_file = os.path.join(out_dir, tc_name_pre + '.abc')

        if "/module/" in tc_file or "/dynamic-import/" in tc_file:
            status = gen_module_and_dynamicimport_abc(args, tc, tc_dir, out_dir)
        elif not os.path.exists(out_file):
            cmd = [frontend_binary, tc_file, '--output', out_file]
            if module_flag == '1':
                cmd.append('--module')
            status = run_command(cmd)
        else:
            status = 0

        retcode = _first_failure(retcode, status)

    return retcode


def get_pa_command(testcase, disasm_tool, output_dir):
    """Return the disassembler command turning the testcase's .abc into a .pa file."""
    tc_name_pre = _strip_extension(os.path.basename(testcase))
    out_tc_abc_dir = _testcase_out_dir(output_dir, testcase)
    out_tc_abc_file = os.path.join(out_tc_abc_dir, tc_name_pre + '.abc')
    out_tc_pa_file = os.path.join(out_tc_abc_dir, tc_name_pre + '.pa')

    return [disasm_tool, out_tc_abc_file, out_tc_pa_file]


def gen_pa(args):
    """Disassemble the .abc of every listed testcase (and of dynamic-import dependencies).

    Returns:
        0 when every disassembly succeeded, otherwise the first non-zero return
        code; a testcase whose source file is missing counts as a failure.
    """
    retcode = 0
    disasm_tool = os.path.join(args.output_dir, 'ark_disasm')
    tc_dir = os.path.dirname(args.testcase_list)

    for tc in _read_testcase_lines(args.testcase_list):
        testcase, _ = _parse_testcase_line(tc)
        tc_file = os.path.join(tc_dir, testcase)
        if not os.path.exists(tc_file):
            output(1, f"Testcase file not found: {tc_file}")
            retcode = _first_failure(retcode, 1)
            continue

        if "dynamic-import" in tc:
            # generate dependent-pa for dynamic-import
            for dependency in dict.fromkeys(collect_dependencies(tc_file, tc_dir, [])):
                dependency_testcase = os.path.join('dynamic-import', os.path.basename(dependency))
                cmd = get_pa_command(dependency_testcase, disasm_tool, args.output_dir)
                retcode = _first_failure(retcode, run_command(cmd))

        # generate self-pa
        cmd = get_pa_command(testcase, disasm_tool, args.output_dir)
        retcode = _first_failure(retcode, run_command(cmd))

    return retcode


def cmp_pa_file(testcase, tc_dir, output_dir):
    """Compare the generated .pa of ``testcase`` with ``<tc_dir>/expected/<name>.pa``.

    Returns:
        0 when both files exist and have identical lines, 1 otherwise.
    """
    tc_name = _strip_extension(os.path.basename(testcase))
    out_tc_pa_file = os.path.join(_testcase_out_dir(output_dir, testcase), tc_name + '.pa')
    target_pa_file = os.path.join(tc_dir, 'expected', tc_name + '.pa')

    print(" --- Running disassember testcase: " + testcase + " ---")
    if not os.path.exists(out_tc_pa_file) or not os.path.exists(target_pa_file):
        print(" --- pa not exist ---")
        return 1

    with open(out_tc_pa_file, 'r') as tc_file, open(target_pa_file, 'r') as target_file:
        # Comparing the full line lists also catches one file being a strict
        # prefix of the other, which a pairwise zip() would silently accept.
        if tc_file.readlines() != target_file.readlines():
            print(" error --- out_pa_line != target_pa_line ---")
            return 1

    print(" --- Pass ---")
    return 0


def check_pa(args):
    """Compare the generated .pa of every listed testcase with its expected file.

    For dynamic-import testcases the dependencies are compared as well.

    Returns:
        0 when every comparison passed, otherwise 1.
    """
    retcode = 0
    tc_dir = os.path.dirname(args.testcase_list)

    for tc in _read_testcase_lines(args.testcase_list):
        testcase, _ = _parse_testcase_line(tc)
        tc_file = os.path.join(tc_dir, testcase)

        retcode = _first_failure(retcode, cmp_pa_file(testcase, tc_dir, args.output_dir))

        if "dynamic-import" in tc:
            for dependency in dict.fromkeys(collect_dependencies(tc_file, tc_dir, [])):
                dependency_testcase = os.path.join('dynamic-import', os.path.basename(dependency))
                status = cmp_pa_file(dependency_testcase, tc_dir, args.output_dir)
                retcode = _first_failure(retcode, status)

    return retcode


def main():
    """Run the three stages in order and stop at the first one that fails."""
    input_args = parse_args()

    # NOTE(review): a failing stage only prints a message; the process still
    # exits with status 0. Kept unchanged because callers may rely on it --
    # confirm whether the failure should be propagated via sys.exit().
    if gen_abc(input_args):
        print("[0/0] --- gen abc fail ---")
        return

    if gen_pa(input_args):
        print("[0/0] --- gen pa fail ---")
        return

    if check_pa(input_args):
        print("[0/0] --- check pa fail ---")
        return


if __name__ == '__main__':
    main()