• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""
5Copyright (c) 2024 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17"""
18
19import argparse
20import difflib
21import logging
22import os
23import re
24import shutil
25import subprocess
26import sys
27import time
28
29from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
30
# Configure the root logger so that INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)

# File-name suffixes used by the helper functions below to classify files.
# NOTE: "EXTENTION" is a misspelling of "EXTENSION"; the names are kept as-is
# because other code in this file refers to them.
EXTENTION_JS = ".js"
EXTENTION_TS = ".ts"
EXTENTION_DTS = ".d.ts"
EXTENTION_DETS = ".d.ets"
37
38
class Extension:
    """Namespace of file-name suffix constants."""

    # Script sources
    JS = ".js"
    CJS = ".cjs"
    MJS = ".mjs"
    TS = ".ts"
    ETS = ".ets"

    # Declaration files
    DTS = ".d.ts"
    DETS = ".d.ets"

    # Data files
    JSON = ".json"
48
49
# Suffixes recognised by get_file_suffix(), which returns the first entry that
# matches. Order matters: the compound suffixes (".d.ets", ".d.ts") are listed
# before the shorter suffix they end with (".ts").
FILE_EXTENSION_LIST = [
    Extension.DETS,
    Extension.ETS,
    Extension.DTS,
    Extension.TS,
    Extension.JS,
    Extension.JSON,
]

# Name of the obfuscation config file looked up in each test directory.
CONFIG_FILE_NAME = "obfConfig.json"
# File name, joined to the directory of a result file, whose content is
# compared against the "*_expected_unobf.txt" expectation.
PRINT_UNOBFUSCATION_SUFFIX = "keptNames.unobf.json"
# Appended to a source file path (without its suffix) to build the path of the
# expected unobfuscation file.
EXPECTED_UNOBFUSCATION_SUFFIX = "_expected_unobf.txt"

# Passed as max_workers to ThreadPoolExecutor in thread_pool().
# os.cpu_count() may return None, in which case the executor picks its own default.
POOL_THREAD_COUNT = os.cpu_count()

# Files whose path ends with one of these names are never executed with node
# (see should_run_with_node()).
NO_NEED_RUN_WITH_NODE_FILES = [
    "name_as_export_api_1.ts",
    "name_as_import_api_1.ts",
    "ohmurl_test.ts",
    "ohmurl_test_new.ts",
    "export_struct_transform_class.ts",
    "nosymbolIdentifierTest.ts",
]
# For debug: when True, the executed commands and their results are logged.
VERBOSE = False
75
76
def thread_pool(action, data_list):
    """Apply ``action`` to every item of ``data_list`` using a thread pool.

    All items are submitted up front and the call blocks until every task has
    completed. The returned list holds the results in the order of
    ``data_list``; an exception raised by ``action`` is re-raised when its
    result is collected.
    """
    with ThreadPoolExecutor(max_workers=POOL_THREAD_COUNT) as executor:
        futures = [executor.submit(action, item) for item in data_list]
        wait(futures, return_when=ALL_COMPLETED)
        return [future.result() for future in futures]
87
88
def has_js_or_ts_files(file_name_list):
    """Return True if any name ends with EXTENTION_JS, EXTENTION_TS or EXTENTION_DETS."""
    wanted_suffixes = (EXTENTION_JS, EXTENTION_TS, EXTENTION_DETS)
    return any(name.endswith(wanted_suffixes) for name in file_name_list)
98
99
class FileSuffix:
    """A file path split into the part before its suffix and the suffix itself."""

    def __init__(self, file_name, suffix):
        # file_name: the path without the suffix; suffix: the suffix (may be "").
        self.file_name, self.suffix = file_name, suffix
104
105
def get_file_suffix(file_path) -> FileSuffix:
    """Split ``file_path`` at the first suffix of FILE_EXTENSION_LIST it ends with.

    Returns a FileSuffix holding the path without the suffix and the suffix.
    If no listed suffix matches, the whole path is returned with an empty
    suffix.
    """
    matched = next(
        (ext for ext in FILE_EXTENSION_LIST if file_path.endswith(ext)), None
    )
    if matched is None:
        return FileSuffix(file_path, "")
    return FileSuffix(file_path[: -len(matched)], matched)
112
113
def should_run_with_node(file_path):
    """Tell whether the file at ``file_path`` should be executed with node.

    Files listed in NO_NEED_RUN_WITH_NODE_FILES are never run. Otherwise ".js"
    files are run, and ".ts" files are run unless they are declaration files
    (".d.ts" / ".d.ets").
    """
    # Explicitly excluded test files take precedence over the suffix rules.
    if any(file_path.endswith(name) for name in NO_NEED_RUN_WITH_NODE_FILES):
        return False

    if file_path.endswith(EXTENTION_JS):
        return True

    is_declaration = file_path.endswith(EXTENTION_DETS) or file_path.endswith(
        EXTENTION_DTS
    )
    return file_path.endswith(EXTENTION_TS) and not is_declaration
129
130
def run_file_with_node(file_path):
    """Execute ``file_path`` through ts-node and return the value of run_cmd()."""
    command = "node ./node_modules/ts-node/dist/bin.js %s" % file_path
    if VERBOSE:
        logging.info(command)
    exit_code = run_cmd(command)
    return exit_code
136
137
class Task:
    """One obfuscation job: a directory to process and the config file to use."""

    def __init__(self, work_dir, obf_config_path):
        self.work_dir, self.obf_config_path = work_dir, obf_config_path
142
143
def obfuscate_dir(task: Task):
    """Run the SecHarmony CLI on ``task.work_dir`` with ``task.obf_config_path``.

    Returns the value of run_cmd() for the command.
    """
    command = "node lib/cli/SecHarmony.js %s --config-path %s" % (
        task.work_dir,
        task.obf_config_path,
    )
    if VERBOSE:
        logging.info("running test: %s", command)
    return run_cmd(command)
154
155
def list_all_js_or_ts_files(directory, file_list=None):
    """Recursively collect the script and declaration files below ``directory``.

    Entries are visited in sorted order. Paths ending with ".js", ".ts",
    ".d.ets" or ".d.ts" are appended to ``file_list`` (a new list when None),
    which is also returned.
    """
    collected = [] if file_list is None else file_list
    wanted_suffixes = (EXTENTION_JS, EXTENTION_TS, EXTENTION_DETS, EXTENTION_DTS)

    for entry in sorted(os.listdir(directory)):
        path = os.path.join(directory, entry)
        if os.path.isdir(path):
            # Descend first: directories are never treated as files, whatever their name.
            collected = list_all_js_or_ts_files(path, collected)
        elif path.endswith(wanted_suffixes):
            collected.append(path)

    return collected
175
176
def current_time_second():
    """Return the current time as reported by ``time.time()`` (seconds, float)."""
    now = time.time()
    return now
179
180
def run_cmd(cmd):
    """Run a command line without a shell and return its exit code.

    Args:
        cmd: the command as a single string. It is split on whitespace, so
            arguments that themselves contain whitespace are not supported.

    Returns:
        The return code of the process. When the command exceeds the
        five-minute timeout it is killed and the return code of the killed
        process is returned.
    """
    # str.split() drops leading/trailing whitespace, so no empty arguments
    # are passed to the child process.
    args = cmd.split()
    process = subprocess.Popen(
        args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    timeout = 60 * 5
    err = b""
    try:
        _, err = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        # Finish communication after the kill: this reaps the child, closes
        # the pipes and makes sure returncode is set.
        _, err = process.communicate()
        logging.info("running cmd timeout")

    if err and VERBOSE:
        logging.info("running cmd failed: %s", err.decode("utf-8", errors="replace"))

    if VERBOSE:
        logging.info("%s returncode = %s", args, process.returncode)
    return process.returncode
197
198
class TimeUnit:
    """A measured duration in seconds, labelled with a tag."""

    def __init__(self, tag, duration_second) -> None:
        self.tag, self.duration_second = tag, duration_second
203
204
class TestStat:
    """Counters and the list of failed cases for one test stage."""

    def __init__(self):
        # All counters start at zero; failed_cases collects the failing items.
        self.total_count = self.success_count = self.failed_count = 0
        self.failed_cases = []
211
212
class Runner:
    """Drive the grammar tests in three stages and collect their statistics.

    The stages are: obfuscate every collected test directory, execute the
    resulting files with node, and compare result files with the expectation
    files stored next to the test sources. Each stage records its duration
    and a TestStat with its pass/fail counters.
    """

    def __init__(self, test_filter):
        """Create the runner and delete the result cache directory.

        Args:
            test_filter: regular expression; only result files whose path
                contains a match are executed and compared.
        """
        self.test_filter = test_filter

        self.obfuscation_tasks = []
        self.obfuscate_result = TestStat()
        self.run_with_node_result = TestStat()
        self.content_compare_result = TestStat()

        # For record duration
        self.time_list = []
        self.tag = None
        self.start_time = 0

        # Result files found under the cache directory (after filtering);
        # filled by run_with_node().
        self.all_file_list = []

        # NOTE(review): presumably the directory the obfuscator writes its
        # output to -- confirm against the obfuscation config.
        root_dir = os.path.join(os.path.dirname(__file__), "../test/local")
        self.obfscated_cache_root_dir = os.path.normpath(root_dir)

        self.__init()

    def start_record(self, tag):
        """Start timing the stage named ``tag``."""
        self.start_time = current_time_second()
        self.tag = tag

    def end_record(self):
        """Stop timing the current stage and store its duration in time_list."""
        unit = TimeUnit(self.tag, current_time_second() - self.start_time)
        self.time_list.append(unit)
        self.tag = None
        self.start_time = 0

    def add_task(self, work_dir, obf_config_path):
        """Queue ``work_dir`` for obfuscation with the given config file."""
        self.obfuscation_tasks.append(Task(work_dir, obf_config_path))

    def obfuscate(self):
        """Obfuscate all queued tasks in parallel and update obfuscate_result."""
        self.start_record("obfuscate")

        results = thread_pool(obfuscate_dir, self.obfuscation_tasks)

        stat = self.obfuscate_result
        for task, result in zip(self.obfuscation_tasks, results):
            stat.total_count += 1
            if result == 0:
                stat.success_count += 1
            else:
                stat.failed_count += 1
                stat.failed_cases.append(task)

        self.end_record()

    def run_with_node(self):
        """Execute the runnable result files with node and update run_with_node_result.

        Also (re)builds all_file_list from the cache directory, which
        content_compare() relies on.
        """
        self.start_record("run_with_node")
        self.__init_before_run_with_node()

        run_with_node_list = [
            one for one in self.all_file_list if should_run_with_node(one)
        ]

        results = thread_pool(run_file_with_node, run_with_node_list)

        stat = self.run_with_node_result
        for file_path, result in zip(run_with_node_list, results):
            stat.total_count += 1
            if result == 0:
                stat.success_count += 1
            else:
                stat.failed_count += 1
                # %s instead of %d so that a missing return code (None) is
                # still logged instead of raising inside the logging call.
                logging.info(
                    "run with node failed: return code = %s, %s", result, file_path
                )
                stat.failed_cases.append(file_path)

        self.end_record()

    def content_compare(self):
        """Compare every file of all_file_list with its expectation files."""
        self.start_record("content_compare")
        # For content compare
        for one in self.all_file_list:
            self.__compare_content(one)

        self.end_record()

    def traverse_dirs(self, root_dir, root_config_path):
        """Walk ``root_dir`` and queue every directory that directly holds test files.

        A directory's own config file (CONFIG_FILE_NAME), when present,
        replaces ``root_config_path`` for that directory and its children.
        A directory that contains js/ts files is queued as one task and is
        not descended into any further.
        """
        files = sorted(os.listdir(root_dir))

        target_config = root_config_path
        if CONFIG_FILE_NAME in files:
            target_config = os.path.join(root_dir, CONFIG_FILE_NAME)

        if has_js_or_ts_files(files):
            self.add_task(root_dir, target_config)
            return

        for one in files:
            sub_path = os.path.join(root_dir, one)
            if os.path.isdir(sub_path):
                self.traverse_dirs(sub_path, target_config)

    def print_summary(self):
        """Log the stage durations, the failed cases and the pass/fail counters."""
        logging.info("------------------------------- Duration ----------------------------------------")
        for unit in self.time_list:
            logging.info("%s: %f", unit.tag, unit.duration_second)

        logging.info("----------------------------- Grammar Test summary -----------------------------")

        self.__log_failed_cases("obfuscation failed cases:", self.obfuscate_result)
        self.__log_failed_cases("run with node failed cases:", self.run_with_node_result)
        self.__log_failed_cases("content compare failed cases:", self.content_compare_result)

        logging.info("obfuscation passed    : %d/%d failed: %d", self.obfuscate_result.success_count,
            self.obfuscate_result.total_count, self.obfuscate_result.failed_count)
        logging.info("run with node passed  : %d/%d failed: %d", self.run_with_node_result.success_count,
            self.run_with_node_result.total_count, self.run_with_node_result.failed_count)
        logging.info("content compare passed: %d/%d failed: %d", self.content_compare_result.success_count,
            self.content_compare_result.total_count, self.content_compare_result.failed_count)

    def returncode(self):
        """Return -1 if any stage recorded a failure, otherwise 0."""
        failed_total = (
            self.obfuscate_result.failed_count
            + self.run_with_node_result.failed_count
            + self.content_compare_result.failed_count
        )
        return -1 if failed_total > 0 else 0

    @staticmethod
    def __log_failed_cases(title, stat):
        """Log ``title`` followed by one line per failed case of ``stat``."""
        if stat.failed_count > 0:
            logging.info(title)
            for one in stat.failed_cases:
                # Obfuscation failures are Task objects; show their directory
                # rather than the default object repr.
                logging.info("-> %s", getattr(one, "work_dir", one))

    def __filter_files(self, file_list):
        """Return the paths of ``file_list`` that contain a match of test_filter."""
        # re.search finds the pattern anywhere in the path and, unlike
        # wrapping the pattern in ".*" for re.match, keeps alternations such
        # as "a|b" intact.
        pattern = re.compile(self.test_filter)
        return [one for one in file_list if pattern.search(one)]

    def __init(self):
        """Delete the result cache directory so stale results are not picked up."""
        try:
            shutil.rmtree(self.obfscated_cache_root_dir)
            logging.info("Directory %s deleted successfully", self.obfscated_cache_root_dir)
        except FileNotFoundError:
            logging.info("Directory %s not found", self.obfscated_cache_root_dir)
        except OSError as e:
            logging.info("Error deleting directory %s: %s", self.obfscated_cache_root_dir, e)

    def __init_before_run_with_node(self):
        """Collect the filtered list of result files from the cache directory."""
        self.all_file_list = list_all_js_or_ts_files(self.obfscated_cache_root_dir)
        self.all_file_list = self.__filter_files(self.all_file_list)

        if VERBOSE:
            for i, one in enumerate(self.all_file_list):
                logging.info("%d -> %s ", i, one)

    def __compare_expected(
        self, file_path, actual, expectation, actual_path, expectation_file
    ):
        """Compare two texts (ignoring surrounding whitespace) and record the outcome.

        Args:
            file_path: the result file the comparison belongs to; recorded
                in failed_cases on mismatch.
            actual: the produced text.
            expectation: the expected text.
            actual_path: path of the file ``actual`` was read from (for logging).
            expectation_file: path of the file ``expectation`` was read from
                (for logging).
        """
        self.content_compare_result.total_count += 1
        if actual.strip() == expectation.strip():
            self.content_compare_result.success_count += 1
        else:
            self.content_compare_result.failed_count += 1
            self.content_compare_result.failed_cases.append(file_path)
            diff = difflib.ndiff(actual.splitlines(), expectation.splitlines())
            logging.info("compare file, actual path: %s", actual_path)
            logging.info("compare file, expect path: %s", expectation_file)
            logging.info("\n".join(diff))

    def __compare_files(self, file_path, actual_path, expectation_path):
        """Read both files as UTF-8 text and compare their contents."""
        with open(actual_path, encoding="utf-8") as actual_file:
            actual = actual_file.read()
        with open(expectation_path, encoding="utf-8") as expectation_file:
            expectation = expectation_file.read()
        self.__compare_expected(
            file_path, actual, expectation, actual_path, expectation_path
        )

    def __compare_content(self, file_path):
        """Compare one result file and its side products with their expectations.

        The expectation files live next to the test source, whose path is
        derived by replacing "/test/local/" with "/test/grammar/". Up to
        three comparisons are made, each only when the needed files exist:
        the result file itself, its name cache and the unobfuscation list.
        """
        source_path = file_path.replace("/test/local/", "/test/grammar/")
        source_suffix = get_file_suffix(source_path)
        expectation_path = f"{source_suffix.file_name}_expected.txt"
        result_suffix = get_file_suffix(file_path)
        result_cache_path = f"{result_suffix.file_name}.ts.cache.json"
        expectation_cache_path = f"{source_suffix.file_name}_expected_cache.txt"

        result_unobfuscation_path = os.path.join(
            os.path.dirname(result_suffix.file_name), PRINT_UNOBFUSCATION_SUFFIX
        )
        expect_unobfuscation_path = (
            f"{source_suffix.file_name}{EXPECTED_UNOBFUSCATION_SUFFIX}"
        )

        if os.path.exists(expectation_path):
            self.__compare_files(file_path, file_path, expectation_path)

        if os.path.exists(expectation_cache_path) and os.path.exists(result_cache_path):
            self.__compare_files(file_path, result_cache_path, expectation_cache_path)

        if os.path.exists(expect_unobfuscation_path) and os.path.exists(
            result_unobfuscation_path
        ):
            self.__compare_files(
                file_path, result_unobfuscation_path, expect_unobfuscation_path
            )
447
448
449
def parse_args():
    """Parse the command line of the grammar test script.

    The only option is ``--test-filter`` (string, default ""), which limits
    the run to the cases matching the filter. Returns the argparse namespace.
    """
    parser = argparse.ArgumentParser(
        description="Script to run arkguard grammar tests"
    )
    parser.add_argument(
        "--test-filter",
        type=str,
        default="",
        help="only run the cases match the filter",
    )
    return parser.parse_args()
468
469
def main():
    """Run all grammar test stages and return the process exit code.

    The tests are collected from "../test/grammar" relative to this script,
    with the config file in that directory as the default configuration.
    Returns the value of Runner.returncode().
    """
    args = parse_args()

    grammar_dir = os.path.normpath(
        os.path.join(os.path.dirname(__file__), "../test/grammar")
    )
    default_config = os.path.join(grammar_dir, CONFIG_FILE_NAME)

    runner = Runner(args.test_filter)
    runner.traverse_dirs(grammar_dir, default_config)

    # The stages depend on each other and must run in this order.
    runner.obfuscate()
    runner.run_with_node()
    runner.content_compare()

    runner.print_summary()
    return runner.returncode()
486
if __name__ == "__main__":
    # Propagate the overall test result as the process exit status.
    sys.exit(main())
490