1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import platform 21import shutil 22import time 23import zipfile 24from importlib import util 25from ast import literal_eval 26 27from _core.interface import IReporter 28from _core.plugin import Plugin 29from _core.constants import ModeType 30from _core.constants import TestType 31from _core.constants import FilePermission 32from _core.logger import platform_logger 33from _core.exception import ParamError 34from _core.utils import get_filename_extension 35from _core.report.encrypt import check_pub_key_exist 36from _core.report.encrypt import do_rsa_encrypt 37from _core.report.encrypt import get_file_summary 38from _core.report.reporter_helper import DataHelper 39from _core.report.reporter_helper import ExecInfo 40from _core.report.reporter_helper import VisionHelper 41from _core.report.reporter_helper import ReportConstant 42 43LOG = platform_logger("ResultReporter") 44 45 46@Plugin(type=Plugin.REPORTER, id=TestType.all) 47class ResultReporter(IReporter): 48 summary_report_result = [] 49 50 def __init__(self): 51 self.report_path = None 52 self.task_info = None 53 self.summary_data_path = None 54 self.summary_data_str = "" 55 self.exec_info = None 56 self.parsed_data = None 57 self.data_helper = None 58 self.vision_helper = None 59 60 def __generate_reports__(self, report_path, **kwargs): 61 LOG.info("") 62 LOG.info("**************************************************") 63 LOG.info("************** Start generate reports ************") 64 LOG.info("**************************************************") 65 LOG.info("") 66 67 if self._check_params(report_path, **kwargs): 68 # generate data report 69 self._generate_data_report() 70 71 # generate vision reports 72 self._generate_vision_reports() 73 74 # generate task info record 75 self._generate_task_info_record() 76 77 # generate summary ini 78 self._generate_summary() 79 80 # copy reports to reports/latest folder 81 self._copy_report() 82 83 self._transact_all() 84 85 LOG.info("") 86 LOG.info("**************************************************") 87 LOG.info("************** Ended generate reports ************") 88 LOG.info("**************************************************") 89 LOG.info("") 90 91 def _check_params(self, report_path, **kwargs): 92 task_info = kwargs.get("task_info", "") 93 if not report_path: 94 LOG.error("Report path is wrong", error_no="00440", 95 ReportPath=report_path) 96 return False 97 if not task_info or not isinstance(task_info, ExecInfo): 98 LOG.error("Task info is wrong", error_no="00441", 99 TaskInfo=task_info) 100 return False 101 102 os.makedirs(report_path, exist_ok=True) 103 self.report_path = report_path 104 self.task_info = task_info 105 self.summary_data_path = os.path.join( 106 self.report_path, ReportConstant.summary_data_report) 107 self.exec_info = task_info 108 self.data_helper = DataHelper() 109 self.vision_helper = VisionHelper() 110 return True 111 112 def _generate_data_report(self): 113 # initial element 114 test_suites_element = self.data_helper.initial_suites_element() 115 116 # update test suites element 117 update_flag = self._update_test_suites(test_suites_element) 118 if not update_flag: 119 return 120 121 # generate report 122 if not self._check_mode(ModeType.decc): 123 self.data_helper.generate_report(test_suites_element, 124 self.summary_data_path) 125 126 # set SuiteReporter.suite_report_result 127 if not check_pub_key_exist() and not self._check_mode( 128 ModeType.decc): 129 return 130 self.set_summary_report_result( 131 self.summary_data_path, DataHelper.to_string(test_suites_element)) 132 133 def _update_test_suites(self, test_suites_element): 134 # initial attributes for test suites element 135 test_suites_attributes, need_update_attributes = \ 136 self._init_attributes() 137 138 # get test suite elements that are children of test suites element 139 modules = dict() 140 test_suite_elements = [] 141 for data_report, module_name in self.data_reports: 142 if data_report.endswith(ReportConstant.summary_data_report): 143 continue 144 root = self.data_helper.parse_data_report(data_report) 145 if module_name == ReportConstant.empty_name: 146 module_name = self._get_module_name(data_report, root) 147 total = int(root.get(ReportConstant.tests, 0)) 148 modules[module_name] = modules.get(module_name, 0) + total 149 150 self._append_product_info(test_suites_attributes, root) 151 for child in root: 152 child.tail = self.data_helper.LINE_BREAK_INDENT 153 if not child.get(ReportConstant.module_name) or child.get( 154 ReportConstant.module_name) == \ 155 ReportConstant.empty_name: 156 child.set(ReportConstant.module_name, module_name) 157 self._check_tests_and_unavailable(child) 158 # covert the status of "notrun" to "ignored" 159 for element in child: 160 if element.get(ReportConstant.status, "") == \ 161 ReportConstant.not_run: 162 ignored = int(child.get(ReportConstant.ignored, 0)) + 1 163 child.set(ReportConstant.ignored, "%s" % ignored) 164 test_suite_elements.append(child) 165 for update_attribute in need_update_attributes: 166 update_value = child.get(update_attribute, 0) 167 if not update_value: 168 update_value = 0 169 test_suites_attributes[update_attribute] += int( 170 update_value) 171 172 if test_suite_elements: 173 child = test_suite_elements[-1] 174 child.tail = self.data_helper.LINE_BREAK 175 else: 176 LOG.error("Execute result not exists") 177 return False 178 179 # set test suites element attributes and children 180 modules_zero = [module_name for module_name, total in modules.items() 181 if total == 0] 182 if modules_zero: 183 LOG.info("The total tests of %s module is 0", ",".join( 184 modules_zero)) 185 test_suites_attributes[ReportConstant.run_modules] = \ 186 len(modules) - len(modules_zero) 187 test_suites_attributes[ReportConstant.modules] = len(modules) 188 self.data_helper.set_element_attributes(test_suites_element, 189 test_suites_attributes) 190 test_suites_element.extend(test_suite_elements) 191 return True 192 193 @classmethod 194 def _check_tests_and_unavailable(cls, child): 195 total = child.get(ReportConstant.tests, "0") 196 unavailable = child.get(ReportConstant.unavailable, "0") 197 if total and total != "0" and unavailable and \ 198 unavailable != "0": 199 child.set(ReportConstant.unavailable, "0") 200 LOG.warning("%s total: %s, unavailable: %s", child.get( 201 ReportConstant.name), total, unavailable) 202 203 @classmethod 204 def _append_product_info(cls, test_suites_attributes, root): 205 product_info = root.get(ReportConstant.product_info, "") 206 if not product_info: 207 return 208 try: 209 product_info = literal_eval(str(product_info)) 210 except SyntaxError as error: 211 LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args) 212 product_info = {} 213 214 if not test_suites_attributes[ReportConstant.product_info]: 215 test_suites_attributes[ReportConstant.product_info] = \ 216 product_info 217 return 218 for key, value in product_info.items(): 219 exist_value = test_suites_attributes[ 220 ReportConstant.product_info].get(key, "") 221 222 if not exist_value: 223 test_suites_attributes[ 224 ReportConstant.product_info][key] = value 225 continue 226 if value in exist_value: 227 continue 228 test_suites_attributes[ReportConstant.product_info][key] = \ 229 "%s,%s" % (exist_value, value) 230 231 @classmethod 232 def _get_module_name(cls, data_report, root): 233 # get module name from data report 234 module_name = get_filename_extension(data_report)[0] 235 if "report" in module_name or "summary" in module_name or \ 236 "<" in data_report or ">" in data_report: 237 module_name = root.get(ReportConstant.name, 238 ReportConstant.empty_name) 239 if "report" in module_name or "summary" in module_name: 240 module_name = ReportConstant.empty_name 241 return module_name 242 243 def _init_attributes(self): 244 test_suites_attributes = { 245 ReportConstant.name: 246 ReportConstant.summary_data_report.split(".")[0], 247 ReportConstant.start_time: self.task_info.test_time, 248 ReportConstant.end_time: time.strftime(ReportConstant.time_format, 249 time.localtime()), 250 ReportConstant.errors: 0, ReportConstant.disabled: 0, 251 ReportConstant.failures: 0, ReportConstant.tests: 0, 252 ReportConstant.ignored: 0, ReportConstant.unavailable: 0, 253 ReportConstant.product_info: self.task_info.product_info, 254 ReportConstant.modules: 0, ReportConstant.run_modules: 0} 255 need_update_attributes = [ReportConstant.tests, ReportConstant.ignored, 256 ReportConstant.failures, 257 ReportConstant.disabled, 258 ReportConstant.errors, 259 ReportConstant.unavailable] 260 return test_suites_attributes, need_update_attributes 261 262 def _generate_vision_reports(self): 263 if not self._check_mode(ModeType.decc) and not \ 264 self.summary_data_report_exist: 265 LOG.error("Summary data report not exists") 266 return 267 268 if check_pub_key_exist() or self._check_mode(ModeType.decc): 269 if not self.summary_report_result_exists(): 270 LOG.error("Summary data report not exists") 271 return 272 self.summary_data_str = \ 273 self.get_result_of_summary_report() 274 if check_pub_key_exist(): 275 from xdevice import SuiteReporter 276 SuiteReporter.clear_report_result() 277 278 # parse data 279 if self.summary_data_str: 280 # only in decc mode and pub key, self.summary_data_str is not empty 281 summary_element_tree = self.data_helper.parse_data_report( 282 self.summary_data_str) 283 else: 284 summary_element_tree = self.data_helper.parse_data_report( 285 self.summary_data_path) 286 parsed_data = self.vision_helper.parse_element_data( 287 summary_element_tree, self.report_path, self.task_info) 288 self.parsed_data = parsed_data 289 self.exec_info, summary, _ = parsed_data 290 291 if self._check_mode(ModeType.decc): 292 return 293 294 LOG.info("Summary result: modules: %s, run modules: %s, total: " 295 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, " 296 "unavailable: %s", summary.modules, summary.run_modules, 297 summary.result.total, summary.result.passed, 298 summary.result.failed, summary.result.blocked, 299 summary.result.ignored, summary.result.unavailable) 300 LOG.info("Log path: %s", self.exec_info.log_path) 301 302 if summary.result.failed != 0 or summary.result.blocked != 0 or summary.result.unavailable != 0: 303 from xdevice import Scheduler 304 Scheduler.is_need_auto_retry = True 305 306 # generate summary vision report 307 report_generate_flag = self._generate_vision_report( 308 parsed_data, ReportConstant.summary_title, 309 ReportConstant.summary_vision_report) 310 311 # generate details vision report 312 if report_generate_flag and summary.result.total > 0: 313 self._generate_vision_report( 314 parsed_data, ReportConstant.details_title, 315 ReportConstant.details_vision_report) 316 317 # generate failures vision report 318 if summary.result.total != ( 319 summary.result.passed + summary.result.ignored) or \ 320 summary.result.unavailable > 0: 321 self._generate_vision_report( 322 parsed_data, ReportConstant.failures_title, 323 ReportConstant.failures_vision_report) 324 325 def _generate_vision_report(self, parsed_data, title, render_target): 326 327 # render data 328 report_context = self.vision_helper.render_data( 329 title, parsed_data, render_target=render_target) 330 331 # generate report 332 if report_context: 333 report_path = os.path.join(self.report_path, render_target) 334 self.vision_helper.generate_report(report_path, report_context) 335 return True 336 else: 337 LOG.error("Failed to generate %s", render_target) 338 return False 339 340 @property 341 def summary_data_report_exist(self): 342 return "<" in self.summary_data_str or \ 343 os.path.exists(self.summary_data_path) 344 345 @property 346 def data_reports(self): 347 if check_pub_key_exist() or self._check_mode(ModeType.decc): 348 from xdevice import SuiteReporter 349 suite_reports = SuiteReporter.get_report_result() 350 if self._check_mode(ModeType.decc): 351 LOG.debug("Handle history result, data reports length:{}". 352 format(len(suite_reports))) 353 SuiteReporter.clear_history_result() 354 SuiteReporter.append_history_result(suite_reports) 355 data_reports = [] 356 for report_path, report_result in suite_reports: 357 module_name = get_filename_extension(report_path)[0] 358 data_reports.append((report_result, module_name)) 359 SuiteReporter.clear_report_result() 360 return data_reports 361 362 if not os.path.isdir(self.report_path): 363 return [] 364 data_reports = [] 365 result_path = os.path.join(self.report_path, "result") 366 for root, _, files in os.walk(self.report_path): 367 for file_name in files: 368 if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX): 369 continue 370 module_name = self._find_module_name(result_path, root) 371 data_reports.append((os.path.join(root, file_name), 372 module_name)) 373 return data_reports 374 375 @classmethod 376 def _find_module_name(cls, result_path, root): 377 # find module name from directory tree 378 common_path = os.path.commonpath([result_path, root]) 379 if os.path.normcase(result_path) != os.path.normcase(common_path) or \ 380 os.path.normcase(result_path) == os.path.normcase(root): 381 return ReportConstant.empty_name 382 383 root_dir, module_name = os.path.split(root) 384 if os.path.normcase(result_path) == os.path.normcase(root_dir): 385 return ReportConstant.empty_name 386 root_dir, subsystem_name = os.path.split(root_dir) 387 while os.path.normcase(result_path) != os.path.normcase(root_dir): 388 module_name = subsystem_name 389 root_dir, subsystem_name = os.path.split(root_dir) 390 return module_name 391 392 def _generate_summary(self): 393 if not self.summary_data_report_exist or \ 394 self._check_mode(ModeType.decc): 395 return 396 summary_ini_content = \ 397 "[default]\n" \ 398 "Platform=%s\n" \ 399 "Test Type=%s\n" \ 400 "Device Name=%s\n" \ 401 "Host Info=%s\n" \ 402 "Test Start/ End Time=%s\n" \ 403 "Execution Time=%s\n" % ( 404 self.exec_info.platform, self.exec_info.test_type, 405 self.exec_info.device_name, self.exec_info.host_info, 406 self.exec_info.test_time, self.exec_info.execute_time) 407 if self.exec_info.product_info: 408 for key, value in self.exec_info.product_info.items(): 409 summary_ini_content = "{}{}".format( 410 summary_ini_content, "%s=%s\n" % (key, value)) 411 412 if not self._check_mode(ModeType.factory): 413 summary_ini_content = "{}{}".format( 414 summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path) 415 416 # write summary_ini_content 417 summary_filepath = os.path.join(self.report_path, 418 ReportConstant.summary_ini) 419 420 if platform.system() == "Windows": 421 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY 422 else: 423 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND 424 summary_filepath_open = os.open(summary_filepath, flags, 425 FilePermission.mode_755) 426 427 with os.fdopen(summary_filepath_open, "wb") as file_handler: 428 if check_pub_key_exist(): 429 try: 430 cipher_text = do_rsa_encrypt(summary_ini_content) 431 except ParamError as error: 432 LOG.error(error, error_no=error.error_no) 433 cipher_text = b"" 434 file_handler.write(cipher_text) 435 else: 436 file_handler.write(bytes(summary_ini_content, 'utf-8')) 437 file_handler.flush() 438 LOG.info("Generate summary ini: %s", summary_filepath) 439 440 def _copy_report(self): 441 from xdevice import Scheduler 442 if Scheduler.upload_address or self._check_mode(ModeType.decc): 443 return 444 445 from xdevice import Variables 446 dst_path = os.path.join(Variables.temp_dir, "latest") 447 try: 448 shutil.rmtree(dst_path, ignore_errors=True) 449 os.makedirs(dst_path, exist_ok=True) 450 LOG.info("Copy summary files to %s", dst_path) 451 452 # copy reports to reports/latest folder 453 for report_file in os.listdir(self.report_path): 454 src_file = os.path.join(self.report_path, report_file) 455 dst_file = os.path.join(dst_path, report_file) 456 if os.path.isfile(src_file): 457 shutil.copyfile(src_file, dst_file) 458 except OSError as _: 459 return 460 461 def _compress_report_folder(self): 462 if self._check_mode(ModeType.decc) or \ 463 self._check_mode(ModeType.factory): 464 return 465 466 if not os.path.isdir(self.report_path): 467 LOG.error("'%s' is not folder!" % self.report_path) 468 return 469 470 # get file path list 471 file_path_list = [] 472 for dir_path, _, file_names in os.walk(self.report_path): 473 f_path = dir_path.replace(self.report_path, '') 474 f_path = f_path and f_path + os.sep or '' 475 for filename in file_names: 476 file_path_list.append( 477 (os.path.join(dir_path, filename), f_path + filename)) 478 479 # compress file 480 zipped_file = "%s.zip" % os.path.join( 481 self.report_path, os.path.basename(self.report_path)) 482 zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED, 483 allowZip64=True) 484 try: 485 LOG.info("Executing compress process, please wait...") 486 long_size_file = [] 487 for src_path, target_path in file_path_list: 488 long_size_file.append((src_path, target_path)) 489 self._write_long_size_file(zip_object, long_size_file) 490 491 LOG.info("Generate zip file: %s", zipped_file) 492 except zipfile.BadZipFile as bad_error: 493 LOG.error("Zip report folder error: %s" % bad_error.args) 494 finally: 495 zip_object.close() 496 497 # generate hex digest, then save it to summary_report.hash 498 hash_file = os.path.abspath(os.path.join( 499 self.report_path, ReportConstant.summary_report_hash)) 500 hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT | 501 os.O_APPEND, FilePermission.mode_755) 502 with os.fdopen(hash_file_open, "w") as hash_file_handler: 503 hash_file_handler.write(get_file_summary(zipped_file)) 504 LOG.info("Generate hash file: %s", hash_file) 505 hash_file_handler.flush() 506 return zipped_file 507 508 @classmethod 509 def _check_mode(cls, mode): 510 from xdevice import Scheduler 511 return Scheduler.mode == mode 512 513 def _generate_task_info_record(self): 514 # under encryption status, don't handle anything directly 515 if check_pub_key_exist() and not self._check_mode(ModeType.decc): 516 return 517 518 # get info from command_queue 519 from xdevice import Scheduler 520 if not Scheduler.command_queue: 521 return 522 _, command, report_path = Scheduler.command_queue[-1] 523 524 # handle parsed data 525 record = self._parse_record_from_data(command, report_path) 526 527 def encode(content): 528 # inner function to encode 529 return ' '.join([bin(ord(c)).replace('0b', '') for c in content]) 530 531 # write into file 532 import json 533 record_file = os.path.join(self.report_path, 534 ReportConstant.task_info_record) 535 _record_json = json.dumps(record, indent=2) 536 537 with open(file=record_file, mode="wb") as file: 538 if Scheduler.mode == ModeType.decc: 539 # under decc, write in encoded text 540 file.write(bytes(encode(_record_json), encoding="utf-8")) 541 else: 542 # others, write in plain text 543 file.write(bytes(_record_json, encoding="utf-8")) 544 545 LOG.info("Generate record file: %s", record_file) 546 547 def _parse_record_from_data(self, command, report_path): 548 record = dict() 549 if self.parsed_data: 550 _, _, suites = self.parsed_data 551 unsuccessful = dict() 552 module_set = set() 553 for suite in suites: 554 module_set.add(suite.module_name) 555 556 failed = unsuccessful.get(suite.module_name, []) 557 # because suite not contains case's some attribute, 558 # for example, 'module', 'classname', 'name' . so 559 # if unavailable, only add module's name into list. 560 if int(suite.result.unavailable) > 0: 561 failed.append(suite.module_name) 562 else: 563 # others, get key attributes join string 564 for case in suite.get_cases(): 565 if not case.is_passed(): 566 failed.append( 567 "{}#{}".format(case.classname, case.name)) 568 unsuccessful.update({suite.module_name: failed}) 569 data_reports = self._get_data_reports(module_set) 570 record = {"command": command, 571 "session_id": os.path.split(report_path)[-1], 572 "report_path": report_path, 573 "unsuccessful_params": unsuccessful, 574 "data_reports": data_reports 575 } 576 return record 577 578 def _get_data_reports(self, module_set): 579 data_reports = dict() 580 if self._check_mode(ModeType.decc): 581 from xdevice import SuiteReporter 582 for module_name, report_path, report_result in \ 583 SuiteReporter.get_history_result_list(): 584 if module_name in module_set: 585 data_reports.update({module_name: report_path}) 586 else: 587 for report_path, module_name in self.data_reports: 588 if module_name == ReportConstant.empty_name: 589 root = self.data_helper.parse_data_report(report_path) 590 module_name = self._get_module_name(report_path, root) 591 if module_name in module_set: 592 data_reports.update({module_name: report_path}) 593 594 return data_reports 595 596 @classmethod 597 def get_task_info_params(cls, history_path): 598 # under encryption status, don't handle anything directly 599 if check_pub_key_exist() and not cls._check_mode(ModeType.decc): 600 return () 601 602 def decode(content): 603 return ''.join([chr(i) for i in [int(b, 2) for b in 604 content.split(' ')]]) 605 606 record_path = os.path.join(history_path, 607 ReportConstant.task_info_record) 608 if not os.path.exists(record_path): 609 LOG.error("%s not exists!", ReportConstant.task_info_record) 610 return () 611 612 import json 613 from xdevice import Scheduler 614 with open(record_path, mode="rb") as file: 615 if Scheduler.mode == ModeType.decc: 616 # under decc, read from encoded text 617 result = json.loads(decode(file.read().decode("utf-8"))) 618 else: 619 # others, read from plain text 620 result = json.loads(file.read()) 621 standard_length = 5 622 if not len(result.keys()) == standard_length: 623 LOG.error("%s error!", ReportConstant.task_info_record) 624 return () 625 626 return result 627 628 @classmethod 629 def set_summary_report_result(cls, summary_data_path, result_xml): 630 cls.summary_report_result.clear() 631 cls.summary_report_result.append((summary_data_path, result_xml)) 632 633 @classmethod 634 def get_result_of_summary_report(cls): 635 if cls.summary_report_result: 636 return cls.summary_report_result[0][1] 637 638 @classmethod 639 def summary_report_result_exists(cls): 640 return True if cls.summary_report_result else False 641 642 @classmethod 643 def get_path_of_summary_report(cls): 644 if cls.summary_report_result: 645 return cls.summary_report_result[0][0] 646 647 @classmethod 648 def _write_long_size_file(cls, zip_object, long_size_file): 649 for filename, arcname in long_size_file: 650 zip_info = zipfile.ZipInfo.from_file(filename, arcname) 651 zip_info.compress_type = getattr(zip_object, "compression", 652 zipfile.ZIP_DEFLATED) 653 if hasattr(zip_info, "_compresslevel"): 654 _compress_level = getattr(zip_object, "compresslevel", None) 655 setattr(zip_info, "_compresslevel", _compress_level) 656 with open(filename, "rb") as src, \ 657 zip_object.open(zip_info, "w") as des: 658 shutil.copyfileobj(src, des, 1024 * 1024 * 8) 659 660 def _transact_all(self): 661 from xdevice import Variables 662 tools_dir = os.path.join(Variables.res_dir, "tools", "binder.pyc") 663 if not os.path.exists(tools_dir): 664 return 665 module_spec = util.spec_from_file_location( 666 "binder", tools_dir) 667 if not module_spec: 668 return 669 module = util.module_from_spec(module_spec) 670 module_spec.loader.exec_module(module) 671 if hasattr(module, "transact") and callable(module.transact): 672 module.transact(self, LOG) 673 del module 674