1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import shutil 21import glob 22import time 23import stat 24 25from _core.config.config_manager import UserConfigManager 26from _core.constants import ConfigConst 27from _core.constants import DeviceTestType 28from _core.constants import GTestConst 29from _core.constants import DeviceLabelType 30from _core.constants import ComType 31from _core.constants import ParserType 32from _core.constants import CKit 33from _core.constants import DeviceLiteKernel 34from _core.constants import FilePermission 35from _core.driver.parser_lite import ShellHandler 36from _core.environment.dmlib_lite import generate_report 37from _core.exception import ExecuteTerminate 38from _core.exception import LiteDeviceError 39from _core.exception import LiteDeviceExecuteCommandError 40from _core.exception import ParamError 41from _core.executor.listener import CollectingLiteGTestListener 42from _core.executor.listener import TestDescription 43from _core.interface import IDriver 44from _core.plugin import Plugin 45from _core.plugin import get_plugin 46from _core.logger import platform_logger 47from _core.report.reporter_helper import DataHelper 48from _core.testkit.json_parser import JsonParser 49from _core.testkit.kit_lite import DeployKit 50from _core.testkit.kit_lite import DeployToolKit 51from _core.utils import get_config_value 
52from _core.utils import get_kit_instances 53from _core.utils import check_result_report 54from _core.utils import get_device_log_file 55from _core.utils import get_filename_extension 56from _core.utils import get_file_absolute_path 57from _core.utils import get_test_component_version 58from _core.report.suite_reporter import SuiteReporter 59 60__all__ = ["CppTestDriver", "CTestDriver", "init_remote_server"] 61LOG = platform_logger("DriversLite") 62FAILED_RUN_TEST_ATTEMPTS = 2 63CPP_TEST_MOUNT_STOP_SIGN = "not mount properly, Test Stop" 64CPP_TEST_NFS_SIGN = "execve: I/O error" 65 66 67def get_nfs_server(request): 68 config_manager = UserConfigManager( 69 config_file=request.get(ConfigConst.configfile, ""), 70 env=request.get(ConfigConst.test_environment, "")) 71 remote_info = config_manager.get_user_config("testcases/server", 72 filter_name="NfsServer") 73 if not remote_info: 74 err_msg = "The name of remote nfs server does not match" 75 LOG.error(err_msg, error_no="00403") 76 raise ParamError(err_msg, error_no="00403") 77 return remote_info 78 79 80def init_remote_server(lite_instance, request=None): 81 config_manager = UserConfigManager( 82 config_file=request.get(ConfigConst.configfile, ""), 83 env=request.get(ConfigConst.test_environment, "")) 84 linux_dict = config_manager.get_user_config("testcases/server") 85 86 if linux_dict: 87 setattr(lite_instance, "linux_host", linux_dict.get("ip")) 88 setattr(lite_instance, "linux_port", linux_dict.get("port")) 89 setattr(lite_instance, "linux_directory", linux_dict.get("dir")) 90 91 else: 92 error_message = "nfs server does not exist, please " \ 93 "check user_config.xml" 94 raise ParamError(error_message, error_no="00108") 95 96 97def get_testcases(testcases_list): 98 cases_list = [] 99 for test in testcases_list: 100 test_item = test.split("#") 101 if len(test_item) == 1: 102 cases_list.append(test) 103 elif len(test_item) == 2: 104 cases_list.append(test_item[-1]) 105 return cases_list 106 107 108def 
sort_by_length(file_name): 109 return len(file_name) 110 111 112@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test_lite) 113class CppTestDriver(IDriver): 114 """ 115 CppTest is a test that runs a native test package on given lite device. 116 """ 117 config = None 118 result = "" 119 error_message = "" 120 121 def __init__(self): 122 self.rerun = True 123 self.file_name = "" 124 125 def __check_environment__(self, device_options): 126 if len(device_options) != 1 or \ 127 device_options[0].label != DeviceLabelType.ipcamera: 128 self.error_message = "check environment failed" 129 return False 130 return True 131 132 def __check_config__(self, config=None): 133 pass 134 135 def __execute__(self, request): 136 kits = [] 137 device_log_file = get_device_log_file( 138 request.config.report_path, 139 request.get_devices()[0].__get_serial__()) 140 try: 141 self.config = request.config 142 self.init_cpp_config() 143 self.config.device = request.config.environment.devices[0] 144 init_remote_server(self, request=request) 145 config_file = request.root.source.config_file 146 json_config = JsonParser(config_file) 147 self._get_driver_config(json_config) 148 149 bin_file = get_config_value('execute', json_config.get_driver(), 150 False) 151 kits = get_kit_instances(json_config, 152 request.config.resource_path, 153 request.config.testcases_path) 154 from xdevice import Scheduler 155 for kit in kits: 156 if not Scheduler.is_execute: 157 raise ExecuteTerminate("ExecuteTerminate", 158 error_no="00300") 159 kit.__setup__(request.config.device, request=request) 160 161 command = self._get_execute_command(bin_file) 162 163 self.set_file_name(request, command) 164 165 if self.config.xml_output: 166 self.delete_device_xml(request, self.config.device_xml_path) 167 if os.path.exists(self.result): 168 os.remove(self.result) 169 if request.config.testargs.get("dry_run"): 170 self.config.dry_run = request.config.testargs.get( 171 "dry_run")[0].lower() 172 self.dry_run(command, 
request.listeners) 173 else: 174 self.run_cpp_test(command, request) 175 self.generate_device_xml(request, self.execute_bin) 176 177 except (LiteDeviceError, Exception) as exception: 178 LOG.exception(exception, exc_info=False) 179 self.error_message = exception 180 finally: 181 device_log_file_open = os.open(device_log_file, os.O_WRONLY | 182 os.O_CREAT | os.O_APPEND, 183 FilePermission.mode_755) 184 with os.fdopen(device_log_file_open, "a") as file_name: 185 file_name.write(self.config.command_result) 186 file_name.flush() 187 LOG.info("-------------finally-----------------") 188 self._after_command(kits, request) 189 190 def _get_execute_command(self, bin_file): 191 if self.config.device.get("device_kernel") == \ 192 DeviceLiteKernel.linux_kernel: 193 execute_dir = "/storage" + "/".join(bin_file.split("/")[0:-1]) 194 else: 195 execute_dir = "/".join(bin_file.split("/")[0:-1]) 196 self.execute_bin = bin_file.split("/")[-1] 197 198 self.config.device.execute_command_with_timeout( 199 command="cd {}".format(execute_dir), timeout=1) 200 self.config.execute_bin_path = execute_dir 201 202 if self.execute_bin.startswith("/"): 203 command = ".%s" % self.execute_bin 204 else: 205 command = "./%s" % self.execute_bin 206 207 report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0]) 208 self.config.device_xml_path = (self.linux_directory + report_path).\ 209 replace("//", "/") 210 self.config.device_report_path = execute_dir + report_path 211 212 return command 213 214 def _get_driver_config(self, json_config): 215 xml_output = get_config_value('xml-output', 216 json_config.get_driver(), False) 217 218 if isinstance(xml_output, bool): 219 self.config.xml_output = xml_output 220 elif str(xml_output).lower() == "false": 221 self.config.xml_output = False 222 else: 223 self.config.xml_output = True 224 225 rerun = get_config_value('rerun', json_config.get_driver(), False) 226 if isinstance(rerun, bool): 227 self.rerun = rerun 228 elif str(rerun).lower() == "false": 
229 self.rerun = False 230 else: 231 self.rerun = True 232 233 timeout_config = get_config_value('timeout', 234 json_config.get_driver(), False) 235 if timeout_config: 236 self.config.timeout = int(timeout_config) 237 else: 238 self.config.timeout = 900 239 240 def _after_command(self, kits, request): 241 if self.config.device.get("device_kernel") == \ 242 DeviceLiteKernel.linux_kernel: 243 self.config.device.execute_command_with_timeout( 244 command="cd /storage", timeout=1) 245 else: 246 self.config.device.execute_command_with_timeout( 247 command="cd /", timeout=1) 248 for kit in kits: 249 kit.__teardown__(request.config.device) 250 self.config.device.close() 251 self.delete_device_xml(request, self.linux_directory) 252 253 report_name = "report" if request.root.source. \ 254 test_name.startswith("{") else get_filename_extension( 255 request.root.source.test_name)[0] 256 if not self.config.dry_run: 257 self.result = check_result_report( 258 request.config.report_path, self.result, self.error_message, 259 report_name) 260 261 def generate_device_xml(self, request, execute_bin): 262 if self.config.xml_output: 263 self.download_nfs_xml(request, self.config.device_xml_path) 264 self.merge_xml(execute_bin) 265 266 def dry_run(self, request, command, listener=None): 267 if self.config.xml_output: 268 collect_test_command = "%s --gtest_output=xml:%s " \ 269 "--gtest_list_tests" % \ 270 (command, self.config.device_report_path) 271 result, _, _ = self.config.device.execute_command_with_timeout( 272 command=collect_test_command, 273 case_type=DeviceTestType.cpp_test_lite, 274 timeout=15, receiver=None) 275 if CPP_TEST_MOUNT_STOP_SIGN in result: 276 tests = [] 277 return tests 278 tests = self.read_nfs_xml(request, self.config.device_xml_path) 279 self.delete_device_xml(request, self.config.device_xml_path) 280 return tests 281 282 else: 283 parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_list_lite) 284 parser_instances = [] 285 for parser in parsers: 286 
parser_instance = parser.__class__() 287 parser_instance.suites_name = os.path.basename(self.result) 288 if listener: 289 parser_instance.listeners = listener 290 parser_instances.append(parser_instance) 291 handler = ShellHandler(parser_instances) 292 293 collect_test_command = "%s --gtest_list_tests" % command 294 result, _, _ = self.config.device.execute_command_with_timeout( 295 command=collect_test_command, 296 case_type=DeviceTestType.cpp_test_lite, 297 timeout=15, receiver=handler) 298 self.config.command_result = "{}{}".format( 299 self.config.command_result, result) 300 if parser_instances[0].tests and \ 301 len(parser_instances[0].tests) > 0: 302 SuiteReporter.set_suite_list([item.test_name for item in 303 parser_instances[0].tests]) 304 else: 305 SuiteReporter.set_suite_list([]) 306 tests = parser_instances[0].tests 307 if not tests: 308 LOG.error("collect test failed!", error_no="00402") 309 return parser_instances[0].tests 310 311 def run_cpp_test(self, command, request): 312 if request.config.testargs.get("test"): 313 testcases_list = get_testcases( 314 request.config.testargs.get("test")) 315 for test in testcases_list: 316 command_case = "{} --gtest_filter=*{}".format( 317 command, test) 318 319 if not self.config.xml_output: 320 self.run(command_case, request.listeners, timeout=15) 321 else: 322 command_case = "{} --gtest_output=xml:{}".format( 323 command_case, self.config.device_report_path) 324 self.run(command_case, None, timeout=15) 325 else: 326 self._do_test_run(command, request) 327 328 def init_cpp_config(self): 329 setattr(self.config, "command_result", "") 330 setattr(self.config, "device_xml_path", "") 331 setattr(self.config, "dry_run", False) 332 333 def merge_xml(self, execute_bin): 334 report_path = os.path.join(self.config.report_path, "result") 335 summary_result = DataHelper.get_summary_result( 336 report_path, self.result, key=sort_by_length, 337 file_prefix=execute_bin) 338 if summary_result: 339 
SuiteReporter.append_report_result(( 340 os.path.join(report_path, "%s.xml" % execute_bin), 341 DataHelper.to_string(summary_result))) 342 else: 343 self.error_message = "The test case did not generate XML" 344 for xml_file in os.listdir(os.path.split(self.result)[0]): 345 if not xml_file.startswith(execute_bin): 346 continue 347 if xml_file != os.path.split(self.result)[1]: 348 os.remove(os.path.join(os.path.split( 349 self.result)[0], xml_file)) 350 351 def set_file_name(self, request, command): 352 self.file_name = command.split(" ")[0].split("/")[-1].split(".")[0] 353 self.result = "%s.xml" % os.path.join(request.config.report_path, 354 "result", self.file_name) 355 356 def run(self, command=None, listener=None, timeout=None): 357 if not timeout: 358 timeout = self.config.timeout 359 if listener: 360 parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_lite) 361 parser_instances = [] 362 for parser in parsers: 363 parser_instance = parser.__class__() 364 parser_instance.suite_name = self.file_name 365 parser_instance.listeners = listener 366 parser_instances.append(parser_instance) 367 handler = ShellHandler(parser_instances) 368 else: 369 handler = None 370 result, _, error = self.config.device.execute_command_with_timeout( 371 command=command, case_type=DeviceTestType.cpp_test_lite, 372 timeout=timeout, receiver=handler) 373 self.config.command_result += result 374 if result.count(CPP_TEST_NFS_SIGN) >= 1: 375 _, _, error = self.config.device.execute_command_with_timeout( 376 command="ping %s" % self.linux_host, 377 case_type=DeviceTestType.cpp_test_lite, 378 timeout=5) 379 return error, result, handler 380 381 def _do_test_run(self, command, request): 382 test_to_run = self._collect_test_to_run(request, command) 383 self._run_with_rerun(command, request, test_to_run) 384 385 def _run_with_rerun(self, command, request, expected_tests): 386 if self.config.xml_output: 387 self.run("{} --gtest_output=xml:{}".format( 388 command, 
self.config.device_report_path)) 389 time.sleep(5) 390 test_rerun = True 391 if self.check_xml_exist(self.execute_bin + ".xml"): 392 test_rerun = False 393 test_run = self.read_nfs_xml(request, self.config.device_xml_path, 394 test_rerun) 395 396 if len(test_run) < len(expected_tests): 397 expected_tests = TestDescription.remove_test(expected_tests, 398 test_run) 399 self._rerun_tests(command, expected_tests, None) 400 else: 401 test_tracker = CollectingLiteGTestListener() 402 listener = request.listeners 403 listener_copy = listener.copy() 404 listener_copy.append(test_tracker) 405 self.run(command, listener_copy) 406 test_run = test_tracker.get_current_run_results() 407 if len(test_run) != len(expected_tests): 408 expected_tests = TestDescription.remove_test(expected_tests, 409 test_run) 410 self._rerun_tests(command, expected_tests, listener) 411 412 def _rerun_tests(self, command, expected_tests, listener): 413 if not expected_tests: 414 LOG.debug("No tests to re-run, all tests executed at least once.") 415 for test in expected_tests: 416 self._re_run(command, test, listener) 417 418 def _re_run(self, command, test, listener): 419 if self.config.xml_output: 420 _, _, handler = self.run("{} {}=*{} --gtest_output=xml:{}".format( 421 command, GTestConst.exec_para_filter, test.test_name, 422 self.config.device_report_path), 423 listener, timeout=15) 424 else: 425 handler = None 426 for _ in range(FAILED_RUN_TEST_ATTEMPTS): 427 try: 428 listener_copy = listener.copy() 429 test_tracker = CollectingLiteGTestListener() 430 listener_copy.append(test_tracker) 431 _, _, handler = self.run("{} {}=*{}".format( 432 command, GTestConst.exec_para_filter, test.test_name), 433 listener_copy, timeout=15) 434 if len(test_tracker.get_current_run_results()): 435 return 436 except LiteDeviceError as _: 437 LOG.debug("Exception: ShellCommandUnresponsiveException") 438 handler.parsers[0].mark_test_as_failed(test) 439 440 def _collect_test_to_run(self, request, command): 441 if 
self.rerun: 442 tests = self.dry_run(request, command) 443 return tests 444 return [] 445 446 def download_nfs_xml(self, request, report_path): 447 remote_nfs = get_nfs_server(request) 448 if not remote_nfs: 449 err_msg = "The name of remote device {} does not match". \ 450 format(self.remote) 451 LOG.error(err_msg, error_no="00403") 452 raise TypeError(err_msg) 453 LOG.info("Trying to pull remote server: {}:{} report files to local " 454 "in dir {}".format 455 (remote_nfs.get("ip"), remote_nfs.get("port"), 456 os.path.dirname(self.result))) 457 result_dir = os.path.join(request.config.report_path, "result") 458 os.makedirs(result_dir, exist_ok=True) 459 try: 460 if remote_nfs["remote"] == "true": 461 import paramiko 462 client = paramiko.Transport((remote_nfs.get("ip"), 463 int(remote_nfs.get("port")))) 464 client.connect(username=remote_nfs.get("username"), 465 password=remote_nfs.get("password")) 466 sftp = paramiko.SFTPClient.from_transport(client) 467 files = sftp.listdir(report_path) 468 469 for report_xml in files: 470 if report_xml.endswith(".xml"): 471 filepath = report_path + report_xml 472 try: 473 sftp.get(remotepath=filepath, 474 localpath=os.path.join(os.path.split( 475 self.result)[0], report_xml)) 476 except IOError as error: 477 LOG.error(error, error_no="00404") 478 client.close() 479 else: 480 if os.path.isdir(report_path): 481 for report_xml in os.listdir(report_path): 482 if report_xml.endswith(".xml"): 483 filepath = report_path + report_xml 484 shutil.copy(filepath, 485 os.path.join(os.path.split( 486 self.result)[0], report_xml)) 487 except (FileNotFoundError, IOError) as error: 488 LOG.error("download xml failed %s" % error, error_no="00403") 489 490 def check_xml_exist(self, xml_file, timeout=60): 491 ls_command = "ls %s" % self.config.device_report_path 492 start_time = time.time() 493 while time.time() - start_time < timeout: 494 result, _, _ = self.config.device.execute_command_with_timeout( 495 command=ls_command, 
case_type=DeviceTestType.cpp_test_lite, 496 timeout=5, receiver=None) 497 if xml_file in result: 498 return True 499 time.sleep(5) 500 if (self.execute_bin + "_1.xml") in result: 501 return False 502 return False 503 504 def read_nfs_xml(self, request, report_path, is_true=False): 505 remote_nfs = get_nfs_server(request) 506 if not remote_nfs: 507 err_msg = "The name of remote device {} does not match". \ 508 format(self.remote) 509 LOG.error(err_msg, error_no="00403") 510 raise TypeError(err_msg) 511 tests = [] 512 execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else ( 513 self.execute_bin + ".xml") 514 LOG.debug("run into :{}".format(is_true)) 515 file_path = os.path.join(report_path, execute_bin_xml) 516 if not self.check_xml_exist(execute_bin_xml): 517 return tests 518 519 from xml.etree import ElementTree 520 try: 521 if remote_nfs["remote"] == "true": 522 import paramiko 523 client = paramiko.SSHClient() 524 client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 525 client.connect(hostname=remote_nfs.get("ip"), 526 port=int(remote_nfs.get("port")), 527 username=remote_nfs.get("username"), 528 password=remote_nfs.get("password")) 529 sftp_client = client.open_sftp() 530 remote_file = sftp_client.open(file_path) 531 try: 532 result = remote_file.read().decode() 533 suites_element = ElementTree.fromstring(result) 534 for suite_element in suites_element: 535 suite_name = suite_element.get("name", "") 536 for case in suite_element: 537 case_name = case.get("name") 538 test = TestDescription(suite_name, case_name) 539 if test not in tests: 540 tests.append(test) 541 finally: 542 remote_file.close() 543 client.close() 544 else: 545 if os.path.isdir(report_path): 546 flags = os.O_RDONLY 547 modes = stat.S_IWUSR | stat.S_IRUSR 548 with os.fdopen(os.open(file_path, flags, modes), 549 "r") as test_file: 550 result = test_file.read() 551 suites_element = ElementTree.fromstring(result) 552 for suite_element in suites_element: 553 suite_name = 
suite_element.get("name", "") 554 for case in suite_element: 555 case_name = case.get("name") 556 test = TestDescription(suite_name, case_name) 557 if test not in tests: 558 tests.append(test) 559 except (FileNotFoundError, IOError) as error: 560 LOG.error("download xml failed %s" % error, error_no="00403") 561 except SyntaxError as error: 562 LOG.error("parse xml failed %s" % error, error_no="00404") 563 return tests 564 565 def delete_device_xml(self, request, report_path): 566 remote_nfs = get_nfs_server(request) 567 if not remote_nfs: 568 err_msg = "The name of remote device {} does not match". \ 569 format(self.remote) 570 LOG.error(err_msg, error_no="00403") 571 raise TypeError(err_msg) 572 LOG.info("delete xml directory {} from remote server: {}" 573 "".format 574 (report_path, remote_nfs.get("ip"))) 575 if remote_nfs["remote"] == "true": 576 import paramiko 577 client = paramiko.Transport((remote_nfs.get("ip"), 578 int(remote_nfs.get("port")))) 579 client.connect(username=remote_nfs.get("username"), 580 password=remote_nfs.get("password")) 581 sftp = paramiko.SFTPClient.from_transport(client) 582 try: 583 sftp.stat(report_path) 584 files = sftp.listdir(report_path) 585 for report_xml in files: 586 if report_xml.endswith(".xml"): 587 filepath = "{}{}".format(report_path, report_xml) 588 try: 589 sftp.remove(filepath) 590 time.sleep(0.5) 591 except IOError as _: 592 pass 593 except FileNotFoundError as _: 594 pass 595 client.close() 596 else: 597 for report_xml in glob.glob(os.path.join(report_path, '*.xml')): 598 try: 599 os.remove(report_xml) 600 except Exception as exception: 601 LOG.error( 602 "remove {} Failed:{}".format(report_xml, exception)) 603 pass 604 605 def __result__(self): 606 return self.result if os.path.exists(self.result) else "" 607 608 609@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite) 610class CTestDriver(IDriver): 611 """ 612 CTest is a test that runs a native test package on given lite device. 
613 """ 614 config = None 615 result = "" 616 error_message = "" 617 version_cmd = "AT+CSV" 618 619 def __init__(self): 620 self.file_name = "" 621 622 def __check_environment__(self, device_options): 623 if len(device_options) != 1 or \ 624 device_options[0].label != DeviceLabelType.wifiiot: 625 self.error_message = "check environment failed" 626 return False 627 return True 628 629 def __check_config__(self, config=None): 630 del config 631 self.config = None 632 633 def __execute__(self, request): 634 from xdevice import Variables 635 try: 636 self.config = request.config 637 self.config.device = request.config.environment.devices[0] 638 current_dir = request.config.resource_path if \ 639 request.config.resource_path else Variables.exec_dir 640 if request.root.source.source_file.strip(): 641 source = os.path.join(current_dir, 642 request.root.source.source_file.strip()) 643 self.file_name = os.path.basename( 644 request.root.source.source_file.strip()).split(".")[0] 645 else: 646 source = request.root.source.source_string.strip() 647 json_config = JsonParser(source) 648 kit_instances = get_kit_instances(json_config, 649 request.config.resource_path, 650 request.config.testcases_path) 651 for (kit_instance, kit_info) in zip(kit_instances, 652 json_config.get_kits()): 653 if isinstance(kit_instance, DeployToolKit): 654 LOG.debug("run ctest third party") 655 self._run_ctest_third_party(source=source, request=request, 656 time_out=int( 657 kit_instance.time_out)) 658 break 659 else: 660 LOG.debug("run ctest") 661 self._run_ctest(source=source, request=request) 662 break 663 664 except (LiteDeviceExecuteCommandError, Exception) as exception: 665 LOG.error(exception, error_no=getattr(exception, "error_no", 666 "00000")) 667 self.error_message = exception 668 finally: 669 report_name = "report" if request.root.source. 
\ 670 test_name.startswith("{") else get_filename_extension( 671 request.root.source.test_name)[0] 672 self.result = check_result_report( 673 request.config.report_path, self.result, self.error_message, 674 report_name) 675 676 def _run_ctest(self, source=None, request=None): 677 parser_instances = [] 678 parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) 679 try: 680 if not source: 681 LOG.error("Error: source don't exist %s." % source, 682 error_no="00101") 683 return 684 685 version = get_test_component_version(self.config) 686 687 for parser in parsers: 688 parser_instance = parser.__class__() 689 parser_instance.suites_name = self.file_name 690 parser_instance.product_info.setdefault("Version", version) 691 parser_instance.listeners = request.listeners 692 parser_instances.append(parser_instance) 693 handler = ShellHandler(parser_instances) 694 695 reset_cmd = self._reset_device(request, source) 696 self.result = "%s.xml" % os.path.join( 697 request.config.report_path, "result", self.file_name) 698 self.config.device.device.com_dict.get( 699 ComType.deploy_com).connect() 700 result, _, error = self.config.device.device. \ 701 execute_command_with_timeout( 702 command=reset_cmd, case_type=DeviceTestType.ctest_lite, 703 key=ComType.deploy_com, timeout=90, receiver=handler) 704 device_log_file = get_device_log_file(request.config.report_path, 705 request.config.device. 
706 __get_serial__()) 707 device_log_file_open = \ 708 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 709 os.O_APPEND, FilePermission.mode_755) 710 with os.fdopen(device_log_file_open, "a") as file_name: 711 file_name.write("{}{}".format( 712 "\n".join(result.split("\n")[0:-1]), "\n")) 713 file_name.flush() 714 finally: 715 self.config.device.device.com_dict.get( 716 ComType.deploy_com).close() 717 718 def _run_ctest_third_party(self, source=None, request=None, time_out=5): 719 parser_instances = [] 720 parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) 721 try: 722 if not source: 723 LOG.error("Error: source don't exist %s." % source, 724 error_no="00101") 725 return 726 727 version = get_test_component_version(self.config) 728 729 for parser in parsers: 730 parser_instance = parser.__class__() 731 parser_instance.suites_name = self.file_name 732 parser_instance.product_info.setdefault("Version", version) 733 parser_instance.listeners = request.listeners 734 parser_instances.append(parser_instance) 735 handler = ShellHandler(parser_instances) 736 737 while True: 738 input_burning = input("Please enter 'y' or 'n' after " 739 "the burning is complete," 740 "enter 'quit' to exit:") 741 if input_burning.lower().strip() in ["y", "yes"]: 742 LOG.info("Burning succeeded.") 743 break 744 elif input_burning.lower().strip() in ["n", "no"]: 745 LOG.info("Burning failed.") 746 elif input_burning.lower().strip() == "quit": 747 break 748 else: 749 LOG.info("The input {} parameter is incorrect," 750 "please enter 'y' or 'n' after the " 751 "burning is complete ,enter 'quit' " 752 "to exit.".format(input_burning)) 753 LOG.info("Please press the device " 754 "reset button when the send commmand [] appears ") 755 time.sleep(3) 756 self.result = "%s.xml" % os.path.join( 757 request.config.report_path, "result", self.file_name) 758 self.config.device.device.com_dict.get( 759 ComType.deploy_com).connect() 760 result, _, error = self.config.device.device. 
\ 761 execute_command_with_timeout( 762 command=[], case_type=DeviceTestType.ctest_lite, 763 key=ComType.deploy_com, timeout=time_out, receiver=handler) 764 device_log_file = get_device_log_file(request.config.report_path, 765 request.config.device. 766 __get_serial__()) 767 device_log_file_open = \ 768 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 769 os.O_APPEND, 0o755) 770 with os.fdopen(device_log_file_open, "a") as file_name: 771 file_name.write("{}{}".format( 772 "\n".join(result.split("\n")[0:-1]), "\n")) 773 file_name.flush() 774 finally: 775 self.config.device.device.com_dict.get( 776 ComType.deploy_com).close() 777 778 def _reset_device(self, request, source): 779 json_config = JsonParser(source) 780 reset_cmd = [] 781 kit_instances = get_kit_instances(json_config, 782 request.config.resource_path, 783 request.config.testcases_path) 784 from xdevice import Scheduler 785 for (kit_instance, kit_info) in zip(kit_instances, 786 json_config.get_kits()): 787 if not isinstance(kit_instance, DeployKit): 788 continue 789 if not self.file_name: 790 self.file_name = get_config_value( 791 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] 792 reset_cmd = kit_instance.burn_command 793 if not Scheduler.is_execute: 794 raise ExecuteTerminate("ExecuteTerminate", 795 error_no="00300") 796 kit_instance.__setup__( 797 self.config.device) 798 reset_cmd = [int(item, 16) for item in reset_cmd] 799 return reset_cmd 800 801 def __result__(self): 802 return self.result if os.path.exists(self.result) else "" 803 804 805@Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test) 806class OpenSourceTestDriver(IDriver): 807 """ 808 OpenSourceTest is a test that runs a native test package on given 809 device lite device. 
810 """ 811 config = None 812 result = "" 813 error_message = "" 814 has_param = False 815 816 def __init__(self): 817 self.rerun = True 818 self.file_name = "" 819 self.handler = None 820 821 def __check_environment__(self, device_options): 822 if len(device_options) != 1 or \ 823 device_options[0].label != DeviceLabelType.ipcamera: 824 self.error_message = "check environment failed" 825 return False 826 return True 827 828 def __check_config__(self, config=None): 829 pass 830 831 def __execute__(self, request): 832 kits = [] 833 try: 834 self.config = request.config 835 setattr(self.config, "command_result", "") 836 self.config.device = request.config.environment.devices[0] 837 init_remote_server(self, request) 838 config_file = request.root.source.config_file 839 json_config = JsonParser(config_file) 840 pre_cmd = get_config_value('pre_cmd', json_config.get_driver(), 841 False) 842 execute_dir = get_config_value('execute', json_config.get_driver(), 843 False) 844 kits = get_kit_instances(json_config, 845 request.config.resource_path, 846 request.config.testcases_path) 847 from xdevice import Scheduler 848 for kit in kits: 849 if not Scheduler.is_execute: 850 raise ExecuteTerminate("ExecuteTerminate", 851 error_no="00300") 852 copy_list = kit.__setup__(request.config.device, 853 request=request) 854 855 self.file_name = request.root.source.test_name 856 self.set_file_name(request, request.root.source.test_name) 857 self.config.device.execute_command_with_timeout( 858 command=pre_cmd, timeout=1) 859 self.config.device.execute_command_with_timeout( 860 command="cd {}".format(execute_dir), timeout=1) 861 device_log_file = get_device_log_file( 862 request.config.report_path, 863 request.config.device.__get_serial__()) 864 device_log_file_open = \ 865 os.open(device_log_file, os.O_WRONLY | os.O_CREAT | 866 os.O_APPEND, FilePermission.mode_755) 867 with os.fdopen(device_log_file_open, "a") as file_name: 868 for test_bin in copy_list: 869 if not 
test_bin.endswith(".run-test"): 870 continue 871 if test_bin.startswith("/"): 872 command = ".%s" % test_bin 873 else: 874 command = "./%s" % test_bin 875 self._do_test_run(command, request) 876 file_name.write(self.config.command_result) 877 file_name.flush() 878 879 except (LiteDeviceExecuteCommandError, Exception) as exception: 880 LOG.error(exception, error_no=getattr(exception, "error_no", 881 "00000")) 882 self.error_message = exception 883 finally: 884 LOG.info("-------------finally-----------------") 885 # umount the dirs already mount 886 for kit in kits: 887 kit.__teardown__(request.config.device) 888 self.config.device.close() 889 report_name = "report" if request.root.source. \ 890 test_name.startswith("{") else get_filename_extension( 891 request.root.source.test_name)[0] 892 self.result = check_result_report( 893 request.config.report_path, self.result, self.error_message, 894 report_name) 895 896 def set_file_name(self, request, bin_file): 897 self.result = "%s.xml" % os.path.join( 898 request.config.report_path, "result", bin_file) 899 900 def run(self, command=None, listener=None, timeout=20): 901 parsers = get_plugin(Plugin.PARSER, 902 ParserType.open_source_test) 903 parser_instances = [] 904 for parser in parsers: 905 parser_instance = parser.__class__() 906 parser_instance.suite_name = self.file_name 907 parser_instance.test_name = command.replace("./", "") 908 parser_instance.listeners = listener 909 parser_instances.append(parser_instance) 910 self.handler = ShellHandler(parser_instances) 911 for _ in range(3): 912 result, _, error = self.config.device.execute_command_with_timeout( 913 command=command, case_type=DeviceTestType.open_source_test, 914 timeout=timeout, receiver=self.handler) 915 self.config.command_result = result 916 if "pass" in result.lower(): 917 break 918 return error, result, self.handler 919 920 def _do_test_run(self, command, request): 921 listeners = request.listeners 922 for listener in listeners: 923 listener.device_sn 
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test)
class BuildOnlyTestDriver(IDriver):
    """
    BuildOnlyTestDriver builds a report from log files that already exist.

    __execute__ itself sends no command to the device: every file named
    "logfile" found under the log_path of the json config is read and the
    combined text is handed to the build_only_test parsers.
    """
    config = None
    result = ""
    error_message = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Collect the log files and generate the report from their content.

        :param request: test request providing config, source and listeners
        :raises ParamError: if the json config has no log_path
        """
        self.config = request.config
        self.config.device = request.config.environment.devices[0]
        self.file_name = request.root.source.test_name
        self.config_file = request.root.source.config_file
        self.testcases_path = request.config.testcases_path

        log_files = self._get_result_list(self._get_log_file())
        if not log_files:
            LOG.error(
                "Error: source don't exist %s." % request.root.source.
                source_file, error_no="00101")
            return

        # join once instead of growing a string inside the loop
        total_result = "".join(
            self._read_log(log_file) for log_file in log_files)

        parser_instances = []
        for parser in get_plugin(Plugin.PARSER, ParserType.build_only_test):
            parser_instance = parser.__class__()
            parser_instance.suite_name = self.file_name
            parser_instance.listeners = request.listeners
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)
        generate_report(handler, total_result)

    @staticmethod
    def _read_log(log_file):
        """
        Return the text of log_file, always terminated by a newline.

        :param log_file: path of the file to read
        :return: file content as str, "\\n" appended when it was missing
        """
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(log_file, flags, modes), "r",
                       encoding="utf-8") as file_content:
            content = file_content.read()
        return content if content.endswith('\n') else '%s\n' % content

    @classmethod
    def _get_result_list(cls, file_path):
        """
        Find every file named "logfile" below file_path.

        :param file_path: directory to walk; a falsy value yields no result
        :return: list of paths, in os.walk order
        """
        if not file_path:
            # os.walk cannot handle None; report "nothing found" instead
            return []
        return [os.path.join(root_path, "logfile")
                for root_path, _, file_names in os.walk(file_path)
                if "logfile" in file_names]

    def _get_log_file(self):
        """
        Resolve the log_path of the json config below the testcases path.

        :return: value returned by get_file_absolute_path for the log path
        :raises ParamError: if log_path is missing or empty in the config
        """
        json_config = JsonParser(self.config_file)
        log_path = get_config_value('log_path', json_config.get_driver(),
                                    False)
        if not log_path:
            raise ParamError("Can't find log_path in config file.",
                             error_no="00108")
        log_path = str(log_path)
        if log_path.startswith("/"):
            # drop exactly one leading slash so the path is looked up
            # relative to the testcases path
            log_path = log_path[1:]
        LOG.debug("The log path is:%s" % log_path)
        file_path = get_file_absolute_path(log_path,
                                           paths=[self.testcases_path])
        LOG.debug("The file path is:%s" % file_path)
        return file_path

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
class JSUnitTestLiteDriver(IDriver):
    """
    JSUnitTestLiteDriver starts the configured ability of a bundle on the
    device with "./bin/aa start" and passes the command output to the
    jsuit_test_lite parsers.
    """

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def _get_driver_config(self, json_config):
        """
        Copy bundle-name and ability from the driver config to self.config.

        :param json_config: parsed json config of the test
        :raises ParamError: if bundle-name is missing
        """
        bundle_name = get_config_value('bundle-name',
                                       json_config.get_driver(), False)
        if not bundle_name:
            raise ParamError("Can't find bundle-name in config file.",
                             error_no="00108")
        self.config.bundle_name = bundle_name

        ability = get_config_value('ability',
                                   json_config.get_driver(), False)
        # ability is optional and falls back to "default"
        self.config.ability = ability or "default"

    def __execute__(self, request):
        """
        Set up the kits, run the JS unit test and produce the result report.

        Any exception raised while preparing or running the test is logged
        and stored in self.error_message; it is not re-raised. The report is
        checked and the device is released in every case.

        :param request: test request providing config, source and listeners
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00101")

            if not os.path.exists(config_file):
                # logged once by the except handler below
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00101")

            self.file_name = os.path.basename(
                suite_file.strip()).split(".")[0]

            self.result = "%s.xml" % os.path.join(
                request.config.report_path, "result", self.file_name)

            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            from xdevice import Scheduler
            for kit in self.kits:
                if not Scheduler.is_execute:
                    raise ExecuteTerminate("ExecuteTerminate",
                                           error_no="00300")
                if kit.__class__.__name__ == CKit.liteinstall:
                    kit.bundle_name = self.config.bundle_name
                kit.__setup__(self.config.device, request=request)

            self._run_jsunit(request)

        except Exception as exception:
            LOG.error(exception, error_no=getattr(exception, "error_no",
                                                  "00000"))
            self.error_message = exception
        finally:
            try:
                report_name = "report" if request.root.source. \
                    test_name.startswith("{") else get_filename_extension(
                        request.root.source.test_name)[0]

                self.result = check_result_report(
                    request.config.report_path, self.result,
                    self.error_message, report_name)
            finally:
                # release the device even if the report check fails
                self._release_device()

    def _release_device(self):
        """
        Tear down all kits, then close the device.

        The device is closed even when a kit teardown raises; the teardown
        exception still propagates afterwards.
        """
        device = getattr(self.config, "device", None)
        try:
            for kit in self.kits:
                kit.__teardown__(device)
        finally:
            if device is not None:
                device.close()

    def _run_jsunit(self, request):
        """
        Start the ability on the device and append the output to the
        device log file.

        :param request: test request providing listeners and report path
        """
        parser_instances = []
        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
        for parser in parsers:
            parser_instance = parser.__class__()
            # NOTE(review): this parser is given "suites_name" while other
            # drivers in this file set "suite_name" - confirm against the
            # parser class
            parser_instance.suites_name = self.file_name
            parser_instance.listeners = request.listeners
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)

        command = "./bin/aa start -p %s -n %s" % \
                  (self.config.bundle_name, self.config.ability)
        result, _, error = self.config.device.execute_command_with_timeout(
            command=command, timeout=300, receiver=handler)
        if error:
            LOG.error(
                "execute %s failed" % command, error_no="00402")

        device_log_file = get_device_log_file(request.config.report_path,
                                              request.config.device.
                                              __get_serial__())
        device_log_file_open = \
            os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                    0o755)
        with os.fdopen(device_log_file_open, "a") as log_stream:
            # everything after the last "\n" of the output is not written
            log_stream.write("{}{}".format(
                "\n".join(result.split("\n")[0:-1]), "\n"))
            log_stream.flush()

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""