1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21from enum import Enum 22from threading import RLock 23 24from _core.constants import CaseResult 25from _core.constants import ModeType 26from _core.logger import platform_logger 27from _core.report.encrypt import check_pub_key_exist 28from _core.report.reporter_helper import DataHelper 29from _core.report.reporter_helper import ReportConstant 30from _core.context.center import Context 31from _core.context.handler import get_case_result 32 33LOG = platform_logger("SuiteReporter") 34SUITE_REPORTER_LOCK = RLock() 35 36 37class ResultCode(Enum): 38 UNKNOWN = -1010 39 BLOCKED = -1 40 PASSED = 0 41 FAILED = 1 42 SKIPPED = 2 43 44 45class SuiteReporter: 46 suite_list = [] 47 suite_report_result = [] 48 failed_case_list = [] 49 history_report_result = [] 50 51 def __init__(self, results, report_name, report_path=None, **kwargs): 52 """ 53 Create suite report 54 :param results: [(suite_result, [case_results]), 55 (suite_result, [case_results]), ...] 56 :param report_name: suite report name 57 :param report_path: suite report path 58 """ 59 self.results = results 60 self.data_helper = DataHelper() 61 self.report_name = report_name 62 self.report_path = report_path 63 self.suite_data_path = os.path.join( 64 self.report_path, "%s%s" % ( 65 report_name, self.data_helper.DATA_REPORT_SUFFIX)) 66 self.kwargs = kwargs 67 if not check_pub_key_exist() and Context.session().mode != ModeType.decc: 68 SuiteReporter.suite_report_result.clear() 69 70 def create_empty_report(self): 71 # create empty data report only for single suite 72 if len(self.results) != 1: 73 LOG.error("Can only create one empty data report once") 74 return 75 suite_result, _ = self.results[0] 76 77 # initial test suites element 78 test_suites_element, test_suites_attributes, _ = \ 79 self._initial_test_suites() 80 if self.kwargs.get(ReportConstant.result_kind) == CaseResult.unavailable: 81 test_suites_attributes[ReportConstant.unavailable] = 1 82 else: 83 test_suites_attributes[ReportConstant.disabled] = 1 84 self.data_helper.set_element_attributes(test_suites_element, 85 test_suites_attributes) 86 87 # initial test suite element 88 test_suite_element, test_suite_attributes = self._initial_test_suite( 89 suite_result) 90 test_suite_element.text, test_suite_element.tail = \ 91 "", self.data_helper.LINE_BREAK 92 if self.kwargs.get(ReportConstant.result_kind) == CaseResult.unavailable: 93 test_suite_attributes[ReportConstant.unavailable] = 1 94 else: 95 test_suite_attributes[ReportConstant.disabled] = 1 96 test_suite_attributes[ReportConstant.message] = suite_result.stacktrace 97 98 if Context.session().mode == ModeType.decc: 99 test_suite_attributes[ReportConstant.result] = ReportConstant.false 100 self.data_helper.set_element_attributes(test_suite_element, 101 test_suite_attributes) 102 103 # append test suite element 104 test_suites_element.append(test_suite_element) 105 106 # generate report 107 if test_suites_element: 108 if Context.session().mode != ModeType.decc: 109 self.data_helper.generate_report(test_suites_element, 110 self.suite_data_path) 111 SuiteReporter.append_report_result(( 112 self.suite_data_path, self.data_helper.to_string( 113 test_suites_element))) 114 115 def generate_data_report(self): 116 # construct test suites element 117 test_suites_element = self._construct_test_suites() 118 119 # generate report 120 if test_suites_element: 121 self.data_helper.generate_report(test_suites_element, 122 self.suite_data_path) 123 SuiteReporter.append_report_result(( 124 self.suite_data_path, self.data_helper.to_string( 125 test_suites_element))) 126 127 def _construct_test_suites(self): 128 # initial test suites element 129 test_suites_element, test_suites_attributes, need_update_attributes = \ 130 self._initial_test_suites() 131 132 # construct test suite element 133 for suite_result, case_results in self.results: 134 test_suite_element, test_suite_attributes = \ 135 self._construct_test_suite(suite_result, case_results) 136 137 # add test suite element 138 test_suites_element.append(test_suite_element) 139 140 # update and set test suites element attributes 141 for need_update_attribute in need_update_attributes: 142 test_suites_attributes[need_update_attribute] += \ 143 test_suite_attributes.get(need_update_attribute, 0) 144 test_suites_attributes[ReportConstant.time] = \ 145 round(test_suites_attributes.get(ReportConstant.time), 3) 146 147 if test_suites_element: 148 test_suite_element = test_suites_element[-1] 149 test_suite_element.tail = self.data_helper.LINE_BREAK 150 else: 151 LOG.error("%s no suite result exists" % self.report_name) 152 153 # set test suites element attributes 154 self.data_helper.set_element_attributes(test_suites_element, 155 test_suites_attributes) 156 return test_suites_element 157 158 def _initial_test_suites(self): 159 test_suites_element = self.data_helper.initial_suites_element() 160 test_suites_attributes = { 161 ReportConstant.name: self.report_name, 162 ReportConstant.time_stamp: time.strftime(ReportConstant.time_format, time.localtime()), 163 ReportConstant.time: 0, 164 ReportConstant.errors: 0, 165 ReportConstant.disabled: 0, 166 ReportConstant.failures: 0, 167 ReportConstant.tests: 0, 168 ReportConstant.ignored: 0, 169 ReportConstant.unavailable: 0, 170 # module's failure message 171 ReportConstant.message: self.kwargs.get(ReportConstant.message, "") 172 } 173 module_name = self.kwargs.get(ReportConstant.module_name, "") 174 if module_name: 175 test_suites_attributes[ReportConstant.name] = module_name 176 need_update_attributes = [ 177 ReportConstant.time, 178 ReportConstant.errors, 179 ReportConstant.tests, 180 ReportConstant.ignored, 181 ReportConstant.disabled, 182 ReportConstant.failures, 183 ReportConstant.unavailable 184 ] 185 return test_suites_element, test_suites_attributes, need_update_attributes 186 187 def _construct_test_suite(self, suite_result, case_results): 188 # initial test suite element 189 test_suite_element, test_suite_attributes = self._initial_test_suite( 190 suite_result) 191 192 # get test case elements that are children of test suite element 193 test_case_elements = [] 194 for case_result in case_results: 195 # initial test case element 196 test_case_element, test_case_attributes = self._initial_test_case( 197 case_result) 198 199 # update attributes according to case result 200 self.update_attributes(case_result, test_case_attributes, 201 test_suite_attributes) 202 203 # set test case attributes and add to test_suite_element 204 self.data_helper.set_element_attributes(test_case_element, 205 test_case_attributes) 206 test_case_elements.append(test_case_element) 207 test_suite_attributes[ReportConstant.disabled] += max(int( 208 test_suite_attributes.get(ReportConstant.tests) - 209 len(test_case_elements)), 0) 210 if test_case_elements: 211 child = test_case_elements[-1] 212 child.tail = self.data_helper.LINE_BREAK_INDENT 213 else: 214 LOG.debug("No case executed") 215 test_suite_element.extend(test_case_elements) 216 217 # set test suite attributes 218 self.data_helper.set_element_attributes(test_suite_element, 219 test_suite_attributes) 220 return test_suite_element, test_suite_attributes 221 222 @classmethod 223 def update_attributes(cls, case_result, test_case_attributes, 224 test_suite_attributes): 225 if case_result.code == ResultCode.PASSED.value: 226 test_case_attributes[ReportConstant.status] = ReportConstant.run 227 test_case_attributes[ReportConstant.result] = ReportConstant.true 228 test_case_attributes[ReportConstant.message] = "" 229 elif case_result.code == ResultCode.FAILED.value: 230 test_case_attributes[ReportConstant.status] = ReportConstant.run 231 test_case_attributes[ReportConstant.result] = ReportConstant.false 232 test_suite_attributes[ReportConstant.failures] = \ 233 test_suite_attributes[ReportConstant.failures] + 1 234 elif case_result.code == ResultCode.SKIPPED.value: 235 test_case_attributes[ReportConstant.status] = ReportConstant.skip 236 test_case_attributes[ReportConstant.result] = ReportConstant.false 237 test_suite_attributes[ReportConstant.ignored] = \ 238 test_suite_attributes[ReportConstant.ignored] + 1 239 else: # ResultCode.UNKNOWN.value or other value 240 test_case_attributes[ReportConstant.status] = \ 241 ReportConstant.disable 242 test_case_attributes[ReportConstant.result] = ReportConstant.false 243 test_suite_attributes[ReportConstant.disabled] = \ 244 test_suite_attributes[ReportConstant.disabled] + 1 245 246 def _initial_test_suite(self, suite_result): 247 test_suite_element = self.data_helper.initial_suite_element() 248 test_suite_attributes = { 249 ReportConstant.name: suite_result.suite_name, 250 ReportConstant.time: round(float(suite_result.run_time) / 1000, 3), 251 ReportConstant.errors: 0, 252 ReportConstant.disabled: 0, 253 ReportConstant.failures: 0, 254 ReportConstant.ignored: 0, 255 ReportConstant.tests: suite_result.test_num, 256 ReportConstant.message: suite_result.stacktrace, 257 ReportConstant.report: suite_result.report 258 } 259 if self.kwargs.get(ReportConstant.module_name, ""): 260 test_suite_attributes[ReportConstant.module_name] = self.kwargs.get( 261 ReportConstant.module_name, "") 262 return test_suite_element, test_suite_attributes 263 264 def _initial_test_case(self, case_result): 265 test_case_element = self.data_helper.initial_case_element() 266 case_stacktrace = str(case_result.stacktrace) 267 for char_index in range(32): 268 if char_index in [10, 13]: # chr(10): LF, chr(13): CR 269 continue 270 case_stacktrace = case_stacktrace.replace(chr(char_index), "") 271 test_case_attributes = { 272 ReportConstant.name: case_result.test_name, 273 ReportConstant.status: "", 274 ReportConstant.time: round(float(case_result.run_time) / 1000, 3), 275 ReportConstant.class_name: case_result.test_class, 276 ReportConstant.result: "", 277 ReportConstant.level: 1, 278 ReportConstant.message: case_stacktrace, 279 ReportConstant.report: case_result.report 280 } 281 result_content = getattr(case_result, ReportConstant.result_content, "") 282 if result_content: 283 test_case_attributes.update({ReportConstant.result_content: result_content}) 284 return test_case_element, test_case_attributes 285 286 @classmethod 287 def clear_report_result(cls): 288 with SUITE_REPORTER_LOCK: 289 LOG.debug("Clear report result") 290 cls.suite_report_result.clear() 291 292 @classmethod 293 def clear_failed_case_list(cls): 294 with SUITE_REPORTER_LOCK: 295 LOG.debug("Clear failed case list") 296 cls.failed_case_list.clear() 297 298 @classmethod 299 def append_report_result(cls, report_result): 300 with SUITE_REPORTER_LOCK: 301 if not isinstance(report_result, tuple) or len(report_result) != 2: 302 LOG.error("Report result should be a tuple with length 2") 303 return 304 data_path = report_result[0] 305 for index, exist_result in enumerate(cls.suite_report_result): 306 if exist_result[0] == data_path: 307 LOG.debug("Data report %s generate again", data_path) 308 cls.suite_report_result[index] = report_result 309 return 310 cls.suite_report_result.append(report_result) 311 cls._upload_case_result(report_result[1]) 312 313 @classmethod 314 def _upload_case_result(cls, result_str): 315 if Context.session().mode != ModeType.decc: 316 return 317 element = DataHelper.parse_data_report(result_str) 318 if len(element) == 0: 319 LOG.debug("%s is error", result_str) 320 return 321 element = element[0] 322 result, error_msg = get_case_result(element) 323 case_name = element.get(ReportConstant.name, "") 324 try: 325 from agent.decc import Handler 326 LOG.info("Upload case result to decc") 327 Handler.upload_case_result(case_name, result, error_msg) 328 except ModuleNotFoundError as error: 329 if Context.session().mode == ModeType.decc: 330 LOG.error("Module not found %s", error.args) 331 332 @classmethod 333 def get_report_result(cls): 334 with SUITE_REPORTER_LOCK: 335 LOG.debug("Get report result, length is {}". 336 format(len(cls.suite_report_result))) 337 return SuiteReporter.suite_report_result 338 339 @classmethod 340 def set_suite_list(cls, suite_list): 341 LOG.debug("Set suite list, length is {}".format(len(suite_list))) 342 cls.suite_list = suite_list 343 344 @classmethod 345 def get_suite_list(cls): 346 with SUITE_REPORTER_LOCK: 347 LOG.debug("Get suite list, length is {}". 348 format(len(cls.suite_list))) 349 return SuiteReporter.suite_list 350 351 @classmethod 352 def get_failed_case_list(cls): 353 with SUITE_REPORTER_LOCK: 354 LOG.debug("Get failed case list, length is {}". 355 format(len(cls.failed_case_list))) 356 return SuiteReporter.failed_case_list 357 358 @classmethod 359 def append_history_result(cls, suite_reports): 360 from _core.utils import get_filename_extension 361 with SUITE_REPORTER_LOCK: 362 LOG.debug("Append history result,suite reports length is {}". 363 format(len(suite_reports))) 364 for report_path, report_result in suite_reports: 365 module_name = get_filename_extension(report_path)[0] 366 cls.history_report_result. \ 367 append((module_name, report_path, report_result)) 368 369 @classmethod 370 def clear_history_result(cls): 371 with SUITE_REPORTER_LOCK: 372 LOG.debug("Clear history result") 373 cls.history_report_result.clear() 374 375 @classmethod 376 def get_history_result_by_module(cls, name): 377 with SUITE_REPORTER_LOCK: 378 LOG.debug("Get history result by module,module_name:{}". 379 format(name)) 380 for module_name, report_path, report_result in \ 381 cls.history_report_result: 382 if name == module_name: 383 return report_path, report_result 384 return "", "" 385 386 @classmethod 387 def get_history_result_list(cls): 388 with SUITE_REPORTER_LOCK: 389 LOG.debug("Get history result list,length is {}". 390 format(len(cls.history_report_result))) 391 return cls.history_report_result 392