1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import collections 20import copy 21import json 22import os 23import platform 24import shutil 25import time 26import zipfile 27from importlib import util 28from ast import literal_eval 29from xml.etree import ElementTree 30 31from _core.interface import IReporter 32from _core.plugin import Plugin 33from _core.constants import ModeType 34from _core.constants import TestType 35from _core.constants import FilePermission 36from _core.logger import platform_logger 37from _core.exception import ParamError 38from _core.utils import calculate_elapsed_time 39from _core.utils import copy_folder 40from _core.utils import get_filename_extension 41from _core.report.encrypt import check_pub_key_exist 42from _core.report.encrypt import do_rsa_encrypt 43from _core.report.encrypt import get_file_summary 44from _core.report.reporter_helper import CaseResult 45from _core.report.reporter_helper import DataHelper 46from _core.report.reporter_helper import ExecInfo 47from _core.report.reporter_helper import VisionHelper 48from _core.report.reporter_helper import ReportConstant 49from _core.report.repeater_helper import RepeatHelper 50from xdevice import Variables 51 52LOG = platform_logger("ResultReporter") 53 54 55class ResultSummary: 56 57 def __init__(self): 58 self.modules = 0 59 self.runmodules = 0 60 self.tests = 0 61 self.passed = 0 62 self.failed = 0 63 self.blocked = 0 64 self.ignored = 0 65 self.unavailable = 0 66 67 def get_data(self): 68 LOG.info(f"Summary result: modules: {self.modules}, run modules: {self.runmodules}, " 69 f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, " 70 f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}") 71 data = { 72 "modules": self.modules, 73 "runmodules": self.runmodules, 74 "tests": self.tests, 75 "passed": self.passed, 76 "failed": self.failed, 77 "blocked": self.blocked, 78 "ignored": self.ignored, 79 "unavailable": self.unavailable 80 } 81 return data 82 83 84@Plugin(type=Plugin.REPORTER, id=TestType.all) 85class ResultReporter(IReporter): 86 summary_report_result = [] 87 88 def __init__(self): 89 self.report_path = None 90 self.task_info = None 91 self.summary_data_path = None 92 self.summary_data_str = "" 93 self.exec_info = None 94 self.parsed_data = None 95 self.data_helper = None 96 self.vision_helper = None 97 self.repeat_helper = None 98 self.summary = ResultSummary() 99 100 # task_record.info数据 101 self._failed_cases = [] 102 self.record_params = {} 103 self.record_reports = {} 104 105 def __generate_reports__(self, report_path, **kwargs): 106 LOG.info("") 107 LOG.info("**************************************************") 108 LOG.info("************** Start generate reports ************") 109 LOG.info("**************************************************") 110 LOG.info("") 111 112 if self._check_params(report_path, **kwargs): 113 # generate data report 114 self._generate_data_report() 115 116 # generate vision reports 117 self._generate_vision_reports() 118 119 # generate task info record 120 self._generate_task_record() 121 122 # generate summary ini 123 self._generate_summary() 124 125 # copy reports to reports/latest folder 126 self._copy_report() 127 128 self._transact_all() 129 130 LOG.info("") 131 LOG.info("**************************************************") 132 LOG.info("************** Ended generate reports ************") 133 LOG.info("**************************************************") 134 LOG.info("") 135 136 def _check_params(self, report_path, **kwargs): 137 task_info = kwargs.get("task_info", "") 138 if not report_path: 139 LOG.error("Report path is wrong", error_no="00440", 140 ReportPath=report_path) 141 return False 142 if not task_info or not isinstance(task_info, ExecInfo): 143 LOG.error("Task info is wrong", error_no="00441", 144 TaskInfo=task_info) 145 return False 146 147 os.makedirs(report_path, exist_ok=True) 148 self.report_path = report_path 149 self.task_info = task_info 150 self.summary_data_path = os.path.join( 151 self.report_path, ReportConstant.summary_data_report) 152 self.exec_info = task_info 153 self.data_helper = DataHelper() 154 self.vision_helper = VisionHelper() 155 self.vision_helper.report_path = report_path 156 self.repeat_helper = RepeatHelper(report_path) 157 return True 158 159 def _generate_test_report(self): 160 report_template = os.path.join(Variables.res_dir, "template") 161 copy_folder(report_template, self.report_path) 162 content = json.dumps(self._get_summary_data()) 163 data_js = os.path.join(self.report_path, "static", "data.js") 164 data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644) 165 with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf: 166 jsf.write(f"window.reportData = {content}") 167 test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/") 168 LOG.info(f"Log path: {self.report_path}") 169 LOG.info(f"Generate test report: file:///{test_report}") 170 # 重新生成对象,避免在retry场景数据统计有误 171 self.summary = ResultSummary() 172 173 def _get_summary_data(self): 174 modules = [] 175 for data_report, _ in self.data_reports: 176 if data_report.endswith(ReportConstant.summary_data_report): 177 continue 178 info = self._parse_module(data_report) 179 if info is not None: 180 modules.append(info) 181 if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0: 182 from xdevice import Scheduler 183 Scheduler.is_need_auto_retry = True 184 data = { 185 "exec_info": self._get_exec_info(), 186 "summary": self.summary.get_data(), 187 "modules": modules, 188 } 189 return data 190 191 def _get_exec_info(self): 192 start_time = self.task_info.test_time 193 end_time = time.strftime(ReportConstant.time_format, time.localtime()) 194 test_time = "%s/ %s" % (start_time, end_time) 195 execute_time = calculate_elapsed_time( 196 time.mktime(time.strptime(start_time, ReportConstant.time_format)), 197 time.mktime(time.strptime(end_time, ReportConstant.time_format))) 198 host_info = platform.platform() 199 device_name = getattr(self.task_info, ReportConstant.device_name, "None") 200 device_type = getattr(self.task_info, ReportConstant.device_label, "None") 201 platform_info = getattr(self.task_info, ReportConstant.platform, "None") 202 test_type = getattr(self.task_info, ReportConstant.test_type, "None") 203 204 # 为报告文件summary.ini提供数据 205 exec_info = ExecInfo() 206 exec_info.device_name = device_name 207 exec_info.device_label = device_type 208 exec_info.execute_time = execute_time 209 exec_info.host_info = host_info 210 exec_info.log_path = self.report_path 211 exec_info.platform = platform_info 212 exec_info.test_time = test_time 213 exec_info.test_type = test_type 214 self.exec_info = exec_info 215 216 info = { 217 "platform": platform_info, 218 "test_type": test_type, 219 "device_name": device_name, 220 "device_type": device_type, 221 "test_time": test_time, 222 "execute_time": execute_time, 223 "host_info": host_info 224 } 225 return info 226 227 def _parse_module(self, xml_file): 228 """解析测试模块""" 229 file_name = os.path.basename(xml_file) 230 try: 231 ele_module = ElementTree.parse(xml_file).getroot() 232 except ElementTree.ParseError: 233 LOG.error(f"parse module result error, result file {file_name}") 234 return None 235 module = ResultReporter._count_result(ele_module) 236 module_name = file_name[:-4] if module.name == "" else module.name 237 suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module] 238 239 # 为报告文件task_record.info提供数据 240 self.record_reports.update({module_name: xml_file}) 241 if len(self._failed_cases) != 0: 242 self.record_params.update({module_name: copy.copy(self._failed_cases)}) 243 self._failed_cases.clear() 244 245 self.summary.modules += 1 246 self.summary.tests += module.tests 247 self.summary.passed += module.passed 248 self.summary.failed += module.failed 249 self.summary.blocked += module.blocked 250 self.summary.ignored += module.ignored 251 if module.unavailable == 0: 252 self.summary.runmodules += 1 253 else: 254 self.summary.unavailable += 1 255 256 module_report, module_time = module.report, module.time 257 if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name: 258 report = suites[0].get(ReportConstant.report) 259 if report != "": 260 module_report = report 261 module_time = suites[0].get(ReportConstant.time) 262 info = { 263 "name": module_name, 264 "report": module_report, 265 "time": module_time, 266 "execute_time": calculate_elapsed_time(0, module_time), 267 "tests": module.tests, 268 "passed": module.passed, 269 "failed": module.failed, 270 "blocked": module.blocked, 271 "ignored": module.ignored, 272 "unavailable": module.unavailable, 273 "passingrate": "0%" if module.tests == 0 else "{:.0%}".format(module.passed / module.tests), 274 "productinfo": ResultReporter._parse_product_info(ele_module), 275 "suites": suites 276 } 277 return info 278 279 def _parse_testsuite(self, ele_suite): 280 """解析测试套""" 281 suite = ResultReporter._count_result(ele_suite) 282 cases = [self._parse_testcase(case) for case in ele_suite] 283 info = { 284 "name": suite.name, 285 "report": suite.report, 286 "time": suite.time, 287 "tests": suite.tests, 288 "passed": suite.passed, 289 "failed": suite.failed, 290 "blocked": suite.blocked, 291 "ignored": suite.ignored, 292 "passingrate": "0%" if suite.tests == 0 else "{:.0%}".format(suite.passed / suite.tests), 293 "cases": cases 294 } 295 return info 296 297 def _parse_testcase(self, ele_case): 298 """解析测试用例""" 299 name = ele_case.get(ReportConstant.name) 300 class_name = ele_case.get(ReportConstant.class_name, "") 301 result = ResultReporter._get_case_result(ele_case) 302 if result != CaseResult.passed: 303 self._failed_cases.append(f"{class_name}#{name}") 304 return [name, class_name, result, ResultReporter._parse_time(ele_case), 305 ele_case.get(ReportConstant.message, ""), ele_case.get(ReportConstant.report, "")] 306 307 @staticmethod 308 def _parse_time(ele): 309 try: 310 _time = float(ele.get(ReportConstant.time, "0")) 311 except ValueError: 312 _time = 0.0 313 LOG.error("parse test time error, set it to 0.0") 314 return _time 315 316 @staticmethod 317 def _parse_product_info(ele_module): 318 product_info = ele_module.get(ReportConstant.product_info, "") 319 if product_info == "": 320 return {} 321 try: 322 return literal_eval(product_info) 323 except SyntaxError: 324 return {} 325 326 @staticmethod 327 def _count_result(ele): 328 name = ele.get(ReportConstant.name, "") 329 report = ele.get(ReportConstant.report, "") 330 _time = ResultReporter._parse_time(ele) 331 tests = int(ele.get(ReportConstant.tests, "0")) 332 failed = int(ele.get(ReportConstant.failures, "0")) 333 disabled = ele.get(ReportConstant.disabled, "0") 334 if disabled == "": 335 disabled = "0" 336 errors = ele.get(ReportConstant.errors, "0") 337 if errors == "": 338 errors = "0" 339 blocked = int(disabled) + int(errors) 340 ignored = int(ele.get(ReportConstant.ignored, "0")) 341 unavailable = int(ele.get(ReportConstant.unavailable, "0")) 342 343 tmp_pass = tests - failed - blocked - ignored 344 passed = tmp_pass if tmp_pass > 0 else 0 345 346 Result = collections.namedtuple( 347 'Result', 348 ['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable']) 349 return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable) 350 351 def _get_case_result(ele_case): 352 result_kind = ele_case.get(ReportConstant.result_kind, "") 353 if result_kind != "": 354 return result_kind 355 result = ele_case.get(ReportConstant.result, "") 356 status = ele_case.get(ReportConstant.status, "") 357 if result == ReportConstant.false and (status == ReportConstant.run or status == ""): 358 return CaseResult.failed 359 if status in [ReportConstant.blocked, ReportConstant.disabled, ReportConstant.error]: 360 return CaseResult.blocked 361 if status in [ReportConstant.skip, ReportConstant.not_run]: 362 return CaseResult.ignored 363 if status in [ReportConstant.unavailable]: 364 return CaseResult.unavailable 365 return CaseResult.passed 366 367 def _generate_data_report(self): 368 # initial element 369 test_suites_element = self.data_helper.initial_suites_element() 370 371 # update test suites element 372 update_flag = self._update_test_suites(test_suites_element) 373 if not update_flag: 374 return 375 376 # generate report 377 if not self._check_mode(ModeType.decc): 378 self.data_helper.generate_report(test_suites_element, 379 self.summary_data_path) 380 381 # set SuiteReporter.suite_report_result 382 if not check_pub_key_exist() and not self._check_mode( 383 ModeType.decc): 384 return 385 self.set_summary_report_result( 386 self.summary_data_path, DataHelper.to_string(test_suites_element)) 387 388 def _update_test_suites(self, test_suites_element): 389 # initial attributes for test suites element 390 test_suites_attributes, need_update_attributes = \ 391 self._init_attributes() 392 393 # get test suite elements that are children of test suites element 394 modules = dict() 395 test_suite_elements = [] 396 for data_report, module_name in self.data_reports: 397 if data_report.endswith(ReportConstant.summary_data_report): 398 continue 399 root = self.data_helper.parse_data_report(data_report) 400 if module_name == ReportConstant.empty_name: 401 module_name = self._get_module_name(data_report, root) 402 total = int(root.get(ReportConstant.tests, 0)) 403 if module_name not in modules.keys(): 404 modules[module_name] = list() 405 modules[module_name].append(total) 406 407 self._append_product_info(test_suites_attributes, root) 408 for child in root: 409 child.tail = self.data_helper.LINE_BREAK_INDENT 410 if not child.get(ReportConstant.module_name) or child.get( 411 ReportConstant.module_name) == \ 412 ReportConstant.empty_name: 413 child.set(ReportConstant.module_name, module_name) 414 self._check_tests_and_unavailable(child) 415 # covert the status of "notrun" to "ignored" 416 for element in child: 417 if element.get(ReportConstant.status, "") == \ 418 ReportConstant.not_run: 419 ignored = int(child.get(ReportConstant.ignored, 0)) + 1 420 child.set(ReportConstant.ignored, "%s" % ignored) 421 test_suite_elements.append(child) 422 for update_attribute in need_update_attributes: 423 update_value = child.get(update_attribute, 0) 424 if not update_value: 425 update_value = 0 426 test_suites_attributes[update_attribute] += int( 427 update_value) 428 429 if test_suite_elements: 430 child = test_suite_elements[-1] 431 child.tail = self.data_helper.LINE_BREAK 432 else: 433 LOG.error("Execute result not exists") 434 return False 435 436 # set test suites element attributes and children 437 self._handle_module_tests(modules, test_suites_attributes) 438 self.data_helper.set_element_attributes(test_suites_element, 439 test_suites_attributes) 440 test_suites_element.extend(test_suite_elements) 441 return True 442 443 @classmethod 444 def _check_tests_and_unavailable(cls, child): 445 total = child.get(ReportConstant.tests, "0") 446 unavailable = child.get(ReportConstant.unavailable, "0") 447 if total and total != "0" and unavailable and \ 448 unavailable != "0": 449 child.set(ReportConstant.unavailable, "0") 450 LOG.warning("%s total: %s, unavailable: %s", child.get( 451 ReportConstant.name), total, unavailable) 452 453 @classmethod 454 def _append_product_info(cls, test_suites_attributes, root): 455 product_info = root.get(ReportConstant.product_info, "") 456 if not product_info: 457 return 458 try: 459 product_info = literal_eval(str(product_info)) 460 except SyntaxError as error: 461 LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args) 462 product_info = {} 463 464 if not test_suites_attributes[ReportConstant.product_info]: 465 test_suites_attributes[ReportConstant.product_info] = \ 466 product_info 467 return 468 for key, value in product_info.items(): 469 exist_value = test_suites_attributes[ 470 ReportConstant.product_info].get(key, "") 471 472 if not exist_value: 473 test_suites_attributes[ 474 ReportConstant.product_info][key] = value 475 continue 476 if value in exist_value: 477 continue 478 test_suites_attributes[ReportConstant.product_info][key] = \ 479 "%s,%s" % (exist_value, value) 480 481 @classmethod 482 def _get_module_name(cls, data_report, root): 483 # get module name from data report 484 module_name = get_filename_extension(data_report)[0] 485 if "report" in module_name or "summary" in module_name or \ 486 "<" in data_report or ">" in data_report: 487 module_name = root.get(ReportConstant.name, 488 ReportConstant.empty_name) 489 if "report" in module_name or "summary" in module_name: 490 module_name = ReportConstant.empty_name 491 return module_name 492 493 def _init_attributes(self): 494 test_suites_attributes = { 495 ReportConstant.name: 496 ReportConstant.summary_data_report.split(".")[0], 497 ReportConstant.start_time: self.task_info.test_time, 498 ReportConstant.end_time: time.strftime(ReportConstant.time_format, 499 time.localtime()), 500 ReportConstant.errors: 0, ReportConstant.disabled: 0, 501 ReportConstant.failures: 0, ReportConstant.tests: 0, 502 ReportConstant.ignored: 0, ReportConstant.unavailable: 0, 503 ReportConstant.product_info: self.task_info.product_info, 504 ReportConstant.modules: 0, ReportConstant.run_modules: 0} 505 need_update_attributes = [ReportConstant.tests, ReportConstant.ignored, 506 ReportConstant.failures, 507 ReportConstant.disabled, 508 ReportConstant.errors, 509 ReportConstant.unavailable] 510 return test_suites_attributes, need_update_attributes 511 512 def _generate_vision_reports(self): 513 if not self._check_mode(ModeType.decc) and not \ 514 self.summary_data_report_exist: 515 LOG.error("Summary data report not exists") 516 return 517 518 if check_pub_key_exist() or self._check_mode(ModeType.decc): 519 if not self.summary_report_result_exists(): 520 LOG.error("Summary data report not exists") 521 return 522 self.summary_data_str = \ 523 self.get_result_of_summary_report() 524 if check_pub_key_exist(): 525 from xdevice import SuiteReporter 526 SuiteReporter.clear_report_result() 527 528 # parse data 529 if self.summary_data_str: 530 # only in decc mode and pub key, self.summary_data_str is not empty 531 summary_element_tree = self.data_helper.parse_data_report( 532 self.summary_data_str) 533 else: 534 summary_element_tree = self.data_helper.parse_data_report( 535 self.summary_data_path) 536 parsed_data = self.vision_helper.parse_element_data( 537 summary_element_tree, self.report_path, self.task_info) 538 self.parsed_data = parsed_data 539 self.exec_info, summary, _ = parsed_data 540 541 if self._check_mode(ModeType.decc): 542 return 543 544 LOG.info("Summary result: modules: %s, run modules: %s, total: " 545 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, " 546 "unavailable: %s", summary.modules, summary.run_modules, 547 summary.result.total, summary.result.passed, 548 summary.result.failed, summary.result.blocked, 549 summary.result.ignored, summary.result.unavailable) 550 LOG.info("Log path: %s", self.exec_info.log_path) 551 552 if summary.result.failed != 0 or summary.result.blocked != 0 or\ 553 summary.result.unavailable != 0: 554 from xdevice import Scheduler 555 Scheduler.is_need_auto_retry = True 556 557 # generate summary vision report 558 report_generate_flag = self._generate_vision_report( 559 parsed_data, ReportConstant.summary_title, 560 ReportConstant.summary_vision_report) 561 562 # generate details vision report 563 if report_generate_flag and summary.result.total > 0: 564 self._generate_vision_report( 565 parsed_data, ReportConstant.details_title, 566 ReportConstant.details_vision_report) 567 568 # generate failures vision report 569 if summary.result.total != ( 570 summary.result.passed + summary.result.ignored) or \ 571 summary.result.unavailable > 0: 572 self._generate_vision_report( 573 parsed_data, ReportConstant.failures_title, 574 ReportConstant.failures_vision_report) 575 576 # generate passes vision report 577 if summary.result.passed != 0: 578 self._generate_vision_report( 579 parsed_data, ReportConstant.passes_title, ReportConstant.passes_vision_report) 580 581 # generate ignores vision report 582 if summary.result.ignored != 0: 583 self._generate_vision_report( 584 parsed_data, ReportConstant.ignores_title, ReportConstant.ignores_vision_report) 585 586 def _generate_vision_report(self, parsed_data, title, render_target): 587 588 # render data 589 report_context = self.vision_helper.render_data( 590 title, parsed_data, render_target=render_target) 591 592 # generate report 593 if report_context: 594 report_path = os.path.join(self.report_path, render_target) 595 self.vision_helper.generate_report(report_path, report_context) 596 return True 597 else: 598 LOG.error("Failed to generate %s", render_target) 599 return False 600 601 @property 602 def summary_data_report_exist(self): 603 return "<" in self.summary_data_str or \ 604 os.path.exists(self.summary_data_path) 605 606 @property 607 def data_reports(self): 608 if check_pub_key_exist() or self._check_mode(ModeType.decc): 609 from xdevice import SuiteReporter 610 suite_reports = SuiteReporter.get_report_result() 611 if self._check_mode(ModeType.decc): 612 LOG.debug("Handle history result, data reports length:{}". 613 format(len(suite_reports))) 614 SuiteReporter.clear_history_result() 615 SuiteReporter.append_history_result(suite_reports) 616 data_reports = [] 617 for report_path, report_result in suite_reports: 618 module_name = get_filename_extension(report_path)[0] 619 data_reports.append((report_result, module_name)) 620 SuiteReporter.clear_report_result() 621 return data_reports 622 623 if not os.path.isdir(self.report_path): 624 return [] 625 data_reports = [] 626 result_path = os.path.join(self.report_path, "result") 627 for root, _, files in os.walk(self.report_path): 628 for file_name in files: 629 if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX): 630 continue 631 module_name = self._find_module_name(result_path, root) 632 data_reports.append((os.path.join(root, file_name), 633 module_name)) 634 return data_reports 635 636 @classmethod 637 def _find_module_name(cls, result_path, root): 638 # find module name from directory tree 639 common_path = os.path.commonpath([result_path, root]) 640 if os.path.normcase(result_path) != os.path.normcase(common_path) or \ 641 os.path.normcase(result_path) == os.path.normcase(root): 642 return ReportConstant.empty_name 643 644 root_dir, module_name = os.path.split(root) 645 if os.path.normcase(result_path) == os.path.normcase(root_dir): 646 return ReportConstant.empty_name 647 root_dir, subsystem_name = os.path.split(root_dir) 648 while os.path.normcase(result_path) != os.path.normcase(root_dir): 649 module_name = subsystem_name 650 root_dir, subsystem_name = os.path.split(root_dir) 651 return module_name 652 653 def _generate_summary(self): 654 if not self.summary_data_report_exist or \ 655 self._check_mode(ModeType.decc): 656 return 657 summary_ini_content = \ 658 "[default]\n" \ 659 "Platform={}\n" \ 660 "Test Type={}\n" \ 661 "Device Name={}\n" \ 662 "Host Info={}\n" \ 663 "Test Start/ End Time={}\n" \ 664 "Execution Time={}\n" \ 665 "Device Type={}\n".format( 666 self.exec_info.platform, self.exec_info.test_type, 667 self.exec_info.device_name, self.exec_info.host_info, 668 self.exec_info.test_time, self.exec_info.execute_time, 669 self.exec_info.device_label) 670 671 if self.exec_info.product_info: 672 for key, value in self.exec_info.product_info.items(): 673 summary_ini_content = "{}{}".format( 674 summary_ini_content, "%s=%s\n" % (key, value)) 675 676 if not self._check_mode(ModeType.factory): 677 summary_ini_content = "{}{}".format( 678 summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path) 679 680 # write summary_ini_content 681 summary_filepath = os.path.join(self.report_path, 682 ReportConstant.summary_ini) 683 684 if platform.system() == "Windows": 685 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY 686 else: 687 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND 688 summary_filepath_open = os.open(summary_filepath, flags, 689 FilePermission.mode_755) 690 691 with os.fdopen(summary_filepath_open, "wb") as file_handler: 692 if check_pub_key_exist(): 693 try: 694 cipher_text = do_rsa_encrypt(summary_ini_content) 695 except ParamError as error: 696 LOG.error(error, error_no=error.error_no) 697 cipher_text = b"" 698 file_handler.write(cipher_text) 699 else: 700 file_handler.write(bytes(summary_ini_content, 'utf-8')) 701 file_handler.flush() 702 LOG.info("Generate summary ini: %s", summary_filepath) 703 self.repeat_helper.__generate_repeat_xml__(self.summary_data_path) 704 705 def _copy_report(self): 706 from xdevice import Scheduler 707 if Scheduler.upload_address or self._check_mode(ModeType.decc): 708 return 709 710 dst_path = os.path.join(Variables.temp_dir, "latest") 711 try: 712 shutil.rmtree(dst_path, ignore_errors=True) 713 os.makedirs(dst_path, exist_ok=True) 714 LOG.info("Copy summary files to %s", dst_path) 715 716 # copy reports to reports/latest folder 717 for report_file in os.listdir(self.report_path): 718 src_file = os.path.join(self.report_path, report_file) 719 dst_file = os.path.join(dst_path, report_file) 720 if os.path.isfile(src_file): 721 shutil.copyfile(src_file, dst_file) 722 except OSError as _: 723 return 724 725 def _compress_report_folder(self): 726 if self._check_mode(ModeType.decc) or \ 727 self._check_mode(ModeType.factory): 728 return None 729 730 if not os.path.isdir(self.report_path): 731 LOG.error("'%s' is not folder!" % self.report_path) 732 return None 733 734 # get file path list 735 file_path_list = [] 736 for dir_path, _, file_names in os.walk(self.report_path): 737 f_path = dir_path.replace(self.report_path, '') 738 f_path = f_path and f_path + os.sep or '' 739 for filename in file_names: 740 file_path_list.append( 741 (os.path.join(dir_path, filename), f_path + filename)) 742 743 # compress file 744 zipped_file = "%s.zip" % os.path.join( 745 self.report_path, os.path.basename(self.report_path)) 746 zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED, 747 allowZip64=True) 748 try: 749 LOG.info("Executing compress process, please wait...") 750 long_size_file = [] 751 for src_path, target_path in file_path_list: 752 long_size_file.append((src_path, target_path)) 753 self._write_long_size_file(zip_object, long_size_file) 754 755 LOG.info("Generate zip file: %s", zipped_file) 756 except zipfile.BadZipFile as bad_error: 757 LOG.error("Zip report folder error: %s" % bad_error.args) 758 finally: 759 zip_object.close() 760 761 # generate hex digest, then save it to summary_report.hash 762 hash_file = os.path.abspath(os.path.join( 763 self.report_path, ReportConstant.summary_report_hash)) 764 hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT | 765 os.O_APPEND, FilePermission.mode_755) 766 with os.fdopen(hash_file_open, "w") as hash_file_handler: 767 hash_file_handler.write(get_file_summary(zipped_file)) 768 LOG.info("Generate hash file: %s", hash_file) 769 hash_file_handler.flush() 770 return zipped_file 771 772 @classmethod 773 def _check_mode(cls, mode): 774 from xdevice import Scheduler 775 return Scheduler.mode == mode 776 777 def _generate_task_record(self): 778 # under encryption status, don't handle anything directly 779 if check_pub_key_exist() and not self._check_mode(ModeType.decc): 780 return 781 782 # get info from command_queue 783 from xdevice import Scheduler 784 if not Scheduler.command_queue: 785 return 786 _, command, report_path = Scheduler.command_queue[-1] 787 788 record_info = self._parse_record_from_data(command, report_path) 789 790 def encode(content): 791 # inner function to encode 792 return ' '.join([bin(ord(c)).replace('0b', '') for c in content]) 793 794 # write into file 795 record_file = os.path.join(self.report_path, 796 ReportConstant.task_info_record) 797 _record_json = json.dumps(record_info, indent=2) 798 799 with open(file=record_file, mode="wb") as file: 800 if Scheduler.mode == ModeType.decc: 801 # under decc, write in encoded text 802 file.write(bytes(encode(_record_json), encoding="utf-8")) 803 else: 804 # others, write in plain text 805 file.write(bytes(_record_json, encoding="utf-8")) 806 807 LOG.info("Generate record file: %s", record_file) 808 809 def _parse_record_from_data(self, command, report_path): 810 record = dict() 811 if self.parsed_data: 812 _, _, suites = self.parsed_data 813 unsuccessful = dict() 814 module_set = set() 815 for suite in suites: 816 module_set.add(suite.module_name) 817 818 failed = unsuccessful.get(suite.module_name, []) 819 # because suite not contains case's some attribute, 820 # for example, 'module', 'classname', 'name' . so 821 # if unavailable, only add module's name into list. 822 if int(suite.result.unavailable) > 0: 823 failed.append(suite.module_name) 824 else: 825 # others, get key attributes join string 826 for case in suite.get_cases(): 827 if not case.is_passed(): 828 failed.append( 829 "{}#{}".format(case.classname, case.name)) 830 unsuccessful.update({suite.module_name: failed}) 831 data_reports = self._get_data_reports(module_set) 832 record = {"command": command, 833 "session_id": os.path.split(report_path)[-1], 834 "report_path": report_path, 835 "unsuccessful_params": unsuccessful, 836 "data_reports": data_reports 837 } 838 return record 839 840 def _get_data_reports(self, module_set): 841 data_reports = dict() 842 if self._check_mode(ModeType.decc): 843 from xdevice import SuiteReporter 844 for module_name, report_path, _ in \ 845 SuiteReporter.get_history_result_list(): 846 if module_name in module_set: 847 data_reports.update({module_name: report_path}) 848 else: 849 for report_path, module_name in self.data_reports: 850 if module_name == ReportConstant.empty_name: 851 root = self.data_helper.parse_data_report(report_path) 852 module_name = self._get_module_name(report_path, root) 853 if module_name in module_set: 854 data_reports.update({module_name: report_path}) 855 856 return data_reports 857 858 @classmethod 859 def get_task_info_params(cls, history_path): 860 # under encryption status, don't handle anything directly 861 if check_pub_key_exist() and not cls._check_mode(ModeType.decc): 862 return () 863 864 def decode(content): 865 result_list = [] 866 for b in content.split(' '): 867 result_list.append(chr(int(b, 2))) 868 return ''.join(result_list) 869 870 record_path = os.path.join(history_path, 871 ReportConstant.task_info_record) 872 if not os.path.exists(record_path): 873 LOG.error("%s not exists!", ReportConstant.task_info_record) 874 return () 875 876 from xdevice import Scheduler 877 with open(record_path, mode="rb") as file: 878 if Scheduler.mode == ModeType.decc: 879 # under decc, read from encoded text 880 result = json.loads(decode(file.read().decode("utf-8"))) 881 else: 882 # others, read from plain text 883 result = json.loads(file.read()) 884 standard_length = 5 885 if not len(result.keys()) == standard_length: 886 LOG.error("%s error!", ReportConstant.task_info_record) 887 return () 888 889 return result 890 891 @classmethod 892 def set_summary_report_result(cls, summary_data_path, result_xml): 893 cls.summary_report_result.clear() 894 cls.summary_report_result.append((summary_data_path, result_xml)) 895 896 @classmethod 897 def get_result_of_summary_report(cls): 898 if cls.summary_report_result: 899 return cls.summary_report_result[0][1] 900 return None 901 902 @classmethod 903 def summary_report_result_exists(cls): 904 return True if cls.summary_report_result else False 905 906 @classmethod 907 def get_path_of_summary_report(cls): 908 if cls.summary_report_result: 909 return cls.summary_report_result[0][0] 910 return None 911 912 @classmethod 913 def _write_long_size_file(cls, zip_object, long_size_file): 914 for filename, arcname in long_size_file: 915 zip_info = zipfile.ZipInfo.from_file(filename, arcname) 916 zip_info.compress_type = getattr(zip_object, "compression", 917 zipfile.ZIP_DEFLATED) 918 if hasattr(zip_info, "_compresslevel"): 919 _compress_level = getattr(zip_object, "compresslevel", None) 920 setattr(zip_info, "_compresslevel", _compress_level) 921 with open(filename, "rb") as src, \ 922 zip_object.open(zip_info, "w") as des: 923 shutil.copyfileobj(src, des, 1024 * 1024 * 8) 924 925 def _transact_all(self): 926 pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc") 927 if not os.path.exists(pyc_path): 928 return 929 module_spec = util.spec_from_file_location("binder", pyc_path) 930 if not module_spec: 931 return 932 module = util.module_from_spec(module_spec) 933 module_spec.loader.exec_module(module) 934 if hasattr(module, "transact") and callable(module.transact): 935 module.transact(self, LOG) 936 del module 937 938 @classmethod 939 def _handle_module_tests(cls, modules, test_suites_attributes): 940 modules_list = list() 941 modules_zero = list() 942 for module_name, detail_list in modules.items(): 943 for total in detail_list: 944 modules_list.append(total) 945 if total == 0: 946 modules_zero.append(module_name) 947 test_suites_attributes[ReportConstant.run_modules] = \ 948 len(modules_list) - len(modules_zero) 949 test_suites_attributes[ReportConstant.modules] = len(modules_list) 950 if modules_zero: 951 LOG.info("The total tests of %s module is 0", ",".join( 952 modules_zero))