#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import time
from enum import Enum
from threading import RLock

from _core.constants import ModeType
from _core.logger import platform_logger
from _core.report.encrypt import check_pub_key_exist
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import ReportConstant

LOG = platform_logger("SuiteReporter")
SUITE_REPORTER_LOCK = RLock()

# Translation table for str.translate that deletes the code points 0-31,
# keeping only LF (10) and CR (13).
_CONTROL_CHAR_TABLE = {code: None for code in range(32)
                       if code not in (10, 13)}


class ResultCode(Enum):
    """Numeric codes that ``case_result.code`` is compared against when the
    status/result attributes of a test case are filled in."""
    UNKNOWN = -1010
    BLOCKED = -1
    PASSED = 0
    FAILED = 1
    SKIPPED = 2


class SuiteReporter:
    """Build the data report of one or more test suites and keep the
    generated reports in class-level (process-wide) lists."""

    # shared by every instance; the classmethods below read and modify them
    suite_list = []
    # list of (data report path, report content string) tuples
    suite_report_result = []
    failed_case_list = []
    # list of (module name, report path, report content) tuples
    history_report_result = []

    def __init__(self, results, report_name, report_path=None, **kwargs):
        """
        create suite report
        :param results: [(suite_result, [case_results]),
                        (suite_result, [case_results]), ...]
        :param report_name: suite report name
        :param report_path: suite report path; when omitted (None) the data
                            report path is just the report file name
        :param kwargs: extra options; the product info and module name
                       entries are read when the elements are initialised
        """
        self.results = results
        self.data_helper = DataHelper()
        self.report_name = report_name
        self.report_path = report_path
        # os.path.join raises TypeError on None, which is the declared
        # default, so fall back to an empty directory prefix
        report_dir = "" if report_path is None else report_path
        self.suite_data_path = os.path.join(
            report_dir, "%s%s" % (
                report_name, self.data_helper.DATA_REPORT_SUFFIX))
        self.args = kwargs
        # imported here rather than at module level
        # NOTE(review): presumably to avoid a circular import - confirm
        from xdevice import Scheduler
        if not check_pub_key_exist() and Scheduler.mode != ModeType.decc:
            SuiteReporter.suite_report_result.clear()

    def create_empty_report(self):
        """
        create a data report that marks the single suite in self.results as
        unavailable; logs an error and does nothing unless self.results
        holds exactly one (suite_result, case_results) pair
        """
        # create empty data report only for single suite
        if len(self.results) != 1:
            LOG.error("can only create one empty data report once")
            return
        suite_result, _ = self.results[0]

        from xdevice import Scheduler

        # initial test suites element
        test_suites_element, test_suites_attributes, _ = \
            self._initial_test_suites()
        test_suites_attributes[ReportConstant.unavailable] = 1
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)

        # initial test suite element
        test_suite_element, test_suite_attributes = self._initial_test_suite(
            suite_result)
        test_suite_element.text, test_suite_element.tail = \
            "", self.data_helper.LINE_BREAK
        test_suite_attributes[ReportConstant.unavailable] = 1
        test_suite_attributes[ReportConstant.message] = suite_result.stacktrace

        if Scheduler.mode == ModeType.decc:
            test_suite_attributes[ReportConstant.result] = ReportConstant.false
        self.data_helper.set_element_attributes(test_suite_element,
                                                test_suite_attributes)

        # append test suite element
        test_suites_element.append(test_suite_element)

        # generate report
        # explicit length check: truth-value testing of an XML element is
        # deprecated (assumes DataHelper returns ElementTree elements)
        if len(test_suites_element) > 0:
            # in decc mode no report file is written, the result is only
            # recorded (and uploaded by append_report_result)
            if Scheduler.mode != ModeType.decc:
                self.data_helper.generate_report(test_suites_element,
                                                 self.suite_data_path)
            SuiteReporter.append_report_result((
                self.suite_data_path, self.data_helper.to_string(
                    test_suites_element)))

    def generate_data_report(self):
        """
        build the test suites element from self.results, write it to
        self.suite_data_path and record it in the shared report result
        list; nothing is written when no suite element was produced
        """
        # construct test suites element
        test_suites_element = self._construct_test_suites()

        # generate report (only when at least one suite element exists)
        if len(test_suites_element) > 0:
            self.data_helper.generate_report(test_suites_element,
                                             self.suite_data_path)
            SuiteReporter.append_report_result((
                self.suite_data_path, self.data_helper.to_string(
                    test_suites_element)))

    def _construct_test_suites(self):
        """
        build the root test suites element with one child per suite in
        self.results and accumulate the counters of every suite into it
        :return: the test suites element
        """
        # initial test suites element
        test_suites_element, test_suites_attributes, need_update_attributes = \
            self._initial_test_suites()

        # construct test suite element
        for suite_result, case_results in self.results:
            test_suite_element, test_suite_attributes = \
                self._construct_test_suite(suite_result, case_results)

            # add test suite element
            test_suites_element.append(test_suite_element)

            # update and set test suites element attributes
            for need_update_attribute in need_update_attributes:
                test_suites_attributes[need_update_attribute] += \
                    test_suite_attributes.get(need_update_attribute, 0)
            test_suites_attributes[ReportConstant.time] = \
                round(test_suites_attributes[ReportConstant.time], 3)

        if len(test_suites_element) > 0:
            # the last child gets a plain line break as its tail
            test_suite_element = test_suites_element[-1]
            test_suite_element.tail = self.data_helper.LINE_BREAK
        else:
            LOG.error("%s no suite result exists", self.report_name)

        # set test suites element attributes
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)
        return test_suites_element

    def _initial_test_suites(self):
        """
        create the root test suites element and its zeroed attributes
        :return: (element, attributes dict, names of the attributes that
                 are accumulated from each suite)
        """
        test_suites_element = self.data_helper.initial_suites_element()
        test_suites_attributes = {ReportConstant.name: self.report_name,
                                  ReportConstant.time_stamp: time.strftime(
                                      ReportConstant.time_format,
                                      time.localtime()),
                                  ReportConstant.time: 0,
                                  ReportConstant.errors: 0,
                                  ReportConstant.disabled: 0,
                                  ReportConstant.failures: 0,
                                  ReportConstant.tests: 0,
                                  ReportConstant.ignored: 0,
                                  ReportConstant.unavailable: 0,
                                  ReportConstant.product_info: self.args.get(
                                      ReportConstant.product_info_, "")}
        # a non-empty module name overrides the report name
        if self.args.get(ReportConstant.module_name, ""):
            test_suites_attributes[ReportConstant.name] = self.args.get(
                ReportConstant.module_name, "")
        need_update_attributes = [ReportConstant.time, ReportConstant.errors,
                                  ReportConstant.tests, ReportConstant.ignored,
                                  ReportConstant.disabled,
                                  ReportConstant.failures,
                                  ReportConstant.unavailable]
        return test_suites_element, test_suites_attributes, \
            need_update_attributes

    def _construct_test_suite(self, suite_result, case_results):
        """
        build one test suite element with a child element per case result
        :param suite_result: result object of the suite
        :param case_results: result objects of the executed cases
        :return: (test suite element, test suite attributes dict)
        """
        # initial test suite element
        test_suite_element, test_suite_attributes = self._initial_test_suite(
            suite_result)

        # get test case elements that are children of test suite element
        test_case_elements = []
        for case_result in case_results:
            # initial test case element
            test_case_element, test_case_attributes = self._initial_test_case(
                case_result)

            # update attributes according to case result
            self.update_attributes(case_result, test_case_attributes,
                                   test_suite_attributes)

            # set test case attributes and add to test_suite_element
            self.data_helper.set_element_attributes(test_case_element,
                                                    test_case_attributes)
            test_case_elements.append(test_case_element)
        # cases declared by the suite but without a case result are counted
        # as disabled; never subtract when more results than declared exist
        test_suite_attributes[ReportConstant.disabled] += max(int(
            test_suite_attributes[ReportConstant.tests] -
            len(test_case_elements)), 0)
        if test_case_elements:
            child = test_case_elements[-1]
            child.tail = self.data_helper.LINE_BREAK_INDENT
        else:
            LOG.debug("no case executed")
        test_suite_element.extend(test_case_elements)

        # set test suite attributes
        self.data_helper.set_element_attributes(test_suite_element,
                                                test_suite_attributes)
        return test_suite_element, test_suite_attributes

    @classmethod
    def update_attributes(cls, case_result, test_case_attributes,
                          test_suite_attributes):
        """
        set the status/result of a case from its result code and bump the
        matching counter of its suite; both dicts are modified in place
        :param case_result: object whose ``code`` holds a ResultCode value
        :param test_case_attributes: attributes dict of the case
        :param test_suite_attributes: attributes dict of the owning suite
        """
        if case_result.code == ResultCode.PASSED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.run
            test_case_attributes[ReportConstant.result] = ReportConstant.true
            # a passed case carries no message
            test_case_attributes[ReportConstant.message] = ""
        elif case_result.code == ResultCode.FAILED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.run
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.failures] = \
                test_suite_attributes[ReportConstant.failures] + 1
        elif case_result.code == ResultCode.SKIPPED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.skip
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.ignored] = \
                test_suite_attributes[ReportConstant.ignored] + 1
        else:  # ResultCode.UNKNOWN.value or other value
            test_case_attributes[ReportConstant.status] = \
                ReportConstant.disable
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.disabled] = \
                test_suite_attributes[ReportConstant.disabled] + 1

    def _initial_test_suite(self, suite_result):
        """
        create a test suite element and its initial attributes
        :param suite_result: result object of the suite
        :return: (test suite element, test suite attributes dict)
        """
        test_suite_element = self.data_helper.initial_suite_element()
        # run_time is divided by 1000
        # NOTE(review): presumably milliseconds to seconds - confirm
        test_suite_attributes = {ReportConstant.name: suite_result.suite_name,
                                 ReportConstant.time: round(float(
                                     suite_result.run_time) / 1000, 3),
                                 ReportConstant.errors: 0,
                                 ReportConstant.disabled: 0,
                                 ReportConstant.failures: 0,
                                 ReportConstant.ignored: 0,
                                 ReportConstant.tests: suite_result.test_num,
                                 ReportConstant.message:
                                     suite_result.stacktrace
                                 }
        if self.args.get(ReportConstant.module_name, ""):
            test_suite_attributes[ReportConstant.module_name] = self.args.get(
                ReportConstant.module_name, "")
        return test_suite_element, test_suite_attributes

    def _initial_test_case(self, case_result):
        """
        create a test case element and its initial attributes
        :param case_result: result object of the case
        :return: (test case element, test case attributes dict)
        """
        test_case_element = self.data_helper.initial_case_element()
        # drop the control characters chr(0)-chr(31) except LF and CR in a
        # single pass
        # NOTE(review): presumably because they break the data report -
        # confirm
        case_stacktrace = str(case_result.stacktrace).translate(
            _CONTROL_CHAR_TABLE)
        test_case_attributes = {ReportConstant.name: case_result.test_name,
                                ReportConstant.status: "",
                                ReportConstant.time: float(
                                    case_result.run_time) / 1000,
                                ReportConstant.class_name:
                                    case_result.test_class,
                                ReportConstant.result: "",
                                ReportConstant.level: 1,
                                ReportConstant.message: case_stacktrace}
        return test_case_element, test_case_attributes

    @classmethod
    def clear_report_result(cls):
        """empty the shared report result list"""
        with SUITE_REPORTER_LOCK:
            LOG.debug("clear_report_result")
            cls.suite_report_result.clear()

    @classmethod
    def clear_failed_case_list(cls):
        """empty the shared failed case list"""
        with SUITE_REPORTER_LOCK:
            LOG.debug("clear_failed_case_list")
            cls.failed_case_list.clear()

    @classmethod
    def append_report_result(cls, report_result):
        """
        record a report result; an entry with the same data path is
        replaced in place, a new entry is appended and then uploaded
        :param report_result: (data report path, report content string)
        """
        with SUITE_REPORTER_LOCK:
            if not isinstance(report_result, tuple) or len(report_result) != 2:
                LOG.error("report result should be a tuple with length 2")
                return
            data_path = report_result[0]
            for index, exist_result in enumerate(cls.suite_report_result):
                if exist_result[0] == data_path:
                    LOG.debug("data report %s generate again", data_path)
                    cls.suite_report_result[index] = report_result
                    # a regenerated report is not uploaded again
                    return
            cls.suite_report_result.append(report_result)
            cls._upload_case_result(report_result[1])

    @classmethod
    def _upload_case_result(cls, result_str):
        """
        upload the result of the first suite of a data report through the
        decc agent; does nothing unless the scheduler runs in decc mode
        :param result_str: content of the data report
        """
        from xdevice import Scheduler
        if Scheduler.mode != ModeType.decc:
            return
        element = DataHelper.parse_data_report(result_str)
        if len(element) == 0:
            LOG.debug("%s is error", result_str)
            return
        element = element[0]
        result, error_msg = Scheduler.get_script_result(element)
        case_name = element.get(ReportConstant.name, "")
        try:
            from agent.decc import Handler
            LOG.info("upload case result to decc")
            Handler.upload_case_result(case_name, result, error_msg)
        except ModuleNotFoundError as error:
            # decc mode is already guaranteed by the early return above
            LOG.error("module not found %s", error.args)

    @classmethod
    def get_report_result(cls):
        """
        :return: the shared report result list itself (not a copy)
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("get_report_result, length is {}".
                      format(len(cls.suite_report_result)))
            return SuiteReporter.suite_report_result

    @classmethod
    def set_suite_list(cls, suite_list):
        """
        replace the shared suite list
        :param suite_list: new suite list
        """
        LOG.debug("set_suite_list, length is {}".format(len(suite_list)))
        cls.suite_list = suite_list

    @classmethod
    def get_suite_list(cls):
        """
        :return: the suite list stored on SuiteReporter (not a copy)
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("get_suite_list, length is {}".
                      format(len(cls.suite_list)))
            return SuiteReporter.suite_list

    @classmethod
    def get_failed_case_list(cls):
        """
        :return: the shared failed case list itself (not a copy)
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("get_failed_case_list, length is {}".
                      format(len(cls.failed_case_list)))
            return SuiteReporter.failed_case_list

    @classmethod
    def append_history_result(cls, suite_reports):
        """
        append history reports, keyed by the first item that
        get_filename_extension returns for the report path
        :param suite_reports: iterable of (report path, report content)
        """
        from _core.utils import get_filename_extension
        with SUITE_REPORTER_LOCK:
            LOG.debug("append_history_result,suite_reports length is {}".
                      format(len(suite_reports)))
            for report_path, report_result in suite_reports:
                module_name = get_filename_extension(report_path)[0]
                cls.history_report_result. \
                    append((module_name, report_path, report_result))

    @classmethod
    def clear_history_result(cls):
        """empty the shared history result list"""
        with SUITE_REPORTER_LOCK:
            LOG.debug("clear_history_result")
            cls.history_report_result.clear()

    @classmethod
    def get_history_result_by_module(cls, name):
        """
        find the first history report recorded for a module
        :param name: module name
        :return: (report path, report content), or ("", "") if not found
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("get_history_result_by_module,module_name:{}".
                      format(name))
            for module_name, report_path, report_result in \
                    cls.history_report_result:
                if name == module_name:
                    return report_path, report_result
            return "", ""

    @classmethod
    def get_history_result_list(cls):
        """
        :return: the shared history result list itself (not a copy)
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("get_history_result_list,length is {}".
                      format(len(cls.history_report_result)))
            return cls.history_report_result