• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import shutil
22import shlex
23import subprocess
24import multiprocessing
25import sys
26from multiprocessing import Process
27
28# 子系统json目录
29SYSTEM_JSON = "test/testfwk/developer_test/localCoverage/codeCoverage/subsystem_config.json"
30# 覆盖率gcda
31COVERAGE_GCDA_RESULTS = "test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/data/cxx"
32# 报告路径
33REPORT_PATH = "test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/reports/cxx"
34# llvm-gcov.sh
35LLVM_GCOV = "test/testfwk/developer_test/localCoverage/codeCoverage/llvm-gcov.sh"
36# 测试套划分步长
37STEP_SIZE = 10
38
39
40def _init_sys_config():
41    sys.localcoverage_path = os.path.join(current_path, "..")
42    sys.path.insert(0, sys.localcoverage_path)
43
44
45def call(cmd_list, is_show_cmd=False, out=None, err=None):
46    return_flag = False
47    try:
48        if is_show_cmd:
49            print("execute command: {}".format(" ".join(cmd_list)))
50        if 0 == subprocess.call(cmd_list, shell=False, stdout=out, stderr=err):
51            return_flag = True
52    except IOError:
53        print("Error : command {} execute faild!".format(cmd_list))
54        return_flag = False
55
56    return return_flag
57
58
59def execute_command(command, printflag=False):
60    try:
61        cmd_list = shlex.split(command)
62        coverage_log_path = os.path.join(
63            CODEPATH, "test/testfwk/developer_test/localCoverage", "coverage.log")
64        with open(coverage_log_path, 'a') as fd:
65            call(cmd_list, printflag, fd, fd)
66    except IOError:
67        print("Error: Exception occur in open error")
68
69
70def get_subsystem_config_info():
71    filter_subsystem_name_list = [
72        "subsystem_examples",
73    ]
74    subsystem_info_dic = {}
75    subsystem_config_filepath = os.path.join(CODEPATH, SYSTEM_JSON)
76    if os.path.exists(subsystem_config_filepath):
77        data = None
78        with open(subsystem_config_filepath, "r", encoding="utf-8") as f:
79            data = json.load(f)
80        if not data:
81            print("subsystem config file error.")
82        for value in data.values():
83            subsystem_name = value.get("name")
84            if subsystem_name in filter_subsystem_name_list:
85                continue
86            subsystem_dir = value.get('dir')
87            subsystem_path = value.get('path')
88            subsystem_project = value.get('project')
89            subsystem_rootpath = []
90            for path in subsystem_path:
91                subsystem_rootpath.append(os.path.join(CODEPATH, path))
92            subsystem_info_dic[subsystem_name] = [
93                subsystem_project, subsystem_dir,
94                subsystem_path, subsystem_rootpath
95            ]
96    return subsystem_info_dic
97
98
99def get_subsystem_name_list():
100    subsystem_name_list = []
101    subsystem_info_dic = get_subsystem_config_info()
102    for key in subsystem_info_dic.keys():
103        subsystem_rootpath = subsystem_info_dic[key][3]
104        for subsystem_rootpath_item in subsystem_rootpath:
105            if os.path.exists(subsystem_rootpath_item):
106                subsystem_name_list.append(key)
107
108    return subsystem_name_list
109
110
111def get_subsystem_rootpath(subsystem_name):
112    subsystem_path = ""
113    subsystem_rootpath = ""
114    subsystem_info_dic = get_subsystem_config_info()
115    for key in subsystem_info_dic.keys():
116        if key == subsystem_name:
117            subsystem_path = subsystem_info_dic[key][2]
118            subsystem_rootpath = subsystem_info_dic[key][3]
119            break
120
121    return subsystem_path, subsystem_rootpath
122
123
124def is_filterout_dir(ignore_prefix, check_path):
125    filter_out_list = ["unittest", "third_party", "test"]
126    for item in filter_out_list:
127        check_list = check_path[len(ignore_prefix):].split("/")
128        if item in check_list:
129            return True
130
131    return False
132
133
134def rm_unnecessary_dir(cov_path):
135    topdir = os.path.join(cov_path, "obj")
136    for root, dirs, files in os.walk(topdir):
137        if is_filterout_dir(topdir, root):
138            shutil.rmtree(root)
139
140
141def get_files_from_dir(find_path, postfix=None):
142    names = os.listdir(find_path)
143    file_list = []
144    for fn in names:
145        if not os.path.isfile(os.path.join(find_path, fn)):
146            continue
147        if postfix is not None:
148            if fn.endswith(postfix):
149                file_list.append(fn)
150        else:
151            file_list.append(fn)
152
153    return file_list
154
155
156def get_gcno_files(cov_path, dir_name):
157    gcda_strip_path = dir_name[len(cov_path) + 1:]
158    gcda_list = get_files_from_dir(dir_name, ".gcda")
159    for file_name in gcda_list:
160        gcno_name = f"{os.path.splitext(file_name)[0]}.gcno"
161        gcno_path = os.path.join(
162            os.path.join(CODEPATH, OUTPUT), gcda_strip_path, gcno_name
163        )
164        if os.path.exists(gcno_path):
165            shutil.copy(gcno_path, dir_name)
166        else:
167            print(f"{gcno_path} not exists!")
168
169
170def get_module_gcno_files(cov_path, dir_name):
171    for root, dirs, files in os.walk(dir_name):
172        get_gcno_files(cov_path, root)
173
174
175def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path):
176    src_dir = os.path.join(CODEPATH, OUTPUT)
177    single_info_path = os.path.join(
178        CODEPATH, REPORT_PATH, "single_test", test_dir
179    )
180    if not os.path.exists(single_info_path):
181        os.makedirs(single_info_path)
182    output_name = os.path.join(
183        CODEPATH, single_info_path, f"{subsystem}_output.info"
184    )
185    if not os.path.exists(src_dir):
186        print(f"Sours path {src_dir} not exists!")
187        return
188
189    cmd = "lcov -c -b {} -d {} --gcov-tool {} --config-file {} -o {} --ignore-errors source,gcov".format(
190        src_dir, data_dir, os.path.join(CODEPATH, LLVM_GCOV), lcovrc_path, output_name)
191    print("single_test**##father_pid:%s##child_pid:%s cmd:%s config file:%s" % (
192        os.getpid(), os.getppid(), cmd, lcovrc_path
193    ))
194    execute_command(cmd)
195
196
197def cut_info(subsystem, test_dir):
198    trace_file = os.path.join(
199        CODEPATH, REPORT_PATH, "single_test",
200        test_dir,  f"{subsystem}_output.info"
201    )
202    output_name = os.path.join(
203        CODEPATH, REPORT_PATH, "single_test",
204        test_dir, f"{subsystem}_strip.info"
205    )
206
207    remove = r"'*/third_party/*' 'sdk/android-arm64/*'"
208    if not os.path.exists(trace_file):
209        print(f"Error: trace file {trace_file} not exists!")
210        return
211
212    cmd = "lcov --remove {} {} -o {}".format(trace_file, remove, output_name)
213    execute_command(cmd)
214
215
216def gen_info(cov_path, test_dir, subsystem_list, lcovrc_path):
217    if len(subsystem_list) == 0:
218        print("Error: get subsystem list failed, can not generate trace info")
219        return
220
221    loop = 0
222    for subsystem in list(set(subsystem_list)):
223        subsystem_path, subsystem_rootpath = get_subsystem_rootpath(subsystem)
224        for subsys_path in subsystem_path:
225            subsystem_data_abspath = os.path.join(cov_path, "obj", subsys_path)
226            # check  id subsystem data is exists
227            if not os.path.exists(subsystem_data_abspath):
228                continue
229
230            # copy gcno to the gcda same directory
231            get_module_gcno_files(cov_path, subsystem_data_abspath)
232
233            # generate coverage info for each subsystem
234            gen_subsystem_trace_info(
235                f"{subsystem}#{subsys_path.replace('/', '_')}#{ str(loop)}",
236                subsystem_data_abspath, test_dir, lcovrc_path
237            )
238
239            # remove some type which useless
240            cut_info(f"{subsystem}#{subsys_path.replace('/', '_')}#{str(loop)}", test_dir)
241
242        loop += 1
243
244
245def generate_coverage_info(single_test_dir_list, lcovrc_path, subsystem_list):
246    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
247    for index, cur_test_dir in enumerate(single_test_dir_list):
248        cur_test_abs_dir = os.path.join(cov_path, cur_test_dir)
249        gen_info(cur_test_abs_dir, cur_test_dir, subsystem_list, lcovrc_path)
250
251
252def gen_all_test_info(subsystem_list):
253    cov_path = os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS)
254    print(os.getpid(), os.getppid())
255    single_test_dir_list = []
256    for root, dirs, files in os.walk(cov_path):
257        single_test_dir_list = dirs
258        break
259
260    return single_test_dir_list
261
262
263def merge_subsystem_info_from_all_test(subsystem):
264    single_test_info_path = os.path.join(
265        CODEPATH, REPORT_PATH, "single_test"
266    )
267    subsystem_info_list = []
268    subsystem_info_name = f"{subsystem}_strip.info"
269    for root, dirs, files in os.walk(single_test_info_path):
270        for file in files:
271            if file.startswith(subsystem) and file.endswith("_strip.info"):
272                subsystem_info_path_tmp = os.path.join(
273                    single_test_info_path, root, file
274                )
275                print(f"##{subsystem_info_path_tmp}")
276                subsystem_info_list.append(subsystem_info_path_tmp)
277
278    if len(subsystem_info_list) == 0:
279        return
280
281    info_output_name = os.path.join(
282        CODEPATH, REPORT_PATH, subsystem_info_name
283    )
284    cmd = "lcov -a {} -o {}".format(
285        " -a ".join(subsystem_info_list), info_output_name
286    )
287    execute_command(cmd)
288
289
290def merge_all_test_subsystem_info(subsystem_list):
291    single_test_info_path = os.path.join(
292        CODEPATH, REPORT_PATH, "single_test"
293    )
294    if not os.path.exists(single_test_info_path):
295        print(f"Error: the single test info path "
296              f"{single_test_info_path} not exist")
297        return
298
299    for subsystem in subsystem_list:
300        print(f"Merging all {subsystem} info from test data")
301        merge_subsystem_info_from_all_test(subsystem)
302
303
304def merge_info(report_dir):
305    if not os.path.exists(report_dir):
306        print(f"Error: report dir {report_dir} not exists!")
307        return
308
309    subsystem_name_list = get_files_from_dir(report_dir, "_strip.info")
310    if len(subsystem_name_list) == 0:
311        print("Error: get subsystem trace files in report directory failed.")
312        return
313
314    trace_file_list = []
315    for subsystem in subsystem_name_list:
316        trace_file_name = os.path.join(report_dir, subsystem)
317        trace_file_list.append(trace_file_name)
318
319    cmd = "lcov -a {} -o {}".format(
320        " -a ".join(trace_file_list), os.path.join(report_dir, "ohos_codeCoverage.info")
321    )
322    execute_command(cmd)
323
324
325def merge_all_subsystem_info():
326    print("Merging all the subsystem trace files")
327    merge_info(os.path.join(CODEPATH, REPORT_PATH))
328
329
330def gen_html(cov_path):
331    tracefile = os.path.join(CODEPATH, REPORT_PATH, "ohos_codeCoverage.info")
332    if not os.path.exists(tracefile):
333        print(f"Error: the trace file {tracefile} not exists!")
334        return
335
336    cmd = "genhtml --branch-coverage --demangle-cpp -o {} -p {} --ignore-errors source {}".format(
337        os.path.join(CODEPATH, REPORT_PATH, "html"), CODEPATH, tracefile)
338    execute_command(cmd)
339
340
341def gen_final_report(cov_path):
342    print("Generating the html report")
343    gen_html(cov_path)
344
345
346if __name__ == '__main__':
347    current_path = os.path.abspath(os.path.dirname(__name__))
348    CODEPATH = current_path.split("/test/testfwk/developer_test")[0]
349    # lcovrc配置文件集合
350    LCOVRC_SET = f"{CODEPATH}/test/testfwk/developer_test/localCoverage/codeCoverage/coverage_rc"
351    _init_sys_config()
352    from localCoverage.utils import get_product_name
353    # 编译生成的out路径
354    OUTPUT = "out/{}".format(get_product_name(CODEPATH))
355
356    case_list = gen_all_test_info(subsystem_list=get_subsystem_name_list())
357    multiprocessing.set_start_method("fork")  # fork spawn forkserver
358    start = end = 0
359    Tag = False
360    process_list = []
361    for i in range(len(case_list)):
362        lcov_path = f"{LCOVRC_SET}/lcovrc_cov_{str(i)}"
363        print(lcov_path)
364        if os.path.exists(lcov_path):
365            print(f"{lcov_path}{'@' * 20}yes")
366        else:
367            raise Exception("mutilProcess have error -rc path not existed. "
368                            "please fix add run")
369
370        start = end
371        end += STEP_SIZE
372        if end >= len(case_list):
373            end = len(case_list)
374            Tag = True
375
376        p = Process(target=generate_coverage_info,
377                    args=(case_list[start:end], lcov_path,
378                          get_subsystem_name_list()))
379        p.daemon = True
380        p.start()
381        process_list.append(p)
382        if Tag:
383            break
384
385    for i in process_list:
386        i.join()
387
388    merge_all_test_subsystem_info(subsystem_list=get_subsystem_name_list())
389    merge_all_subsystem_info()
390    gen_final_report(os.path.join(CODEPATH, COVERAGE_GCDA_RESULTS))
391