#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Copyright (c) 2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import argparse
import difflib
import logging
import os
import re
import shutil
import subprocess
import sys
import time

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Legacy module-level suffix constants; kept alongside ``Extension`` because
# the helper functions below reference them by these names.
EXTENTION_JS = ".js"
EXTENTION_TS = ".ts"
EXTENTION_DTS = ".d.ts"
EXTENTION_DETS = ".d.ets"


class Extension:
    """File suffixes recognised by the test runner."""

    TS = ".ts"
    DTS = ".d.ts"
    JS = ".js"
    CJS = ".cjs"
    MJS = ".mjs"
    JSON = ".json"
    ETS = ".ets"
    DETS = ".d.ets"


# Order matters: compound suffixes (".d.ets", ".d.ts") must be tried before
# the plain suffixes they end with, see get_file_suffix().
FILE_EXTENSION_LIST = [
    Extension.DETS,
    Extension.ETS,
    Extension.DTS,
    Extension.TS,
    Extension.JS,
    Extension.JSON,
]

CONFIG_FILE_NAME = "obfConfig.json"
PRINT_UNOBFUSCATION_SUFFIX = "keptNames.unobf.json"
EXPECTED_UNOBFUSCATION_SUFFIX = "_expected_unobf.txt"
TEST_TYPE = 'grammar'

POOL_THREAD_COUNT = os.cpu_count()

# Files whose path ends with one of these names are never executed with node.
NO_NEED_RUN_WITH_NODE_FILES = [
    "name_as_export_api_1.ts",
    "name_as_import_api_1.ts",
    "ohmurl_test.ts",
    "ohmurl_test_new.ts",
    "export_struct_transform_class.ts",
    "nosymbolIdentifierTest.ts",
    '02_transformed_struct_01.ts',
    'importFile1.ts',
    'importFile2.ts',
    'importFile3.ts',
    'namespaceexport1.ts',
    'noExistPath1.ts',
    'noExistPath2.ts',
    'noExistPath3.ts',
    'innerTest1.ts',
    'innerTest2.ts',
    'innerTest3.ts',
    'sameNameTest1.ts',
    'sameNameTest2.ts',
    'sameNameTest3.ts'
]


# Per test type: obfuscated-output directory -> directory holding the
# expectation files (both relative to the parent of this script's directory).
SOURCE_EXPECT_MAP = {
    'grammar': {'test/local': 'test/grammar'},
    'combinations': {'test/local/combinations': 'test/combinations_expect'},
}
# For debug
VERBOSE = False


def thread_pool(action, data_list):
    """Run ``action(item)`` for every item of ``data_list`` on a thread pool.

    Returns the results in the same order as ``data_list``. An exception
    raised by ``action`` propagates when its result is collected.
    """
    with ThreadPoolExecutor(max_workers=POOL_THREAD_COUNT) as pool:
        all_task = [pool.submit(action, data) for data in data_list]
        wait(all_task, return_when=ALL_COMPLETED)
        return [task.result() for task in all_task]


def has_js_or_ts_files(file_name_list):
    """Return True if any name ends with ``.js``, ``.ts`` or ``.d.ets``."""
    suffixes = (EXTENTION_JS, EXTENTION_TS, EXTENTION_DETS)
    return any(name.endswith(suffixes) for name in file_name_list)


class FileSuffix:
    """A path split into its stem (``file_name``) and recognised ``suffix``."""

    def __init__(self, file_name, suffix):
        self.file_name = file_name
        self.suffix = suffix


def get_file_suffix(file_path) -> FileSuffix:
    """Split ``file_path`` on the first matching entry of FILE_EXTENSION_LIST.

    When no known suffix matches, the whole path is returned as the stem and
    the suffix is the empty string.
    """
    for ext in FILE_EXTENSION_LIST:
        if file_path.endswith(ext):
            return FileSuffix(file_path[: -len(ext)], ext)
    return FileSuffix(file_path, "")


def should_run_with_node(file_path):
    """Return True if ``file_path`` is a runnable ``.js``/``.ts`` file.

    Declaration files (``.d.ts``/``.d.ets``) and files listed in
    NO_NEED_RUN_WITH_NODE_FILES are excluded.
    """
    if file_path.endswith(tuple(NO_NEED_RUN_WITH_NODE_FILES)):
        return False
    if file_path.endswith(EXTENTION_JS):
        return True
    return file_path.endswith(EXTENTION_TS) and not file_path.endswith(
        (EXTENTION_DETS, EXTENTION_DTS)
    )


def run_file_with_node(file_path):
    """Execute ``file_path`` through ts-node and return the exit code."""
    # An argv list keeps a path containing whitespace as a single argument.
    cmd = ["node", "./node_modules/ts-node/dist/bin.js", file_path]
    if VERBOSE:
        logging.info(" ".join(cmd))
    return run_cmd(cmd)


class Task:
    """One obfuscation job: a directory, its config file and the test type."""

    def __init__(self, work_dir, obf_config_path, test_type):
        self.work_dir = work_dir
        self.obf_config_path = obf_config_path
        self.test_type = test_type

    def __repr__(self):
        # Failed tasks are logged directly in the summary, so make them readable.
        return "Task(work_dir=%r, obf_config_path=%r, test_type=%r)" % (
            self.work_dir,
            self.obf_config_path,
            self.test_type,
        )


def obfuscate_dir(task: Task):
    """Run the SecHarmony CLI on ``task.work_dir`` and return the exit code."""
    cmd = [
        "node",
        "lib/cli/SecHarmony.js",
        task.work_dir,
        "--config-path",
        task.obf_config_path,
        "--test-type",
        task.test_type,
    ]
    if VERBOSE:
        logging.info("running test: %s", " ".join(str(part) for part in cmd))
    return run_cmd(cmd)


def list_all_js_or_ts_files(directory, file_list=None):
    """Recursively collect ``.js``/``.ts``/``.d.ets``/``.d.ts`` files.

    Entries are visited in sorted order. Matches are appended to ``file_list``
    (a new list when omitted), which is also returned.
    """
    if file_list is None:
        file_list = []

    wanted = (EXTENTION_JS, EXTENTION_TS, EXTENTION_DETS, EXTENTION_DTS)
    for one in sorted(os.listdir(directory)):
        abs_path = os.path.join(directory, one)
        if os.path.isdir(abs_path):
            file_list = list_all_js_or_ts_files(abs_path, file_list)
        elif abs_path.endswith(wanted):
            file_list.append(abs_path)

    return file_list


def current_time_second():
    """Return the current time in seconds, as reported by ``time.time()``."""
    return time.time()


def run_cmd(cmd, timeout=60 * 5):
    """Run a command without a shell and return its exit code.

    Args:
        cmd: either a command string (split on whitespace, so arguments must
            not contain spaces) or an already-split argv sequence.
        timeout: seconds to wait before the process is killed.

    Returns:
        The process return code; after a timeout this is the code reported
        for the killed process.
    """
    if isinstance(cmd, str):
        argv = re.split("\\s+", cmd)
    else:
        argv = [str(part) for part in cmd]

    process = subprocess.Popen(argv, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        _, err = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        # Reap the killed child so that returncode is set and the pipes are
        # drained; this also binds ``err`` for the logging below.
        _, err = process.communicate()
        logging.info("running cmd timeout")

    if err and VERBOSE:
        logging.info("running cmd failed: %s", err.decode("utf-8", errors="replace"))

    if VERBOSE:
        logging.info("%s returncode = %s", argv, process.returncode)
    return process.returncode


class TimeUnit:
    """Duration in seconds of one named phase."""

    def __init__(self, tag, duration_second) -> None:
        self.tag = tag
        self.duration_second = duration_second


class TestStat:
    """Pass/fail counters and the list of failed cases for one phase."""

    def __init__(self):
        self.total_count = 0
        self.success_count = 0
        self.failed_count = 0
        self.failed_cases = []


class Runner:
    """Drives the three test phases: obfuscate, run with node, compare content."""

    def __init__(self, test_filter, root_dir, test_type):
        """Create a runner.

        Args:
            test_filter: regular-expression fragment; only output files whose
                path matches it are run and compared.
            root_dir: directory holding obfuscated output. It is deleted
                here so every run starts from a clean state.
            test_type: key into SOURCE_EXPECT_MAP, also passed to the CLI.
        """
        self.test_filter = test_filter
        self.test_type = test_type
        self.obfuscation_tasks = []
        self.obfuscate_result = TestStat()
        self.run_with_node_result = TestStat()
        self.content_compare_result = TestStat()

        # For record duration
        self.time_list = []
        self.tag = None
        self.start_time = 0

        # all source files in test directory
        self.all_file_list = []

        self.obfscated_cache_root_dir = os.path.normpath(root_dir)

        self.__init()

    def start_record(self, tag):
        """Start timing the phase named ``tag``."""
        self.start_time = current_time_second()
        self.tag = tag

    def end_record(self):
        """Stop timing the current phase and store its duration."""
        unit = TimeUnit(self.tag, current_time_second() - self.start_time)
        self.time_list.append(unit)
        self.tag = None
        self.start_time = 0

    def add_task(self, work_dir, obf_config_path):
        """Queue ``work_dir`` for obfuscation with the given config file."""
        self.obfuscation_tasks.append(Task(work_dir, obf_config_path, self.test_type))

    def obfuscate(self):
        """Run all queued obfuscation tasks in parallel and record results."""
        self.start_record("obfuscate")

        results = thread_pool(obfuscate_dir, self.obfuscation_tasks)

        stat = self.obfuscate_result
        for task, result in zip(self.obfuscation_tasks, results):
            stat.total_count += 1
            if result == 0:
                stat.success_count += 1
            else:
                stat.failed_count += 1
                stat.failed_cases.append(task)

        self.end_record()
        self.clear_obfuscation_tasks()

    def clear_obfuscation_tasks(self):
        """Drop all queued obfuscation tasks."""
        self.obfuscation_tasks.clear()

    def run_with_node(self):
        """Execute every eligible obfuscated file with node and record results.

        Also refreshes ``all_file_list``, which content_compare() relies on.
        """
        self.start_record("run_with_node")
        self.__init_before_run_with_node()

        run_with_node_list = [one for one in self.all_file_list if should_run_with_node(one)]

        results = thread_pool(run_file_with_node, run_with_node_list)

        stat = self.run_with_node_result
        for path, result in zip(run_with_node_list, results):
            stat.total_count += 1
            if result == 0:
                stat.success_count += 1
            else:
                stat.failed_count += 1
                logging.info(
                    "run with node failed: return code = %d, %s", result, path
                )
                stat.failed_cases.append(path)

        self.end_record()

    def content_compare(self):
        """Compare every file in ``all_file_list`` against its expectations."""
        self.start_record("content_compare")
        # For content compare
        for one in self.all_file_list:
            self.__compare_content(one)

        self.end_record()

    def traverse_dirs(self, root_dir, root_config_path):
        """Walk ``root_dir`` and queue each directory that holds test sources.

        A directory's own ``obfConfig.json`` overrides the inherited config.
        Recursion stops at the first directory that directly contains
        js/ts files; that directory becomes one task.
        """
        files = sorted(os.listdir(root_dir))

        if CONFIG_FILE_NAME in files:
            target_config = os.path.join(root_dir, CONFIG_FILE_NAME)
        else:
            target_config = root_config_path

        if has_js_or_ts_files(files):
            self.add_task(root_dir, target_config)
            return

        for one in files:
            sub_path = os.path.join(root_dir, one)
            if os.path.isdir(sub_path):
                self.traverse_dirs(sub_path, target_config)

    def print_summary(self):
        """Log phase durations, failed cases and pass/fail totals."""
        logging.info("------------------------------- Duration ----------------------------------------")
        for unit in self.time_list:
            logging.info("%s: %f", unit.tag, unit.duration_second)

        logging.info("----------------------------- Grammar Test summary -----------------------------")

        if self.obfuscate_result.failed_count > 0:
            logging.info("obfuscation failed cases:")
            for one in self.obfuscate_result.failed_cases:
                logging.info(one)

        if self.run_with_node_result.failed_count > 0:
            logging.info("run with node failed cases:")
            for one in self.run_with_node_result.failed_cases:
                logging.info(one)

        if self.content_compare_result.failed_count > 0:
            logging.info("content compare failed cases:")
            for one in self.content_compare_result.failed_cases:
                logging.info(one)

        logging.info("obfuscation passed    : %d/%d failed: %d", self.obfuscate_result.success_count,
                     self.obfuscate_result.total_count, self.obfuscate_result.failed_count)
        logging.info("run with node passed  : %d/%d failed: %d", self.run_with_node_result.success_count,
                     self.run_with_node_result.total_count, self.run_with_node_result.failed_count)
        logging.info("content compare passed: %d/%d failed: %d", self.content_compare_result.success_count,
                     self.content_compare_result.total_count, self.content_compare_result.failed_count)

    def returncode(self):
        """Return -1 if any phase recorded a failure, otherwise 0."""
        return -1 if self.has_failed_cases() else 0

    def has_failed_cases(self):
        """Return True if any phase recorded at least one failure."""
        return (self.obfuscate_result.failed_count > 0) or \
            (self.run_with_node_result.failed_count > 0) or \
            (self.content_compare_result.failed_count > 0)

    def get_expect_path(self, file_path):
        """Map an obfuscated output path to its path in the expectation tree.

        Returns None when ``file_path`` is not under any source directory
        configured for this test type.
        """
        base_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
        for source, expect in SOURCE_EXPECT_MAP[self.test_type].items():
            source_root = os.path.normpath(os.path.join(base_dir, source))
            if file_path.startswith(source_root):
                # Swap only the leading root; a plain str.replace would also
                # rewrite the same fragment anywhere else in the path.
                expect_root = os.path.normpath(os.path.join(base_dir, expect))
                return expect_root + file_path[len(source_root):]
        return None

    def __filter_files(self, file_list):
        """Keep only the paths that match the ``test_filter`` regex."""
        pattern = re.compile(f".*{self.test_filter}.*")
        return [one for one in file_list if pattern.match(one)]

    def __init(self):
        """Remove the obfuscated-output directory; failures are only logged."""
        try:
            shutil.rmtree(self.obfscated_cache_root_dir)
            logging.info("Directory %s deleted successfully", self.obfscated_cache_root_dir)
        except FileNotFoundError:
            logging.info("Directory %s not found", self.obfscated_cache_root_dir)
        except OSError as e:
            logging.info("Error deleting directory %s: %s", self.obfscated_cache_root_dir, e)

    def __init_before_run_with_node(self):
        """Collect and filter the obfuscated output files."""
        self.all_file_list = list_all_js_or_ts_files(self.obfscated_cache_root_dir)
        self.all_file_list = self.__filter_files(self.all_file_list)

        if VERBOSE:
            for i, one in enumerate(self.all_file_list):
                logging.info("%d -> %s ", i, one)

    def __compare_expected(
        self, file_path, actual, expectation, actual_path, expectation_file
    ):
        """Record one comparison; on mismatch log both paths and an ndiff.

        Leading and trailing whitespace is ignored on both sides.
        """
        stat = self.content_compare_result
        stat.total_count += 1
        if actual.strip() == expectation.strip():
            stat.success_count += 1
            return

        stat.failed_count += 1
        stat.failed_cases.append(file_path)
        diff = difflib.ndiff(actual.splitlines(), expectation.splitlines())
        logging.info("compare file, actual path:")
        logging.info(actual_path)
        logging.info("compare file, expect path:")
        logging.info(expectation_file)
        logging.info("\n".join(diff))

    def __compare_file_pair(self, file_path, actual_path, expectation_path):
        """Read both files as UTF-8 text and compare them for ``file_path``."""
        with open(actual_path, encoding="utf-8") as actual_file, open(
            expectation_path, encoding="utf-8"
        ) as expectation_file:
            actual = actual_file.read()
            expectation = expectation_file.read()
        self.__compare_expected(file_path, actual, expectation, actual_path, expectation_path)

    def __compare_content(self, file_path):
        """Compare one output file and its side artifacts with expectations.

        Four pairs are checked, each only when its files exist: the
        obfuscated output, the name cache, the kept-names file and the
        source map.
        """
        source_path = self.get_expect_path(file_path)
        expect_stem = get_file_suffix(source_path).file_name
        result_stem = get_file_suffix(file_path).file_name

        # Obfuscated output itself: file_path was listed from disk, so only
        # the expectation file needs to be checked for existence.
        expectation_path = f"{expect_stem}_expected.txt"
        if os.path.exists(expectation_path):
            self.__compare_file_pair(file_path, file_path, expectation_path)

        # Optional artifacts, compared in order: cache, kept names, map.
        optional_pairs = (
            (
                f"{result_stem}.ts.cache.json",
                f"{expect_stem}_expected_cache.txt",
            ),
            (
                os.path.join(os.path.dirname(result_stem), PRINT_UNOBFUSCATION_SUFFIX),
                f"{expect_stem}{EXPECTED_UNOBFUSCATION_SUFFIX}",
            ),
            (
                f"{result_stem}.ts.map",
                f"{expect_stem}_expected_map.txt",
            ),
        )
        for actual_path, expected_path in optional_pairs:
            if os.path.exists(expected_path) and os.path.exists(actual_path):
                self.__compare_file_pair(file_path, actual_path, expected_path)


def parse_args():
    """Parse the command-line arguments of the test runner."""
    # Create an ArgumentParser object
    parser = argparse.ArgumentParser(
        description="Script to run arkguard grammar tests"
    )

    # Optional filter: restricts which cases are run
    parser.add_argument(
        "--test-filter",
        type=str,
        default="",
        help="only run the cases match the filter",
    )

    # Parse the arguments from the command line
    args = parser.parse_args()

    return args


def main():
    """Run obfuscation, node execution and content comparison; return exit code."""
    args = parse_args()

    root_dir = os.path.join(os.path.dirname(__file__), "../test/grammar")
    root_dir = os.path.normpath(root_dir)

    root_config = os.path.join(root_dir, CONFIG_FILE_NAME)

    local_root_dir = os.path.join(os.path.dirname(__file__), "../test/local")
    runner = Runner(args.test_filter, local_root_dir, TEST_TYPE)
    runner.traverse_dirs(root_dir, root_config)
    runner.obfuscate()
    runner.run_with_node()
    runner.content_compare()

    runner.print_summary()
    return runner.returncode()


if __name__ == '__main__':
    code = main()
    sys.exit(code)