1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import shutil 22import time 23from xml.etree import ElementTree 24 25from _core.constants import CaseResult 26from _core.constants import FilePermission 27from _core.logger import platform_logger 28from _core.report.reporter_helper import ReportConstant 29 30__all__ = [ 31 "handle_repeat_result", "update_report_xml", 32 "report_not_executed", "get_case_result"] 33 34LOG = platform_logger("Context") 35 36 37def handle_repeat_result(report_xml, report_path, round_folder=""): 38 if not round_folder: 39 return report_xml 40 round_path = os.path.join(report_path, "result", round_folder) 41 if not os.path.exists(round_path): 42 os.makedirs(round_path) 43 LOG.debug("move result file to round folder") 44 target_path = os.path.join(round_path, os.path.basename(report_xml)) 45 shutil.move(report_xml, target_path) 46 return target_path 47 48 49def update_report_xml(report_xml, props): 50 """update devices, start time, end time, etc. to the result file""" 51 if not os.path.exists(report_xml) or not props: 52 return 53 try: 54 root = ElementTree.parse(report_xml).getroot() 55 except ElementTree.ParseError as e: 56 LOG.error(f"parse result xml error! xml file {report_xml}") 57 LOG.error(f"error message: {e}") 58 return 59 for k, v in props.items(): 60 if k == ReportConstant.devices: 61 v = f"<![CDATA[{json.dumps(v, separators=(',', ':'))}]]>" 62 root.set(k, v) 63 result_fd = os.open(report_xml, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, FilePermission.mode_644) 64 with os.fdopen(result_fd, mode="w", encoding="utf-8") as result_file: 65 result_file.write(ElementTree.tostring(root).decode()) 66 67 68def report_not_executed(report_path, test_drivers, error_message, task=None): 69 from _core.utils import check_result_report 70 from _core.utils import get_repeat_round 71 from _core.utils import get_sub_path 72 73 repeat, repeat_round = 1, 1 74 if task is not None: 75 repeat = task.config.repeat 76 77 for test_driver in test_drivers: 78 _, test = test_driver 79 module_name = test.source.module_name 80 test_name = test.source.test_name 81 repeat_round = get_repeat_round(test.unique_id) 82 round_folder = f"round{repeat_round}" if repeat > 1 else "" 83 84 # get report file 85 if task and getattr(task.config, "testdict", ""): 86 report_file = os.path.join(get_sub_path(test.source.source_file), "%s.xml" % test_name) 87 else: 88 report_file = os.path.join(report_path, "result", round_folder, "%s.xml" % module_name) 89 90 # get report name 91 report_name = test_name if not test_name.startswith("{") else "report" 92 # here, normally create empty report and then upload result 93 check_result_report(report_path, report_file, error_message, report_name, module_name, 94 result_kind=CaseResult.unavailable) 95 96 update_props = { 97 ReportConstant.start_time: time.strftime(ReportConstant.time_format, time.localtime()), 98 ReportConstant.end_time: time.strftime(ReportConstant.time_format, time.localtime()), 99 ReportConstant.repeat: str(repeat), 100 ReportConstant.round: str(repeat_round), 101 ReportConstant.test_type: test.source.test_type 102 } 103 update_report_xml(report_file, update_props) 104 105 106def get_case_result(ele_case): 107 error_msg = ele_case.get(ReportConstant.message, "") 108 result_kind = ele_case.get(ReportConstant.result_kind, "") 109 if result_kind != "": 110 return result_kind, error_msg 111 result = ele_case.get(ReportConstant.result, "") 112 status = ele_case.get(ReportConstant.status, "") 113 # 适配HCPTest的测试结果,其用例失败时,会在testcase下新建failure节点,存放错误信息 114 if len(ele_case) > 0: 115 error_msg = "\n\n".join([failure.get(ReportConstant.message, "") for failure in ele_case]) 116 return CaseResult.failed, error_msg 117 if result == ReportConstant.false and (status == ReportConstant.run or status == ""): 118 return CaseResult.failed, error_msg 119 if status in [ReportConstant.blocked, ReportConstant.disable, ReportConstant.error]: 120 return CaseResult.blocked, error_msg 121 if status in [ReportConstant.skip, ReportConstant.not_run]: 122 return CaseResult.ignored, error_msg 123 if status in [ReportConstant.unavailable]: 124 return CaseResult.unavailable, error_msg 125 return CaseResult.passed, "" 126