#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Generate per-subsystem lcov trace files in parallel and build an HTML report.

The functions below read the module globals ``CODEPATH`` and ``OUTPUT``,
which are only assigned when the file is executed as a script.
"""

import os
import json
import shutil
import shlex
import subprocess
import multiprocessing
import sys
import stat

from multiprocessing import Process

FLAGS = os.O_WRONLY | os.O_APPEND | os.O_CREAT
MODES = stat.S_IWUSR | stat.S_IRUSR

# Subsystem JSON configuration file
SYSTEM_JSON = "test/testfwk/developer_test/local_coverage/code_coverage/subsystem_config.json"
# Coverage gcda data directory
COVERAGE_GCDA_RESULTS = "test/testfwk/developer_test/local_coverage/code_coverage/results/coverage/data/cxx"
# Report output path
REPORT_PATH = "test/testfwk/developer_test/local_coverage/code_coverage/results/coverage/reports/cxx"
# llvm-gcov.sh wrapper script
LLVM_GCOV = "test/testfwk/developer_test/local_coverage/code_coverage/llvm-gcov.sh"
# Number of test-suite directories handled by one worker process
STEP_SIZE = 10


def _init_sys_config():
    """Put the parent of ``current_path`` at the front of ``sys.path``."""
    sys.localcoverage_path = os.path.join(current_path, "..")
    sys.path.insert(0, sys.localcoverage_path)


def call(cmd_list, is_show_cmd=False, out=None, err=None):
    """Run ``cmd_list`` without a shell.

    Args:
        cmd_list: Program and arguments as a list of strings.
        is_show_cmd: When true, echo the command before running it.
        out: File object (or None) passed to ``subprocess.call`` as stdout.
        err: File object (or None) passed to ``subprocess.call`` as stderr.

    Returns:
        True when the command exits with status 0; False on a non-zero exit
        status or when the command cannot be started (OSError/IOError).
    """
    try:
        if is_show_cmd:
            print("execute command: {}".format(" ".join(cmd_list)))
        return subprocess.call(cmd_list, shell=False, stdout=out, stderr=err) == 0
    except OSError:
        # IOError is an alias of OSError in Python 3; this also covers a
        # missing executable (FileNotFoundError).
        print("Error : command {} execute faild!".format(cmd_list))
        return False


def execute_command(command, printflag=False):
    """Split ``command`` with shlex and run it, logging output to coverage.log.

    Both stdout and stderr of the command are appended to
    ``<CODEPATH>/test/testfwk/developer_test/local_coverage/coverage.log``.
    Failure to open the log file is reported and otherwise ignored.
    """
    try:
        cmd_list = shlex.split(command)
        coverage_log_path = os.path.join(
            CODEPATH, "test/testfwk/developer_test/local_coverage", "coverage.log")
        with os.fdopen(os.open(coverage_log_path, FLAGS, MODES), 'a') as fd:
            call(cmd_list, printflag, fd, fd)
    except IOError:
        print("Error: Exception occur in open error")


def get_subsystem_config_info():
    """Load the subsystem configuration JSON.

    Returns:
        A dict mapping subsystem name to
        ``[project, dir, path_list, absolute_path_list]``. The dict is empty
        when the configuration file is missing or contains no data.
    """
    filter_subsystem_name_list = [
        "subsystem_examples",
    ]
    subsystem_info_dic = {}
    subsystem_config_filepath = os.path.join(CODEPATH, SYSTEM_JSON)
    if not os.path.exists(subsystem_config_filepath):
        return subsystem_info_dic

    with open(subsystem_config_filepath, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not data:
        # Nothing usable was loaded; stop here instead of iterating over it.
        print("subsystem config file error.")
        return subsystem_info_dic

    for value in data.values():
        subsystem_name = value.get("name")
        if subsystem_name in filter_subsystem_name_list:
            continue
        subsystem_dir = value.get('dir')
        # An entry without 'path' is treated as having no source directories.
        subsystem_path = value.get('path') or []
        subsystem_project = value.get('project')
        subsystem_rootpath = [
            os.path.join(CODEPATH, path) for path in subsystem_path
        ]
        subsystem_info_dic[subsystem_name] = [
            subsystem_project, subsystem_dir,
            subsystem_path, subsystem_rootpath
        ]
    return subsystem_info_dic


def get_subsystem_name_list():
    """Return the configured subsystems that have at least one existing root path.

    Each subsystem name appears once, even when several of its root paths
    exist on disk.
    """
    return [
        name
        for name, info in get_subsystem_config_info().items()
        if any(os.path.exists(root) for root in info[3])
    ]


def get_subsystem_rootpath(subsystem_name):
    """Return ``(path_list, absolute_path_list)`` for ``subsystem_name``.

    Returns ``("", "")`` when the subsystem is not configured.
    """
    info = get_subsystem_config_info().get(subsystem_name)
    if info is None:
        return "", ""
    return info[2], info[3]


def is_filterout_dir(ignore_prefix, check_path):
    """Tell whether ``check_path`` contains a filtered directory component.

    Only the part of ``check_path`` after the first ``len(ignore_prefix)``
    characters is inspected.
    """
    filter_out_list = ["unittest", "third_party", "test"]
    check_list = check_path[len(ignore_prefix):].split("/")
    return any(item in check_list for item in filter_out_list)


def rm_unnecessary_dir(cov_path):
    """Delete every filtered-out directory below ``<cov_path>/obj``."""
    topdir = os.path.join(cov_path, "obj")
    for root, dirs, _ in os.walk(topdir):
        if is_filterout_dir(topdir, root):
            shutil.rmtree(root)
            # The subtree is gone; stop os.walk from descending into it.
            dirs[:] = []


def get_files_from_dir(find_path, postfix=None):
    """List regular file names directly inside ``find_path``.

    Args:
        find_path: Directory to list (not recursive).
        postfix: Optional suffix; when given, only names ending with it
            are returned.
    """
    return [
        fn
        for fn in os.listdir(find_path)
        if os.path.isfile(os.path.join(find_path, fn))
        and (postfix is None or fn.endswith(postfix))
    ]


def get_gcno_files(cov_path, dir_name):
    """Copy the .gcno file matching each .gcda file in ``dir_name`` next to it.

    The .gcno files are looked up under ``<CODEPATH>/<OUTPUT>`` at the same
    path that ``dir_name`` has relative to ``cov_path``.
    """
    gcda_strip_path = dir_name[len(cov_path) + 1:]
    build_out_dir = os.path.join(CODEPATH, OUTPUT)
    for file_name in get_files_from_dir(dir_name, ".gcda"):
        gcno_name = f"{os.path.splitext(file_name)[0]}.gcno"
        gcno_path = os.path.join(build_out_dir, gcda_strip_path, gcno_name)
        if os.path.exists(gcno_path):
            shutil.copy(gcno_path, dir_name)
        else:
            print(f"{gcno_path} not exists!")


def get_module_gcno_files(cov_path, dir_name):
    """Run :func:`get_gcno_files` for ``dir_name`` and all its subdirectories."""
    for root, _, _ in os.walk(dir_name):
        get_gcno_files(cov_path, root)


def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path):
    """Capture coverage data of ``data_dir`` into ``<subsystem>_output.info``.

    Args:
        subsystem: Name used as the prefix of the output trace file.
        data_dir: Directory passed to ``lcov -d``.
        test_dir: Sub-directory of ``single_test`` that receives the output.
        lcovrc_path: lcov configuration file passed via ``--config-file``.
    """
    src_dir = os.path.join(CODEPATH, OUTPUT)
    single_info_path = os.path.join(
        CODEPATH, REPORT_PATH, "single_test", test_dir
    )
    os.makedirs(single_info_path, exist_ok=True)
    output_name = os.path.join(single_info_path, f"{subsystem}_output.info")
    if not os.path.exists(src_dir):
        print(f"Sours path {src_dir} not exists!")
        return

    cmd = "lcov -c -b {} -d {} --gcov-tool {} --config-file {} -o {} --ignore-errors source,gcov".format(
        src_dir, data_dir, os.path.join(CODEPATH, LLVM_GCOV), lcovrc_path, output_name)
    # father_pid is the parent process, child_pid is this process.
    print("single_test**##father_pid:%s##child_pid:%s cmd:%s config file:%s" % (
        os.getppid(), os.getpid(), cmd, lcovrc_path
    ))
    execute_command(cmd)


def cut_info(subsystem, test_dir):
    """Strip unwanted paths from ``<subsystem>_output.info``.

    The result is written to ``<subsystem>_strip.info`` in the same
    directory and removed again if it turns out to be empty.
    """
    single_test_dir = os.path.join(
        CODEPATH, REPORT_PATH, "single_test", test_dir
    )
    trace_file = os.path.join(single_test_dir, f"{subsystem}_output.info")
    output_name = os.path.join(single_test_dir, f"{subsystem}_strip.info")

    remove = r"'*/third_party/*' '*/prebuilts/*' '*/unittest/*' " \
             r"'*/moduletest/*' '*/systemtest/*' '*.h*'"
    if not os.path.exists(trace_file):
        print(f"Error: trace file {trace_file} not exists!")
        return

    cmd = "lcov --remove {} {} -o {}".format(trace_file, remove, output_name)
    execute_command(cmd)
    delete_empty_info_file(output_name)


def delete_empty_info_file(filename):
    """Remove ``filename`` when it exists and has a size of zero bytes."""
    if os.path.exists(filename) and os.stat(filename).st_size == 0:
        print(f"empty file {filename} deleted")
        os.remove(filename)


def gen_info(cov_path, test_dir, subsystem_list, lcovrc_path):
    """Generate and strip trace files for every subsystem of one test directory.

    Args:
        cov_path: Absolute directory holding the gcda data of the test.
        test_dir: Name of the test directory (used for the output location).
        subsystem_list: Subsystem names to process; duplicates are ignored.
        lcovrc_path: lcov configuration file for this worker.
    """
    if not subsystem_list:
        print("Error: get subsystem list failed, can not generate trace info")
        return

    # NOTE(review): the counter is advanced once per subsystem; the original
    # indentation was lost, confirm against the upstream file.
    for loop, subsystem in enumerate(sorted(set(subsystem_list))):
        subsystem_path, _ = get_subsystem_rootpath(subsystem)
        for subsys_path in subsystem_path:
            subsystem_data_abspath = os.path.join(cov_path, "obj", subsys_path)
            # skip subsystems that produced no coverage data
            if not os.path.exists(subsystem_data_abspath):
                continue

            # copy gcno files next to the gcda files
            get_module_gcno_files(cov_path, subsystem_data_abspath)

            trace_name = f"{subsystem}#{subsys_path.replace('/', '_')}#{loop}"

            # generate coverage info for each subsystem
            gen_subsystem_trace_info(
                trace_name, subsystem_data_abspath, test_dir, lcovrc_path
            )

            # remove entries that are not wanted in the report
            cut_info(trace_name, test_dir)


def generate_coverage_info(single_test_dir_list, lcovrc_path, subsystem_list):
    """Worker entry point: run :func:`gen_info` for each given test directory."""
    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
    for cur_test_dir in single_test_dir_list:
        cur_test_abs_dir = os.path.join(cov_path, cur_test_dir)
        gen_info(cur_test_abs_dir, cur_test_dir, subsystem_list, lcovrc_path)


def gen_all_test_info(subsystem_list):
    """Return the names of the directories directly inside the gcda result path.

    ``subsystem_list`` is accepted for interface compatibility and not used.
    """
    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
    print(os.getpid(), os.getppid())
    # Only the first os.walk entry (the top level) is needed.
    _, single_test_dir_list, _ = next(os.walk(cov_path), (cov_path, [], []))
    return single_test_dir_list


def _is_subsystem_strip_info(file_name, subsystem):
    """Tell whether ``file_name`` is a stripped trace file of ``subsystem``."""
    # gen_info names the files "<subsystem>#<path>#<n>_strip.info"; including
    # the '#' keeps e.g. "ability" from matching "abilitybase#...".
    return file_name.startswith(f"{subsystem}#") and file_name.endswith("_strip.info")


def merge_subsystem_info_from_all_test(subsystem):
    """Merge all stripped trace files of ``subsystem`` into one trace file.

    The merged file is ``<CODEPATH>/<REPORT_PATH>/<subsystem>_strip.info``.
    Nothing is done when no matching trace file exists.
    """
    single_test_info_path = os.path.join(
        CODEPATH, REPORT_PATH, "single_test"
    )
    subsystem_info_list = []
    for root, _, files in os.walk(single_test_info_path):
        for file_name in files:
            if _is_subsystem_strip_info(file_name, subsystem):
                subsystem_info_path_tmp = os.path.join(root, file_name)
                print(f"##{subsystem_info_path_tmp}")
                subsystem_info_list.append(subsystem_info_path_tmp)

    if not subsystem_info_list:
        return

    info_output_name = os.path.join(
        CODEPATH, REPORT_PATH, f"{subsystem}_strip.info"
    )
    cmd = "lcov -a {} -o {}".format(
        " -a ".join(subsystem_info_list), info_output_name
    )
    execute_command(cmd)


def merge_all_test_subsystem_info(subsystem_list):
    """Merge the per-test trace files of every subsystem in ``subsystem_list``."""
    single_test_info_path = os.path.join(
        CODEPATH, REPORT_PATH, "single_test"
    )
    if not os.path.exists(single_test_info_path):
        print(f"Error: the single test info path "
              f"{single_test_info_path} not exist")
        return

    for subsystem in subsystem_list:
        print(f"Merging all {subsystem} info from test data")
        merge_subsystem_info_from_all_test(subsystem)


def merge_info(report_dir):
    """Merge every ``*_strip.info`` in ``report_dir`` into ohos_codeCoverage.info."""
    if not os.path.exists(report_dir):
        print(f"Error: report dir {report_dir} not exists!")
        return

    subsystem_name_list = get_files_from_dir(report_dir, "_strip.info")
    if not subsystem_name_list:
        print("Error: get subsystem trace files in report directory failed.")
        return

    trace_file_list = [
        os.path.join(report_dir, subsystem) for subsystem in subsystem_name_list
    ]

    cmd = "lcov -a {} -o {}".format(
        " -a ".join(trace_file_list), os.path.join(report_dir, "ohos_codeCoverage.info")
    )
    execute_command(cmd)


def merge_all_subsystem_info():
    """Merge the subsystem trace files found in the report directory."""
    print("Merging all the subsystem trace files")
    merge_info(os.path.join(CODEPATH, REPORT_PATH))


def gen_html(cov_path):
    """Run genhtml on ohos_codeCoverage.info; output goes to ``<REPORT_PATH>/html``.

    ``cov_path`` is accepted for interface compatibility and not used.
    """
    tracefile = os.path.join(CODEPATH, REPORT_PATH, "ohos_codeCoverage.info")
    if not os.path.exists(tracefile):
        print(f"Error: the trace file {tracefile} not exists!")
        return

    cmd = "genhtml --branch-coverage --demangle-cpp -o {} -p {} --ignore-errors source {}".format(
        os.path.join(CODEPATH, REPORT_PATH, "html"), CODEPATH, tracefile)
    execute_command(cmd)


def gen_final_report(cov_path):
    """Generate the final HTML coverage report."""
    print("Generating the html report")
    gen_html(cov_path)


if __name__ == '__main__':
    # NOTE(review): dirname(__name__) is empty, so this resolves to the current
    # working directory rather than the directory of this file; kept as-is
    # because the launch convention is not visible here - confirm before
    # switching to __file__.
    current_path = os.path.abspath(os.path.dirname(__name__))
    CODEPATH = current_path.split("/test/testfwk/developer_test")[0]
    # Directory holding one lcovrc configuration file per worker process
    LCOVRC_SET = f"{CODEPATH}/test/testfwk/developer_test/local_coverage/code_coverage/coverage_rc"
    _init_sys_config()
    from local_coverage.utils import get_product_name
    # Build output ("out") directory of the product
    OUTPUT = "out/{}".format(get_product_name(CODEPATH))

    # Read the subsystem configuration once and share it with every step.
    subsystem_names = get_subsystem_name_list()
    case_list = gen_all_test_info(subsystem_list=subsystem_names)
    multiprocessing.set_start_method("fork")  # fork spawn forkserver

    process_list = []
    # One worker per slice of STEP_SIZE test directories, each with its own
    # lcovrc file.
    for index, start in enumerate(range(0, len(case_list), STEP_SIZE)):
        lcov_path = f"{LCOVRC_SET}/lcovrc_cov_{index}"
        print(lcov_path)
        if not os.path.exists(lcov_path):
            raise Exception("mutilProcess have error -rc path not existed. "
                            "please fix add run")
        print(f"{lcov_path}{'@' * 20}yes")

        p = Process(target=generate_coverage_info,
                    args=(case_list[start:start + STEP_SIZE], lcov_path,
                          subsystem_names))
        p.daemon = True
        p.start()
        process_list.append(p)

    for worker in process_list:
        worker.join()

    merge_all_test_subsystem_info(subsystem_list=subsystem_names)
    merge_all_subsystem_info()
    gen_final_report(os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS))