#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import platform
import shutil
import time
import zipfile
from ast import literal_eval

from _core.interface import IReporter
from _core.plugin import Plugin
from _core.constants import ModeType
from _core.constants import TestType
from _core.logger import platform_logger
from _core.exception import ParamError
from _core.utils import get_filename_extension
from _core.report.encrypt import check_pub_key_exist
from _core.report.encrypt import do_rsa_encrypt
from _core.report.encrypt import get_file_summary
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import ExecInfo
from _core.report.reporter_helper import VisionHelper
from _core.report.reporter_helper import ReportConstant

LOG = platform_logger("ResultReporter")


@Plugin(type=Plugin.REPORTER, id=TestType.all)
class ResultReporter(IReporter):
    """Reporter plugin that merges per-suite data reports into a summary.

    From one report directory it produces the summary data report, the
    vision (rendered) reports, a task info record, a summary ini file, a
    copy of the top-level files in the "latest" folder, and a zip archive
    with its digest file. Several steps are skipped or altered when the
    scheduler runs in ``ModeType.decc`` or when a public key is present
    (see ``check_pub_key_exist``).
    """

    # Shared across instances: holds at most one
    # (summary_data_path, result_xml) tuple, see set_summary_report_result.
    summary_report_result = []

    def __init__(self):
        self.report_path = None
        self.task_info = None
        self.summary_data_path = None
        self.summary_data_str = ""
        self.exec_info = None
        self.parsed_data = None
        self.data_helper = None
        self.vision_helper = None

    def __generate_reports__(self, report_path, **kwargs):
        """Run every report generation step for ``report_path``.

        ``kwargs`` must carry ``task_info`` (an ``ExecInfo``); when the
        parameters are invalid only the start/end banners are logged.
        """
        LOG.info("")
        LOG.info("**************************************************")
        LOG.info("************** Start generate reports ************")
        LOG.info("**************************************************")
        LOG.info("")

        if self._check_params(report_path, **kwargs):
            # generate data report
            self._generate_data_report()

            # generate vision reports
            self._generate_vision_reports()

            # generate task info record
            self._generate_task_info_record()

            # generate summary ini
            self._generate_summary()

            # copy reports to reports/latest folder
            self._copy_report()

            # compress report folder
            self._compress_report_folder()

        LOG.info("")
        LOG.info("**************************************************")
        LOG.info("************** Ended generate reports ************")
        LOG.info("**************************************************")
        LOG.info("")

    def _check_params(self, report_path, **kwargs):
        """Validate the inputs and initialise the instance state.

        Returns True when ``report_path`` is non-empty and
        ``kwargs["task_info"]`` is an ``ExecInfo``; otherwise logs an error
        and returns False. On success the report directory is created.
        """
        task_info = kwargs.get("task_info", "")
        if not report_path:
            LOG.error("report path is wrong", error_no="00440",
                      ReportPath=report_path)
            return False
        if not task_info or not isinstance(task_info, ExecInfo):
            LOG.error("task info is wrong", error_no="00441",
                      TaskInfo=task_info)
            return False

        os.makedirs(report_path, exist_ok=True)
        self.report_path = report_path
        self.task_info = task_info
        self.summary_data_path = os.path.join(
            self.report_path, ReportConstant.summary_data_report)
        self.exec_info = task_info
        self.data_helper = DataHelper()
        self.vision_helper = VisionHelper()
        return True

    def _generate_data_report(self):
        """Build the summary data report from all suite data reports.

        Outside decc mode the summary is written to
        ``self.summary_data_path``. With a public key, or in decc mode, the
        serialized summary is also cached on the class; in decc mode it is
        additionally uploaded through ``agent.decc.Handler``.
        """
        # initial element
        test_suites_element = self.data_helper.initial_suites_element()

        # update test suites element
        if not self._update_test_suites(test_suites_element):
            return

        is_decc = self._check_mode(ModeType.decc)

        # generate report
        if not is_decc:
            self.data_helper.generate_report(test_suites_element,
                                             self.summary_data_path)

        # the in-memory result is only needed with a pub key or in decc mode
        if not check_pub_key_exist() and not is_decc:
            return
        self.set_summary_report_result(
            self.summary_data_path, DataHelper.to_string(test_suites_element))

        if is_decc:
            try:
                from agent.decc import Handler
                LOG.info("upload task summary result to decc")
                Handler.upload_task_summary_results(
                    self.get_result_of_summary_report())
            except ModuleNotFoundError as error:
                LOG.error("module not found %s", error.args)

    def _update_test_suites(self, test_suites_element):
        """Fill ``test_suites_element`` with every suite and summed counters.

        Returns False (after logging) when no suite element was found,
        True otherwise.
        """
        # initial attributes for test suites element
        test_suites_attributes, need_update_attributes = \
            self._init_attributes()

        # get test suite elements that are children of test suites element
        modules = dict()
        test_suite_elements = []
        for data_report, module_name in self.data_reports:
            if data_report.endswith(ReportConstant.summary_data_report):
                continue
            root = self.data_helper.parse_data_report(data_report)
            if module_name == ReportConstant.empty_name:
                module_name = self._get_module_name(data_report, root)
            total = int(root.get(ReportConstant.tests, 0))
            modules[module_name] = modules.get(module_name, 0) + total

            self._append_product_info(test_suites_attributes, root)
            for child in root:
                child.tail = self.data_helper.LINE_BREAK_INDENT
                if not child.get(ReportConstant.module_name) or child.get(
                        ReportConstant.module_name) == \
                        ReportConstant.empty_name:
                    child.set(ReportConstant.module_name, module_name)
                self._check_tests_and_unavailable(child)
                # convert "notrun" to "ignored" for the test case status
                for element in child:
                    if element.get(ReportConstant.status, "") == \
                            ReportConstant.not_run:
                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
                        child.set(ReportConstant.ignored, "%s" % ignored)
                test_suite_elements.append(child)
                for update_attribute in need_update_attributes:
                    # a missing or empty attribute counts as 0
                    update_value = child.get(update_attribute, 0) or 0
                    test_suites_attributes[update_attribute] += int(
                        update_value)

        if not test_suite_elements:
            LOG.error("execute result not exists")
            return False
        test_suite_elements[-1].tail = self.data_helper.LINE_BREAK

        # set test suites element attributes and children
        modules_zero = [module_name for module_name, total in modules.items()
                        if total == 0]
        if modules_zero:
            LOG.info("the total tests of %s module is 0", ",".join(
                modules_zero))
        test_suites_attributes[ReportConstant.run_modules] = \
            len(modules) - len(modules_zero)
        test_suites_attributes[ReportConstant.modules] = len(modules)
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)
        test_suites_element.extend(test_suite_elements)
        return True

    @classmethod
    def _check_tests_and_unavailable(cls, child):
        """Reset ``unavailable`` to "0" when the suite also reports tests."""
        total = child.get(ReportConstant.tests, "0")
        unavailable = child.get(ReportConstant.unavailable, "0")
        if total and total != "0" and unavailable and \
                unavailable != "0":
            child.set(ReportConstant.unavailable, "0")
            LOG.warning("%s total: %s, unavailable: %s", child.get(
                ReportConstant.name), total, unavailable)

    @classmethod
    def _append_product_info(cls, test_suites_attributes, root):
        """Merge the product info of ``root`` into the summary attributes.

        The attribute is parsed with ``literal_eval``. Values of keys that
        already exist are joined with "," unless already contained.
        """
        product_info = root.get(ReportConstant.product_info, "")
        if not product_info:
            return
        try:
            product_info = literal_eval(str(product_info))
        except (SyntaxError, ValueError) as error:
            # literal_eval raises ValueError as well as SyntaxError on
            # malformed input
            LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args)
            product_info = {}
        if not isinstance(product_info, dict):
            # anything but a dict cannot be merged key by key
            LOG.error("%s %s", root.get(ReportConstant.name, ""),
                      "product info is not a dict")
            product_info = {}

        if not test_suites_attributes[ReportConstant.product_info]:
            test_suites_attributes[ReportConstant.product_info] = \
                product_info
            return
        merged = test_suites_attributes[ReportConstant.product_info]
        for key, value in product_info.items():
            exist_value = merged.get(key, "")
            if not exist_value:
                merged[key] = value
                continue
            if value in exist_value:
                continue
            merged[key] = "%s,%s" % (exist_value, value)

    @classmethod
    def _get_module_name(cls, data_report, root):
        """Derive a module name from the report file name or root element.

        Falls back to the root's name attribute, then to
        ``ReportConstant.empty_name``, when the candidate contains
        "report"/"summary" or ``data_report`` contains "<" or ">".
        """
        # get module name from data report
        module_name = get_filename_extension(data_report)[0]
        if "report" in module_name or "summary" in module_name or \
                "<" in data_report or ">" in data_report:
            module_name = root.get(ReportConstant.name,
                                   ReportConstant.empty_name)
            if "report" in module_name or "summary" in module_name:
                module_name = ReportConstant.empty_name
        return module_name

    def _init_attributes(self):
        """Return the initial summary attributes and the counters to sum."""
        test_suites_attributes = {
            ReportConstant.name:
                ReportConstant.summary_data_report.split(".")[0],
            ReportConstant.start_time: self.task_info.test_time,
            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
                                                   time.localtime()),
            ReportConstant.errors: 0, ReportConstant.disabled: 0,
            ReportConstant.failures: 0, ReportConstant.tests: 0,
            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
            ReportConstant.product_info: self.task_info.product_info,
            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
                                  ReportConstant.failures,
                                  ReportConstant.disabled,
                                  ReportConstant.errors,
                                  ReportConstant.unavailable]
        return test_suites_attributes, need_update_attributes

    def _generate_vision_reports(self):
        """Parse the summary data and render the vision reports.

        In decc mode the data is parsed and stored but nothing is rendered.
        """
        if not self._check_mode(ModeType.decc) and not \
                self.summary_data_report_exist:
            LOG.error("summary data report not exists")
            return

        if check_pub_key_exist() or self._check_mode(ModeType.decc):
            if not self.summary_report_result_exists():
                LOG.error("summary data report not exists")
                return
            self.summary_data_str = \
                self.get_result_of_summary_report()
            if check_pub_key_exist():
                from xdevice import SuiteReporter
                SuiteReporter.clear_report_result()

        # parse data
        if self.summary_data_str:
            # only in decc mode and pub key, self.summary_data_str is not empty
            summary_element_tree = self.data_helper.parse_data_report(
                self.summary_data_str)
        else:
            summary_element_tree = self.data_helper.parse_data_report(
                self.summary_data_path)
        parsed_data = self.vision_helper.parse_element_data(
            summary_element_tree, self.report_path, self.task_info)
        self.parsed_data = parsed_data
        self.exec_info, summary, _ = parsed_data

        if self._check_mode(ModeType.decc):
            return

        LOG.info("Summary result: modules: %s, run modules: %s, total: "
                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
                 "unavailable: %s", summary.modules, summary.run_modules,
                 summary.result.total, summary.result.passed,
                 summary.result.failed, summary.result.blocked,
                 summary.result.ignored, summary.result.unavailable)
        LOG.info("Log path: %s", self.exec_info.log_path)

        # generate summary vision report
        report_generate_flag = self._generate_vision_report(
            parsed_data, ReportConstant.summary_title,
            ReportConstant.summary_vision_report)

        # generate details vision report
        if report_generate_flag and summary.result.total > 0:
            self._generate_vision_report(
                parsed_data, ReportConstant.details_title,
                ReportConstant.details_vision_report)

        # generate failures vision report
        if summary.result.total != (
                summary.result.passed + summary.result.ignored) or \
                summary.result.unavailable > 0:
            self._generate_vision_report(
                parsed_data, ReportConstant.failures_title,
                ReportConstant.failures_vision_report)

    def _generate_vision_report(self, parsed_data, title, render_target):
        """Render one vision report; return True when it was written."""
        # render data
        report_context = self.vision_helper.render_data(
            title, parsed_data, render_target=render_target)

        # generate report
        if not report_context:
            LOG.error("Failed to generate %s", render_target)
            return False
        report_path = os.path.join(self.report_path, render_target)
        self.vision_helper.generate_report(report_path, report_context)
        return True

    @property
    def summary_data_report_exist(self):
        """True when summary data exists in memory or on disk."""
        return "<" in self.summary_data_str or \
            os.path.exists(self.summary_data_path)

    @property
    def data_reports(self):
        """Return a list of ``(data_report, module_name)`` tuples.

        With a public key or in decc mode the reports come from
        ``SuiteReporter`` (the first item is the report result) and the
        pending results are cleared afterwards; otherwise the first item is
        the path of a data report file found under ``self.report_path``.
        """
        if check_pub_key_exist() or self._check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            suite_reports = SuiteReporter.get_report_result()
            if self._check_mode(ModeType.decc):
                LOG.debug("handle history result, data_reports length:{}".
                          format(len(suite_reports)))
                SuiteReporter.clear_history_result()
                SuiteReporter.append_history_result(suite_reports)
            data_reports = [
                (report_result, get_filename_extension(report_path)[0])
                for report_path, report_result in suite_reports]
            SuiteReporter.clear_report_result()
            return data_reports

        if not os.path.isdir(self.report_path):
            return []
        data_reports = []
        result_path = os.path.join(self.report_path, "result")
        for root, _, files in os.walk(self.report_path):
            for file_name in files:
                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
                    continue
                module_name = self._find_module_name(result_path, root)
                data_reports.append((os.path.join(root, file_name),
                                     module_name))
        return data_reports

    @classmethod
    def _find_module_name(cls, result_path, root):
        """Find the module name of ``root`` from the directory tree.

        Returns ``ReportConstant.empty_name`` when ``root`` is not at least
        two levels below ``result_path``; otherwise the name of the
        directory on the second level below ``result_path``.
        """
        # find module name from directory tree
        common_path = os.path.commonpath([result_path, root])
        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
                os.path.normcase(result_path) == os.path.normcase(root):
            return ReportConstant.empty_name

        root_dir, module_name = os.path.split(root)
        if os.path.normcase(result_path) == os.path.normcase(root_dir):
            return ReportConstant.empty_name
        root_dir, subsystem_name = os.path.split(root_dir)
        while os.path.normcase(result_path) != os.path.normcase(root_dir):
            module_name = subsystem_name
            root_dir, subsystem_name = os.path.split(root_dir)
        return module_name

    def _generate_summary(self):
        """Write the summary ini file (RSA-encrypted when a pub key exists).

        Nothing is written in decc mode or when no summary data exists.
        """
        if not self.summary_data_report_exist or \
                self._check_mode(ModeType.decc):
            return
        summary_ini_content = \
            "[default]\n" \
            "Platform=%s\n" \
            "Test Type=%s\n" \
            "Device Name=%s\n" \
            "Host Info=%s\n" \
            "Test Start/ End Time=%s\n" \
            "Execution Time=%s\n" % (
                self.exec_info.platform, self.exec_info.test_type,
                self.exec_info.device_name, self.exec_info.host_info,
                self.exec_info.test_time, self.exec_info.execute_time)
        if self.exec_info.product_info:
            for key, value in self.exec_info.product_info.items():
                summary_ini_content = "{}{}".format(
                    summary_ini_content, "%s=%s\n" % (key, value))

        if not self._check_mode(ModeType.factory):
            summary_ini_content = "{}{}".format(
                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)

        # write summary_ini_content
        summary_filepath = os.path.join(self.report_path,
                                        ReportConstant.summary_ini)

        # NOTE(review): O_APPEND means a second run on the same report path
        # appends another [default] section - confirm this is intended.
        if platform.system() == "Windows":
            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
        else:
            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        summary_filepath_open = os.open(summary_filepath, flags, 0o755)

        with os.fdopen(summary_filepath_open, "wb") as file_handler:
            if check_pub_key_exist():
                try:
                    cipher_text = do_rsa_encrypt(summary_ini_content)
                except ParamError as error:
                    LOG.error(error, error_no=error.error_no)
                    cipher_text = b""
                file_handler.write(cipher_text)
            else:
                file_handler.write(bytes(summary_ini_content, 'utf-8'))
            file_handler.flush()
            LOG.info("generate summary ini: %s", summary_filepath)

    def _copy_report(self):
        """Copy the top-level report files into the "latest" folder.

        Skipped when an upload address is configured or in decc mode.
        Copy failures are logged and otherwise ignored (best effort).
        """
        from xdevice import Scheduler
        if Scheduler.upload_address or self._check_mode(ModeType.decc):
            return

        from xdevice import Variables
        dst_path = os.path.join(Variables.exec_dir,
                                Variables.report_vars.report_dir, "latest")
        try:
            shutil.rmtree(dst_path, ignore_errors=True)
            os.makedirs(dst_path, exist_ok=True)
            LOG.info("copy summary files to %s", dst_path)

            # copy reports to reports/latest folder
            for report_file in os.listdir(self.report_path):
                src_file = os.path.join(self.report_path, report_file)
                dst_file = os.path.join(dst_path, report_file)
                if os.path.isfile(src_file):
                    shutil.copyfile(src_file, dst_file)
        except OSError as error:
            # best effort: the original reports stay intact
            LOG.warning("copy summary files to %s failed: %s",
                        dst_path, error)

    def _compress_report_folder(self):
        """Zip the report folder and write the archive digest file.

        Returns the path of the zip file, or None in decc mode or when
        ``self.report_path`` is not a directory.
        """
        if self._check_mode(ModeType.decc):
            return

        if not os.path.isdir(self.report_path):
            LOG.error("'%s' is not folder!", self.report_path)
            return

        zipped_file = "%s.zip" % os.path.join(
            self.report_path, os.path.basename(self.report_path))
        zipped_file_abs = os.path.abspath(zipped_file)

        # collect (source path, archive name) pairs before the archive is
        # created, so the walk never sees a half-written zip
        file_path_list = []
        for dir_path, _, file_names in os.walk(self.report_path):
            rel_dir = os.path.relpath(dir_path, self.report_path)
            for filename in file_names:
                src_path = os.path.join(dir_path, filename)
                if os.path.abspath(src_path) == zipped_file_abs:
                    # an archive left by a previous run must not be packed
                    # into itself
                    continue
                if rel_dir == os.curdir:
                    arcname = filename
                else:
                    arcname = os.path.join(rel_dir, filename)
                file_path_list.append((src_path, arcname))

        # compress file
        try:
            with zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED,
                                 allowZip64=True) as zip_object:
                LOG.info("executing compress process, please wait...")
                self._write_long_size_file(zip_object, file_path_list)
            LOG.info("generate zip file: %s", zipped_file)
        except zipfile.BadZipFile as bad_error:
            LOG.error("zip report folder error: %s", bad_error.args)

        # generate hex digest, then save it to summary_report.hash
        hash_file = os.path.abspath(os.path.join(
            self.report_path, ReportConstant.summary_report_hash))
        # NOTE(review): O_APPEND keeps digests of earlier runs in the file -
        # confirm this is intended.
        hash_file_open = os.open(hash_file,
                                 os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
        with os.fdopen(hash_file_open, "w") as hash_file_handler:
            hash_file_handler.write(get_file_summary(zipped_file))
            LOG.info("generate hash file: %s", hash_file)
            hash_file_handler.flush()
        return zipped_file

    @classmethod
    def _check_mode(cls, mode):
        """Return True when the scheduler currently runs in ``mode``."""
        from xdevice import Scheduler
        return Scheduler.mode == mode

    def _generate_task_info_record(self):
        """Write the task info record (command, session, failed cases).

        The record is JSON; in decc mode every character is written as its
        binary code point, separated by spaces.
        """
        # under encryption status, don't handle anything directly
        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
            return

        # get info from command_queue
        from xdevice import Scheduler
        if not Scheduler.command_queue:
            return
        _, command, report_path = Scheduler.command_queue[-1]

        # handle parsed data
        record = self._parse_record_from_data(command, report_path)

        def encode(content):
            # inner function to encode
            return ' '.join(format(ord(c), 'b') for c in content)

        # write into file
        record_file = os.path.join(self.report_path,
                                   ReportConstant.task_info_record)
        _record_json = json.dumps(record, indent=2)

        with open(file=record_file, mode="wb") as file:
            if Scheduler.mode == ModeType.decc:
                # under decc, write in encoded text
                file.write(bytes(encode(_record_json), encoding="utf-8"))
            else:
                # others, write in plain text
                file.write(bytes(_record_json, encoding="utf-8"))

        LOG.info("generate record file: %s", record_file)

    def _parse_record_from_data(self, command, report_path):
        """Build the task record dict from ``self.parsed_data``.

        Returns an empty dict when no parsed data is available.
        """
        record = dict()
        if self.parsed_data:
            _, _, suites = self.parsed_data
            unsuccessful = dict()
            module_set = set()
            for suite in suites:
                module_set.add(suite.module_name)

                failed = unsuccessful.get(suite.module_name, [])
                # because suite not contains case's some attribute,
                # for example, 'module', 'classname', 'name' . so
                # if unavailable, only add module's name into list.
                if int(suite.result.unavailable) > 0:
                    failed.append(suite.module_name)
                else:
                    # others, get key attributes join string
                    for case in suite.get_cases():
                        if not case.is_passed():
                            failed.append(
                                "{}#{}".format(case.classname, case.name))
                unsuccessful.update({suite.module_name: failed})
            data_reports = self._get_data_reports(module_set)
            record = {"command": command,
                      "session_id": os.path.split(report_path)[-1],
                      "report_path": report_path,
                      "unsuccessful_params": unsuccessful,
                      "data_reports": data_reports
                      }
        return record

    def _get_data_reports(self, module_set):
        """Map each module name in ``module_set`` to its data report path."""
        data_reports = dict()
        if self._check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            for module_name, report_path, _ in \
                    SuiteReporter.get_history_result_list():
                if module_name in module_set:
                    data_reports.update({module_name: report_path})
        else:
            for report_path, module_name in self.data_reports:
                if module_name == ReportConstant.empty_name:
                    root = self.data_helper.parse_data_report(report_path)
                    module_name = self._get_module_name(report_path, root)
                if module_name in module_set:
                    data_reports.update({module_name: report_path})

        return data_reports

    @classmethod
    def get_task_info_params(cls, history_path):
        """Read back the task info record stored under ``history_path``.

        Returns ``(session_id, command, report_path, unsuccessful_params,
        data_reports)``, or an empty tuple when the record is skipped,
        missing, unreadable or does not hold exactly the expected keys.
        """
        # under encryption status, don't handle anything directly
        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
            return ()

        def decode(content):
            return ''.join(chr(int(b, 2)) for b in content.split(' '))

        record_path = os.path.join(history_path,
                                   ReportConstant.task_info_record)
        if not os.path.exists(record_path):
            LOG.error("%s not exists!", ReportConstant.task_info_record)
            return ()

        from xdevice import Scheduler
        try:
            with open(record_path, mode="rb") as file:
                if Scheduler.mode == ModeType.decc:
                    # under decc, read from encoded text
                    result = json.loads(decode(file.read().decode("utf-8")))
                else:
                    # others, read from plain text
                    result = json.loads(file.read())
        except ValueError:
            # covers json decode errors, bad binary digits and bad utf-8
            LOG.error("%s error!", ReportConstant.task_info_record)
            return ()

        expected_keys = ("session_id", "command", "report_path",
                         "unsuccessful_params", "data_reports")
        if not isinstance(result, dict) or \
                set(result) != set(expected_keys):
            LOG.error("%s error!", ReportConstant.task_info_record)
            return ()

        return tuple(result[key] for key in expected_keys)

    @classmethod
    def set_summary_report_result(cls, summary_data_path, result_xml):
        """Replace the cached summary with ``(path, xml)``."""
        cls.summary_report_result.clear()
        cls.summary_report_result.append((summary_data_path, result_xml))

    @classmethod
    def get_result_of_summary_report(cls):
        """Return the cached summary xml, or None when nothing is cached."""
        if cls.summary_report_result:
            return cls.summary_report_result[0][1]

    @classmethod
    def summary_report_result_exists(cls):
        """Return True when a summary result is cached."""
        return bool(cls.summary_report_result)

    @classmethod
    def get_path_of_summary_report(cls):
        """Return the cached summary path, or None when nothing is cached."""
        if cls.summary_report_result:
            return cls.summary_report_result[0][0]

    @classmethod
    def _write_long_size_file(cls, zip_object, long_size_file):
        """Stream every ``(filename, arcname)`` pair into ``zip_object``.

        Files are copied in 8 MiB chunks through ``ZipFile.open`` instead
        of being read into memory at once.
        """
        for filename, arcname in long_size_file:
            zip_info = zipfile.ZipInfo.from_file(filename, arcname)
            zip_info.compress_type = getattr(zip_object, "compression",
                                             zipfile.ZIP_DEFLATED)
            if hasattr(zip_info, "_compresslevel"):
                _compress_level = getattr(zip_object, "compresslevel", None)
                setattr(zip_info, "_compresslevel", _compress_level)
            with open(filename, "rb") as src, \
                    zip_object.open(zip_info, "w") as des:
                shutil.copyfileobj(src, des, 1024 * 1024 * 8)