#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import copy
import json
import os
import platform
import re
import shutil
import time
import zipfile
from importlib import util
from ast import literal_eval
from xml.etree import ElementTree

from _core.interface import IReporter
from _core.plugin import Plugin
from _core.constants import DeviceProperties
from _core.constants import ModeType
from _core.constants import TestType
from _core.constants import FilePermission
from _core.logger import platform_logger
from _core.exception import ParamError
from _core.utils import calculate_elapsed_time
from _core.utils import copy_folder
from _core.utils import get_filename_extension
from _core.report.encrypt import check_pub_key_exist
from _core.report.encrypt import do_rsa_encrypt
from _core.report.encrypt import get_file_summary
from _core.report.reporter_helper import CaseResult
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import ExecInfo
from _core.report.reporter_helper import VisionHelper
from _core.report.reporter_helper import ReportConstant
from _core.report.repeater_helper import RepeatHelper
from xdevice import Variables

LOG = platform_logger("ResultReporter")


class ResultSummary:
    """Accumulator for the task-level result counters and the device list."""

    def __init__(self):
        self.modules = 0
        self.runmodules = 0
        self.tests = 0
        self.passed = 0
        self.failed = 0
        self.blocked = 0
        self.ignored = 0
        self.unavailable = 0
        self.devices = []

    def get_data(self):
        """Log the counters and return them as a plain dict."""
        LOG.info(f"Summary result: modules: {self.modules}, run modules: {self.runmodules}, "
                 f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, "
                 f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}")
        data = {
            "modules": self.modules,
            "runmodules": self.runmodules,
            "tests": self.tests,
            "passed": self.passed,
            "failed": self.failed,
            "blocked": self.blocked,
            "ignored": self.ignored,
            "unavailable": self.unavailable
        }
        return data

    def get_devices(self):
        """Return the (mutable) list of devices collected so far."""
        return self.devices


@Plugin(type=Plugin.REPORTER, id=TestType.all)
class ResultReporter(IReporter):
    """Reporter plugin that turns module result files into the task reports."""

    # shared across instances; holds at most one
    # (summary_data_path, result_xml) tuple, see set_summary_report_result
    summary_report_result = []

    def __init__(self):
        self.report_path = None
        self.task_info = None
        self.summary_data_path = None
        self.summary_data_str = ""
        self.exec_info = None
        self.parsed_data = None
        self.data_helper = None
        self.vision_helper = None
        self.repeat_helper = None
        self.summary = ResultSummary()

        # data for task_record.info
        self._failed_cases = []
        self.record_params = {}
        self.record_reports = {}

    def __generate_reports__(self, report_path, **kwargs):
        """Entry point: generate every report below ``report_path``.

        ``kwargs`` must carry ``task_info`` (an ``ExecInfo``); when the
        parameters are invalid only the banner lines are logged.
        """
        LOG.info("")
        LOG.info("**************************************************")
        LOG.info("************** Start generate reports ************")
        LOG.info("**************************************************")
        LOG.info("")

        if self._check_params(report_path, **kwargs):
            # generate data report
            self._generate_data_report()

            # generate vision reports
            self._generate_vision_reports()

            # generate task info record
            self._generate_task_record()

            # generate summary ini
            self._generate_summary()

            # copy reports to reports/latest folder
            self._copy_report()

            self._transact_all()

        LOG.info("")
        LOG.info("**************************************************")
        LOG.info("************** Ended generate reports ************")
        LOG.info("**************************************************")
        LOG.info("")

    def _check_params(self, report_path, **kwargs):
        """Validate the inputs and initialise the helpers.

        Returns True when ``report_path`` is non-empty and ``task_info`` is an
        ``ExecInfo``; otherwise logs an error and returns False.
        """
        task_info = kwargs.get("task_info", "")
        if not report_path:
            LOG.error("Report path is wrong", error_no="00440",
                      ReportPath=report_path)
            return False
        if not task_info or not isinstance(task_info, ExecInfo):
            LOG.error("Task info is wrong", error_no="00441",
                      TaskInfo=task_info)
            return False

        os.makedirs(report_path, exist_ok=True)
        self.report_path = report_path
        self.task_info = task_info
        self.summary_data_path = os.path.join(
            self.report_path, ReportConstant.summary_data_report)
        self.exec_info = task_info
        self.data_helper = DataHelper()
        self.vision_helper = VisionHelper()
        self.vision_helper.report_path = report_path
        self.repeat_helper = RepeatHelper(report_path)
        return True

    def _generate_test_report(self):
        """Copy the report template and write ``static/data.js`` for it."""
        report_template = os.path.join(Variables.res_dir, "template")
        copy_folder(report_template, self.report_path)
        content = json.dumps(self._get_summary_data())
        data_js = os.path.join(self.report_path, "static", "data.js")
        # O_TRUNC: an already existing data.js (e.g. from a previous run) must
        # be emptied first, otherwise shorter content leaves stale bytes behind
        data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY | os.O_TRUNC,
                          FilePermission.mode_644)
        with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf:
            jsf.write(f"window.reportData = {content}")
        test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/")
        LOG.info(f"Log path: {self.report_path}")
        LOG.info(f"Generate test report: file:///{test_report}")
        # recreate the summary object so that the statistics are not
        # accumulated twice in retry scenarios
        self.summary = ResultSummary()

    def _get_summary_data(self):
        """Parse every module result file and build the report data dict."""
        modules = []
        for data_report, _ in self.data_reports:
            if data_report.endswith(ReportConstant.summary_data_report):
                continue
            info = self._parse_module(data_report)
            if info is not None:
                modules.append(info)
        if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0:
            from xdevice import Scheduler
            Scheduler.is_need_auto_retry = True
        data = {
            "exec_info": self._get_exec_info(),
            "summary": self.summary.get_data(),
            "devices": self.summary.get_devices(),
            "modules": modules,
        }
        return data

    def _get_exec_info(self):
        """Build the execution info dict and refresh ``self.exec_info``."""
        start_time = self.task_info.test_time
        end_time = time.strftime(ReportConstant.time_format, time.localtime())
        test_time = "%s/ %s" % (start_time, end_time)
        execute_time = calculate_elapsed_time(
            time.mktime(time.strptime(start_time, ReportConstant.time_format)),
            time.mktime(time.strptime(end_time, ReportConstant.time_format)))
        host_info = platform.platform()
        device_name = getattr(self.task_info, ReportConstant.device_name, "None")
        device_type = getattr(self.task_info, ReportConstant.device_label, "None")
        platform_info = getattr(self.task_info, ReportConstant.platform, "None")
        test_type = getattr(self.task_info, ReportConstant.test_type, "None")

        # provide the data for the report file summary.ini
        exec_info = ExecInfo()
        exec_info.device_name = device_name
        exec_info.device_label = device_type
        exec_info.execute_time = execute_time
        exec_info.host_info = host_info
        exec_info.log_path = self.report_path
        exec_info.platform = platform_info
        exec_info.test_time = test_time
        exec_info.test_type = test_type
        self.exec_info = exec_info

        info = {
            "test_start": start_time,
            "test_end": end_time,
            "execute_time": execute_time,
            "test_type": test_type,
            "host_info": host_info,
            "logs": self._get_task_log()
        }
        return info

    def _parse_module(self, xml_file):
        """Parse one test module result file.

        Returns the module info dict, or None when the XML cannot be parsed.
        Also updates ``self.summary`` and the task record data.
        """
        file_name = os.path.basename(xml_file)
        try:
            ele_module = ElementTree.parse(xml_file).getroot()
        except ElementTree.ParseError:
            LOG.error(f"parse module result error, result file {file_name}")
            return None
        module = ResultReporter._count_result(ele_module)
        module_name = file_name[:-4] if module.name == "" else module.name
        suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module]

        # provide the data for the report file task_record.info
        self.record_reports.update({module_name: xml_file})
        if self._failed_cases:
            self.record_params.update({module_name: copy.copy(self._failed_cases)})
            self._failed_cases.clear()

        self.summary.modules += 1
        self.summary.tests += module.tests
        self.summary.passed += module.passed
        self.summary.failed += module.failed
        self.summary.blocked += module.blocked
        self.summary.ignored += module.ignored
        if module.unavailable == 0:
            self.summary.runmodules += 1
        else:
            self.summary.unavailable += 1
        devices = self._parse_devices(ele_module)

        module_report, module_time = module.report, module.time
        # a module with a single suite of the same name takes the suite's
        # report (when set) and time
        if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name:
            report = suites[0].get(ReportConstant.report)
            if report != "":
                module_report = report
            module_time = suites[0].get(ReportConstant.time)
        info = {
            "name": module_name,
            "report": module_report,
            "test_start": "-",
            "test_end": "-",
            "time": module_time,
            "execute_time": calculate_elapsed_time(0, module_time),
            "tests": module.tests,
            "passed": module.passed,
            "failed": module.failed,
            "blocked": module.blocked,
            "ignored": module.ignored,
            "unavailable": module.unavailable,
            "passingrate": "0%" if module.tests == 0 else "{:.0%}".format(module.passed / module.tests),
            "error": ele_module.get(ReportConstant.message, ""),
            "logs": self._get_device_log(module_name),
            "devices": devices,
            "suites": suites
        }
        return info

    def _parse_testsuite(self, ele_suite):
        """Parse one test suite element into its info dict."""
        suite = ResultReporter._count_result(ele_suite)
        cases = [self._parse_testcase(case) for case in ele_suite]
        info = {
            "name": suite.name,
            "report": suite.report,
            "time": suite.time,
            "tests": suite.tests,
            "passed": suite.passed,
            "failed": suite.failed,
            "blocked": suite.blocked,
            "ignored": suite.ignored,
            "passingrate": "0%" if suite.tests == 0 else "{:.0%}".format(suite.passed / suite.tests),
            "cases": cases
        }
        return info

    def _parse_testcase(self, ele_case):
        """Parse one test case element into a list of its report fields.

        Every case whose result is not passed is remembered in
        ``self._failed_cases`` as ``"<classname>#<name>"``.
        """
        name = ele_case.get(ReportConstant.name)
        class_name = ele_case.get(ReportConstant.class_name, "")
        result = ResultReporter._get_case_result(ele_case)
        if result != CaseResult.passed:
            self._failed_cases.append(f"{class_name}#{name}")
        return [name, class_name, result, ResultReporter._parse_time(ele_case),
                ele_case.get(ReportConstant.message, ""), ele_case.get(ReportConstant.report, "")]

    @staticmethod
    def _parse_time(ele):
        """Return the element's time attribute as float, 0.0 if malformed."""
        try:
            _time = float(ele.get(ReportConstant.time, "0"))
        except ValueError:
            _time = 0.0
            LOG.error("parse test time error, set it to 0.0")
        return _time

    def _parse_devices(self, ele_module):
        """Return the devices recorded on the module element.

        Devices whose sn is not known yet are appended to the summary's
        device list. An empty list is returned when the attribute is missing
        or cannot be evaluated to a list.
        """
        devices_str = ele_module.get(ReportConstant.devices, "")
        if devices_str == "":
            return []
        try:
            devices = literal_eval(devices_str)
        except (ValueError, TypeError, SyntaxError):
            # literal_eval raises more than SyntaxError on malformed input
            return []
        if not isinstance(devices, (list, tuple)):
            return []
        known_devices = self.summary.get_devices()
        for device in devices:
            device_sn = device.get(DeviceProperties.sn, "")
            if any(d.get(DeviceProperties.sn, "") == device_sn for d in known_devices):
                continue
            known_devices.append(device)
        return devices

    @staticmethod
    def _count_result(ele):
        """Read the result counters of a module/suite element.

        Returns a ``Result`` namedtuple; ``blocked`` is disabled + errors and
        ``passed`` is derived from the other counters (never negative).
        """
        name = ele.get(ReportConstant.name, "")
        report = ele.get(ReportConstant.report, "")
        _time = ResultReporter._parse_time(ele)
        tests = int(ele.get(ReportConstant.tests, "0"))
        failed = int(ele.get(ReportConstant.failures, "0"))
        disabled = ele.get(ReportConstant.disabled, "0")
        if disabled == "":
            disabled = "0"
        errors = ele.get(ReportConstant.errors, "0")
        if errors == "":
            errors = "0"
        blocked = int(disabled) + int(errors)
        ignored = int(ele.get(ReportConstant.ignored, "0"))
        unavailable = int(ele.get(ReportConstant.unavailable, "0"))

        tmp_pass = tests - failed - blocked - ignored
        passed = tmp_pass if tmp_pass > 0 else 0

        Result = collections.namedtuple(
            'Result',
            ['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable'])
        return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable)

    @staticmethod
    def _get_case_result(ele_case):
        """Map the result/status attributes of a case to a ``CaseResult``.

        An explicit result_kind attribute wins over the derived value.
        """
        result_kind = ele_case.get(ReportConstant.result_kind, "")
        if result_kind != "":
            return result_kind
        result = ele_case.get(ReportConstant.result, "")
        status = ele_case.get(ReportConstant.status, "")
        if result == ReportConstant.false and (status == ReportConstant.run or status == ""):
            return CaseResult.failed
        if status in [ReportConstant.blocked, ReportConstant.disabled, ReportConstant.error]:
            return CaseResult.blocked
        if status in [ReportConstant.skip, ReportConstant.not_run]:
            return CaseResult.ignored
        if status in [ReportConstant.unavailable]:
            return CaseResult.unavailable
        return CaseResult.passed

    def _get_device_log(self, module_name):
        """Collect the module-level device log links of a module.

        Reports of black-box cases are generated separately and xts only has
        module-level device logs (no case-level logs), so this method only
        supports module-level device logs.
        """
        device_log = {}
        log_path = os.path.join(self.report_path, "log", module_name)
        module_log_path = f"log/{module_name}"
        if os.path.exists(log_path):
            for filename in os.listdir(log_path):
                file_link = f"{module_log_path}/{filename}"
                file_path = os.path.join(log_path, filename)
                # directories always get a link
                if os.path.isdir(file_path):
                    device_log.setdefault(filename, file_link)
                    continue
                # files: only the module's logs get a link.
                # Suite logs are named device_log_sn.log, the logs of a
                # suite's cases device_log_case_sn.log; the latter contain
                # more than two "_"
                ret = re.fullmatch(r'(device_(?:hi)?log)_\S+\.log', filename)
                if ret is None or filename.count("_") > 2:
                    continue
                device_log.setdefault(ret.group(1), file_link)
        return device_log

    def _get_task_log(self):
        """Return ``{file name: link}`` for the task logs, {} without log dir."""
        log_path = os.path.join(self.report_path, "log")
        if not os.path.isdir(log_path):
            # consistent with _get_device_log: a missing folder is not an error
            return {}
        return {f: f"log/{f}" for f in os.listdir(log_path) if f.startswith("task_log.log")}

    def _generate_data_report(self):
        """Merge the module results into the summary data report."""
        # initial element
        test_suites_element = self.data_helper.initial_suites_element()

        # update test suites element
        update_flag = self._update_test_suites(test_suites_element)
        if not update_flag:
            return

        # generate report
        if not self._check_mode(ModeType.decc):
            self.data_helper.generate_report(test_suites_element,
                                             self.summary_data_path)

        # set SuiteReporter.suite_report_result
        if not check_pub_key_exist() and not self._check_mode(
                ModeType.decc):
            return
        self.set_summary_report_result(
            self.summary_data_path, DataHelper.to_string(test_suites_element))

    def _update_test_suites(self, test_suites_element):
        """Fill ``test_suites_element`` with all suites and summed attributes.

        Returns False (after logging an error) when no suite element exists.
        """
        # initial attributes for test suites element
        test_suites_attributes, need_update_attributes = \
            self._init_attributes()

        # get test suite elements that are children of test suites element
        modules = dict()
        test_suite_elements = []
        for data_report, module_name in self.data_reports:
            if data_report.endswith(ReportConstant.summary_data_report):
                continue
            root = self.data_helper.parse_data_report(data_report)
            self._parse_devices(root)
            if module_name == ReportConstant.empty_name:
                module_name = self._get_module_name(data_report, root)
            total = int(root.get(ReportConstant.tests, 0))
            modules.setdefault(module_name, []).append(total)

            self._append_product_info(test_suites_attributes, root)
            for child in root:
                child.tail = self.data_helper.LINE_BREAK_INDENT
                if not child.get(ReportConstant.module_name) or child.get(
                        ReportConstant.module_name) == \
                        ReportConstant.empty_name:
                    child.set(ReportConstant.module_name, module_name)
                self._check_tests_and_unavailable(child)
                # covert the status of "notrun" to "ignored"
                for element in child:
                    if element.get(ReportConstant.status, "") == \
                            ReportConstant.not_run:
                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
                        child.set(ReportConstant.ignored, "%s" % ignored)
                test_suite_elements.append(child)
                for update_attribute in need_update_attributes:
                    update_value = child.get(update_attribute, 0)
                    if not update_value:
                        update_value = 0
                    test_suites_attributes[update_attribute] += int(
                        update_value)

        if test_suite_elements:
            child = test_suite_elements[-1]
            child.tail = self.data_helper.LINE_BREAK
        else:
            LOG.error("Execute result not exists")
            return False

        # set test suites element attributes and children
        self._handle_module_tests(modules, test_suites_attributes)
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)
        test_suites_element.extend(test_suite_elements)
        return True

    @classmethod
    def _check_tests_and_unavailable(cls, child):
        """Reset ``unavailable`` to "0" on a suite that did run tests."""
        total = child.get(ReportConstant.tests, "0")
        unavailable = child.get(ReportConstant.unavailable, "0")
        if total and total != "0" and unavailable and \
                unavailable != "0":
            child.set(ReportConstant.unavailable, "0")
            LOG.warning("%s total: %s, unavailable: %s", child.get(
                ReportConstant.name), total, unavailable)

    @classmethod
    def _append_product_info(cls, test_suites_attributes, root):
        """Merge the product info of ``root`` into the suites attributes.

        Differing values of an existing key are joined with a comma.
        """
        product_info = root.get(ReportConstant.product_info, "")
        if not product_info:
            return
        try:
            product_info = literal_eval(str(product_info))
        except (ValueError, TypeError, SyntaxError) as error:
            # literal_eval raises more than SyntaxError on malformed input
            LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args)
            product_info = {}

        if not test_suites_attributes[ReportConstant.product_info]:
            test_suites_attributes[ReportConstant.product_info] = \
                product_info
            return
        for key, value in product_info.items():
            exist_value = test_suites_attributes[
                ReportConstant.product_info].get(key, "")

            if not exist_value:
                test_suites_attributes[
                    ReportConstant.product_info][key] = value
                continue
            if value in exist_value:
                continue
            test_suites_attributes[ReportConstant.product_info][key] = \
                "%s,%s" % (exist_value, value)

    @classmethod
    def _get_module_name(cls, data_report, root):
        """Derive the module name from the report file name or root element."""
        # get module name from data report
        module_name = get_filename_extension(data_report)[0]
        if "report" in module_name or "summary" in module_name or \
                "<" in data_report or ">" in data_report:
            module_name = root.get(ReportConstant.name,
                                   ReportConstant.empty_name)
            if "report" in module_name or "summary" in module_name:
                module_name = ReportConstant.empty_name
        return module_name

    def _init_attributes(self):
        """Return the initial suites attributes and the names to be summed."""
        test_suites_attributes = {
            ReportConstant.name:
                ReportConstant.summary_data_report.split(".")[0],
            ReportConstant.start_time: self.task_info.test_time,
            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
                                                   time.localtime()),
            ReportConstant.errors: 0, ReportConstant.disabled: 0,
            ReportConstant.failures: 0, ReportConstant.tests: 0,
            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
            ReportConstant.product_info: self.task_info.product_info,
            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
                                  ReportConstant.failures,
                                  ReportConstant.disabled,
                                  ReportConstant.errors,
                                  ReportConstant.unavailable]
        return test_suites_attributes, need_update_attributes

    def _generate_vision_reports(self):
        """Parse the summary data report and render the vision reports."""
        if not self._check_mode(ModeType.decc) and not \
                self.summary_data_report_exist:
            LOG.error("Summary data report not exists")
            return

        if check_pub_key_exist() or self._check_mode(ModeType.decc):
            if not self.summary_report_result_exists():
                LOG.error("Summary data report not exists")
                return
            self.summary_data_str = \
                self.get_result_of_summary_report()
            if check_pub_key_exist():
                from xdevice import SuiteReporter
                SuiteReporter.clear_report_result()

        # parse data
        if self.summary_data_str:
            # only in decc mode and pub key, self.summary_data_str is not empty
            summary_element_tree = self.data_helper.parse_data_report(
                self.summary_data_str)
        else:
            summary_element_tree = self.data_helper.parse_data_report(
                self.summary_data_path)
        parsed_data = self.vision_helper.parse_element_data(
            summary_element_tree, self.report_path, self.task_info)
        self.parsed_data = parsed_data
        self.exec_info, summary, _ = parsed_data

        if self._check_mode(ModeType.decc):
            return

        LOG.info("Summary result: modules: %s, run modules: %s, total: "
                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
                 "unavailable: %s", summary.modules, summary.run_modules,
                 summary.result.total, summary.result.passed,
                 summary.result.failed, summary.result.blocked,
                 summary.result.ignored, summary.result.unavailable)
        LOG.info("Log path: %s", self.exec_info.log_path)

        if summary.result.failed != 0 or summary.result.blocked != 0 or\
                summary.result.unavailable != 0:
            from xdevice import Scheduler
            Scheduler.is_need_auto_retry = True

        # generate summary vision report
        report_generate_flag = self._generate_vision_report(
            parsed_data, ReportConstant.summary_title,
            ReportConstant.summary_vision_report)

        # generate details vision report
        if report_generate_flag and summary.result.total > 0:
            self._generate_vision_report(
                parsed_data, ReportConstant.details_title,
                ReportConstant.details_vision_report)

        # generate failures vision report
        if summary.result.total != (
                summary.result.passed + summary.result.ignored) or \
                summary.result.unavailable > 0:
            self._generate_vision_report(
                parsed_data, ReportConstant.failures_title,
                ReportConstant.failures_vision_report)

        # generate passes vision report
        if summary.result.passed != 0:
            self._generate_vision_report(
                parsed_data, ReportConstant.passes_title, ReportConstant.passes_vision_report)

        # generate ignores vision report
        if summary.result.ignored != 0:
            self._generate_vision_report(
                parsed_data, ReportConstant.ignores_title, ReportConstant.ignores_vision_report)

    def _generate_vision_report(self, parsed_data, title, render_target):
        """Render one vision report; return True when it was generated."""

        # render data
        report_context = self.vision_helper.render_data(
            title, parsed_data,
            render_target=render_target, devices=self.summary.get_devices())

        # generate report
        if report_context:
            report_path = os.path.join(self.report_path, render_target)
            self.vision_helper.generate_report(report_path, report_context)
            return True
        else:
            LOG.error("Failed to generate %s", render_target)
            return False

    @property
    def summary_data_report_exist(self):
        """True when summary data is held in memory or exists on disk."""
        return "<" in self.summary_data_str or \
            os.path.exists(self.summary_data_path)

    @property
    def data_reports(self):
        """List of ``(report, module_name)`` tuples of the task.

        With a public key or in decc mode the reports come from
        ``SuiteReporter`` (and its report results are cleared afterwards);
        otherwise the report folder is walked for data report files.
        """
        if check_pub_key_exist() or self._check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            suite_reports = SuiteReporter.get_report_result()
            if self._check_mode(ModeType.decc):
                LOG.debug("Handle history result, data reports length:{}".
                          format(len(suite_reports)))
                SuiteReporter.clear_history_result()
                SuiteReporter.append_history_result(suite_reports)
            data_reports = []
            for report_path, report_result in suite_reports:
                module_name = get_filename_extension(report_path)[0]
                data_reports.append((report_result, module_name))
            SuiteReporter.clear_report_result()
            return data_reports

        if not os.path.isdir(self.report_path):
            return []
        data_reports = []
        result_path = os.path.join(self.report_path, "result")
        for root, _, files in os.walk(self.report_path):
            for file_name in files:
                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
                    continue
                module_name = self._find_module_name(result_path, root)
                data_reports.append((os.path.join(root, file_name),
                                     module_name))
        return data_reports

    @classmethod
    def _find_module_name(cls, result_path, root):
        """Return the name of the first-level folder below ``result_path``
        that contains ``root``'s parent chain, or the empty name when
        ``root`` is not at least two levels below ``result_path``.
        """
        # find module name from directory tree
        common_path = os.path.commonpath([result_path, root])
        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
                os.path.normcase(result_path) == os.path.normcase(root):
            return ReportConstant.empty_name

        root_dir, module_name = os.path.split(root)
        if os.path.normcase(result_path) == os.path.normcase(root_dir):
            return ReportConstant.empty_name
        root_dir, subsystem_name = os.path.split(root_dir)
        while os.path.normcase(result_path) != os.path.normcase(root_dir):
            module_name = subsystem_name
            root_dir, subsystem_name = os.path.split(root_dir)
        return module_name

    def _generate_summary(self):
        """Append the execution info to the summary ini file.

        The content is RSA encrypted when a public key exists. Nothing is
        written in decc mode or without a summary data report.
        """
        if not self.summary_data_report_exist or \
                self._check_mode(ModeType.decc):
            return
        summary_ini_content = \
            "[default]\n" \
            "Platform={}\n" \
            "Test Type={}\n" \
            "Device Name={}\n" \
            "Host Info={}\n" \
            "Test Start/ End Time={}\n" \
            "Execution Time={}\n" \
            "Device Type={}\n".format(
                self.exec_info.platform, self.exec_info.test_type,
                self.exec_info.device_name, self.exec_info.host_info,
                self.exec_info.test_time, self.exec_info.execute_time,
                self.exec_info.device_label)

        if self.exec_info.product_info:
            for key, value in self.exec_info.product_info.items():
                summary_ini_content = "{}{}".format(
                    summary_ini_content, "%s=%s\n" % (key, value))

        if not self._check_mode(ModeType.factory):
            summary_ini_content = "{}{}".format(
                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)

        # write summary_ini_content
        summary_filepath = os.path.join(self.report_path,
                                        ReportConstant.summary_ini)

        if platform.system() == "Windows":
            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
        else:
            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        summary_filepath_open = os.open(summary_filepath, flags,
                                        FilePermission.mode_755)

        with os.fdopen(summary_filepath_open, "wb") as file_handler:
            if check_pub_key_exist():
                try:
                    cipher_text = do_rsa_encrypt(summary_ini_content)
                except ParamError as error:
                    LOG.error(error, error_no=error.error_no)
                    cipher_text = b""
                file_handler.write(cipher_text)
            else:
                file_handler.write(bytes(summary_ini_content, 'utf-8'))
            file_handler.flush()
            LOG.info("Generate summary ini: %s", summary_filepath)
        self.repeat_helper.__generate_repeat_xml__(self.summary_data_path)

    def _copy_report(self):
        """Best-effort copy of the top-level report files to ``latest``."""
        from xdevice import Scheduler
        if Scheduler.upload_address or self._check_mode(ModeType.decc):
            return

        dst_path = os.path.join(Variables.temp_dir, "latest")
        try:
            shutil.rmtree(dst_path, ignore_errors=True)
            os.makedirs(dst_path, exist_ok=True)
            LOG.info("Copy summary files to %s", dst_path)

            # copy reports to reports/latest folder
            for report_file in os.listdir(self.report_path):
                src_file = os.path.join(self.report_path, report_file)
                dst_file = os.path.join(dst_path, report_file)
                if os.path.isfile(src_file):
                    shutil.copyfile(src_file, dst_file)
        except OSError as error:
            # the copy is only a convenience, so a failure must not abort
            # the report generation; leave a trace instead of hiding it
            LOG.debug("Copy summary files to %s failed: %s", dst_path, error)
            return

    def _compress_report_folder(self):
        """Zip the report folder and write the zip's digest to a hash file.

        Returns the zip file path, or None in decc/factory mode or when the
        report path is not a folder.
        """
        if self._check_mode(ModeType.decc) or \
                self._check_mode(ModeType.factory):
            return None

        if not os.path.isdir(self.report_path):
            LOG.error("'%s' is not folder!" % self.report_path)
            return None

        # get file path list (collected before the zip file is created, so
        # the zip never contains itself)
        file_path_list = []
        for dir_path, _, file_names in os.walk(self.report_path):
            f_path = dir_path.replace(self.report_path, '')
            f_path = f_path + os.sep if f_path else ''
            for filename in file_names:
                file_path_list.append(
                    (os.path.join(dir_path, filename), f_path + filename))

        # compress file
        zipped_file = "%s.zip" % os.path.join(
            self.report_path, os.path.basename(self.report_path))
        with zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED,
                             allowZip64=True) as zip_object:
            try:
                LOG.info("Executing compress process, please wait...")
                self._write_long_size_file(zip_object, file_path_list)

                LOG.info("Generate zip file: %s", zipped_file)
            except zipfile.BadZipFile as bad_error:
                # pass the exception as a log argument: "%s" % args raises
                # TypeError unless args holds exactly one element
                LOG.error("Zip report folder error: %s", bad_error)

        # generate hex digest, then save it to summary_report.hash
        hash_file = os.path.abspath(os.path.join(
            self.report_path, ReportConstant.summary_report_hash))
        hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT |
                                 os.O_APPEND, FilePermission.mode_755)
        with os.fdopen(hash_file_open, "w") as hash_file_handler:
            hash_file_handler.write(get_file_summary(zipped_file))
            LOG.info("Generate hash file: %s", hash_file)
            hash_file_handler.flush()
        return zipped_file

    @classmethod
    def _check_mode(cls, mode):
        """Return True when the scheduler currently runs in ``mode``."""
        from xdevice import Scheduler
        return Scheduler.mode == mode

    def _generate_task_record(self):
        """Write the task record file (binary-digit encoded in decc mode)."""
        # under encryption status, don't handle anything directly
        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
            return

        # get info from command_queue
        from xdevice import Scheduler
        if not Scheduler.command_queue:
            return
        _, command, report_path = Scheduler.command_queue[-1]

        record_info = self._parse_record_from_data(command, report_path)

        def encode(content):
            # inner function to encode
            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])

        # write into file
        record_file = os.path.join(self.report_path,
                                   ReportConstant.task_info_record)
        _record_json = json.dumps(record_info, indent=2)

        with open(file=record_file, mode="wb") as file:
            if Scheduler.mode == ModeType.decc:
                # under decc, write in encoded text
                file.write(bytes(encode(_record_json), encoding="utf-8"))
            else:
                # others, write in plain text
                file.write(bytes(_record_json, encoding="utf-8"))

        LOG.info("Generate record file: %s", record_file)

    def _parse_record_from_data(self, command, report_path):
        """Build the task record dict from ``self.parsed_data``.

        Returns an empty dict when no parsed data is available.
        """
        record = dict()
        if self.parsed_data:
            _, _, suites = self.parsed_data
            unsuccessful = dict()
            module_set = set()
            for suite in suites:
                module_set.add(suite.module_name)

                failed = unsuccessful.get(suite.module_name, [])
                # because suite not contains case's some attribute,
                # for example, 'module', 'classname', 'name' . so
                # if unavailable, only add module's name into list.
                if int(suite.result.unavailable) > 0:
                    failed.append(suite.module_name)
                else:
                    # others, get key attributes join string
                    for case in suite.get_cases():
                        if not case.is_passed():
                            failed.append(
                                "{}#{}".format(case.classname, case.name))
                unsuccessful.update({suite.module_name: failed})
            data_reports = self._get_data_reports(module_set)
            record = {"command": command,
                      "session_id": os.path.split(report_path)[-1],
                      "report_path": report_path,
                      "unsuccessful_params": unsuccessful,
                      "data_reports": data_reports
                      }
        return record

    def _get_data_reports(self, module_set):
        """Return ``{module_name: report_path}`` for modules in ``module_set``."""
        data_reports = dict()
        if self._check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            for module_name, report_path, _ in \
                    SuiteReporter.get_history_result_list():
                if module_name in module_set:
                    data_reports.update({module_name: report_path})
        else:
            for report_path, module_name in self.data_reports:
                if module_name == ReportConstant.empty_name:
                    root = self.data_helper.parse_data_report(report_path)
                    module_name = self._get_module_name(report_path, root)
                if module_name in module_set:
                    data_reports.update({module_name: report_path})

        return data_reports

    @classmethod
    def get_task_info_params(cls, history_path):
        """Load the task record stored below ``history_path``.

        Returns the record dict, or an empty tuple when the record is
        missing, malformed, or encryption is active outside decc mode.
        """
        # under encryption status, don't handle anything directly
        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
            return ()

        def decode(content):
            # inverse of the encoding used when the record is written
            return ''.join(chr(int(b, 2)) for b in content.split(' '))

        record_path = os.path.join(history_path,
                                   ReportConstant.task_info_record)
        if not os.path.exists(record_path):
            LOG.error("%s not exists!", ReportConstant.task_info_record)
            return ()

        from xdevice import Scheduler
        with open(record_path, mode="rb") as file:
            if Scheduler.mode == ModeType.decc:
                # under decc, read from encoded text
                result = json.loads(decode(file.read().decode("utf-8")))
            else:
                # others, read from plain text
                result = json.loads(file.read())
        standard_length = 5
        # a valid record is a dict with exactly the five keys written by
        # _parse_record_from_data; anything else is rejected
        if not isinstance(result, dict) or len(result) != standard_length:
            LOG.error("%s error!", ReportConstant.task_info_record)
            return ()

        return result

    @classmethod
    def set_summary_report_result(cls, summary_data_path, result_xml):
        """Store the summary report as the single class-level result."""
        cls.summary_report_result.clear()
        cls.summary_report_result.append((summary_data_path, result_xml))

    @classmethod
    def get_result_of_summary_report(cls):
        """Return the stored summary XML string, or None when nothing is set."""
        if cls.summary_report_result:
            return cls.summary_report_result[0][1]
        return None

    @classmethod
    def summary_report_result_exists(cls):
        """Return True when a summary report result has been stored."""
        return bool(cls.summary_report_result)

    @classmethod
    def get_path_of_summary_report(cls):
        """Return the stored summary report path, or None when nothing is set."""
        if cls.summary_report_result:
            return cls.summary_report_result[0][0]
        return None

    @classmethod
    def _write_long_size_file(cls, zip_object, long_size_file):
        """Stream each ``(filename, arcname)`` pair into ``zip_object``."""
        for filename, arcname in long_size_file:
            zip_info = zipfile.ZipInfo.from_file(filename, arcname)
            zip_info.compress_type = getattr(zip_object, "compression",
                                             zipfile.ZIP_DEFLATED)
            if hasattr(zip_info, "_compresslevel"):
                _compress_level = getattr(zip_object, "compresslevel", None)
                setattr(zip_info, "_compresslevel", _compress_level)
            # copy in 8 MiB chunks instead of reading the whole file at once
            with open(filename, "rb") as src, \
                    zip_object.open(zip_info, "w") as des:
                shutil.copyfileobj(src, des, 1024 * 1024 * 8)

    def _transact_all(self):
        """Run ``transact(self, LOG)`` of the optional ``binder.pyc`` tool."""
        pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc")
        if not os.path.exists(pyc_path):
            return
        module_spec = util.spec_from_file_location("binder", pyc_path)
        if not module_spec:
            return
        module = util.module_from_spec(module_spec)
        module_spec.loader.exec_module(module)
        if hasattr(module, "transact") and callable(module.transact):
            module.transact(self, LOG)
        del module

    @classmethod
    def _handle_module_tests(cls, modules, test_suites_attributes):
        """Set the modules / run_modules attributes from the test totals.

        ``modules`` maps a module name to the list of test totals of its
        reports; every report counts as one module and a report with zero
        tests counts as not run.
        """
        modules_list = list()
        modules_zero = list()
        for module_name, detail_list in modules.items():
            for total in detail_list:
                modules_list.append(total)
                if total == 0:
                    modules_zero.append(module_name)
        test_suites_attributes[ReportConstant.run_modules] = \
            len(modules_list) - len(modules_zero)
        test_suites_attributes[ReportConstant.modules] = len(modules_list)
        if modules_zero:
            LOG.info("The total tests of %s module is 0", ",".join(
                modules_zero))