1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import re 21import stat 22import uuid 23from abc import ABC 24from abc import abstractmethod 25 26from _core.plugin import Plugin 27from _core.plugin import get_plugin 28from _core.constants import TestType 29from _core.executor.bean import CaseResult 30from _core.executor.bean import SuiteResult 31from _core.executor.bean import SuitesResult 32from _core.interface import LifeCycle 33from _core.interface import IListener 34from _core.report.suite_reporter import SuiteReporter 35from _core.report.suite_reporter import ResultCode 36 37 38__all__ = ["UniversalReportListener", "PlusReportListener"] 39 40 41class AbsReportListener(IListener, ABC): 42 43 @abstractmethod 44 def _get_suite_result(self, test_result, create=False): 45 pass 46 47 @abstractmethod 48 def _get_test_result(self, test_result, create=False): 49 pass 50 51 def __started__(self, lifecycle, result): 52 if lifecycle == LifeCycle.TestSuites: 53 self._handle_test_suites_start(result) 54 elif lifecycle == LifeCycle.TestSuite: 55 self._handle_testsuite_start(result) 56 elif lifecycle == LifeCycle.TestCase: 57 self._handle_case_start(result) 58 59 def __ended__(self, lifecycle, test_result, **kwargs): 60 if lifecycle == LifeCycle.TestSuite: 61 self._handle_testsuite_end(test_result, kwargs) 62 elif lifecycle == LifeCycle.TestSuites: 63 self._handle_test_suites_end(test_result, kwargs) 64 elif lifecycle == LifeCycle.TestCase: 65 self._handle_case_end(test_result) 66 elif lifecycle == LifeCycle.TestTask: 67 self._handle_task_end(test_result, kwargs) 68 69 def __skipped__(self, lifecycle, result): 70 if lifecycle == LifeCycle.TestCase: 71 self._handle_case_skip(result) 72 73 def __failed__(self, lifecycle, test_result): 74 if lifecycle == LifeCycle.TestSuite: 75 self._handle_suite_fail(test_result) 76 elif lifecycle == LifeCycle.TestCase: 77 self._handle_case_fail(test_result) 78 79 @abstractmethod 80 def _handle_test_suites_start(self, test_result): 81 pass 82 83 @abstractmethod 84 def _handle_testsuite_start(self, test_result): 85 pass 86 87 @abstractmethod 88 def _handle_case_start(self, test_result): 89 pass 90 91 @abstractmethod 92 def _handle_testsuite_end(self, test_result, kwargs): 93 pass 94 95 @abstractmethod 96 def _handle_test_suites_end(self, test_result, kwargs): 97 pass 98 99 @abstractmethod 100 def _handle_case_end(self, test_result): 101 pass 102 103 @abstractmethod 104 def _handle_task_end(self, test_result, kwargs): 105 pass 106 107 @abstractmethod 108 def _handle_case_skip(self, test_result): 109 pass 110 111 @abstractmethod 112 def _handle_case_fail(self, test_result): 113 pass 114 115 @abstractmethod 116 def _handle_suite_fail(self, test_result): 117 pass 118 119 120class ReportEventListener(AbsReportListener, ABC): 121 122 def __init__(self): 123 self.result = list() 124 self.suites = dict() 125 self.tests = dict() 126 self.current_suite_id = 0 127 self.current_test_id = 0 128 self.report_path = "" 129 130 def _get_suite_result(self, test_result, create=False): 131 if test_result.index in self.suites: 132 return self.suites.get(test_result.index) 133 elif create: 134 suite = SuiteResult() 135 rid = uuid.uuid4().hex if test_result.index == "" else \ 136 test_result.index 137 suite.index = rid 138 return self.suites.setdefault(rid, suite) 139 else: 140 return self.suites.get(self.current_suite_id) 141 142 def _get_test_result(self, test_result, create=False): 143 if test_result.index in self.tests: 144 return self.tests.get(test_result.index) 145 elif create: 146 test = CaseResult() 147 rid = uuid.uuid4().hex if test_result.index == "" else \ 148 test_result.index 149 test.index = rid 150 return self.tests.setdefault(rid, test) 151 else: 152 return self.tests.get(self.current_test_id) 153 154 def _handle_case_start(self, test_result): 155 test = self._get_test_result(test_result=test_result, create=True) 156 test.test_name = test_result.test_name 157 test.test_class = test_result.test_class 158 self.current_test_id = test.index 159 160 def _handle_testsuite_start(self, test_result): 161 suite = self._get_suite_result(test_result=test_result, 162 create=True) 163 suite.suite_name = test_result.suite_name 164 suite.test_num = test_result.test_num 165 self.current_suite_id = suite.index 166 167 def _handle_test_suites_start(self, test_result): 168 suites = self._get_suite_result(test_result=test_result, 169 create=True) 170 suites.suites_name = test_result.suites_name 171 suites.test_num = test_result.test_num 172 self.current_suite_id = suites.index 173 174 def _handle_testsuite_end(self, test_result, kwargs): 175 suite = self._get_suite_result(test_result=test_result, 176 create=False) 177 if not suite: 178 return 179 suite.run_time = test_result.run_time 180 suite.code = test_result.code 181 suite.report = test_result.report 182 suite.test_num = max(test_result.test_num, len(self.tests)) 183 self._handle_suite_end_data(suite, kwargs) 184 185 def _handle_task_end(self, test_result, kwargs): 186 test_type = str(kwargs.get("test_type", TestType.all)) 187 reporter = get_plugin(plugin_type=Plugin.REPORTER, 188 plugin_id=test_type) 189 if not reporter: 190 reporter = get_plugin(plugin_type=Plugin.REPORTER, 191 plugin_id=TestType.all)[0] 192 else: 193 reporter = reporter[0] 194 reporter.__generate_reports__(self.report_path, 195 task_info=test_result) 196 197 def _handle_case_end(self, test_result): 198 test = self._get_test_result(test_result=test_result, create=False) 199 test.run_time = test_result.run_time 200 test.stacktrace = test_result.stacktrace 201 test.code = test_result.code 202 test.report = test_result.report 203 test.is_completed = test_result.is_completed 204 205 def _handle_test_suites_end(self, test_result, kwargs): 206 if not kwargs.get("suite_report", False): 207 result_dir = os.path.join(self.report_path, "result") 208 os.makedirs(result_dir, exist_ok=True) 209 message = kwargs.get("message", "") 210 # 有的场景传SuiteResult对象进来,导致报错,需要增加实例判断 211 if isinstance(test_result, SuitesResult): 212 message = test_result.stacktrace 213 suites_name = test_result.suites_name 214 else: 215 suites_name = kwargs.get("suites_name", "") 216 self._generate_data_report(result_dir, self.result, suites_name, message=message) 217 218 def _handle_case_skip(self, test_result): 219 test = self._get_test_result(test_result=test_result, create=False) 220 test.stacktrace = test_result.stacktrace 221 test.code = ResultCode.SKIPPED.value 222 223 def _handle_case_fail(self, test_result): 224 test = self._get_test_result(test_result=test_result, create=False) 225 test.stacktrace = test_result.stacktrace 226 test.code = ResultCode.FAILED.value 227 228 def _handle_suite_fail(self, test_result): 229 suite = self._get_suite_result(test_result=test_result, 230 create=False) 231 suite.stacktrace = test_result.stacktrace 232 suite.code = ResultCode.FAILED.value 233 234 @abstractmethod 235 def _generate_data_report(self, result_dir, results, name, **kwargs): 236 pass 237 238 @abstractmethod 239 def _handle_suite_end_data(self, suite, kwargs): 240 pass 241 242 243class UniversalReportListener(ReportEventListener, ABC): 244 245 def __init__(self): 246 super().__init__() 247 self.suite_distributions = dict() 248 249 def _handle_task_end(self, test_result, kwargs): 250 pass 251 252 @classmethod 253 def _generate_data_report(cls, result_dir, results, name, **kwargs): 254 suite_report = SuiteReporter(results, name, result_dir, **kwargs) 255 suite_report.generate_data_report() 256 257 258class PlusReportListener(ReportEventListener, ABC): 259 260 def __init__(self): 261 super().__init__() 262 self.device_sn = "" 263 self.suite_name = "" 264 265 @classmethod 266 def _generate_data_report(cls, result_dir, results, name, **kwargs): 267 suite_report = SuiteReporter(results, name, result_dir, **kwargs) 268 suite_report.generate_data_report() 269 270 def _handle_case_end(self, test_result): 271 test = self._get_test_result(test_result=test_result, create=False) 272 test.run_time = test_result.run_time 273 test.stacktrace = test_result.stacktrace 274 test.code = test_result.code 275 test.report = test_result.report 276 if getattr(test_result, "result_content", ""): 277 test.result_content = test_result.result_content 278 if hasattr(self, "report_plus") and self.report_plus: 279 self._update_result(test_result) 280 test.normal_screen_urls = test_result.normal_screen_urls 281 test.failure_screen_urls = test_result.failure_screen_urls 282 if hasattr(self, "device_log_back_fill_path"): 283 test_record, result_info = self._update_test_record(self.device_log_back_fill_path, test_result) 284 test.test_record = test_record 285 if hasattr(test_result, "result_info") and test_result.result_info: 286 test.result_info = test_result.result_info 287 else: 288 test.result_info = result_info 289 290 def _update_result(self, result): 291 serial_re = re.sub(r'[^A-Za-z0-9]', '_', self.device_sn) 292 screenshot_base_path = os.path.join( 293 self.report_path, "screenshot", serial_re, self.suite_name) 294 temp = "{}_{}=".format(result.test_class, result.test_name) 295 result.normal_screen_urls = os.path.join(screenshot_base_path, "normal", temp) 296 result.failure_screen_urls = os.path.join(screenshot_base_path, "failure", temp) 297 298 def _update_test_record(self, device_log_back_fill_path, result): 299 try: 300 split_str = "{}_{}".format(result.test_class, result.test_name) 301 test_record = "" 302 result_info = "" 303 test_class = '"{}"'.format(result.test_class) 304 test_name = '"{}"'.format(result.test_name) 305 306 fd = os.open(device_log_back_fill_path, os.O_RDONLY, stat.S_IWUSR | stat.S_IRUSR) 307 with os.fdopen(fd, "r", encoding="utf-8") as file_content: 308 for line in file_content.readlines(): 309 suite_name = "" 310 if hasattr(self, "suite_name"): 311 suite_name = self.suite_name 312 if suite_name and suite_name in line and split_str in line: 313 str_arr = line.split(split_str) 314 if str_arr[1]: 315 test_record = str_arr[1].strip() 316 elif test_class in line and test_name in line: 317 json_arr = line.split("HtsIgnoredTest") 318 if len(json_arr) == 2: 319 result_info = json_arr[1].strip() 320 return test_record, result_info 321 except (FileNotFoundError, IOError) as error: 322 raise error 323