1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import json 21import shutil 22import shlex 23import subprocess 24import multiprocessing 25import sys 26from multiprocessing import Process 27 28# 子系统json目录 29SYSTEM_JSON = "test/testfwk/developer_test/localCoverage/codeCoverage/subsystem_config.json" 30# 覆盖率gcda 31COVERAGE_GCDA_RESULTS = "test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/data/cxx" 32# 报告路径 33REPORT_PATH = "test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/reports/cxx" 34# llvm-gcov.sh 35LLVM_GCOV = "test/testfwk/developer_test/localCoverage/codeCoverage/llvm-gcov.sh" 36# 测试套划分步长 37STEP_SIZE = 10 38 39 40def _init_sys_config(): 41 sys.localcoverage_path = os.path.join(current_path, "..") 42 sys.path.insert(0, sys.localcoverage_path) 43 44 45def call(cmd_list, is_show_cmd=False, out=None, err=None): 46 return_flag = False 47 try: 48 if is_show_cmd: 49 print("execute command: {}".format(" ".join(cmd_list))) 50 if 0 == subprocess.call(cmd_list, shell=False, stdout=out, stderr=err): 51 return_flag = True 52 except IOError: 53 print("Error : command {} execute faild!".format(cmd_list)) 54 return_flag = False 55 56 return return_flag 57 58 59def execute_command(command, printflag=False): 60 try: 61 cmd_list = shlex.split(command) 62 coverage_log_path = os.path.join( 63 CODEPATH, "test/testfwk/developer_test/localCoverage", "coverage.log") 64 with open(coverage_log_path, 'a') as fd: 65 call(cmd_list, printflag, fd, fd) 66 except IOError: 67 print("Error: Exception occur in open error") 68 69 70def get_subsystem_config_info(): 71 filter_subsystem_name_list = [ 72 "subsystem_examples", 73 ] 74 subsystem_info_dic = {} 75 subsystem_config_filepath = os.path.join(CODEPATH, SYSTEM_JSON) 76 if os.path.exists(subsystem_config_filepath): 77 data = None 78 with open(subsystem_config_filepath, "r", encoding="utf-8") as f: 79 data = json.load(f) 80 if not data: 81 print("subsystem config file error.") 82 for value in data.values(): 83 subsystem_name = value.get("name") 84 if subsystem_name in filter_subsystem_name_list: 85 continue 86 subsystem_dir = value.get('dir') 87 subsystem_path = value.get('path') 88 subsystem_project = value.get('project') 89 subsystem_rootpath = [] 90 for path in subsystem_path: 91 subsystem_rootpath.append(os.path.join(CODEPATH, path)) 92 subsystem_info_dic[subsystem_name] = [ 93 subsystem_project, subsystem_dir, 94 subsystem_path, subsystem_rootpath 95 ] 96 return subsystem_info_dic 97 98 99def get_subsystem_name_list(): 100 subsystem_name_list = [] 101 subsystem_info_dic = get_subsystem_config_info() 102 for key in subsystem_info_dic.keys(): 103 subsystem_rootpath = subsystem_info_dic[key][3] 104 for subsystem_rootpath_item in subsystem_rootpath: 105 if os.path.exists(subsystem_rootpath_item): 106 subsystem_name_list.append(key) 107 108 return subsystem_name_list 109 110 111def get_subsystem_rootpath(subsystem_name): 112 subsystem_path = "" 113 subsystem_rootpath = "" 114 subsystem_info_dic = get_subsystem_config_info() 115 for key in subsystem_info_dic.keys(): 116 if key == subsystem_name: 117 subsystem_path = subsystem_info_dic[key][2] 118 subsystem_rootpath = subsystem_info_dic[key][3] 119 break 120 121 return subsystem_path, subsystem_rootpath 122 123 124def is_filterout_dir(ignore_prefix, check_path): 125 filter_out_list = ["unittest", "third_party", "test"] 126 for item in filter_out_list: 127 check_list = check_path[len(ignore_prefix):].split("/") 128 if item in check_list: 129 return True 130 131 return False 132 133 134def rm_unnecessary_dir(cov_path): 135 topdir = os.path.join(cov_path, "obj") 136 for root, dirs, files in os.walk(topdir): 137 if is_filterout_dir(topdir, root): 138 shutil.rmtree(root) 139 140 141def get_files_from_dir(find_path, postfix=None): 142 names = os.listdir(find_path) 143 file_list = [] 144 for fn in names: 145 if not os.path.isfile(os.path.join(find_path, fn)): 146 continue 147 if postfix is not None: 148 if fn.endswith(postfix): 149 file_list.append(fn) 150 else: 151 file_list.append(fn) 152 153 return file_list 154 155 156def get_gcno_files(cov_path, dir_name): 157 gcda_strip_path = dir_name[len(cov_path) + 1:] 158 gcda_list = get_files_from_dir(dir_name, ".gcda") 159 for file_name in gcda_list: 160 gcno_name = f"{os.path.splitext(file_name)[0]}.gcno" 161 gcno_path = os.path.join( 162 os.path.join(CODEPATH, OUTPUT), gcda_strip_path, gcno_name 163 ) 164 if os.path.exists(gcno_path): 165 shutil.copy(gcno_path, dir_name) 166 else: 167 print(f"{gcno_path} not exists!") 168 169 170def get_module_gcno_files(cov_path, dir_name): 171 for root, dirs, files in os.walk(dir_name): 172 get_gcno_files(cov_path, root) 173 174 175def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path): 176 src_dir = os.path.join(CODEPATH, OUTPUT) 177 single_info_path = os.path.join( 178 CODEPATH, REPORT_PATH, "single_test", test_dir 179 ) 180 if not os.path.exists(single_info_path): 181 os.makedirs(single_info_path) 182 output_name = os.path.join( 183 CODEPATH, single_info_path, f"{subsystem}_output.info" 184 ) 185 if not os.path.exists(src_dir): 186 print(f"Sours path {src_dir} not exists!") 187 return 188 189 cmd = "lcov -c -b {} -d {} --gcov-tool {} --config-file {} -o {} --ignore-errors source,gcov".format( 190 src_dir, data_dir, os.path.join(CODEPATH, LLVM_GCOV), lcovrc_path, output_name) 191 print("single_test**##father_pid:%s##child_pid:%s cmd:%s config file:%s" % ( 192 os.getpid(), os.getppid(), cmd, lcovrc_path 193 )) 194 execute_command(cmd) 195 196 197def cut_info(subsystem, test_dir): 198 trace_file = os.path.join( 199 CODEPATH, REPORT_PATH, "single_test", 200 test_dir, f"{subsystem}_output.info" 201 ) 202 output_name = os.path.join( 203 CODEPATH, REPORT_PATH, "single_test", 204 test_dir, f"{subsystem}_strip.info" 205 ) 206 207 remove = r"'*/third_party/*' 'sdk/android-arm64/*'" 208 if not os.path.exists(trace_file): 209 print(f"Error: trace file {trace_file} not exists!") 210 return 211 212 cmd = "lcov --remove {} {} -o {}".format(trace_file, remove, output_name) 213 execute_command(cmd) 214 215 216def gen_info(cov_path, test_dir, subsystem_list, lcovrc_path): 217 if len(subsystem_list) == 0: 218 print("Error: get subsystem list failed, can not generate trace info") 219 return 220 221 loop = 0 222 for subsystem in list(set(subsystem_list)): 223 subsystem_path, subsystem_rootpath = get_subsystem_rootpath(subsystem) 224 for subsys_path in subsystem_path: 225 subsystem_data_abspath = os.path.join(cov_path, "obj", subsys_path) 226 # check id subsystem data is exists 227 if not os.path.exists(subsystem_data_abspath): 228 continue 229 230 # copy gcno to the gcda same directory 231 get_module_gcno_files(cov_path, subsystem_data_abspath) 232 233 # generate coverage info for each subsystem 234 gen_subsystem_trace_info( 235 f"{subsystem}#{subsys_path.replace('/', '_')}#{ str(loop)}", 236 subsystem_data_abspath, test_dir, lcovrc_path 237 ) 238 239 # remove some type which useless 240 cut_info(f"{subsystem}#{subsys_path.replace('/', '_')}#{str(loop)}", test_dir) 241 242 loop += 1 243 244 245def generate_coverage_info(single_test_dir_list, lcovrc_path, subsystem_list): 246 cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS) 247 for index, cur_test_dir in enumerate(single_test_dir_list): 248 cur_test_abs_dir = os.path.join(cov_path, cur_test_dir) 249 gen_info(cur_test_abs_dir, cur_test_dir, subsystem_list, lcovrc_path) 250 251 252def gen_all_test_info(subsystem_list): 253 cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS) 254 print(os.getpid(), os.getppid()) 255 single_test_dir_list = [] 256 for root, dirs, files in os.walk(cov_path): 257 single_test_dir_list = dirs 258 break 259 260 return single_test_dir_list 261 262 263def merge_subsystem_info_from_all_test(subsystem): 264 single_test_info_path = os.path.join( 265 CODEPATH, REPORT_PATH, "single_test" 266 ) 267 subsystem_info_list = [] 268 subsystem_info_name = f"{subsystem}_strip.info" 269 for root, dirs, files in os.walk(single_test_info_path): 270 for file in files: 271 if file.startswith(subsystem) and file.endswith("_strip.info"): 272 subsystem_info_path_tmp = os.path.join( 273 single_test_info_path, root, file 274 ) 275 print(f"##{subsystem_info_path_tmp}") 276 subsystem_info_list.append(subsystem_info_path_tmp) 277 278 if len(subsystem_info_list) == 0: 279 return 280 281 info_output_name = os.path.join( 282 CODEPATH, REPORT_PATH, subsystem_info_name 283 ) 284 cmd = "lcov -a {} -o {}".format( 285 " -a ".join(subsystem_info_list), info_output_name 286 ) 287 execute_command(cmd) 288 289 290def merge_all_test_subsystem_info(subsystem_list): 291 single_test_info_path = os.path.join( 292 CODEPATH, REPORT_PATH, "single_test" 293 ) 294 if not os.path.exists(single_test_info_path): 295 print(f"Error: the single test info path " 296 f"{single_test_info_path} not exist") 297 return 298 299 for subsystem in subsystem_list: 300 print(f"Merging all {subsystem} info from test data") 301 merge_subsystem_info_from_all_test(subsystem) 302 303 304def merge_info(report_dir): 305 if not os.path.exists(report_dir): 306 print(f"Error: report dir {report_dir} not exists!") 307 return 308 309 subsystem_name_list = get_files_from_dir(report_dir, "_strip.info") 310 if len(subsystem_name_list) == 0: 311 print("Error: get subsystem trace files in report directory failed.") 312 return 313 314 trace_file_list = [] 315 for subsystem in subsystem_name_list: 316 trace_file_name = os.path.join(report_dir, subsystem) 317 trace_file_list.append(trace_file_name) 318 319 cmd = "lcov -a {} -o {}".format( 320 " -a ".join(trace_file_list), os.path.join(report_dir, "ohos_codeCoverage.info") 321 ) 322 execute_command(cmd) 323 324 325def merge_all_subsystem_info(): 326 print("Merging all the subsystem trace files") 327 merge_info(os.path.join(CODEPATH, REPORT_PATH)) 328 329 330def gen_html(cov_path): 331 tracefile = os.path.join(CODEPATH, REPORT_PATH, "ohos_codeCoverage.info") 332 if not os.path.exists(tracefile): 333 print(f"Error: the trace file {tracefile} not exists!") 334 return 335 336 cmd = "genhtml --branch-coverage --demangle-cpp -o {} -p {} --ignore-errors source {}".format( 337 os.path.join(CODEPATH, REPORT_PATH, "html"), CODEPATH, tracefile) 338 execute_command(cmd) 339 340 341def gen_final_report(cov_path): 342 print("Generating the html report") 343 gen_html(cov_path) 344 345 346if __name__ == '__main__': 347 current_path = os.path.abspath(os.path.dirname(__name__)) 348 CODEPATH = current_path.split("/test/testfwk/developer_test")[0] 349 # lcovrc配置文件集合 350 LCOVRC_SET = f"{CODEPATH}/test/testfwk/developer_test/localCoverage/codeCoverage/coverage_rc" 351 _init_sys_config() 352 from localCoverage.utils import get_product_name 353 # 编译生成的out路径 354 OUTPUT = "out/{}".format(get_product_name(CODEPATH)) 355 356 case_list = gen_all_test_info(subsystem_list=get_subsystem_name_list()) 357 multiprocessing.set_start_method("fork") # fork spawn forkserver 358 start = end = 0 359 Tag = False 360 process_list = [] 361 for i in range(len(case_list)): 362 lcov_path = f"{LCOVRC_SET}/lcovrc_cov_{str(i)}" 363 print(lcov_path) 364 if os.path.exists(lcov_path): 365 print(f"{lcov_path}{'@' * 20}yes") 366 else: 367 raise Exception("mutilProcess have error -rc path not existed. " 368 "please fix add run") 369 370 start = end 371 end += STEP_SIZE 372 if end >= len(case_list): 373 end = len(case_list) 374 Tag = True 375 376 p = Process(target=generate_coverage_info, 377 args=(case_list[start:end], lcov_path, 378 get_subsystem_name_list())) 379 p.daemon = True 380 p.start() 381 process_list.append(p) 382 if Tag: 383 break 384 385 for i in process_list: 386 i.join() 387 388 merge_all_test_subsystem_info(subsystem_list=get_subsystem_name_list()) 389 merge_all_subsystem_info() 390 gen_final_report(os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)) 391