• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import collections
20import copy
21import json
22import os
23import platform
24import shutil
25import time
26import zipfile
27from importlib import util
28from ast import literal_eval
29from xml.etree import ElementTree
30
31from _core.interface import IReporter
32from _core.plugin import Plugin
33from _core.constants import ModeType
34from _core.constants import TestType
35from _core.constants import FilePermission
36from _core.logger import platform_logger
37from _core.exception import ParamError
38from _core.utils import calculate_elapsed_time
39from _core.utils import copy_folder
40from _core.utils import get_filename_extension
41from _core.report.encrypt import check_pub_key_exist
42from _core.report.encrypt import do_rsa_encrypt
43from _core.report.encrypt import get_file_summary
44from _core.report.reporter_helper import CaseResult
45from _core.report.reporter_helper import DataHelper
46from _core.report.reporter_helper import ExecInfo
47from _core.report.reporter_helper import VisionHelper
48from _core.report.reporter_helper import ReportConstant
49from _core.report.repeater_helper import RepeatHelper
50from xdevice import Variables
51
52LOG = platform_logger("ResultReporter")
53
54
class ResultSummary:
    """Accumulates the counters reported in the summary section of a report."""

    def __init__(self):
        # every counter starts from zero
        self.modules = self.runmodules = 0
        self.tests = self.passed = self.failed = 0
        self.blocked = self.ignored = self.unavailable = 0

    def get_data(self):
        """Log the current counters and return them as a dict keyed by counter name."""
        LOG.info(f"Summary result: modules: {self.modules}, run modules: {self.runmodules}, "
                 f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, "
                 f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}")
        counter_names = ("modules", "runmodules", "tests", "passed", "failed",
                         "blocked", "ignored", "unavailable")
        return {counter: getattr(self, counter) for counter in counter_names}
82
83
84@Plugin(type=Plugin.REPORTER, id=TestType.all)
85class ResultReporter(IReporter):
86    summary_report_result = []
87
88    def __init__(self):
89        self.report_path = None
90        self.task_info = None
91        self.summary_data_path = None
92        self.summary_data_str = ""
93        self.exec_info = None
94        self.parsed_data = None
95        self.data_helper = None
96        self.vision_helper = None
97        self.repeat_helper = None
98        self.summary = ResultSummary()
99
100        # task_record.info数据
101        self._failed_cases = []
102        self.record_params = {}
103        self.record_reports = {}
104
105    def __generate_reports__(self, report_path, **kwargs):
106        LOG.info("")
107        LOG.info("**************************************************")
108        LOG.info("************** Start generate reports ************")
109        LOG.info("**************************************************")
110        LOG.info("")
111
112        if self._check_params(report_path, **kwargs):
113            # generate data report
114            self._generate_data_report()
115
116            # generate vision reports
117            self._generate_vision_reports()
118
119            # generate task info record
120            self._generate_task_record()
121
122            # generate summary ini
123            self._generate_summary()
124
125            # copy reports to reports/latest folder
126            self._copy_report()
127
128            self._transact_all()
129
130        LOG.info("")
131        LOG.info("**************************************************")
132        LOG.info("************** Ended generate reports ************")
133        LOG.info("**************************************************")
134        LOG.info("")
135
136    def _check_params(self, report_path, **kwargs):
137        task_info = kwargs.get("task_info", "")
138        if not report_path:
139            LOG.error("Report path is wrong", error_no="00440",
140                      ReportPath=report_path)
141            return False
142        if not task_info or not isinstance(task_info, ExecInfo):
143            LOG.error("Task info is wrong", error_no="00441",
144                      TaskInfo=task_info)
145            return False
146
147        os.makedirs(report_path, exist_ok=True)
148        self.report_path = report_path
149        self.task_info = task_info
150        self.summary_data_path = os.path.join(
151            self.report_path, ReportConstant.summary_data_report)
152        self.exec_info = task_info
153        self.data_helper = DataHelper()
154        self.vision_helper = VisionHelper()
155        self.vision_helper.report_path = report_path
156        self.repeat_helper = RepeatHelper(report_path)
157        return True
158
159    def _generate_test_report(self):
160        report_template = os.path.join(Variables.res_dir, "template")
161        copy_folder(report_template, self.report_path)
162        content = json.dumps(self._get_summary_data())
163        data_js = os.path.join(self.report_path, "static", "data.js")
164        data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644)
165        with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf:
166            jsf.write(f"window.reportData = {content}")
167        test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/")
168        LOG.info(f"Log path: {self.report_path}")
169        LOG.info(f"Generate test report: file:///{test_report}")
170        # 重新生成对象,避免在retry场景数据统计有误
171        self.summary = ResultSummary()
172
173    def _get_summary_data(self):
174        modules = []
175        for data_report, _ in self.data_reports:
176            if data_report.endswith(ReportConstant.summary_data_report):
177                continue
178            info = self._parse_module(data_report)
179            if info is not None:
180                modules.append(info)
181        if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0:
182            from xdevice import Scheduler
183            Scheduler.is_need_auto_retry = True
184        data = {
185            "exec_info": self._get_exec_info(),
186            "summary": self.summary.get_data(),
187            "modules": modules,
188        }
189        return data
190
191    def _get_exec_info(self):
192        start_time = self.task_info.test_time
193        end_time = time.strftime(ReportConstant.time_format, time.localtime())
194        test_time = "%s/ %s" % (start_time, end_time)
195        execute_time = calculate_elapsed_time(
196            time.mktime(time.strptime(start_time, ReportConstant.time_format)),
197            time.mktime(time.strptime(end_time, ReportConstant.time_format)))
198        host_info = platform.platform()
199        device_name = getattr(self.task_info, ReportConstant.device_name, "None")
200        device_type = getattr(self.task_info, ReportConstant.device_label, "None")
201        platform_info = getattr(self.task_info, ReportConstant.platform, "None")
202        test_type = getattr(self.task_info, ReportConstant.test_type, "None")
203
204        # 为报告文件summary.ini提供数据
205        exec_info = ExecInfo()
206        exec_info.device_name = device_name
207        exec_info.device_label = device_type
208        exec_info.execute_time = execute_time
209        exec_info.host_info = host_info
210        exec_info.log_path = self.report_path
211        exec_info.platform = platform_info
212        exec_info.test_time = test_time
213        exec_info.test_type = test_type
214        self.exec_info = exec_info
215
216        info = {
217            "platform": platform_info,
218            "test_type": test_type,
219            "device_name": device_name,
220            "device_type": device_type,
221            "test_time": test_time,
222            "execute_time": execute_time,
223            "host_info": host_info
224        }
225        return info
226
227    def _parse_module(self, xml_file):
228        """解析测试模块"""
229        file_name = os.path.basename(xml_file)
230        try:
231            ele_module = ElementTree.parse(xml_file).getroot()
232        except ElementTree.ParseError:
233            LOG.error(f"parse module result error, result file {file_name}")
234            return None
235        module = ResultReporter._count_result(ele_module)
236        module_name = file_name[:-4] if module.name == "" else module.name
237        suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module]
238
239        # 为报告文件task_record.info提供数据
240        self.record_reports.update({module_name: xml_file})
241        if len(self._failed_cases) != 0:
242            self.record_params.update({module_name: copy.copy(self._failed_cases)})
243            self._failed_cases.clear()
244
245        self.summary.modules += 1
246        self.summary.tests += module.tests
247        self.summary.passed += module.passed
248        self.summary.failed += module.failed
249        self.summary.blocked += module.blocked
250        self.summary.ignored += module.ignored
251        if module.unavailable == 0:
252            self.summary.runmodules += 1
253        else:
254            self.summary.unavailable += 1
255
256        module_report, module_time = module.report, module.time
257        if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name:
258            report = suites[0].get(ReportConstant.report)
259            if report != "":
260                module_report = report
261            module_time = suites[0].get(ReportConstant.time)
262        info = {
263            "name": module_name,
264            "report": module_report,
265            "time": module_time,
266            "execute_time": calculate_elapsed_time(0, module_time),
267            "tests": module.tests,
268            "passed": module.passed,
269            "failed": module.failed,
270            "blocked": module.blocked,
271            "ignored": module.ignored,
272            "unavailable": module.unavailable,
273            "passingrate": "0%" if module.tests == 0 else "{:.0%}".format(module.passed / module.tests),
274            "productinfo": ResultReporter._parse_product_info(ele_module),
275            "suites": suites
276        }
277        return info
278
279    def _parse_testsuite(self, ele_suite):
280        """解析测试套"""
281        suite = ResultReporter._count_result(ele_suite)
282        cases = [self._parse_testcase(case) for case in ele_suite]
283        info = {
284            "name": suite.name,
285            "report": suite.report,
286            "time": suite.time,
287            "tests": suite.tests,
288            "passed": suite.passed,
289            "failed": suite.failed,
290            "blocked": suite.blocked,
291            "ignored": suite.ignored,
292            "passingrate": "0%" if suite.tests == 0 else "{:.0%}".format(suite.passed / suite.tests),
293            "cases": cases
294        }
295        return info
296
297    def _parse_testcase(self, ele_case):
298        """解析测试用例"""
299        name = ele_case.get(ReportConstant.name)
300        class_name = ele_case.get(ReportConstant.class_name, "")
301        result = ResultReporter._get_case_result(ele_case)
302        if result != CaseResult.passed:
303            self._failed_cases.append(f"{class_name}#{name}")
304        return [name, class_name, result, ResultReporter._parse_time(ele_case),
305                ele_case.get(ReportConstant.message, ""), ele_case.get(ReportConstant.report, "")]
306
307    @staticmethod
308    def _parse_time(ele):
309        try:
310            _time = float(ele.get(ReportConstant.time, "0"))
311        except ValueError:
312            _time = 0.0
313            LOG.error("parse test time error, set it to 0.0")
314        return _time
315
316    @staticmethod
317    def _parse_product_info(ele_module):
318        product_info = ele_module.get(ReportConstant.product_info, "")
319        if product_info == "":
320            return {}
321        try:
322            return literal_eval(product_info)
323        except SyntaxError:
324            return {}
325
326    @staticmethod
327    def _count_result(ele):
328        name = ele.get(ReportConstant.name, "")
329        report = ele.get(ReportConstant.report, "")
330        _time = ResultReporter._parse_time(ele)
331        tests = int(ele.get(ReportConstant.tests, "0"))
332        failed = int(ele.get(ReportConstant.failures, "0"))
333        disabled = ele.get(ReportConstant.disabled, "0")
334        if disabled == "":
335            disabled = "0"
336        errors = ele.get(ReportConstant.errors, "0")
337        if errors == "":
338            errors = "0"
339        blocked = int(disabled) + int(errors)
340        ignored = int(ele.get(ReportConstant.ignored, "0"))
341        unavailable = int(ele.get(ReportConstant.unavailable, "0"))
342
343        tmp_pass = tests - failed - blocked - ignored
344        passed = tmp_pass if tmp_pass > 0 else 0
345
346        Result = collections.namedtuple(
347            'Result',
348            ['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable'])
349        return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable)
350
351    def _get_case_result(ele_case):
352        result_kind = ele_case.get(ReportConstant.result_kind, "")
353        if result_kind != "":
354            return result_kind
355        result = ele_case.get(ReportConstant.result, "")
356        status = ele_case.get(ReportConstant.status, "")
357        if result == ReportConstant.false and (status == ReportConstant.run or status == ""):
358            return CaseResult.failed
359        if status in [ReportConstant.blocked, ReportConstant.disabled, ReportConstant.error]:
360            return CaseResult.blocked
361        if status in [ReportConstant.skip, ReportConstant.not_run]:
362            return CaseResult.ignored
363        if status in [ReportConstant.unavailable]:
364            return CaseResult.unavailable
365        return CaseResult.passed
366
367    def _generate_data_report(self):
368        # initial element
369        test_suites_element = self.data_helper.initial_suites_element()
370
371        # update test suites element
372        update_flag = self._update_test_suites(test_suites_element)
373        if not update_flag:
374            return
375
376        # generate report
377        if not self._check_mode(ModeType.decc):
378            self.data_helper.generate_report(test_suites_element,
379                                             self.summary_data_path)
380
381        # set SuiteReporter.suite_report_result
382        if not check_pub_key_exist() and not self._check_mode(
383                ModeType.decc):
384            return
385        self.set_summary_report_result(
386            self.summary_data_path, DataHelper.to_string(test_suites_element))
387
388    def _update_test_suites(self, test_suites_element):
389        # initial attributes for test suites element
390        test_suites_attributes, need_update_attributes = \
391            self._init_attributes()
392
393        # get test suite elements that are children of test suites element
394        modules = dict()
395        test_suite_elements = []
396        for data_report, module_name in self.data_reports:
397            if data_report.endswith(ReportConstant.summary_data_report):
398                continue
399            root = self.data_helper.parse_data_report(data_report)
400            if module_name == ReportConstant.empty_name:
401                module_name = self._get_module_name(data_report, root)
402            total = int(root.get(ReportConstant.tests, 0))
403            if module_name not in modules.keys():
404                modules[module_name] = list()
405            modules[module_name].append(total)
406
407            self._append_product_info(test_suites_attributes, root)
408            for child in root:
409                child.tail = self.data_helper.LINE_BREAK_INDENT
410                if not child.get(ReportConstant.module_name) or child.get(
411                        ReportConstant.module_name) == \
412                        ReportConstant.empty_name:
413                    child.set(ReportConstant.module_name, module_name)
414                self._check_tests_and_unavailable(child)
415                # covert the status of "notrun" to "ignored"
416                for element in child:
417                    if element.get(ReportConstant.status, "") == \
418                            ReportConstant.not_run:
419                        ignored = int(child.get(ReportConstant.ignored, 0)) + 1
420                        child.set(ReportConstant.ignored, "%s" % ignored)
421                test_suite_elements.append(child)
422                for update_attribute in need_update_attributes:
423                    update_value = child.get(update_attribute, 0)
424                    if not update_value:
425                        update_value = 0
426                    test_suites_attributes[update_attribute] += int(
427                        update_value)
428
429        if test_suite_elements:
430            child = test_suite_elements[-1]
431            child.tail = self.data_helper.LINE_BREAK
432        else:
433            LOG.error("Execute result not exists")
434            return False
435
436        # set test suites element attributes and children
437        self._handle_module_tests(modules, test_suites_attributes)
438        self.data_helper.set_element_attributes(test_suites_element,
439                                                test_suites_attributes)
440        test_suites_element.extend(test_suite_elements)
441        return True
442
443    @classmethod
444    def _check_tests_and_unavailable(cls, child):
445        total = child.get(ReportConstant.tests, "0")
446        unavailable = child.get(ReportConstant.unavailable, "0")
447        if total and total != "0" and unavailable and \
448                unavailable != "0":
449            child.set(ReportConstant.unavailable, "0")
450            LOG.warning("%s total: %s, unavailable: %s", child.get(
451                ReportConstant.name), total, unavailable)
452
453    @classmethod
454    def _append_product_info(cls, test_suites_attributes, root):
455        product_info = root.get(ReportConstant.product_info, "")
456        if not product_info:
457            return
458        try:
459            product_info = literal_eval(str(product_info))
460        except SyntaxError as error:
461            LOG.error("%s %s", root.get(ReportConstant.name, ""), error.args)
462            product_info = {}
463
464        if not test_suites_attributes[ReportConstant.product_info]:
465            test_suites_attributes[ReportConstant.product_info] = \
466                product_info
467            return
468        for key, value in product_info.items():
469            exist_value = test_suites_attributes[
470                ReportConstant.product_info].get(key, "")
471
472            if not exist_value:
473                test_suites_attributes[
474                    ReportConstant.product_info][key] = value
475                continue
476            if value in exist_value:
477                continue
478            test_suites_attributes[ReportConstant.product_info][key] = \
479                "%s,%s" % (exist_value, value)
480
481    @classmethod
482    def _get_module_name(cls, data_report, root):
483        # get module name from data report
484        module_name = get_filename_extension(data_report)[0]
485        if "report" in module_name or "summary" in module_name or \
486                "<" in data_report or ">" in data_report:
487            module_name = root.get(ReportConstant.name,
488                                   ReportConstant.empty_name)
489            if "report" in module_name or "summary" in module_name:
490                module_name = ReportConstant.empty_name
491        return module_name
492
493    def _init_attributes(self):
494        test_suites_attributes = {
495            ReportConstant.name:
496                ReportConstant.summary_data_report.split(".")[0],
497            ReportConstant.start_time: self.task_info.test_time,
498            ReportConstant.end_time: time.strftime(ReportConstant.time_format,
499                                                   time.localtime()),
500            ReportConstant.errors: 0, ReportConstant.disabled: 0,
501            ReportConstant.failures: 0, ReportConstant.tests: 0,
502            ReportConstant.ignored: 0, ReportConstant.unavailable: 0,
503            ReportConstant.product_info: self.task_info.product_info,
504            ReportConstant.modules: 0, ReportConstant.run_modules: 0}
505        need_update_attributes = [ReportConstant.tests, ReportConstant.ignored,
506                                  ReportConstant.failures,
507                                  ReportConstant.disabled,
508                                  ReportConstant.errors,
509                                  ReportConstant.unavailable]
510        return test_suites_attributes, need_update_attributes
511
512    def _generate_vision_reports(self):
513        if not self._check_mode(ModeType.decc) and not \
514                self.summary_data_report_exist:
515            LOG.error("Summary data report not exists")
516            return
517
518        if check_pub_key_exist() or self._check_mode(ModeType.decc):
519            if not self.summary_report_result_exists():
520                LOG.error("Summary data report not exists")
521                return
522            self.summary_data_str = \
523                self.get_result_of_summary_report()
524            if check_pub_key_exist():
525                from xdevice import SuiteReporter
526                SuiteReporter.clear_report_result()
527
528        # parse data
529        if self.summary_data_str:
530            # only in decc mode and pub key, self.summary_data_str is not empty
531            summary_element_tree = self.data_helper.parse_data_report(
532                self.summary_data_str)
533        else:
534            summary_element_tree = self.data_helper.parse_data_report(
535                self.summary_data_path)
536        parsed_data = self.vision_helper.parse_element_data(
537            summary_element_tree, self.report_path, self.task_info)
538        self.parsed_data = parsed_data
539        self.exec_info, summary, _ = parsed_data
540
541        if self._check_mode(ModeType.decc):
542            return
543
544        LOG.info("Summary result: modules: %s, run modules: %s, total: "
545                 "%s, passed: %s, failed: %s, blocked: %s, ignored: %s, "
546                 "unavailable: %s", summary.modules, summary.run_modules,
547                 summary.result.total, summary.result.passed,
548                 summary.result.failed, summary.result.blocked,
549                 summary.result.ignored, summary.result.unavailable)
550        LOG.info("Log path: %s", self.exec_info.log_path)
551
552        if summary.result.failed != 0 or summary.result.blocked != 0 or\
553                summary.result.unavailable != 0:
554            from xdevice import Scheduler
555            Scheduler.is_need_auto_retry = True
556
557        # generate summary vision report
558        report_generate_flag = self._generate_vision_report(
559            parsed_data, ReportConstant.summary_title,
560            ReportConstant.summary_vision_report)
561
562        # generate details vision report
563        if report_generate_flag and summary.result.total > 0:
564            self._generate_vision_report(
565                parsed_data, ReportConstant.details_title,
566                ReportConstant.details_vision_report)
567
568        # generate failures vision report
569        if summary.result.total != (
570                summary.result.passed + summary.result.ignored) or \
571                summary.result.unavailable > 0:
572            self._generate_vision_report(
573                parsed_data, ReportConstant.failures_title,
574                ReportConstant.failures_vision_report)
575
576        # generate passes vision report
577        if summary.result.passed != 0:
578            self._generate_vision_report(
579                parsed_data, ReportConstant.passes_title, ReportConstant.passes_vision_report)
580
581        # generate ignores vision report
582        if summary.result.ignored != 0:
583            self._generate_vision_report(
584                parsed_data, ReportConstant.ignores_title, ReportConstant.ignores_vision_report)
585
586    def _generate_vision_report(self, parsed_data, title, render_target):
587
588        # render data
589        report_context = self.vision_helper.render_data(
590            title, parsed_data, render_target=render_target)
591
592        # generate report
593        if report_context:
594            report_path = os.path.join(self.report_path, render_target)
595            self.vision_helper.generate_report(report_path, report_context)
596            return True
597        else:
598            LOG.error("Failed to generate %s", render_target)
599            return False
600
601    @property
602    def summary_data_report_exist(self):
603        return "<" in self.summary_data_str or \
604               os.path.exists(self.summary_data_path)
605
606    @property
607    def data_reports(self):
608        if check_pub_key_exist() or self._check_mode(ModeType.decc):
609            from xdevice import SuiteReporter
610            suite_reports = SuiteReporter.get_report_result()
611            if self._check_mode(ModeType.decc):
612                LOG.debug("Handle history result, data reports length:{}".
613                          format(len(suite_reports)))
614                SuiteReporter.clear_history_result()
615                SuiteReporter.append_history_result(suite_reports)
616            data_reports = []
617            for report_path, report_result in suite_reports:
618                module_name = get_filename_extension(report_path)[0]
619                data_reports.append((report_result, module_name))
620            SuiteReporter.clear_report_result()
621            return data_reports
622
623        if not os.path.isdir(self.report_path):
624            return []
625        data_reports = []
626        result_path = os.path.join(self.report_path, "result")
627        for root, _, files in os.walk(self.report_path):
628            for file_name in files:
629                if not file_name.endswith(self.data_helper.DATA_REPORT_SUFFIX):
630                    continue
631                module_name = self._find_module_name(result_path, root)
632                data_reports.append((os.path.join(root, file_name),
633                                     module_name))
634        return data_reports
635
636    @classmethod
637    def _find_module_name(cls, result_path, root):
638        # find module name from directory tree
639        common_path = os.path.commonpath([result_path, root])
640        if os.path.normcase(result_path) != os.path.normcase(common_path) or \
641                os.path.normcase(result_path) == os.path.normcase(root):
642            return ReportConstant.empty_name
643
644        root_dir, module_name = os.path.split(root)
645        if os.path.normcase(result_path) == os.path.normcase(root_dir):
646            return ReportConstant.empty_name
647        root_dir, subsystem_name = os.path.split(root_dir)
648        while os.path.normcase(result_path) != os.path.normcase(root_dir):
649            module_name = subsystem_name
650            root_dir, subsystem_name = os.path.split(root_dir)
651        return module_name
652
653    def _generate_summary(self):
654        if not self.summary_data_report_exist or \
655                self._check_mode(ModeType.decc):
656            return
657        summary_ini_content = \
658            "[default]\n" \
659            "Platform={}\n" \
660            "Test Type={}\n" \
661            "Device Name={}\n" \
662            "Host Info={}\n" \
663            "Test Start/ End Time={}\n" \
664            "Execution Time={}\n" \
665            "Device Type={}\n".format(
666                self.exec_info.platform, self.exec_info.test_type,
667                self.exec_info.device_name, self.exec_info.host_info,
668                self.exec_info.test_time, self.exec_info.execute_time,
669                self.exec_info.device_label)
670
671        if self.exec_info.product_info:
672            for key, value in self.exec_info.product_info.items():
673                summary_ini_content = "{}{}".format(
674                    summary_ini_content, "%s=%s\n" % (key, value))
675
676        if not self._check_mode(ModeType.factory):
677            summary_ini_content = "{}{}".format(
678                summary_ini_content, "Log Path=%s\n" % self.exec_info.log_path)
679
680        # write summary_ini_content
681        summary_filepath = os.path.join(self.report_path,
682                                        ReportConstant.summary_ini)
683
684        if platform.system() == "Windows":
685            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
686        else:
687            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
688        summary_filepath_open = os.open(summary_filepath, flags,
689                                        FilePermission.mode_755)
690
691        with os.fdopen(summary_filepath_open, "wb") as file_handler:
692            if check_pub_key_exist():
693                try:
694                    cipher_text = do_rsa_encrypt(summary_ini_content)
695                except ParamError as error:
696                    LOG.error(error, error_no=error.error_no)
697                    cipher_text = b""
698                file_handler.write(cipher_text)
699            else:
700                file_handler.write(bytes(summary_ini_content, 'utf-8'))
701            file_handler.flush()
702            LOG.info("Generate summary ini: %s", summary_filepath)
703        self.repeat_helper.__generate_repeat_xml__(self.summary_data_path)
704
705    def _copy_report(self):
706        from xdevice import Scheduler
707        if Scheduler.upload_address or self._check_mode(ModeType.decc):
708            return
709
710        dst_path = os.path.join(Variables.temp_dir, "latest")
711        try:
712            shutil.rmtree(dst_path, ignore_errors=True)
713            os.makedirs(dst_path, exist_ok=True)
714            LOG.info("Copy summary files to %s", dst_path)
715
716            # copy reports to reports/latest folder
717            for report_file in os.listdir(self.report_path):
718                src_file = os.path.join(self.report_path, report_file)
719                dst_file = os.path.join(dst_path, report_file)
720                if os.path.isfile(src_file):
721                    shutil.copyfile(src_file, dst_file)
722        except OSError as _:
723            return
724
725    def _compress_report_folder(self):
726        if self._check_mode(ModeType.decc) or \
727                self._check_mode(ModeType.factory):
728            return None
729
730        if not os.path.isdir(self.report_path):
731            LOG.error("'%s' is not folder!" % self.report_path)
732            return None
733
734        # get file path list
735        file_path_list = []
736        for dir_path, _, file_names in os.walk(self.report_path):
737            f_path = dir_path.replace(self.report_path, '')
738            f_path = f_path and f_path + os.sep or ''
739            for filename in file_names:
740                file_path_list.append(
741                    (os.path.join(dir_path, filename), f_path + filename))
742
743        # compress file
744        zipped_file = "%s.zip" % os.path.join(
745            self.report_path, os.path.basename(self.report_path))
746        zip_object = zipfile.ZipFile(zipped_file, 'w', zipfile.ZIP_DEFLATED,
747                                     allowZip64=True)
748        try:
749            LOG.info("Executing compress process, please wait...")
750            long_size_file = []
751            for src_path, target_path in file_path_list:
752                long_size_file.append((src_path, target_path))
753            self._write_long_size_file(zip_object, long_size_file)
754
755            LOG.info("Generate zip file: %s", zipped_file)
756        except zipfile.BadZipFile as bad_error:
757            LOG.error("Zip report folder error: %s" % bad_error.args)
758        finally:
759            zip_object.close()
760
761        # generate hex digest, then save it to summary_report.hash
762        hash_file = os.path.abspath(os.path.join(
763            self.report_path, ReportConstant.summary_report_hash))
764        hash_file_open = os.open(hash_file, os.O_WRONLY | os.O_CREAT |
765                                 os.O_APPEND, FilePermission.mode_755)
766        with os.fdopen(hash_file_open, "w") as hash_file_handler:
767            hash_file_handler.write(get_file_summary(zipped_file))
768            LOG.info("Generate hash file: %s", hash_file)
769            hash_file_handler.flush()
770        return zipped_file
771
772    @classmethod
773    def _check_mode(cls, mode):
774        from xdevice import Scheduler
775        return Scheduler.mode == mode
776
777    def _generate_task_record(self):
778        # under encryption status, don't handle anything directly
779        if check_pub_key_exist() and not self._check_mode(ModeType.decc):
780            return
781
782        # get info from command_queue
783        from xdevice import Scheduler
784        if not Scheduler.command_queue:
785            return
786        _, command, report_path = Scheduler.command_queue[-1]
787
788        record_info = self._parse_record_from_data(command, report_path)
789
790        def encode(content):
791            # inner function to encode
792            return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
793
794        # write into file
795        record_file = os.path.join(self.report_path,
796                                   ReportConstant.task_info_record)
797        _record_json = json.dumps(record_info, indent=2)
798
799        with open(file=record_file, mode="wb") as file:
800            if Scheduler.mode == ModeType.decc:
801                # under decc, write in encoded text
802                file.write(bytes(encode(_record_json), encoding="utf-8"))
803            else:
804                # others, write in plain text
805                file.write(bytes(_record_json, encoding="utf-8"))
806
807        LOG.info("Generate record file: %s", record_file)
808
809    def _parse_record_from_data(self, command, report_path):
810        record = dict()
811        if self.parsed_data:
812            _, _, suites = self.parsed_data
813            unsuccessful = dict()
814            module_set = set()
815            for suite in suites:
816                module_set.add(suite.module_name)
817
818                failed = unsuccessful.get(suite.module_name, [])
819                # because suite not contains case's some attribute,
820                # for example, 'module', 'classname', 'name' . so
821                # if unavailable, only add module's name into list.
822                if int(suite.result.unavailable) > 0:
823                    failed.append(suite.module_name)
824                else:
825                    # others, get key attributes join string
826                    for case in suite.get_cases():
827                        if not case.is_passed():
828                            failed.append(
829                                "{}#{}".format(case.classname, case.name))
830                unsuccessful.update({suite.module_name: failed})
831            data_reports = self._get_data_reports(module_set)
832            record = {"command": command,
833                      "session_id": os.path.split(report_path)[-1],
834                      "report_path": report_path,
835                      "unsuccessful_params": unsuccessful,
836                      "data_reports": data_reports
837                      }
838        return record
839
840    def _get_data_reports(self, module_set):
841        data_reports = dict()
842        if self._check_mode(ModeType.decc):
843            from xdevice import SuiteReporter
844            for module_name, report_path, _ in \
845                    SuiteReporter.get_history_result_list():
846                if module_name in module_set:
847                    data_reports.update({module_name: report_path})
848        else:
849            for report_path, module_name in self.data_reports:
850                if module_name == ReportConstant.empty_name:
851                    root = self.data_helper.parse_data_report(report_path)
852                    module_name = self._get_module_name(report_path, root)
853                if module_name in module_set:
854                    data_reports.update({module_name: report_path})
855
856        return data_reports
857
858    @classmethod
859    def get_task_info_params(cls, history_path):
860        # under encryption status, don't handle anything directly
861        if check_pub_key_exist() and not cls._check_mode(ModeType.decc):
862            return ()
863
864        def decode(content):
865            result_list = []
866            for b in content.split(' '):
867                result_list.append(chr(int(b, 2)))
868            return ''.join(result_list)
869
870        record_path = os.path.join(history_path,
871                                   ReportConstant.task_info_record)
872        if not os.path.exists(record_path):
873            LOG.error("%s not exists!", ReportConstant.task_info_record)
874            return ()
875
876        from xdevice import Scheduler
877        with open(record_path, mode="rb") as file:
878            if Scheduler.mode == ModeType.decc:
879                # under decc, read from encoded text
880                result = json.loads(decode(file.read().decode("utf-8")))
881            else:
882                # others, read from plain text
883                result = json.loads(file.read())
884        standard_length = 5
885        if not len(result.keys()) == standard_length:
886            LOG.error("%s error!", ReportConstant.task_info_record)
887            return ()
888
889        return result
890
891    @classmethod
892    def set_summary_report_result(cls, summary_data_path, result_xml):
893        cls.summary_report_result.clear()
894        cls.summary_report_result.append((summary_data_path, result_xml))
895
896    @classmethod
897    def get_result_of_summary_report(cls):
898        if cls.summary_report_result:
899            return cls.summary_report_result[0][1]
900        return None
901
902    @classmethod
903    def summary_report_result_exists(cls):
904        return True if cls.summary_report_result else False
905
906    @classmethod
907    def get_path_of_summary_report(cls):
908        if cls.summary_report_result:
909            return cls.summary_report_result[0][0]
910        return None
911
912    @classmethod
913    def _write_long_size_file(cls, zip_object, long_size_file):
914        for filename, arcname in long_size_file:
915            zip_info = zipfile.ZipInfo.from_file(filename, arcname)
916            zip_info.compress_type = getattr(zip_object, "compression",
917                                             zipfile.ZIP_DEFLATED)
918            if hasattr(zip_info, "_compresslevel"):
919                _compress_level = getattr(zip_object, "compresslevel", None)
920                setattr(zip_info, "_compresslevel", _compress_level)
921            with open(filename, "rb") as src, \
922                    zip_object.open(zip_info, "w") as des:
923                shutil.copyfileobj(src, des, 1024 * 1024 * 8)
924
925    def _transact_all(self):
926        pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc")
927        if not os.path.exists(pyc_path):
928            return
929        module_spec = util.spec_from_file_location("binder", pyc_path)
930        if not module_spec:
931            return
932        module = util.module_from_spec(module_spec)
933        module_spec.loader.exec_module(module)
934        if hasattr(module, "transact") and callable(module.transact):
935            module.transact(self, LOG)
936        del module
937
938    @classmethod
939    def _handle_module_tests(cls, modules, test_suites_attributes):
940        modules_list = list()
941        modules_zero = list()
942        for module_name, detail_list in modules.items():
943            for total in detail_list:
944                modules_list.append(total)
945                if total == 0:
946                    modules_zero.append(module_name)
947        test_suites_attributes[ReportConstant.run_modules] = \
948            len(modules_list) - len(modules_zero)
949        test_suites_attributes[ReportConstant.modules] = len(modules_list)
950        if modules_zero:
951            LOG.info("The total tests of %s module is 0", ",".join(
952                modules_zero))