#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
import json
import stat
import argparse
import subprocess
import multiprocessing
import xml.etree.ElementTree as ET
from datetime import datetime


def build_target_name(xml_file_path):
    """
    Extract the build target name from an XML result file path
    """
    last_slash_index = xml_file_path.rfind('/')
    last_dot_index = xml_file_path.rfind('.')
    if last_slash_index != -1 and last_dot_index != -1 and last_slash_index < last_dot_index:
        return xml_file_path[last_slash_index + 1:last_dot_index]
    return "Build Target Not Found"


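# Illustrative shape of the gtest XML report that parse_xml below consumes
# (a sketch; suite/case names and counts are hypothetical, but the element
# and attribute layout follows standard --gtest_output=xml reports):
#
#   <testsuites tests="3" failures="1" name="AllTests">
#     <testsuite name="FooTest" tests="3" failures="1">
#       <testcase name="HandlesZero" classname="FooTest">
#         <failure message="..."/>
#       </testcase>
#       <testcase name="HandlesOne" classname="FooTest"/>
#       <testcase name="HandlesTwo" classname="FooTest"/>
#     </testsuite>
#   </testsuites>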
def parse_xml(xml_file_path):
    """
    Parse the gtest XML report produced by a test binary
    """
    test_module_name = build_target_name(xml_file_path)
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    tests = root.attrib.get("tests")
    failures = root.attrib.get("failures")
    failed_info = {
        "test_module_name": test_module_name,
        "total_count": tests,
        "failed_count": failures,
        "failed_testcase_name": []
    }
    passed_info = {
        "test_module_name": test_module_name,
        "total_count": tests,
        "passed_count": 0,
        "passed_testcase_name": []
    }
    passed_count = 0
    for testsuite in root.findall(".//testsuite"):
        testsuite_name = testsuite.attrib.get("name")
        for testcase in testsuite.findall(".//testcase"):
            testcase_name = testcase.attrib.get("name")
            # A <failure> child marks a failed test case.
            failure = testcase.find("failure")
            if failure is not None:
                failed_info["failed_testcase_name"].append("{}#{}".format(testsuite_name, testcase_name))
            else:
                passed_info["passed_testcase_name"].append("{}#{}".format(testsuite_name, testcase_name))
                passed_count += 1
    passed_info["passed_count"] = str(passed_count)
    return failed_info, passed_info


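# For reference, run_command below ends up executing a command line like the
# following (binary path hypothetical):
#   /out/tests/FooTest --gtest_output=xml:/out/tests/FooTest.xml \
#       --gtest_print_time=0 --gtest_brief=1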
def run_command(test_binary_path: str, alter_cmds: list = None):
    """
    Run a gtest test binary.
    """
    default_cmds = [
        test_binary_path,
        "--gtest_output=xml:{}.xml".format(test_binary_path),
        "--gtest_print_time=0",
        "--gtest_brief=1"
    ]
    if alter_cmds is not None:
        default_cmds.extend(alter_cmds)
    subprocess.run(default_cmds)


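# Usage sketch (suite name hypothetical): run_single_test(out_dir, "FooTest")
# walks out_dir for a binary whose name ends with "FooTest" and runs it.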
def run_single_test(tests_path, test_suite_name):
    """
    Run a single gtest test suite
    """
    test_suite_path = None
    for root, _, files in os.walk(tests_path):
        for file in files:
            if file.endswith(test_suite_name):
                test_suite_path = os.path.join(root, file)
    if test_suite_path is not None:
        run_command(test_suite_path)
    else:
        print("TestSuite {} did not compile successfully.".format(test_suite_name))


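# Illustrative shape of the test_result.json written by run_tests_parallel
# below (all values hypothetical):
#
#   {
#     "time_stamp": "2023-01-01 12:00:00.000000",
#     "execute_time": "42.5 seconds",
#     "total_execute_tests": 100,
#     "failed_tests_count": 2,
#     "passed_tests_count": 98,
#     "unavailable": ["BarTest"],
#     "failed": [...],
#     "passed": [...]
#   }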
def run_tests_parallel(test_directory, process_number: int):
    """
    Run all gtest test binaries in parallel.
    """
    test_binaries = []
    for root, _, files in os.walk(test_directory):
        for file in files:
            test_suite_path = os.path.join(root, file)
            # Test binaries are the files without an extension.
            _, ext = os.path.splitext(file)
            if ext == "":
                test_binaries.append(test_suite_path)
    start = time.time()
    with multiprocessing.Pool(processes=process_number) as pool:
        pool.map(run_command, iter(test_binaries))
    end = time.time()
    test_result = {
        "time_stamp": str(datetime.now()),
        "execute_time": 0,
        "total_execute_tests": 0,
        "failed_tests_count": 0,
        "passed_tests_count": 0,
        "unavailable": [],
        "failed": [],
        "passed": []
    }
    total_tests_count = 0
    failed_tests_count = 0
    passed_tests_count = 0
    for test_binary in test_binaries:
        xml_file_path = "{}.xml".format(test_binary)
        if os.path.exists(xml_file_path):
            failed_info, passed_info = parse_xml(xml_file_path)
            total_tests_count += int(failed_info.get('total_count', '0'))
            failed_tests_count += int(failed_info.get('failed_count', '0'))
            passed_tests_count += int(passed_info.get('passed_count', '0'))
            if int(failed_info.get('failed_count', '0')):
                test_result['failed'].append(failed_info)
            test_result['passed'].append(passed_info)
        else:
            # No XML report means the binary never ran (or was not built).
            test_result["unavailable"].append(test_binary.split('/')[-1])
    test_result["execute_time"] = "{} seconds".format(round(end - start, 2))
    test_result['total_execute_tests'] = total_tests_count
    test_result['failed_tests_count'] = failed_tests_count
    test_result['passed_tests_count'] = passed_tests_count
    json_file_path = os.path.join(test_directory, "test_result.json")
    flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
    mode = stat.S_IRUSR | stat.S_IWUSR
    with os.fdopen(os.open(json_file_path, flags, mode), 'w') as json_file:
        json.dump(test_result, json_file, indent=2)
    print("The test results have been generated, path is {}".format(json_file_path))


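# ohos_config.json is assumed to carry the build output directory, e.g.
# (path illustrative):
#   { "out_path": "/path/to/openharmony/out/rk3568" }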
def get_tests_out_path():
    """
    Obtain the output directory of test cases
    """
    # Climb six directory levels from the current working directory
    # to reach the code root.
    code_path = os.getcwd()
    for _ in range(6):
        code_path = os.path.dirname(code_path)
    json_config_path = os.path.join(code_path, "out/ohos_config.json")
    if not os.path.exists(json_config_path):
        print("{} does not exist, please build linux_unittest first.".format(json_config_path))
        code_path = os.path.join(code_path, "out/rk3568/clang_x64/tests/unittest/ace_engine")
    else:
        with open(json_config_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
            code_path = os.path.join(data["out_path"], "clang_x64/tests/unittest/ace_engine")
    return code_path


def main():
    """
    Parse unittest execution arguments and dispatch test runs
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--target", nargs='+', type=str, default=None)
    # process must be a plain int: multiprocessing.Pool expects a single
    # process count, so no nargs='+' here.
    parser.add_argument("-p", "--process", type=int, default=64)
    tests_out_path = get_tests_out_path()
    args = parser.parse_args()
    targets = args.target
    process = args.process
    if targets is not None:
        for target in targets:
            run_single_test(tests_out_path, target)
    else:
        run_tests_parallel(tests_out_path, process)


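# Usage examples (script name illustrative):
#   python3 run_unittests.py              # run every test binary in parallel
#   python3 run_unittests.py -t FooTest   # run one (or more) named suites
#   python3 run_unittests.py -p 32        # cap the worker process count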
if __name__ == "__main__":
    main()