• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import shutil
22import shlex
23import subprocess
24import multiprocessing
25import sys
26import stat
27
28from multiprocessing import Process
29
30FLAGS = os.O_WRONLY | os.O_APPEND | os.O_CREAT
31MODES = stat.S_IWUSR | stat.S_IRUSR
32
33# 子系统json目录
34SYSTEM_JSON = "test/testfwk/developer_test/local_coverage/code_coverage/subsystem_config.json"
35# 覆盖率gcda
36COVERAGE_GCDA_RESULTS = "test/testfwk/developer_test/local_coverage/code_coverage/results/coverage/data/cxx"
37# 报告路径
38REPORT_PATH = "test/testfwk/developer_test/local_coverage/code_coverage/results/coverage/reports/cxx"
39# llvm-gcov.sh
40LLVM_GCOV = "test/testfwk/developer_test/local_coverage/code_coverage/llvm-gcov.sh"
41# 测试套划分步长
42STEP_SIZE = 10
43
44
45def _init_sys_config():
46    sys.localcoverage_path = os.path.join(current_path, "..")
47    sys.path.insert(0, sys.localcoverage_path)
48
49
50def call(cmd_list, is_show_cmd=False, out=None, err=None):
51    return_flag = False
52    try:
53        if is_show_cmd:
54            print("execute command: {}".format(" ".join(cmd_list)))
55        if 0 == subprocess.call(cmd_list, shell=False, stdout=out, stderr=err):
56            return_flag = True
57    except IOError:
58        print("Error : command {} execute faild!".format(cmd_list))
59        return_flag = False
60
61    return return_flag
62
63
64def execute_command(command, printflag=False):
65    try:
66        cmd_list = shlex.split(command)
67        coverage_log_path = os.path.join(
68            CODEPATH, "test/testfwk/developer_test/local_coverage", "coverage.log")
69        with os.fdopen(os.open(coverage_log_path, FLAGS, MODES), 'a') as fd:
70            call(cmd_list, printflag, fd, fd)
71    except IOError:
72        print("Error: Exception occur in open error")
73
74
75def get_subsystem_config_info():
76    filter_subsystem_name_list = [
77        "subsystem_examples",
78    ]
79    subsystem_info_dic = {}
80    subsystem_config_filepath = os.path.join(CODEPATH, SYSTEM_JSON)
81    if os.path.exists(subsystem_config_filepath):
82        data = None
83        with open(subsystem_config_filepath, "r", encoding="utf-8") as f:
84            data = json.load(f)
85        if not data:
86            print("subsystem config file error.")
87        for value in data.values():
88            subsystem_name = value.get("name")
89            if subsystem_name in filter_subsystem_name_list:
90                continue
91            subsystem_dir = value.get('dir')
92            subsystem_path = value.get('path')
93            subsystem_project = value.get('project')
94            subsystem_rootpath = []
95            for path in subsystem_path:
96                subsystem_rootpath.append(os.path.join(CODEPATH, path))
97            subsystem_info_dic[subsystem_name] = [
98                subsystem_project, subsystem_dir,
99                subsystem_path, subsystem_rootpath
100            ]
101    return subsystem_info_dic
102
103
104def get_subsystem_name_list():
105    subsystem_name_list = []
106    subsystem_info_dic = get_subsystem_config_info()
107    for key in subsystem_info_dic.keys():
108        subsystem_rootpath = subsystem_info_dic[key][3]
109        for subsystem_rootpath_item in subsystem_rootpath:
110            if os.path.exists(subsystem_rootpath_item):
111                subsystem_name_list.append(key)
112
113    return subsystem_name_list
114
115
116def get_subsystem_rootpath(subsystem_name):
117    subsystem_path = ""
118    subsystem_rootpath = ""
119    subsystem_info_dic = get_subsystem_config_info()
120    for key in subsystem_info_dic.keys():
121        if key == subsystem_name:
122            subsystem_path = subsystem_info_dic[key][2]
123            subsystem_rootpath = subsystem_info_dic[key][3]
124            break
125
126    return subsystem_path, subsystem_rootpath
127
128
129def is_filterout_dir(ignore_prefix, check_path):
130    filter_out_list = ["unittest", "third_party", "test"]
131    for item in filter_out_list:
132        check_list = check_path[len(ignore_prefix):].split("/")
133        if item in check_list:
134            return True
135
136    return False
137
138
139def rm_unnecessary_dir(cov_path):
140    topdir = os.path.join(cov_path, "obj")
141    for root, dirs, files in os.walk(topdir):
142        if is_filterout_dir(topdir, root):
143            shutil.rmtree(root)
144
145
146def get_files_from_dir(find_path, postfix=None):
147    names = os.listdir(find_path)
148    file_list = []
149    for fn in names:
150        if not os.path.isfile(os.path.join(find_path, fn)):
151            continue
152        if postfix is not None:
153            if fn.endswith(postfix):
154                file_list.append(fn)
155        else:
156            file_list.append(fn)
157
158    return file_list
159
160
161def get_gcno_files(cov_path, dir_name):
162    gcda_strip_path = dir_name[len(cov_path) + 1:]
163    gcda_list = get_files_from_dir(dir_name, ".gcda")
164    for file_name in gcda_list:
165        gcno_name = f"{os.path.splitext(file_name)[0]}.gcno"
166        gcno_path = os.path.join(
167            os.path.join(CODEPATH, OUTPUT), gcda_strip_path, gcno_name
168        )
169        if os.path.exists(gcno_path):
170            shutil.copy(gcno_path, dir_name)
171        else:
172            print(f"{gcno_path} not exists!")
173
174
175def get_module_gcno_files(cov_path, dir_name):
176    for root, dirs, files in os.walk(dir_name):
177        get_gcno_files(cov_path, root)
178
179
180def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path):
181    src_dir = os.path.join(CODEPATH, OUTPUT)
182    single_info_path = os.path.join(
183        CODEPATH, REPORT_PATH, "single_test", test_dir
184    )
185    if not os.path.exists(single_info_path):
186        os.makedirs(single_info_path)
187    output_name = os.path.join(
188        CODEPATH, single_info_path, f"{subsystem}_output.info"
189    )
190    if not os.path.exists(src_dir):
191        print(f"Sours path {src_dir} not exists!")
192        return
193
194    cmd = "lcov -c -b {} -d {} --gcov-tool {} --config-file {} -o {} --ignore-errors source,gcov".format(
195        src_dir, data_dir, os.path.join(CODEPATH, LLVM_GCOV), lcovrc_path, output_name)
196    print("single_test**##father_pid:%s##child_pid:%s cmd:%s config file:%s" % (
197        os.getpid(), os.getppid(), cmd, lcovrc_path
198    ))
199    execute_command(cmd)
200
201
202def cut_info(subsystem, test_dir):
203    trace_file = os.path.join(
204        CODEPATH, REPORT_PATH, "single_test",
205        test_dir, f"{subsystem}_output.info"
206    )
207    output_name = os.path.join(
208        CODEPATH, REPORT_PATH, "single_test",
209        test_dir, f"{subsystem}_strip.info"
210    )
211
212    remove = r"'*/third_party/*' '*/prebuilts/*' '*/unittest/*' " \
213             r"'*/moduletest/*' '*/systemtest/*' '*.h*'"
214    if not os.path.exists(trace_file):
215        print(f"Error: trace file {trace_file} not exists!")
216        return
217
218    cmd = "lcov --remove {} {} -o {}".format(trace_file, remove, output_name)
219    execute_command(cmd)
220    delete_empty_info_file(output_name)
221
222
223def delete_empty_info_file(filename):
224    if os.path.exists(filename) and os.stat(filename).st_size == 0:
225        print(f"empty file {filename} deleted")
226        os.remove(filename)
227
228
229def gen_info(cov_path, test_dir, subsystem_list, lcovrc_path):
230    if len(subsystem_list) == 0:
231        print("Error: get subsystem list failed, can not generate trace info")
232        return
233
234    loop = 0
235    for subsystem in list(set(subsystem_list)):
236        subsystem_path, subsystem_rootpath = get_subsystem_rootpath(subsystem)
237        for subsys_path in subsystem_path:
238            subsystem_data_abspath = os.path.join(cov_path, "obj", subsys_path)
239            # check  id subsystem data is exists
240            if not os.path.exists(subsystem_data_abspath):
241                continue
242
243            # copy gcno to the gcda same directory
244            get_module_gcno_files(cov_path, subsystem_data_abspath)
245
246            # generate coverage info for each subsystem
247            gen_subsystem_trace_info(
248                f"{subsystem}#{subsys_path.replace('/', '_')}#{ str(loop)}",
249                subsystem_data_abspath, test_dir, lcovrc_path
250            )
251
252            # remove some type which useless
253            cut_info(f"{subsystem}#{subsys_path.replace('/', '_')}#{str(loop)}", test_dir)
254
255        loop += 1
256
257
258def generate_coverage_info(single_test_dir_list, lcovrc_path, subsystem_list):
259    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
260    for index, cur_test_dir in enumerate(single_test_dir_list):
261        cur_test_abs_dir = os.path.join(cov_path, cur_test_dir)
262        gen_info(cur_test_abs_dir, cur_test_dir, subsystem_list, lcovrc_path)
263
264
265def gen_all_test_info(subsystem_list):
266    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
267    print(os.getpid(), os.getppid())
268    single_test_dir_list = []
269    for root, dirs, files in os.walk(cov_path):
270        single_test_dir_list = dirs
271        break
272
273    return single_test_dir_list
274
275
276def merge_subsystem_info_from_all_test(subsystem):
277    single_test_info_path = os.path.join(
278        CODEPATH, REPORT_PATH, "single_test"
279    )
280    subsystem_info_list = []
281    subsystem_info_name = f"{subsystem}_strip.info"
282    for root, dirs, files in os.walk(single_test_info_path):
283        for file in files:
284            if file.startswith(subsystem) and file.endswith("_strip.info"):
285                subsystem_info_path_tmp = os.path.join(
286                    single_test_info_path, root, file
287                )
288                print(f"##{subsystem_info_path_tmp}")
289                subsystem_info_list.append(subsystem_info_path_tmp)
290
291    if len(subsystem_info_list) == 0:
292        return
293
294    info_output_name = os.path.join(
295        CODEPATH, REPORT_PATH, subsystem_info_name
296    )
297    cmd = "lcov -a {} -o {}".format(
298        " -a ".join(subsystem_info_list), info_output_name
299    )
300    execute_command(cmd)
301
302
303def merge_all_test_subsystem_info(subsystem_list):
304    single_test_info_path = os.path.join(
305        CODEPATH, REPORT_PATH, "single_test"
306    )
307    if not os.path.exists(single_test_info_path):
308        print(f"Error: the single test info path "
309              f"{single_test_info_path} not exist")
310        return
311
312    for subsystem in subsystem_list:
313        print(f"Merging all {subsystem} info from test data")
314        merge_subsystem_info_from_all_test(subsystem)
315
316
317def merge_info(report_dir):
318    if not os.path.exists(report_dir):
319        print(f"Error: report dir {report_dir} not exists!")
320        return
321
322    subsystem_name_list = get_files_from_dir(report_dir, "_strip.info")
323    if len(subsystem_name_list) == 0:
324        print("Error: get subsystem trace files in report directory failed.")
325        return
326
327    trace_file_list = []
328    for subsystem in subsystem_name_list:
329        trace_file_name = os.path.join(report_dir, subsystem)
330        trace_file_list.append(trace_file_name)
331
332    cmd = "lcov -a {} -o {}".format(
333        " -a ".join(trace_file_list), os.path.join(report_dir, "ohos_codeCoverage.info")
334    )
335    execute_command(cmd)
336
337
338def merge_all_subsystem_info():
339    print("Merging all the subsystem trace files")
340    merge_info(os.path.join(CODEPATH, REPORT_PATH))
341
342
343def gen_html(cov_path):
344    tracefile = os.path.join(CODEPATH, REPORT_PATH, "ohos_codeCoverage.info")
345    if not os.path.exists(tracefile):
346        print(f"Error: the trace file {tracefile} not exists!")
347        return
348
349    cmd = "genhtml --branch-coverage --demangle-cpp -o {} -p {} --ignore-errors source {}".format(
350        os.path.join(CODEPATH, REPORT_PATH, "html"), CODEPATH, tracefile)
351    execute_command(cmd)
352
353
354def gen_final_report(cov_path):
355    print("Generating the html report")
356    gen_html(cov_path)
357
358
359if __name__ == '__main__':
360    current_path = os.path.abspath(os.path.dirname(__name__))
361    CODEPATH = current_path.split("/test/testfwk/developer_test")[0]
362    # lcovrc配置文件集合
363    LCOVRC_SET = f"{CODEPATH}/test/testfwk/developer_test/local_coverage/code_coverage/coverage_rc"
364    _init_sys_config()
365    from local_coverage.utils import get_product_name
366    # 编译生成的out路径
367    OUTPUT = "out/{}".format(get_product_name(CODEPATH))
368
369    case_list = gen_all_test_info(subsystem_list=get_subsystem_name_list())
370    multiprocessing.set_start_method("fork")  # fork spawn forkserver
371    start = end = 0
372    Tag = False
373    process_list = []
374    for i in range(len(case_list)):
375        lcov_path = f"{LCOVRC_SET}/lcovrc_cov_{str(i)}"
376        print(lcov_path)
377        if os.path.exists(lcov_path):
378            print(f"{lcov_path}{'@' * 20}yes")
379        else:
380            raise Exception("mutilProcess have error -rc path not existed. "
381                            "please fix add run")
382
383        start = end
384        end += STEP_SIZE
385        if end >= len(case_list):
386            end = len(case_list)
387            Tag = True
388
389        p = Process(target=generate_coverage_info,
390                    args=(case_list[start:end], lcov_path,
391                          get_subsystem_name_list()))
392        p.daemon = True
393        p.start()
394        process_list.append(p)
395        if Tag:
396            break
397
398    for i in process_list:
399        i.join()
400
401    merge_all_test_subsystem_info(subsystem_list=get_subsystem_name_list())
402    merge_all_subsystem_info()
403    gen_final_report(os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS))
404