1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import sys 21 22from xdevice import TestExecType 23from xdevice import DeviceError 24from xdevice import ParamError 25from xdevice import ReportException 26from xdevice import ExecuteTerminate 27from xdevice import IDriver 28from xdevice import platform_logger 29from xdevice import Plugin 30from xdevice import JsonParser 31from xdevice import get_config_value 32from xdevice import do_module_kit_setup 33from xdevice import do_module_kit_teardown 34from xdevice import get_filename_extension 35from xdevice import get_file_absolute_path 36from xdevice import get_kit_instances 37from xdevice import check_result_report 38from xdevice import Scheduler 39from xdevice import LifeStage 40from xdevice import HostDrivenTestType 41 42from devicetest.core.constants import DeviceTestMode 43 44__all__ = ["DeviceTestDriver", "DeviceTestSuiteDriver"] 45LOG = platform_logger("DeviceTest") 46PY_SUFFIX = ".py" 47PYD_SUFFIX = ".pyd" 48PYC_SUFFIX = ".pyc" 49 50 51def _get_dict_test_list(module_path, file_name): 52 test_list = [] 53 for root, _, files in os.walk(module_path): 54 for _file in files: 55 if (_file.endswith(".py") or _file.endswith(".pyd")) \ 56 and file_name == os.path.splitext(_file)[0]: 57 test_list.append(os.path.join(root, _file)) 58 return test_list 59 60 61@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_test) 62class DeviceTestDriver(IDriver): 63 """ 64 DeviceTest is a Test that runs a host-driven test on given devices. 65 """ 66 # test driver config 67 config = None 68 result = "" 69 error_message = "" 70 py_file = "" 71 72 def __init__(self): 73 pass 74 75 def __check_environment__(self, device_options): 76 pass 77 78 def __check_config__(self, config=None): 79 pass 80 81 def __execute__(self, request): 82 try: 83 # delete sys devicetest mode 84 if hasattr(sys, DeviceTestMode.MODE): 85 delattr(sys, DeviceTestMode.MODE) 86 87 # set self.config 88 self.config = request.config 89 self.config.devices = request.get_devices() 90 if request.get("exectype") == TestExecType.device_test \ 91 and not self.config.devices: 92 LOG.error("No device", error_no="00104") 93 raise ParamError("Load Error[00104]", error_no="00104") 94 95 # get source, json config and kits 96 if request.get_config_file(): 97 source = request.get_config_file() 98 LOG.debug("Test config file path: %s" % source) 99 else: 100 source = request.get_source_string() 101 LOG.debug("Test String: %s" % source) 102 103 if not source: 104 LOG.error("No config file found for '%s'" % 105 request.get_source_file(), error_no="00102") 106 raise ParamError("Load Error(00102)", error_no="00102") 107 108 json_config = JsonParser(source) 109 kits = get_kit_instances(json_config, request.config.resource_path, 110 request.config.testcases_path) 111 do_module_kit_setup(request, kits) 112 113 test_name = request.get_module_name() 114 self.result = os.path.join(request.config.report_path, "result", "%s.xml" % test_name) 115 116 # set configs keys 117 configs = self._set_configs(json_config, kits, request) 118 119 # handle test args 120 self._handle_test_args(request=request) 121 122 # get test list 123 test_list = self._get_test_list(json_config, request, source) 124 if not test_list: 125 raise ParamError("no test list to run") 126 self._run_devicetest(configs, test_list) 127 except (ReportException, ModuleNotFoundError, ExecuteTerminate, 128 SyntaxError, ValueError, AttributeError, TypeError, 129 KeyboardInterrupt, ParamError, DeviceError) as exception: 130 error_no = getattr(exception, "error_no", "00000") 131 LOG.exception(exception, exc_info=False, error_no=error_no) 132 self.error_message = exception 133 Scheduler.call_life_stage_action( 134 stage=LifeStage.case_end, 135 case_name=request.get_module_name(), 136 case_result="Failed", 137 error_msg=str(self.error_message)) 138 finally: 139 self._handle_finally(request) 140 141 def _get_test_list(self, json_config, request, source): 142 test_list = get_config_value('py_file', json_config.get_driver(), 143 is_list=True) 144 if str(request.root.source.source_file).endswith(PYD_SUFFIX) or \ 145 str(request.root.source.source_file).endswith(PY_SUFFIX): 146 test_list = [request.root.source.source_file] 147 148 if not test_list and os.path.exists(source): 149 dir_name, file_name = os.path.split(source) 150 file_name, _ = os.path.splitext(file_name) 151 test_list = _get_dict_test_list(os.path.dirname(source), file_name) 152 153 # check test list 154 testcase = request.get("testcase") 155 testcase_list = [] 156 if testcase: 157 testcase_list = str(testcase).split(";") 158 159 checked_test_list = [] 160 for _, test in enumerate(test_list): 161 if not os.path.exists(test): 162 try: 163 absolute_file = get_file_absolute_path(test, [ 164 self.config.resource_path, self.config.testcases_path]) 165 except ParamError as error: 166 LOG.error(error, error_no=error.error_no) 167 continue 168 else: 169 absolute_file = test 170 171 file_name = get_filename_extension(absolute_file)[0] 172 if not testcase_list or file_name in testcase_list: 173 checked_test_list.append(absolute_file) 174 else: 175 LOG.info("Test '%s' is ignored", absolute_file) 176 if checked_test_list: 177 LOG.info("Test list: {}".format(checked_test_list)) 178 else: 179 LOG.error("No test list found", error_no="00109") 180 raise ParamError("Load Error(00109), No test list found", error_no="00109") 181 if len(checked_test_list) > 1: 182 LOG.error("{}.json field [py_file] only support one py file, now is {}" 183 .format(request.get_module_name(), test_list), error_no="00110") 184 raise ParamError("Load Error(00110), too many testcase in field [py_file].", error_no="00110") 185 return checked_test_list 186 187 def _set_configs(self, json_config, kits, request): 188 configs = dict() 189 configs["testargs"] = self.config.testargs or {} 190 configs["testcases_path"] = self.config.testcases_path or "" 191 configs["request"] = request 192 configs["test_name"] = request.get_module_name() 193 configs["report_path"] = request.config.report_path 194 configs["execute"] = get_config_value( 195 'execute', json_config.get_driver(), False) 196 return configs 197 198 def _handle_finally(self, request): 199 # do kit teardown 200 do_module_kit_teardown(request) 201 202 # check result report 203 report_name = request.root.source.test_name if \ 204 not request.root.source.test_name.startswith("{") \ 205 else "report" 206 module_name = request.get_module_name() 207 self.result = check_result_report( 208 request.config.report_path, self.result, self.error_message, 209 report_name, module_name) 210 211 def _run_devicetest(self, configs, test_list): 212 from xdevice import Variables 213 214 # insert paths for loading _devicetest module and testcases 215 devicetest_module = os.path.join(Variables.modules_dir, "_devicetest") 216 if os.path.exists(devicetest_module): 217 sys.path.insert(1, devicetest_module) 218 if configs["testcases_path"]: 219 sys.path.insert(1, configs["testcases_path"]) 220 sys.path.insert(1, os.path.dirname(configs["testcases_path"])) 221 222 # apply data to devicetest module about resource path 223 request = configs.get('request', None) 224 if request: 225 sys.ecotest_resource_path = request.config.resource_path 226 227 # run devicetest 228 from devicetest.main import DeviceTest 229 device_test = DeviceTest(test_list, configs, self.config.devices, LOG, self.result) 230 device_test.run() 231 232 def __result__(self): 233 return self.result if os.path.exists(self.result) else "" 234 235 def _handle_test_args(self, request): 236 for device in self.config.devices: 237 # 恢复步骤截图的默认设置 238 setattr(device, "screenshot", False) 239 setattr(device, "screenshot_fail", True) 240 setattr(device, "screenrecorder", False) 241 242 243def _get_dict_testsuite(testsuite, config): 244 post_suffix = [PY_SUFFIX, PYD_SUFFIX, PYC_SUFFIX] 245 for suffix in post_suffix: 246 testsuite_file = "{}{}".format(testsuite, suffix) 247 if not os.path.exists(testsuite_file): 248 try: 249 absolute_file = get_file_absolute_path(testsuite_file, [ 250 config.resource_path, config.testcases_path]) 251 return absolute_file 252 except ParamError as error: 253 LOG.error(error, error_no=error.error_no) 254 continue 255 else: 256 return testsuite_file 257 return None 258 259 260@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_testsuite) 261class DeviceTestSuiteDriver(IDriver): 262 """ 263 DeviceTestSuiteDriver is a Test that runs a host-driven test on given devices. 264 """ 265 # test driver config 266 config = None 267 result = "" 268 error_message = "" 269 py_file = "" 270 271 def __init__(self): 272 pass 273 274 def __check_environment__(self, device_options): 275 pass 276 277 def __check_config__(self, config=None): 278 pass 279 280 def __execute__(self, request): 281 try: 282 # delete sys devicetest mode 283 if hasattr(sys, DeviceTestMode.MODE): 284 delattr(sys, DeviceTestMode.MODE) 285 286 # set self.config 287 self.config = request.config 288 self.config.devices = request.get_devices() 289 if request.get("exectype") == TestExecType.device_test \ 290 and not self.config.devices: 291 raise ParamError("Load Error[00104]", error_no="00104") 292 293 # get source, json config and kits 294 if request.get_config_file(): 295 source = request.get_config_file() 296 LOG.debug("Test config file path: %s" % source) 297 else: 298 source = request.get_source_string() 299 LOG.debug("Test String: %s" % source) 300 301 if not source: 302 LOG.error("No config file found for '%s'" % 303 request.get_source_file(), error_no="00102") 304 raise ParamError("Load Error(00102)", error_no="00102") 305 306 json_config = JsonParser(source) 307 kits = get_kit_instances(json_config, request.config.resource_path, 308 request.config.testcases_path) 309 do_module_kit_setup(request, kits) 310 311 test_name = request.get_module_name() 312 self.result = os.path.join(request.config.report_path, "result", "%s.xml" % test_name) 313 314 # set configs keys 315 configs = self._set_configs(json_config, kits, request) 316 317 # handle test args 318 self._handle_test_args(request=request) 319 320 # get test list 321 test_list = self._get_test_list(json_config, request, source) 322 if not test_list: 323 raise ParamError("no test list to run") 324 self._run_testsuites(configs, test_list) 325 except (ReportException, ModuleNotFoundError, ExecuteTerminate, 326 SyntaxError, ValueError, AttributeError, TypeError, 327 KeyboardInterrupt, ParamError, DeviceError) as exception: 328 error_no = getattr(exception, "error_no", "00000") 329 LOG.exception(exception, exc_info=False, error_no=error_no) 330 self.error_message = exception 331 Scheduler.call_life_stage_action( 332 stage=LifeStage.case_end, 333 case_name=request.get_module_name(), 334 case_result="Failed", 335 error_msg=str(self.error_message)) 336 finally: 337 self._handle_finally(request) 338 339 def _get_test_list(self, json_config, request, source): 340 testsuite = get_config_value('testsuite', json_config.get_driver(), 341 is_list=False) 342 343 if not testsuite and os.path.exists(source): 344 dir_name, file_name = os.path.split(source) 345 file_name, _ = os.path.splitext(file_name) 346 temp_testsuite = _get_dict_test_list(os.path.dirname(source), file_name) 347 if temp_testsuite: 348 testsuite = temp_testsuite[0] 349 350 if not testsuite: 351 LOG.error("Json not set testsuite or can't found py file.") 352 raise ParamError("Load Error(00109), Json not set testsuite or can't found py file.", error_no="00109") 353 354 checked_testsuite = None 355 if testsuite.endswith(PY_SUFFIX) or \ 356 testsuite.endswith(PYD_SUFFIX) or \ 357 testsuite.endswith(PYC_SUFFIX): 358 if not os.path.exists(testsuite): 359 try: 360 checked_testsuite = get_file_absolute_path(testsuite, [ 361 self.config.resource_path, self.config.testcases_path]) 362 except ParamError as error: 363 LOG.debug(error, error_no=error.error_no) 364 else: 365 checked_testsuite = testsuite 366 else: 367 checked_testsuite = _get_dict_testsuite(testsuite, self.config) 368 369 if checked_testsuite: 370 LOG.info("Test suite list: {}".format(checked_testsuite)) 371 else: 372 LOG.error("No test suite list found", error_no="00109") 373 raise ParamError("Load Error(00109), No test list found", error_no="00109") 374 return checked_testsuite 375 376 def _set_configs(self, json_config, kits, request): 377 configs = dict() 378 configs["testargs"] = self.config.testargs or {} 379 configs["testcases_path"] = self.config.testcases_path or "" 380 configs["resource_path"] = self.config.resource_path or "" 381 configs["request"] = request 382 configs["device_log"] = request.config.device_log 383 configs["test_name"] = request.get_module_name() 384 configs["report_path"] = request.config.report_path 385 configs["suitecases"] = get_config_value( 386 'suitecases', json_config.get_driver(), True) 387 configs["listeners"] = request.listeners.copy() 388 return configs 389 390 def _handle_finally(self, request): 391 # do kit teardown 392 do_module_kit_teardown(request) 393 394 # check result report 395 report_name = request.root.source.test_name if \ 396 not request.root.source.test_name.startswith("{") \ 397 else "report" 398 module_name = request.get_module_name() 399 self.result = check_result_report( 400 request.config.report_path, self.result, self.error_message, 401 report_name, module_name) 402 403 def _run_testsuites(self, configs, test_list): 404 if configs["testcases_path"]: 405 sys.path.insert(1, configs["testcases_path"]) 406 sys.path.insert(1, os.path.dirname(configs["testcases_path"])) 407 request = configs.get('request', None) 408 if request: 409 sys.ecotest_resource_path = request.config.resource_path 410 411 # run AppTest 412 from devicetest.main import DeviceTestSuite 413 app_test = DeviceTestSuite(test_list=test_list, configs=configs, 414 devices=self.config.devices, log=LOG) 415 app_test.run() 416 417 def __result__(self): 418 return self.result if os.path.exists(self.result) else "" 419 420 def _handle_test_args(self, request): 421 for device in self.config.devices: 422 # 恢复步骤截图的默认设置 423 setattr(device, "screenshot", False) 424 setattr(device, "screenshot_fail", True) 425 setattr(device, "screenrecorder", False) 426