#!/usr/bin/env python3 # coding=utf-8 # # Copyright (c) 2023 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import json import shutil import shlex import subprocess import multiprocessing import sys import stat from multiprocessing import Process FLAGS = os.O_WRONLY | os.O_APPEND | os.O_CREAT MODES = stat.S_IWUSR | stat.S_IRUSR # 子系统json目录 SYSTEM_JSON = "test/testfwk/developer_test/localCoverage/codeCoverage/subsystem_config.json" # 覆盖率gcda COVERAGE_GCDA_RESULTS = "test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/data/cxx" # 报告路径 REPORT_PATH = "test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/reports/cxx" # llvm-gcov.sh LLVM_GCOV = "test/testfwk/developer_test/localCoverage/codeCoverage/llvm-gcov.sh" # 测试套划分步长 STEP_SIZE = 10 def _init_sys_config(): sys.localcoverage_path = os.path.join(current_path, "..") sys.path.insert(0, sys.localcoverage_path) def call(cmd_list, is_show_cmd=False, out=None, err=None): return_flag = False try: if is_show_cmd: print("execute command: {}".format(" ".join(cmd_list))) if 0 == subprocess.call(cmd_list, shell=False, stdout=out, stderr=err): return_flag = True except IOError: print("Error : command {} execute faild!".format(cmd_list)) return_flag = False return return_flag def execute_command(command, printflag=False): try: cmd_list = shlex.split(command) coverage_log_path = os.path.join( CODEPATH, "test/testfwk/developer_test/localCoverage", "coverage.log") with os.fdopen(os.open(coverage_log_path, FLAGS, MODES), 'a') as fd: call(cmd_list, printflag, fd, fd) except IOError: print("Error: Exception occur in open error") def get_subsystem_config_info(): filter_subsystem_name_list = [ "subsystem_examples", ] subsystem_info_dic = {} subsystem_config_filepath = os.path.join(CODEPATH, SYSTEM_JSON) if os.path.exists(subsystem_config_filepath): data = None with open(subsystem_config_filepath, "r", encoding="utf-8") as f: data = json.load(f) if not data: print("subsystem config file error.") for value in data.values(): subsystem_name = value.get("name") if subsystem_name in filter_subsystem_name_list: continue subsystem_dir = value.get('dir') subsystem_path = value.get('path') subsystem_project = value.get('project') subsystem_rootpath = [] for path in subsystem_path: subsystem_rootpath.append(os.path.join(CODEPATH, path)) subsystem_info_dic[subsystem_name] = [ subsystem_project, subsystem_dir, subsystem_path, subsystem_rootpath ] return subsystem_info_dic def get_subsystem_name_list(): subsystem_name_list = [] subsystem_info_dic = get_subsystem_config_info() for key in subsystem_info_dic.keys(): subsystem_rootpath = subsystem_info_dic[key][3] for subsystem_rootpath_item in subsystem_rootpath: if os.path.exists(subsystem_rootpath_item): subsystem_name_list.append(key) return subsystem_name_list def get_subsystem_rootpath(subsystem_name): subsystem_path = "" subsystem_rootpath = "" subsystem_info_dic = get_subsystem_config_info() for key in subsystem_info_dic.keys(): if key == subsystem_name: subsystem_path = subsystem_info_dic[key][2] subsystem_rootpath = subsystem_info_dic[key][3] break return subsystem_path, subsystem_rootpath def is_filterout_dir(ignore_prefix, check_path): filter_out_list = ["unittest", "third_party", "test"] for item in filter_out_list: check_list = check_path[len(ignore_prefix):].split("/") if item in check_list: return True return False def rm_unnecessary_dir(cov_path): topdir = os.path.join(cov_path, "obj") for root, dirs, files in os.walk(topdir): if is_filterout_dir(topdir, root): shutil.rmtree(root) def get_files_from_dir(find_path, postfix=None): names = os.listdir(find_path) file_list = [] for fn in names: if not os.path.isfile(os.path.join(find_path, fn)): continue if postfix is not None: if fn.endswith(postfix): file_list.append(fn) else: file_list.append(fn) return file_list def get_gcno_files(cov_path, dir_name): gcda_strip_path = dir_name[len(cov_path) + 1:] gcda_list = get_files_from_dir(dir_name, ".gcda") for file_name in gcda_list: gcno_name = f"{os.path.splitext(file_name)[0]}.gcno" gcno_path = os.path.join( os.path.join(CODEPATH, OUTPUT), gcda_strip_path, gcno_name ) if os.path.exists(gcno_path): shutil.copy(gcno_path, dir_name) else: print(f"{gcno_path} not exists!") def get_module_gcno_files(cov_path, dir_name): for root, dirs, files in os.walk(dir_name): get_gcno_files(cov_path, root) def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path): src_dir = os.path.join(CODEPATH, OUTPUT) single_info_path = os.path.join( CODEPATH, REPORT_PATH, "single_test", test_dir ) if not os.path.exists(single_info_path): os.makedirs(single_info_path) output_name = os.path.join( CODEPATH, single_info_path, f"{subsystem}_output.info" ) if not os.path.exists(src_dir): print(f"Sours path {src_dir} not exists!") return cmd = "lcov -c -b {} -d {} --gcov-tool {} --config-file {} -o {} --ignore-errors source,gcov".format( src_dir, data_dir, os.path.join(CODEPATH, LLVM_GCOV), lcovrc_path, output_name) print("single_test**##father_pid:%s##child_pid:%s cmd:%s config file:%s" % ( os.getpid(), os.getppid(), cmd, lcovrc_path )) execute_command(cmd) def cut_info(subsystem, test_dir): trace_file = os.path.join( CODEPATH, REPORT_PATH, "single_test", test_dir, f"{subsystem}_output.info" ) output_name = os.path.join( CODEPATH, REPORT_PATH, "single_test", test_dir, f"{subsystem}_strip.info" ) remove = r"'*/third_party/*' '*/prebuilts/*' '*/unittest/*' " \ r"'*/moduletest/*' '*/systemtest/*' '*.h*'" if not os.path.exists(trace_file): print(f"Error: trace file {trace_file} not exists!") return cmd = "lcov --remove {} {} -o {}".format(trace_file, remove, output_name) execute_command(cmd) def gen_info(cov_path, test_dir, subsystem_list, lcovrc_path): if len(subsystem_list) == 0: print("Error: get subsystem list failed, can not generate trace info") return loop = 0 for subsystem in list(set(subsystem_list)): subsystem_path, subsystem_rootpath = get_subsystem_rootpath(subsystem) for subsys_path in subsystem_path: subsystem_data_abspath = os.path.join(cov_path, "obj", subsys_path) # check id subsystem data is exists if not os.path.exists(subsystem_data_abspath): continue # copy gcno to the gcda same directory get_module_gcno_files(cov_path, subsystem_data_abspath) # generate coverage info for each subsystem gen_subsystem_trace_info( f"{subsystem}#{subsys_path.replace('/', '_')}#{ str(loop)}", subsystem_data_abspath, test_dir, lcovrc_path ) # remove some type which useless cut_info(f"{subsystem}#{subsys_path.replace('/', '_')}#{str(loop)}", test_dir) loop += 1 def generate_coverage_info(single_test_dir_list, lcovrc_path, subsystem_list): cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS) for index, cur_test_dir in enumerate(single_test_dir_list): cur_test_abs_dir = os.path.join(cov_path, cur_test_dir) gen_info(cur_test_abs_dir, cur_test_dir, subsystem_list, lcovrc_path) def gen_all_test_info(subsystem_list): cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS) print(os.getpid(), os.getppid()) single_test_dir_list = [] for root, dirs, files in os.walk(cov_path): single_test_dir_list = dirs break return single_test_dir_list def merge_subsystem_info_from_all_test(subsystem): single_test_info_path = os.path.join( CODEPATH, REPORT_PATH, "single_test" ) subsystem_info_list = [] subsystem_info_name = f"{subsystem}_strip.info" for root, dirs, files in os.walk(single_test_info_path): for file in files: if file.startswith(subsystem) and file.endswith("_strip.info"): subsystem_info_path_tmp = os.path.join( single_test_info_path, root, file ) print(f"##{subsystem_info_path_tmp}") subsystem_info_list.append(subsystem_info_path_tmp) if len(subsystem_info_list) == 0: return info_output_name = os.path.join( CODEPATH, REPORT_PATH, subsystem_info_name ) cmd = "lcov -a {} -o {}".format( " -a ".join(subsystem_info_list), info_output_name ) execute_command(cmd) def merge_all_test_subsystem_info(subsystem_list): single_test_info_path = os.path.join( CODEPATH, REPORT_PATH, "single_test" ) if not os.path.exists(single_test_info_path): print(f"Error: the single test info path " f"{single_test_info_path} not exist") return for subsystem in subsystem_list: print(f"Merging all {subsystem} info from test data") merge_subsystem_info_from_all_test(subsystem) def merge_info(report_dir): if not os.path.exists(report_dir): print(f"Error: report dir {report_dir} not exists!") return subsystem_name_list = get_files_from_dir(report_dir, "_strip.info") if len(subsystem_name_list) == 0: print("Error: get subsystem trace files in report directory failed.") return trace_file_list = [] for subsystem in subsystem_name_list: trace_file_name = os.path.join(report_dir, subsystem) trace_file_list.append(trace_file_name) cmd = "lcov -a {} -o {}".format( " -a ".join(trace_file_list), os.path.join(report_dir, "ohos_codeCoverage.info") ) execute_command(cmd) def merge_all_subsystem_info(): print("Merging all the subsystem trace files") merge_info(os.path.join(CODEPATH, REPORT_PATH)) def gen_html(cov_path): tracefile = os.path.join(CODEPATH, REPORT_PATH, "ohos_codeCoverage.info") if not os.path.exists(tracefile): print(f"Error: the trace file {tracefile} not exists!") return cmd = "genhtml --branch-coverage --demangle-cpp -o {} -p {} --ignore-errors source {}".format( os.path.join(CODEPATH, REPORT_PATH, "html"), CODEPATH, tracefile) execute_command(cmd) def gen_final_report(cov_path): print("Generating the html report") gen_html(cov_path) if __name__ == '__main__': current_path = os.path.abspath(os.path.dirname(__name__)) CODEPATH = current_path.split("/test/testfwk/developer_test")[0] # lcovrc配置文件集合 LCOVRC_SET = f"{CODEPATH}/test/testfwk/developer_test/localCoverage/codeCoverage/coverage_rc" _init_sys_config() from localCoverage.utils import get_product_name # 编译生成的out路径 OUTPUT = "out/{}".format(get_product_name(CODEPATH)) case_list = gen_all_test_info(subsystem_list=get_subsystem_name_list()) multiprocessing.set_start_method("fork") # fork spawn forkserver start = end = 0 Tag = False process_list = [] for i in range(len(case_list)): lcov_path = f"{LCOVRC_SET}/lcovrc_cov_{str(i)}" print(lcov_path) if os.path.exists(lcov_path): print(f"{lcov_path}{'@' * 20}yes") else: raise Exception("mutilProcess have error -rc path not existed. " "please fix add run") start = end end += STEP_SIZE if end >= len(case_list): end = len(case_list) Tag = True p = Process(target=generate_coverage_info, args=(case_list[start:end], lcov_path, get_subsystem_name_list())) p.daemon = True p.start() process_list.append(p) if Tag: break for i in process_list: i.join() merge_all_test_subsystem_info(subsystem_list=get_subsystem_name_list()) merge_all_subsystem_info() gen_final_report(os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS))