1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import collections 20import copy 21import json 22import os 23import platform 24import re 25import shutil 26import time 27import stat 28from importlib import util 29from xml.etree import ElementTree 30 31from _core.interface import IReporter 32from _core.plugin import Plugin 33from _core.constants import CaseResult 34from _core.constants import DeviceProperties 35from _core.constants import ModeType 36from _core.constants import TestType 37from _core.constants import FilePermission 38from _core.logger import platform_logger 39from _core.exception import ParamError 40from _core.utils import calculate_elapsed_time 41from _core.utils import calculate_percent 42from _core.utils import copy_folder 43from _core.utils import get_filename_extension 44from _core.utils import parse_xml_cdata 45from _core.utils import show_current_environment 46from _core.report.encrypt import check_pub_key_exist 47from _core.report.encrypt import do_rsa_encrypt 48from _core.report.reporter_helper import DataHelper 49from _core.report.reporter_helper import ExecInfo 50from _core.report.reporter_helper import ReportConstant 51from _core.report.repeater_helper import RepeatHelper 52from _core.variables import Variables 53from _core.context.center import Context 54from _core.context.handler import get_case_result 55from _core.context.upload import Uploader 56 57LOG = platform_logger("ResultReporter") 58 59 60class ResultSummary: 61 62 def __init__(self): 63 self.modules = 0 64 self.repeat = 1 65 self.runmodules = 0 66 self.tests = 0 67 self.passed = 0 68 self.failed = 0 69 self.blocked = 0 70 self.ignored = 0 71 self.unavailable = 0 72 self.devices = [] 73 self.__module_list = [] 74 75 def get_data(self): 76 self.__module_list.clear() 77 LOG.info(f"Test Summary: modules: {self.modules}, repeat: {self.repeat}, run modules: {self.runmodules}, " 78 f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, " 79 f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}") 80 data = { 81 "modules": self.modules, 82 "repeat": self.repeat, 83 "runmodules": self.runmodules, 84 "tests": self.tests, 85 "passed": self.passed, 86 "failed": self.failed, 87 "blocked": self.blocked, 88 "ignored": self.ignored, 89 "unavailable": self.unavailable 90 } 91 return data 92 93 def get_devices(self): 94 return self.devices 95 96 def add_module(self, name): 97 if name not in self.__module_list: 98 self.__module_list.append(name) 99 self.modules += 1 100 101 102@Plugin(type=Plugin.REPORTER, id=TestType.all) 103class ResultReporter(IReporter): 104 105 def __init__(self): 106 self.report_path = None 107 self.task_info = None 108 self.summary_data_path = None 109 self.summary_data_str = "" 110 self.exec_info = None 111 self.data_helper = None 112 self.repeat_helper = None 113 self.summary = ResultSummary() 114 115 # task_record.info数据 116 self._failed_cases = [] 117 self.record_params = {} 118 self.record_reports = {} 119 120 def __generate_reports__(self, report_path, **kwargs): 121 LOG.info("") 122 LOG.info("**************************************************") 123 LOG.info("************** Start generate reports ************") 124 LOG.info("**************************************************") 125 LOG.info("") 126 127 if self._check_params(report_path, **kwargs): 128 # generate data report 129 self._generate_data_report() 130 131 # generate vision reports 132 self._generate_test_report() 133 134 # generate task info record 135 self._generate_task_record() 136 137 # generate summary ini 138 self._generate_summary() 139 140 # copy reports to reports/latest folder 141 self._copy_report() 142 143 self._transact_all() 144 145 LOG.info("") 146 LOG.info("**************************************************") 147 LOG.info("************** Ended generate reports ************") 148 LOG.info("**************************************************") 149 LOG.info("") 150 show_current_environment() 151 152 def _check_params(self, report_path, **kwargs): 153 task_info = kwargs.get("task_info", "") 154 if not report_path: 155 LOG.error("Report path is wrong", error_no="00440", 156 ReportPath=report_path) 157 return False 158 if not task_info or not isinstance(task_info, ExecInfo): 159 LOG.error("Task info is wrong", error_no="00441", 160 TaskInfo=task_info) 161 return False 162 163 os.makedirs(report_path, exist_ok=True) 164 self.report_path = report_path 165 self.task_info = task_info 166 self.summary_data_path = os.path.join( 167 self.report_path, ReportConstant.summary_data_report) 168 self.exec_info = task_info 169 self.data_helper = DataHelper() 170 self.repeat_helper = RepeatHelper(report_path) 171 return True 172 173 def _generate_test_report(self): 174 report_template = os.path.join(Variables.res_dir, "template") 175 missing_files = [] 176 for source in ReportConstant.new_template_sources: 177 file = source.get("file") 178 to_path = os.path.join(report_template, file) 179 if not os.path.exists(to_path) or os.path.getsize(to_path) == 0: 180 missing_files.append(to_path) 181 # 若新报告模板文件缺失,则使用旧报告模板生成测试报告 182 if len(missing_files) > 0: 183 self._generate_vision_reports() 184 return 185 186 copy_folder(report_template, self.report_path) 187 temp_file_report_html = os.path.join(self.report_path, "report.html") 188 if os.path.exists(temp_file_report_html): 189 os.remove(temp_file_report_html) 190 content = json.dumps(self._get_summary_data(), separators=(",", ":")) 191 data_js = os.path.join(self.report_path, "static", "data.js") 192 data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644) 193 with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf: 194 jsf.write(f"window.reportData = {content}") 195 test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/") 196 LOG.info(f"Log path: {self.report_path}") 197 LOG.info(f"Generate test report: file:///{test_report}") 198 # 重新生成对象,避免在retry场景数据统计有误 199 self.summary = ResultSummary() 200 201 def _get_summary_data(self): 202 self.summary.repeat = self.task_info.repeat 203 modules = [] 204 for data_report, _ in self.data_reports: 205 if data_report.endswith(ReportConstant.summary_data_report): 206 continue 207 info = self._parse_module(data_report) 208 if info is not None: 209 modules.append(info) 210 if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0: 211 if Context.get_scheduler(): 212 Context.get_scheduler().set_need_auto_retry(True) 213 data = { 214 "exec_info": self._get_exec_info(), 215 "summary": self.summary.get_data(), 216 "devices": self.summary.get_devices(), 217 "modules": modules, 218 } 219 return data 220 221 def _get_exec_info(self): 222 start_time = self.task_info.test_time 223 end_time = time.strftime(ReportConstant.time_format, time.localtime()) 224 test_time = "%s/ %s" % (start_time, end_time) 225 execute_time = calculate_elapsed_time( 226 time.mktime(time.strptime(start_time, ReportConstant.time_format)), 227 time.mktime(time.strptime(end_time, ReportConstant.time_format))) 228 host_info = platform.platform() 229 device_name = getattr(self.task_info, ReportConstant.device_name, "None") 230 device_type = getattr(self.task_info, ReportConstant.device_label, "None") 231 platform_info = getattr(self.task_info, ReportConstant.platform, "None") 232 test_type = getattr(self.task_info, ReportConstant.test_type, "None") 233 234 # 为报告文件summary.ini提供数据 235 exec_info = ExecInfo() 236 exec_info.device_name = device_name 237 exec_info.device_label = device_type 238 exec_info.execute_time = execute_time 239 exec_info.host_info = host_info 240 exec_info.log_path = self.report_path 241 exec_info.platform = platform_info 242 exec_info.test_time = test_time 243 exec_info.test_type = test_type 244 self.exec_info = exec_info 245 246 info = { 247 "test_start": start_time, 248 "test_end": end_time, 249 "execute_time": execute_time, 250 "test_type": test_type, 251 "host_info": host_info, 252 "logs": self._get_task_log() 253 } 254 return info 255 256 def _parse_module(self, xml_file): 257 """解析测试模块""" 258 file_name = os.path.basename(xml_file) 259 try: 260 xml_file_open = os.open(xml_file, os.O_RDWR, stat.S_IWUSR | stat.S_IRUSR) 261 with os.fdopen(xml_file_open, mode="r", encoding="utf-8") as file_handler: 262 xml_str = file_handler.read() 263 for char_index in range(32): 264 if char_index in [10, 13]: 265 continue 266 xml_str = xml_str.replace(chr(char_index), "") 267 ele_module = ElementTree.fromstring(xml_str) 268 except ElementTree.ParseError as e: 269 LOG.error(f"parse result xml error! xml file {xml_file}") 270 LOG.error(f"error message: {e}") 271 return None 272 module = ResultReporter._count_result(ele_module) 273 # 当模块名为空或为AllTests,将模块名设为结果xml的文件名 274 module_name = file_name[:-4] if module.name in ["", "AllTests"] else module.name.strip() 275 suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module] 276 277 # 为报告文件task_record.info提供数据 278 self.record_reports.update({module_name: xml_file}) 279 if len(self._failed_cases) != 0: 280 self.record_params.update({module_name: copy.copy(self._failed_cases)}) 281 self._failed_cases.clear() 282 283 self.summary.add_module(module_name) 284 self.summary.tests += module.tests 285 self.summary.passed += module.passed 286 self.summary.failed += module.failed 287 self.summary.blocked += module.blocked 288 self.summary.ignored += module.ignored 289 if module.unavailable == 0: 290 self.summary.runmodules += 1 291 else: 292 self.summary.unavailable += 1 293 devices = self._parse_devices(ele_module) 294 295 module_report, module_time = module.report, module.time 296 if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name: 297 report = suites[0].get(ReportConstant.report) 298 if report != "": 299 module_report = report 300 module_time = suites[0].get(ReportConstant.time) 301 repeat = int(ele_module.get(ReportConstant.repeat, "1")) 302 if self.summary.repeat < repeat: 303 self.summary.repeat = repeat 304 repeat_round = int(ele_module.get(ReportConstant.round, "1")) 305 test_type = ele_module.get(ReportConstant.test_type, "-") 306 test_start = ele_module.get(ReportConstant.start_time, "-") 307 test_end = ele_module.get(ReportConstant.end_time, "-") 308 if test_start != "-" and test_end != "-": 309 execute_time = calculate_elapsed_time( 310 time.mktime(time.strptime(test_start, ReportConstant.time_format)), 311 time.mktime(time.strptime(test_end, ReportConstant.time_format))) 312 else: 313 execute_time = calculate_elapsed_time(0, module_time) 314 info = { 315 "name": module_name, 316 "report": module_report, 317 "round": repeat_round, 318 "test_type": test_type, 319 "test_start": test_start, 320 "test_end": test_end, 321 "time": module_time, 322 "execute_time": execute_time, 323 "tests": module.tests, 324 "passed": module.passed, 325 "failed": module.failed, 326 "blocked": module.blocked, 327 "ignored": module.ignored, 328 "unavailable": module.unavailable, 329 "passingrate": calculate_percent(module.passed, module.tests), 330 "error": ele_module.get(ReportConstant.message, ""), 331 "logs": self._get_module_logs(module_name, repeat=repeat, repeat_round=repeat_round), 332 "devices": devices, 333 "suites": suites 334 } 335 return info 336 337 def _parse_testsuite(self, ele_suite): 338 """解析测试套""" 339 suite = ResultReporter._count_result(ele_suite) 340 cases = [self._parse_testcase(case) for case in ele_suite] 341 info = { 342 "name": suite.name, 343 "report": suite.report, 344 "time": suite.time, 345 "tests": suite.tests, 346 "passed": suite.passed, 347 "failed": suite.failed, 348 "blocked": suite.blocked, 349 "ignored": suite.ignored, 350 "passingrate": calculate_percent(suite.passed, suite.tests), 351 "cases": cases 352 } 353 return info 354 355 def _parse_testcase(self, ele_case): 356 """解析测试用例""" 357 name = ele_case.get(ReportConstant.name) 358 class_name = ele_case.get(ReportConstant.class_name, "") 359 result, error = get_case_result(ele_case) 360 if result != CaseResult.passed: 361 self._failed_cases.append(f"{class_name}#{name}") 362 return [name, class_name, result, ResultReporter._parse_time(ele_case), 363 error, ele_case.get(ReportConstant.report, "")] 364 365 @staticmethod 366 def _parse_time(ele): 367 try: 368 _time = float(ele.get(ReportConstant.time, "0")) 369 except ValueError: 370 _time = 0.0 371 LOG.error("parse test time error, set it to 0.0") 372 return _time 373 374 def _parse_devices(self, ele_module): 375 devices_str = ele_module.get(ReportConstant.devices, "") 376 if devices_str == "": 377 return [] 378 try: 379 devices = json.loads(parse_xml_cdata(devices_str)) 380 except Exception as e: 381 LOG.warning(f"parse devices from xml failed, {e}") 382 return [] 383 # 汇总测试设备信息 384 for device in devices: 385 device_sn = device.get(DeviceProperties.sn, "") 386 temp = [d for d in self.summary.get_devices() if d.get(DeviceProperties.sn, "") == device_sn] 387 if len(temp) != 0: 388 continue 389 self.summary.get_devices().append(device) 390 return devices 391 392 @staticmethod 393 def _count_result(ele): 394 name = ele.get(ReportConstant.name, "") 395 report = ele.get(ReportConstant.report, "") 396 _time = ResultReporter._parse_time(ele) 397 tests = int(ele.get(ReportConstant.tests, "0")) 398 failed = int(ele.get(ReportConstant.failures, "0")) 399 disabled = ele.get(ReportConstant.disabled, "0") 400 if disabled == "": 401 disabled = "0" 402 errors = ele.get(ReportConstant.errors, "0") 403 if errors == "": 404 errors = "0" 405 blocked = int(disabled) + int(errors) 406 ignored = int(ele.get(ReportConstant.ignored, "0")) 407 unavailable = int(ele.get(ReportConstant.unavailable, "0")) 408 409 tmp_pass = tests - failed - blocked - ignored 410 passed = tmp_pass if tmp_pass > 0 else 0 411 412 Result = collections.namedtuple( 413 'Result', 414 ['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable']) 415 return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable) 416 417 def _get_module_logs(self, module_name, repeat=1, repeat_round=1): 418 """获取模块运行日志和设备日志 419 注:黑盒用例的测试报告是单独生成的,而xts只有模块级的设备日志,无用例级日志,故本方法仅支持获取模块级的设备日志 420 """ 421 device_log = {} 422 round_folder = f"round{repeat_round}" if repeat > 1 else "" 423 log_path = os.path.join(self.report_path, "log", round_folder, module_name) 424 if not os.path.exists(log_path): 425 return device_log 426 module_log_uri = f"log/{round_folder}/{module_name}" if round_folder else f"log/{module_name}" 427 for filename in os.listdir(log_path): 428 file_link = f"{module_log_uri}/{filename}" 429 file_path = os.path.join(log_path, filename) 430 # 目录和模块运行日志,都提供链接 431 if os.path.isdir(file_path) or filename.startswith(ReportConstant.module_run_log): 432 device_log.setdefault(filename, file_link) 433 continue 434 # 是文件,仅提供模块级的设备日志链接 435 # 测试套日志命名格式device_log_sn.log、测试套子用例日志命名格式device_log_case_sn.log,后者“_”大于2 436 ret = re.fullmatch(r'(device_(?:hi)?log)_\S+\.log', filename) 437 if ret is None or filename.count("_") > 2: 438 continue 439 device_log.setdefault(ret.group(1), file_link) 440 return device_log 441 442 def _get_task_log(self): 443 log_path = os.path.join(self.report_path, "log") 444 if not os.path.exists(log_path): 445 return {} 446 return {f: f"log/{f}" for f in os.listdir(log_path) if f.startswith(ReportConstant.task_run_log)} 447 448 def _generate_data_report(self): 449 # initial element 450 test_suites_element = self.data_helper.initial_suites_element() 451 452 # update test suites element 453 update_flag = self._update_test_suites(test_suites_element) 454 if not update_flag: 455 return 456 457 # generate report 458 if not self._check_mode(ModeType.decc): 459 self.data_helper.generate_report(test_suites_element, 460 self.summary_data_path) 461 462 def _update_test_suites(self, test_suites_element): 463 # initial attributes for test suites element 464 test_suites_attributes, need_update_attributes = \ 465 self._init_attributes() 466 467 # get test suite elements that are children of test suites element 468 modules = dict() 469 test_suite_elements = [] 470 for data_report, module_name in self.data_reports: 471 if data_report.endswith(ReportConstant.summary_data_report): 472 continue 473 root = self.data_helper.parse_data_report(data_report) 474 self._parse_devices(root) 475 if module_name == ReportConstant.empty_name: 476 module_name = self._get_module_name(data_report, root) 477 total = int(root.get(ReportConstant.tests, 0)) 478 if module_name not in modules.keys(): 479 modules[module_name] = list() 480 modules[module_name].append(total) 481 482 failed_cases = [] 483 for child in root: 484 child.tail = self.data_helper.LINE_BREAK_INDENT 485 if not child.get(ReportConstant.module_name) or child.get( 486 ReportConstant.module_name) == \ 487 ReportConstant.empty_name: 488 child.set(ReportConstant.module_name, module_name) 489 self._check_tests_and_unavailable(child) 490 # covert the status of "notrun" to "ignored" 491 for element in child: 492 if element.get(ReportConstant.status, "") == \ 493 ReportConstant.not_run: 494 ignored = int(child.get(ReportConstant.ignored, 0)) + 1 495 child.set(ReportConstant.ignored, "%s" % ignored) 496 # 记录运行失败的测试用例的编号 497 if get_case_result(element)[0] != CaseResult.passed: 498 class_name = element.get(ReportConstant.class_name, "") 499 name = element.get(ReportConstant.name) 500 failed_cases.append(f"{class_name}#{name}") 501 test_suite_elements.append(child) 502 for update_attribute in need_update_attributes: 503 update_value = child.get(update_attribute, 0) 504 if not update_value: 505 update_value = 0 506 test_suites_attributes[update_attribute] += int( 507 update_value) 508 if failed_cases: 509 self.record_params.update({module_name: failed_cases}) 510 self.record_reports.update({module_name: data_report}) 511 512 if test_suite_elements: 513 child = test_suite_elements[-1] 514 child.tail = self.data_helper.LINE_BREAK 515 else: 516 LOG.error("Execute result not exists") 517 return False 518 519 # set test suites element attributes and children 520 self._handle_module_tests(modules, test_suites_attributes) 521 self.data_helper.set_element_attributes(test_suites_element, 522 test_suites_attributes) 523 test_suites_element.extend(test_suite_elements) 524 return True 525 526 @classmethod 527 def _check_tests_and_unavailable(cls, child): 528 total = child.get(ReportConstant.tests, "0") 529 unavailable = child.get(ReportConstant.unavailable, "0") 530 if total and total != "0" and unavailable and \ 531 unavailable != "0": 532 child.set(ReportConstant.unavailable, "0") 533 LOG.warning("%s total: %s, unavailable: %s", child.get( 534 ReportConstant.name), total, unavailable) 535 536 @classmethod 537 def _get_module_name(cls, data_report, root): 538 # get module name from data report 539 module_name = get_filename_extension(data_report)[0] 540 if "report" in module_name or "summary" in module_name or \ 541 "<" in data_report or ">" in data_report: 542 module_name = root.get(ReportConstant.name, 543 ReportConstant.empty_name) 544 if "report" in module_name or "summary" in module_name: 545 module_name = ReportConstant.empty_name 546 return module_name 547 548 def _init_attributes(self): 549 test_suites_attributes = { 550 ReportConstant.name: 551 ReportConstant.summary_data_report.split(".")[0], 552 ReportConstant.start_time: self.task_info.test_time, 553 ReportConstant.end_time: time.strftime(ReportConstant.time_format, 554 time.localtime()), 555 ReportConstant.errors: 0, ReportConstant.disabled: 0, 556 ReportConstant.failures: 0, ReportConstant.tests: 0, 557 ReportConstant.ignored: 0, ReportConstant.unavailable: 0, 558 ReportConstant.modules: 0, ReportConstant.run_modules: 0} 559 need_update_attributes = [ReportConstant.tests, ReportConstant.ignored, 560 ReportConstant.failures, 561 ReportConstant.disabled, 562 ReportConstant.errors, 563 ReportConstant.unavailable] 564 return test_suites_attributes, need_update_attributes 565 566 @property 567 def summary_data_report_exist(self): 568 return "<" in self.summary_data_str or \ 569 os.path.exists(self.summary_data_path) 570 571 @property 572 def data_reports(self): 573 if check_pub_key_exist() or self._check_mode(ModeType.decc): 574 from xdevice import SuiteReporter 575 suite_reports = SuiteReporter.get_report_result() 576 if self._check_mode(ModeType.decc): 577 LOG.debug("Handle history result, data reports length:{}". 578 format(len(suite_reports))) 579 SuiteReporter.clear_history_result() 580 SuiteReporter.append_history_result(suite_reports) 581 data_reports = [] 582 for report_path, report_result in suite_reports: 583 module_name = get_filename_extension(report_path)[0] 584 data_reports.append((report_result, module_name)) 585 SuiteReporter.clear_report_result() 586 return data_reports 587 588 if not os.path.isdir(self.report_path): 589 return [] 590 data_reports = [] 591 result_path = os.path.join(self.report_path, "result") 592 for root, _, files in os.walk(result_path): 593 for file_name in files: 594 if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX): 595 continue 596 module_name = self._find_module_name(result_path, root) 597 data_reports.append((os.path.join(root, file_name), 598 module_name)) 599 return data_reports 600 601 @classmethod 602 def _find_module_name(cls, result_path, root): 603 # find module name from directory tree 604 common_path = os.path.commonpath([result_path, root]) 605 if os.path.normcase(result_path) != os.path.normcase(common_path) or \ 606 os.path.normcase(result_path) == os.path.normcase(root): 607 return ReportConstant.empty_name 608 609 root_dir, module_name = os.path.split(root) 610 if os.path.normcase(result_path) == os.path.normcase(root_dir): 611 return ReportConstant.empty_name 612 root_dir, subsystem_name = os.path.split(root_dir) 613 while os.path.normcase(result_path) != os.path.normcase(root_dir): 614 module_name = subsystem_name 615 root_dir, subsystem_name = os.path.split(root_dir) 616 return module_name 617 618 def _generate_summary(self): 619 if not self.summary_data_report_exist or \ 620 self._check_mode(ModeType.decc): 621 return 622 summary_ini_content = \ 623 "[default]\n" \ 624 "Platform={}\n" \ 625 "Test Type={}\n" \ 626 "Device Name={}\n" \ 627 "Host Info={}\n" \ 628 "Test Start/ End Time={}\n" \ 629 "Execution Time={}\n" \ 630 "Device Type={}\n".format( 631 self.exec_info.platform, self.exec_info.test_type, 632 self.exec_info.device_name, self.exec_info.host_info, 633 self.exec_info.test_time, self.exec_info.execute_time, 634 self.exec_info.device_label) 635 636 if self.exec_info.product_info: 637 for key, value in self.exec_info.product_info.items(): 638 summary_ini_content = "{}{}".format( 639 summary_ini_content, "%s=%s\n" % (key, value)) 640 641 if not self._check_mode(ModeType.factory): 642 summary_ini_content = "{}{}".format( 643 summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path) 644 645 # write summary_ini_content 646 summary_filepath = os.path.join(self.report_path, 647 ReportConstant.summary_ini) 648 649 if platform.system() == "Windows": 650 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY 651 else: 652 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND 653 summary_filepath_open = os.open(summary_filepath, flags, 654 FilePermission.mode_755) 655 656 with os.fdopen(summary_filepath_open, "wb") as file_handler: 657 if check_pub_key_exist(): 658 try: 659 cipher_text = do_rsa_encrypt(summary_ini_content) 660 except ParamError as error: 661 LOG.error(error, error_no=error.error_no) 662 cipher_text = b"" 663 file_handler.write(cipher_text) 664 else: 665 file_handler.write(bytes(summary_ini_content, 'utf-8')) 666 file_handler.flush() 667 LOG.info("Generate summary ini: %s", summary_filepath) 668 self.repeat_helper.__generate_repeat_xml__(self.summary_data_path) 669 670 def _copy_report(self): 671 if Uploader.is_enable() or self._check_mode(ModeType.decc): 672 return 673 674 dst_path = os.path.join(Variables.temp_dir, "latest") 675 try: 676 shutil.rmtree(dst_path, ignore_errors=True) 677 os.makedirs(dst_path, exist_ok=True) 678 LOG.info("Copy summary files to %s", dst_path) 679 680 # copy reports to reports/latest folder 681 for report_file in os.listdir(self.report_path): 682 src_file = os.path.join(self.report_path, report_file) 683 dst_file = os.path.join(dst_path, report_file) 684 if os.path.isfile(src_file): 685 shutil.copyfile(src_file, dst_file) 686 except OSError as _: 687 return 688 689 @classmethod 690 def _check_mode(cls, mode): 691 return Context.session().mode == mode 692 693 def _generate_task_record(self): 694 # under encryption status, don't handle anything directly 695 if check_pub_key_exist() and not self._check_mode(ModeType.decc): 696 return 697 698 # get info from command_queue 699 if Context.command_queue().size() == 0: 700 return 701 _, command, report_path = Context.command_queue().get(-1) 702 703 record_info = { 704 "command": command, 705 "session_id": os.path.split(report_path)[-1], 706 "report_path": report_path, 707 "unsuccessful_params": self.record_params, 708 "data_reports": self.record_reports 709 } 710 711 def encode(content): 712 # inner function to encode 713 return ' '.join([bin(ord(c)).replace('0b', '') for c in content]) 714 715 # write into file 716 record_file = os.path.join(self.report_path, 717 ReportConstant.task_info_record) 718 _record_json = json.dumps(record_info, indent=2) 719 720 with open(file=record_file, mode="wb") as file: 721 if Context.session().mode == ModeType.decc: 722 # under decc, write in encoded text 723 file.write(bytes(encode(_record_json), encoding="utf-8")) 724 else: 725 # others, write in plain text 726 file.write(bytes(_record_json, encoding="utf-8")) 727 728 LOG.info("Generate record file: %s", record_file) 729 730 @classmethod 731 def get_task_info_params(cls, history_path): 732 # under encryption status, don't handle anything directly 733 if check_pub_key_exist() and not cls._check_mode(ModeType.decc): 734 return () 735 736 def decode(content): 737 result_list = [] 738 for b in content.split(' '): 739 result_list.append(chr(int(b, 2))) 740 return ''.join(result_list) 741 742 record_path = os.path.join(history_path, 743 ReportConstant.task_info_record) 744 if not os.path.exists(record_path): 745 LOG.error("%s not exists!", ReportConstant.task_info_record) 746 return () 747 748 with open(record_path, mode="rb") as file: 749 if Context.session().mode == ModeType.decc: 750 # under decc, read from encoded text 751 result = json.loads(decode(file.read().decode("utf-8"))) 752 else: 753 # others, read from plain text 754 result = json.loads(file.read()) 755 standard_length = 5 756 if not len(result.keys()) == standard_length: 757 LOG.error("%s error!", ReportConstant.task_info_record) 758 return () 759 760 return result 761 762 def _transact_all(self): 763 pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc") 764 if not os.path.exists(pyc_path): 765 return 766 module_spec = util.spec_from_file_location("binder", pyc_path) 767 if not module_spec: 768 return 769 module = util.module_from_spec(module_spec) 770 module_spec.loader.exec_module(module) 771 if hasattr(module, "transact") and callable(module.transact): 772 module.transact(self, LOG) 773 del module 774 775 @classmethod 776 def _handle_module_tests(cls, modules, test_suites_attributes): 777 modules_list = list() 778 modules_zero = list() 779 for module_name, detail_list in modules.items(): 780 for total in detail_list: 781 modules_list.append(total) 782 if total == 0: 783 modules_zero.append(module_name) 784 test_suites_attributes[ReportConstant.run_modules] = \ 785 len(modules_list) - len(modules_zero) 786 test_suites_attributes[ReportConstant.modules] = len(modules_list) 787 if modules_zero: 788 LOG.info("The total tests of %s module is 0", ",".join( 789 modules_zero)) 790 791 # ******************** 使用旧报告模板的代码 BEGIN ******************** 792 def _generate_vision_reports(self): 793 from _core.report.reporter_helper import VisionHelper 794 vision_helper = VisionHelper() 795 vision_helper.report_path = self.report_path 796 if not self._check_mode(ModeType.decc) and not \ 797 self.summary_data_report_exist: 798 LOG.error("Summary data report not exists") 799 return 800 801 if check_pub_key_exist() or self._check_mode(ModeType.decc): 802 from xdevice import SuiteReporter 803 SuiteReporter.clear_report_result() 804 805 # parse data 806 if self.summary_data_str: 807 # only in decc mode and pub key, self.summary_data_str is not empty 808 summary_element_tree = self.data_helper.parse_data_report( 809 self.summary_data_str) 810 else: 811 summary_element_tree = self.data_helper.parse_data_report( 812 self.summary_data_path) 813 parsed_data = vision_helper.parse_element_data( 814 summary_element_tree, self.report_path, self.task_info) 815 self.parsed_data = parsed_data 816 self.exec_info, summary, _ = parsed_data 817 818 if self._check_mode(ModeType.decc): 819 return 820 821 LOG.info("Summary result: modules: %s, run modules: %s, total: " 822 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, " 823 "unavailable: %s", summary.modules, summary.run_modules, 824 summary.result.total, summary.result.passed, 825 summary.result.failed, summary.result.blocked, 826 summary.result.ignored, summary.result.unavailable) 827 LOG.info("Log path: %s", self.exec_info.log_path) 828 829 if summary.result.failed != 0 or summary.result.blocked != 0 or\ 830 summary.result.unavailable != 0: 831 from xdevice import Scheduler 832 Scheduler.is_need_auto_retry = True 833 834 # generate summary vision report 835 report_generate_flag = self._generate_vision_report( 836 vision_helper, parsed_data, ReportConstant.summary_title, 837 ReportConstant.summary_vision_report) 838 839 # generate details vision report 840 if report_generate_flag and summary.result.total > 0: 841 self._generate_vision_report( 842 vision_helper, parsed_data, ReportConstant.details_title, 843 ReportConstant.details_vision_report) 844 845 # generate failures vision report 846 if summary.result.total != ( 847 summary.result.passed + summary.result.ignored) or \ 848 summary.result.unavailable > 0: 849 self._generate_vision_report( 850 vision_helper, parsed_data, ReportConstant.failures_title, 851 ReportConstant.failures_vision_report) 852 853 # generate passes vision report 854 if summary.result.passed != 0: 855 self._generate_vision_report( 856 vision_helper, parsed_data, ReportConstant.passes_title, 857 ReportConstant.passes_vision_report) 858 859 # generate ignores vision report 860 if summary.result.ignored != 0: 861 self._generate_vision_report( 862 vision_helper, parsed_data, ReportConstant.ignores_title, 863 ReportConstant.ignores_vision_report) 864 865 def _generate_vision_report(self, vision_helper, parsed_data, title, render_target): 866 # render data 867 report_context = vision_helper.render_data( 868 title, parsed_data, 869 render_target=render_target, devices=self.summary.get_devices()) 870 871 # generate report 872 if report_context: 873 report_path = os.path.join(self.report_path, render_target) 874 vision_helper.generate_report(report_path, report_context) 875 return True 876 else: 877 LOG.error("Failed to generate %s", render_target) 878 return False 879 # ******************** 使用旧报告模板的代码 END ******************** 880