#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import time
from enum import Enum
from threading import RLock

from _core.constants import ModeType
from _core.logger import platform_logger
from _core.report.encrypt import check_pub_key_exist
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import ReportConstant

LOG = platform_logger("SuiteReporter")
# Re-entrant lock guarding the class-level lists shared by all reporters.
SUITE_REPORTER_LOCK = RLock()

# Translation table deleting the ASCII control characters (code points 0-31)
# from a string, except LF (10) and CR (13) which are kept.
_CONTROL_CHAR_TABLE = {
    code: None for code in range(32) if code not in (10, 13)
}


class ResultCode(Enum):
    """Numeric result codes compared against ``case_result.code``."""
    UNKNOWN = -1010
    BLOCKED = -1
    PASSED = 0
    FAILED = 1
    SKIPPED = 2


class SuiteReporter:
    """Build suite data reports and keep the generated results in
    class-level lists shared by every instance."""

    # These lists are class attributes, i.e. shared process-wide state.
    suite_list = []
    suite_report_result = []
    failed_case_list = []
    history_report_result = []

    def __init__(self, results, report_name, report_path=None, **kwargs):
        """
        Create suite report
        :param results: [(suite_result, [case_results]),
                         (suite_result, [case_results]), ...]
        :param report_name: suite report name
        :param report_path: suite report path; it is passed straight to
                            os.path.join, so leaving the default None
                            raises TypeError
        :param kwargs: optional report attributes; the keys read by this
                       class are ReportConstant.product_info_,
                       ReportConstant.message and
                       ReportConstant.module_name
        """
        self.results = results
        self.data_helper = DataHelper()
        self.report_name = report_name
        self.report_path = report_path
        self.suite_data_path = os.path.join(
            self.report_path, "%s%s" % (
                report_name, self.data_helper.DATA_REPORT_SUFFIX))
        self.kwargs = kwargs
        # NOTE(review): function-scope import, presumably to avoid a
        # circular import with xdevice - confirm.
        from xdevice import Scheduler
        if not check_pub_key_exist() and Scheduler.mode != ModeType.decc:
            # The shared list is guarded by the lock everywhere else, so
            # take it here as well.
            with SUITE_REPORTER_LOCK:
                SuiteReporter.suite_report_result.clear()

    def create_empty_report(self):
        """
        Create a data report flagged as unavailable for a single suite.

        Logs an error and returns without doing anything unless
        self.results holds exactly one (suite_result, case_results) pair.
        Outside decc mode the report is written to self.suite_data_path;
        in every mode the result is appended to the shared result list.
        """
        # create empty data report only for single suite
        if len(self.results) != 1:
            LOG.error("Can only create one empty data report once")
            return
        suite_result, _ = self.results[0]

        # initial test suites element
        test_suites_element, test_suites_attributes, _ = \
            self._initial_test_suites()
        test_suites_attributes[ReportConstant.unavailable] = 1
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)

        # initial test suite element
        test_suite_element, test_suite_attributes = self._initial_test_suite(
            suite_result)
        test_suite_element.text, test_suite_element.tail = \
            "", self.data_helper.LINE_BREAK
        test_suite_attributes[ReportConstant.unavailable] = 1
        test_suite_attributes[ReportConstant.message] = suite_result.stacktrace

        from xdevice import Scheduler
        in_decc_mode = Scheduler.mode == ModeType.decc
        if in_decc_mode:
            test_suite_attributes[ReportConstant.result] = ReportConstant.false
        self.data_helper.set_element_attributes(test_suite_element,
                                                test_suite_attributes)

        # append test suite element
        test_suites_element.append(test_suite_element)

        # generate report
        # Explicit length check: relying on an element's truth value is
        # deprecated for ElementTree elements (assumes DataHelper returns
        # ElementTree-style elements - TODO confirm).
        if len(test_suites_element) > 0:
            if not in_decc_mode:
                self.data_helper.generate_report(test_suites_element,
                                                 self.suite_data_path)
            SuiteReporter.append_report_result((
                self.suite_data_path, self.data_helper.to_string(
                    test_suites_element)))

    def generate_data_report(self):
        """
        Build the test suites element from self.results, write it to
        self.suite_data_path and record it in the shared result list.
        Nothing is written when the element has no suite children.
        """
        # construct test suites element
        test_suites_element = self._construct_test_suites()

        # generate report
        if len(test_suites_element) > 0:
            self.data_helper.generate_report(test_suites_element,
                                             self.suite_data_path)
            SuiteReporter.append_report_result((
                self.suite_data_path, self.data_helper.to_string(
                    test_suites_element)))

    def _construct_test_suites(self):
        """
        Build the top-level test suites element with one child element
        per entry of self.results and the counters summed over all suites.
        :return: the test suites element
        """
        # initial test suites element
        test_suites_element, test_suites_attributes, need_update_attributes = \
            self._initial_test_suites()

        # construct test suite element
        for suite_result, case_results in self.results:
            test_suite_element, test_suite_attributes = \
                self._construct_test_suite(suite_result, case_results)

            # add test suite element
            test_suites_element.append(test_suite_element)

            # accumulate the suite counters into the suites totals
            for need_update_attribute in need_update_attributes:
                test_suites_attributes[need_update_attribute] += \
                    test_suite_attributes.get(need_update_attribute, 0)
        test_suites_attributes[ReportConstant.time] = \
            round(test_suites_attributes.get(ReportConstant.time), 3)

        if len(test_suites_element) > 0:
            # the last child gets the plain line break as its tail
            last_suite_element = test_suites_element[-1]
            last_suite_element.tail = self.data_helper.LINE_BREAK
        else:
            LOG.error("%s no suite result exists", self.report_name)

        # set test suites element attributes
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)
        return test_suites_element

    def _initial_test_suites(self):
        """
        Create the empty test suites element and its initial attributes.
        :return: (element, attributes dict, list of the attribute keys
                  that are accumulated from each suite)
        """
        test_suites_element = self.data_helper.initial_suites_element()
        test_suites_attributes = {
            ReportConstant.name: self.report_name,
            ReportConstant.time_stamp: time.strftime(
                ReportConstant.time_format, time.localtime()),
            ReportConstant.time: 0,
            ReportConstant.errors: 0,
            ReportConstant.disabled: 0,
            ReportConstant.failures: 0,
            ReportConstant.tests: 0,
            ReportConstant.ignored: 0,
            ReportConstant.unavailable: 0,
            ReportConstant.product_info: self.kwargs.get(
                ReportConstant.product_info_, ""),
            # module's failure message
            ReportConstant.message: self.kwargs.get(
                ReportConstant.message, "")
        }
        # a non-empty module name overrides the report name
        module_name = self.kwargs.get(ReportConstant.module_name, "")
        if module_name:
            test_suites_attributes[ReportConstant.name] = module_name
        need_update_attributes = [
            ReportConstant.time,
            ReportConstant.errors,
            ReportConstant.tests,
            ReportConstant.ignored,
            ReportConstant.disabled,
            ReportConstant.failures,
            ReportConstant.unavailable
        ]
        return (test_suites_element, test_suites_attributes,
                need_update_attributes)

    def _construct_test_suite(self, suite_result, case_results):
        """
        Build one test suite element with a child element per case result.
        :param suite_result: suite level result
        :param case_results: case results belonging to the suite
        :return: (test suite element, test suite attributes dict)
        """
        # initial test suite element
        test_suite_element, test_suite_attributes = self._initial_test_suite(
            suite_result)

        # get test case elements that are children of test suite element
        test_case_elements = []
        for case_result in case_results:
            # initial test case element
            test_case_element, test_case_attributes = self._initial_test_case(
                case_result)

            # update attributes according to case result
            self.update_attributes(case_result, test_case_attributes,
                                   test_suite_attributes)

            # set test case attributes and add to test_suite_element
            self.data_helper.set_element_attributes(test_case_element,
                                                    test_case_attributes)
            test_case_elements.append(test_case_element)

        # Cases counted in the suite's test number but without a case
        # result are added to the disabled counter (never a negative value).
        test_suite_attributes[ReportConstant.disabled] += max(int(
            test_suite_attributes.get(ReportConstant.tests) -
            len(test_case_elements)), 0)
        if test_case_elements:
            test_case_elements[-1].tail = self.data_helper.LINE_BREAK_INDENT
        else:
            LOG.debug("No case executed")
        test_suite_element.extend(test_case_elements)

        # set test suite attributes
        self.data_helper.set_element_attributes(test_suite_element,
                                                test_suite_attributes)
        return test_suite_element, test_suite_attributes

    @classmethod
    def update_attributes(cls, case_result, test_case_attributes,
                          test_suite_attributes):
        """
        Set status/result of a case from case_result.code and bump the
        matching suite counter. Both dicts are modified in place.
        :param case_result: object whose ``code`` is compared with the
                            ResultCode values
        :param test_case_attributes: attributes dict of the case
        :param test_suite_attributes: attributes dict of the owning suite
        """
        if case_result.code == ResultCode.PASSED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.run
            test_case_attributes[ReportConstant.result] = ReportConstant.true
            # a passed case carries no message
            test_case_attributes[ReportConstant.message] = ""
        elif case_result.code == ResultCode.FAILED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.run
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.failures] += 1
        elif case_result.code == ResultCode.SKIPPED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.skip
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.ignored] += 1
        else:  # ResultCode.UNKNOWN.value or other value
            test_case_attributes[ReportConstant.status] = \
                ReportConstant.disable
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.disabled] += 1

    def _initial_test_suite(self, suite_result):
        """
        Create an empty test suite element and its initial attributes.
        :param suite_result: provides suite_name, run_time, test_num and
                             stacktrace
        :return: (test suite element, test suite attributes dict)
        """
        test_suite_element = self.data_helper.initial_suite_element()
        test_suite_attributes = {
            ReportConstant.name: suite_result.suite_name,
            # run_time is divided by 1000 and rounded to 3 decimals
            ReportConstant.time: round(float(suite_result.run_time) / 1000, 3),
            ReportConstant.errors: 0,
            ReportConstant.disabled: 0,
            ReportConstant.failures: 0,
            ReportConstant.ignored: 0,
            ReportConstant.tests: suite_result.test_num,
            ReportConstant.message: suite_result.stacktrace
        }
        module_name = self.kwargs.get(ReportConstant.module_name, "")
        if module_name:
            test_suite_attributes[ReportConstant.module_name] = module_name
        return test_suite_element, test_suite_attributes

    def _initial_test_case(self, case_result):
        """
        Create an empty test case element and its initial attributes.
        :param case_result: provides test_name, test_class, run_time and
                            stacktrace
        :return: (test case element, test case attributes dict)
        """
        test_case_element = self.data_helper.initial_case_element()
        # Strip the control characters below chr(32), keeping LF and CR,
        # in a single pass over the stacktrace text.
        case_stacktrace = str(case_result.stacktrace).translate(
            _CONTROL_CHAR_TABLE)
        test_case_attributes = {
            ReportConstant.name: case_result.test_name,
            ReportConstant.status: "",
            ReportConstant.time: round(
                float(case_result.run_time) / 1000, 3),
            ReportConstant.class_name: case_result.test_class,
            ReportConstant.result: "",
            ReportConstant.level: 1,
            ReportConstant.message: case_stacktrace
        }
        return test_case_element, test_case_attributes

    @classmethod
    def clear_report_result(cls):
        """Empty the shared suite report result list."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Clear report result")
            cls.suite_report_result.clear()

    @classmethod
    def clear_failed_case_list(cls):
        """Empty the shared failed case list."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Clear failed case list")
            cls.failed_case_list.clear()

    @classmethod
    def append_report_result(cls, report_result):
        """
        Record a report result in the shared list.

        An entry with the same data path is replaced in place; otherwise
        the result is appended and passed to _upload_case_result.
        :param report_result: tuple (data report path, report content);
                              anything else is rejected with an error log
        """
        with SUITE_REPORTER_LOCK:
            if not isinstance(report_result, tuple) or len(report_result) != 2:
                LOG.error("Report result should be a tuple with length 2")
                return
            data_path = report_result[0]
            for index, exist_result in enumerate(cls.suite_report_result):
                if exist_result[0] == data_path:
                    LOG.debug("Data report %s generate again", data_path)
                    cls.suite_report_result[index] = report_result
                    return
            cls.suite_report_result.append(report_result)
            cls._upload_case_result(report_result[1])

    @classmethod
    def _upload_case_result(cls, result_str):
        """
        Upload the first element of a data report through the decc
        handler. Does nothing unless Scheduler.mode is ModeType.decc.
        :param result_str: report content handed to
                           DataHelper.parse_data_report
        """
        from xdevice import Scheduler
        if Scheduler.mode != ModeType.decc:
            return
        element = DataHelper.parse_data_report(result_str)
        if len(element) == 0:
            LOG.debug("%s is error", result_str)
            return
        element = element[0]
        result, error_msg = Scheduler.get_script_result(element)
        case_name = element.get(ReportConstant.name, "")
        try:
            from agent.decc import Handler
            LOG.info("Upload case result to decc")
            Handler.upload_case_result(case_name, result, error_msg)
        except ModuleNotFoundError as error:
            # Only reachable in decc mode because of the early return
            # above, so the mode does not need to be checked again.
            LOG.error("Module not found %s", error.args)

    @classmethod
    def get_report_result(cls):
        """Return the shared suite report result list (not a copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get report result, length is %s",
                      len(cls.suite_report_result))
            return SuiteReporter.suite_report_result

    @classmethod
    def set_suite_list(cls, suite_list):
        """Replace the shared suite list with the given list."""
        # Guarded like the other accessors of the shared class state.
        with SUITE_REPORTER_LOCK:
            LOG.debug("Set suite list, length is %s", len(suite_list))
            cls.suite_list = suite_list

    @classmethod
    def get_suite_list(cls):
        """Return the shared suite list (not a copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get suite list, length is %s", len(cls.suite_list))
            return SuiteReporter.suite_list

    @classmethod
    def get_failed_case_list(cls):
        """Return the shared failed case list (not a copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get failed case list, length is %s",
                      len(cls.failed_case_list))
            return SuiteReporter.failed_case_list

    @classmethod
    def append_history_result(cls, suite_reports):
        """
        Append history results as (module_name, report_path, report_result)
        tuples; module_name is the first item returned by
        get_filename_extension(report_path).
        :param suite_reports: iterable of (report_path, report_result)
        """
        from _core.utils import get_filename_extension
        with SUITE_REPORTER_LOCK:
            LOG.debug("Append history result,suite reports length is %s",
                      len(suite_reports))
            for report_path, report_result in suite_reports:
                module_name = get_filename_extension(report_path)[0]
                cls.history_report_result.append(
                    (module_name, report_path, report_result))

    @classmethod
    def clear_history_result(cls):
        """Empty the shared history report result list."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Clear history result")
            cls.history_report_result.clear()

    @classmethod
    def get_history_result_by_module(cls, name):
        """
        Look up the first history entry recorded for a module.
        :param name: module name to search for
        :return: (report_path, report_result), or ("", "") if no entry
                 matches
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get history result by module,module_name:%s", name)
            for module_name, report_path, report_result in \
                    cls.history_report_result:
                if name == module_name:
                    return report_path, report_result
            return "", ""

    @classmethod
    def get_history_result_list(cls):
        """Return the shared history report result list (not a copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get history result list,length is %s",
                      len(cls.history_report_result))
            return cls.history_report_result