• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import shutil
22import time
23import zipfile
24from ast import literal_eval
25
26from _core.interface import IReporter
27from _core.plugin import Plugin
28from _core.constants import ModeType
29from _core.constants import TestType
30from _core.logger import platform_logger
31from _core.exception import ParamError
32from _core.utils import get_filename_extension
33from _core.report.encrypt import check_pub_key_exist
34from _core.report.encrypt import do_rsa_encrypt
35from _core.report.encrypt import get_file_summary
36from _core.report.reporter_helper import DataHelper
37from _core.report.reporter_helper import ExecInfo
38from _core.report.reporter_helper import VisionHelper
39from _core.report.reporter_helper import ReportConstant
40
41LOG = platform_logger("ResultReporter")
42
43
44@Plugin(type=Plugin.REPORTER, id=TestType.all)
45class ResultReporter(IReporter):
46    summary_report_result = []
47
48    def __init__(self):
49        self.report_path = None
50        self.task_info = None
51        self.summary_data_path = None
52        self.summary_data_str = ""
53        self.exec_info = None
54        self.parsed_data = None
55        self.data_helper = None
56        self.vision_helper = None
57
58    def __generate_reports__(self, report_path, **kwargs):
59        LOG.info("")
60        LOG.info("**************************************************")
61        LOG.info("************** Start generate reports ************")
62        LOG.info("**************************************************")
63        LOG.info("")
64
65        if self._check_params(report_path, **kwargs):
66            # generate data report
67            self._generate_data_report()
68
69            # generate vision reports
70            self._generate_vision_reports()
71
72            # generate task info record
73            self._generate_task_info_record()
74
75            # generate summary ini
76            self._generate_summary()
77
78            # copy reports to reports/latest folder
79            self._copy_report()
80
81            # compress report folder
82            self._compress_report_folder()
83
84        LOG.info("")
85        LOG.info("**************************************************")
86        LOG.info("************** Ended generate reports ************")
87        LOG.info("**************************************************")
88        LOG.info("")
89
90    def _check_params(self, report_path, **kwargs):
91        task_info = kwargs.get("task_info", "")
92        if not report_path:
93            LOG.error("report path is wrong", error_no="00440",
94                      ReportPath=report_path)
95            return False
96        if not task_info or not isinstance(task_info, ExecInfo):
97            LOG.error("task info is wrong", error_no="00441",
98                      TaskInfo=task_info)
99            return False
100
101        os.makedirs(report_path, exist_ok=True)
102        self.report_path = report_path
103        self.task_info = task_info
104        self.summary_data_path = os.path.join(
105            self.report_path, ReportConstant.summary_data_report)
106        self.exec_info = task_info
107        self.data_helper = DataHelper()
108        self.vision_helper = VisionHelper()
109        return True
110
111    def _generate_data_report(self):
112        # initial element
113        test_suites_element = self.data_helper.initial_suites_element()
114
115        # update test suites element
116        update_flag = self._update_test_suites(test_suites_element)
117        if not update_flag:
118            return
119
120        # generate report
121        if not self._check_mode(ModeType.decc):
122            self.data_helper.generate_report(test_suites_element,
123                                             self.summary_data_path)
124
125        # set SuiteReporter.suite_report_result
126        if not check_pub_key_exist() and not self._check_mode(
127                ModeType.decc):
128            return
129        self.set_summary_report_result(
130            self.summary_data_path, DataHelper.to_string(test_suites_element))
131
132        if self._check_mode(ModeType.decc):
133            try:
134                from agent.decc import Handler
135                from xdevice import Scheduler
136                LOG.info("upload task summary result to decc")
137                Handler.upload_task_summary_results(
138                    self.get_result_of_summary_report())
139            except ModuleNotFoundError as error:
140                LOG.error("module not found %s", error.args)
141
142    def _update_test_suites(self, test_suites_element):
143        # initial attributes for test suites element
144        test_suites_attributes, need_update_attributes = \
145            self._init_attributes()
146
147        # get test suite elements that are children of test suites element
148        modules = dict()
149        test_suite_elements = []
150        for data_report, module_name in self.data_reports:
151            if data_report.endswith(ReportConstant.summary_data_report):
152                continue
153            root = self.data_helper.parse_data_report(data_report)
154            if module_name == ReportConstant.empty_name:
155                module_name = self._get_module_name(data_report, root)
156            total = int(root.get(ReportConstant.tests, 0))
157            modules[module_name] = modules.get(module_name, 0) + total
158
159            self._append_product_info(test_suites_attributes, root)
160            for child in root:
161                child.tail = self.data_helper.LINE_BREAK_INDENT
162                if not child.get(ReportConstant.module_name) or child.get(
163                        ReportConstant.module_name) == \
164                        ReportConstant.empty_name:
165                    child.set(ReportConstant.module_name, module_name)
166                self._check_tests_and_unavailable(child)
167                # covert "notrun" to "ignored" for the test case status
168                for element in child:
169                    if element.get(ReportConstant.status, "") == \
170                            ReportConstant.not_run:
171                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
172                        child.set(ReportConstant.ignored, "%s" % ignored)
173                test_suite_elements.append(child)
174                for update_attribute in need_update_attributes:
175                    update_value = child.get(update_attribute, 0)
176                    if not update_value:
177                        update_value = 0
178                    test_suites_attributes[update_attribute] += int(
179                        update_value)
180
181        if test_suite_elements:
182            child = test_suite_elements[-1]
183            child.tail = self.data_helper.LINE_BREAK
184        else:
185            LOG.error("execute result not exists")
186            return False
187
188        # set test suites element attributes and children
189        modules_zero = [module_name for module_name, total in modules.items()
190                        if total == 0]
191        if modules_zero:
192            LOG.info("the total tests of %s module is 0", ",".join(
193                modules_zero))
194        test_suites_attributes[ReportConstant.run_modules] = \
195            len(modules) - len(modules_zero)
196        test_suites_attributes[ReportConstant.modules] = len(modules)
197        self.data_helper.set_element_attributes(test_suites_element,
198                                                test_suites_attributes)
199        test_suites_element.extend(test_suite_elements)
200        return True
201
202    @classmethod
203    def _check_tests_and_unavailable(cls, child):
204        total = child.get(ReportConstant.tests, "0")
205        unavailable = child.get(ReportConstant.unavailable, "0")
206        if total and total != "0" and unavailable and \
207                unavailable != "0":
208            child.set(ReportConstant.unavailable, "0")
209            LOG.warning("%s total: %s, unavailable: %s", child.get(
210                ReportConstant.name), total, unavailable)
211
212    @classmethod
213    def _append_product_info(cls, test_suites_attributes, root):
214        product_info = root.get(ReportConstant.product_info, "")
215        if not product_info:
216            return
217        try:
218            product_info = literal_eval(str(product_info))
219        except SyntaxError as error:
220            LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args)
221            product_info = {}
222
223        if not test_suites_attributes[ReportConstant.product_info]:
224            test_suites_attributes[ReportConstant.product_info] = \
225                product_info
226            return
227        for key, value in product_info.items():
228            exist_value = test_suites_attributes[
229                ReportConstant.product_info].get(key, "")
230
231            if not exist_value:
232                test_suites_attributes[
233                    ReportConstant.product_info][key] = value
234                continue
235            if value in exist_value:
236                continue
237            test_suites_attributes[ReportConstant.product_info][key] = \
238                "%s,%s" % (exist_value, value)
239
240    @classmethod
241    def _get_module_name(cls, data_report, root):
242        # get module name from data report
243        module_name = get_filename_extension(data_report)[0]
244        if "report" in module_name or "summary" in module_name or \
245                "<" in data_report or ">" in data_report:
246            module_name = root.get(ReportConstant.name,
247                                   ReportConstant.empty_name)
248            if "report" in module_name or "summary" in module_name:
249                module_name = ReportConstant.empty_name
250        return module_name
251
252    def _init_attributes(self):
253        test_suites_attributes = {
254            ReportConstant.name:
255                ReportConstant.summary_data_report.split(".")[0],
256            ReportConstant.start_time: self.task_info.test_time,
257            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
258                                                   time.localtime()),
259            ReportConstant.errors: 0, ReportConstant.disabled: 0,
260            ReportConstant.failures: 0, ReportConstant.tests: 0,
261            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
262            ReportConstant.product_info: self.task_info.product_info,
263            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
264        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
265                                  ReportConstant.failures,
266                                  ReportConstant.disabled,
267                                  ReportConstant.errors,
268                                  ReportConstant.unavailable]
269        return test_suites_attributes, need_update_attributes
270
271    def _generate_vision_reports(self):
272        if not self._check_mode(ModeType.decc) and not \
273                self.summary_data_report_exist:
274            LOG.error("summary data report not exists")
275            return
276
277        if check_pub_key_exist() or self._check_mode(ModeType.decc):
278            if not self.summary_report_result_exists():
279                LOG.error("summary data report not exists")
280                return
281            self.summary_data_str = \
282                self.get_result_of_summary_report()
283            if check_pub_key_exist():
284                from xdevice import SuiteReporter
285                SuiteReporter.clear_report_result()
286
287        # parse data
288        if self.summary_data_str:
289            # only in decc mode and pub key, self.summary_data_str is not empty
290            summary_element_tree = self.data_helper.parse_data_report(
291                self.summary_data_str)
292        else:
293            summary_element_tree = self.data_helper.parse_data_report(
294                self.summary_data_path)
295        parsed_data = self.vision_helper.parse_element_data(
296            summary_element_tree, self.report_path, self.task_info)
297        self.parsed_data = parsed_data
298        self.exec_info, summary, _ = parsed_data
299
300        if self._check_mode(ModeType.decc):
301            return
302
303        LOG.info("Summary result: modules: %s, run modules: %s, total: "
304                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
305                 "unavailable: %s", summary.modules, summary.run_modules,
306                 summary.result.total, summary.result.passed,
307                 summary.result.failed, summary.result.blocked,
308                 summary.result.ignored, summary.result.unavailable)
309        LOG.info("Log path: %s", self.exec_info.log_path)
310
311        # generate summary vision report
312        report_generate_flag = self._generate_vision_report(
313            parsed_data, ReportConstant.summary_title,
314            ReportConstant.summary_vision_report)
315
316        # generate details vision report
317        if report_generate_flag and summary.result.total > 0:
318            self._generate_vision_report(
319                parsed_data, ReportConstant.details_title,
320                ReportConstant.details_vision_report)
321
322        # generate failures vision report
323        if summary.result.total != (
324                summary.result.passed + summary.result.ignored) or \
325                summary.result.unavailable > 0:
326            self._generate_vision_report(
327                parsed_data, ReportConstant.failures_title,
328                ReportConstant.failures_vision_report)
329
330    def _generate_vision_report(self, parsed_data, title, render_target):
331
332        # render data
333        report_context = self.vision_helper.render_data(
334            title, parsed_data, render_target=render_target)
335
336        # generate report
337        if report_context:
338            report_path = os.path.join(self.report_path, render_target)
339            self.vision_helper.generate_report(report_path, report_context)
340            return True
341        else:
342            LOG.error("Failed to generate %s", render_target)
343            return False
344
345    @property
346    def summary_data_report_exist(self):
347        return "<" in self.summary_data_str or \
348               os.path.exists(self.summary_data_path)
349
350    @property
351    def data_reports(self):
352        if check_pub_key_exist() or self._check_mode(ModeType.decc):
353            from xdevice import SuiteReporter
354            suite_reports = SuiteReporter.get_report_result()
355            if self._check_mode(ModeType.decc):
356                LOG.debug("handle history result, data_reports length:{}".
357                          format(len(suite_reports)))
358                SuiteReporter.clear_history_result()
359                SuiteReporter.append_history_result(suite_reports)
360            data_reports = []
361            for report_path, report_result in suite_reports:
362                module_name = get_filename_extension(report_path)[0]
363                data_reports.append((report_result, module_name))
364            SuiteReporter.clear_report_result()
365            return data_reports
366
367        if not os.path.isdir(self.report_path):
368            return []
369        data_reports = []
370        result_path = os.path.join(self.report_path, "result")
371        for root, _, files in os.walk(self.report_path):
372            for file_name in files:
373                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
374                    continue
375                module_name = self._find_module_name(result_path, root)
376                data_reports.append((os.path.join(root, file_name),
377                                     module_name))
378        return data_reports
379
380    @classmethod
381    def _find_module_name(cls, result_path, root):
382        # find module name from directory tree
383        common_path = os.path.commonpath([result_path, root])
384        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
385                os.path.normcase(result_path) == os.path.normcase(root):
386            return ReportConstant.empty_name
387
388        root_dir, module_name = os.path.split(root)
389        if os.path.normcase(result_path) == os.path.normcase(root_dir):
390            return ReportConstant.empty_name
391        root_dir, subsystem_name = os.path.split(root_dir)
392        while os.path.normcase(result_path) != os.path.normcase(root_dir):
393            module_name = subsystem_name
394            root_dir, subsystem_name = os.path.split(root_dir)
395        return module_name
396
397    def _generate_summary(self):
398        if not self.summary_data_report_exist or \
399                self._check_mode(ModeType.decc):
400            return
401        summary_ini_content = \
402            "[default]\n" \
403            "Platform=%s\n" \
404            "Test Type=%s\n" \
405            "Device Name=%s\n" \
406            "Host Info=%s\n" \
407            "Test Start/ End Time=%s\n" \
408            "Execution Time=%s\n" % (
409                self.exec_info.platform, self.exec_info.test_type,
410                self.exec_info.device_name, self.exec_info.host_info,
411                self.exec_info.test_time, self.exec_info.execute_time)
412        if self.exec_info.product_info:
413            for key, value in self.exec_info.product_info.items():
414                summary_ini_content = "{}{}".format(
415                    summary_ini_content, "%s=%s\n" % (key, value))
416
417        if not self._check_mode(ModeType.factory):
418            summary_ini_content = "{}{}".format(
419                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)
420
421        # write summary_ini_content
422        summary_filepath = os.path.join(self.report_path,
423                                        ReportConstant.summary_ini)
424
425        if platform.system() == "Windows":
426            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
427        else:
428            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
429        summary_filepath_open = os.open(summary_filepath, flags, 0o755)
430
431        with os.fdopen(summary_filepath_open, "wb") as file_handler:
432            if check_pub_key_exist():
433                try:
434                    cipher_text = do_rsa_encrypt(summary_ini_content)
435                except ParamError as error:
436                    LOG.error(error, error_no=error.error_no)
437                    cipher_text = b""
438                file_handler.write(cipher_text)
439            else:
440                file_handler.write(bytes(summary_ini_content, 'utf-8'))
441            file_handler.flush()
442            LOG.info("generate summary ini: %s", summary_filepath)
443
444    def _copy_report(self):
445        from xdevice import Scheduler
446        if Scheduler.upload_address or self._check_mode(ModeType.decc):
447            return
448
449        from xdevice import Variables
450        dst_path = os.path.join(Variables.exec_dir,
451                                Variables.report_vars.report_dir, "latest")
452        try:
453            shutil.rmtree(dst_path, ignore_errors=True)
454            os.makedirs(dst_path, exist_ok=True)
455            LOG.info("copy summary files to %s", dst_path)
456
457            # copy reports to reports/latest folder
458            for report_file in os.listdir(self.report_path):
459                src_file = os.path.join(self.report_path, report_file)
460                dst_file = os.path.join(dst_path, report_file)
461                if os.path.isfile(src_file):
462                    shutil.copyfile(src_file, dst_file)
463        except OSError:
464            return
465
466    def _compress_report_folder(self):
467        if self._check_mode(ModeType.decc):
468            return
469
470        if not os.path.isdir(self.report_path):
471            LOG.error("'%s' is not folder!" % self.report_path)
472            return
473
474        # get file path list
475        file_path_list = []
476        for dir_path, _, file_names in os.walk(self.report_path):
477            f_path = dir_path.replace(self.report_path, '')
478            f_path = f_path and f_path + os.sep or ''
479            for filename in file_names:
480                file_path_list.append(
481                    (os.path.join(dir_path, filename), f_path + filename))
482
483        # compress file
484        zipped_file = "%s.zip" % os.path.join(
485            self.report_path, os.path.basename(self.report_path))
486        zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED,
487                                     allowZip64=True)
488        try:
489            LOG.info("executing compress process, please wait...")
490            long_size_file = []
491            for src_path, target_path in file_path_list:
492                long_size_file.append((src_path, target_path))
493            self._write_long_size_file(zip_object, long_size_file)
494            LOG.info("generate zip file: %s", zipped_file)
495        except zipfile.BadZipFile as bad_error:
496            LOG.error("zip report folder error: %s" % bad_error.args)
497        finally:
498            zip_object.close()
499
500        # generate hex digest, then save it to summary_report.hash
501        hash_file = os.path.abspath(os.path.join(
502            self.report_path, ReportConstant.summary_report_hash))
503        hash_file_open = os.open(hash_file,
504                                 os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
505        with os.fdopen(hash_file_open, "w") as hash_file_handler:
506            hash_file_handler.write(get_file_summary(zipped_file))
507            LOG.info("generate hash file: %s", hash_file)
508            hash_file_handler.flush()
509        return zipped_file
510
511    @classmethod
512    def _check_mode(cls, mode):
513        from xdevice import Scheduler
514        return Scheduler.mode == mode
515
516    def _generate_task_info_record(self):
517        # under encryption status, don't handle anything directly
518        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
519            return
520
521        # get info from command_queue
522        from xdevice import Scheduler
523        if not Scheduler.command_queue:
524            return
525        _, command, report_path = Scheduler.command_queue[-1]
526
527        # handle parsed data
528        record = self._parse_record_from_data(command, report_path)
529
530        def encode(content):
531            # inner function to encode
532            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
533
534        # write into file
535        import json
536        record_file = os.path.join(self.report_path,
537                                   ReportConstant.task_info_record)
538        _record_json = json.dumps(record, indent=2)
539
540        with open(file=record_file, mode="wb") as file:
541            if Scheduler.mode == ModeType.decc:
542                # under decc, write in encoded text
543                file.write(bytes(encode(_record_json), encoding="utf-8"))
544            else:
545                # others, write in plain text
546                file.write(bytes(_record_json, encoding="utf-8"))
547
548        LOG.info("generate record file: %s", record_file)
549
550    def _parse_record_from_data(self, command, report_path):
551        record = dict()
552        if self.parsed_data:
553            _, _, suites = self.parsed_data
554            unsuccessful = dict()
555            module_set = set()
556            for suite in suites:
557                module_set.add(suite.module_name)
558
559                failed = unsuccessful.get(suite.module_name, [])
560                # because suite not contains case's some attribute,
561                # for example, 'module', 'classname', 'name' . so
562                # if unavailable, only add module's name into list.
563                if int(suite.result.unavailable) > 0:
564                    failed.append(suite.module_name)
565                else:
566                    # others, get key attributes join string
567                    for case in suite.get_cases():
568                        if not case.is_passed():
569                            failed.append(
570                                "{}#{}".format(case.classname, case.name))
571                unsuccessful.update({suite.module_name: failed})
572            data_reports = self._get_data_reports(module_set)
573            record = {"command": command,
574                      "session_id": os.path.split(report_path)[-1],
575                      "report_path": report_path,
576                      "unsuccessful_params": unsuccessful,
577                      "data_reports": data_reports
578                      }
579        return record
580
581    def _get_data_reports(self, module_set):
582        data_reports = dict()
583        if self._check_mode(ModeType.decc):
584            from xdevice import SuiteReporter
585            for module_name, report_path, report_result in \
586                    SuiteReporter.get_history_result_list():
587                if module_name in module_set:
588                    data_reports.update({module_name: report_path})
589        else:
590            for report_path, module_name in self.data_reports:
591                if module_name == ReportConstant.empty_name:
592                    root = self.data_helper.parse_data_report(report_path)
593                    module_name = self._get_module_name(report_path, root)
594                if module_name in module_set:
595                    data_reports.update({module_name: report_path})
596
597        return data_reports
598
599    @classmethod
600    def get_task_info_params(cls, history_path):
601        # under encryption status, don't handle anything directly
602        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
603            return ()
604
605        def decode(content):
606            return ''.join([chr(i) for i in [int(b, 2) for b in
607                                             content.split(' ')]])
608
609        record_path = os.path.join(history_path,
610                                   ReportConstant.task_info_record)
611        if not os.path.exists(record_path):
612            LOG.error("%s not exists!", ReportConstant.task_info_record)
613            return ()
614
615        import json
616        from xdevice import Scheduler
617        with open(record_path, mode="rb") as file:
618            if Scheduler.mode == ModeType.decc:
619                # under decc, read from encoded text
620                result = json.loads(decode(file.read().decode("utf-8")))
621            else:
622                # others, read from plain text
623                result = json.loads(file.read())
624        if not len(result.keys()) == 5:
625            LOG.error("%s error!", ReportConstant.task_info_record)
626            return ()
627
628        return result["session_id"], result["command"], result["report_path"],\
629               result["unsuccessful_params"], result["data_reports"]
630
631    @classmethod
632    def set_summary_report_result(cls, summary_data_path, result_xml):
633        cls.summary_report_result.clear()
634        cls.summary_report_result.append((summary_data_path, result_xml))
635
636    @classmethod
637    def get_result_of_summary_report(cls):
638        if cls.summary_report_result:
639            return cls.summary_report_result[0][1]
640
641    @classmethod
642    def summary_report_result_exists(cls):
643        return True if cls.summary_report_result else False
644
645    @classmethod
646    def get_path_of_summary_report(cls):
647        if cls.summary_report_result:
648            return cls.summary_report_result[0][0]
649
650    @classmethod
651    def _write_long_size_file(cls, zip_object, long_size_file):
652        for filename, arcname in long_size_file:
653            zip_info = zipfile.ZipInfo.from_file(filename, arcname)
654            zip_info.compress_type = getattr(zip_object, "compression",
655                                             zipfile.ZIP_DEFLATED)
656            if hasattr(zip_info, "_compresslevel"):
657                _compress_level = getattr(zip_object, "compresslevel", None)
658                setattr(zip_info, "_compresslevel", _compress_level)
659            with open(filename, "rb") as src, \
660                    zip_object.open(zip_info, "w") as des:
661                shutil.copyfileobj(src, des, 1024 * 1024 * 8)
662