• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import collections
20import copy
21import json
22import os
23import platform
24import re
25import shutil
26import time
27import stat
28from importlib import util
29from xml.etree import ElementTree
30
31from _core.interface import IReporter
32from _core.plugin import Plugin
33from _core.constants import CaseResult
34from _core.constants import DeviceProperties
35from _core.constants import ModeType
36from _core.constants import TestType
37from _core.constants import FilePermission
38from _core.logger import platform_logger
39from _core.exception import ParamError
40from _core.utils import calculate_elapsed_time
41from _core.utils import calculate_percent
42from _core.utils import copy_folder
43from _core.utils import get_filename_extension
44from _core.utils import parse_xml_cdata
45from _core.utils import show_current_environment
46from _core.report.encrypt import check_pub_key_exist
47from _core.report.encrypt import do_rsa_encrypt
48from _core.report.reporter_helper import DataHelper
49from _core.report.reporter_helper import ExecInfo
50from _core.report.reporter_helper import ReportConstant
51from _core.report.repeater_helper import RepeatHelper
52from _core.variables import Variables
53from _core.context.center import Context
54from _core.context.handler import get_case_result
55from _core.context.upload import Uploader
56
57LOG = platform_logger("ResultReporter")
58
59
60class ResultSummary:
61
62    def __init__(self):
63        self.modules = 0
64        self.repeat = 1
65        self.runmodules = 0
66        self.tests = 0
67        self.passed = 0
68        self.failed = 0
69        self.blocked = 0
70        self.ignored = 0
71        self.unavailable = 0
72        self.devices = []
73        self.__module_list = []
74
75    def get_data(self):
76        self.__module_list.clear()
77        LOG.info(f"Test Summary: modules: {self.modules}, repeat: {self.repeat}, run modules: {self.runmodules}, "
78                 f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, "
79                 f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}")
80        data = {
81            "modules": self.modules,
82            "repeat": self.repeat,
83            "runmodules": self.runmodules,
84            "tests": self.tests,
85            "passed": self.passed,
86            "failed": self.failed,
87            "blocked": self.blocked,
88            "ignored": self.ignored,
89            "unavailable": self.unavailable
90        }
91        return data
92
93    def get_devices(self):
94        return self.devices
95
96    def add_module(self, name):
97        if name not in self.__module_list:
98            self.__module_list.append(name)
99            self.modules += 1
100
101
102@Plugin(type=Plugin.REPORTER, id=TestType.all)
103class ResultReporter(IReporter):
104
105    def __init__(self):
106        self.report_path = None
107        self.task_info = None
108        self.summary_data_path = None
109        self.summary_data_str = ""
110        self.exec_info = None
111        self.data_helper = None
112        self.repeat_helper = None
113        self.summary = ResultSummary()
114
115        # task_record.info数据
116        self._failed_cases = []
117        self.record_params = {}
118        self.record_reports = {}
119
120    def __generate_reports__(self, report_path, **kwargs):
121        LOG.info("")
122        LOG.info("**************************************************")
123        LOG.info("************** Start generate reports ************")
124        LOG.info("**************************************************")
125        LOG.info("")
126
127        if self._check_params(report_path, **kwargs):
128            # generate data report
129            self._generate_data_report()
130
131            # generate vision reports
132            self._generate_test_report()
133
134            # generate task info record
135            self._generate_task_record()
136
137            # generate summary ini
138            self._generate_summary()
139
140            # copy reports to reports/latest folder
141            self._copy_report()
142
143            self._transact_all()
144
145        LOG.info("")
146        LOG.info("**************************************************")
147        LOG.info("************** Ended generate reports ************")
148        LOG.info("**************************************************")
149        LOG.info("")
150        show_current_environment()
151
152    def _check_params(self, report_path, **kwargs):
153        task_info = kwargs.get("task_info", "")
154        if not report_path:
155            LOG.error("Report path is wrong", error_no="00440",
156                      ReportPath=report_path)
157            return False
158        if not task_info or not isinstance(task_info, ExecInfo):
159            LOG.error("Task info is wrong", error_no="00441",
160                      TaskInfo=task_info)
161            return False
162
163        os.makedirs(report_path, exist_ok=True)
164        self.report_path = report_path
165        self.task_info = task_info
166        self.summary_data_path = os.path.join(
167            self.report_path, ReportConstant.summary_data_report)
168        self.exec_info = task_info
169        self.data_helper = DataHelper()
170        self.repeat_helper = RepeatHelper(report_path)
171        return True
172
173    def _generate_test_report(self):
174        report_template = os.path.join(Variables.res_dir, "template")
175        missing_files = []
176        for source in ReportConstant.new_template_sources:
177            file = source.get("file")
178            to_path = os.path.join(report_template, file)
179            if not os.path.exists(to_path) or os.path.getsize(to_path) == 0:
180                missing_files.append(to_path)
181        # 若新报告模板文件缺失,则使用旧报告模板生成测试报告
182        if len(missing_files) > 0:
183            self._generate_vision_reports()
184            return
185
186        copy_folder(report_template, self.report_path)
187        temp_file_report_html = os.path.join(self.report_path, "report.html")
188        if os.path.exists(temp_file_report_html):
189            os.remove(temp_file_report_html)
190        content = json.dumps(self._get_summary_data(), separators=(",", ":"))
191        data_js = os.path.join(self.report_path, "static", "data.js")
192        data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644)
193        with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf:
194            jsf.write(f"window.reportData = {content}")
195        test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/")
196        LOG.info(f"Log path: {self.report_path}")
197        LOG.info(f"Generate test report: file:///{test_report}")
198        # 重新生成对象,避免在retry场景数据统计有误
199        self.summary = ResultSummary()
200
201    def _get_summary_data(self):
202        self.summary.repeat = self.task_info.repeat
203        modules = []
204        for data_report, _ in self.data_reports:
205            if data_report.endswith(ReportConstant.summary_data_report):
206                continue
207            info = self._parse_module(data_report)
208            if info is not None:
209                modules.append(info)
210        if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0:
211            if Context.get_scheduler():
212                Context.get_scheduler().set_need_auto_retry(True)
213        data = {
214            "exec_info": self._get_exec_info(),
215            "summary": self.summary.get_data(),
216            "devices": self.summary.get_devices(),
217            "modules": modules,
218        }
219        return data
220
221    def _get_exec_info(self):
222        start_time = self.task_info.test_time
223        end_time = time.strftime(ReportConstant.time_format, time.localtime())
224        test_time = "%s/ %s" % (start_time, end_time)
225        execute_time = calculate_elapsed_time(
226            time.mktime(time.strptime(start_time, ReportConstant.time_format)),
227            time.mktime(time.strptime(end_time, ReportConstant.time_format)))
228        host_info = platform.platform()
229        device_name = getattr(self.task_info, ReportConstant.device_name, "None")
230        device_type = getattr(self.task_info, ReportConstant.device_label, "None")
231        platform_info = getattr(self.task_info, ReportConstant.platform, "None")
232        test_type = getattr(self.task_info, ReportConstant.test_type, "None")
233
234        # 为报告文件summary.ini提供数据
235        exec_info = ExecInfo()
236        exec_info.device_name = device_name
237        exec_info.device_label = device_type
238        exec_info.execute_time = execute_time
239        exec_info.host_info = host_info
240        exec_info.log_path = self.report_path
241        exec_info.platform = platform_info
242        exec_info.test_time = test_time
243        exec_info.test_type = test_type
244        self.exec_info = exec_info
245
246        info = {
247            "test_start": start_time,
248            "test_end": end_time,
249            "execute_time": execute_time,
250            "test_type": test_type,
251            "host_info": host_info,
252            "logs": self._get_task_log()
253        }
254        return info
255
256    def _parse_module(self, xml_file):
257        """解析测试模块"""
258        file_name = os.path.basename(xml_file)
259        try:
260            xml_file_open = os.open(xml_file, os.O_RDWR, stat.S_IWUSR | stat.S_IRUSR)
261            with os.fdopen(xml_file_open, mode="r", encoding="utf-8") as file_handler:
262                xml_str = file_handler.read()
263            for char_index in range(32):
264                if char_index in [10, 13]:
265                    continue
266                xml_str = xml_str.replace(chr(char_index), "")
267            ele_module = ElementTree.fromstring(xml_str)
268        except ElementTree.ParseError as e:
269            LOG.error(f"parse result xml error! xml file {xml_file}")
270            LOG.error(f"error message: {e}")
271            return None
272        module = ResultReporter._count_result(ele_module)
273        # 当模块名为空或为AllTests,将模块名设为结果xml的文件名
274        module_name = file_name[:-4] if module.name in ["", "AllTests"] else module.name.strip()
275        suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module]
276
277        # 为报告文件task_record.info提供数据
278        self.record_reports.update({module_name: xml_file})
279        if len(self._failed_cases) != 0:
280            self.record_params.update({module_name: copy.copy(self._failed_cases)})
281            self._failed_cases.clear()
282
283        self.summary.add_module(module_name)
284        self.summary.tests += module.tests
285        self.summary.passed += module.passed
286        self.summary.failed += module.failed
287        self.summary.blocked += module.blocked
288        self.summary.ignored += module.ignored
289        if module.unavailable == 0:
290            self.summary.runmodules += 1
291        else:
292            self.summary.unavailable += 1
293        devices = self._parse_devices(ele_module)
294
295        module_report, module_time = module.report, module.time
296        if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name:
297            report = suites[0].get(ReportConstant.report)
298            if report != "":
299                module_report = report
300            module_time = suites[0].get(ReportConstant.time)
301        repeat = int(ele_module.get(ReportConstant.repeat, "1"))
302        if self.summary.repeat < repeat:
303            self.summary.repeat = repeat
304        repeat_round = int(ele_module.get(ReportConstant.round, "1"))
305        test_type = ele_module.get(ReportConstant.test_type, "-")
306        test_start = ele_module.get(ReportConstant.start_time, "-")
307        test_end = ele_module.get(ReportConstant.end_time, "-")
308        if test_start != "-" and test_end != "-":
309            execute_time = calculate_elapsed_time(
310                time.mktime(time.strptime(test_start, ReportConstant.time_format)),
311                time.mktime(time.strptime(test_end, ReportConstant.time_format)))
312        else:
313            execute_time = calculate_elapsed_time(0, module_time)
314        info = {
315            "name": module_name,
316            "report": module_report,
317            "round": repeat_round,
318            "test_type": test_type,
319            "test_start": test_start,
320            "test_end": test_end,
321            "time": module_time,
322            "execute_time": execute_time,
323            "tests": module.tests,
324            "passed": module.passed,
325            "failed": module.failed,
326            "blocked": module.blocked,
327            "ignored": module.ignored,
328            "unavailable": module.unavailable,
329            "passingrate": calculate_percent(module.passed, module.tests),
330            "error": ele_module.get(ReportConstant.message, ""),
331            "logs": self._get_module_logs(module_name, repeat=repeat, repeat_round=repeat_round),
332            "devices": devices,
333            "suites": suites
334        }
335        return info
336
337    def _parse_testsuite(self, ele_suite):
338        """解析测试套"""
339        suite = ResultReporter._count_result(ele_suite)
340        cases = [self._parse_testcase(case) for case in ele_suite]
341        info = {
342            "name": suite.name,
343            "report": suite.report,
344            "time": suite.time,
345            "tests": suite.tests,
346            "passed": suite.passed,
347            "failed": suite.failed,
348            "blocked": suite.blocked,
349            "ignored": suite.ignored,
350            "passingrate": calculate_percent(suite.passed, suite.tests),
351            "cases": cases
352        }
353        return info
354
355    def _parse_testcase(self, ele_case):
356        """解析测试用例"""
357        name = ele_case.get(ReportConstant.name)
358        class_name = ele_case.get(ReportConstant.class_name, "")
359        result, error = get_case_result(ele_case)
360        if result != CaseResult.passed:
361            self._failed_cases.append(f"{class_name}#{name}")
362        return [name, class_name, result, ResultReporter._parse_time(ele_case),
363                error, ele_case.get(ReportConstant.report, "")]
364
365    @staticmethod
366    def _parse_time(ele):
367        try:
368            _time = float(ele.get(ReportConstant.time, "0"))
369        except ValueError:
370            _time = 0.0
371            LOG.error("parse test time error, set it to 0.0")
372        return _time
373
374    def _parse_devices(self, ele_module):
375        devices_str = ele_module.get(ReportConstant.devices, "")
376        if devices_str == "":
377            return []
378        try:
379            devices = json.loads(parse_xml_cdata(devices_str))
380        except Exception as e:
381            LOG.warning(f"parse devices from xml failed, {e}")
382            return []
383        # 汇总测试设备信息
384        for device in devices:
385            device_sn = device.get(DeviceProperties.sn, "")
386            temp = [d for d in self.summary.get_devices() if d.get(DeviceProperties.sn, "") == device_sn]
387            if len(temp) != 0:
388                continue
389            self.summary.get_devices().append(device)
390        return devices
391
392    @staticmethod
393    def _count_result(ele):
394        name = ele.get(ReportConstant.name, "")
395        report = ele.get(ReportConstant.report, "")
396        _time = ResultReporter._parse_time(ele)
397        tests = int(ele.get(ReportConstant.tests, "0"))
398        failed = int(ele.get(ReportConstant.failures, "0"))
399        disabled = ele.get(ReportConstant.disabled, "0")
400        if disabled == "":
401            disabled = "0"
402        errors = ele.get(ReportConstant.errors, "0")
403        if errors == "":
404            errors = "0"
405        blocked = int(disabled) + int(errors)
406        ignored = int(ele.get(ReportConstant.ignored, "0"))
407        unavailable = int(ele.get(ReportConstant.unavailable, "0"))
408
409        tmp_pass = tests - failed - blocked - ignored
410        passed = tmp_pass if tmp_pass > 0 else 0
411
412        Result = collections.namedtuple(
413            'Result',
414            ['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable'])
415        return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable)
416
417    def _get_module_logs(self, module_name, repeat=1, repeat_round=1):
418        """获取模块运行日志和设备日志
419        注:黑盒用例的测试报告是单独生成的,而xts只有模块级的设备日志,无用例级日志,故本方法仅支持获取模块级的设备日志
420        """
421        device_log = {}
422        round_folder = f"round{repeat_round}" if repeat > 1 else ""
423        log_path = os.path.join(self.report_path, "log", round_folder, module_name)
424        if not os.path.exists(log_path):
425            return device_log
426        module_log_uri = f"log/{round_folder}/{module_name}" if round_folder else f"log/{module_name}"
427        for filename in os.listdir(log_path):
428            file_link = f"{module_log_uri}/{filename}"
429            file_path = os.path.join(log_path, filename)
430            # 目录和模块运行日志,都提供链接
431            if os.path.isdir(file_path) or filename.startswith(ReportConstant.module_run_log):
432                device_log.setdefault(filename, file_link)
433                continue
434            # 是文件,仅提供模块级的设备日志链接
435            # 测试套日志命名格式device_log_sn.log、测试套子用例日志命名格式device_log_case_sn.log,后者“_”大于2
436            ret = re.fullmatch(r'(device_(?:hi)?log)_\S+\.log', filename)
437            if ret is None or filename.count("_") > 2:
438                continue
439            device_log.setdefault(ret.group(1), file_link)
440        return device_log
441
442    def _get_task_log(self):
443        log_path = os.path.join(self.report_path, "log")
444        if not os.path.exists(log_path):
445            return {}
446        return {f: f"log/{f}" for f in os.listdir(log_path) if f.startswith(ReportConstant.task_run_log)}
447
448    def _generate_data_report(self):
449        # initial element
450        test_suites_element = self.data_helper.initial_suites_element()
451
452        # update test suites element
453        update_flag = self._update_test_suites(test_suites_element)
454        if not update_flag:
455            return
456
457        # generate report
458        if not self._check_mode(ModeType.decc):
459            self.data_helper.generate_report(test_suites_element,
460                                             self.summary_data_path)
461
462    def _update_test_suites(self, test_suites_element):
463        # initial attributes for test suites element
464        test_suites_attributes, need_update_attributes = \
465            self._init_attributes()
466
467        # get test suite elements that are children of test suites element
468        modules = dict()
469        test_suite_elements = []
470        for data_report, module_name in self.data_reports:
471            if data_report.endswith(ReportConstant.summary_data_report):
472                continue
473            root = self.data_helper.parse_data_report(data_report)
474            self._parse_devices(root)
475            if module_name == ReportConstant.empty_name:
476                module_name = self._get_module_name(data_report, root)
477            total = int(root.get(ReportConstant.tests, 0))
478            if module_name not in modules.keys():
479                modules[module_name] = list()
480            modules[module_name].append(total)
481
482            for child in root:
483                child.tail = self.data_helper.LINE_BREAK_INDENT
484                if not child.get(ReportConstant.module_name) or child.get(
485                        ReportConstant.module_name) == \
486                        ReportConstant.empty_name:
487                    child.set(ReportConstant.module_name, module_name)
488                self._check_tests_and_unavailable(child)
489                # covert the status of "notrun" to "ignored"
490                for element in child:
491                    if element.get(ReportConstant.status, "") == \
492                            ReportConstant.not_run:
493                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
494                        child.set(ReportConstant.ignored, "%s" % ignored)
495                test_suite_elements.append(child)
496                for update_attribute in need_update_attributes:
497                    update_value = child.get(update_attribute, 0)
498                    if not update_value:
499                        update_value = 0
500                    test_suites_attributes[update_attribute] += int(
501                        update_value)
502
503        if test_suite_elements:
504            child = test_suite_elements[-1]
505            child.tail = self.data_helper.LINE_BREAK
506        else:
507            LOG.error("Execute result not exists")
508            return False
509
510        # set test suites element attributes and children
511        self._handle_module_tests(modules, test_suites_attributes)
512        self.data_helper.set_element_attributes(test_suites_element,
513                                                test_suites_attributes)
514        test_suites_element.extend(test_suite_elements)
515        return True
516
517    @classmethod
518    def _check_tests_and_unavailable(cls, child):
519        total = child.get(ReportConstant.tests, "0")
520        unavailable = child.get(ReportConstant.unavailable, "0")
521        if total and total != "0" and unavailable and \
522                unavailable != "0":
523            child.set(ReportConstant.unavailable, "0")
524            LOG.warning("%s total: %s, unavailable: %s", child.get(
525                ReportConstant.name), total, unavailable)
526
527    @classmethod
528    def _get_module_name(cls, data_report, root):
529        # get module name from data report
530        module_name = get_filename_extension(data_report)[0]
531        if "report" in module_name or "summary" in module_name or \
532                "<" in data_report or ">" in data_report:
533            module_name = root.get(ReportConstant.name,
534                                   ReportConstant.empty_name)
535            if "report" in module_name or "summary" in module_name:
536                module_name = ReportConstant.empty_name
537        return module_name
538
539    def _init_attributes(self):
540        test_suites_attributes = {
541            ReportConstant.name:
542                ReportConstant.summary_data_report.split(".")[0],
543            ReportConstant.start_time: self.task_info.test_time,
544            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
545                                                   time.localtime()),
546            ReportConstant.errors: 0, ReportConstant.disabled: 0,
547            ReportConstant.failures: 0, ReportConstant.tests: 0,
548            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
549            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
550        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
551                                  ReportConstant.failures,
552                                  ReportConstant.disabled,
553                                  ReportConstant.errors,
554                                  ReportConstant.unavailable]
555        return test_suites_attributes, need_update_attributes
556
557    @property
558    def summary_data_report_exist(self):
559        return "<" in self.summary_data_str or \
560               os.path.exists(self.summary_data_path)
561
562    @property
563    def data_reports(self):
564        if check_pub_key_exist() or self._check_mode(ModeType.decc):
565            from xdevice import SuiteReporter
566            suite_reports = SuiteReporter.get_report_result()
567            if self._check_mode(ModeType.decc):
568                LOG.debug("Handle history result, data reports length:{}".
569                          format(len(suite_reports)))
570                SuiteReporter.clear_history_result()
571                SuiteReporter.append_history_result(suite_reports)
572            data_reports = []
573            for report_path, report_result in suite_reports:
574                module_name = get_filename_extension(report_path)[0]
575                data_reports.append((report_result, module_name))
576            SuiteReporter.clear_report_result()
577            return data_reports
578
579        if not os.path.isdir(self.report_path):
580            return []
581        data_reports = []
582        result_path = os.path.join(self.report_path, "result")
583        for root, _, files in os.walk(result_path):
584            for file_name in files:
585                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
586                    continue
587                module_name = self._find_module_name(result_path, root)
588                data_reports.append((os.path.join(root, file_name),
589                                     module_name))
590        return data_reports
591
592    @classmethod
593    def _find_module_name(cls, result_path, root):
594        # find module name from directory tree
595        common_path = os.path.commonpath([result_path, root])
596        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
597                os.path.normcase(result_path) == os.path.normcase(root):
598            return ReportConstant.empty_name
599
600        root_dir, module_name = os.path.split(root)
601        if os.path.normcase(result_path) == os.path.normcase(root_dir):
602            return ReportConstant.empty_name
603        root_dir, subsystem_name = os.path.split(root_dir)
604        while os.path.normcase(result_path) != os.path.normcase(root_dir):
605            module_name = subsystem_name
606            root_dir, subsystem_name = os.path.split(root_dir)
607        return module_name
608
609    def _generate_summary(self):
610        if not self.summary_data_report_exist or \
611                self._check_mode(ModeType.decc):
612            return
613        summary_ini_content = \
614            "[default]\n" \
615            "Platform={}\n" \
616            "Test Type={}\n" \
617            "Device Name={}\n" \
618            "Host Info={}\n" \
619            "Test Start/ End Time={}\n" \
620            "Execution Time={}\n" \
621            "Device Type={}\n".format(
622                self.exec_info.platform, self.exec_info.test_type,
623                self.exec_info.device_name, self.exec_info.host_info,
624                self.exec_info.test_time, self.exec_info.execute_time,
625                self.exec_info.device_label)
626
627        if self.exec_info.product_info:
628            for key, value in self.exec_info.product_info.items():
629                summary_ini_content = "{}{}".format(
630                    summary_ini_content, "%s=%s\n" % (key, value))
631
632        if not self._check_mode(ModeType.factory):
633            summary_ini_content = "{}{}".format(
634                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)
635
636        # write summary_ini_content
637        summary_filepath = os.path.join(self.report_path,
638                                        ReportConstant.summary_ini)
639
640        if platform.system() == "Windows":
641            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
642        else:
643            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
644        summary_filepath_open = os.open(summary_filepath, flags,
645                                        FilePermission.mode_755)
646
647        with os.fdopen(summary_filepath_open, "wb") as file_handler:
648            if check_pub_key_exist():
649                try:
650                    cipher_text = do_rsa_encrypt(summary_ini_content)
651                except ParamError as error:
652                    LOG.error(error, error_no=error.error_no)
653                    cipher_text = b""
654                file_handler.write(cipher_text)
655            else:
656                file_handler.write(bytes(summary_ini_content, 'utf-8'))
657            file_handler.flush()
658            LOG.info("Generate summary ini: %s", summary_filepath)
659        self.repeat_helper.__generate_repeat_xml__(self.summary_data_path)
660
661    def _copy_report(self):
662        if Uploader.is_enable() or self._check_mode(ModeType.decc):
663            return
664
665        dst_path = os.path.join(Variables.temp_dir, "latest")
666        try:
667            shutil.rmtree(dst_path, ignore_errors=True)
668            os.makedirs(dst_path, exist_ok=True)
669            LOG.info("Copy summary files to %s", dst_path)
670
671            # copy reports to reports/latest folder
672            for report_file in os.listdir(self.report_path):
673                src_file = os.path.join(self.report_path, report_file)
674                dst_file = os.path.join(dst_path, report_file)
675                if os.path.isfile(src_file):
676                    shutil.copyfile(src_file, dst_file)
677        except OSError as _:
678            return
679
680    @classmethod
681    def _check_mode(cls, mode):
682        return Context.session().mode == mode
683
684    def _generate_task_record(self):
685        # under encryption status, don't handle anything directly
686        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
687            return
688
689        # get info from command_queue
690        if Context.command_queue().size() == 0:
691            return
692        _, command, report_path = Context.command_queue().get(-1)
693
694        record_info = {
695            "command": command,
696            "session_id": os.path.split(report_path)[-1],
697            "report_path": report_path,
698            "unsuccessful_params": self.record_params,
699            "data_reports": self.record_reports
700        }
701
702        def encode(content):
703            # inner function to encode
704            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
705
706        # write into file
707        record_file = os.path.join(self.report_path,
708                                   ReportConstant.task_info_record)
709        _record_json = json.dumps(record_info, indent=2)
710
711        with open(file=record_file, mode="wb") as file:
712            if Context.session().mode == ModeType.decc:
713                # under decc, write in encoded text
714                file.write(bytes(encode(_record_json), encoding="utf-8"))
715            else:
716                # others, write in plain text
717                file.write(bytes(_record_json, encoding="utf-8"))
718
719        LOG.info("Generate record file: %s", record_file)
720
721    @classmethod
722    def get_task_info_params(cls, history_path):
723        # under encryption status, don't handle anything directly
724        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
725            return ()
726
727        def decode(content):
728            result_list = []
729            for b in content.split(' '):
730                result_list.append(chr(int(b, 2)))
731            return ''.join(result_list)
732
733        record_path = os.path.join(history_path,
734                                   ReportConstant.task_info_record)
735        if not os.path.exists(record_path):
736            LOG.error("%s not exists!", ReportConstant.task_info_record)
737            return ()
738
739        with open(record_path, mode="rb") as file:
740            if Context.session().mode == ModeType.decc:
741                # under decc, read from encoded text
742                result = json.loads(decode(file.read().decode("utf-8")))
743            else:
744                # others, read from plain text
745                result = json.loads(file.read())
746        standard_length = 5
747        if not len(result.keys()) == standard_length:
748            LOG.error("%s error!", ReportConstant.task_info_record)
749            return ()
750
751        return result
752
753    def _transact_all(self):
754        pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc")
755        if not os.path.exists(pyc_path):
756            return
757        module_spec = util.spec_from_file_location("binder", pyc_path)
758        if not module_spec:
759            return
760        module = util.module_from_spec(module_spec)
761        module_spec.loader.exec_module(module)
762        if hasattr(module, "transact") and callable(module.transact):
763            module.transact(self, LOG)
764        del module
765
766    @classmethod
767    def _handle_module_tests(cls, modules, test_suites_attributes):
768        modules_list = list()
769        modules_zero = list()
770        for module_name, detail_list in modules.items():
771            for total in detail_list:
772                modules_list.append(total)
773                if total == 0:
774                    modules_zero.append(module_name)
775        test_suites_attributes[ReportConstant.run_modules] = \
776            len(modules_list) - len(modules_zero)
777        test_suites_attributes[ReportConstant.modules] = len(modules_list)
778        if modules_zero:
779            LOG.info("The total tests of %s module is 0", ",".join(
780                modules_zero))
781
782    # ******************** 使用旧报告模板的代码 BEGIN ********************
783    def _generate_vision_reports(self):
784        from _core.report.reporter_helper import VisionHelper
785        vision_helper = VisionHelper()
786        vision_helper.report_path = self.report_path
787        if not self._check_mode(ModeType.decc) and not \
788                self.summary_data_report_exist:
789            LOG.error("Summary data report not exists")
790            return
791
792        if check_pub_key_exist() or self._check_mode(ModeType.decc):
793            from xdevice import SuiteReporter
794            SuiteReporter.clear_report_result()
795
796        # parse data
797        if self.summary_data_str:
798            # only in decc mode and pub key, self.summary_data_str is not empty
799            summary_element_tree = self.data_helper.parse_data_report(
800                self.summary_data_str)
801        else:
802            summary_element_tree = self.data_helper.parse_data_report(
803                self.summary_data_path)
804        parsed_data = vision_helper.parse_element_data(
805            summary_element_tree, self.report_path, self.task_info)
806        self.parsed_data = parsed_data
807        self.exec_info, summary, _ = parsed_data
808
809        if self._check_mode(ModeType.decc):
810            return
811
812        LOG.info("Summary result: modules: %s, run modules: %s, total: "
813                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
814                 "unavailable: %s", summary.modules, summary.run_modules,
815                 summary.result.total, summary.result.passed,
816                 summary.result.failed, summary.result.blocked,
817                 summary.result.ignored, summary.result.unavailable)
818        LOG.info("Log path: %s", self.exec_info.log_path)
819
820        if summary.result.failed != 0 or summary.result.blocked != 0 or\
821                summary.result.unavailable != 0:
822            from xdevice import Scheduler
823            Scheduler.is_need_auto_retry = True
824
825        # generate summary vision report
826        report_generate_flag = self._generate_vision_report(
827            vision_helper, parsed_data, ReportConstant.summary_title,
828            ReportConstant.summary_vision_report)
829
830        # generate details vision report
831        if report_generate_flag and summary.result.total > 0:
832            self._generate_vision_report(
833                vision_helper, parsed_data, ReportConstant.details_title,
834                ReportConstant.details_vision_report)
835
836        # generate failures vision report
837        if summary.result.total != (
838                summary.result.passed + summary.result.ignored) or \
839                summary.result.unavailable > 0:
840            self._generate_vision_report(
841                vision_helper, parsed_data, ReportConstant.failures_title,
842                ReportConstant.failures_vision_report)
843
844        # generate passes vision report
845        if summary.result.passed != 0:
846            self._generate_vision_report(
847                vision_helper, parsed_data, ReportConstant.passes_title,
848                ReportConstant.passes_vision_report)
849
850        # generate ignores vision report
851        if summary.result.ignored != 0:
852            self._generate_vision_report(
853                vision_helper, parsed_data, ReportConstant.ignores_title,
854                ReportConstant.ignores_vision_report)
855
856    def _generate_vision_report(self, vision_helper, parsed_data, title, render_target):
857        # render data
858        report_context = vision_helper.render_data(
859            title, parsed_data,
860            render_target=render_target, devices=self.summary.get_devices())
861
862        # generate report
863        if report_context:
864            report_path = os.path.join(self.report_path, render_target)
865            vision_helper.generate_report(report_path, report_context)
866            return True
867        else:
868            LOG.error("Failed to generate %s", render_target)
869            return False
870    # ******************** 使用旧报告模板的代码 END ********************
871