• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import shutil
22import time
23import zipfile
24from importlib import util
25from ast import literal_eval
26
27from _core.interface import IReporter
28from _core.plugin import Plugin
29from _core.constants import ModeType
30from _core.constants import TestType
31from _core.constants import FilePermission
32from _core.logger import platform_logger
33from _core.exception import ParamError
34from _core.utils import get_filename_extension
35from _core.report.encrypt import check_pub_key_exist
36from _core.report.encrypt import do_rsa_encrypt
37from _core.report.encrypt import get_file_summary
38from _core.report.reporter_helper import DataHelper
39from _core.report.reporter_helper import ExecInfo
40from _core.report.reporter_helper import VisionHelper
41from _core.report.reporter_helper import ReportConstant
42
43LOG = platform_logger("ResultReporter")
44
45
46@Plugin(type=Plugin.REPORTER, id=TestType.all)
47class ResultReporter(IReporter):
48    summary_report_result = []
49
50    def __init__(self):
51        self.report_path = None
52        self.task_info = None
53        self.summary_data_path = None
54        self.summary_data_str = ""
55        self.exec_info = None
56        self.parsed_data = None
57        self.data_helper = None
58        self.vision_helper = None
59
60    def __generate_reports__(self, report_path, **kwargs):
61        LOG.info("")
62        LOG.info("**************************************************")
63        LOG.info("************** Start generate reports ************")
64        LOG.info("**************************************************")
65        LOG.info("")
66
67        if self._check_params(report_path, **kwargs):
68            # generate data report
69            self._generate_data_report()
70
71            # generate vision reports
72            self._generate_vision_reports()
73
74            # generate task info record
75            self._generate_task_info_record()
76
77            # generate summary ini
78            self._generate_summary()
79
80            # copy reports to reports/latest folder
81            self._copy_report()
82
83            self._transact_all()
84
85        LOG.info("")
86        LOG.info("**************************************************")
87        LOG.info("************** Ended generate reports ************")
88        LOG.info("**************************************************")
89        LOG.info("")
90
91    def _check_params(self, report_path, **kwargs):
92        task_info = kwargs.get("task_info", "")
93        if not report_path:
94            LOG.error("Report path is wrong", error_no="00440",
95                      ReportPath=report_path)
96            return False
97        if not task_info or not isinstance(task_info, ExecInfo):
98            LOG.error("Task info is wrong", error_no="00441",
99                      TaskInfo=task_info)
100            return False
101
102        os.makedirs(report_path, exist_ok=True)
103        self.report_path = report_path
104        self.task_info = task_info
105        self.summary_data_path = os.path.join(
106            self.report_path, ReportConstant.summary_data_report)
107        self.exec_info = task_info
108        self.data_helper = DataHelper()
109        self.vision_helper = VisionHelper()
110        return True
111
112    def _generate_data_report(self):
113        # initial element
114        test_suites_element = self.data_helper.initial_suites_element()
115
116        # update test suites element
117        update_flag = self._update_test_suites(test_suites_element)
118        if not update_flag:
119            return
120
121        # generate report
122        if not self._check_mode(ModeType.decc):
123            self.data_helper.generate_report(test_suites_element,
124                                             self.summary_data_path)
125
126        # set SuiteReporter.suite_report_result
127        if not check_pub_key_exist() and not self._check_mode(
128                ModeType.decc):
129            return
130        self.set_summary_report_result(
131            self.summary_data_path, DataHelper.to_string(test_suites_element))
132
133    def _update_test_suites(self, test_suites_element):
134        # initial attributes for test suites element
135        test_suites_attributes, need_update_attributes = \
136            self._init_attributes()
137
138        # get test suite elements that are children of test suites element
139        modules = dict()
140        test_suite_elements = []
141        for data_report, module_name in self.data_reports:
142            if data_report.endswith(ReportConstant.summary_data_report):
143                continue
144            root = self.data_helper.parse_data_report(data_report)
145            if module_name == ReportConstant.empty_name:
146                module_name = self._get_module_name(data_report, root)
147            total = int(root.get(ReportConstant.tests, 0))
148            modules[module_name] = modules.get(module_name, 0) + total
149
150            self._append_product_info(test_suites_attributes, root)
151            for child in root:
152                child.tail = self.data_helper.LINE_BREAK_INDENT
153                if not child.get(ReportConstant.module_name) or child.get(
154                        ReportConstant.module_name) == \
155                        ReportConstant.empty_name:
156                    child.set(ReportConstant.module_name, module_name)
157                self._check_tests_and_unavailable(child)
158                # covert the status of "notrun" to "ignored"
159                for element in child:
160                    if element.get(ReportConstant.status, "") == \
161                            ReportConstant.not_run:
162                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
163                        child.set(ReportConstant.ignored, "%s" % ignored)
164                test_suite_elements.append(child)
165                for update_attribute in need_update_attributes:
166                    update_value = child.get(update_attribute, 0)
167                    if not update_value:
168                        update_value = 0
169                    test_suites_attributes[update_attribute] += int(
170                        update_value)
171
172        if test_suite_elements:
173            child = test_suite_elements[-1]
174            child.tail = self.data_helper.LINE_BREAK
175        else:
176            LOG.error("Execute result not exists")
177            return False
178
179        # set test suites element attributes and children
180        modules_zero = [module_name for module_name, total in modules.items()
181                        if total == 0]
182        if modules_zero:
183            LOG.info("The total tests of %s module is 0", ",".join(
184                modules_zero))
185        test_suites_attributes[ReportConstant.run_modules] = \
186            len(modules) - len(modules_zero)
187        test_suites_attributes[ReportConstant.modules] = len(modules)
188        self.data_helper.set_element_attributes(test_suites_element,
189                                                test_suites_attributes)
190        test_suites_element.extend(test_suite_elements)
191        return True
192
193    @classmethod
194    def _check_tests_and_unavailable(cls, child):
195        total = child.get(ReportConstant.tests, "0")
196        unavailable = child.get(ReportConstant.unavailable, "0")
197        if total and total != "0" and unavailable and \
198                unavailable != "0":
199            child.set(ReportConstant.unavailable, "0")
200            LOG.warning("%s total: %s, unavailable: %s", child.get(
201                ReportConstant.name), total, unavailable)
202
203    @classmethod
204    def _append_product_info(cls, test_suites_attributes, root):
205        product_info = root.get(ReportConstant.product_info, "")
206        if not product_info:
207            return
208        try:
209            product_info = literal_eval(str(product_info))
210        except SyntaxError as error:
211            LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args)
212            product_info = {}
213
214        if not test_suites_attributes[ReportConstant.product_info]:
215            test_suites_attributes[ReportConstant.product_info] = \
216                product_info
217            return
218        for key, value in product_info.items():
219            exist_value = test_suites_attributes[
220                ReportConstant.product_info].get(key, "")
221
222            if not exist_value:
223                test_suites_attributes[
224                    ReportConstant.product_info][key] = value
225                continue
226            if value in exist_value:
227                continue
228            test_suites_attributes[ReportConstant.product_info][key] = \
229                "%s,%s" % (exist_value, value)
230
231    @classmethod
232    def _get_module_name(cls, data_report, root):
233        # get module name from data report
234        module_name = get_filename_extension(data_report)[0]
235        if "report" in module_name or "summary" in module_name or \
236                "<" in data_report or ">" in data_report:
237            module_name = root.get(ReportConstant.name,
238                                   ReportConstant.empty_name)
239            if "report" in module_name or "summary" in module_name:
240                module_name = ReportConstant.empty_name
241        return module_name
242
243    def _init_attributes(self):
244        test_suites_attributes = {
245            ReportConstant.name:
246                ReportConstant.summary_data_report.split(".")[0],
247            ReportConstant.start_time: self.task_info.test_time,
248            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
249                                                   time.localtime()),
250            ReportConstant.errors: 0, ReportConstant.disabled: 0,
251            ReportConstant.failures: 0, ReportConstant.tests: 0,
252            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
253            ReportConstant.product_info: self.task_info.product_info,
254            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
255        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
256                                  ReportConstant.failures,
257                                  ReportConstant.disabled,
258                                  ReportConstant.errors,
259                                  ReportConstant.unavailable]
260        return test_suites_attributes, need_update_attributes
261
262    def _generate_vision_reports(self):
263        if not self._check_mode(ModeType.decc) and not \
264                self.summary_data_report_exist:
265            LOG.error("Summary data report not exists")
266            return
267
268        if check_pub_key_exist() or self._check_mode(ModeType.decc):
269            if not self.summary_report_result_exists():
270                LOG.error("Summary data report not exists")
271                return
272            self.summary_data_str = \
273                self.get_result_of_summary_report()
274            if check_pub_key_exist():
275                from xdevice import SuiteReporter
276                SuiteReporter.clear_report_result()
277
278        # parse data
279        if self.summary_data_str:
280            # only in decc mode and pub key, self.summary_data_str is not empty
281            summary_element_tree = self.data_helper.parse_data_report(
282                self.summary_data_str)
283        else:
284            summary_element_tree = self.data_helper.parse_data_report(
285                self.summary_data_path)
286        parsed_data = self.vision_helper.parse_element_data(
287            summary_element_tree, self.report_path, self.task_info)
288        self.parsed_data = parsed_data
289        self.exec_info, summary, _ = parsed_data
290
291        if self._check_mode(ModeType.decc):
292            return
293
294        LOG.info("Summary result: modules: %s, run modules: %s, total: "
295                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
296                 "unavailable: %s", summary.modules, summary.run_modules,
297                 summary.result.total, summary.result.passed,
298                 summary.result.failed, summary.result.blocked,
299                 summary.result.ignored, summary.result.unavailable)
300        LOG.info("Log path: %s", self.exec_info.log_path)
301
302        # generate summary vision report
303        report_generate_flag = self._generate_vision_report(
304            parsed_data, ReportConstant.summary_title,
305            ReportConstant.summary_vision_report)
306
307        # generate details vision report
308        if report_generate_flag and summary.result.total > 0:
309            self._generate_vision_report(
310                parsed_data, ReportConstant.details_title,
311                ReportConstant.details_vision_report)
312
313        # generate failures vision report
314        if summary.result.total != (
315                summary.result.passed + summary.result.ignored) or \
316                summary.result.unavailable > 0:
317            self._generate_vision_report(
318                parsed_data, ReportConstant.failures_title,
319                ReportConstant.failures_vision_report)
320
321    def _generate_vision_report(self, parsed_data, title, render_target):
322
323        # render data
324        report_context = self.vision_helper.render_data(
325            title, parsed_data, render_target=render_target)
326
327        # generate report
328        if report_context:
329            report_path = os.path.join(self.report_path, render_target)
330            self.vision_helper.generate_report(report_path, report_context)
331            return True
332        else:
333            LOG.error("Failed to generate %s", render_target)
334            return False
335
336    @property
337    def summary_data_report_exist(self):
338        return "<" in self.summary_data_str or \
339               os.path.exists(self.summary_data_path)
340
341    @property
342    def data_reports(self):
343        if check_pub_key_exist() or self._check_mode(ModeType.decc):
344            from xdevice import SuiteReporter
345            suite_reports = SuiteReporter.get_report_result()
346            if self._check_mode(ModeType.decc):
347                LOG.debug("Handle history result, data reports length:{}".
348                          format(len(suite_reports)))
349                SuiteReporter.clear_history_result()
350                SuiteReporter.append_history_result(suite_reports)
351            data_reports = []
352            for report_path, report_result in suite_reports:
353                module_name = get_filename_extension(report_path)[0]
354                data_reports.append((report_result, module_name))
355            SuiteReporter.clear_report_result()
356            return data_reports
357
358        if not os.path.isdir(self.report_path):
359            return []
360        data_reports = []
361        result_path = os.path.join(self.report_path, "result")
362        for root, _, files in os.walk(self.report_path):
363            for file_name in files:
364                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
365                    continue
366                module_name = self._find_module_name(result_path, root)
367                data_reports.append((os.path.join(root, file_name),
368                                     module_name))
369        return data_reports
370
371    @classmethod
372    def _find_module_name(cls, result_path, root):
373        # find module name from directory tree
374        common_path = os.path.commonpath([result_path, root])
375        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
376                os.path.normcase(result_path) == os.path.normcase(root):
377            return ReportConstant.empty_name
378
379        root_dir, module_name = os.path.split(root)
380        if os.path.normcase(result_path) == os.path.normcase(root_dir):
381            return ReportConstant.empty_name
382        root_dir, subsystem_name = os.path.split(root_dir)
383        while os.path.normcase(result_path) != os.path.normcase(root_dir):
384            module_name = subsystem_name
385            root_dir, subsystem_name = os.path.split(root_dir)
386        return module_name
387
388    def _generate_summary(self):
389        if not self.summary_data_report_exist or \
390                self._check_mode(ModeType.decc):
391            return
392        summary_ini_content = \
393            "[default]\n" \
394            "Platform=%s\n" \
395            "Test Type=%s\n" \
396            "Device Name=%s\n" \
397            "Host Info=%s\n" \
398            "Test Start/ End Time=%s\n" \
399            "Execution Time=%s\n" % (
400                self.exec_info.platform, self.exec_info.test_type,
401                self.exec_info.device_name, self.exec_info.host_info,
402                self.exec_info.test_time, self.exec_info.execute_time)
403        if self.exec_info.product_info:
404            for key, value in self.exec_info.product_info.items():
405                summary_ini_content = "{}{}".format(
406                    summary_ini_content, "%s=%s\n" % (key, value))
407
408        if not self._check_mode(ModeType.factory):
409            summary_ini_content = "{}{}".format(
410                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)
411
412        # write summary_ini_content
413        summary_filepath = os.path.join(self.report_path,
414                                        ReportConstant.summary_ini)
415
416        if platform.system() == "Windows":
417            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
418        else:
419            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
420        summary_filepath_open = os.open(summary_filepath, flags,
421                                        FilePermission.mode_755)
422
423        with os.fdopen(summary_filepath_open, "wb") as file_handler:
424            if check_pub_key_exist():
425                try:
426                    cipher_text = do_rsa_encrypt(summary_ini_content)
427                except ParamError as error:
428                    LOG.error(error, error_no=error.error_no)
429                    cipher_text = b""
430                file_handler.write(cipher_text)
431            else:
432                file_handler.write(bytes(summary_ini_content, 'utf-8'))
433            file_handler.flush()
434            LOG.info("Generate summary ini: %s", summary_filepath)
435
436    def _copy_report(self):
437        from xdevice import Scheduler
438        if Scheduler.upload_address or self._check_mode(ModeType.decc):
439            return
440
441        from xdevice import Variables
442        dst_path = os.path.join(Variables.temp_dir, "latest")
443        try:
444            shutil.rmtree(dst_path, ignore_errors=True)
445            os.makedirs(dst_path, exist_ok=True)
446            LOG.info("Copy summary files to %s", dst_path)
447
448            # copy reports to reports/latest folder
449            for report_file in os.listdir(self.report_path):
450                src_file = os.path.join(self.report_path, report_file)
451                dst_file = os.path.join(dst_path, report_file)
452                if os.path.isfile(src_file):
453                    shutil.copyfile(src_file, dst_file)
454        except OSError as _:
455            return
456
457    def _compress_report_folder(self):
458        if self._check_mode(ModeType.decc) or \
459                self._check_mode(ModeType.factory):
460            return
461
462        if not os.path.isdir(self.report_path):
463            LOG.error("'%s' is not folder!" % self.report_path)
464            return
465
466        # get file path list
467        file_path_list = []
468        for dir_path, _, file_names in os.walk(self.report_path):
469            f_path = dir_path.replace(self.report_path, '')
470            f_path = f_path and f_path + os.sep or ''
471            for filename in file_names:
472                file_path_list.append(
473                    (os.path.join(dir_path, filename), f_path + filename))
474
475        # compress file
476        zipped_file = "%s.zip" % os.path.join(
477            self.report_path, os.path.basename(self.report_path))
478        zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED,
479                                     allowZip64=True)
480        try:
481            LOG.info("Executing compress process, please wait...")
482            long_size_file = []
483            for src_path, target_path in file_path_list:
484                long_size_file.append((src_path, target_path))
485            self._write_long_size_file(zip_object, long_size_file)
486
487            LOG.info("Generate zip file: %s", zipped_file)
488        except zipfile.BadZipFile as bad_error:
489            LOG.error("Zip report folder error: %s" % bad_error.args)
490        finally:
491            zip_object.close()
492
493        # generate hex digest, then save it to summary_report.hash
494        hash_file = os.path.abspath(os.path.join(
495            self.report_path, ReportConstant.summary_report_hash))
496        hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT |
497                                 os.O_APPEND, FilePermission.mode_755)
498        with os.fdopen(hash_file_open, "w") as hash_file_handler:
499            hash_file_handler.write(get_file_summary(zipped_file))
500            LOG.info("Generate hash file: %s", hash_file)
501            hash_file_handler.flush()
502        return zipped_file
503
504    @classmethod
505    def _check_mode(cls, mode):
506        from xdevice import Scheduler
507        return Scheduler.mode == mode
508
509    def _generate_task_info_record(self):
510        # under encryption status, don't handle anything directly
511        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
512            return
513
514        # get info from command_queue
515        from xdevice import Scheduler
516        if not Scheduler.command_queue:
517            return
518        _, command, report_path = Scheduler.command_queue[-1]
519
520        # handle parsed data
521        record = self._parse_record_from_data(command, report_path)
522
523        def encode(content):
524            # inner function to encode
525            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
526
527        # write into file
528        import json
529        record_file = os.path.join(self.report_path,
530                                   ReportConstant.task_info_record)
531        _record_json = json.dumps(record, indent=2)
532
533        with open(file=record_file, mode="wb") as file:
534            if Scheduler.mode == ModeType.decc:
535                # under decc, write in encoded text
536                file.write(bytes(encode(_record_json), encoding="utf-8"))
537            else:
538                # others, write in plain text
539                file.write(bytes(_record_json, encoding="utf-8"))
540
541        LOG.info("Generate record file: %s", record_file)
542
543    def _parse_record_from_data(self, command, report_path):
544        record = dict()
545        if self.parsed_data:
546            _, _, suites = self.parsed_data
547            unsuccessful = dict()
548            module_set = set()
549            for suite in suites:
550                module_set.add(suite.module_name)
551
552                failed = unsuccessful.get(suite.module_name, [])
553                # because suite not contains case's some attribute,
554                # for example, 'module', 'classname', 'name' . so
555                # if unavailable, only add module's name into list.
556                if int(suite.result.unavailable) > 0:
557                    failed.append(suite.module_name)
558                else:
559                    # others, get key attributes join string
560                    for case in suite.get_cases():
561                        if not case.is_passed():
562                            failed.append(
563                                "{}#{}".format(case.classname, case.name))
564                unsuccessful.update({suite.module_name: failed})
565            data_reports = self._get_data_reports(module_set)
566            record = {"command": command,
567                      "session_id": os.path.split(report_path)[-1],
568                      "report_path": report_path,
569                      "unsuccessful_params": unsuccessful,
570                      "data_reports": data_reports
571                      }
572        return record
573
574    def _get_data_reports(self, module_set):
575        data_reports = dict()
576        if self._check_mode(ModeType.decc):
577            from xdevice import SuiteReporter
578            for module_name, report_path, report_result in \
579                    SuiteReporter.get_history_result_list():
580                if module_name in module_set:
581                    data_reports.update({module_name: report_path})
582        else:
583            for report_path, module_name in self.data_reports:
584                if module_name == ReportConstant.empty_name:
585                    root = self.data_helper.parse_data_report(report_path)
586                    module_name = self._get_module_name(report_path, root)
587                if module_name in module_set:
588                    data_reports.update({module_name: report_path})
589
590        return data_reports
591
592    @classmethod
593    def get_task_info_params(cls, history_path):
594        # under encryption status, don't handle anything directly
595        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
596            return ()
597
598        def decode(content):
599            return ''.join([chr(i) for i in [int(b, 2) for b in
600                                             content.split(' ')]])
601
602        record_path = os.path.join(history_path,
603                                   ReportConstant.task_info_record)
604        if not os.path.exists(record_path):
605            LOG.error("%s not exists!", ReportConstant.task_info_record)
606            return ()
607
608        import json
609        from xdevice import Scheduler
610        with open(record_path, mode="rb") as file:
611            if Scheduler.mode == ModeType.decc:
612                # under decc, read from encoded text
613                result = json.loads(decode(file.read().decode("utf-8")))
614            else:
615                # others, read from plain text
616                result = json.loads(file.read())
617        standard_length = 5
618        if not len(result.keys()) == standard_length:
619            LOG.error("%s error!", ReportConstant.task_info_record)
620            return ()
621
622        return result
623
624    @classmethod
625    def set_summary_report_result(cls, summary_data_path, result_xml):
626        cls.summary_report_result.clear()
627        cls.summary_report_result.append((summary_data_path, result_xml))
628
629    @classmethod
630    def get_result_of_summary_report(cls):
631        if cls.summary_report_result:
632            return cls.summary_report_result[0][1]
633
634    @classmethod
635    def summary_report_result_exists(cls):
636        return True if cls.summary_report_result else False
637
638    @classmethod
639    def get_path_of_summary_report(cls):
640        if cls.summary_report_result:
641            return cls.summary_report_result[0][0]
642
643    @classmethod
644    def _write_long_size_file(cls, zip_object, long_size_file):
645        for filename, arcname in long_size_file:
646            zip_info = zipfile.ZipInfo.from_file(filename, arcname)
647            zip_info.compress_type = getattr(zip_object, "compression",
648                                             zipfile.ZIP_DEFLATED)
649            if hasattr(zip_info, "_compresslevel"):
650                _compress_level = getattr(zip_object, "compresslevel", None)
651                setattr(zip_info, "_compresslevel", _compress_level)
652            with open(filename, "rb") as src, \
653                    zip_object.open(zip_info, "w") as des:
654                shutil.copyfileobj(src, des, 1024 * 1024 * 8)
655
656    def _transact_all(self):
657        from xdevice import Variables
658        tools_dir = os.path.join(Variables.res_dir, "tools", "binder.pyc")
659        if not os.path.exists(tools_dir):
660            return
661        module_spec = util.spec_from_file_location(
662            "binder", tools_dir)
663        if not module_spec:
664            return
665        module = util.module_from_spec(module_spec)
666        module_spec.loader.exec_module(module)
667        if hasattr(module, "transact") and callable(module.transact):
668            module.transact(self, LOG)
669        del module
670