• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import shutil
22import shlex
23import subprocess
24import multiprocessing
25import sys
26import stat
27
28from multiprocessing import Process
29
30FLAGS = os.O_WRONLY | os.O_APPEND | os.O_CREAT
31MODES = stat.S_IWUSR | stat.S_IRUSR
32
33# 子系统json目录
34SYSTEM_JSON = "test/testfwk/developer_test/local_coverage/code_coverage/subsystem_config.json"
35# 覆盖率gcda
36COVERAGE_GCDA_RESULTS = "test/testfwk/developer_test/local_coverage/code_coverage/results/coverage/data/cxx"
37# 报告路径
38REPORT_PATH = "test/testfwk/developer_test/local_coverage/code_coverage/results/coverage/reports/cxx"
39# llvm-gcov.sh
40LLVM_GCOV = "test/testfwk/developer_test/local_coverage/code_coverage/llvm-gcov.sh"
41# 测试套划分步长
42STEP_SIZE = 10
43
44
45def _init_sys_config():
46    sys.localcoverage_path = os.path.join(current_path, "..")
47    sys.path.insert(0, sys.localcoverage_path)
48
49
50def call(cmd_list, is_show_cmd=False, out=None, err=None):
51    return_flag = False
52    try:
53        if is_show_cmd:
54            print("execute command: {}".format(" ".join(cmd_list)))
55        if 0 == subprocess.call(cmd_list, shell=False, stdout=out, stderr=err):
56            return_flag = True
57    except IOError:
58        print("Error : command {} execute faild!".format(cmd_list))
59        return_flag = False
60
61    return return_flag
62
63
64def execute_command(command, printflag=False):
65    try:
66        cmd_list = shlex.split(command)
67        coverage_log_path = os.path.join(
68            CODEPATH, "test/testfwk/developer_test/local_coverage", "coverage.log")
69        with os.fdopen(os.open(coverage_log_path, FLAGS, MODES), 'a') as fd:
70            call(cmd_list, printflag, fd, fd)
71    except IOError:
72        print("Error: Exception occur in open error")
73
74
75def get_subsystem_config_info():
76    filter_subsystem_name_list = [
77        "subsystem_examples",
78    ]
79    subsystem_info_dic = {}
80    subsystem_config_filepath = os.path.join(CODEPATH, SYSTEM_JSON)
81    if os.path.exists(subsystem_config_filepath):
82        data = None
83        with open(subsystem_config_filepath, "r", encoding="utf-8") as f:
84            data = json.load(f)
85        if not data:
86            print("subsystem config file error.")
87        for value in data.values():
88            subsystem_name = value.get("name")
89            if subsystem_name in filter_subsystem_name_list:
90                continue
91            subsystem_dir = value.get('dir')
92            subsystem_path = value.get('path')
93            subsystem_project = value.get('project')
94            subsystem_rootpath = []
95            for path in subsystem_path:
96                subsystem_rootpath.append(os.path.join(CODEPATH, path))
97            subsystem_info_dic[subsystem_name] = [
98                subsystem_project, subsystem_dir,
99                subsystem_path, subsystem_rootpath
100            ]
101    return subsystem_info_dic
102
103
104def get_subsystem_name_list():
105    subsystem_name_list = []
106    subsystem_info_dic = get_subsystem_config_info()
107    for key in subsystem_info_dic.keys():
108        subsystem_rootpath = subsystem_info_dic[key][3]
109        for subsystem_rootpath_item in subsystem_rootpath:
110            if os.path.exists(subsystem_rootpath_item):
111                subsystem_name_list.append(key)
112
113    return subsystem_name_list
114
115
116def get_subsystem_rootpath(subsystem_name):
117    subsystem_path = ""
118    subsystem_rootpath = ""
119    subsystem_info_dic = get_subsystem_config_info()
120    for key in subsystem_info_dic.keys():
121        if key == subsystem_name:
122            subsystem_path = subsystem_info_dic[key][2]
123            subsystem_rootpath = subsystem_info_dic[key][3]
124            break
125
126    return subsystem_path, subsystem_rootpath
127
128
129def is_filterout_dir(ignore_prefix, check_path):
130    filter_out_list = ["unittest", "third_party", "test"]
131    for item in filter_out_list:
132        check_list = check_path[len(ignore_prefix):].split("/")
133        if item in check_list:
134            return True
135
136    return False
137
138
139def rm_unnecessary_dir(cov_path):
140    topdir = os.path.join(cov_path, "obj")
141    for root, dirs, files in os.walk(topdir):
142        if is_filterout_dir(topdir, root):
143            shutil.rmtree(root)
144
145
146def get_files_from_dir(find_path, postfix=None):
147    names = os.listdir(find_path)
148    file_list = []
149    for fn in names:
150        if not os.path.isfile(os.path.join(find_path, fn)):
151            continue
152        if postfix is not None:
153            if fn.endswith(postfix):
154                file_list.append(fn)
155        else:
156            file_list.append(fn)
157
158    return file_list
159
160
161def get_gcno_files(cov_path, dir_name):
162    gcda_strip_path = dir_name[len(cov_path) + 1:]
163    gcda_list = get_files_from_dir(dir_name, ".gcda")
164    for file_name in gcda_list:
165        gcno_name = f"{os.path.splitext(file_name)[0]}.gcno"
166        gcno_path = os.path.join(
167            os.path.join(CODEPATH, OUTPUT), gcda_strip_path, gcno_name
168        )
169        if os.path.exists(gcno_path):
170            shutil.copy(gcno_path, dir_name)
171        else:
172            print(f"{gcno_path} not exists!")
173
174
175def get_module_gcno_files(cov_path, dir_name):
176    for root, dirs, files in os.walk(dir_name):
177        get_gcno_files(cov_path, root)
178
179
180def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path):
181    src_dir = os.path.join(CODEPATH, OUTPUT)
182    single_info_path = os.path.join(
183        CODEPATH, REPORT_PATH, "single_test", test_dir
184    )
185    if not os.path.exists(single_info_path):
186        os.makedirs(single_info_path)
187    output_name = os.path.join(
188        CODEPATH, single_info_path, f"{subsystem}_output.info"
189    )
190    if not os.path.exists(src_dir):
191        print(f"Sours path {src_dir} not exists!")
192        return
193
194    cmd = "lcov -c -b {} -d {} --gcov-tool {} --config-file {} -o {} --ignore-errors source,gcov".format(
195        src_dir, data_dir, os.path.join(CODEPATH, LLVM_GCOV), lcovrc_path, output_name)
196    print("single_test**##father_pid:%s##child_pid:%s cmd:%s config file:%s" % (
197        os.getpid(), os.getppid(), cmd, lcovrc_path
198    ))
199    execute_command(cmd)
200
201
202def cut_info(subsystem, test_dir):
203    trace_file = os.path.join(
204        CODEPATH, REPORT_PATH, "single_test",
205        test_dir, f"{subsystem}_output.info"
206    )
207    output_name = os.path.join(
208        CODEPATH, REPORT_PATH, "single_test",
209        test_dir, f"{subsystem}_strip.info"
210    )
211
212    remove = r"'*/third_party/*' '*/prebuilts/*' '*/unittest/*' " \
213             r"'*/moduletest/*' '*/systemtest/*' '*.h*'"
214    if not os.path.exists(trace_file):
215        print(f"Error: trace file {trace_file} not exists!")
216        return
217
218    cmd = "lcov --remove {} {} -o {}".format(trace_file, remove, output_name)
219    execute_command(cmd)
220
221
222def gen_info(cov_path, test_dir, subsystem_list, lcovrc_path):
223    if len(subsystem_list) == 0:
224        print("Error: get subsystem list failed, can not generate trace info")
225        return
226
227    loop = 0
228    for subsystem in list(set(subsystem_list)):
229        subsystem_path, subsystem_rootpath = get_subsystem_rootpath(subsystem)
230        for subsys_path in subsystem_path:
231            subsystem_data_abspath = os.path.join(cov_path, "obj", subsys_path)
232            # check  id subsystem data is exists
233            if not os.path.exists(subsystem_data_abspath):
234                continue
235
236            # copy gcno to the gcda same directory
237            get_module_gcno_files(cov_path, subsystem_data_abspath)
238
239            # generate coverage info for each subsystem
240            gen_subsystem_trace_info(
241                f"{subsystem}#{subsys_path.replace('/', '_')}#{ str(loop)}",
242                subsystem_data_abspath, test_dir, lcovrc_path
243            )
244
245            # remove some type which useless
246            cut_info(f"{subsystem}#{subsys_path.replace('/', '_')}#{str(loop)}", test_dir)
247
248        loop += 1
249
250
251def generate_coverage_info(single_test_dir_list, lcovrc_path, subsystem_list):
252    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
253    for index, cur_test_dir in enumerate(single_test_dir_list):
254        cur_test_abs_dir = os.path.join(cov_path, cur_test_dir)
255        gen_info(cur_test_abs_dir, cur_test_dir, subsystem_list, lcovrc_path)
256
257
258def gen_all_test_info(subsystem_list):
259    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
260    print(os.getpid(), os.getppid())
261    single_test_dir_list = []
262    for root, dirs, files in os.walk(cov_path):
263        single_test_dir_list = dirs
264        break
265
266    return single_test_dir_list
267
268
269def merge_subsystem_info_from_all_test(subsystem):
270    single_test_info_path = os.path.join(
271        CODEPATH, REPORT_PATH, "single_test"
272    )
273    subsystem_info_list = []
274    subsystem_info_name = f"{subsystem}_strip.info"
275    for root, dirs, files in os.walk(single_test_info_path):
276        for file in files:
277            if file.startswith(subsystem) and file.endswith("_strip.info"):
278                subsystem_info_path_tmp = os.path.join(
279                    single_test_info_path, root, file
280                )
281                print(f"##{subsystem_info_path_tmp}")
282                subsystem_info_list.append(subsystem_info_path_tmp)
283
284    if len(subsystem_info_list) == 0:
285        return
286
287    info_output_name = os.path.join(
288        CODEPATH, REPORT_PATH, subsystem_info_name
289    )
290    cmd = "lcov -a {} -o {}".format(
291        " -a ".join(subsystem_info_list), info_output_name
292    )
293    execute_command(cmd)
294
295
296def merge_all_test_subsystem_info(subsystem_list):
297    single_test_info_path = os.path.join(
298        CODEPATH, REPORT_PATH, "single_test"
299    )
300    if not os.path.exists(single_test_info_path):
301        print(f"Error: the single test info path "
302              f"{single_test_info_path} not exist")
303        return
304
305    for subsystem in subsystem_list:
306        print(f"Merging all {subsystem} info from test data")
307        merge_subsystem_info_from_all_test(subsystem)
308
309
310def merge_info(report_dir):
311    if not os.path.exists(report_dir):
312        print(f"Error: report dir {report_dir} not exists!")
313        return
314
315    subsystem_name_list = get_files_from_dir(report_dir, "_strip.info")
316    if len(subsystem_name_list) == 0:
317        print("Error: get subsystem trace files in report directory failed.")
318        return
319
320    trace_file_list = []
321    for subsystem in subsystem_name_list:
322        trace_file_name = os.path.join(report_dir, subsystem)
323        trace_file_list.append(trace_file_name)
324
325    cmd = "lcov -a {} -o {}".format(
326        " -a ".join(trace_file_list), os.path.join(report_dir, "ohos_codeCoverage.info")
327    )
328    execute_command(cmd)
329
330
331def merge_all_subsystem_info():
332    print("Merging all the subsystem trace files")
333    merge_info(os.path.join(CODEPATH, REPORT_PATH))
334
335
336def gen_html(cov_path):
337    tracefile = os.path.join(CODEPATH, REPORT_PATH, "ohos_codeCoverage.info")
338    if not os.path.exists(tracefile):
339        print(f"Error: the trace file {tracefile} not exists!")
340        return
341
342    cmd = "genhtml --branch-coverage --demangle-cpp -o {} -p {} --ignore-errors source {}".format(
343        os.path.join(CODEPATH, REPORT_PATH, "html"), CODEPATH, tracefile)
344    execute_command(cmd)
345
346
347def gen_final_report(cov_path):
348    print("Generating the html report")
349    gen_html(cov_path)
350
351
352if __name__ == '__main__':
353    current_path = os.path.abspath(os.path.dirname(__name__))
354    CODEPATH = current_path.split("/test/testfwk/developer_test")[0]
355    # lcovrc配置文件集合
356    LCOVRC_SET = f"{CODEPATH}/test/testfwk/developer_test/local_coverage/code_coverage/coverage_rc"
357    _init_sys_config()
358    from local_coverage.utils import get_product_name
359    # 编译生成的out路径
360    OUTPUT = "out/{}".format(get_product_name(CODEPATH))
361
362    case_list = gen_all_test_info(subsystem_list=get_subsystem_name_list())
363    multiprocessing.set_start_method("fork")  # fork spawn forkserver
364    start = end = 0
365    Tag = False
366    process_list = []
367    for i in range(len(case_list)):
368        lcov_path = f"{LCOVRC_SET}/lcovrc_cov_{str(i)}"
369        print(lcov_path)
370        if os.path.exists(lcov_path):
371            print(f"{lcov_path}{'@' * 20}yes")
372        else:
373            raise Exception("mutilProcess have error -rc path not existed. "
374                            "please fix add run")
375
376        start = end
377        end += STEP_SIZE
378        if end >= len(case_list):
379            end = len(case_list)
380            Tag = True
381
382        p = Process(target=generate_coverage_info,
383                    args=(case_list[start:end], lcov_path,
384                          get_subsystem_name_list()))
385        p.daemon = True
386        p.start()
387        process_list.append(p)
388        if Tag:
389            break
390
391    for i in process_list:
392        i.join()
393
394    merge_all_test_subsystem_info(subsystem_list=get_subsystem_name_list())
395    merge_all_subsystem_info()
396    gen_final_report(os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS))
397