• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import shutil
22import time
23from xml.etree import ElementTree
24
25from _core.constants import CaseResult
26from _core.constants import FilePermission
27from _core.logger import platform_logger
28from _core.report.reporter_helper import ReportConstant
29
30__all__ = [
31    "handle_repeat_result", "update_report_xml",
32    "report_not_executed", "get_case_result"]
33
34LOG = platform_logger("Context")
35
36
def handle_repeat_result(report_xml, report_path, round_folder=""):
    """move the result file into its round folder when the task is repeated

    Args:
        report_xml: path of the result xml file to relocate
        report_path: root folder of the report; the round folder is created
            under its "result" sub-folder
        round_folder: name of the round folder; when it is empty the result
            file is left where it is

    Returns:
        the path of the result file after the call: report_xml itself when
        no round folder is given, otherwise the new location of the file
    """
    if not round_folder:
        return report_xml
    round_path = os.path.join(report_path, "result", round_folder)
    # exist_ok removes the check-then-create race: the folder may be created
    # by someone else between an existence test and the makedirs call
    os.makedirs(round_path, exist_ok=True)
    LOG.debug("move result file to round folder")
    target_path = os.path.join(round_path, os.path.basename(report_xml))
    shutil.move(report_xml, target_path)
    return target_path
47
48
def update_report_xml(report_xml, props):
    """update devices, start time, end time, etc. to the result file

    Every item of props is set as an attribute on the root element of the
    result xml, then the file is rewritten in place.

    Args:
        report_xml: path of the result xml file; nothing is done when the
            file does not exist
        props: mapping of attribute name to attribute value; nothing is done
            when it is empty. The value stored under ReportConstant.devices
            is dumped to compact json and wrapped in a CDATA marker

    Returns:
        None. A file that cannot be parsed is logged and left untouched
    """
    if not os.path.exists(report_xml) or not props:
        return
    try:
        root = ElementTree.parse(report_xml).getroot()
    except ElementTree.ParseError as e:
        LOG.error(f"parse result xml error! xml file {report_xml}")
        LOG.error(f"error message: {e}")
        return
    for k, v in props.items():
        if k == ReportConstant.devices:
            v = f"<![CDATA[{json.dumps(v, separators=(',', ':'))}]]>"
        root.set(k, v)
    # serialize before the file is opened with O_TRUNC, so that a failure in
    # tostring (e.g. a non-string attribute value) cannot leave the existing
    # result file truncated and empty
    content = ElementTree.tostring(root).decode()
    result_fd = os.open(report_xml, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, FilePermission.mode_644)
    with os.fdopen(result_fd, mode="w", encoding="utf-8") as result_file:
        result_file.write(content)
66
67
def report_not_executed(report_path, test_drivers, error_message, task=None):
    """report every given test as unavailable because it was not executed

    For each entry of test_drivers the result file path is worked out,
    check_result_report is called with CaseResult.unavailable, and then the
    time, repeat, round and test type attributes are written to that file.

    Args:
        report_path: root folder of the report
        test_drivers: iterable of two-item entries; only the second item
            (the test) is used
        error_message: message handed over to check_result_report
        task: optional task; its config supplies the repeat count and the
            optional "testdict" attribute
    """
    from _core.utils import check_result_report
    from _core.utils import get_repeat_round
    from _core.utils import get_sub_path

    # without a task there is exactly one round
    repeat = task.config.repeat if task is not None else 1

    for _, test in test_drivers:
        source = test.source
        module_name = source.module_name
        test_name = source.test_name
        repeat_round = get_repeat_round(test.unique_id)

        # results only go into a round folder when the task is repeated
        round_folder = ""
        if repeat > 1:
            round_folder = f"round{repeat_round}"

        # locate the result file
        if task and getattr(task.config, "testdict", ""):
            result_dir = get_sub_path(source.source_file)
            report_file = os.path.join(result_dir, f"{test_name}.xml")
        else:
            report_file = os.path.join(
                report_path, "result", round_folder, f"{module_name}.xml")

        # a test name starting with "{" is replaced by a fixed report name
        report_name = "report" if test_name.startswith("{") else test_name

        # here, normally create empty report and then upload result
        check_result_report(
            report_path, report_file, error_message, report_name, module_name,
            result_kind=CaseResult.unavailable)

        time_format = ReportConstant.time_format
        update_report_xml(report_file, {
            ReportConstant.start_time: time.strftime(time_format, time.localtime()),
            ReportConstant.end_time: time.strftime(time_format, time.localtime()),
            ReportConstant.repeat: str(repeat),
            ReportConstant.round: str(repeat_round),
            ReportConstant.test_type: source.test_type,
        })
104
105
def get_case_result(ele_case):
    """work out the result kind and the error message of a testcase element

    Args:
        ele_case: testcase element; its attributes are read with get() and
            its child nodes are iterated

    Returns:
        tuple of (result kind, error message); the message is empty when the
        case is treated as passed
    """
    message = ele_case.get(ReportConstant.message, "")
    kind = ele_case.get(ReportConstant.result_kind, "")
    # a result kind recorded on the element is returned as it is
    if kind != "":
        return kind, message

    result = ele_case.get(ReportConstant.result, "")
    status = ele_case.get(ReportConstant.status, "")

    # Adapt to HCPTest results: when a case fails, a failure node that holds
    # the error message is created under the testcase node
    if len(ele_case) > 0:
        child_messages = []
        for child in ele_case:
            child_messages.append(child.get(ReportConstant.message, ""))
        return CaseResult.failed, "\n\n".join(child_messages)

    if result == ReportConstant.false and status in (ReportConstant.run, ""):
        return CaseResult.failed, message
    if status in (ReportConstant.blocked, ReportConstant.disable, ReportConstant.error):
        return CaseResult.blocked, message
    if status in (ReportConstant.skip, ReportConstant.not_run):
        return CaseResult.ignored, message
    if status == ReportConstant.unavailable:
        return CaseResult.unavailable, message
    return CaseResult.passed, ""
126