• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import collections
20import copy
21import json
22import os
23import platform
24import re
25import shutil
26import time
27import stat
28from importlib import util
29from xml.etree import ElementTree
30
31from _core.interface import IReporter
32from _core.plugin import Plugin
33from _core.constants import CaseResult
34from _core.constants import DeviceProperties
35from _core.constants import ModeType
36from _core.constants import TestType
37from _core.constants import FilePermission
38from _core.logger import platform_logger
39from _core.exception import ParamError
40from _core.utils import calculate_elapsed_time
41from _core.utils import calculate_percent
42from _core.utils import copy_folder
43from _core.utils import get_filename_extension
44from _core.utils import parse_xml_cdata
45from _core.utils import show_current_environment
46from _core.report.encrypt import check_pub_key_exist
47from _core.report.encrypt import do_rsa_encrypt
48from _core.report.reporter_helper import DataHelper
49from _core.report.reporter_helper import ExecInfo
50from _core.report.reporter_helper import ReportConstant
51from _core.report.repeater_helper import RepeatHelper
52from _core.variables import Variables
53from _core.context.center import Context
54from _core.context.handler import get_case_result
55from _core.context.upload import Uploader
56
57LOG = platform_logger("ResultReporter")
58
59
class ResultSummary:
    """Running totals for one report-generation pass.

    The counters are plain public attributes that callers update directly.
    ``add_module`` additionally remembers the names it has seen so that
    ``modules`` counts each distinct name only once.
    """

    _DATA_FIELDS = ("modules", "repeat", "runmodules", "tests", "passed",
                    "failed", "blocked", "ignored", "unavailable")

    def __init__(self):
        # Names already counted by add_module().
        self.__module_list = []
        # Device descriptions collected while parsing module results.
        self.devices = []
        self.repeat = 1
        self.modules = 0
        self.runmodules = 0
        self.tests = 0
        self.passed = 0
        self.failed = 0
        self.blocked = 0
        self.ignored = 0
        self.unavailable = 0

    def get_data(self):
        """Log the totals and return them as a dict.

        As a side effect the list of seen module names is emptied; the
        ``modules`` counter itself is left untouched.
        """
        self.__module_list.clear()
        LOG.info(f"Test Summary: modules: {self.modules}, repeat: {self.repeat}, run modules: {self.runmodules}, "
                 f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, "
                 f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}")
        return {field: getattr(self, field) for field in self._DATA_FIELDS}

    def get_devices(self):
        """Return the live list of collected devices (not a copy)."""
        return self.devices

    def add_module(self, name):
        """Count ``name`` as a module unless it has been counted already."""
        if name in self.__module_list:
            return
        self.__module_list.append(name)
        self.modules += 1
100
101
102@Plugin(type=Plugin.REPORTER, id=TestType.all)
103class ResultReporter(IReporter):
104
105    def __init__(self):
106        self.report_path = None
107        self.task_info = None
108        self.summary_data_path = None
109        self.summary_data_str = ""
110        self.exec_info = None
111        self.data_helper = None
112        self.repeat_helper = None
113        self.summary = ResultSummary()
114
115        # task_record.info数据
116        self._failed_cases = []
117        self.record_params = {}
118        self.record_reports = {}
119
120    def __generate_reports__(self, report_path, **kwargs):
121        LOG.info("")
122        LOG.info("**************************************************")
123        LOG.info("************** Start generate reports ************")
124        LOG.info("**************************************************")
125        LOG.info("")
126
127        if self._check_params(report_path, **kwargs):
128            # generate data report
129            self._generate_data_report()
130
131            # generate vision reports
132            self._generate_test_report()
133
134            # generate task info record
135            self._generate_task_record()
136
137            # generate summary ini
138            self._generate_summary()
139
140            # copy reports to reports/latest folder
141            self._copy_report()
142
143            self._transact_all()
144
145        LOG.info("")
146        LOG.info("**************************************************")
147        LOG.info("************** Ended generate reports ************")
148        LOG.info("**************************************************")
149        LOG.info("")
150        show_current_environment()
151
152    def _check_params(self, report_path, **kwargs):
153        task_info = kwargs.get("task_info", "")
154        if not report_path:
155            LOG.error("Report path is wrong", error_no="00440",
156                      ReportPath=report_path)
157            return False
158        if not task_info or not isinstance(task_info, ExecInfo):
159            LOG.error("Task info is wrong", error_no="00441",
160                      TaskInfo=task_info)
161            return False
162
163        os.makedirs(report_path, exist_ok=True)
164        self.report_path = report_path
165        self.task_info = task_info
166        self.summary_data_path = os.path.join(
167            self.report_path, ReportConstant.summary_data_report)
168        self.exec_info = task_info
169        self.data_helper = DataHelper()
170        self.repeat_helper = RepeatHelper(report_path)
171        return True
172
173    def _generate_test_report(self):
174        report_template = os.path.join(Variables.res_dir, "template")
175        missing_files = []
176        for source in ReportConstant.new_template_sources:
177            file = source.get("file")
178            to_path = os.path.join(report_template, file)
179            if not os.path.exists(to_path) or os.path.getsize(to_path) == 0:
180                missing_files.append(to_path)
181        # 若新报告模板文件缺失,则使用旧报告模板生成测试报告
182        if len(missing_files) > 0:
183            self._generate_vision_reports()
184            return
185
186        copy_folder(report_template, self.report_path)
187        temp_file_report_html = os.path.join(self.report_path, "report.html")
188        if os.path.exists(temp_file_report_html):
189            os.remove(temp_file_report_html)
190        content = json.dumps(self._get_summary_data(), separators=(",", ":"))
191        data_js = os.path.join(self.report_path, "static", "data.js")
192        data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644)
193        with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf:
194            jsf.write(f"window.reportData = {content}")
195        test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/")
196        LOG.info(f"Log path: {self.report_path}")
197        LOG.info(f"Generate test report: file:///{test_report}")
198        # 重新生成对象,避免在retry场景数据统计有误
199        self.summary = ResultSummary()
200
201    def _get_summary_data(self):
202        self.summary.repeat = self.task_info.repeat
203        modules = []
204        for data_report, _ in self.data_reports:
205            if data_report.endswith(ReportConstant.summary_data_report):
206                continue
207            info = self._parse_module(data_report)
208            if info is not None:
209                modules.append(info)
210        if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0:
211            if Context.get_scheduler():
212                Context.get_scheduler().set_need_auto_retry(True)
213        data = {
214            "exec_info": self._get_exec_info(),
215            "summary": self.summary.get_data(),
216            "devices": self.summary.get_devices(),
217            "modules": modules,
218        }
219        return data
220
221    def _get_exec_info(self):
222        start_time = self.task_info.test_time
223        end_time = time.strftime(ReportConstant.time_format, time.localtime())
224        test_time = "%s/ %s" % (start_time, end_time)
225        execute_time = calculate_elapsed_time(
226            time.mktime(time.strptime(start_time, ReportConstant.time_format)),
227            time.mktime(time.strptime(end_time, ReportConstant.time_format)))
228        host_info = platform.platform()
229        device_name = getattr(self.task_info, ReportConstant.device_name, "None")
230        device_type = getattr(self.task_info, ReportConstant.device_label, "None")
231        platform_info = getattr(self.task_info, ReportConstant.platform, "None")
232        test_type = getattr(self.task_info, ReportConstant.test_type, "None")
233
234        # 为报告文件summary.ini提供数据
235        exec_info = ExecInfo()
236        exec_info.device_name = device_name
237        exec_info.device_label = device_type
238        exec_info.execute_time = execute_time
239        exec_info.host_info = host_info
240        exec_info.log_path = self.report_path
241        exec_info.platform = platform_info
242        exec_info.test_time = test_time
243        exec_info.test_type = test_type
244        self.exec_info = exec_info
245
246        info = {
247            "test_start": start_time,
248            "test_end": end_time,
249            "execute_time": execute_time,
250            "test_type": test_type,
251            "host_info": host_info,
252            "logs": self._get_task_log()
253        }
254        return info
255
256    def _parse_module(self, xml_file):
257        """解析测试模块"""
258        file_name = os.path.basename(xml_file)
259        try:
260            xml_file_open = os.open(xml_file, os.O_RDWR, stat.S_IWUSR | stat.S_IRUSR)
261            with os.fdopen(xml_file_open, mode="r", encoding="utf-8") as file_handler:
262                xml_str = file_handler.read()
263            for char_index in range(32):
264                if char_index in [10, 13]:
265                    continue
266                xml_str = xml_str.replace(chr(char_index), "")
267            ele_module = ElementTree.fromstring(xml_str)
268        except ElementTree.ParseError as e:
269            LOG.error(f"parse result xml error! xml file {xml_file}")
270            LOG.error(f"error message: {e}")
271            return None
272        module = ResultReporter._count_result(ele_module)
273        # 当模块名为空或为AllTests,将模块名设为结果xml的文件名
274        module_name = file_name[:-4] if module.name in ["", "AllTests"] else module.name.strip()
275        suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module]
276
277        # 为报告文件task_record.info提供数据
278        self.record_reports.update({module_name: xml_file})
279        if len(self._failed_cases) != 0:
280            self.record_params.update({module_name: copy.copy(self._failed_cases)})
281            self._failed_cases.clear()
282
283        self.summary.add_module(module_name)
284        self.summary.tests += module.tests
285        self.summary.passed += module.passed
286        self.summary.failed += module.failed
287        self.summary.blocked += module.blocked
288        self.summary.ignored += module.ignored
289        if module.unavailable == 0:
290            self.summary.runmodules += 1
291        else:
292            self.summary.unavailable += 1
293        devices = self._parse_devices(ele_module)
294
295        module_report, module_time = module.report, module.time
296        if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name:
297            report = suites[0].get(ReportConstant.report)
298            if report != "":
299                module_report = report
300            module_time = suites[0].get(ReportConstant.time)
301        repeat = int(ele_module.get(ReportConstant.repeat, "1"))
302        if self.summary.repeat < repeat:
303            self.summary.repeat = repeat
304        repeat_round = int(ele_module.get(ReportConstant.round, "1"))
305        test_type = ele_module.get(ReportConstant.test_type, "-")
306        test_start = ele_module.get(ReportConstant.start_time, "-")
307        test_end = ele_module.get(ReportConstant.end_time, "-")
308        if test_start != "-" and test_end != "-":
309            execute_time = calculate_elapsed_time(
310                time.mktime(time.strptime(test_start, ReportConstant.time_format)),
311                time.mktime(time.strptime(test_end, ReportConstant.time_format)))
312        else:
313            execute_time = calculate_elapsed_time(0, module_time)
314        info = {
315            "name": module_name,
316            "report": module_report,
317            "round": repeat_round,
318            "test_type": test_type,
319            "test_start": test_start,
320            "test_end": test_end,
321            "time": module_time,
322            "execute_time": execute_time,
323            "tests": module.tests,
324            "passed": module.passed,
325            "failed": module.failed,
326            "blocked": module.blocked,
327            "ignored": module.ignored,
328            "unavailable": module.unavailable,
329            "passingrate": calculate_percent(module.passed, module.tests),
330            "error": ele_module.get(ReportConstant.message, ""),
331            "logs": self._get_module_logs(module_name, repeat=repeat, repeat_round=repeat_round),
332            "devices": devices,
333            "suites": suites
334        }
335        return info
336
337    def _parse_testsuite(self, ele_suite):
338        """解析测试套"""
339        suite = ResultReporter._count_result(ele_suite)
340        cases = [self._parse_testcase(case) for case in ele_suite]
341        info = {
342            "name": suite.name,
343            "report": suite.report,
344            "time": suite.time,
345            "tests": suite.tests,
346            "passed": suite.passed,
347            "failed": suite.failed,
348            "blocked": suite.blocked,
349            "ignored": suite.ignored,
350            "passingrate": calculate_percent(suite.passed, suite.tests),
351            "cases": cases
352        }
353        return info
354
355    def _parse_testcase(self, ele_case):
356        """解析测试用例"""
357        name = ele_case.get(ReportConstant.name)
358        class_name = ele_case.get(ReportConstant.class_name, "")
359        result, error = get_case_result(ele_case)
360        if result != CaseResult.passed:
361            self._failed_cases.append(f"{class_name}#{name}")
362        return [name, class_name, result, ResultReporter._parse_time(ele_case),
363                error, ele_case.get(ReportConstant.report, "")]
364
365    @staticmethod
366    def _parse_time(ele):
367        try:
368            _time = float(ele.get(ReportConstant.time, "0"))
369        except ValueError:
370            _time = 0.0
371            LOG.error("parse test time error, set it to 0.0")
372        return _time
373
374    def _parse_devices(self, ele_module):
375        devices_str = ele_module.get(ReportConstant.devices, "")
376        if devices_str == "":
377            return []
378        try:
379            devices = json.loads(parse_xml_cdata(devices_str))
380        except Exception as e:
381            LOG.warning(f"parse devices from xml failed, {e}")
382            return []
383        # 汇总测试设备信息
384        for device in devices:
385            device_sn = device.get(DeviceProperties.sn, "")
386            temp = [d for d in self.summary.get_devices() if d.get(DeviceProperties.sn, "") == device_sn]
387            if len(temp) != 0:
388                continue
389            self.summary.get_devices().append(device)
390        return devices
391
392    @staticmethod
393    def _count_result(ele):
394        name = ele.get(ReportConstant.name, "")
395        report = ele.get(ReportConstant.report, "")
396        _time = ResultReporter._parse_time(ele)
397        tests = int(ele.get(ReportConstant.tests, "0"))
398        failed = int(ele.get(ReportConstant.failures, "0"))
399        disabled = ele.get(ReportConstant.disabled, "0")
400        if disabled == "":
401            disabled = "0"
402        errors = ele.get(ReportConstant.errors, "0")
403        if errors == "":
404            errors = "0"
405        blocked = int(disabled) + int(errors)
406        ignored = int(ele.get(ReportConstant.ignored, "0"))
407        unavailable = int(ele.get(ReportConstant.unavailable, "0"))
408
409        tmp_pass = tests - failed - blocked - ignored
410        passed = tmp_pass if tmp_pass > 0 else 0
411
412        Result = collections.namedtuple(
413            'Result',
414            ['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable'])
415        return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable)
416
417    def _get_module_logs(self, module_name, repeat=1, repeat_round=1):
418        """获取模块运行日志和设备日志
419        注:黑盒用例的测试报告是单独生成的,而xts只有模块级的设备日志,无用例级日志,故本方法仅支持获取模块级的设备日志
420        """
421        device_log = {}
422        round_folder = f"round{repeat_round}" if repeat > 1 else ""
423        log_path = os.path.join(self.report_path, "log", round_folder, module_name)
424        if not os.path.exists(log_path):
425            return device_log
426        module_log_uri = f"log/{round_folder}/{module_name}" if round_folder else f"log/{module_name}"
427        for filename in os.listdir(log_path):
428            file_link = f"{module_log_uri}/{filename}"
429            file_path = os.path.join(log_path, filename)
430            # 目录和模块运行日志,都提供链接
431            if os.path.isdir(file_path) or filename.startswith(ReportConstant.module_run_log):
432                device_log.setdefault(filename, file_link)
433                continue
434            # 是文件,仅提供模块级的设备日志链接
435            # 测试套日志命名格式device_log_sn.log、测试套子用例日志命名格式device_log_case_sn.log,后者“_”大于2
436            ret = re.fullmatch(r'(device_(?:hi)?log)_\S+\.log', filename)
437            if ret is None or filename.count("_") > 2:
438                continue
439            device_log.setdefault(ret.group(1), file_link)
440        return device_log
441
442    def _get_task_log(self):
443        log_path = os.path.join(self.report_path, "log")
444        if not os.path.exists(log_path):
445            return {}
446        return {f: f"log/{f}" for f in os.listdir(log_path) if f.startswith(ReportConstant.task_run_log)}
447
448    def _generate_data_report(self):
449        # initial element
450        test_suites_element = self.data_helper.initial_suites_element()
451
452        # update test suites element
453        update_flag = self._update_test_suites(test_suites_element)
454        if not update_flag:
455            return
456
457        # generate report
458        if not self._check_mode(ModeType.decc):
459            self.data_helper.generate_report(test_suites_element,
460                                             self.summary_data_path)
461
462    def _update_test_suites(self, test_suites_element):
463        # initial attributes for test suites element
464        test_suites_attributes, need_update_attributes = \
465            self._init_attributes()
466
467        # get test suite elements that are children of test suites element
468        modules = dict()
469        test_suite_elements = []
470        for data_report, module_name in self.data_reports:
471            if data_report.endswith(ReportConstant.summary_data_report):
472                continue
473            root = self.data_helper.parse_data_report(data_report)
474            self._parse_devices(root)
475            if module_name == ReportConstant.empty_name:
476                module_name = self._get_module_name(data_report, root)
477            total = int(root.get(ReportConstant.tests, 0))
478            if module_name not in modules.keys():
479                modules[module_name] = list()
480            modules[module_name].append(total)
481
482            failed_cases = []
483            for child in root:
484                child.tail = self.data_helper.LINE_BREAK_INDENT
485                if not child.get(ReportConstant.module_name) or child.get(
486                        ReportConstant.module_name) == \
487                        ReportConstant.empty_name:
488                    child.set(ReportConstant.module_name, module_name)
489                self._check_tests_and_unavailable(child)
490                # covert the status of "notrun" to "ignored"
491                for element in child:
492                    if element.get(ReportConstant.status, "") == \
493                            ReportConstant.not_run:
494                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
495                        child.set(ReportConstant.ignored, "%s" % ignored)
496                    # 记录运行失败的测试用例的编号
497                    if get_case_result(element)[0] != CaseResult.passed:
498                        class_name = element.get(ReportConstant.class_name, "")
499                        name = element.get(ReportConstant.name)
500                        failed_cases.append(f"{class_name}#{name}")
501                test_suite_elements.append(child)
502                for update_attribute in need_update_attributes:
503                    update_value = child.get(update_attribute, 0)
504                    if not update_value:
505                        update_value = 0
506                    test_suites_attributes[update_attribute] += int(
507                        update_value)
508            if failed_cases:
509                self.record_params.update({module_name: failed_cases})
510                self.record_reports.update({module_name: data_report})
511
512        if test_suite_elements:
513            child = test_suite_elements[-1]
514            child.tail = self.data_helper.LINE_BREAK
515        else:
516            LOG.error("Execute result not exists")
517            return False
518
519        # set test suites element attributes and children
520        self._handle_module_tests(modules, test_suites_attributes)
521        self.data_helper.set_element_attributes(test_suites_element,
522                                                test_suites_attributes)
523        test_suites_element.extend(test_suite_elements)
524        return True
525
526    @classmethod
527    def _check_tests_and_unavailable(cls, child):
528        total = child.get(ReportConstant.tests, "0")
529        unavailable = child.get(ReportConstant.unavailable, "0")
530        if total and total != "0" and unavailable and \
531                unavailable != "0":
532            child.set(ReportConstant.unavailable, "0")
533            LOG.warning("%s total: %s, unavailable: %s", child.get(
534                ReportConstant.name), total, unavailable)
535
536    @classmethod
537    def _get_module_name(cls, data_report, root):
538        # get module name from data report
539        module_name = get_filename_extension(data_report)[0]
540        if "report" in module_name or "summary" in module_name or \
541                "<" in data_report or ">" in data_report:
542            module_name = root.get(ReportConstant.name,
543                                   ReportConstant.empty_name)
544            if "report" in module_name or "summary" in module_name:
545                module_name = ReportConstant.empty_name
546        return module_name
547
548    def _init_attributes(self):
549        test_suites_attributes = {
550            ReportConstant.name:
551                ReportConstant.summary_data_report.split(".")[0],
552            ReportConstant.start_time: self.task_info.test_time,
553            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
554                                                   time.localtime()),
555            ReportConstant.errors: 0, ReportConstant.disabled: 0,
556            ReportConstant.failures: 0, ReportConstant.tests: 0,
557            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
558            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
559        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
560                                  ReportConstant.failures,
561                                  ReportConstant.disabled,
562                                  ReportConstant.errors,
563                                  ReportConstant.unavailable]
564        return test_suites_attributes, need_update_attributes
565
566    @property
567    def summary_data_report_exist(self):
568        return "<" in self.summary_data_str or \
569               os.path.exists(self.summary_data_path)
570
571    @property
572    def data_reports(self):
573        if check_pub_key_exist() or self._check_mode(ModeType.decc):
574            from xdevice import SuiteReporter
575            suite_reports = SuiteReporter.get_report_result()
576            if self._check_mode(ModeType.decc):
577                LOG.debug("Handle history result, data reports length:{}".
578                          format(len(suite_reports)))
579                SuiteReporter.clear_history_result()
580                SuiteReporter.append_history_result(suite_reports)
581            data_reports = []
582            for report_path, report_result in suite_reports:
583                module_name = get_filename_extension(report_path)[0]
584                data_reports.append((report_result, module_name))
585            SuiteReporter.clear_report_result()
586            return data_reports
587
588        if not os.path.isdir(self.report_path):
589            return []
590        data_reports = []
591        result_path = os.path.join(self.report_path, "result")
592        for root, _, files in os.walk(result_path):
593            for file_name in files:
594                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
595                    continue
596                module_name = self._find_module_name(result_path, root)
597                data_reports.append((os.path.join(root, file_name),
598                                     module_name))
599        return data_reports
600
601    @classmethod
602    def _find_module_name(cls, result_path, root):
603        # find module name from directory tree
604        common_path = os.path.commonpath([result_path, root])
605        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
606                os.path.normcase(result_path) == os.path.normcase(root):
607            return ReportConstant.empty_name
608
609        root_dir, module_name = os.path.split(root)
610        if os.path.normcase(result_path) == os.path.normcase(root_dir):
611            return ReportConstant.empty_name
612        root_dir, subsystem_name = os.path.split(root_dir)
613        while os.path.normcase(result_path) != os.path.normcase(root_dir):
614            module_name = subsystem_name
615            root_dir, subsystem_name = os.path.split(root_dir)
616        return module_name
617
618    def _generate_summary(self):
619        if not self.summary_data_report_exist or \
620                self._check_mode(ModeType.decc):
621            return
622        summary_ini_content = \
623            "[default]\n" \
624            "Platform={}\n" \
625            "Test Type={}\n" \
626            "Device Name={}\n" \
627            "Host Info={}\n" \
628            "Test Start/ End Time={}\n" \
629            "Execution Time={}\n" \
630            "Device Type={}\n".format(
631                self.exec_info.platform, self.exec_info.test_type,
632                self.exec_info.device_name, self.exec_info.host_info,
633                self.exec_info.test_time, self.exec_info.execute_time,
634                self.exec_info.device_label)
635
636        if self.exec_info.product_info:
637            for key, value in self.exec_info.product_info.items():
638                summary_ini_content = "{}{}".format(
639                    summary_ini_content, "%s=%s\n" % (key, value))
640
641        if not self._check_mode(ModeType.factory):
642            summary_ini_content = "{}{}".format(
643                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)
644
645        # write summary_ini_content
646        summary_filepath = os.path.join(self.report_path,
647                                        ReportConstant.summary_ini)
648
649        if platform.system() == "Windows":
650            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
651        else:
652            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
653        summary_filepath_open = os.open(summary_filepath, flags,
654                                        FilePermission.mode_755)
655
656        with os.fdopen(summary_filepath_open, "wb") as file_handler:
657            if check_pub_key_exist():
658                try:
659                    cipher_text = do_rsa_encrypt(summary_ini_content)
660                except ParamError as error:
661                    LOG.error(error, error_no=error.error_no)
662                    cipher_text = b""
663                file_handler.write(cipher_text)
664            else:
665                file_handler.write(bytes(summary_ini_content, 'utf-8'))
666            file_handler.flush()
667            LOG.info("Generate summary ini: %s", summary_filepath)
668        self.repeat_helper.__generate_repeat_xml__(self.summary_data_path)
669
670    def _copy_report(self):
671        if Uploader.is_enable() or self._check_mode(ModeType.decc):
672            return
673
674        dst_path = os.path.join(Variables.temp_dir, "latest")
675        try:
676            shutil.rmtree(dst_path, ignore_errors=True)
677            os.makedirs(dst_path, exist_ok=True)
678            LOG.info("Copy summary files to %s", dst_path)
679
680            # copy reports to reports/latest folder
681            for report_file in os.listdir(self.report_path):
682                src_file = os.path.join(self.report_path, report_file)
683                dst_file = os.path.join(dst_path, report_file)
684                if os.path.isfile(src_file):
685                    shutil.copyfile(src_file, dst_file)
686        except OSError as _:
687            return
688
689    @classmethod
690    def _check_mode(cls, mode):
691        return Context.session().mode == mode
692
693    def _generate_task_record(self):
694        # under encryption status, don't handle anything directly
695        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
696            return
697
698        # get info from command_queue
699        if Context.command_queue().size() == 0:
700            return
701        _, command, report_path = Context.command_queue().get(-1)
702
703        record_info = {
704            "command": command,
705            "session_id": os.path.split(report_path)[-1],
706            "report_path": report_path,
707            "unsuccessful_params": self.record_params,
708            "data_reports": self.record_reports
709        }
710
711        def encode(content):
712            # inner function to encode
713            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
714
715        # write into file
716        record_file = os.path.join(self.report_path,
717                                   ReportConstant.task_info_record)
718        _record_json = json.dumps(record_info, indent=2)
719
720        with open(file=record_file, mode="wb") as file:
721            if Context.session().mode == ModeType.decc:
722                # under decc, write in encoded text
723                file.write(bytes(encode(_record_json), encoding="utf-8"))
724            else:
725                # others, write in plain text
726                file.write(bytes(_record_json, encoding="utf-8"))
727
728        LOG.info("Generate record file: %s", record_file)
729
730    @classmethod
731    def get_task_info_params(cls, history_path):
732        # under encryption status, don't handle anything directly
733        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
734            return ()
735
736        def decode(content):
737            result_list = []
738            for b in content.split(' '):
739                result_list.append(chr(int(b, 2)))
740            return ''.join(result_list)
741
742        record_path = os.path.join(history_path,
743                                   ReportConstant.task_info_record)
744        if not os.path.exists(record_path):
745            LOG.error("%s not exists!", ReportConstant.task_info_record)
746            return ()
747
748        with open(record_path, mode="rb") as file:
749            if Context.session().mode == ModeType.decc:
750                # under decc, read from encoded text
751                result = json.loads(decode(file.read().decode("utf-8")))
752            else:
753                # others, read from plain text
754                result = json.loads(file.read())
755        standard_length = 5
756        if not len(result.keys()) == standard_length:
757            LOG.error("%s error!", ReportConstant.task_info_record)
758            return ()
759
760        return result
761
762    def _transact_all(self):
763        pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc")
764        if not os.path.exists(pyc_path):
765            return
766        module_spec = util.spec_from_file_location("binder", pyc_path)
767        if not module_spec:
768            return
769        module = util.module_from_spec(module_spec)
770        module_spec.loader.exec_module(module)
771        if hasattr(module, "transact") and callable(module.transact):
772            module.transact(self, LOG)
773        del module
774
775    @classmethod
776    def _handle_module_tests(cls, modules, test_suites_attributes):
777        modules_list = list()
778        modules_zero = list()
779        for module_name, detail_list in modules.items():
780            for total in detail_list:
781                modules_list.append(total)
782                if total == 0:
783                    modules_zero.append(module_name)
784        test_suites_attributes[ReportConstant.run_modules] = \
785            len(modules_list) - len(modules_zero)
786        test_suites_attributes[ReportConstant.modules] = len(modules_list)
787        if modules_zero:
788            LOG.info("The total tests of %s module is 0", ",".join(
789                modules_zero))
790
    # ******************** Code using the legacy report template: BEGIN ********************
792    def _generate_vision_reports(self):
793        from _core.report.reporter_helper import VisionHelper
794        vision_helper = VisionHelper()
795        vision_helper.report_path = self.report_path
796        if not self._check_mode(ModeType.decc) and not \
797                self.summary_data_report_exist:
798            LOG.error("Summary data report not exists")
799            return
800
801        if check_pub_key_exist() or self._check_mode(ModeType.decc):
802            from xdevice import SuiteReporter
803            SuiteReporter.clear_report_result()
804
805        # parse data
806        if self.summary_data_str:
807            # only in decc mode and pub key, self.summary_data_str is not empty
808            summary_element_tree = self.data_helper.parse_data_report(
809                self.summary_data_str)
810        else:
811            summary_element_tree = self.data_helper.parse_data_report(
812                self.summary_data_path)
813        parsed_data = vision_helper.parse_element_data(
814            summary_element_tree, self.report_path, self.task_info)
815        self.parsed_data = parsed_data
816        self.exec_info, summary, _ = parsed_data
817
818        if self._check_mode(ModeType.decc):
819            return
820
821        LOG.info("Summary result: modules: %s, run modules: %s, total: "
822                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
823                 "unavailable: %s", summary.modules, summary.run_modules,
824                 summary.result.total, summary.result.passed,
825                 summary.result.failed, summary.result.blocked,
826                 summary.result.ignored, summary.result.unavailable)
827        LOG.info("Log path: %s", self.exec_info.log_path)
828
829        if summary.result.failed != 0 or summary.result.blocked != 0 or\
830                summary.result.unavailable != 0:
831            from xdevice import Scheduler
832            Scheduler.is_need_auto_retry = True
833
834        # generate summary vision report
835        report_generate_flag = self._generate_vision_report(
836            vision_helper, parsed_data, ReportConstant.summary_title,
837            ReportConstant.summary_vision_report)
838
839        # generate details vision report
840        if report_generate_flag and summary.result.total > 0:
841            self._generate_vision_report(
842                vision_helper, parsed_data, ReportConstant.details_title,
843                ReportConstant.details_vision_report)
844
845        # generate failures vision report
846        if summary.result.total != (
847                summary.result.passed + summary.result.ignored) or \
848                summary.result.unavailable > 0:
849            self._generate_vision_report(
850                vision_helper, parsed_data, ReportConstant.failures_title,
851                ReportConstant.failures_vision_report)
852
853        # generate passes vision report
854        if summary.result.passed != 0:
855            self._generate_vision_report(
856                vision_helper, parsed_data, ReportConstant.passes_title,
857                ReportConstant.passes_vision_report)
858
859        # generate ignores vision report
860        if summary.result.ignored != 0:
861            self._generate_vision_report(
862                vision_helper, parsed_data, ReportConstant.ignores_title,
863                ReportConstant.ignores_vision_report)
864
865    def _generate_vision_report(self, vision_helper, parsed_data, title, render_target):
866        # render data
867        report_context = vision_helper.render_data(
868            title, parsed_data,
869            render_target=render_target, devices=self.summary.get_devices())
870
871        # generate report
872        if report_context:
873            report_path = os.path.join(self.report_path, render_target)
874            vision_helper.generate_report(report_path, report_context)
875            return True
876        else:
877            LOG.error("Failed to generate %s", render_target)
878            return False
    # ******************** Code using the legacy report template: END ********************
880