• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21from xml.etree import ElementTree
22
23from _core.constants import CaseResult
24from _core.constants import HostDrivenTestType
25from _core.constants import ModeType
26from _core.constants import DeviceTestType
27from _core.context.center import Context
28from _core.context.handler import get_case_result
29from _core.logger import platform_logger
30from _core.report.reporter_helper import DataHelper
31from _core.report.reporter_helper import ReportConstant
32from _core.utils import check_mode
33from _core.utils import get_filename_extension
34from _core.utils import parse_xml_cdata
35
36LOG = platform_logger("Upload")
37
38MAX_VISIBLE_LENGTH = 256
39
40__all__ = ["Uploader"]
41
42
43class Uploader:
44
45    @classmethod
46    def is_enable(cls):
47        if Context.session().upload_address:
48            return True
49        return False
50
51    @classmethod
52    def get_session(cls):
53        return Context.session()
54
55    @classmethod
56    def upload_task_result(cls, task, error_message=""):
57        if not Uploader.is_enable():
58            return
59        task_name = cls.get_session().task_name
60        if not task_name:
61            LOG.info("No need upload summary report")
62            return
63
64        summary_data_report = os.path.join(task.config.report_path,
65                                           ReportConstant.summary_data_report)
66        if not os.path.exists(summary_data_report):
67            Uploader.upload_unavailable_result(str(error_message) or "summary report not exists",
68                                               task.config.report_path)
69            return
70
71        task_element = ElementTree.parse(summary_data_report).getroot()
72        start_time, end_time = Uploader._get_time(task_element)
73        task_result = Uploader._get_task_result(task_element)
74        error_msg = ""
75        for child in task_element:
76            if child.get(ReportConstant.message, ""):
77                error_msg = "{}{}".format(
78                    error_msg, "%s;" % child.get(ReportConstant.message))
79        if error_msg:
80            error_msg = error_msg[:-1]
81        report = Uploader._get_report_path(
82            task.config.report_path, ReportConstant.summary_vision_report)
83        Uploader.upload_case_result(
84            (task_name, task_result, error_msg, start_time, end_time, report))
85
86    @classmethod
87    def upload_module_result(cls, exec_message):
88        proxy = cls.get_session().proxy
89        if proxy is None:
90            return
91        result_file = exec_message.get_result()
92        request = exec_message.get_request()
93        test_name = request.root.source.test_name
94        if not result_file or not os.path.exists(result_file):
95            LOG.error("%s result not exists", test_name, error_no="00200")
96            return
97
98        if check_mode(ModeType.controller):
99            cls.get_session().task_name = test_name
100
101        test_type = request.root.source.test_type
102        LOG.info("Need upload result: %s, test type: %s" %
103                 (result_file, test_type))
104        upload_params = cls._get_upload_params(result_file, request)
105        if not upload_params:
106            LOG.error("%s no test case result to upload" % result_file,
107                      error_no="00201")
108            return
109        LOG.info("Need upload %s case" % len(upload_params))
110        upload_suite = []
111        for upload_param in upload_params:
112            case_id, result, error, start, end, report_path, result_content = upload_param
113            case = {"caseid": case_id, "result": result, "error": error,
114                    "start": start, "end": end, "report": report_path,
115                    "result_content": parse_xml_cdata(result_content)}
116            LOG.info("Case info: %s", case)
117            upload_suite.append(case)
118
119        if check_mode(ModeType.controller):
120            case = {"caseid": test_name, "result": "Finish", "error": "",
121                    "start": "", "end": "",
122                    "report": ""}
123            upload_suite.append(case)
124        proxy.upload_batch(upload_suite)
125
126        if check_mode(ModeType.controller):
127            cls.get_session().task_name = ""
128
129    @classmethod
130    def upload_unavailable_result(cls, error_msg, case_id="", report_path=""):
131        current_time = int(time.time() * 1000)
132        if case_id == "":
133            case_id = cls.get_session().task_name
134        Uploader.upload_case_result(
135            (case_id, CaseResult.unavailable, error_msg, current_time, current_time, report_path))
136
137    @classmethod
138    def upload_case_result(cls, upload_param):
139        proxy = cls.get_session().proxy
140        if not Uploader.is_enable() or proxy is None:
141            return
142        LOG.debug("Upload case result")
143        case_id, result, error, start_time, end_time, report_path = \
144            upload_param
145        if error and len(error) > MAX_VISIBLE_LENGTH:
146            error = "%s..." % error[:MAX_VISIBLE_LENGTH]
147        LOG.info(
148            "Get upload params: %s, %s, %s, %s, %s, %s" % (
149                case_id, result, error, start_time, end_time, report_path))
150        proxy.upload_result(case_id, result, error, start_time, end_time, report_path)
151
152    @classmethod
153    def upload_report_end(cls):
154        proxy = cls.get_session().proxy
155        if proxy is None:
156            return
157        LOG.info("Upload report end")
158        proxy.report_end()
159
160    @classmethod
161    def _get_time(cls, testsuite_element):
162        start_time = testsuite_element.get(ReportConstant.start_time, "")
163        end_time = testsuite_element.get(ReportConstant.end_time, "")
164        try:
165            if start_time and end_time:
166                start_time = int(time.mktime(time.strptime(
167                    start_time, ReportConstant.time_format)) * 1000)
168                end_time = int(time.mktime(time.strptime(
169                    end_time, ReportConstant.time_format)) * 1000)
170            else:
171                timestamp = str(testsuite_element.get(
172                    ReportConstant.time_stamp, "")).replace("T", " ")
173                cost_time = testsuite_element.get(ReportConstant.time, "")
174                if timestamp and cost_time:
175                    try:
176                        end_time = int(time.mktime(time.strptime(
177                            timestamp, ReportConstant.time_format)) * 1000)
178                    except ArithmeticError as error:
179                        LOG.error("Get time error {}".format(error))
180                        end_time = int(time.time() * 1000)
181                    except ValueError as error:
182                        LOG.error("Get time error {}".format(error))
183                        end_time = int(time.mktime(time.strptime(
184                            timestamp.split(".")[0], ReportConstant.time_format)) * 1000)
185                    start_time = int(end_time - float(cost_time) * 1000)
186                else:
187                    current_time = int(time.time() * 1000)
188                    start_time, end_time = current_time, current_time
189        except ArithmeticError as error:
190            LOG.error("Get time error {}".format(error))
191            current_time = int(time.time() * 1000)
192            start_time, end_time = current_time, current_time
193        return start_time, end_time
194
195    @classmethod
196    def _get_task_result(cls, task_element):
197        failures = int(task_element.get(ReportConstant.failures, 0))
198        errors = int(task_element.get(ReportConstant.errors, 0))
199        disabled = int(task_element.get(ReportConstant.disabled, 0))
200        unavailable = int(task_element.get(ReportConstant.unavailable, 0))
201        if disabled > 0:
202            task_result = "Blocked"
203        elif errors > 0 or failures > 0:
204            task_result = "Failed"
205        elif unavailable > 0:
206            task_result = "Unavailable"
207        else:
208            task_result = "Passed"
209        return task_result
210
211    @classmethod
212    def _get_report_path(cls, base_path, report=""):
213        """拼接报告路径
214        base_path: str, 报告基础路径
215        report   : str, 报告相对路径
216        """
217        report_path = os.path.join(base_path, report)
218        return report_path if report and os.path.exists(report_path) else base_path
219
220    @classmethod
221    def _get_upload_params(cls, result_file, request):
222        upload_params = []
223        report_path = result_file
224        testsuites_element = DataHelper.parse_data_report(report_path)
225        start_time, end_time = Uploader._get_time(testsuites_element)
226        test_type = request.get_test_type()
227        test_type_list = [HostDrivenTestType.device_test, HostDrivenTestType.windows_test,
228                          DeviceTestType.stability_test, DeviceTestType.ux_test, DeviceTestType.ark_web_test]
229        if test_type in test_type_list:
230            for ele_testsuite in testsuites_element:
231                if len(ele_testsuite) == 0:
232                    LOG.error(f"No testcase in result file: {result_file}")
233                    ele_testcase = ele_testsuite
234                    case_result, error = CaseResult.blocked, ele_testsuite.get(ReportConstant.message, "")
235                else:
236                    ele_testcase = ele_testsuite[0]
237                    case_result, error = get_case_result(ele_testcase)
238                case_id = ele_testcase.get(ReportConstant.name, "")
239                if error and len(error) > MAX_VISIBLE_LENGTH:
240                    error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
241                report = Uploader._get_report_path(
242                    request.config.report_path, ele_testcase.get(ReportConstant.report, ""))
243                result_content = ele_testcase.get(ReportConstant.result_content)
244                upload_params.append(
245                    (case_id, case_result, error, start_time, end_time, report, result_content,))
246        else:
247            for testsuite_element in testsuites_element:
248                if check_mode(ModeType.developer):
249                    module_name = str(get_filename_extension(
250                        report_path)[0]).split(".")[0]
251                else:
252                    module_name = testsuite_element.get(ReportConstant.name,
253                                                        "none")
254                for ele_testcase in testsuite_element:
255                    case_id = Uploader._get_case_id(ele_testcase, module_name)
256                    case_result, error = get_case_result(ele_testcase)
257                    if case_result == "Ignored":
258                        LOG.info(
259                            "Get upload params: {} result is ignored".format(case_id))
260                        continue
261                    error = ele_testcase.get(ReportConstant.message, "")
262                    if error and len(error) > MAX_VISIBLE_LENGTH:
263                        error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
264                    report = Uploader._get_report_path(
265                        request.config.report_path,
266                        ele_testcase.get(ReportConstant.report, ""))
267                    result_content = ele_testcase.get(ReportConstant.result_content)
268                    upload_params.append(
269                        (case_id, case_result, error, start_time, end_time, report, result_content,))
270        return upload_params
271
272    @classmethod
273    def _get_case_id(cls, case_element, package_name):
274        class_name = case_element.get(ReportConstant.class_name, "none")
275        method_name = case_element.get(ReportConstant.name, "none")
276        case_id = "{}#{}#{}#{}".format(cls.get_session().task_name, package_name,
277                                       class_name, method_name)
278        return case_id
279