• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import sys
21
22from xdevice import TestExecType
23from xdevice import DeviceError
24from xdevice import ParamError
25from xdevice import ReportException
26from xdevice import ExecuteTerminate
27from xdevice import IDriver
28from xdevice import platform_logger
29from xdevice import Plugin
30from xdevice import JsonParser
31from xdevice import get_config_value
32from xdevice import do_module_kit_setup
33from xdevice import do_module_kit_teardown
34from xdevice import get_filename_extension
35from xdevice import get_file_absolute_path
36from xdevice import get_kit_instances
37from xdevice import check_result_report
38from xdevice import Scheduler
39from xdevice import LifeStage
40from xdevice import HostDrivenTestType
41
42from devicetest.core.constants import DeviceTestMode
43
44__all__ = ["DeviceTestDriver", "DeviceTestSuiteDriver"]
45LOG = platform_logger("DeviceTest")
46PY_SUFFIX = ".py"
47PYD_SUFFIX = ".pyd"
48PYC_SUFFIX = ".pyc"
49
50
51def _get_dict_test_list(module_path, file_name):
52    test_list = []
53    for root, _, files in os.walk(module_path):
54        for _file in files:
55            if (_file.endswith(".py") or _file.endswith(".pyd")) \
56                    and file_name == os.path.splitext(_file)[0]:
57                test_list.append(os.path.join(root, _file))
58    return test_list
59
60
61@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_test)
62class DeviceTestDriver(IDriver):
63    """
64    DeviceTest is a Test that runs a host-driven test on given devices.
65    """
66    # test driver config
67    config = None
68    result = ""
69    error_message = ""
70    py_file = ""
71
72    def __init__(self):
73        pass
74
75    def __check_environment__(self, device_options):
76        pass
77
78    def __check_config__(self, config=None):
79        pass
80
81    def __execute__(self, request):
82        try:
83            # delete sys devicetest mode
84            if hasattr(sys, DeviceTestMode.MODE):
85                delattr(sys, DeviceTestMode.MODE)
86
87            # set self.config
88            self.config = request.config
89            self.config.devices = request.get_devices()
90            if request.get("exectype") == TestExecType.device_test \
91                    and not self.config.devices:
92                LOG.error("No device", error_no="00104")
93                raise ParamError("Load Error[00104]", error_no="00104")
94
95            # get source, json config and kits
96            if request.get_config_file():
97                source = request.get_config_file()
98                LOG.debug("Test config file path: %s" % source)
99            else:
100                source = request.get_source_string()
101                LOG.debug("Test String: %s" % source)
102
103            if not source:
104                LOG.error("No config file found for '%s'" %
105                          request.get_source_file(), error_no="00102")
106                raise ParamError("Load Error(00102)", error_no="00102")
107
108            json_config = JsonParser(source)
109            kits = get_kit_instances(json_config, request.config.resource_path,
110                                     request.config.testcases_path)
111            do_module_kit_setup(request, kits)
112
113            test_name = request.get_module_name()
114            self.result = os.path.join(request.config.report_path, "result", "%s.xml" % test_name)
115
116            # set configs keys
117            configs = self._set_configs(json_config, kits, request)
118
119            # handle test args
120            self._handle_test_args(request=request)
121
122            # get test list
123            test_list = self._get_test_list(json_config, request, source)
124            if not test_list:
125                raise ParamError("no test list to run")
126            self._run_devicetest(configs, test_list)
127        except (ReportException, ModuleNotFoundError, ExecuteTerminate,
128                SyntaxError, ValueError, AttributeError, TypeError,
129                KeyboardInterrupt, ParamError, DeviceError) as exception:
130            error_no = getattr(exception, "error_no", "00000")
131            LOG.exception(exception, exc_info=False, error_no=error_no)
132            self.error_message = exception
133            Scheduler.call_life_stage_action(
134                stage=LifeStage.case_end,
135                case_name=request.get_module_name(),
136                case_result="Failed",
137                error_msg=str(self.error_message))
138        finally:
139            self._handle_finally(request)
140
141    def _get_test_list(self, json_config, request, source):
142        test_list = get_config_value('py_file', json_config.get_driver(),
143                                     is_list=True)
144        if str(request.root.source.source_file).endswith(PYD_SUFFIX) or \
145                str(request.root.source.source_file).endswith(PY_SUFFIX):
146            test_list = [request.root.source.source_file]
147
148        if not test_list and os.path.exists(source):
149            dir_name, file_name = os.path.split(source)
150            file_name, _ = os.path.splitext(file_name)
151            test_list = _get_dict_test_list(os.path.dirname(source), file_name)
152
153        # check test list
154        testcase = request.get("testcase")
155        testcase_list = []
156        if testcase:
157            testcase_list = str(testcase).split(";")
158
159        checked_test_list = []
160        for _, test in enumerate(test_list):
161            if not os.path.exists(test):
162                try:
163                    absolute_file = get_file_absolute_path(test, [
164                        self.config.resource_path, self.config.testcases_path])
165                except ParamError as error:
166                    LOG.error(error, error_no=error.error_no)
167                    continue
168            else:
169                absolute_file = test
170
171            file_name = get_filename_extension(absolute_file)[0]
172            if not testcase_list or file_name in testcase_list:
173                checked_test_list.append(absolute_file)
174            else:
175                LOG.info("Test '%s' is ignored", absolute_file)
176        if checked_test_list:
177            LOG.info("Test list: {}".format(checked_test_list))
178        else:
179            LOG.error("No test list found", error_no="00109")
180            raise ParamError("Load Error(00109), No test list found", error_no="00109")
181        if len(checked_test_list) > 1:
182            LOG.error("{}.json field [py_file] only support one py file, now is {}"
183                      .format(request.get_module_name(), test_list), error_no="00110")
184            raise ParamError("Load Error(00110), too many testcase in field [py_file].", error_no="00110")
185        return checked_test_list
186
187    def _set_configs(self, json_config, kits, request):
188        configs = dict()
189        configs["testargs"] = self.config.testargs or {}
190        configs["testcases_path"] = self.config.testcases_path or ""
191        configs["request"] = request
192        configs["test_name"] = request.get_module_name()
193        configs["report_path"] = request.config.report_path
194        configs["execute"] = get_config_value(
195            'execute', json_config.get_driver(), False)
196        return configs
197
198    def _handle_finally(self, request):
199        # do kit teardown
200        do_module_kit_teardown(request)
201
202        # check result report
203        report_name = request.root.source.test_name if \
204            not request.root.source.test_name.startswith("{") \
205            else "report"
206        module_name = request.get_module_name()
207        self.result = check_result_report(
208            request.config.report_path, self.result, self.error_message,
209            report_name, module_name)
210
211    def _run_devicetest(self, configs, test_list):
212        from xdevice import Variables
213
214        # insert paths for loading _devicetest module and testcases
215        devicetest_module = os.path.join(Variables.modules_dir, "_devicetest")
216        if os.path.exists(devicetest_module):
217            sys.path.insert(1, devicetest_module)
218        if configs["testcases_path"]:
219            sys.path.insert(1, configs["testcases_path"])
220            sys.path.insert(1, os.path.dirname(configs["testcases_path"]))
221
222        # apply data to devicetest module about resource path
223        request = configs.get('request', None)
224        if request:
225            sys.ecotest_resource_path = request.config.resource_path
226
227        # run devicetest
228        from devicetest.main import DeviceTest
229        device_test = DeviceTest(test_list, configs, self.config.devices, LOG, self.result)
230        device_test.run()
231
232    def __result__(self):
233        return self.result if os.path.exists(self.result) else ""
234
235    def _handle_test_args(self, request):
236        for device in self.config.devices:
237            # 恢复步骤截图的默认设置
238            setattr(device, "screenshot", False)
239            setattr(device, "screenshot_fail", True)
240            setattr(device, "screenrecorder", False)
241
242
243def _get_dict_testsuite(testsuite, config):
244    post_suffix = [PY_SUFFIX, PYD_SUFFIX, PYC_SUFFIX]
245    for suffix in post_suffix:
246        testsuite_file = "{}{}".format(testsuite, suffix)
247        if not os.path.exists(testsuite_file):
248            try:
249                absolute_file = get_file_absolute_path(testsuite_file, [
250                    config.resource_path, config.testcases_path])
251                return absolute_file
252            except ParamError as error:
253                LOG.error(error, error_no=error.error_no)
254                continue
255        else:
256            return testsuite_file
257    return None
258
259
260@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_testsuite)
261class DeviceTestSuiteDriver(IDriver):
262    """
263    DeviceTestSuiteDriver is a Test that runs a host-driven test on given devices.
264    """
265    # test driver config
266    config = None
267    result = ""
268    error_message = ""
269    py_file = ""
270
271    def __init__(self):
272        pass
273
274    def __check_environment__(self, device_options):
275        pass
276
277    def __check_config__(self, config=None):
278        pass
279
280    def __execute__(self, request):
281        try:
282            # delete sys devicetest mode
283            if hasattr(sys, DeviceTestMode.MODE):
284                delattr(sys, DeviceTestMode.MODE)
285
286            # set self.config
287            self.config = request.config
288            self.config.devices = request.get_devices()
289            if request.get("exectype") == TestExecType.device_test \
290                    and not self.config.devices:
291                raise ParamError("Load Error[00104]", error_no="00104")
292
293            # get source, json config and kits
294            if request.get_config_file():
295                source = request.get_config_file()
296                LOG.debug("Test config file path: %s" % source)
297            else:
298                source = request.get_source_string()
299                LOG.debug("Test String: %s" % source)
300
301            if not source:
302                LOG.error("No config file found for '%s'" %
303                          request.get_source_file(), error_no="00102")
304                raise ParamError("Load Error(00102)", error_no="00102")
305
306            json_config = JsonParser(source)
307            kits = get_kit_instances(json_config, request.config.resource_path,
308                                     request.config.testcases_path)
309            do_module_kit_setup(request, kits)
310
311            test_name = request.get_module_name()
312            self.result = os.path.join(request.config.report_path, "result", "%s.xml" % test_name)
313
314            # set configs keys
315            configs = self._set_configs(json_config, kits, request)
316
317            # handle test args
318            self._handle_test_args(request=request)
319
320            # get test list
321            test_list = self._get_test_list(json_config, request, source)
322            if not test_list:
323                raise ParamError("no test list to run")
324            self._run_testsuites(configs, test_list)
325        except (ReportException, ModuleNotFoundError, ExecuteTerminate,
326                SyntaxError, ValueError, AttributeError, TypeError,
327                KeyboardInterrupt, ParamError, DeviceError) as exception:
328            error_no = getattr(exception, "error_no", "00000")
329            LOG.exception(exception, exc_info=False, error_no=error_no)
330            self.error_message = exception
331            Scheduler.call_life_stage_action(
332                stage=LifeStage.case_end,
333                case_name=request.get_module_name(),
334                case_result="Failed",
335                error_msg=str(self.error_message))
336        finally:
337            self._handle_finally(request)
338
339    def _get_test_list(self, json_config, request, source):
340        testsuite = get_config_value('testsuite', json_config.get_driver(),
341                                     is_list=False)
342
343        if not testsuite and os.path.exists(source):
344            dir_name, file_name = os.path.split(source)
345            file_name, _ = os.path.splitext(file_name)
346            temp_testsuite = _get_dict_test_list(os.path.dirname(source), file_name)
347            if temp_testsuite:
348                testsuite = temp_testsuite[0]
349
350        if not testsuite:
351            LOG.error("Json not set testsuite or can't found py file.")
352            raise ParamError("Load Error(00109), Json not set testsuite or can't found py file.", error_no="00109")
353
354        checked_testsuite = None
355        if testsuite.endswith(PY_SUFFIX) or \
356                testsuite.endswith(PYD_SUFFIX) or \
357                testsuite.endswith(PYC_SUFFIX):
358            if not os.path.exists(testsuite):
359                try:
360                    checked_testsuite = get_file_absolute_path(testsuite, [
361                        self.config.resource_path, self.config.testcases_path])
362                except ParamError as error:
363                    LOG.debug(error, error_no=error.error_no)
364            else:
365                checked_testsuite = testsuite
366        else:
367            checked_testsuite = _get_dict_testsuite(testsuite, self.config)
368
369        if checked_testsuite:
370            LOG.info("Test suite list: {}".format(checked_testsuite))
371        else:
372            LOG.error("No test suite list found", error_no="00109")
373            raise ParamError("Load Error(00109), No test list found", error_no="00109")
374        return checked_testsuite
375
376    def _set_configs(self, json_config, kits, request):
377        configs = dict()
378        configs["testargs"] = self.config.testargs or {}
379        configs["testcases_path"] = self.config.testcases_path or ""
380        configs["resource_path"] = self.config.resource_path or ""
381        configs["request"] = request
382        configs["device_log"] = request.config.device_log
383        configs["test_name"] = request.get_module_name()
384        configs["report_path"] = request.config.report_path
385        configs["suitecases"] = get_config_value(
386            'suitecases', json_config.get_driver(), True)
387        configs["listeners"] = request.listeners.copy()
388        return configs
389
390    def _handle_finally(self, request):
391        # do kit teardown
392        do_module_kit_teardown(request)
393
394        # check result report
395        report_name = request.root.source.test_name if \
396            not request.root.source.test_name.startswith("{") \
397            else "report"
398        module_name = request.get_module_name()
399        self.result = check_result_report(
400            request.config.report_path, self.result, self.error_message,
401            report_name, module_name)
402
403    def _run_testsuites(self, configs, test_list):
404        if configs["testcases_path"]:
405            sys.path.insert(1, configs["testcases_path"])
406            sys.path.insert(1, os.path.dirname(configs["testcases_path"]))
407        request = configs.get('request', None)
408        if request:
409            sys.ecotest_resource_path = request.config.resource_path
410
411        # run AppTest
412        from devicetest.main import DeviceTestSuite
413        app_test = DeviceTestSuite(test_list=test_list, configs=configs,
414                                   devices=self.config.devices, log=LOG)
415        app_test.run()
416
417    def __result__(self):
418        return self.result if os.path.exists(self.result) else ""
419
420    def _handle_test_args(self, request):
421        for device in self.config.devices:
422            # 恢复步骤截图的默认设置
423            setattr(device, "screenshot", False)
424            setattr(device, "screenshot_fail", True)
425            setattr(device, "screenrecorder", False)
426