• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import shutil
22import time
23import zipfile
24from importlib import util
25from ast import literal_eval
26
27from _core.interface import IReporter
28from _core.plugin import Plugin
29from _core.constants import ModeType
30from _core.constants import TestType
31from _core.constants import FilePermission
32from _core.logger import platform_logger
33from _core.exception import ParamError
34from _core.utils import get_filename_extension
35from _core.report.encrypt import check_pub_key_exist
36from _core.report.encrypt import do_rsa_encrypt
37from _core.report.encrypt import get_file_summary
38from _core.report.reporter_helper import DataHelper
39from _core.report.reporter_helper import ExecInfo
40from _core.report.reporter_helper import VisionHelper
41from _core.report.reporter_helper import ReportConstant
42
43LOG = platform_logger("ResultReporter")
44
45
46@Plugin(type=Plugin.REPORTER, id=TestType.all)
47class ResultReporter(IReporter):
48    summary_report_result = []
49
50    def __init__(self):
51        self.report_path = None
52        self.task_info = None
53        self.summary_data_path = None
54        self.summary_data_str = ""
55        self.exec_info = None
56        self.parsed_data = None
57        self.data_helper = None
58        self.vision_helper = None
59
60    def __generate_reports__(self, report_path, **kwargs):
61        LOG.info("")
62        LOG.info("**************************************************")
63        LOG.info("************** Start generate reports ************")
64        LOG.info("**************************************************")
65        LOG.info("")
66
67        if self._check_params(report_path, **kwargs):
68            # generate data report
69            self._generate_data_report()
70
71            # generate vision reports
72            self._generate_vision_reports()
73
74            # generate task info record
75            self._generate_task_info_record()
76
77            # generate summary ini
78            self._generate_summary()
79
80            # copy reports to reports/latest folder
81            self._copy_report()
82
83            self._transact_all()
84
85        LOG.info("")
86        LOG.info("**************************************************")
87        LOG.info("************** Ended generate reports ************")
88        LOG.info("**************************************************")
89        LOG.info("")
90
91    def _check_params(self, report_path, **kwargs):
92        task_info = kwargs.get("task_info", "")
93        if not report_path:
94            LOG.error("Report path is wrong", error_no="00440",
95                      ReportPath=report_path)
96            return False
97        if not task_info or not isinstance(task_info, ExecInfo):
98            LOG.error("Task info is wrong", error_no="00441",
99                      TaskInfo=task_info)
100            return False
101
102        os.makedirs(report_path, exist_ok=True)
103        self.report_path = report_path
104        self.task_info = task_info
105        self.summary_data_path = os.path.join(
106            self.report_path, ReportConstant.summary_data_report)
107        self.exec_info = task_info
108        self.data_helper = DataHelper()
109        self.vision_helper = VisionHelper()
110        return True
111
112    def _generate_data_report(self):
113        # initial element
114        test_suites_element = self.data_helper.initial_suites_element()
115
116        # update test suites element
117        update_flag = self._update_test_suites(test_suites_element)
118        if not update_flag:
119            return
120
121        # generate report
122        if not self._check_mode(ModeType.decc):
123            self.data_helper.generate_report(test_suites_element,
124                                             self.summary_data_path)
125
126        # set SuiteReporter.suite_report_result
127        if not check_pub_key_exist() and not self._check_mode(
128                ModeType.decc):
129            return
130        self.set_summary_report_result(
131            self.summary_data_path, DataHelper.to_string(test_suites_element))
132
133    def _update_test_suites(self, test_suites_element):
134        # initial attributes for test suites element
135        test_suites_attributes, need_update_attributes = \
136            self._init_attributes()
137
138        # get test suite elements that are children of test suites element
139        modules = dict()
140        test_suite_elements = []
141        for data_report, module_name in self.data_reports:
142            if data_report.endswith(ReportConstant.summary_data_report):
143                continue
144            root = self.data_helper.parse_data_report(data_report)
145            if module_name == ReportConstant.empty_name:
146                module_name = self._get_module_name(data_report, root)
147            total = int(root.get(ReportConstant.tests, 0))
148            modules[module_name] = modules.get(module_name, 0) + total
149
150            self._append_product_info(test_suites_attributes, root)
151            for child in root:
152                child.tail = self.data_helper.LINE_BREAK_INDENT
153                if not child.get(ReportConstant.module_name) or child.get(
154                        ReportConstant.module_name) == \
155                        ReportConstant.empty_name:
156                    child.set(ReportConstant.module_name, module_name)
157                self._check_tests_and_unavailable(child)
158                # covert the status of "notrun" to "ignored"
159                for element in child:
160                    if element.get(ReportConstant.status, "") == \
161                            ReportConstant.not_run:
162                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
163                        child.set(ReportConstant.ignored, "%s" % ignored)
164                test_suite_elements.append(child)
165                for update_attribute in need_update_attributes:
166                    update_value = child.get(update_attribute, 0)
167                    if not update_value:
168                        update_value = 0
169                    test_suites_attributes[update_attribute] += int(
170                        update_value)
171
172        if test_suite_elements:
173            child = test_suite_elements[-1]
174            child.tail = self.data_helper.LINE_BREAK
175        else:
176            LOG.error("Execute result not exists")
177            return False
178
179        # set test suites element attributes and children
180        modules_zero = [module_name for module_name, total in modules.items()
181                        if total == 0]
182        if modules_zero:
183            LOG.info("The total tests of %s module is 0", ",".join(
184                modules_zero))
185        test_suites_attributes[ReportConstant.run_modules] = \
186            len(modules) - len(modules_zero)
187        test_suites_attributes[ReportConstant.modules] = len(modules)
188        self.data_helper.set_element_attributes(test_suites_element,
189                                                test_suites_attributes)
190        test_suites_element.extend(test_suite_elements)
191        return True
192
193    @classmethod
194    def _check_tests_and_unavailable(cls, child):
195        total = child.get(ReportConstant.tests, "0")
196        unavailable = child.get(ReportConstant.unavailable, "0")
197        if total and total != "0" and unavailable and \
198                unavailable != "0":
199            child.set(ReportConstant.unavailable, "0")
200            LOG.warning("%s total: %s, unavailable: %s", child.get(
201                ReportConstant.name), total, unavailable)
202
203    @classmethod
204    def _append_product_info(cls, test_suites_attributes, root):
205        product_info = root.get(ReportConstant.product_info, "")
206        if not product_info:
207            return
208        try:
209            product_info = literal_eval(str(product_info))
210        except SyntaxError as error:
211            LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args)
212            product_info = {}
213
214        if not test_suites_attributes[ReportConstant.product_info]:
215            test_suites_attributes[ReportConstant.product_info] = \
216                product_info
217            return
218        for key, value in product_info.items():
219            exist_value = test_suites_attributes[
220                ReportConstant.product_info].get(key, "")
221
222            if not exist_value:
223                test_suites_attributes[
224                    ReportConstant.product_info][key] = value
225                continue
226            if value in exist_value:
227                continue
228            test_suites_attributes[ReportConstant.product_info][key] = \
229                "%s,%s" % (exist_value, value)
230
231    @classmethod
232    def _get_module_name(cls, data_report, root):
233        # get module name from data report
234        module_name = get_filename_extension(data_report)[0]
235        if "report" in module_name or "summary" in module_name or \
236                "<" in data_report or ">" in data_report:
237            module_name = root.get(ReportConstant.name,
238                                   ReportConstant.empty_name)
239            if "report" in module_name or "summary" in module_name:
240                module_name = ReportConstant.empty_name
241        return module_name
242
243    def _init_attributes(self):
244        test_suites_attributes = {
245            ReportConstant.name:
246                ReportConstant.summary_data_report.split(".")[0],
247            ReportConstant.start_time: self.task_info.test_time,
248            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
249                                                   time.localtime()),
250            ReportConstant.errors: 0, ReportConstant.disabled: 0,
251            ReportConstant.failures: 0, ReportConstant.tests: 0,
252            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
253            ReportConstant.product_info: self.task_info.product_info,
254            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
255        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
256                                  ReportConstant.failures,
257                                  ReportConstant.disabled,
258                                  ReportConstant.errors,
259                                  ReportConstant.unavailable]
260        return test_suites_attributes, need_update_attributes
261
262    def _generate_vision_reports(self):
263        if not self._check_mode(ModeType.decc) and not \
264                self.summary_data_report_exist:
265            LOG.error("Summary data report not exists")
266            return
267
268        if check_pub_key_exist() or self._check_mode(ModeType.decc):
269            if not self.summary_report_result_exists():
270                LOG.error("Summary data report not exists")
271                return
272            self.summary_data_str = \
273                self.get_result_of_summary_report()
274            if check_pub_key_exist():
275                from xdevice import SuiteReporter
276                SuiteReporter.clear_report_result()
277
278        # parse data
279        if self.summary_data_str:
280            # only in decc mode and pub key, self.summary_data_str is not empty
281            summary_element_tree = self.data_helper.parse_data_report(
282                self.summary_data_str)
283        else:
284            summary_element_tree = self.data_helper.parse_data_report(
285                self.summary_data_path)
286        parsed_data = self.vision_helper.parse_element_data(
287            summary_element_tree, self.report_path, self.task_info)
288        self.parsed_data = parsed_data
289        self.exec_info, summary, _ = parsed_data
290
291        if self._check_mode(ModeType.decc):
292            return
293
294        LOG.info("Summary result: modules: %s, run modules: %s, total: "
295                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
296                 "unavailable: %s", summary.modules, summary.run_modules,
297                 summary.result.total, summary.result.passed,
298                 summary.result.failed, summary.result.blocked,
299                 summary.result.ignored, summary.result.unavailable)
300        LOG.info("Log path: %s", self.exec_info.log_path)
301
302        if summary.result.failed != 0 or summary.result.blocked != 0 or summary.result.unavailable != 0:
303            from xdevice import Scheduler
304            Scheduler.is_need_auto_retry = True
305
306        # generate summary vision report
307        report_generate_flag = self._generate_vision_report(
308            parsed_data, ReportConstant.summary_title,
309            ReportConstant.summary_vision_report)
310
311        # generate details vision report
312        if report_generate_flag and summary.result.total > 0:
313            self._generate_vision_report(
314                parsed_data, ReportConstant.details_title,
315                ReportConstant.details_vision_report)
316
317        # generate failures vision report
318        if summary.result.total != (
319                summary.result.passed + summary.result.ignored) or \
320                summary.result.unavailable > 0:
321            self._generate_vision_report(
322                parsed_data, ReportConstant.failures_title,
323                ReportConstant.failures_vision_report)
324
325    def _generate_vision_report(self, parsed_data, title, render_target):
326
327        # render data
328        report_context = self.vision_helper.render_data(
329            title, parsed_data, render_target=render_target)
330
331        # generate report
332        if report_context:
333            report_path = os.path.join(self.report_path, render_target)
334            self.vision_helper.generate_report(report_path, report_context)
335            return True
336        else:
337            LOG.error("Failed to generate %s", render_target)
338            return False
339
340    @property
341    def summary_data_report_exist(self):
342        return "<" in self.summary_data_str or \
343               os.path.exists(self.summary_data_path)
344
345    @property
346    def data_reports(self):
347        if check_pub_key_exist() or self._check_mode(ModeType.decc):
348            from xdevice import SuiteReporter
349            suite_reports = SuiteReporter.get_report_result()
350            if self._check_mode(ModeType.decc):
351                LOG.debug("Handle history result, data reports length:{}".
352                          format(len(suite_reports)))
353                SuiteReporter.clear_history_result()
354                SuiteReporter.append_history_result(suite_reports)
355            data_reports = []
356            for report_path, report_result in suite_reports:
357                module_name = get_filename_extension(report_path)[0]
358                data_reports.append((report_result, module_name))
359            SuiteReporter.clear_report_result()
360            return data_reports
361
362        if not os.path.isdir(self.report_path):
363            return []
364        data_reports = []
365        result_path = os.path.join(self.report_path, "result")
366        for root, _, files in os.walk(self.report_path):
367            for file_name in files:
368                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
369                    continue
370                module_name = self._find_module_name(result_path, root)
371                data_reports.append((os.path.join(root, file_name),
372                                     module_name))
373        return data_reports
374
375    @classmethod
376    def _find_module_name(cls, result_path, root):
377        # find module name from directory tree
378        common_path = os.path.commonpath([result_path, root])
379        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
380                os.path.normcase(result_path) == os.path.normcase(root):
381            return ReportConstant.empty_name
382
383        root_dir, module_name = os.path.split(root)
384        if os.path.normcase(result_path) == os.path.normcase(root_dir):
385            return ReportConstant.empty_name
386        root_dir, subsystem_name = os.path.split(root_dir)
387        while os.path.normcase(result_path) != os.path.normcase(root_dir):
388            module_name = subsystem_name
389            root_dir, subsystem_name = os.path.split(root_dir)
390        return module_name
391
392    def _generate_summary(self):
393        if not self.summary_data_report_exist or \
394                self._check_mode(ModeType.decc):
395            return
396        summary_ini_content = \
397            "[default]\n" \
398            "Platform=%s\n" \
399            "Test Type=%s\n" \
400            "Device Name=%s\n" \
401            "Host Info=%s\n" \
402            "Test Start/ End Time=%s\n" \
403            "Execution Time=%s\n" % (
404                self.exec_info.platform, self.exec_info.test_type,
405                self.exec_info.device_name, self.exec_info.host_info,
406                self.exec_info.test_time, self.exec_info.execute_time)
407        if self.exec_info.product_info:
408            for key, value in self.exec_info.product_info.items():
409                summary_ini_content = "{}{}".format(
410                    summary_ini_content, "%s=%s\n" % (key, value))
411
412        if not self._check_mode(ModeType.factory):
413            summary_ini_content = "{}{}".format(
414                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)
415
416        # write summary_ini_content
417        summary_filepath = os.path.join(self.report_path,
418                                        ReportConstant.summary_ini)
419
420        if platform.system() == "Windows":
421            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
422        else:
423            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
424        summary_filepath_open = os.open(summary_filepath, flags,
425                                        FilePermission.mode_755)
426
427        with os.fdopen(summary_filepath_open, "wb") as file_handler:
428            if check_pub_key_exist():
429                try:
430                    cipher_text = do_rsa_encrypt(summary_ini_content)
431                except ParamError as error:
432                    LOG.error(error, error_no=error.error_no)
433                    cipher_text = b""
434                file_handler.write(cipher_text)
435            else:
436                file_handler.write(bytes(summary_ini_content, 'utf-8'))
437            file_handler.flush()
438            LOG.info("Generate summary ini: %s", summary_filepath)
439
440    def _copy_report(self):
441        from xdevice import Scheduler
442        if Scheduler.upload_address or self._check_mode(ModeType.decc):
443            return
444
445        from xdevice import Variables
446        dst_path = os.path.join(Variables.temp_dir, "latest")
447        try:
448            shutil.rmtree(dst_path, ignore_errors=True)
449            os.makedirs(dst_path, exist_ok=True)
450            LOG.info("Copy summary files to %s", dst_path)
451
452            # copy reports to reports/latest folder
453            for report_file in os.listdir(self.report_path):
454                src_file = os.path.join(self.report_path, report_file)
455                dst_file = os.path.join(dst_path, report_file)
456                if os.path.isfile(src_file):
457                    shutil.copyfile(src_file, dst_file)
458        except OSError as _:
459            return
460
461    def _compress_report_folder(self):
462        if self._check_mode(ModeType.decc) or \
463                self._check_mode(ModeType.factory):
464            return
465
466        if not os.path.isdir(self.report_path):
467            LOG.error("'%s' is not folder!" % self.report_path)
468            return
469
470        # get file path list
471        file_path_list = []
472        for dir_path, _, file_names in os.walk(self.report_path):
473            f_path = dir_path.replace(self.report_path, '')
474            f_path = f_path and f_path + os.sep or ''
475            for filename in file_names:
476                file_path_list.append(
477                    (os.path.join(dir_path, filename), f_path + filename))
478
479        # compress file
480        zipped_file = "%s.zip" % os.path.join(
481            self.report_path, os.path.basename(self.report_path))
482        zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED,
483                                     allowZip64=True)
484        try:
485            LOG.info("Executing compress process, please wait...")
486            long_size_file = []
487            for src_path, target_path in file_path_list:
488                long_size_file.append((src_path, target_path))
489            self._write_long_size_file(zip_object, long_size_file)
490
491            LOG.info("Generate zip file: %s", zipped_file)
492        except zipfile.BadZipFile as bad_error:
493            LOG.error("Zip report folder error: %s" % bad_error.args)
494        finally:
495            zip_object.close()
496
497        # generate hex digest, then save it to summary_report.hash
498        hash_file = os.path.abspath(os.path.join(
499            self.report_path, ReportConstant.summary_report_hash))
500        hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT |
501                                 os.O_APPEND, FilePermission.mode_755)
502        with os.fdopen(hash_file_open, "w") as hash_file_handler:
503            hash_file_handler.write(get_file_summary(zipped_file))
504            LOG.info("Generate hash file: %s", hash_file)
505            hash_file_handler.flush()
506        return zipped_file
507
508    @classmethod
509    def _check_mode(cls, mode):
510        from xdevice import Scheduler
511        return Scheduler.mode == mode
512
513    def _generate_task_info_record(self):
514        # under encryption status, don't handle anything directly
515        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
516            return
517
518        # get info from command_queue
519        from xdevice import Scheduler
520        if not Scheduler.command_queue:
521            return
522        _, command, report_path = Scheduler.command_queue[-1]
523
524        # handle parsed data
525        record = self._parse_record_from_data(command, report_path)
526
527        def encode(content):
528            # inner function to encode
529            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
530
531        # write into file
532        import json
533        record_file = os.path.join(self.report_path,
534                                   ReportConstant.task_info_record)
535        _record_json = json.dumps(record, indent=2)
536
537        with open(file=record_file, mode="wb") as file:
538            if Scheduler.mode == ModeType.decc:
539                # under decc, write in encoded text
540                file.write(bytes(encode(_record_json), encoding="utf-8"))
541            else:
542                # others, write in plain text
543                file.write(bytes(_record_json, encoding="utf-8"))
544
545        LOG.info("Generate record file: %s", record_file)
546
547    def _parse_record_from_data(self, command, report_path):
548        record = dict()
549        if self.parsed_data:
550            _, _, suites = self.parsed_data
551            unsuccessful = dict()
552            module_set = set()
553            for suite in suites:
554                module_set.add(suite.module_name)
555
556                failed = unsuccessful.get(suite.module_name, [])
557                # because suite not contains case's some attribute,
558                # for example, 'module', 'classname', 'name' . so
559                # if unavailable, only add module's name into list.
560                if int(suite.result.unavailable) > 0:
561                    failed.append(suite.module_name)
562                else:
563                    # others, get key attributes join string
564                    for case in suite.get_cases():
565                        if not case.is_passed():
566                            failed.append(
567                                "{}#{}".format(case.classname, case.name))
568                unsuccessful.update({suite.module_name: failed})
569            data_reports = self._get_data_reports(module_set)
570            record = {"command": command,
571                      "session_id": os.path.split(report_path)[-1],
572                      "report_path": report_path,
573                      "unsuccessful_params": unsuccessful,
574                      "data_reports": data_reports
575                      }
576        return record
577
578    def _get_data_reports(self, module_set):
579        data_reports = dict()
580        if self._check_mode(ModeType.decc):
581            from xdevice import SuiteReporter
582            for module_name, report_path, report_result in \
583                    SuiteReporter.get_history_result_list():
584                if module_name in module_set:
585                    data_reports.update({module_name: report_path})
586        else:
587            for report_path, module_name in self.data_reports:
588                if module_name == ReportConstant.empty_name:
589                    root = self.data_helper.parse_data_report(report_path)
590                    module_name = self._get_module_name(report_path, root)
591                if module_name in module_set:
592                    data_reports.update({module_name: report_path})
593
594        return data_reports
595
596    @classmethod
597    def get_task_info_params(cls, history_path):
598        # under encryption status, don't handle anything directly
599        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
600            return ()
601
602        def decode(content):
603            return ''.join([chr(i) for i in [int(b, 2) for b in
604                                             content.split(' ')]])
605
606        record_path = os.path.join(history_path,
607                                   ReportConstant.task_info_record)
608        if not os.path.exists(record_path):
609            LOG.error("%s not exists!", ReportConstant.task_info_record)
610            return ()
611
612        import json
613        from xdevice import Scheduler
614        with open(record_path, mode="rb") as file:
615            if Scheduler.mode == ModeType.decc:
616                # under decc, read from encoded text
617                result = json.loads(decode(file.read().decode("utf-8")))
618            else:
619                # others, read from plain text
620                result = json.loads(file.read())
621        standard_length = 5
622        if not len(result.keys()) == standard_length:
623            LOG.error("%s error!", ReportConstant.task_info_record)
624            return ()
625
626        return result
627
628    @classmethod
629    def set_summary_report_result(cls, summary_data_path, result_xml):
630        cls.summary_report_result.clear()
631        cls.summary_report_result.append((summary_data_path, result_xml))
632
633    @classmethod
634    def get_result_of_summary_report(cls):
635        if cls.summary_report_result:
636            return cls.summary_report_result[0][1]
637
638    @classmethod
639    def summary_report_result_exists(cls):
640        return True if cls.summary_report_result else False
641
642    @classmethod
643    def get_path_of_summary_report(cls):
644        if cls.summary_report_result:
645            return cls.summary_report_result[0][0]
646
647    @classmethod
648    def _write_long_size_file(cls, zip_object, long_size_file):
649        for filename, arcname in long_size_file:
650            zip_info = zipfile.ZipInfo.from_file(filename, arcname)
651            zip_info.compress_type = getattr(zip_object, "compression",
652                                             zipfile.ZIP_DEFLATED)
653            if hasattr(zip_info, "_compresslevel"):
654                _compress_level = getattr(zip_object, "compresslevel", None)
655                setattr(zip_info, "_compresslevel", _compress_level)
656            with open(filename, "rb") as src, \
657                    zip_object.open(zip_info, "w") as des:
658                shutil.copyfileobj(src, des, 1024 * 1024 * 8)
659
660    def _transact_all(self):
661        from xdevice import Variables
662        tools_dir = os.path.join(Variables.res_dir, "tools", "binder.pyc")
663        if not os.path.exists(tools_dir):
664            return
665        module_spec = util.spec_from_file_location(
666            "binder", tools_dir)
667        if not module_spec:
668            return
669        module = util.module_from_spec(module_spec)
670        module_spec.loader.exec_module(module)
671        if hasattr(module, "transact") and callable(module.transact):
672            module.transact(self, LOG)
673        del module
674