1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import platform 21import shutil 22import time 23import zipfile 24from importlib import util 25from ast import literal_eval 26 27from _core.interface import IReporter 28from _core.plugin import Plugin 29from _core.constants import ModeType 30from _core.constants import TestType 31from _core.constants import FilePermission 32from _core.logger import platform_logger 33from _core.exception import ParamError 34from _core.utils import get_filename_extension 35from _core.report.encrypt import check_pub_key_exist 36from _core.report.encrypt import do_rsa_encrypt 37from _core.report.encrypt import get_file_summary 38from _core.report.reporter_helper import DataHelper 39from _core.report.reporter_helper import ExecInfo 40from _core.report.reporter_helper import VisionHelper 41from _core.report.reporter_helper import ReportConstant 42 43LOG = platform_logger("ResultReporter") 44 45 46@Plugin(type=Plugin.REPORTER, id=TestType.all) 47class ResultReporter(IReporter): 48 summary_report_result = [] 49 50 def __init__(self): 51 self.report_path = None 52 self.task_info = None 53 self.summary_data_path = None 54 self.summary_data_str = "" 55 self.exec_info = None 56 self.parsed_data = None 57 self.data_helper = None 58 self.vision_helper = None 59 60 def __generate_reports__(self, report_path, **kwargs): 61 LOG.info("") 62 LOG.info("**************************************************") 63 LOG.info("************** Start generate reports ************") 64 LOG.info("**************************************************") 65 LOG.info("") 66 67 if self._check_params(report_path, **kwargs): 68 # generate data report 69 self._generate_data_report() 70 71 # generate vision reports 72 self._generate_vision_reports() 73 74 # generate task info record 75 self._generate_task_info_record() 76 77 # generate summary ini 78 self._generate_summary() 79 80 # copy reports to reports/latest folder 81 self._copy_report() 82 83 self._transact_all() 84 85 LOG.info("") 86 LOG.info("**************************************************") 87 LOG.info("************** Ended generate reports ************") 88 LOG.info("**************************************************") 89 LOG.info("") 90 91 def _check_params(self, report_path, **kwargs): 92 task_info = kwargs.get("task_info", "") 93 if not report_path: 94 LOG.error("Report path is wrong", error_no="00440", 95 ReportPath=report_path) 96 return False 97 if not task_info or not isinstance(task_info, ExecInfo): 98 LOG.error("Task info is wrong", error_no="00441", 99 TaskInfo=task_info) 100 return False 101 102 os.makedirs(report_path, exist_ok=True) 103 self.report_path = report_path 104 self.task_info = task_info 105 self.summary_data_path = os.path.join( 106 self.report_path, ReportConstant.summary_data_report) 107 self.exec_info = task_info 108 self.data_helper = DataHelper() 109 self.vision_helper = VisionHelper() 110 return True 111 112 def _generate_data_report(self): 113 # initial element 114 test_suites_element = self.data_helper.initial_suites_element() 115 116 # update test suites element 117 update_flag = self._update_test_suites(test_suites_element) 118 if not update_flag: 119 return 120 121 # generate report 122 if not self._check_mode(ModeType.decc): 123 self.data_helper.generate_report(test_suites_element, 124 self.summary_data_path) 125 126 # set SuiteReporter.suite_report_result 127 if not check_pub_key_exist() and not self._check_mode( 128 ModeType.decc): 129 return 130 self.set_summary_report_result( 131 self.summary_data_path, DataHelper.to_string(test_suites_element)) 132 133 def _update_test_suites(self, test_suites_element): 134 # initial attributes for test suites element 135 test_suites_attributes, need_update_attributes = \ 136 self._init_attributes() 137 138 # get test suite elements that are children of test suites element 139 modules = dict() 140 test_suite_elements = [] 141 for data_report, module_name in self.data_reports: 142 if data_report.endswith(ReportConstant.summary_data_report): 143 continue 144 root = self.data_helper.parse_data_report(data_report) 145 if module_name == ReportConstant.empty_name: 146 module_name = self._get_module_name(data_report, root) 147 total = int(root.get(ReportConstant.tests, 0)) 148 modules[module_name] = modules.get(module_name, 0) + total 149 150 self._append_product_info(test_suites_attributes, root) 151 for child in root: 152 child.tail = self.data_helper.LINE_BREAK_INDENT 153 if not child.get(ReportConstant.module_name) or child.get( 154 ReportConstant.module_name) == \ 155 ReportConstant.empty_name: 156 child.set(ReportConstant.module_name, module_name) 157 self._check_tests_and_unavailable(child) 158 # covert the status of "notrun" to "ignored" 159 for element in child: 160 if element.get(ReportConstant.status, "") == \ 161 ReportConstant.not_run: 162 ignored = int(child.get(ReportConstant.ignored, 0)) + 1 163 child.set(ReportConstant.ignored, "%s" % ignored) 164 test_suite_elements.append(child) 165 for update_attribute in need_update_attributes: 166 update_value = child.get(update_attribute, 0) 167 if not update_value: 168 update_value = 0 169 test_suites_attributes[update_attribute] += int( 170 update_value) 171 172 if test_suite_elements: 173 child = test_suite_elements[-1] 174 child.tail = self.data_helper.LINE_BREAK 175 else: 176 LOG.error("Execute result not exists") 177 return False 178 179 # set test suites element attributes and children 180 modules_zero = [module_name for module_name, total in modules.items() 181 if total == 0] 182 if modules_zero: 183 LOG.info("The total tests of %s module is 0", ",".join( 184 modules_zero)) 185 test_suites_attributes[ReportConstant.run_modules] = \ 186 len(modules) - len(modules_zero) 187 test_suites_attributes[ReportConstant.modules] = len(modules) 188 self.data_helper.set_element_attributes(test_suites_element, 189 test_suites_attributes) 190 test_suites_element.extend(test_suite_elements) 191 return True 192 193 @classmethod 194 def _check_tests_and_unavailable(cls, child): 195 total = child.get(ReportConstant.tests, "0") 196 unavailable = child.get(ReportConstant.unavailable, "0") 197 if total and total != "0" and unavailable and \ 198 unavailable != "0": 199 child.set(ReportConstant.unavailable, "0") 200 LOG.warning("%s total: %s, unavailable: %s", child.get( 201 ReportConstant.name), total, unavailable) 202 203 @classmethod 204 def _append_product_info(cls, test_suites_attributes, root): 205 product_info = root.get(ReportConstant.product_info, "") 206 if not product_info: 207 return 208 try: 209 product_info = literal_eval(str(product_info)) 210 except SyntaxError as error: 211 LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args) 212 product_info = {} 213 214 if not test_suites_attributes[ReportConstant.product_info]: 215 test_suites_attributes[ReportConstant.product_info] = \ 216 product_info 217 return 218 for key, value in product_info.items(): 219 exist_value = test_suites_attributes[ 220 ReportConstant.product_info].get(key, "") 221 222 if not exist_value: 223 test_suites_attributes[ 224 ReportConstant.product_info][key] = value 225 continue 226 if value in exist_value: 227 continue 228 test_suites_attributes[ReportConstant.product_info][key] = \ 229 "%s,%s" % (exist_value, value) 230 231 @classmethod 232 def _get_module_name(cls, data_report, root): 233 # get module name from data report 234 module_name = get_filename_extension(data_report)[0] 235 if "report" in module_name or "summary" in module_name or \ 236 "<" in data_report or ">" in data_report: 237 module_name = root.get(ReportConstant.name, 238 ReportConstant.empty_name) 239 if "report" in module_name or "summary" in module_name: 240 module_name = ReportConstant.empty_name 241 return module_name 242 243 def _init_attributes(self): 244 test_suites_attributes = { 245 ReportConstant.name: 246 ReportConstant.summary_data_report.split(".")[0], 247 ReportConstant.start_time: self.task_info.test_time, 248 ReportConstant.end_time: time.strftime(ReportConstant.time_format, 249 time.localtime()), 250 ReportConstant.errors: 0, ReportConstant.disabled: 0, 251 ReportConstant.failures: 0, ReportConstant.tests: 0, 252 ReportConstant.ignored: 0, ReportConstant.unavailable: 0, 253 ReportConstant.product_info: self.task_info.product_info, 254 ReportConstant.modules: 0, ReportConstant.run_modules: 0} 255 need_update_attributes = [ReportConstant.tests, ReportConstant.ignored, 256 ReportConstant.failures, 257 ReportConstant.disabled, 258 ReportConstant.errors, 259 ReportConstant.unavailable] 260 return test_suites_attributes, need_update_attributes 261 262 def _generate_vision_reports(self): 263 if not self._check_mode(ModeType.decc) and not \ 264 self.summary_data_report_exist: 265 LOG.error("Summary data report not exists") 266 return 267 268 if check_pub_key_exist() or self._check_mode(ModeType.decc): 269 if not self.summary_report_result_exists(): 270 LOG.error("Summary data report not exists") 271 return 272 self.summary_data_str = \ 273 self.get_result_of_summary_report() 274 if check_pub_key_exist(): 275 from xdevice import SuiteReporter 276 SuiteReporter.clear_report_result() 277 278 # parse data 279 if self.summary_data_str: 280 # only in decc mode and pub key, self.summary_data_str is not empty 281 summary_element_tree = self.data_helper.parse_data_report( 282 self.summary_data_str) 283 else: 284 summary_element_tree = self.data_helper.parse_data_report( 285 self.summary_data_path) 286 parsed_data = self.vision_helper.parse_element_data( 287 summary_element_tree, self.report_path, self.task_info) 288 self.parsed_data = parsed_data 289 self.exec_info, summary, _ = parsed_data 290 291 if self._check_mode(ModeType.decc): 292 return 293 294 LOG.info("Summary result: modules: %s, run modules: %s, total: " 295 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, " 296 "unavailable: %s", summary.modules, summary.run_modules, 297 summary.result.total, summary.result.passed, 298 summary.result.failed, summary.result.blocked, 299 summary.result.ignored, summary.result.unavailable) 300 LOG.info("Log path: %s", self.exec_info.log_path) 301 302 # generate summary vision report 303 report_generate_flag = self._generate_vision_report( 304 parsed_data, ReportConstant.summary_title, 305 ReportConstant.summary_vision_report) 306 307 # generate details vision report 308 if report_generate_flag and summary.result.total > 0: 309 self._generate_vision_report( 310 parsed_data, ReportConstant.details_title, 311 ReportConstant.details_vision_report) 312 313 # generate failures vision report 314 if summary.result.total != ( 315 summary.result.passed + summary.result.ignored) or \ 316 summary.result.unavailable > 0: 317 self._generate_vision_report( 318 parsed_data, ReportConstant.failures_title, 319 ReportConstant.failures_vision_report) 320 321 def _generate_vision_report(self, parsed_data, title, render_target): 322 323 # render data 324 report_context = self.vision_helper.render_data( 325 title, parsed_data, render_target=render_target) 326 327 # generate report 328 if report_context: 329 report_path = os.path.join(self.report_path, render_target) 330 self.vision_helper.generate_report(report_path, report_context) 331 return True 332 else: 333 LOG.error("Failed to generate %s", render_target) 334 return False 335 336 @property 337 def summary_data_report_exist(self): 338 return "<" in self.summary_data_str or \ 339 os.path.exists(self.summary_data_path) 340 341 @property 342 def data_reports(self): 343 if check_pub_key_exist() or self._check_mode(ModeType.decc): 344 from xdevice import SuiteReporter 345 suite_reports = SuiteReporter.get_report_result() 346 if self._check_mode(ModeType.decc): 347 LOG.debug("Handle history result, data reports length:{}". 348 format(len(suite_reports))) 349 SuiteReporter.clear_history_result() 350 SuiteReporter.append_history_result(suite_reports) 351 data_reports = [] 352 for report_path, report_result in suite_reports: 353 module_name = get_filename_extension(report_path)[0] 354 data_reports.append((report_result, module_name)) 355 SuiteReporter.clear_report_result() 356 return data_reports 357 358 if not os.path.isdir(self.report_path): 359 return [] 360 data_reports = [] 361 result_path = os.path.join(self.report_path, "result") 362 for root, _, files in os.walk(self.report_path): 363 for file_name in files: 364 if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX): 365 continue 366 module_name = self._find_module_name(result_path, root) 367 data_reports.append((os.path.join(root, file_name), 368 module_name)) 369 return data_reports 370 371 @classmethod 372 def _find_module_name(cls, result_path, root): 373 # find module name from directory tree 374 common_path = os.path.commonpath([result_path, root]) 375 if os.path.normcase(result_path) != os.path.normcase(common_path) or \ 376 os.path.normcase(result_path) == os.path.normcase(root): 377 return ReportConstant.empty_name 378 379 root_dir, module_name = os.path.split(root) 380 if os.path.normcase(result_path) == os.path.normcase(root_dir): 381 return ReportConstant.empty_name 382 root_dir, subsystem_name = os.path.split(root_dir) 383 while os.path.normcase(result_path) != os.path.normcase(root_dir): 384 module_name = subsystem_name 385 root_dir, subsystem_name = os.path.split(root_dir) 386 return module_name 387 388 def _generate_summary(self): 389 if not self.summary_data_report_exist or \ 390 self._check_mode(ModeType.decc): 391 return 392 summary_ini_content = \ 393 "[default]\n" \ 394 "Platform=%s\n" \ 395 "Test Type=%s\n" \ 396 "Device Name=%s\n" \ 397 "Host Info=%s\n" \ 398 "Test Start/ End Time=%s\n" \ 399 "Execution Time=%s\n" % ( 400 self.exec_info.platform, self.exec_info.test_type, 401 self.exec_info.device_name, self.exec_info.host_info, 402 self.exec_info.test_time, self.exec_info.execute_time) 403 if self.exec_info.product_info: 404 for key, value in self.exec_info.product_info.items(): 405 summary_ini_content = "{}{}".format( 406 summary_ini_content, "%s=%s\n" % (key, value)) 407 408 if not self._check_mode(ModeType.factory): 409 summary_ini_content = "{}{}".format( 410 summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path) 411 412 # write summary_ini_content 413 summary_filepath = os.path.join(self.report_path, 414 ReportConstant.summary_ini) 415 416 if platform.system() == "Windows": 417 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY 418 else: 419 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND 420 summary_filepath_open = os.open(summary_filepath, flags, 421 FilePermission.mode_755) 422 423 with os.fdopen(summary_filepath_open, "wb") as file_handler: 424 if check_pub_key_exist(): 425 try: 426 cipher_text = do_rsa_encrypt(summary_ini_content) 427 except ParamError as error: 428 LOG.error(error, error_no=error.error_no) 429 cipher_text = b"" 430 file_handler.write(cipher_text) 431 else: 432 file_handler.write(bytes(summary_ini_content, 'utf-8')) 433 file_handler.flush() 434 LOG.info("Generate summary ini: %s", summary_filepath) 435 436 def _copy_report(self): 437 from xdevice import Scheduler 438 if Scheduler.upload_address or self._check_mode(ModeType.decc): 439 return 440 441 from xdevice import Variables 442 dst_path = os.path.join(Variables.temp_dir, "latest") 443 try: 444 shutil.rmtree(dst_path, ignore_errors=True) 445 os.makedirs(dst_path, exist_ok=True) 446 LOG.info("Copy summary files to %s", dst_path) 447 448 # copy reports to reports/latest folder 449 for report_file in os.listdir(self.report_path): 450 src_file = os.path.join(self.report_path, report_file) 451 dst_file = os.path.join(dst_path, report_file) 452 if os.path.isfile(src_file): 453 shutil.copyfile(src_file, dst_file) 454 except OSError as _: 455 return 456 457 def _compress_report_folder(self): 458 if self._check_mode(ModeType.decc) or \ 459 self._check_mode(ModeType.factory): 460 return 461 462 if not os.path.isdir(self.report_path): 463 LOG.error("'%s' is not folder!" % self.report_path) 464 return 465 466 # get file path list 467 file_path_list = [] 468 for dir_path, _, file_names in os.walk(self.report_path): 469 f_path = dir_path.replace(self.report_path, '') 470 f_path = f_path and f_path + os.sep or '' 471 for filename in file_names: 472 file_path_list.append( 473 (os.path.join(dir_path, filename), f_path + filename)) 474 475 # compress file 476 zipped_file = "%s.zip" % os.path.join( 477 self.report_path, os.path.basename(self.report_path)) 478 zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED, 479 allowZip64=True) 480 try: 481 LOG.info("Executing compress process, please wait...") 482 long_size_file = [] 483 for src_path, target_path in file_path_list: 484 long_size_file.append((src_path, target_path)) 485 self._write_long_size_file(zip_object, long_size_file) 486 487 LOG.info("Generate zip file: %s", zipped_file) 488 except zipfile.BadZipFile as bad_error: 489 LOG.error("Zip report folder error: %s" % bad_error.args) 490 finally: 491 zip_object.close() 492 493 # generate hex digest, then save it to summary_report.hash 494 hash_file = os.path.abspath(os.path.join( 495 self.report_path, ReportConstant.summary_report_hash)) 496 hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT | 497 os.O_APPEND, FilePermission.mode_755) 498 with os.fdopen(hash_file_open, "w") as hash_file_handler: 499 hash_file_handler.write(get_file_summary(zipped_file)) 500 LOG.info("Generate hash file: %s", hash_file) 501 hash_file_handler.flush() 502 return zipped_file 503 504 @classmethod 505 def _check_mode(cls, mode): 506 from xdevice import Scheduler 507 return Scheduler.mode == mode 508 509 def _generate_task_info_record(self): 510 # under encryption status, don't handle anything directly 511 if check_pub_key_exist() and not self._check_mode(ModeType.decc): 512 return 513 514 # get info from command_queue 515 from xdevice import Scheduler 516 if not Scheduler.command_queue: 517 return 518 _, command, report_path = Scheduler.command_queue[-1] 519 520 # handle parsed data 521 record = self._parse_record_from_data(command, report_path) 522 523 def encode(content): 524 # inner function to encode 525 return ' '.join([bin(ord(c)).replace('0b', '') for c in content]) 526 527 # write into file 528 import json 529 record_file = os.path.join(self.report_path, 530 ReportConstant.task_info_record) 531 _record_json = json.dumps(record, indent=2) 532 533 with open(file=record_file, mode="wb") as file: 534 if Scheduler.mode == ModeType.decc: 535 # under decc, write in encoded text 536 file.write(bytes(encode(_record_json), encoding="utf-8")) 537 else: 538 # others, write in plain text 539 file.write(bytes(_record_json, encoding="utf-8")) 540 541 LOG.info("Generate record file: %s", record_file) 542 543 def _parse_record_from_data(self, command, report_path): 544 record = dict() 545 if self.parsed_data: 546 _, _, suites = self.parsed_data 547 unsuccessful = dict() 548 module_set = set() 549 for suite in suites: 550 module_set.add(suite.module_name) 551 552 failed = unsuccessful.get(suite.module_name, []) 553 # because suite not contains case's some attribute, 554 # for example, 'module', 'classname', 'name' . so 555 # if unavailable, only add module's name into list. 556 if int(suite.result.unavailable) > 0: 557 failed.append(suite.module_name) 558 else: 559 # others, get key attributes join string 560 for case in suite.get_cases(): 561 if not case.is_passed(): 562 failed.append( 563 "{}#{}".format(case.classname, case.name)) 564 unsuccessful.update({suite.module_name: failed}) 565 data_reports = self._get_data_reports(module_set) 566 record = {"command": command, 567 "session_id": os.path.split(report_path)[-1], 568 "report_path": report_path, 569 "unsuccessful_params": unsuccessful, 570 "data_reports": data_reports 571 } 572 return record 573 574 def _get_data_reports(self, module_set): 575 data_reports = dict() 576 if self._check_mode(ModeType.decc): 577 from xdevice import SuiteReporter 578 for module_name, report_path, report_result in \ 579 SuiteReporter.get_history_result_list(): 580 if module_name in module_set: 581 data_reports.update({module_name: report_path}) 582 else: 583 for report_path, module_name in self.data_reports: 584 if module_name == ReportConstant.empty_name: 585 root = self.data_helper.parse_data_report(report_path) 586 module_name = self._get_module_name(report_path, root) 587 if module_name in module_set: 588 data_reports.update({module_name: report_path}) 589 590 return data_reports 591 592 @classmethod 593 def get_task_info_params(cls, history_path): 594 # under encryption status, don't handle anything directly 595 if check_pub_key_exist() and not cls._check_mode(ModeType.decc): 596 return () 597 598 def decode(content): 599 return ''.join([chr(i) for i in [int(b, 2) for b in 600 content.split(' ')]]) 601 602 record_path = os.path.join(history_path, 603 ReportConstant.task_info_record) 604 if not os.path.exists(record_path): 605 LOG.error("%s not exists!", ReportConstant.task_info_record) 606 return () 607 608 import json 609 from xdevice import Scheduler 610 with open(record_path, mode="rb") as file: 611 if Scheduler.mode == ModeType.decc: 612 # under decc, read from encoded text 613 result = json.loads(decode(file.read().decode("utf-8"))) 614 else: 615 # others, read from plain text 616 result = json.loads(file.read()) 617 standard_length = 5 618 if not len(result.keys()) == standard_length: 619 LOG.error("%s error!", ReportConstant.task_info_record) 620 return () 621 622 return result 623 624 @classmethod 625 def set_summary_report_result(cls, summary_data_path, result_xml): 626 cls.summary_report_result.clear() 627 cls.summary_report_result.append((summary_data_path, result_xml)) 628 629 @classmethod 630 def get_result_of_summary_report(cls): 631 if cls.summary_report_result: 632 return cls.summary_report_result[0][1] 633 634 @classmethod 635 def summary_report_result_exists(cls): 636 return True if cls.summary_report_result else False 637 638 @classmethod 639 def get_path_of_summary_report(cls): 640 if cls.summary_report_result: 641 return cls.summary_report_result[0][0] 642 643 @classmethod 644 def _write_long_size_file(cls, zip_object, long_size_file): 645 for filename, arcname in long_size_file: 646 zip_info = zipfile.ZipInfo.from_file(filename, arcname) 647 zip_info.compress_type = getattr(zip_object, "compression", 648 zipfile.ZIP_DEFLATED) 649 if hasattr(zip_info, "_compresslevel"): 650 _compress_level = getattr(zip_object, "compresslevel", None) 651 setattr(zip_info, "_compresslevel", _compress_level) 652 with open(filename, "rb") as src, \ 653 zip_object.open(zip_info, "w") as des: 654 shutil.copyfileobj(src, des, 1024 * 1024 * 8) 655 656 def _transact_all(self): 657 from xdevice import Variables 658 tools_dir = os.path.join(Variables.res_dir, "tools", "binder.pyc") 659 if not os.path.exists(tools_dir): 660 return 661 module_spec = util.spec_from_file_location( 662 "binder", tools_dir) 663 if not module_spec: 664 return 665 module = util.module_from_spec(module_spec) 666 module_spec.loader.exec_module(module) 667 if hasattr(module, "transact") and callable(module.transact): 668 module.transact(self, LOG) 669 del module 670