1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2023 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import os 18import time 19import json 20import stat 21import argparse 22import subprocess 23import multiprocessing 24import xml.etree.ElementTree as ET 25from datetime import datetime 26 27 28def build_target_name(xml_file_path): 29 last_backslash_index = xml_file_path.rfind('/') 30 last_dot_index = xml_file_path.rfind('.') 31 if last_backslash_index != -1 and last_dot_index != -1 and last_backslash_index < last_dot_index: 32 result = xml_file_path[last_backslash_index + 1:last_dot_index] 33 return result 34 else: 35 return "Build Target Not Find" 36 37def parse_xml(xml_file_path): 38 """ 39 Parse the XML file of the execution output of the use case 40 """ 41 test_module_name = build_target_name(xml_file_path) 42 tree = ET.parse(xml_file_path) 43 root = tree.getroot() 44 tests = root.attrib.get("tests") 45 failures = root.attrib.get("failures") 46 failed_info = { 47 "test_module_name": test_module_name, 48 "total_count": tests, 49 "failed_count": failures, 50 "failed_testcase_name": [] 51 } 52 passed_info = { 53 "test_module_name": test_module_name, 54 "total_count": tests, 55 "passed_count": 0, 56 "passed_testcase_name": [] 57 } 58 passed_count = 0 59 for testsuite in root.findall(".//testsuite"): 60 testsuite_name = testsuite.attrib.get("name") 61 testsuite_failures = testsuite.attrib.get("failures") 62 for testcase in testsuite.findall(".//testcase"): 63 testcase_name = testcase.attrib.get("name") 64 failure = testcase.find("failure") 65 if failure is not None: 66 failed_info["failed_testcase_name"].append("{}#{}".format(testsuite_name, testcase_name)) 67 else: 68 passed_info["passed_testcase_name"].append("{}#{}".format(testsuite_name, testcase_name)) 69 passed_count = passed_count+1 70 passed_info["passed_count"] = str(passed_count) 71 return failed_info, passed_info 72 73 74def run_command(test_binary_path: str, alter_cmds: list = None): 75 """ 76 Run a gtest test binary. 77 """ 78 default_cmds = [] 79 default_cmds.append(test_binary_path) 80 default_cmds.append("--gtest_output=xml:{}.xml".format(test_binary_path)) 81 default_cmds.append("--gtest_print_time=0") 82 default_cmds.append("--gtest_brief=1") 83 if alter_cmds is not None: 84 default_cmds.extend(alter_cmds) 85 subprocess.run(default_cmds) 86 87 88def run_single_test(tests_path, test_suite_name): 89 """ 90 Run a gtest test suite 91 """ 92 test_suite_path = None 93 for root, _, files in os.walk(tests_path): 94 for file in files: 95 if file.endswith(test_suite_name): 96 test_suite_path = os.path.join(root, test_suite_name) 97 if test_suite_path is not None: 98 run_command(test_suite_path) 99 else: 100 print("TestSuite {} did not compile successfully.".format(test_suite_name)) 101 102 103def run_tests_parallel(test_directory, process_number: int): 104 """ 105 Run all gtest test binaries in parallel. 106 """ 107 test_binaries = [] 108 for root, _, files in os.walk(test_directory): 109 for file in files: 110 test_suite_path = os.path.join(root, file) 111 name, ext = os.path.splitext(file) 112 if ext == "": 113 test_binaries.append(test_suite_path) 114 start = time.time() 115 with multiprocessing.Pool(processes=process_number) as pool: 116 pool.map(run_command, iter(test_binaries)) 117 end = time.time() 118 test_result = { 119 "time_stamp": str(datetime.now()), 120 "execute_time": 0, 121 "total_execute_tests": 0, 122 "failed_tests_count": 0, 123 "passed_tests_count": 0, 124 "unavailable": [], 125 "failed": [], 126 "passed": [] 127 } 128 total_tests_count = 0 129 failed_tests_count = 0 130 passed_tests_count = 0 131 for test_binary in test_binaries: 132 xml_file_path = "{}.xml".format(test_binary) 133 if os.path.exists(xml_file_path): 134 failed_info, passed_info= parse_xml(xml_file_path) 135 total_tests_count = total_tests_count + int(failed_info.get('total_count', '0')) 136 failed_tests_count = failed_tests_count + int(failed_info.get('failed_count', '0')) 137 passed_tests_count = passed_tests_count + int(passed_info.get('passed_count', '0')) 138 if int(failed_info.get('failed_count', '0')): 139 test_result['failed'].append(failed_info) 140 test_result['passed'].append(passed_info) 141 else: 142 test_result["unavailable"].append(test_binary.split('/')[-1]) 143 test_result["execute_time"] = "{} seconds".format(round(end - start, 2)) 144 test_result['total_execute_tests'] = total_tests_count 145 test_result['failed_tests_count'] = failed_tests_count 146 test_result['passed_tests_count'] = passed_tests_count 147 json_file_path = os.path.join(test_directory, "test_result.json") 148 flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC 149 mode = stat.S_IRUSR | stat.S_IWUSR 150 with os.fdopen(os.open(json_file_path, flags, mode), 'w') as json_file: 151 json.dump(test_result, json_file, indent=2) 152 153 print("The test results have been generated, path is {}".format(json_file_path)) 154 155 156def get_tests_out_path(): 157 """ 158 Obtain the output directory of test cases 159 """ 160 code_path = os.getcwd() 161 for _ in range(6): 162 code_path = os.path.dirname(code_path) 163 json_config_path = os.path.join(code_path,"out/ohos_config.json") 164 if not os.path.exists(json_config_path): 165 print("{} not exist, please build linux_unittest first.".format(json_config_path)) 166 code_path = os.path.join(code_path, "out/rk3568/clang_x64/tests/unittest/ace_engine") 167 else: 168 with open(json_config_path, 'r', encoding='utf-8') as file: 169 data = json.load(file) 170 code_path = os.path.join(data["out_path"], "clang_x64/tests/unittest/ace_engine") 171 return code_path 172 173 174def main(): 175 """ 176 Add unitest case execution parameters 177 """ 178 parser = argparse.ArgumentParser() 179 parser.add_argument("-t", "--target", nargs='+', type=str, default=None) 180 parser.add_argument("-p", "--process", nargs='+', type=int, default=64) 181 tests_out_path = get_tests_out_path() 182 args = parser.parse_args() 183 targets = args.target 184 process = args.process 185 if targets is not None: 186 for target in targets: 187 run_single_test(tests_out_path, target) 188 else: 189 run_tests_parallel(tests_out_path, process) 190 191 192if __name__ == "__main__": 193 main() 194