• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""
5Copyright (c) 2024 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17"""
18
19import argparse
20import difflib
21import logging
22import os
23import re
24import shutil
25import subprocess
26import sys
27import time
28
29from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
30
# Route every log record of level INFO and above to stdout.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

# File-name suffixes the module-level helpers test with str.endswith().
EXTENTION_JS = ".js"
EXTENTION_TS = ".ts"
# Declaration-file suffixes (TypeScript / ets).
EXTENTION_DTS = ".d.ts"
EXTENTION_DETS = ".d.ets"
37
38
class Extension:
    """Namespace of the file-name suffixes known to this script."""

    # TypeScript sources and declaration files
    TS = ".ts"
    DTS = ".d.ts"
    # JavaScript flavours
    JS = ".js"
    CJS = ".cjs"
    MJS = ".mjs"
    # Data files
    JSON = ".json"
    # ets sources and declaration files
    ETS = ".ets"
    DETS = ".d.ets"
48
49
# Suffixes tried, in this order, by get_file_suffix(), which returns on the
# first match. The compound suffixes (".d.ets", ".d.ts") are therefore listed
# ahead of the plain suffixes they end with.
FILE_EXTENSION_LIST = [
    Extension.DETS, Extension.ETS,
    Extension.DTS, Extension.TS,
    Extension.JS, Extension.JSON,
]
58
# Name of the obfuscation config file looked up in each test directory.
CONFIG_FILE_NAME = "obfConfig.json"
# File name / suffix used to locate the actual and expected "unobfuscation" listings.
PRINT_UNOBFUSCATION_SUFFIX = "keptNames.unobf.json"
EXPECTED_UNOBFUSCATION_SUFFIX = "_expected_unobf.txt"
# Test type handed to the Runner by main().
TEST_TYPE = "grammar"

# Worker count for the thread pool (os.cpu_count() may return None).
POOL_THREAD_COUNT = os.cpu_count()

# Files whose path ends with one of these names are never executed with node.
NO_NEED_RUN_WITH_NODE_FILES = [
    "name_as_export_api_1.ts",
    "name_as_import_api_1.ts",
    "ohmurl_test.ts",
    "ohmurl_test_new.ts",
    "export_struct_transform_class.ts",
    "nosymbolIdentifierTest.ts",
    "02_transformed_struct_01.ts",
    "importFile1.ts",
    "importFile2.ts",
    "importFile3.ts",
    "namespaceexport1.ts",
    "noExistPath1.ts",
    "noExistPath2.ts",
    "noExistPath3.ts",
    "innerTest1.ts",
    "innerTest2.ts",
    "innerTest3.ts",
    "sameNameTest1.ts",
    "sameNameTest2.ts",
    "sameNameTest3.ts",
]


# Per test type: directory holding the obfuscated output -> directory holding
# the expectation files.
SOURCE_EXPECT_MAP = {
    "grammar": {"test/local": "test/grammar"},
    "combinations": {"test/local/combinations": "test/combinations_expect"},
}
# For debug
VERBOSE = False
95
96
def thread_pool(action, data_list):
    """Run ``action(item)`` for every item of ``data_list`` on a thread pool.

    Returns the list of results in the same order as ``data_list``. An
    exception raised inside ``action`` is re-raised when its result is read.
    """
    with ThreadPoolExecutor(max_workers=POOL_THREAD_COUNT) as executor:
        futures = [executor.submit(action, item) for item in data_list]

        # Block until every submitted job has finished before reading results.
        wait(futures, return_when=ALL_COMPLETED)
        return [future.result() for future in futures]
107
108
def has_js_or_ts_files(file_name_list):
    """Return True when any name ends with the .js, .ts or .d.ets suffix."""
    wanted_suffixes = (EXTENTION_JS, EXTENTION_TS, EXTENTION_DETS)
    return any(name.endswith(wanted_suffixes) for name in file_name_list)
118
119
class FileSuffix:
    """Pair of a path with its suffix removed and the suffix that was removed."""

    def __init__(self, file_name: str, suffix: str) -> None:
        # Path without the recognised suffix.
        self.file_name = file_name
        # The recognised suffix itself.
        self.suffix = suffix
124
125
def get_file_suffix(file_path) -> FileSuffix:
    """Split ``file_path`` into its stem and the first matching known suffix.

    Suffixes are tried in FILE_EXTENSION_LIST order. When none matches, the
    whole path is returned as the stem together with an empty suffix.
    """
    matched = next(
        (ext for ext in FILE_EXTENSION_LIST if file_path.endswith(ext)), None
    )
    if matched is None:
        return FileSuffix(file_path, "")
    stem = file_path[: -len(matched)]
    return FileSuffix(stem, matched)
132
133
def should_run_with_node(file_path):
    """Tell whether ``file_path`` is a file that should be executed with node.

    Files listed in NO_NEED_RUN_WITH_NODE_FILES are never run. Otherwise .js
    files are run, and .ts files are run unless they are declaration files.
    """
    if any(file_path.endswith(skipped) for skipped in NO_NEED_RUN_WITH_NODE_FILES):
        return False

    if file_path.endswith(EXTENTION_JS):
        return True

    is_declaration = file_path.endswith(EXTENTION_DETS) or file_path.endswith(
        EXTENTION_DTS
    )
    return file_path.endswith(EXTENTION_TS) and not is_declaration
149
150
def run_file_with_node(file_path):
    """Execute ``file_path`` through the local ts-node entry point.

    Returns whatever run_cmd() returns for the assembled command line.
    """
    command_line = "node ./node_modules/ts-node/dist/bin.js %s" % (file_path,)
    if VERBOSE:
        logging.info(command_line)
    exit_code = run_cmd(command_line)
    return exit_code
156
157
class Task:
    """One obfuscation job: a directory, the config to apply and the test type."""

    def __init__(self, work_dir, obf_config_path, test_type):
        # Directory passed to the obfuscator as its input.
        self.work_dir = work_dir
        # Path given to the obfuscator through --config-path.
        self.obf_config_path = obf_config_path
        # Value given to the obfuscator through --test-type.
        self.test_type = test_type

    def __repr__(self):
        # Failed tasks are written to the log as-is; without this they would
        # show up as an opaque "<Task object at 0x...>".
        return "Task(work_dir=%r, obf_config_path=%r, test_type=%r)" % (
            self.work_dir,
            self.obf_config_path,
            self.test_type,
        )
163
164
def obfuscate_dir(task: Task):
    """Run the SecHarmony command-line obfuscator for one task.

    Returns whatever run_cmd() returns for the assembled command line.
    """
    command_line = "node lib/cli/SecHarmony.js %s --config-path %s --test-type %s" % (
        task.work_dir,
        task.obf_config_path,
        task.test_type,
    )
    if VERBOSE:
        logging.info("running test: %s", command_line)
    return run_cmd(command_line)
177
178
def list_all_js_or_ts_files(directory, file_list=None):
    """Recursively collect the .js/.ts/.d.ets/.d.ts files below ``directory``.

    Entries are visited in sorted order per directory. Found paths are
    appended to ``file_list`` (a new list when omitted), which is returned.
    """
    collected = [] if file_list is None else file_list
    wanted_suffixes = (EXTENTION_JS, EXTENTION_TS, EXTENTION_DETS, EXTENTION_DTS)

    for entry in sorted(os.listdir(directory)):
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path):
            # The recursive call appends into the same list object.
            list_all_js_or_ts_files(entry_path, collected)
        elif entry_path.endswith(wanted_suffixes):
            collected.append(entry_path)

    return collected
198
199
def current_time_second():
    """Return the current time.time() reading, in seconds."""
    now = time.time()
    return now
202
203
def run_cmd(cmd, timeout=60 * 5):
    """Run a whitespace-separated command line and return its exit code.

    Args:
        cmd: command line as one string; it is split on whitespace, so
            individual arguments cannot themselves contain whitespace.
        timeout: seconds to wait before the child process is killed.

    Returns:
        The child's return code. After a timeout this is the return code of
        the killed process.
    """
    # str.split() also drops the empty strings that a regex split would
    # produce for leading or trailing whitespace.
    argv = cmd.split()
    process = subprocess.Popen(
        argv, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    try:
        _, err = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        # Reap the killed child and collect what it wrote, so that `err` and
        # `process.returncode` are both defined below.
        _, err = process.communicate()
        logging.info("running cmd timeout")

    if err and VERBOSE:
        logging.info("running cmd failed: %s", err.decode("utf-8", errors="replace"))

    if VERBOSE:
        logging.info("%s returncode = %s", argv, process.returncode)
    return process.returncode
220
221
class TimeUnit:
    """A labelled duration, in seconds."""

    def __init__(self, tag, duration_second) -> None:
        # Store the label first, then the measured duration.
        self.tag, self.duration_second = tag, duration_second
226
227
class TestStat:
    """Counters for one test phase plus the list of cases that failed."""

    def __init__(self):
        # All counters start at zero.
        self.total_count = self.success_count = self.failed_count = 0
        # Each instance gets its own list of failed cases.
        self.failed_cases = list()
234
235
class Runner:
    """Drive one test run: obfuscate, execute with node, compare contents.

    The constructor removes ``root_dir`` (the directory the obfuscated files
    are later read from). Results of each phase are accumulated in
    ``obfuscate_result``, ``run_with_node_result`` and
    ``content_compare_result``.
    """

    def __init__(self, test_filter, root_dir, test_type):
        # Regular expression; only result files whose path matches are used.
        self.test_filter = test_filter
        self.test_type = test_type
        self.obfuscation_tasks = []
        self.obfuscate_result = TestStat()
        self.run_with_node_result = TestStat()
        self.content_compare_result = TestStat()

        # For record duration
        self.time_list = []
        self.tag = None
        self.start_time = 0

        # all source files in test directory
        self.all_file_list = []

        self.obfscated_cache_root_dir = os.path.normpath(root_dir)

        self.__init()

    def start_record(self, tag):
        """Start timing a phase labelled ``tag``."""
        self.start_time = current_time_second()
        self.tag = tag

    def end_record(self):
        """Stop timing the current phase and store its duration."""
        unit = TimeUnit(self.tag, current_time_second() - self.start_time)
        self.time_list.append(unit)
        self.tag = None
        self.start_time = 0

    def add_task(self, work_dir, obf_config_path):
        """Queue an obfuscation task for ``work_dir``."""
        task = Task(work_dir, obf_config_path, self.test_type)
        self.obfuscation_tasks.append(task)

    def obfuscate(self):
        """Run every queued obfuscation task and record the outcome."""
        self.start_record("obfuscate")

        results = thread_pool(obfuscate_dir, self.obfuscation_tasks)

        for task, result in zip(self.obfuscation_tasks, results):
            self.obfuscate_result.total_count += 1
            if result == 0:
                self.obfuscate_result.success_count += 1
            else:
                self.obfuscate_result.failed_count += 1
                self.obfuscate_result.failed_cases.append(task)

        self.end_record()
        self.clear_obfuscation_tasks()

    def clear_obfuscation_tasks(self):
        """Drop all queued obfuscation tasks."""
        self.obfuscation_tasks.clear()

    def run_with_node(self):
        """Execute every eligible result file with node and record the outcome."""
        self.start_record("run_with_node")
        self.__init_before_run_with_node()

        run_with_node_list = [
            one for one in self.all_file_list if should_run_with_node(one)
        ]

        results = thread_pool(run_file_with_node, run_with_node_list)

        for path, result in zip(run_with_node_list, results):
            self.run_with_node_result.total_count += 1
            if result == 0:
                self.run_with_node_result.success_count += 1
            else:
                self.run_with_node_result.failed_count += 1
                logging.info(
                    "run with node failed: return code = %d, %s", result, path
                )
                self.run_with_node_result.failed_cases.append(path)

        self.end_record()

    def content_compare(self):
        """Compare every file of ``all_file_list`` with its expectation files.

        ``all_file_list`` is filled by run_with_node(), so that method has to
        be called first.
        """
        self.start_record("content_compare")
        # For content compare
        for one in self.all_file_list:
            self.__compare_content(one)

        self.end_record()

    def traverse_dirs(self, root_dir, root_config_path):
        """Queue one task per directory that directly contains source files.

        A directory's own config file, when present, replaces the inherited
        ``root_config_path`` for that directory and everything below it.
        Directories that contain source files are not descended into.
        """
        files = sorted(os.listdir(root_dir))

        if CONFIG_FILE_NAME in files:
            target_config = os.path.join(root_dir, CONFIG_FILE_NAME)
        else:
            target_config = root_config_path

        if has_js_or_ts_files(files):
            self.add_task(root_dir, target_config)
            return

        for one in files:
            sub_path = os.path.join(root_dir, one)
            if os.path.isdir(sub_path):
                self.traverse_dirs(sub_path, target_config)

    def print_summary(self):
        """Log the phase durations, the failed cases and the pass/fail counts."""
        logging.info("------------------------------- Duration ----------------------------------------")
        for unit in self.time_list:
            logging.info("%s: %f", unit.tag, unit.duration_second)

        logging.info("----------------------------- Grammar Test summary -----------------------------")

        if self.obfuscate_result.failed_count > 0:
            logging.info("obfuscation failed cases:")
            for one in self.obfuscate_result.failed_cases:
                # Entries are Task objects; log their fields rather than the
                # object itself so the failing directory is readable.
                logging.info("%s (config: %s)", one.work_dir, one.obf_config_path)

        if self.run_with_node_result.failed_count > 0:
            logging.info("run with node failed cases:")
            for one in self.run_with_node_result.failed_cases:
                logging.info(one)

        if self.content_compare_result.failed_count > 0:
            logging.info("content compare failed cases:")
            for one in self.content_compare_result.failed_cases:
                logging.info(one)

        logging.info("obfuscation passed    : %d/%d failed: %d", self.obfuscate_result.success_count,
            self.obfuscate_result.total_count, self.obfuscate_result.failed_count)
        logging.info("run with node passed  : %d/%d failed: %d", self.run_with_node_result.success_count,
            self.run_with_node_result.total_count, self.run_with_node_result.failed_count)
        logging.info("content compare passed: %d/%d failed: %d", self.content_compare_result.success_count,
            self.content_compare_result.total_count, self.content_compare_result.failed_count)

    def returncode(self):
        """Return -1 when any phase recorded a failure, otherwise 0."""
        return -1 if self.has_failed_cases() else 0

    def has_failed_cases(self):
        """Return True when any phase recorded at least one failure."""
        return (
            self.obfuscate_result.failed_count > 0
            or self.run_with_node_result.failed_count > 0
            or self.content_compare_result.failed_count > 0
        )

    def get_expect_path(self, file_path):
        """Map a result file path to the matching path in the expectation tree.

        Returns None when ``file_path`` is not below any source directory
        configured for the current test type.
        """
        base_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
        source_expect = SOURCE_EXPECT_MAP[self.test_type]
        for source, expect in source_expect.items():
            # Normalise so that a base directory such as "." still yields a
            # prefix of the (normalised) result paths.
            source_root = os.path.normpath(os.path.join(base_dir, source))
            if file_path.startswith(source_root):
                expect_root = os.path.normpath(os.path.join(base_dir, expect))
                # Swap only the leading directory; a global string replace
                # would also rewrite matches elsewhere in the path.
                return expect_root + file_path[len(source_root):]
        return None

    def __filter_files(self, file_list):
        """Keep the paths in which the ``test_filter`` regex matches anywhere."""
        # re.search on the bare pattern keeps alternations such as "a|b"
        # intact, which wrapping the pattern in ".*" under re.match does not.
        return [one for one in file_list if re.search(self.test_filter, one)]

    def __init(self):
        """Remove the obfuscated-output directory left over from earlier runs."""
        try:
            shutil.rmtree(self.obfscated_cache_root_dir)
            logging.info("Directory %s deleted successfully", self.obfscated_cache_root_dir)
        except FileNotFoundError:
            logging.info("Directory %s not found", self.obfscated_cache_root_dir)
        except OSError as e:
            logging.info("Error deleting directory %s: %s", self.obfscated_cache_root_dir, e)

    def __init_before_run_with_node(self):
        """Collect and filter the result files produced by the obfuscator."""
        self.all_file_list = list_all_js_or_ts_files(self.obfscated_cache_root_dir)
        self.all_file_list = self.__filter_files(self.all_file_list)

        if VERBOSE:
            for i, one in enumerate(self.all_file_list):
                logging.info("%d -> %s ", i, one)

    def __compare_expected(
        self, file_path, actual, expectation, actual_path, expectation_file
    ):
        """Record whether ``actual`` equals ``expectation``, ignoring outer whitespace.

        On a mismatch the case is recorded under ``file_path`` and both paths
        plus a line diff are logged.
        """
        self.content_compare_result.total_count += 1
        if actual.strip() == expectation.strip():
            self.content_compare_result.success_count += 1
        else:
            self.content_compare_result.failed_count += 1
            self.content_compare_result.failed_cases.append(file_path)
            diff = difflib.ndiff(actual.splitlines(), expectation.splitlines())
            logging.info("compare file, actual path:")
            logging.info(actual_path)
            logging.info("compare file, expect path:")
            logging.info(expectation_file)
            logging.info("\n".join(diff))

    def __compare_files(self, file_path, actual_path, expectation_path):
        """Read both files and compare their contents for case ``file_path``."""
        with open(actual_path, encoding="utf-8") as actual_file, open(
            expectation_path, encoding="utf-8"
        ) as expectation_file:
            actual = actual_file.read()
            expectation = expectation_file.read()
        self.__compare_expected(
            file_path, actual, expectation, actual_path, expectation_path
        )

    def __compare_content(self, file_path):
        """Compare a result file and its side files with their expectations.

        Up to four comparisons are made, each only when its expectation file
        exists: the file itself, its name cache, the unobfuscation listing of
        its directory, and its source map.
        """
        source_path = self.get_expect_path(file_path)
        if source_path is None:
            # Without a mapped expectation location nothing can be compared;
            # record a failure rather than crash on the missing path.
            logging.info("no expectation directory mapped for: %s", file_path)
            self.content_compare_result.total_count += 1
            self.content_compare_result.failed_count += 1
            self.content_compare_result.failed_cases.append(file_path)
            return

        source_suffix = get_file_suffix(source_path)
        result_suffix = get_file_suffix(file_path)

        expectation_path = f"{source_suffix.file_name}_expected.txt"
        result_cache_path = f"{result_suffix.file_name}.ts.cache.json"
        expectation_cache_path = f"{source_suffix.file_name}_expected_cache.txt"
        result_map_path = f"{result_suffix.file_name}.ts.map"
        expectation_map_path = f"{source_suffix.file_name}_expected_map.txt"
        result_unobfuscation_path = os.path.join(
            os.path.dirname(result_suffix.file_name), PRINT_UNOBFUSCATION_SUFFIX
        )
        expect_unobfuscation_path = (
            f"{source_suffix.file_name}{EXPECTED_UNOBFUSCATION_SUFFIX}"
        )

        # The result file itself was just listed, so only its expectation
        # file is checked for existence.
        if os.path.exists(expectation_path):
            self.__compare_files(file_path, file_path, expectation_path)

        # Side files are compared only when both sides are present.
        side_files = (
            (result_cache_path, expectation_cache_path),
            (result_unobfuscation_path, expect_unobfuscation_path),
            (result_map_path, expectation_map_path),
        )
        for actual_path, expected_path in side_files:
            if os.path.exists(expected_path) and os.path.exists(actual_path):
                self.__compare_files(file_path, actual_path, expected_path)
503
504
def parse_args():
    """Parse the command line of this script.

    Returns the argparse namespace; ``test_filter`` holds the value of the
    optional ``--test-filter`` flag (empty string when the flag is absent).
    """
    parser = argparse.ArgumentParser(
        description="Script to run arkguard grammar tests"
    )

    # Optional flag restricting which cases are run.
    filter_options = {
        "type": str,
        "default": "",
        "help": "only run the cases match the filter",
    }
    parser.add_argument("--test-filter", **filter_options)

    return parser.parse_args()
523
524
def main():
    """Run all phases for the grammar tests and return the runner's exit code."""
    args = parse_args()

    script_dir = os.path.dirname(__file__)

    # Directory with the test sources and the default obfuscation config.
    root_dir = os.path.normpath(os.path.join(script_dir, "../test/grammar"))
    root_config = os.path.join(root_dir, CONFIG_FILE_NAME)

    # Directory the Runner clears on construction and reads results from.
    local_root_dir = os.path.join(script_dir, "../test/local")

    runner = Runner(args.test_filter, local_root_dir, TEST_TYPE)
    runner.traverse_dirs(root_dir, root_config)

    # The phases run in this fixed order.
    for phase in (runner.obfuscate, runner.run_with_node, runner.content_compare):
        phase()

    runner.print_summary()
    return runner.returncode()
542
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    sys.exit(main())
546