1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4""" 5Copyright (c) 2024 Huawei Device Co., Ltd. 6Licensed under the Apache License, Version 2.0 (the "License"); 7you may not use this file except in compliance with the License. 8You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12Unless required by applicable law or agreed to in writing, software 13distributed under the License is distributed on an "AS IS" BASIS, 14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15See the License for the specific language governing permissions and 16limitations under the License. 17""" 18 19import argparse 20import difflib 21import logging 22import os 23import re 24import shutil 25import subprocess 26import sys 27import time 28 29from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED 30 31logging.basicConfig(level=logging.INFO) 32 33EXTENTION_JS = ".js" 34EXTENTION_TS = ".ts" 35EXTENTION_DTS = ".d.ts" 36EXTENTION_DETS = ".d.ets" 37 38 39class Extension: 40 TS = ".ts" 41 DTS = ".d.ts" 42 JS = ".js" 43 CJS = ".cjs" 44 MJS = ".mjs" 45 JSON = ".json" 46 ETS = ".ets" 47 DETS = ".d.ets" 48 49 50FILE_EXTENSION_LIST = [ 51 Extension.DETS, 52 Extension.ETS, 53 Extension.DTS, 54 Extension.TS, 55 Extension.JS, 56 Extension.JSON, 57] 58 59CONFIG_FILE_NAME = "obfConfig.json" 60PRINT_UNOBFUSCATION_SUFFIX = "keptNames.unobf.json" 61EXPECTED_UNOBFUSCATION_SUFFIX = "_expected_unobf.txt" 62 63POOL_THREAD_COUNT = os.cpu_count() 64 65NO_NEED_RUN_WITH_NODE_FILES = [ 66 "name_as_export_api_1.ts", 67 "name_as_import_api_1.ts", 68 "ohmurl_test.ts", 69 "ohmurl_test_new.ts", 70 "export_struct_transform_class.ts", 71 "nosymbolIdentifierTest.ts", 72] 73# For debug 74VERBOSE = False 75 76 77def thread_pool(action, data_list): 78 all_task = [] 79 with ThreadPoolExecutor(max_workers=POOL_THREAD_COUNT) as pool: 80 for data in data_list: 81 all_task.append(pool.submit(action, data)) 82 83 wait(all_task, return_when=ALL_COMPLETED) 84 result = [i.result() for i in all_task] 85 86 return result 87 88 89def has_js_or_ts_files(file_name_list): 90 for one in file_name_list: 91 if ( 92 one.endswith(EXTENTION_JS) 93 or one.endswith(EXTENTION_TS) 94 or one.endswith(EXTENTION_DETS) 95 ): 96 return True 97 return False 98 99 100class FileSuffix: 101 def __init__(self, file_name, suffix): 102 self.file_name = file_name 103 self.suffix = suffix 104 105 106def get_file_suffix(file_path) -> FileSuffix: 107 for ext in FILE_EXTENSION_LIST: 108 if file_path.endswith(ext): 109 file_path_without_suffix = file_path[: -len(ext)] 110 return FileSuffix(file_path_without_suffix, ext) 111 return FileSuffix(file_path, "") 112 113 114def should_run_with_node(file_path): 115 for one in NO_NEED_RUN_WITH_NODE_FILES: 116 if file_path.endswith(one): 117 return False 118 119 if file_path.endswith(EXTENTION_JS): 120 return True 121 elif ( 122 file_path.endswith(EXTENTION_TS) 123 and not file_path.endswith(EXTENTION_DETS) 124 and not file_path.endswith(EXTENTION_DTS) 125 ): 126 return True 127 else: 128 return False 129 130 131def run_file_with_node(file_path): 132 cmd = "node ./node_modules/ts-node/dist/bin.js %s" % file_path 133 if VERBOSE: 134 logging.info(cmd) 135 return run_cmd(cmd) 136 137 138class Task: 139 def __init__(self, work_dir, obf_config_path): 140 self.work_dir = work_dir 141 self.obf_config_path = obf_config_path 142 143 144def obfuscate_dir(task: Task): 145 config_file_path = task.obf_config_path 146 work_dir = task.work_dir 147 cmd = "node lib/cli/SecHarmony.js %s --config-path %s" % ( 148 work_dir, 149 config_file_path, 150 ) 151 if VERBOSE: 152 logging.info("running test: %s", cmd) 153 return run_cmd(cmd) 154 155 156def list_all_js_or_ts_files(directory, file_list=None): 157 if file_list is None: 158 file_list = [] 159 160 files = sorted(os.listdir(directory)) 161 for one in files: 162 abs_path = os.path.join(directory, one) 163 if os.path.isdir(abs_path): 164 file_list = list_all_js_or_ts_files(abs_path, file_list) 165 elif abs_path.endswith(EXTENTION_JS): 166 file_list.append(abs_path) 167 elif abs_path.endswith(EXTENTION_TS): 168 file_list.append(abs_path) 169 elif abs_path.endswith(EXTENTION_DETS): 170 file_list.append(abs_path) 171 elif abs_path.endswith(EXTENTION_DTS): 172 file_list.append(abs_path) 173 174 return file_list 175 176 177def current_time_second(): 178 return time.time() 179 180 181def run_cmd(cmd): 182 cmd = re.split("\\s+", cmd) 183 process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 184 timeout = 60 * 5 185 try: 186 out, err = process.communicate(timeout=timeout) 187 except subprocess.TimeoutExpired: 188 process.kill() 189 logging.info("running cmd timeout") 190 191 if err and VERBOSE: 192 logging.info("running cmd failed: ", err.decode("utf-8")) 193 194 if VERBOSE: 195 logging.info(cmd, "returncode = ", process.returncode) 196 return process.returncode 197 198 199class TimeUnit: 200 def __init__(self, tag, duration_second) -> None: 201 self.tag = tag 202 self.duration_second = duration_second 203 204 205class TestStat: 206 def __init__(self): 207 self.total_count = 0 208 self.success_count = 0 209 self.failed_count = 0 210 self.failed_cases = [] 211 212 213class Runner: 214 def __init__(self, test_filter): 215 self.test_filter = test_filter 216 217 self.obfuscation_tasks = [] 218 self.obfuscate_result = TestStat() 219 self.run_with_node_result = TestStat() 220 self.content_compare_result = TestStat() 221 222 # For record duration 223 self.time_list = [] 224 self.tag = None 225 self.start_time = 0 226 227 # all source files in test directory 228 self.all_file_list = [] 229 230 root_dir = os.path.join(os.path.dirname(__file__), "../test/local") 231 self.obfscated_cache_root_dir = os.path.normpath(root_dir) 232 233 self.__init() 234 235 def start_record(self, tag): 236 self.start_time = current_time_second() 237 self.tag = tag 238 239 def end_record(self): 240 unit = TimeUnit(self.tag, current_time_second() - self.start_time) 241 self.time_list.append(unit) 242 self.tag = None 243 self.start_time = 0 244 245 def add_task(self, work_dir, obf_config_path): 246 task = Task(work_dir, obf_config_path) 247 self.obfuscation_tasks.append(task) 248 249 def obfuscate(self): 250 self.start_record("obfuscate") 251 252 results = thread_pool(obfuscate_dir, self.obfuscation_tasks) 253 254 for i, result in enumerate(results): 255 self.obfuscate_result.total_count += 1 256 if result == 0: 257 self.obfuscate_result.success_count += 1 258 else: 259 self.obfuscate_result.failed_count += 1 260 self.obfuscate_result.failed_cases.append(self.obfuscation_tasks[i]) 261 262 self.end_record() 263 264 def run_with_node(self): 265 self.start_record("run_with_node") 266 self.__init_before_run_with_node() 267 268 run_with_node_list = [] 269 for one in self.all_file_list: 270 if should_run_with_node(one): 271 run_with_node_list.append(one) 272 273 results = thread_pool(run_file_with_node, run_with_node_list) 274 275 for i, result in enumerate(results): 276 self.run_with_node_result.total_count += 1 277 if result == 0: 278 self.run_with_node_result.success_count += 1 279 else: 280 self.run_with_node_result.failed_count += 1 281 logging.info( 282 "run with node failed: return code = %d, %s", results[i], run_with_node_list[i] 283 ) 284 self.run_with_node_result.failed_cases.append(run_with_node_list[i]) 285 286 self.end_record() 287 288 def content_compare(self): 289 self.start_record("content_compare") 290 # For content compare 291 for one in self.all_file_list: 292 self.__compare_content(one) 293 294 self.end_record() 295 296 def traverse_dirs(self, root_dir, root_config_path): 297 files = sorted(os.listdir(root_dir)) 298 299 current_config = None 300 if CONFIG_FILE_NAME in files: 301 current_config = os.path.join(root_dir, CONFIG_FILE_NAME) 302 303 target_config = current_config 304 if not target_config: 305 target_config = root_config_path 306 307 if has_js_or_ts_files(files): 308 self.add_task(root_dir, target_config) 309 return 310 311 for one in files: 312 sub_path = os.path.join(root_dir, one) 313 if os.path.isdir(sub_path): 314 self.traverse_dirs(sub_path, target_config) 315 316 def print_summary(self): 317 logging.info("------------------------------- Duration ----------------------------------------") 318 for unit in self.time_list: 319 logging.info("%s: %f", unit.tag, unit.duration_second) 320 321 logging.info("----------------------------- Grammar Test summary -----------------------------") 322 323 if self.obfuscate_result.failed_count > 0: 324 logging.info("obfuscation failed cases:") 325 for one in self.obfuscate_result.failed_cases: 326 logging.info("->", one) 327 328 if self.run_with_node_result.failed_count > 0: 329 logging.info("run with node failed cases:") 330 for one in self.run_with_node_result.failed_cases: 331 logging.info("->", one) 332 333 if self.content_compare_result.failed_count > 0: 334 logging.info("content compare failed cases:") 335 for one in self.content_compare_result.failed_cases: 336 logging.info("->", one) 337 338 logging.info("obfuscation passed : %d/%d failed: %d", self.obfuscate_result.success_count, 339 self.obfuscate_result.total_count, self.obfuscate_result.failed_count) 340 logging.info("run with node passed : %d/%d failed: %d", self.run_with_node_result.success_count, 341 self.run_with_node_result.total_count, self.run_with_node_result.failed_count) 342 logging.info("content compare passed: %d/%d failed: %d", self.content_compare_result.success_count, 343 self.content_compare_result.total_count, self.content_compare_result.failed_count) 344 345 def returncode(self): 346 if ( 347 self.obfuscate_result.failed_count 348 + self.run_with_node_result.failed_count 349 + self.content_compare_result.failed_count 350 > 0 351 ): 352 return -1 353 return 0 354 355 def __filter_files(self, file_list): 356 target = [] 357 for one in file_list: 358 if re.match(f".*{self.test_filter}.*", one): 359 target.append(one) 360 return target 361 362 def __init(self): 363 try: 364 shutil.rmtree(self.obfscated_cache_root_dir) 365 logging.info("Directory %s deleted successfully", self.obfscated_cache_root_dir) 366 except FileNotFoundError: 367 logging.info("Directory %s not found", self.obfscated_cache_root_dir) 368 except OSError as e: 369 logging.info("Error deleting directory %s: %s", self.obfscated_cache_root_dir, e) 370 371 def __init_before_run_with_node(self): 372 self.all_file_list = list_all_js_or_ts_files(self.obfscated_cache_root_dir) 373 self.all_file_list = self.__filter_files(self.all_file_list) 374 375 if VERBOSE: 376 for i, one in enumerate(self.all_file_list): 377 logging.info("%d -> %s ", i, one) 378 379 def __compare_expected( 380 self, file_path, actual, expectation, actual_path, expectation_file 381 ): 382 self.content_compare_result.total_count += 1 383 if actual.strip() == expectation.strip(): 384 self.content_compare_result.success_count += 1 385 else: 386 self.content_compare_result.failed_count += 1 387 self.content_compare_result.failed_cases.append(file_path) 388 diff = difflib.ndiff(actual.splitlines(), expectation.splitlines()) 389 logging.info("compare file, actual path:", actual_path) 390 logging.info("compare file, expect path:", expectation_file) 391 logging.info("\n".join(diff)) 392 393 def __compare_content(self, file_path): 394 source_path = file_path.replace("/test/local/", "/test/grammar/") 395 source_suffix = get_file_suffix(source_path) 396 expectation_path = f"{source_suffix.file_name}_expected.txt" 397 result_suffix = get_file_suffix(file_path) 398 result_cache_path = f"{result_suffix.file_name}.ts.cache.json" 399 expectation_cache_path = f"{source_suffix.file_name}_expected_cache.txt" 400 401 result_unobfuscation_path = os.path.join( 402 os.path.dirname(result_suffix.file_name), PRINT_UNOBFUSCATION_SUFFIX 403 ) 404 expect_unobfuscation_path = ( 405 f"{source_suffix.file_name}{EXPECTED_UNOBFUSCATION_SUFFIX}" 406 ) 407 408 if os.path.exists(expectation_path): 409 with open(file_path) as actual_file, open( 410 expectation_path 411 ) as expectation_file: 412 actual = actual_file.read() 413 expectation = expectation_file.read() 414 self.__compare_expected( 415 file_path, actual, expectation, file_path, expectation_file 416 ) 417 418 if os.path.exists(expectation_cache_path) and os.path.exists(result_cache_path): 419 with open(result_cache_path) as actual_file, open( 420 expectation_cache_path 421 ) as expectation_file: 422 actual = actual_file.read() 423 expectation = expectation_file.read() 424 self.__compare_expected( 425 file_path, 426 actual, 427 expectation, 428 result_cache_path, 429 expectation_cache_path, 430 ) 431 432 if os.path.exists(expect_unobfuscation_path) and os.path.exists( 433 result_unobfuscation_path 434 ): 435 with open(result_unobfuscation_path) as actual_file, open( 436 expect_unobfuscation_path 437 ) as expectation_file: 438 actual = actual_file.read() 439 expectation = expectation_file.read() 440 self.__compare_expected( 441 file_path, 442 actual, 443 expectation, 444 result_unobfuscation_path, 445 expect_unobfuscation_path, 446 ) 447 448 449 450def parse_args(): 451 # Create an ArgumentParser object 452 parser = argparse.ArgumentParser( 453 description="Script to run arkguard grammar tests" 454 ) 455 456 # Define positional argument 457 parser.add_argument( 458 "--test-filter", 459 type=str, 460 default="", 461 help="only run the cases match the filter", 462 ) 463 464 # Parse the arguments from the command line 465 args = parser.parse_args() 466 467 return args 468 469 470def main(): 471 args = parse_args() 472 473 root_dir = os.path.join(os.path.dirname(__file__), "../test/grammar") 474 root_dir = os.path.normpath(root_dir) 475 476 root_config = os.path.join(root_dir, CONFIG_FILE_NAME) 477 478 runner = Runner(args.test_filter) 479 runner.traverse_dirs(root_dir, root_config) 480 runner.obfuscate() 481 runner.run_with_node() 482 runner.content_compare() 483 484 runner.print_summary() 485 return runner.returncode() 486 487if __name__ == '__main__': 488 code = main() 489 sys.exit(code) 490