#!/usr/bin/env python3 # coding=utf-8 # # Copyright (c) 2020-2022 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import shutil import glob import time import stat from xdevice import UserConfigManager from xdevice import ConfigConst from xdevice import ExecuteTerminate from xdevice import ParamError from xdevice import TestDescription from xdevice import IDriver from xdevice import Plugin from xdevice import get_plugin from xdevice import platform_logger from xdevice import DataHelper from xdevice import JsonParser from xdevice import get_config_value from xdevice import get_kit_instances from xdevice import check_result_report from xdevice import get_device_log_file from xdevice import get_filename_extension from xdevice import get_file_absolute_path from xdevice import get_test_component_version from xdevice import SuiteReporter from xdevice import ShellHandler from xdevice import FilePermission from xdevice import DeviceTestType from xdevice import GTestConst from xdevice import DeviceLabelType from ohos.constants import ComType from ohos.constants import ParserType from ohos.constants import DeviceLiteKernel from ohos.constants import CKit from ohos.exception import LiteDeviceError from ohos.exception import LiteDeviceExecuteCommandError from ohos.executor.listener import CollectingLiteGTestListener from ohos.testkit.kit_lite import DeployKit from ohos.testkit.kit_lite import DeployToolKit from ohos.environment.dmlib_lite import generate_report __all__ = ["CppTestDriver", "CTestDriver", "init_remote_server"] LOG = platform_logger("DriversLite") FAILED_RUN_TEST_ATTEMPTS = 2 CPP_TEST_MOUNT_STOP_SIGN = "not mount properly, Test Stop" CPP_TEST_NFS_SIGN = "execve: I/O error" def get_nfs_server(request): config_manager = UserConfigManager( config_file=request.get(ConfigConst.configfile, ""), env=request.get(ConfigConst.test_environment, "")) remote_info = config_manager.get_user_config("testcases/server", filter_name="NfsServer") if not remote_info: err_msg = "The name of remote nfs server does not match" LOG.error(err_msg, error_no="00403") raise ParamError(err_msg, error_no="00403") return remote_info def init_remote_server(lite_instance, request=None): config_manager = UserConfigManager( config_file=request.get(ConfigConst.configfile, ""), env=request.get(ConfigConst.test_environment, "")) linux_dict = config_manager.get_user_config("testcases/server") if linux_dict: setattr(lite_instance, "linux_host", linux_dict.get("ip")) setattr(lite_instance, "linux_port", linux_dict.get("port")) setattr(lite_instance, "linux_directory", linux_dict.get("dir")) else: error_message = "nfs server does not exist, please " \ "check user_config.xml" raise ParamError(error_message, error_no="00108") def get_testcases(testcases_list): cases_list = [] for test in testcases_list: test_item = test.split("#") if len(test_item) == 1: cases_list.append(test) elif len(test_item) == 2: cases_list.append(test_item[-1]) return cases_list def sort_by_length(file_name): return len(file_name) @Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test_lite) class CppTestDriver(IDriver): """ CppTest is a test that runs a native test package on given lite device. """ config = None result = "" error_message = "" def __init__(self): self.rerun = True self.file_name = "" def __check_environment__(self, device_options): if len(device_options) != 1 or \ device_options[0].label != DeviceLabelType.ipcamera: self.error_message = "check environment failed" return False return True def __check_config__(self, config=None): pass def __execute__(self, request): kits = [] device_log_file = get_device_log_file( request.config.report_path, request.get_devices()[0].__get_serial__()) try: self.config = request.config self.init_cpp_config() self.config.device = request.config.environment.devices[0] init_remote_server(self, request=request) config_file = request.root.source.config_file json_config = JsonParser(config_file) self._get_driver_config(json_config) bin_file = get_config_value('execute', json_config.get_driver(), False) kits = get_kit_instances(json_config, request.config.resource_path, request.config.testcases_path) from xdevice import Scheduler for kit in kits: if not Scheduler.is_execute: raise ExecuteTerminate("ExecuteTerminate", error_no="00300") kit.__setup__(request.config.device, request=request) command = self._get_execute_command(bin_file) self.set_file_name(request, command) if self.config.xml_output: self.delete_device_xml(request, self.config.device_xml_path) if os.path.exists(self.result): os.remove(self.result) if request.config.testargs.get("dry_run"): self.config.dry_run = request.config.testargs.get( "dry_run")[0].lower() self.dry_run(command, request.listeners) else: self.run_cpp_test(command, request) self.generate_device_xml(request, self.execute_bin) except (LiteDeviceError, Exception) as exception: LOG.exception(exception, exc_info=False) self.error_message = exception finally: device_log_file_open = os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755) with os.fdopen(device_log_file_open, "a") as file_name: file_name.write(self.config.command_result) file_name.flush() LOG.info("-------------finally-----------------") self._after_command(kits, request) def _get_execute_command(self, bin_file): if self.config.device.get("device_kernel") == \ DeviceLiteKernel.linux_kernel: execute_dir = "/storage" + "/".join(bin_file.split("/")[0:-1]) else: execute_dir = "/".join(bin_file.split("/")[0:-1]) self.execute_bin = bin_file.split("/")[-1] self.config.device.execute_command_with_timeout( command="cd {}".format(execute_dir), timeout=1) self.config.execute_bin_path = execute_dir if self.execute_bin.startswith("/"): command = ".%s" % self.execute_bin else: command = "./%s" % self.execute_bin report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0]) self.config.device_xml_path = (self.linux_directory + report_path). \ replace("//", "/") self.config.device_report_path = execute_dir + report_path return command def _get_driver_config(self, json_config): xml_output = get_config_value('xml-output', json_config.get_driver(), False) if isinstance(xml_output, bool): self.config.xml_output = xml_output elif str(xml_output).lower() == "false": self.config.xml_output = False else: self.config.xml_output = True rerun = get_config_value('rerun', json_config.get_driver(), False) if isinstance(rerun, bool): self.rerun = rerun elif str(rerun).lower() == "false": self.rerun = False else: self.rerun = True timeout_config = get_config_value('timeout', json_config.get_driver(), False) if timeout_config: self.config.timeout = int(timeout_config) // 1000 else: self.config.timeout = 900 def _after_command(self, kits, request): if self.config.device.get("device_kernel") == \ DeviceLiteKernel.linux_kernel: self.config.device.execute_command_with_timeout( command="cd /storage", timeout=1) else: self.config.device.execute_command_with_timeout( command="cd /", timeout=1) for kit in kits: kit.__teardown__(request.config.device) self.config.device.close() self.delete_device_xml(request, self.linux_directory) report_name = "report" if request.root.source. \ test_name.startswith("{") else get_filename_extension( request.root.source.test_name)[0] if not self.config.dry_run: self.result = check_result_report( request.config.report_path, self.result, self.error_message, report_name) def generate_device_xml(self, request, execute_bin): if self.config.xml_output: self.download_nfs_xml(request, self.config.device_xml_path) self.merge_xml(execute_bin) def dry_run(self, request, command, listener=None): if self.config.xml_output: collect_test_command = "%s --gtest_output=xml:%s " \ "--gtest_list_tests" % \ (command, self.config.device_report_path) result, _, _ = self.config.device.execute_command_with_timeout( command=collect_test_command, case_type=DeviceTestType.cpp_test_lite, timeout=15, receiver=None) if CPP_TEST_MOUNT_STOP_SIGN in result: tests = [] return tests tests = self.read_nfs_xml(request, self.config.device_xml_path) self.delete_device_xml(request, self.config.device_xml_path) return tests else: parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_list_lite) parser_instances = [] for parser in parsers: parser_instance = parser.__class__() parser_instance.suites_name = os.path.basename(self.result) if listener: parser_instance.listeners = listener parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) collect_test_command = "%s --gtest_list_tests" % command result, _, _ = self.config.device.execute_command_with_timeout( command=collect_test_command, case_type=DeviceTestType.cpp_test_lite, timeout=15, receiver=handler) self.config.command_result = "{}{}".format( self.config.command_result, result) if parser_instances[0].tests and \ len(parser_instances[0].tests) > 0: SuiteReporter.set_suite_list([item.test_name for item in parser_instances[0].tests]) else: SuiteReporter.set_suite_list([]) tests = parser_instances[0].tests if not tests: LOG.error("Collect test failed!", error_no="00402") return parser_instances[0].tests def run_cpp_test(self, command, request): if request.config.testargs.get("test"): testcases_list = get_testcases( request.config.testargs.get("test")) for test in testcases_list: command_case = "{} --gtest_filter=*{}".format( command, test) if not self.config.xml_output: self.run(command_case, request.listeners, timeout=15) else: command_case = "{} --gtest_output=xml:{}".format( command_case, self.config.device_report_path) self.run(command_case, None, timeout=15) else: self._do_test_run(command, request) def init_cpp_config(self): setattr(self.config, "command_result", "") setattr(self.config, "device_xml_path", "") setattr(self.config, "dry_run", False) def merge_xml(self, execute_bin): report_path = os.path.join(self.config.report_path, "result") summary_result = DataHelper.get_summary_result( report_path, self.result, key=sort_by_length, file_prefix=execute_bin) if summary_result: SuiteReporter.append_report_result(( os.path.join(report_path, "%s.xml" % execute_bin), DataHelper.to_string(summary_result))) else: self.error_message = "The test case did not generate XML" for xml_file in os.listdir(os.path.split(self.result)[0]): if not xml_file.startswith(execute_bin): continue if xml_file != os.path.split(self.result)[1]: os.remove(os.path.join(os.path.split( self.result)[0], xml_file)) def set_file_name(self, request, command): self.file_name = command.split(" ")[0].split("/")[-1].split(".")[0] self.result = "%s.xml" % os.path.join(request.config.report_path, "result", self.file_name) def run(self, command=None, listener=None, timeout=None): if not timeout: timeout = self.config.timeout if listener: parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_lite) parser_instances = [] for parser in parsers: parser_instance = parser.__class__() parser_instance.suite_name = self.file_name parser_instance.listeners = listener parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) else: handler = None result, _, error = self.config.device.execute_command_with_timeout( command=command, case_type=DeviceTestType.cpp_test_lite, timeout=timeout, receiver=handler) self.config.command_result += result if result.count(CPP_TEST_NFS_SIGN) >= 1: _, _, error = self.config.device.execute_command_with_timeout( command="ping %s" % self.linux_host, case_type=DeviceTestType.cpp_test_lite, timeout=5) return error, result, handler def _do_test_run(self, command, request): test_to_run = self._collect_test_to_run(request, command) self._run_with_rerun(command, request, test_to_run) def _run_with_rerun(self, command, request, expected_tests): if self.config.xml_output: self.run("{} --gtest_output=xml:{}".format( command, self.config.device_report_path)) time.sleep(5) test_rerun = True if self.check_xml_exist(self.execute_bin + ".xml"): test_rerun = False test_run = self.read_nfs_xml(request, self.config.device_xml_path, test_rerun) if len(test_run) < len(expected_tests): expected_tests = TestDescription.remove_test(expected_tests, test_run) self._rerun_tests(command, expected_tests, None) else: test_tracker = CollectingLiteGTestListener() listener = request.listeners listener_copy = listener.copy() listener_copy.append(test_tracker) self.run(command, listener_copy) test_run = test_tracker.get_current_run_results() if len(test_run) != len(expected_tests): expected_tests = TestDescription.remove_test(expected_tests, test_run) self._rerun_tests(command, expected_tests, listener) def _rerun_tests(self, command, expected_tests, listener): if not expected_tests: LOG.debug("No tests to re-run, all tests executed at least once.") for test in expected_tests: self._re_run(command, test, listener) def _re_run(self, command, test, listener): if self.config.xml_output: _, _, handler = self.run("{} {}=*{} --gtest_output=xml:{}".format( command, GTestConst.exec_para_filter, test.test_name, self.config.device_report_path), listener, timeout=15) else: handler = None for _ in range(FAILED_RUN_TEST_ATTEMPTS): try: listener_copy = listener.copy() test_tracker = CollectingLiteGTestListener() listener_copy.append(test_tracker) _, _, handler = self.run("{} {}=*{}".format( command, GTestConst.exec_para_filter, test.test_name), listener_copy, timeout=15) if len(test_tracker.get_current_run_results()): return except LiteDeviceError as _: LOG.debug("Exception: ShellCommandUnresponsiveException") handler.parsers[0].mark_test_as_failed(test) def _collect_test_to_run(self, request, command): if self.rerun: tests = self.dry_run(request, command) return tests return [] def download_nfs_xml(self, request, report_path): remote_nfs = get_nfs_server(request) if not remote_nfs: err_msg = "The name of remote device {} does not match". \ format(self.remote) LOG.error(err_msg, error_no="00403") raise TypeError(err_msg) LOG.info("Trying to pull remote server: {}:{} report files to local " "in dir {}".format (remote_nfs.get("ip"), remote_nfs.get("port"), os.path.dirname(self.result))) result_dir = os.path.join(request.config.report_path, "result") os.makedirs(result_dir, exist_ok=True) try: if remote_nfs["remote"] == "true": import paramiko client = paramiko.Transport((remote_nfs.get("ip"), int(remote_nfs.get("port")))) client.connect(username=remote_nfs.get("username"), password=remote_nfs.get("password")) sftp = paramiko.SFTPClient.from_transport(client) files = sftp.listdir(report_path) for report_xml in files: if report_xml.endswith(".xml"): filepath = report_path + report_xml try: sftp.get(remotepath=filepath, localpath=os.path.join(os.path.split( self.result)[0], report_xml)) except IOError as error: LOG.error(error, error_no="00404") client.close() else: if os.path.isdir(report_path): for report_xml in os.listdir(report_path): if report_xml.endswith(".xml"): filepath = report_path + report_xml shutil.copy(filepath, os.path.join(os.path.split( self.result)[0], report_xml)) except (FileNotFoundError, IOError) as error: LOG.error("Download xml failed %s" % error, error_no="00403") def check_xml_exist(self, xml_file, timeout=60): ls_command = "ls %s" % self.config.device_report_path start_time = time.time() while time.time() - start_time < timeout: result, _, _ = self.config.device.execute_command_with_timeout( command=ls_command, case_type=DeviceTestType.cpp_test_lite, timeout=5, receiver=None) if xml_file in result: return True time.sleep(5) if (self.execute_bin + "_1.xml") in result: return False return False def read_nfs_xml(self, request, report_path, is_true=False): remote_nfs = get_nfs_server(request) if not remote_nfs: err_msg = "The name of remote device {} does not match". \ format(self.remote) LOG.error(err_msg, error_no="00403") raise TypeError(err_msg) tests = [] execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else ( self.execute_bin + ".xml") LOG.debug("run into :{}".format(is_true)) file_path = os.path.join(report_path, execute_bin_xml) if not self.check_xml_exist(execute_bin_xml): return tests from xml.etree import ElementTree try: if remote_nfs["remote"] == "true": import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname=remote_nfs.get("ip"), port=int(remote_nfs.get("port")), username=remote_nfs.get("username"), password=remote_nfs.get("password")) sftp_client = client.open_sftp() remote_file = sftp_client.open(file_path) try: result = remote_file.read().decode() suites_element = ElementTree.fromstring(result) for suite_element in suites_element: suite_name = suite_element.get("name", "") for case in suite_element: case_name = case.get("name") test = TestDescription(suite_name, case_name) if test not in tests: tests.append(test) finally: remote_file.close() client.close() else: if os.path.isdir(report_path): flags = os.O_RDONLY modes = stat.S_IWUSR | stat.S_IRUSR with os.fdopen(os.open(file_path, flags, modes), "r") as test_file: result = test_file.read() suites_element = ElementTree.fromstring(result) for suite_element in suites_element: suite_name = suite_element.get("name", "") for case in suite_element: case_name = case.get("name") test = TestDescription(suite_name, case_name) if test not in tests: tests.append(test) except (FileNotFoundError, IOError) as error: LOG.error("Download xml failed %s" % error, error_no="00403") except SyntaxError as error: LOG.error("Parse xml failed %s" % error, error_no="00404") return tests def delete_device_xml(self, request, report_path): remote_nfs = get_nfs_server(request) if not remote_nfs: err_msg = "The name of remote device {} does not match". \ format(self.remote) LOG.error(err_msg, error_no="00403") raise TypeError(err_msg) LOG.info("Delete xml directory {} from remote server: {}" "".format (report_path, remote_nfs.get("ip"))) if remote_nfs["remote"] == "true": import paramiko client = paramiko.Transport((remote_nfs.get("ip"), int(remote_nfs.get("port")))) client.connect(username=remote_nfs.get("username"), password=remote_nfs.get("password")) sftp = paramiko.SFTPClient.from_transport(client) try: sftp.stat(report_path) files = sftp.listdir(report_path) for report_xml in files: if report_xml.endswith(".xml"): filepath = "{}{}".format(report_path, report_xml) try: sftp.remove(filepath) time.sleep(0.5) except IOError as _: pass except FileNotFoundError as _: pass client.close() else: for report_xml in glob.glob(os.path.join(report_path, '*.xml')): try: os.remove(report_xml) except Exception as exception: LOG.error( "remove {} Failed:{}".format(report_xml, exception)) pass def __result__(self): return self.result if os.path.exists(self.result) else "" @Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite) class CTestDriver(IDriver): """ CTest is a test that runs a native test package on given lite device. """ config = None result = "" error_message = "" version_cmd = "AT+CSV" def __init__(self): self.file_name = "" self.run_third = False self.kit_type = None self.auto_deploy = None def __check_environment__(self, device_options): if len(device_options) != 1 or \ device_options[0].label != DeviceLabelType.wifiiot: self.error_message = "check environment failed" return False return True def __check_config__(self, config=None): del config self.config = None def __execute__(self, request): from xdevice import Variables try: self.config = request.config self.config.device = request.config.environment.devices[0] current_dir = request.config.resource_path if \ request.config.resource_path else Variables.exec_dir if request.root.source.source_file.strip(): source = os.path.join(current_dir, request.root.source.source_file.strip()) self.file_name = os.path.basename( request.root.source.source_file.strip()).split(".")[0] else: source = request.root.source.source_string.strip() json_config = JsonParser(source) kit_instances = get_kit_instances(json_config, request.config.resource_path, request.config.testcases_path) for (_, kit_info) in zip(kit_instances, json_config.get_kits()): self.auto_deploy = kit_info.get("auto_deploy", "") self.kit_type = kit_info.get("type", "") if self.kit_type == CKit.deploytool: self.run_third = True LOG.info("Kit type:{}".format(self.kit_type)) LOG.info("Auto deploy:{}".format(self.auto_deploy)) LOG.info("Run third:{}".format(self.run_third)) if self.run_third: LOG.info("Run the third-party vendor part") if self.auto_deploy in ["False", "flase"]: self._run_ctest_third_party(source=source, request=request) else: self._run_ctest_upgrade_party(source=source, request=request) else: LOG.debug("Run ctest") self._run_ctest(source=source, request=request) except (LiteDeviceExecuteCommandError, Exception) as exception: LOG.error(exception, error_no=getattr(exception, "error_no", "00000")) self.error_message = exception finally: report_name = "report" if request.root.source. \ test_name.startswith("{") else get_filename_extension( request.root.source.test_name)[0] self.result = check_result_report( request.config.report_path, self.result, self.error_message, report_name) def _run_ctest(self, source=None, request=None, timeout=90): parser_instances = [] parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) try: if not source: LOG.error("Error: source don't exist %s." % source, error_no="00101") return version = get_test_component_version(self.config) for parser in parsers: parser_instance = parser.__class__() parser_instance.suites_name = self.file_name parser_instance.product_info.setdefault("Version", version) parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) reset_cmd = self._reset_device(request, source) self.result = "%s.xml" % os.path.join( request.config.report_path, "result", self.file_name) self.config.device.device.com_dict.get( ComType.deploy_com).connect() result, _, error = self.config.device.device. \ execute_command_with_timeout( command=reset_cmd, case_type=DeviceTestType.ctest_lite, key=ComType.deploy_com, timeout=timeout, receiver=handler) device_log_file = get_device_log_file(request.config.report_path, request.config.device. __get_serial__()) device_log_file_open = \ os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755) with os.fdopen(device_log_file_open, "a") as file_name: file_name.write("{}{}".format( "\n".join(result.split("\n")[0:-1]), "\n")) file_name.flush() finally: self.config.device.device.com_dict.get( ComType.deploy_com).close() def _run_ctest_third_party(self, source=None, request=None, timeout=5): parser_instances = [] parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) try: if not source: LOG.error("Error: source don't exist %s." % source, error_no="00101") return version = get_test_component_version(self.config) for parser in parsers: parser_instance = parser.__class__() parser_instance.suites_name = self.file_name parser_instance.product_info.setdefault("Version", version) parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) while True: input_burning = input("Please enter 'y' or 'n' " "after the burning is complete," "enter 'quit' to exit:") if input_burning.lower().strip() in ["y", "yes"]: LOG.info("Burning succeeded.") break elif input_burning.lower().strip() in ["n", "no"]: LOG.info("Burning failed.") elif input_burning.lower().strip() == "quit": break else: LOG.info({"The input parameter is incorrect,please" " enter 'y' or 'n' after the burning is " "complete,enter 'quit' to exit."}) LOG.info("Please press the reset button on the device ") time.sleep(5) self.result = "%s.xml" % os.path.join( request.config.report_path, "result", self.file_name) self.config.device.device.com_dict.get( ComType.deploy_com).connect() LOG.debug("Device com:{}".format(self.config.device.device)) result, _, error = self.config.device.device. \ execute_command_with_timeout( command=[], case_type=DeviceTestType.ctest_lite, key=ComType.deploy_com, timeout=timeout, receiver=handler) device_log_file = get_device_log_file(request.config.report_path, request.config.device. __get_serial__()) device_log_file_open = \ os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755) with os.fdopen(device_log_file_open, "a") as file_name: file_name.write("{}{}".format( "\n".join(result.split("\n")[0:-1]), "\n")) file_name.flush() finally: self.config.device.device.com_dict.get( ComType.deploy_com).close() def _run_ctest_upgrade_party(self, source=None, request=None, timeout=90): parser_instances = [] parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) try: if not source: LOG.error("Error: source don't exist %s." % source, error_no="00101") return version = get_test_component_version(self.config) for parser in parsers: parser_instance = parser.__class__() parser_instance.suites_name = self.file_name parser_instance.product_info.setdefault("Version", version) parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) result = self._reset_third_device(request, source) self.result = "%s.xml" % os.path.join( request.config.report_path, "result", self.file_name) if isinstance(result, list): self.config.device.device.com_dict.get( ComType.deploy_com).connect() LOG.debug("Device com:{}".format(self.config.device.device)) result, _, error = self.config.device.device. \ execute_command_with_timeout( command=request, case_type=DeviceTestType.ctest_lite, key=ComType.deploy_com, timeout=timeout, receiver=handler) else: handler.__read__(result) handler.__done__() device_log_file = get_device_log_file(request.config.report_path, request.config.device. __get_serial__()) device_log_file_open = \ os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755) with os.fdopen(device_log_file_open, "a") as file_name: file_name.write("{}{}".format( "\n".join(result.split("\n")[0:-1]), "\n")) file_name.flush() finally: self.config.device.device.com_dict.get( ComType.deploy_com).close() def _reset_device(self, request, source): json_config = JsonParser(source) reset_cmd = [] kit_instances = get_kit_instances(json_config, request.config.resource_path, request.config.testcases_path) from xdevice import Scheduler for (kit_instance, kit_info) in zip(kit_instances, json_config.get_kits()): if not isinstance(kit_instance, DeployKit): continue if not self.file_name: self.file_name = get_config_value( 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] reset_cmd = kit_instance.burn_command if not Scheduler.is_execute: raise ExecuteTerminate("ExecuteTerminate", error_no="00300") kit_instance.__setup__( self.config.device) reset_cmd = [int(item, 16) for item in reset_cmd] return reset_cmd def _reset_third_device(self, request, source): json_config = JsonParser(source) reset_cmd = [] kit_instances = get_kit_instances(json_config, request.config.resource_path, request.config.testcases_path) from xdevice import Scheduler for (kit_instance, kit_info) in zip(kit_instances, json_config.get_kits()): if not isinstance(kit_instance, DeployToolKit): continue if not self.file_name: self.file_name = get_config_value( 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] if not Scheduler.is_execute: raise ExecuteTerminate("ExecuteTerminate", error_no="00300") reset_cmd = kit_instance.__setup__( self.config.device) return reset_cmd def __result__(self): return self.result if os.path.exists(self.result) else "" @Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test) class OpenSourceTestDriver(IDriver): """ OpenSourceTest is a test that runs a native test package on given device lite device. """ config = None result = "" error_message = "" has_param = False def __init__(self): self.rerun = True self.file_name = "" self.handler = None def __check_environment__(self, device_options): if len(device_options) != 1 or \ device_options[0].label != DeviceLabelType.ipcamera: self.error_message = "check environment failed" return False return True def __check_config__(self, config=None): pass def __execute__(self, request): kits = [] try: self.config = request.config setattr(self.config, "command_result", "") self.config.device = request.config.environment.devices[0] init_remote_server(self, request) config_file = request.root.source.config_file json_config = JsonParser(config_file) pre_cmd = get_config_value('pre_cmd', json_config.get_driver(), False) execute_dir = get_config_value('execute', json_config.get_driver(), False) kits = get_kit_instances(json_config, request.config.resource_path, request.config.testcases_path) from xdevice import Scheduler for kit in kits: if not Scheduler.is_execute: raise ExecuteTerminate("ExecuteTerminate", error_no="00300") copy_list = kit.__setup__(request.config.device, request=request) self.file_name = request.root.source.test_name self.set_file_name(request, request.root.source.test_name) self.config.device.execute_command_with_timeout( command=pre_cmd, timeout=1) self.config.device.execute_command_with_timeout( command="cd {}".format(execute_dir), timeout=1) device_log_file = get_device_log_file( request.config.report_path, request.config.device.__get_serial__()) device_log_file_open = \ os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755) with os.fdopen(device_log_file_open, "a") as file_name: for test_bin in copy_list: if not test_bin.endswith(".run-test"): continue if test_bin.startswith("/"): command = ".%s" % test_bin else: command = "./%s" % test_bin self._do_test_run(command, request) file_name.write(self.config.command_result) file_name.flush() except (LiteDeviceExecuteCommandError, Exception) as exception: LOG.error(exception, error_no=getattr(exception, "error_no", "00000")) self.error_message = exception finally: LOG.info("-------------finally-----------------") # umount the dirs already mount for kit in kits: kit.__teardown__(request.config.device) self.config.device.close() report_name = "report" if request.root.source. \ test_name.startswith("{") else get_filename_extension( request.root.source.test_name)[0] self.result = check_result_report( request.config.report_path, self.result, self.error_message, report_name) def set_file_name(self, request, bin_file): self.result = "%s.xml" % os.path.join( request.config.report_path, "result", bin_file) def run(self, command=None, listener=None, timeout=20): parsers = get_plugin(Plugin.PARSER, ParserType.open_source_test) parser_instances = [] for parser in parsers: parser_instance = parser.__class__() parser_instance.suite_name = self.file_name parser_instance.test_name = command.replace("./", "") parser_instance.listeners = listener parser_instances.append(parser_instance) self.handler = ShellHandler(parser_instances) for _ in range(3): result, _, error = self.config.device.execute_command_with_timeout( command=command, case_type=DeviceTestType.open_source_test, timeout=timeout, receiver=self.handler) self.config.command_result = result if "pass" in result.lower(): break return error, result, self.handler def _do_test_run(self, command, request): listeners = request.listeners for listener in listeners: listener.device_sn = self.config.device.device_sn error, _, _ = self.run(command, listeners, timeout=60) if error: LOG.error( "Execute %s failed" % command, error_no="00402") def __result__(self): return self.result if os.path.exists(self.result) else "" @Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test) class BuildOnlyTestDriver(IDriver): """ BuildOnlyTest is a test that runs a native test package on given device lite device. """ config = None result = "" error_message = "" def __check_environment__(self, device_options): pass def __check_config__(self, config): pass def __execute__(self, request): self.config = request.config self.config.device = request.config.environment.devices[0] self.file_name = request.root.source.test_name self.config_file = request.root.source.config_file self.testcases_path = request.config.testcases_path file_path = self._get_log_file() result_list = self._get_result_list(file_path) if len(result_list) == 0: LOG.error( "Error: source don't exist %s." % request.root.source. source_file, error_no="00101") return total_result = '' for result in result_list: flags = os.O_RDONLY modes = stat.S_IWUSR | stat.S_IRUSR with os.fdopen(os.open(result, flags, modes), "r", encoding="utf-8") as file_content: result = file_content.read() if not result.endswith('\n'): result = '%s\n' % result total_result = '{}{}'.format(total_result, result) parsers = get_plugin(Plugin.PARSER, ParserType.build_only_test) parser_instances = [] for parser in parsers: parser_instance = parser.__class__() parser_instance.suite_name = self.file_name parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) generate_report(handler, total_result) @classmethod def _get_result_list(cls, file_path): result_list = list() for root_path, dirs_path, file_names in os.walk(file_path): for file_name in file_names: if file_name == "logfile": result_list.append(os.path.join(root_path, file_name)) return result_list def _get_log_file(self): json_config = JsonParser(self.config_file) log_path = get_config_value('log_path', json_config.get_driver(), False) log_path = str(log_path.replace("/", "", 1)) if log_path.startswith( "/") else str(log_path) LOG.debug("The log path is:%s" % log_path) file_path = get_file_absolute_path(log_path, paths=[self.testcases_path]) LOG.debug("The file path is:%s" % file_path) return file_path def __result__(self): return self.result if os.path.exists(self.result) else "" @Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite) class JSUnitTestLiteDriver(IDriver): """ JSUnitTestDriver is a Test that runs a native test package on given device. """ def __init__(self): self.result = "" self.error_message = "" self.kits = [] self.config = None def __check_environment__(self, device_options): pass def __check_config__(self, config): pass def _get_driver_config(self, json_config): bundle_name = get_config_value('bundle-name', json_config.get_driver(), False) if not bundle_name: raise ParamError("Can't find bundle-name in config file.", error_no="00108") else: self.config.bundle_name = bundle_name ability = get_config_value('ability', json_config.get_driver(), False) if not ability: self.config.ability = "default" else: self.config.ability = ability def __execute__(self, request): try: LOG.debug("Start execute xdevice extension JSUnit Test") self.config = request.config self.config.device = request.config.environment.devices[0] config_file = request.root.source.config_file suite_file = request.root.source.source_file if not suite_file: raise ParamError( "test source '%s' not exists" % request.root.source.source_string, error_no="00101") if not os.path.exists(config_file): LOG.error("Error: Test cases don't exist %s." % config_file, error_no="00101") raise ParamError( "Error: Test cases don't exist %s." % config_file, error_no="00101") self.file_name = os.path.basename( request.root.source.source_file.strip()).split(".")[0] self.result = "%s.xml" % os.path.join( request.config.report_path, "result", self.file_name) json_config = JsonParser(config_file) self.kits = get_kit_instances(json_config, self.config.resource_path, self.config.testcases_path) self._get_driver_config(json_config) from xdevice import Scheduler for kit in self.kits: if not Scheduler.is_execute: raise ExecuteTerminate("ExecuteTerminate", error_no="00300") if kit.__class__.__name__ == CKit.liteinstall: kit.bundle_name = self.config.bundle_name kit.__setup__(self.config.device, request=request) self._run_jsunit(request) except Exception as exception: self.error_message = exception finally: report_name = "report" if request.root.source. \ test_name.startswith("{") else get_filename_extension( request.root.source.test_name)[0] self.result = check_result_report( request.config.report_path, self.result, self.error_message, report_name) for kit in self.kits: kit.__teardown__(self.config.device) self.config.device.close() def _run_jsunit(self, request): parser_instances = [] parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite) for parser in parsers: parser_instance = parser.__class__() parser_instance.suites_name = self.file_name parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) command = "./bin/aa start -p %s -n %s" % \ (self.config.bundle_name, self.config.ability) result, _, error = self.config.device.execute_command_with_timeout( command=command, timeout=300, receiver=handler) device_log_file = get_device_log_file(request.config.report_path, request.config.device. __get_serial__()) device_log_file_open =\ os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755) with os.fdopen(device_log_file_open, "a") as file_name: file_name.write("{}{}".format( "\n".join(result.split("\n")[0:-1]), "\n")) file_name.flush() def __result__(self): return self.result if os.path.exists(self.result) else ""