1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import shutil 21import glob 22import time 23import stat 24 25from xdevice import UserConfigManager 26from xdevice import ConfigConst 27from xdevice import ExecuteTerminate 28from xdevice import ParamError 29 30from xdevice import TestDescription 31from xdevice import IDriver 32from xdevice import Plugin 33from xdevice import get_plugin 34from xdevice import platform_logger 35from xdevice import DataHelper 36from xdevice import JsonParser 37from xdevice import get_config_value 38from xdevice import get_kit_instances 39from xdevice import check_result_report 40from xdevice import get_device_log_file 41from xdevice import get_filename_extension 42from xdevice import get_file_absolute_path 43from xdevice import get_test_component_version 44from xdevice import SuiteReporter 45from xdevice import ShellHandler 46from xdevice import FilePermission 47from xdevice import DeviceTestType 48from xdevice import GTestConst 49from xdevice import DeviceLabelType 50from ohos.constants import ComType 51from ohos.constants import ParserType 52from ohos.constants import DeviceLiteKernel 53from ohos.constants import CKit 54from ohos.exception import LiteDeviceError 55from ohos.exception import LiteDeviceExecuteCommandError 56from ohos.executor.listener import CollectingLiteGTestListener 57from ohos.testkit.kit_lite import DeployKit 58from ohos.testkit.kit_lite import DeployToolKit 59from ohos.environment.dmlib_lite import generate_report 60 61__all__ = ["CppTestDriver", "CTestDriver", "init_remote_server"] 62LOG = platform_logger("DriversLite") 63FAILED_RUN_TEST_ATTEMPTS = 2 64CPP_TEST_MOUNT_STOP_SIGN = "not mount properly, Test Stop" 65CPP_TEST_NFS_SIGN = "execve: I/O error" 66 67 68def get_nfs_server(request): 69 config_manager = UserConfigManager( 70 config_file=request.get(ConfigConst.configfile, ""), 71 env=request.get(ConfigConst.test_environment, "")) 72 remote_info = config_manager.get_user_config("testcases/server", 73 filter_name="NfsServer") 74 if not remote_info: 75 err_msg = "The name of remote nfs server does not match" 76 LOG.error(err_msg, error_no="00403") 77 raise ParamError(err_msg, error_no="00403") 78 return remote_info 79 80 81def init_remote_server(lite_instance, request=None): 82 config_manager = UserConfigManager( 83 config_file=request.get(ConfigConst.configfile, ""), 84 env=request.get(ConfigConst.test_environment, "")) 85 linux_dict = config_manager.get_user_config("testcases/server") 86 87 if linux_dict: 88 setattr(lite_instance, "linux_host", linux_dict.get("ip")) 89 setattr(lite_instance, "linux_port", linux_dict.get("port")) 90 setattr(lite_instance, "linux_directory", linux_dict.get("dir")) 91 92 else: 93 error_message = "nfs server does not exist, please " \ 94 "check user_config.xml" 95 raise ParamError(error_message, error_no="00108") 96 97 98def get_testcases(testcases_list): 99 cases_list = [] 100 for test in testcases_list: 101 test_item = test.split("#") 102 if len(test_item) == 1: 103 cases_list.append(test) 104 elif len(test_item) == 2: 105 cases_list.append(test_item[-1]) 106 return cases_list 107 108 109def sort_by_length(file_name): 110 return len(file_name) 111 112 113@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test_lite) 114class CppTestDriver(IDriver): 115 """ 116 CppTest is a test that runs a native test package on given lite device. 117 """ 118 config = None 119 result = "" 120 error_message = "" 121 122 def __init__(self): 123 self.rerun = True 124 self.file_name = "" 125 126 def __check_environment__(self, device_options): 127 if len(device_options) != 1 or \ 128 device_options[0].label != DeviceLabelType.ipcamera: 129 self.error_message = "check environment failed" 130 return False 131 return True 132 133 def __check_config__(self, config=None): 134 pass 135 136 def __execute__(self, request): 137 kits = [] 138 device_log_file = get_device_log_file( 139 request.config.report_path, 140 request.get_devices()[0].__get_serial__()) 141 try: 142 self.config = request.config 143 self.init_cpp_config() 144 self.config.device = request.config.environment.devices[0] 145 init_remote_server(self, request=request) 146 config_file = request.root.source.config_file 147 json_config = JsonParser(config_file) 148 self._get_driver_config(json_config) 149 150 bin_file = get_config_value('execute', json_config.get_driver(), 151 False) 152 kits = get_kit_instances(json_config, 153 request.config.resource_path, 154 request.config.testcases_path) 155 from xdevice import Scheduler 156 for kit in kits: 157 if not Scheduler.is_execute: 158 raise ExecuteTerminate("ExecuteTerminate", 159 error_no="00300") 160 kit.__setup__(request.config.device, request=request) 161 162 command = self._get_execute_command(bin_file) 163 164 self.set_file_name(request, command) 165 166 if self.config.xml_output: 167 self.delete_device_xml(request, self.config.device_xml_path) 168 if os.path.exists(self.result): 169 os.remove(self.result) 170 if request.config.testargs.get("dry_run"): 171 self.config.dry_run = request.config.testargs.get( 172 "dry_run")[0].lower() 173 self.dry_run(command, request.listeners) 174 else: 175 self.run_cpp_test(command, request) 176 self.generate_device_xml(request, self.execute_bin) 177 178 except (LiteDeviceError, Exception) as exception: 179 LOG.exception(exception, exc_info=False) 180 self.error_message = exception 181 finally: 182 device_log_file_open = os.open(device_log_file, os.O_WRONLY | 183 os.O_CREAT | os.O_APPEND, 184 FilePermission.mode_755) 185 with os.fdopen(device_log_file_open, "a") as file_name: 186 file_name.write(self.config.command_result) 187 file_name.flush() 188 LOG.info("-------------finally-----------------") 189 self._after_command(kits, request) 190 191 def _get_execute_command(self, bin_file): 192 if self.config.device.get("device_kernel") == \ 193 DeviceLiteKernel.linux_kernel: 194 execute_dir = "/storage" + "/".join(bin_file.split("/")[0:-1]) 195 else: 196 execute_dir = "/".join(bin_file.split("/")[0:-1]) 197 self.execute_bin = bin_file.split("/")[-1] 198 199 self.config.device.execute_command_with_timeout( 200 command="cd {}".format(execute_dir), timeout=1) 201 self.config.execute_bin_path = execute_dir 202 203 if self.execute_bin.startswith("/"): 204 command = ".%s" % self.execute_bin 205 else: 206 command = "./%s" % self.execute_bin 207 208 report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0]) 209 self.config.device_xml_path = (self.linux_directory + report_path). \ 210 replace("//", "/") 211 self.config.device_report_path = execute_dir + report_path 212 213 return command 214 215 def _get_driver_config(self, json_config): 216 xml_output = get_config_value('xml-output', 217 json_config.get_driver(), False) 218 219 if isinstance(xml_output, bool): 220 self.config.xml_output = xml_output 221 elif str(xml_output).lower() == "false": 222 self.config.xml_output = False 223 else: 224 self.config.xml_output = True 225 226 rerun = get_config_value('rerun', json_config.get_driver(), False) 227 if isinstance(rerun, bool): 228 self.rerun = rerun 229 elif str(rerun).lower() == "false": 230 self.rerun = False 231 else: 232 self.rerun = True 233 234 timeout_config = get_config_value('timeout', 235 json_config.get_driver(), False) 236 if timeout_config: 237 self.config.timeout = int(timeout_config) // 1000 238 else: 239 self.config.timeout = 900 240 241 def _after_command(self, kits, request): 242 if self.config.device.get("device_kernel") == \ 243 DeviceLiteKernel.linux_kernel: 244 self.config.device.execute_command_with_timeout( 245 command="cd /storage", timeout=1) 246 else: 247 self.config.device.execute_command_with_timeout( 248 command="cd /", timeout=1) 249 for kit in kits: 250 kit.__teardown__(request.config.device) 251 self.config.device.close() 252 self.delete_device_xml(request, self.linux_directory) 253 254 report_name = "report" if request.root.source. \ 255 test_name.startswith("{") else get_filename_extension( 256 request.root.source.test_name)[0] 257 if not self.config.dry_run: 258 self.result = check_result_report( 259 request.config.report_path, self.result, self.error_message, 260 report_name) 261 262 def generate_device_xml(self, request, execute_bin): 263 if self.config.xml_output: 264 self.download_nfs_xml(request, self.config.device_xml_path) 265 self.merge_xml(execute_bin) 266 267 def dry_run(self, request, command, listener=None): 268 if self.config.xml_output: 269 collect_test_command = "%s --gtest_output=xml:%s " \ 270 "--gtest_list_tests" % \ 271 (command, self.config.device_report_path) 272 result, _, _ = self.config.device.execute_command_with_timeout( 273 command=collect_test_command, 274 case_type=DeviceTestType.cpp_test_lite, 275 timeout=15, receiver=None) 276 if CPP_TEST_MOUNT_STOP_SIGN in result: 277 tests = [] 278 return tests 279 tests = self.read_nfs_xml(request, self.config.device_xml_path) 280 self.delete_device_xml(request, self.config.device_xml_path) 281 return tests 282 283 else: 284 parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_list_lite) 285 parser_instances = [] 286 for parser in parsers: 287 parser_instance = parser.__class__() 288 parser_instance.suites_name = os.path.basename(self.result) 289 if listener: 290 parser_instance.listeners = listener 291 parser_instances.append(parser_instance) 292 handler = ShellHandler(parser_instances) 293 294 collect_test_command = "%s --gtest_list_tests" % command 295 result, _, _ = self.config.device.execute_command_with_timeout( 296 command=collect_test_command, 297 case_type=DeviceTestType.cpp_test_lite, 298 timeout=15, receiver=handler) 299 self.config.command_result = "{}{}".format( 300 self.config.command_result, result) 301 if parser_instances[0].tests and \ 302 len(parser_instances[0].tests) > 0: 303 SuiteReporter.set_suite_list([item.test_name for item in 304 parser_instances[0].tests]) 305 else: 306 SuiteReporter.set_suite_list([]) 307 tests = parser_instances[0].tests 308 if not tests: 309 LOG.error("Collect test failed!", error_no="00402") 310 return parser_instances[0].tests 311 312 def run_cpp_test(self, command, request): 313 if request.config.testargs.get("test"): 314 testcases_list = get_testcases( 315 request.config.testargs.get("test")) 316 for test in testcases_list: 317 command_case = "{} --gtest_filter=*{}".format( 318 command, test) 319 320 if not self.config.xml_output: 321 self.run(command_case, request.listeners, timeout=15) 322 else: 323 command_case = "{} --gtest_output=xml:{}".format( 324 command_case, self.config.device_report_path) 325 self.run(command_case, None, timeout=15) 326 else: 327 self._do_test_run(command, request) 328 329 def init_cpp_config(self): 330 setattr(self.config, "command_result", "") 331 setattr(self.config, "device_xml_path", "") 332 setattr(self.config, "dry_run", False) 333 334 def merge_xml(self, execute_bin): 335 report_path = os.path.join(self.config.report_path, "result") 336 summary_result = DataHelper.get_summary_result( 337 report_path, self.result, key=sort_by_length, 338 file_prefix=execute_bin) 339 if summary_result: 340 SuiteReporter.append_report_result(( 341 os.path.join(report_path, "%s.xml" % execute_bin), 342 DataHelper.to_string(summary_result))) 343 else: 344 self.error_message = "The test case did not generate XML" 345 for xml_file in os.listdir(os.path.split(self.result)[0]): 346 if not xml_file.startswith(execute_bin): 347 continue 348 if xml_file != os.path.split(self.result)[1]: 349 os.remove(os.path.join(os.path.split( 350 self.result)[0], xml_file)) 351 352 def set_file_name(self, request, command): 353 self.file_name = command.split(" ")[0].split("/")[-1].split(".")[0] 354 self.result = "%s.xml" % os.path.join(request.config.report_path, 355 "result", self.file_name) 356 357 def run(self, command=None, listener=None, timeout=None): 358 if not timeout: 359 timeout = self.config.timeout 360 if listener: 361 parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_lite) 362 parser_instances = [] 363 for parser in parsers: 364 parser_instance = parser.__class__() 365 parser_instance.suite_name = self.file_name 366 parser_instance.listeners = listener 367 parser_instances.append(parser_instance) 368 handler = ShellHandler(parser_instances) 369 else: 370 handler = None 371 result, _, error = self.config.device.execute_command_with_timeout( 372 command=command, case_type=DeviceTestType.cpp_test_lite, 373 timeout=timeout, receiver=handler) 374 self.config.command_result += result 375 if result.count(CPP_TEST_NFS_SIGN) >= 1: 376 _, _, error = self.config.device.execute_command_with_timeout( 377 command="ping %s" % self.linux_host, 378 case_type=DeviceTestType.cpp_test_lite, 379 timeout=5) 380 return error, result, handler 381 382 def _do_test_run(self, command, request): 383 test_to_run = self._collect_test_to_run(request, command) 384 self._run_with_rerun(command, request, test_to_run) 385 386 def _run_with_rerun(self, command, request, expected_tests): 387 if self.config.xml_output: 388 self.run("{} --gtest_output=xml:{}".format( 389 command, self.config.device_report_path)) 390 time.sleep(5) 391 test_rerun = True 392 if self.check_xml_exist(self.execute_bin + ".xml"): 393 test_rerun = False 394 test_run = self.read_nfs_xml(request, 395 self.config.device_xml_path, 396 test_rerun) 397 if len(test_run) < len(expected_tests): 398 expected_tests = TestDescription.remove_test(expected_tests, 399 test_run) 400 self._rerun_tests(command, expected_tests, None) 401 else: 402 test_tracker = CollectingLiteGTestListener() 403 listener = request.listeners 404 listener_copy = listener.copy() 405 listener_copy.append(test_tracker) 406 self.run(command, listener_copy) 407 test_run = test_tracker.get_current_run_results() 408 if len(test_run) != len(expected_tests): 409 expected_tests = TestDescription.remove_test(expected_tests, 410 test_run) 411 self._rerun_tests(command, expected_tests, listener) 412 413 def _rerun_tests(self, command, expected_tests, listener): 414 if not expected_tests: 415 LOG.debug("No tests to re-run, all tests executed at least once.") 416 for test in expected_tests: 417 self._re_run(command, test, listener) 418 419 def _re_run(self, command, test, listener): 420 if self.config.xml_output: 421 _, _, handler = self.run("{} {}=*{} --gtest_output=xml:{}".format( 422 command, GTestConst.exec_para_filter, test.test_name, 423 self.config.device_report_path), 424 listener, timeout=15) 425 else: 426 handler = None 427 for _ in range(FAILED_RUN_TEST_ATTEMPTS): 428 try: 429 listener_copy = listener.copy() 430 test_tracker = CollectingLiteGTestListener() 431 listener_copy.append(test_tracker) 432 _, _, handler = self.run("{} {}=*{}".format( 433 command, GTestConst.exec_para_filter, test.test_name), 434 listener_copy, timeout=15) 435 if len(test_tracker.get_current_run_results()): 436 return 437 except LiteDeviceError as _: 438 LOG.debug("Exception: ShellCommandUnresponsiveException") 439 handler.parsers[0].mark_test_as_failed(test) 440 441 def _collect_test_to_run(self, request, command): 442 if self.rerun: 443 tests = self.dry_run(request, command) 444 return tests 445 return [] 446 447 def download_nfs_xml(self, request, report_path): 448 remote_nfs = get_nfs_server(request) 449 if not remote_nfs: 450 err_msg = "The name of remote device {} does not match". \ 451 format(self.remote) 452 LOG.error(err_msg, error_no="00403") 453 raise TypeError(err_msg) 454 LOG.info("Trying to pull remote server: {}:{} report files to local " 455 "in dir {}".format 456 (remote_nfs.get("ip"), remote_nfs.get("port"), 457 os.path.dirname(self.result))) 458 result_dir = os.path.join(request.config.report_path, "result") 459 os.makedirs(result_dir, exist_ok=True) 460 try: 461 if remote_nfs["remote"] == "true": 462 import paramiko 463 client = paramiko.Transport((remote_nfs.get("ip"), 464 int(remote_nfs.get("port")))) 465 client.connect(username=remote_nfs.get("username"), 466 password=remote_nfs.get("password")) 467 sftp = paramiko.SFTPClient.from_transport(client) 468 files = sftp.listdir(report_path) 469 470 for report_xml in files: 471 if report_xml.endswith(".xml"): 472 filepath = report_path + report_xml 473 try: 474 sftp.get(remotepath=filepath, 475 localpath=os.path.join(os.path.split( 476 self.result)[0], report_xml)) 477 except IOError as error: 478 LOG.error(error, error_no="00404") 479 client.close() 480 else: 481 if os.path.isdir(report_path): 482 for report_xml in os.listdir(report_path): 483 if report_xml.endswith(".xml"): 484 filepath = report_path + report_xml 485 shutil.copy(filepath, 486 os.path.join(os.path.split( 487 self.result)[0], report_xml)) 488 except (FileNotFoundError, IOError) as error: 489 LOG.error("Download xml failed %s" % error, error_no="00403") 490 491 def check_xml_exist(self, xml_file, timeout=60): 492 ls_command = "ls %s" % self.config.device_report_path 493 start_time = time.time() 494 while time.time() - start_time < timeout: 495 result, _, _ = self.config.device.execute_command_with_timeout( 496 command=ls_command, case_type=DeviceTestType.cpp_test_lite, 497 timeout=5, receiver=None) 498 if xml_file in result: 499 return True 500 time.sleep(5) 501 if (self.execute_bin + "_1.xml") in result: 502 return False 503 return False 504 505 def read_nfs_xml(self, request, report_path, is_true=False): 506 remote_nfs = get_nfs_server(request) 507 if not remote_nfs: 508 err_msg = "The name of remote device {} does not match". \ 509 format(self.remote) 510 LOG.error(err_msg, error_no="00403") 511 raise TypeError(err_msg) 512 tests = [] 513 execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else ( 514 self.execute_bin + ".xml") 515 LOG.debug("run into :{}".format(is_true)) 516 file_path = os.path.join(report_path, execute_bin_xml) 517 if not self.check_xml_exist(execute_bin_xml): 518 return tests 519 520 from xml.etree import ElementTree 521 try: 522 if remote_nfs["remote"] == "true": 523 import paramiko 524 client = paramiko.SSHClient() 525 client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 526 client.connect(hostname=remote_nfs.get("ip"), 527 port=int(remote_nfs.get("port")), 528 username=remote_nfs.get("username"), 529 password=remote_nfs.get("password")) 530 sftp_client = client.open_sftp() 531 remote_file = sftp_client.open(file_path) 532 try: 533 result = remote_file.read().decode() 534 suites_element = ElementTree.fromstring(result) 535 for suite_element in suites_element: 536 suite_name = suite_element.get("name", "") 537 for case in suite_element: 538 case_name = case.get("name") 539 test = TestDescription(suite_name, case_name) 540 if test not in tests: 541 tests.append(test) 542 finally: 543 remote_file.close() 544 client.close() 545 else: 546 if os.path.isdir(report_path): 547 flags = os.O_RDONLY 548 modes = stat.S_IWUSR | stat.S_IRUSR 549 with os.fdopen(os.open(file_path, flags, modes), 550 "r") as test_file: 551 result = test_file.read() 552 suites_element = ElementTree.fromstring(result) 553 for suite_element in suites_element: 554 suite_name = suite_element.get("name", "") 555 for case in suite_element: 556 case_name = case.get("name") 557 test = TestDescription(suite_name, case_name) 558 if test not in tests: 559 tests.append(test) 560 except (FileNotFoundError, IOError) as error: 561 LOG.error("Download xml failed %s" % error, error_no="00403") 562 except SyntaxError as error: 563 LOG.error("Parse xml failed %s" % error, error_no="00404") 564 return tests 565 566 def delete_device_xml(self, request, report_path): 567 remote_nfs = get_nfs_server(request) 568 if not remote_nfs: 569 err_msg = "The name of remote device {} does not match". \ 570 format(self.remote) 571 LOG.error(err_msg, error_no="00403") 572 raise TypeError(err_msg) 573 LOG.info("Delete xml directory {} from remote server: {}" 574 "".format 575 (report_path, remote_nfs.get("ip"))) 576 if remote_nfs["remote"] == "true": 577 import paramiko 578 client = paramiko.Transport((remote_nfs.get("ip"), 579 int(remote_nfs.get("port")))) 580 client.connect(username=remote_nfs.get("username"), 581 password=remote_nfs.get("password")) 582 sftp = paramiko.SFTPClient.from_transport(client) 583 try: 584 sftp.stat(report_path) 585 files = sftp.listdir(report_path) 586 for report_xml in files: 587 if report_xml.endswith(".xml"): 588 filepath = "{}{}".format(report_path, report_xml) 589 try: 590 sftp.remove(filepath) 591 time.sleep(0.5) 592 except IOError as _: 593 pass 594 except FileNotFoundError as _: 595 pass 596 client.close() 597 else: 598 for report_xml in glob.glob(os.path.join(report_path, '*.xml')): 599 try: 600 os.remove(report_xml) 601 except Exception as exception: 602 LOG.error( 603 "remove {} Failed:{}".format(report_xml, exception)) 604 pass 605 606 def __result__(self): 607 return self.result if os.path.exists(self.result) else "" 608 609 610@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite) 611class CTestDriver(IDriver): 612 """ 613 CTest is a test that runs a native test package on given lite device. 614 """ 615 config = None 616 result = "" 617 error_message = "" 618 version_cmd = "AT+CSV" 619 620 def __init__(self): 621 self.file_name = "" 622 self.run_third = False 623 self.kit_type = None 624 self.auto_deploy = None 625 626 def __check_environment__(self, device_options): 627 if len(device_options) != 1 or \ 628 device_options[0].label != DeviceLabelType.wifiiot: 629 self.error_message = "check environment failed" 630 return False 631 return True 632 633 def __check_config__(self, config=None): 634 del config 635 self.config = None 636 637 def __execute__(self, request): 638 from xdevice import Variables 639 try: 640 self.config = request.config 641 self.config.device = request.config.environment.devices[0] 642 current_dir = request.config.resource_path if \ 643 request.config.resource_path else Variables.exec_dir 644 if request.root.source.source_file.strip(): 645 source = os.path.join(current_dir, 646 request.root.source.source_file.strip()) 647 self.file_name = os.path.basename( 648 request.root.source.source_file.strip()).split(".")[0] 649 else: 650 source = request.root.source.source_string.strip() 651 json_config = JsonParser(source) 652 kit_instances = get_kit_instances(json_config, 653 request.config.resource_path, 654 request.config.testcases_path) 655 for (_, kit_info) in zip(kit_instances, json_config.get_kits()): 656 self.auto_deploy = kit_info.get("auto_deploy", "") 657 self.kit_type = kit_info.get("type", "") 658 if self.kit_type == CKit.deploytool: 659 self.run_third = True 660 LOG.info("Kit type:{}".format(self.kit_type)) 661 LOG.info("Auto deploy:{}".format(self.auto_deploy)) 662 LOG.info("Run third:{}".format(self.run_third)) 663 if self.run_third: 664 LOG.info("Run the third-party vendor part") 665 if self.auto_deploy in ["False", "flase"]: 666 self._run_ctest_third_party(source=source, request=request) 667 else: 668 self._run_ctest_upgrade_party(source=source, request=request) 669 else: 670 LOG.debug("Run ctest") 671 self._run_ctest(source=source, request=request) 672 673 except (LiteDeviceExecuteCommandError, Exception) as exception: 674 LOG.error(exception, error_no=getattr(exception, "error_no", 675 "00000")) 676 self.error_message = exception 677 finally: 678 report_name = "report" if request.root.source. \ 679 test_name.startswith("{") else get_filename_extension( 680 request.root.source.test_name)[0] 681 self.result = check_result_report( 682 request.config.report_path, self.result, self.error_message, 683 report_name) 684 685 def _run_ctest(self, source=None, request=None, timeout=90): 686 parser_instances = [] 687 parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) 688 try: 689 if not source: 690 LOG.error("Error: source don't exist %s." % source, 691 error_no="00101") 692 return 693 694 version = get_test_component_version(self.config) 695 696 for parser in parsers: 697 parser_instance = parser.__class__() 698 parser_instance.suites_name = self.file_name 699 parser_instance.product_info.setdefault("Version", version) 700 parser_instance.listeners = request.listeners 701 parser_instances.append(parser_instance) 702 handler = ShellHandler(parser_instances) 703 704 reset_cmd = self._reset_device(request, source) 705 self.result = "%s.xml" % os.path.join( 706 request.config.report_path, "result", self.file_name) 707 self.config.device.device.com_dict.get( 708 ComType.deploy_com).connect() 709 result, _, error = self.config.device.device. \ 710 execute_command_with_timeout( 711 command=reset_cmd, case_type=DeviceTestType.ctest_lite, 712 key=ComType.deploy_com, timeout=timeout, receiver=handler) 713 device_log_file = get_device_log_file(request.config.report_path, 714 request.config.device. 715 __get_serial__()) 716 device_log_file_open = \ 717 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 718 os.O_APPEND, FilePermission.mode_755) 719 with os.fdopen(device_log_file_open, "a") as file_name: 720 file_name.write("{}{}".format( 721 "\n".join(result.split("\n")[0:-1]), "\n")) 722 file_name.flush() 723 finally: 724 self.config.device.device.com_dict.get( 725 ComType.deploy_com).close() 726 727 def _run_ctest_third_party(self, source=None, request=None, timeout=5): 728 parser_instances = [] 729 parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) 730 try: 731 if not source: 732 LOG.error("Error: source don't exist %s." % source, 733 error_no="00101") 734 return 735 736 version = get_test_component_version(self.config) 737 738 for parser in parsers: 739 parser_instance = parser.__class__() 740 parser_instance.suites_name = self.file_name 741 parser_instance.product_info.setdefault("Version", version) 742 parser_instance.listeners = request.listeners 743 parser_instances.append(parser_instance) 744 handler = ShellHandler(parser_instances) 745 746 while True: 747 input_burning = input("Please enter 'y' or 'n' " 748 "after the burning is complete," 749 "enter 'quit' to exit:") 750 if input_burning.lower().strip() in ["y", "yes"]: 751 LOG.info("Burning succeeded.") 752 break 753 elif input_burning.lower().strip() in ["n", "no"]: 754 LOG.info("Burning failed.") 755 elif input_burning.lower().strip() == "quit": 756 break 757 else: 758 LOG.info({"The input parameter is incorrect,please" 759 " enter 'y' or 'n' after the burning is " 760 "complete,enter 'quit' to exit."}) 761 LOG.info("Please press the reset button on the device ") 762 time.sleep(5) 763 self.result = "%s.xml" % os.path.join( 764 request.config.report_path, "result", self.file_name) 765 self.config.device.device.com_dict.get( 766 ComType.deploy_com).connect() 767 768 LOG.debug("Device com:{}".format(self.config.device.device)) 769 result, _, error = self.config.device.device. \ 770 execute_command_with_timeout( 771 command=[], case_type=DeviceTestType.ctest_lite, 772 key=ComType.deploy_com, timeout=timeout, receiver=handler) 773 device_log_file = get_device_log_file(request.config.report_path, 774 request.config.device. 775 __get_serial__()) 776 device_log_file_open = \ 777 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 778 os.O_APPEND, FilePermission.mode_755) 779 with os.fdopen(device_log_file_open, "a") as file_name: 780 file_name.write("{}{}".format( 781 "\n".join(result.split("\n")[0:-1]), "\n")) 782 file_name.flush() 783 finally: 784 self.config.device.device.com_dict.get( 785 ComType.deploy_com).close() 786 787 def _run_ctest_upgrade_party(self, source=None, request=None, timeout=90): 788 parser_instances = [] 789 parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) 790 try: 791 if not source: 792 LOG.error("Error: source don't exist %s." % source, 793 error_no="00101") 794 return 795 796 version = get_test_component_version(self.config) 797 798 for parser in parsers: 799 parser_instance = parser.__class__() 800 parser_instance.suites_name = self.file_name 801 parser_instance.product_info.setdefault("Version", version) 802 parser_instance.listeners = request.listeners 803 parser_instances.append(parser_instance) 804 handler = ShellHandler(parser_instances) 805 result = self._reset_third_device(request, source) 806 self.result = "%s.xml" % os.path.join( 807 request.config.report_path, "result", self.file_name) 808 if isinstance(result, list): 809 self.config.device.device.com_dict.get( 810 ComType.deploy_com).connect() 811 LOG.debug("Device com:{}".format(self.config.device.device)) 812 result, _, error = self.config.device.device. \ 813 execute_command_with_timeout( 814 command=request, case_type=DeviceTestType.ctest_lite, 815 key=ComType.deploy_com, timeout=timeout, receiver=handler) 816 else: 817 handler.__read__(result) 818 handler.__done__() 819 device_log_file = get_device_log_file(request.config.report_path, 820 request.config.device. 821 __get_serial__()) 822 device_log_file_open = \ 823 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 824 os.O_APPEND, FilePermission.mode_755) 825 with os.fdopen(device_log_file_open, "a") as file_name: 826 file_name.write("{}{}".format( 827 "\n".join(result.split("\n")[0:-1]), "\n")) 828 file_name.flush() 829 finally: 830 self.config.device.device.com_dict.get( 831 ComType.deploy_com).close() 832 833 def _reset_device(self, request, source): 834 json_config = JsonParser(source) 835 reset_cmd = [] 836 kit_instances = get_kit_instances(json_config, 837 request.config.resource_path, 838 request.config.testcases_path) 839 from xdevice import Scheduler 840 for (kit_instance, kit_info) in zip(kit_instances, 841 json_config.get_kits()): 842 if not isinstance(kit_instance, DeployKit): 843 continue 844 if not self.file_name: 845 self.file_name = get_config_value( 846 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] 847 reset_cmd = kit_instance.burn_command 848 if not Scheduler.is_execute: 849 raise ExecuteTerminate("ExecuteTerminate", 850 error_no="00300") 851 kit_instance.__setup__( 852 self.config.device) 853 reset_cmd = [int(item, 16) for item in reset_cmd] 854 return reset_cmd 855 856 def _reset_third_device(self, request, source): 857 json_config = JsonParser(source) 858 reset_cmd = [] 859 kit_instances = get_kit_instances(json_config, 860 request.config.resource_path, 861 request.config.testcases_path) 862 from xdevice import Scheduler 863 for (kit_instance, kit_info) in zip(kit_instances, 864 json_config.get_kits()): 865 if not isinstance(kit_instance, DeployToolKit): 866 continue 867 if not self.file_name: 868 self.file_name = get_config_value( 869 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] 870 if not Scheduler.is_execute: 871 raise ExecuteTerminate("ExecuteTerminate", 872 error_no="00300") 873 reset_cmd = kit_instance.__setup__( 874 self.config.device) 875 return reset_cmd 876 877 def __result__(self): 878 return self.result if os.path.exists(self.result) else "" 879 880 881@Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test) 882class OpenSourceTestDriver(IDriver): 883 """ 884 OpenSourceTest is a test that runs a native test package on given 885 device lite device. 886 """ 887 config = None 888 result = "" 889 error_message = "" 890 has_param = False 891 892 def __init__(self): 893 self.rerun = True 894 self.file_name = "" 895 self.handler = None 896 897 def __check_environment__(self, device_options): 898 if len(device_options) != 1 or \ 899 device_options[0].label != DeviceLabelType.ipcamera: 900 self.error_message = "check environment failed" 901 return False 902 return True 903 904 def __check_config__(self, config=None): 905 pass 906 907 def __execute__(self, request): 908 kits = [] 909 try: 910 self.config = request.config 911 setattr(self.config, "command_result", "") 912 self.config.device = request.config.environment.devices[0] 913 init_remote_server(self, request) 914 config_file = request.root.source.config_file 915 json_config = JsonParser(config_file) 916 pre_cmd = get_config_value('pre_cmd', json_config.get_driver(), 917 False) 918 execute_dir = get_config_value('execute', json_config.get_driver(), 919 False) 920 kits = get_kit_instances(json_config, 921 request.config.resource_path, 922 request.config.testcases_path) 923 from xdevice import Scheduler 924 for kit in kits: 925 if not Scheduler.is_execute: 926 raise ExecuteTerminate("ExecuteTerminate", 927 error_no="00300") 928 copy_list = kit.__setup__(request.config.device, 929 request=request) 930 931 self.file_name = request.root.source.test_name 932 self.set_file_name(request, request.root.source.test_name) 933 self.config.device.execute_command_with_timeout( 934 command=pre_cmd, timeout=1) 935 self.config.device.execute_command_with_timeout( 936 command="cd {}".format(execute_dir), timeout=1) 937 device_log_file = get_device_log_file( 938 request.config.report_path, 939 request.config.device.__get_serial__()) 940 device_log_file_open = \ 941 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 942 os.O_APPEND, FilePermission.mode_755) 943 with os.fdopen(device_log_file_open, "a") as file_name: 944 for test_bin in copy_list: 945 if not test_bin.endswith(".run-test"): 946 continue 947 if test_bin.startswith("/"): 948 command = ".%s" % test_bin 949 else: 950 command = "./%s" % test_bin 951 self._do_test_run(command, request) 952 file_name.write(self.config.command_result) 953 file_name.flush() 954 955 except (LiteDeviceExecuteCommandError, Exception) as exception: 956 LOG.error(exception, error_no=getattr(exception, "error_no", 957 "00000")) 958 self.error_message = exception 959 finally: 960 LOG.info("-------------finally-----------------") 961 # umount the dirs already mount 962 for kit in kits: 963 kit.__teardown__(request.config.device) 964 self.config.device.close() 965 report_name = "report" if request.root.source. \ 966 test_name.startswith("{") else get_filename_extension( 967 request.root.source.test_name)[0] 968 self.result = check_result_report( 969 request.config.report_path, self.result, self.error_message, 970 report_name) 971 972 def set_file_name(self, request, bin_file): 973 self.result = "%s.xml" % os.path.join( 974 request.config.report_path, "result", bin_file) 975 976 def run(self, command=None, listener=None, timeout=20): 977 parsers = get_plugin(Plugin.PARSER, 978 ParserType.open_source_test) 979 parser_instances = [] 980 for parser in parsers: 981 parser_instance = parser.__class__() 982 parser_instance.suite_name = self.file_name 983 parser_instance.test_name = command.replace("./", "") 984 parser_instance.listeners = listener 985 parser_instances.append(parser_instance) 986 self.handler = ShellHandler(parser_instances) 987 for _ in range(3): 988 result, _, error = self.config.device.execute_command_with_timeout( 989 command=command, case_type=DeviceTestType.open_source_test, 990 timeout=timeout, receiver=self.handler) 991 self.config.command_result = result 992 if "pass" in result.lower(): 993 break 994 return error, result, self.handler 995 996 def _do_test_run(self, command, request): 997 listeners = request.listeners 998 for listener in listeners: 999 listener.device_sn = self.config.device.device_sn 1000 error, _, _ = self.run(command, listeners, timeout=60) 1001 if error: 1002 LOG.error( 1003 "Execute %s failed" % command, error_no="00402") 1004 1005 def __result__(self): 1006 return self.result if os.path.exists(self.result) else "" 1007 1008 1009@Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test) 1010class BuildOnlyTestDriver(IDriver): 1011 """ 1012 BuildOnlyTest is a test that runs a native test package on given 1013 device lite device. 1014 """ 1015 config = None 1016 result = "" 1017 error_message = "" 1018 1019 def __check_environment__(self, device_options): 1020 pass 1021 1022 def __check_config__(self, config): 1023 pass 1024 1025 def __execute__(self, request): 1026 self.config = request.config 1027 self.config.device = request.config.environment.devices[0] 1028 self.file_name = request.root.source.test_name 1029 self.config_file = request.root.source.config_file 1030 self.testcases_path = request.config.testcases_path 1031 file_path = self._get_log_file() 1032 result_list = self._get_result_list(file_path) 1033 if len(result_list) == 0: 1034 LOG.error( 1035 "Error: source don't exist %s." % request.root.source. 1036 source_file, error_no="00101") 1037 return 1038 total_result = '' 1039 for result in result_list: 1040 flags = os.O_RDONLY 1041 modes = stat.S_IWUSR | stat.S_IRUSR 1042 with os.fdopen(os.open(result, flags, modes), "r", 1043 encoding="utf-8") as file_content: 1044 result = file_content.read() 1045 if not result.endswith('\n'): 1046 result = '%s\n' % result 1047 total_result = '{}{}'.format(total_result, result) 1048 parsers = get_plugin(Plugin.PARSER, ParserType.build_only_test) 1049 parser_instances = [] 1050 for parser in parsers: 1051 parser_instance = parser.__class__() 1052 parser_instance.suite_name = self.file_name 1053 parser_instance.listeners = request.listeners 1054 parser_instances.append(parser_instance) 1055 handler = ShellHandler(parser_instances) 1056 generate_report(handler, total_result) 1057 1058 @classmethod 1059 def _get_result_list(cls, file_path): 1060 result_list = list() 1061 for root_path, dirs_path, file_names in os.walk(file_path): 1062 for file_name in file_names: 1063 if file_name == "logfile": 1064 result_list.append(os.path.join(root_path, file_name)) 1065 return result_list 1066 1067 def _get_log_file(self): 1068 json_config = JsonParser(self.config_file) 1069 log_path = get_config_value('log_path', json_config.get_driver(), 1070 False) 1071 log_path = str(log_path.replace("/", "", 1)) if log_path.startswith( 1072 "/") else str(log_path) 1073 LOG.debug("The log path is:%s" % log_path) 1074 file_path = get_file_absolute_path(log_path, 1075 paths=[self.testcases_path]) 1076 LOG.debug("The file path is:%s" % file_path) 1077 return file_path 1078 1079 def __result__(self): 1080 return self.result if os.path.exists(self.result) else "" 1081 1082 1083@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite) 1084class JSUnitTestLiteDriver(IDriver): 1085 """ 1086 JSUnitTestDriver is a Test that runs a native test package on given device. 1087 """ 1088 1089 def __init__(self): 1090 self.result = "" 1091 self.error_message = "" 1092 self.kits = [] 1093 self.config = None 1094 1095 def __check_environment__(self, device_options): 1096 pass 1097 1098 def __check_config__(self, config): 1099 pass 1100 1101 def _get_driver_config(self, json_config): 1102 bundle_name = get_config_value('bundle-name', 1103 json_config.get_driver(), False) 1104 if not bundle_name: 1105 raise ParamError("Can't find bundle-name in config file.", 1106 error_no="00108") 1107 else: 1108 self.config.bundle_name = bundle_name 1109 1110 ability = get_config_value('ability', 1111 json_config.get_driver(), False) 1112 if not ability: 1113 self.config.ability = "default" 1114 else: 1115 self.config.ability = ability 1116 1117 def __execute__(self, request): 1118 try: 1119 LOG.debug("Start execute xdevice extension JSUnit Test") 1120 1121 self.config = request.config 1122 self.config.device = request.config.environment.devices[0] 1123 1124 config_file = request.root.source.config_file 1125 suite_file = request.root.source.source_file 1126 1127 if not suite_file: 1128 raise ParamError( 1129 "test source '%s' not exists" % 1130 request.root.source.source_string, error_no="00101") 1131 1132 if not os.path.exists(config_file): 1133 LOG.error("Error: Test cases don't exist %s." % config_file, 1134 error_no="00101") 1135 raise ParamError( 1136 "Error: Test cases don't exist %s." % config_file, 1137 error_no="00101") 1138 1139 self.file_name = os.path.basename( 1140 request.root.source.source_file.strip()).split(".")[0] 1141 1142 self.result = "%s.xml" % os.path.join( 1143 request.config.report_path, "result", self.file_name) 1144 1145 json_config = JsonParser(config_file) 1146 self.kits = get_kit_instances(json_config, 1147 self.config.resource_path, 1148 self.config.testcases_path) 1149 1150 self._get_driver_config(json_config) 1151 from xdevice import Scheduler 1152 for kit in self.kits: 1153 if not Scheduler.is_execute: 1154 raise ExecuteTerminate("ExecuteTerminate", 1155 error_no="00300") 1156 if kit.__class__.__name__ == CKit.liteinstall: 1157 kit.bundle_name = self.config.bundle_name 1158 kit.__setup__(self.config.device, request=request) 1159 1160 self._run_jsunit(request) 1161 1162 except Exception as exception: 1163 self.error_message = exception 1164 finally: 1165 report_name = "report" if request.root.source. \ 1166 test_name.startswith("{") else get_filename_extension( 1167 request.root.source.test_name)[0] 1168 1169 self.result = check_result_report( 1170 request.config.report_path, self.result, self.error_message, 1171 report_name) 1172 1173 for kit in self.kits: 1174 kit.__teardown__(self.config.device) 1175 self.config.device.close() 1176 1177 def _run_jsunit(self, request): 1178 parser_instances = [] 1179 parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite) 1180 for parser in parsers: 1181 parser_instance = parser.__class__() 1182 parser_instance.suites_name = self.file_name 1183 parser_instance.listeners = request.listeners 1184 parser_instances.append(parser_instance) 1185 handler = ShellHandler(parser_instances) 1186 1187 command = "./bin/aa start -p %s -n %s" % \ 1188 (self.config.bundle_name, self.config.ability) 1189 result, _, error = self.config.device.execute_command_with_timeout( 1190 command=command, timeout=300, receiver=handler) 1191 device_log_file = get_device_log_file(request.config.report_path, 1192 request.config.device. 1193 __get_serial__()) 1194 device_log_file_open =\ 1195 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 1196 FilePermission.mode_755) 1197 with os.fdopen(device_log_file_open, "a") as file_name: 1198 file_name.write("{}{}".format( 1199 "\n".join(result.split("\n")[0:-1]), "\n")) 1200 file_name.flush() 1201 1202 def __result__(self): 1203 return self.result if os.path.exists(self.result) else "" 1204