#!/usr/bin/env python3 # coding=utf-8 # # Copyright (c) 2020-2022 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import json import os import re import shutil import subprocess import sys import time import platform import zipfile import stat import random import xml.etree.ElementTree as ET from dataclasses import dataclass from json import JSONDecodeError from xdevice import DeviceTestType, check_result_report from xdevice import DeviceLabelType from xdevice import CommonParserType from xdevice import ExecuteTerminate from xdevice import DeviceError from xdevice import ShellHandler from xdevice import IDriver from xdevice import platform_logger from xdevice import Plugin from xdevice import get_plugin from ohos.environment.dmlib import process_command_ret from core.utils import get_decode from core.utils import get_fuzzer_path from core.config.resource_manager import ResourceManager from core.config.config_manager import FuzzerConfigManager __all__ = [ "CppTestDriver", "JSUnitTestDriver", "disable_keyguard", "GTestConst"] LOG = platform_logger("Drivers") DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") OBJ = "obj" _ACE_LOG_MARKER = " a0c0d0" TIME_OUT = 900 * 1000 JS_TIMEOUT = 10 CYCLE_TIMES = 50 FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL MODES = stat.S_IWUSR | stat.S_IRUSR ############################################################################## ############################################################################## class CollectingOutputReceiver: def __init__(self): self.output = "" def __read__(self, output): self.output = "%s%s" % (self.output, output) def __error__(self, message): pass def __done__(self, result_code="", message=""): pass class DisplayOutputReceiver: def __init__(self): self.output = "" self.unfinished_line = "" def _process_output(self, output, end_mark="\n"): content = output if self.unfinished_line: content = "".join((self.unfinished_line, content)) self.unfinished_line = "" lines = content.split(end_mark) if content.endswith(end_mark): return lines[:-1] else: self.unfinished_line = lines[-1] return lines[:-1] def __read__(self, output): self.output = "%s%s" % (self.output, output) lines = self._process_output(output) for line in lines: line = line.strip() if line: LOG.info(get_decode(line)) def __error__(self, message): pass def __done__(self, result_code="", message=""): pass @dataclass class GTestConst(object): exec_para_filter = "--gtest_filter" exec_para_level = "--gtest_testsize" exec_acts_para_filter = "--jstest_filter" exec_acts_para_level = "--jstest_testsize" def get_device_log_file(report_path, serial=None, log_name="device_log"): from xdevice import Variables log_path = os.path.join(report_path, Variables.report_vars.log_dir) os.makedirs(log_path, exist_ok=True) serial = serial or time.time_ns() device_file_name = "{}_{}.log".format(log_name, serial) device_log_file = os.path.join(log_path, device_file_name) return device_log_file def get_level_para_string(level_string): level_list = list(set(level_string.split(","))) level_para_string = "" for item in level_list: if not item.isdigit(): continue item = item.strip(" ") level_para_string = f"{level_para_string}Level{item}," level_para_string = level_para_string.strip(",") return level_para_string def get_result_savepath(testsuit_path, result_rootpath): findkey = os.sep + "tests" + os.sep filedir, _ = os.path.split(testsuit_path) pos = filedir.find(findkey) if -1 != pos: subpath = filedir[pos + len(findkey):] pos1 = subpath.find(os.sep) if -1 != pos1: subpath = subpath[pos1 + len(os.sep):] result_path = os.path.join(result_rootpath, "result", subpath) else: result_path = os.path.join(result_rootpath, "result") else: result_path = os.path.join(result_rootpath, "result") if not os.path.exists(result_path): os.makedirs(result_path) LOG.info("result_savepath = " + result_path) return result_path def get_test_log_savepath(result_rootpath, result_suit_path): suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0] test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path) if not os.path.exists(test_log_path): os.makedirs(test_log_path) LOG.info("test_log_savepath = {}".format(test_log_path)) return test_log_path def update_xml(suite_file, result_xml): suite_path_txt = suite_file.split(".")[0] + "_path.txt" if os.path.exists(suite_path_txt) and os.path.exists(result_xml): with open(suite_path_txt, "r") as path_text: line = path_text.readline().replace("//", "").strip() tree = ET.parse(result_xml) tree.getroot().attrib["path"] = line tree.write(result_xml) # all testsuit common Unavailable test result xml def _create_empty_result_file(filepath, filename, error_message): error_message = str(error_message) error_message = error_message.replace("\"", "") error_message = error_message.replace("<", "") error_message = error_message.replace(">", "") error_message = error_message.replace("&", "") if filename.endswith(".hap"): filename = filename.split(".")[0] if not os.path.exists(filepath): with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) file_desc.write('\n') file_desc.write( '\n' % time_stamp) file_desc.write( ' \n' % (filename, error_message)) file_desc.write(' \n') file_desc.write('\n') return def _unlock_screen(device): device.execute_shell_command("svc power stayon true") time.sleep(1) def _unlock_device(device): device.execute_shell_command("input keyevent 82") time.sleep(1) device.execute_shell_command("wm dismiss-keyguard") time.sleep(1) def _lock_screen(device): device.execute_shell_command("svc power stayon false") time.sleep(1) def disable_keyguard(device): _unlock_screen(device) _unlock_device(device) def _sleep_according_to_result(result): if result: time.sleep(1) def _create_fuzz_crash_file(filepath, filename): if not os.path.exists(filepath): with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) file_desc.write('\n') file_desc.write( '\n' % time_stamp) file_desc.write( ' \n' % filename) file_desc.write( ' \n' % (filename, filename)) file_desc.write( ' \n') file_desc.write(' \n') file_desc.write(' \n') file_desc.write(' \n') file_desc.write('\n') return def _create_fuzz_pass_file(filepath, filename): if not os.path.exists(filepath): with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) file_desc.write('\n') file_desc.write( '\n' % time_stamp) file_desc.write( ' \n' % filename) file_desc.write( ' \n' % (filename, filename)) file_desc.write(' \n') file_desc.write('\n') return def _create_fuzz_result_file(filepath, filename, error_message): error_message = str(error_message) error_message = error_message.replace("\"", "") error_message = error_message.replace("<", "") error_message = error_message.replace(">", "") error_message = error_message.replace("&", "") if "AddressSanitizer" in error_message: LOG.error("FUZZ TEST CRASH") _create_fuzz_crash_file(filepath, filename) elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', error_message, re.M) is not None: LOG.info("FUZZ TEST PASS") _create_fuzz_pass_file(filepath, filename) else: LOG.error("FUZZ TEST UNAVAILABLE") _create_empty_result_file(filepath, filename, error_message) return ############################################################################## ############################################################################## class ResultManager(object): def __init__(self, testsuit_path, config): self.testsuite_path = testsuit_path self.config = config self.result_rootpath = self.config.report_path self.device = self.config.device if testsuit_path.endswith(".hap"): self.device_testpath = self.config.test_hap_out_path else: self.device_testpath = self.config.target_test_path self.testsuite_name = os.path.basename(self.testsuite_path) self.is_coverage = False def set_is_coverage(self, is_coverage): self.is_coverage = is_coverage def get_test_results(self, error_message=""): # Get test result files filepath, _ = self.obtain_test_result_file() if "fuzztest" == self.config.testtype[0]: LOG.info("create fuzz test report") _create_fuzz_result_file(filepath, self.testsuite_name, error_message) if not self.is_coverage: self._obtain_fuzz_corpus() if not os.path.exists(filepath): _create_empty_result_file(filepath, self.testsuite_name, error_message) if "benchmark" == self.config.testtype[0]: self._obtain_benchmark_result() # Get coverage data files if self.is_coverage: self.obtain_coverage_data() return filepath def get_test_results_hidelog(self, error_message=""): # Get test result files result_file_path, test_log_path = self.obtain_test_result_file() log_content = "" if not error_message: if os.path.exists(test_log_path): with open(test_log_path, "r") as log: log_content = log.readlines() else: LOG.error("{}: Test log not exist.".format(test_log_path)) else: log_content = error_message if "fuzztest" == self.config.testtype[0]: LOG.info("create fuzz test report") _create_fuzz_result_file(result_file_path, self.testsuite_name, log_content) if not self.is_coverage: self._obtain_fuzz_corpus() if not os.path.exists(result_file_path): _create_empty_result_file(result_file_path, self.testsuite_name, log_content) if "benchmark" == self.config.testtype[0]: self._obtain_benchmark_result() # Get coverage data files if self.is_coverage: self.obtain_coverage_data() return result_file_path def _obtain_fuzz_corpus(self): command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" self.config.device.execute_shell_command(command) result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) LOG.info(f"fuzz_dir = {result_save_path}") self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) def _obtain_benchmark_result(self): benchmark_root_dir = os.path.abspath( os.path.join(self.result_rootpath, "benchmark")) benchmark_dir = os.path.abspath( os.path.join(benchmark_root_dir, self.get_result_sub_save_path(), self.testsuite_name)) if not os.path.exists(benchmark_dir): os.makedirs(benchmark_dir) LOG.info("benchmark_dir = %s" % benchmark_dir) self.device.pull_file(os.path.join(self.device_testpath, "%s.json" % self.testsuite_name), benchmark_dir) if not os.path.exists(os.path.join(benchmark_dir, "%s.json" % self.testsuite_name)): os.rmdir(benchmark_dir) return benchmark_dir def get_result_sub_save_path(self): find_key = os.sep + "benchmark" + os.sep file_dir, _ = os.path.split(self.testsuite_path) pos = file_dir.find(find_key) subpath = "" if -1 != pos: subpath = file_dir[pos + len(find_key):] LOG.info("subpath = " + subpath) return subpath def obtain_test_result_file(self): result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) result_file_path = os.path.join(result_save_path, "%s.xml" % self.testsuite_name) result_josn_file_path = os.path.join(result_save_path, "%s.json" % self.testsuite_name) if self.testsuite_path.endswith('.hap'): remote_result_file = os.path.join(self.device_testpath, "testcase_result.xml") remote_json_result_file = os.path.join(self.device_testpath, "%s.json" % self.testsuite_name) else: remote_result_file = os.path.join(self.device_testpath, "%s.xml" % self.testsuite_name) remote_json_result_file = os.path.join(self.device_testpath, "%s.json" % self.testsuite_name) if self.config.testtype[0] != "fuzztest": if self.device.is_file_exist(remote_result_file): self.device.pull_file(remote_result_file, result_file_path) elif self.device.is_file_exist(remote_json_result_file): self.device.pull_file(remote_json_result_file, result_josn_file_path) result_file_path = result_josn_file_path else: LOG.info("%s not exist", remote_result_file) if self.config.hidelog: remote_log_result_file = os.path.join(self.device_testpath, "%s.log" % self.testsuite_name) test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path) test_log_file_path = os.path.join(test_log_save_path, "%s.log" % self.testsuite_name) self.device.pull_file(remote_log_result_file, test_log_file_path) return result_file_path, test_log_file_path return result_file_path, "" def make_empty_result_file(self, error_message=""): result_savepath = get_result_savepath(self.testsuite_path, self.result_rootpath) result_filepath = os.path.join(result_savepath, "%s.xml" % self.testsuite_name) if not os.path.exists(result_filepath): _create_empty_result_file(result_filepath, self.testsuite_name, error_message) def is_exist_target_in_device(self, path, target): if platform.system() == "Windows": command = '\"ls -l %s | grep %s\"' % (path, target) else: command = "ls -l %s | grep %s" % (path, target) check_result = False stdout_info = self.device.execute_shell_command(command) if stdout_info != "" and stdout_info.find(target) != -1: check_result = True return check_result def obtain_coverage_data(self): cov_root_dir = os.path.abspath(os.path.join( self.result_rootpath, "..", "coverage", "data", "exec")) tests_path = self.config.testcases_path test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0] cxx_cov_path = os.path.abspath(os.path.join( self.result_rootpath, "..", "coverage", "data", "cxx", self.testsuite_name + '_' + test_type)) if os.path.basename(self.testsuite_name).startswith("rust_"): target_name = "lib.unstripped" else: target_name = OBJ if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): if not os.path.exists(cxx_cov_path): os.makedirs(cxx_cov_path) else: cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}" self.config.device.execute_shell_command( "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) if platform.system() == "Windows": process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) process.communicate() os.remove(tar_path) os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)) else: subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % (tar_path, cxx_cov_path), shell=True).communicate() subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate() if target_name != OBJ: subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)), shell=True).communicate() ############################################################################## ############################################################################## @Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) class CppTestDriver(IDriver): """ CppTest is a Test that runs a native test package on given device. """ # test driver config config = None result = "" def __check_environment__(self, device_options): if len(device_options) == 1 and device_options[0].label is None: return True if len(device_options) != 1 or \ device_options[0].label != DeviceLabelType.phone: return False return True def __check_config__(self, config): pass def __result__(self): return self.result if os.path.exists(self.result) else "" def __execute__(self, request): try: self.config = request.config self.config.target_test_path = DEFAULT_TEST_PATH self.config.device = request.config.environment.devices[0] suite_file = request.root.source.source_file LOG.debug("Testsuite FilePath: %s" % suite_file) if not suite_file: LOG.error("test source '%s' not exists" % request.root.source.source_string) return if not self.config.device: result = ResultManager(suite_file, self.config) result.set_is_coverage(False) result.make_empty_result_file( "No test device is found. ") return self.config.device.set_device_report_path(request.config.report_path) self.config.device.device_log_collector.start_hilog_task() self._init_gtest() self._run_gtest(suite_file) finally: serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( ":", "_")) self.config.device.device_log_collector.stop_hilog_task( log_tar_file_name, module_name=request.get_module_name()) def _init_gtest(self): self.config.device.connector_command("target mount") self.config.device.execute_shell_command( "rm -rf %s" % self.config.target_test_path) self.config.device.execute_shell_command( "mkdir -p %s" % self.config.target_test_path) self.config.device.execute_shell_command( "mount -o rw,remount,rw /") if "fuzztest" == self.config.testtype[0]: self.config.device.execute_shell_command( "mkdir -p %s" % os.path.join(self.config.target_test_path, "corpus")) def _gtest_command(self, suite_file): filename = os.path.basename(suite_file) test_para = self._get_test_para(self.config.testcase, self.config.testlevel, self.config.testtype, self.config.target_test_path, suite_file, filename) # execute testcase if not self.config.coverage: if self.config.random == "random": seed = random.randint(1, 100) command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % ( self.config.target_test_path, filename, filename, test_para, seed) else: command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( self.config.target_test_path, filename, filename, test_para) else: coverage_outpath = self.config.coverage_outpath if coverage_outpath: strip_num = len(coverage_outpath.strip("/").split("/")) else: ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json") with open(ohos_config_path, 'r') as json_file: json_info = json.load(json_file) out_path = json_info.get("out_path") strip_num = len(out_path.strip("/").split("/")) if "fuzztest" == self.config.testtype[0]: self._push_corpus_cov_if_exist(suite_file) command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \ GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" else: command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \ "GCOV_PREFIX_STRIP=%s ./%s %s" % \ (self.config.target_test_path, filename, DEFAULT_TEST_PATH, str(strip_num), filename, test_para) if self.config.hidelog: command += " > {}.log 2>&1".format(filename) return command def _run_gtest(self, suite_file): from xdevice import Variables is_coverage_test = True if self.config.coverage else False # push testsuite file self.config.device.push_file(suite_file, self.config.target_test_path) self.config.device.execute_shell_command( "hilog -d %s" % (os.path.join(self.config.target_test_path, os.path.basename(suite_file))) ) self._push_corpus_if_exist(suite_file) # push resource files resource_manager = ResourceManager() resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) command = self._gtest_command(suite_file) result = ResultManager(suite_file, self.config) result.set_is_coverage(is_coverage_test) try: # get result if self.config.hidelog: return_message = "" display_receiver = CollectingOutputReceiver() self.config.device.execute_shell_command( command, receiver=display_receiver, timeout=TIME_OUT, retry=0) else: display_receiver = DisplayOutputReceiver() self.config.device.execute_shell_command( command, receiver=display_receiver, timeout=TIME_OUT, retry=0) return_message = display_receiver.output except (ExecuteTerminate, DeviceError) as exception: return_message = str(exception.args) if self.config.hidelog: self.result = result.get_test_results_hidelog(return_message) else: self.result = result.get_test_results(return_message) update_xml(suite_file, self.result) resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) @staticmethod def _alter_init(name): with open(name, "rb") as f: lines = f.read() str_content = lines.decode("utf-8") pattern_sharp = '^\s*#.*$(\n|\r\n)' pattern_star = '/\*.*?\*/(\n|\r\n)+' pattern_xml = '(\n|\r\n)+' if re.match(pattern_sharp, str_content, flags=re.M): striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) elif re.findall(pattern_star, str_content, flags=re.S): striped_content = re.sub(pattern_star, '', str_content, flags=re.S) elif re.findall(pattern_xml, str_content, flags=re.S): striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) else: striped_content = str_content striped_bt = striped_content.encode("utf-8") if os.path.exists(name): os.remove(name) with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: f.write(striped_bt) def _push_corpus_cov_if_exist(self, suite_file): corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep) cov_file = os.path.join( sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz") LOG.info("corpus_cov file :%s" % str(cov_file)) self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) def _push_corpus_if_exist(self, suite_file): if "fuzztest" == self.config.testtype[0]: corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") if not os.path.isdir(corpus_path): return corpus_dirs = [] corpus_file_list = [] for root, _, files in os.walk(corpus_path): if not files: continue corpus_dir = root.split("corpus")[-1] if corpus_dir != "": corpus_dirs.append(corpus_dir) for file in files: cp_file = os.path.normcase(os.path.join(root, file)) corpus_file_list.append(cp_file) if file == "init": self._alter_init(cp_file) # mkdir corpus files dir if corpus_dirs: for corpus in corpus_dirs: mkdir_corpus_command = f"shell; mkdir -p {corpus}" self.config.device.connector_command(mkdir_corpus_command) # push corpus file if corpus_file_list: for corpus_file in corpus_file_list: self.config.device.push_file(corpus_file, os.path.join(self.config.target_test_path, "corpus")) def _get_test_para(self, testcase, testlevel, testtype, target_test_path, suite_file, filename): if "benchmark" == testtype[0]: test_para = (" --benchmark_out_format=json" " --benchmark_out=%s%s.json") % ( target_test_path, filename) return test_para if "" != testcase and "" == testlevel: test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) elif "" == testcase and "" != testlevel: level_para = get_level_para_string(testlevel) test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) else: test_para = "" if "fuzztest" == testtype[0]: cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( suite_file), "project.xml")).get_fuzzer_config("fuzztest") LOG.info("config list :%s" % str(cfg_list)) if self.config.coverage: test_para += "corpus -runs=0" + \ " -max_len=" + cfg_list[0] + \ " -max_total_time=" + cfg_list[1] + \ " -rss_limit_mb=" + cfg_list[2] else: test_para += "corpus -max_len=" + cfg_list[0] + \ " -max_total_time=" + cfg_list[1] + \ " -rss_limit_mb=" + cfg_list[2] return test_para ############################################################################## ############################################################################## @Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) class JSUnitTestDriver(IDriver): """ JSUnitTestDriver is a Test that runs a native test package on given device. """ def __init__(self): self.config = None self.result = "" self.start_time = None self.ability_name = "" self.package_name = "" # log self.hilog = None self.hilog_proc = None def __check_environment__(self, device_options): pass def __check_config__(self, config): pass def __result__(self): return self.result if os.path.exists(self.result) else "" def __execute__(self, request): try: LOG.info("developer_test driver") self.config = request.config self.config.target_test_path = DEFAULT_TEST_PATH self.config.device = request.config.environment.devices[0] suite_file = request.root.source.source_file result_save_path = get_result_savepath(suite_file, self.config.report_path) self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) if not suite_file: LOG.error("test source '%s' not exists" % request.root.source.source_string) return if not self.config.device: result = ResultManager(suite_file, self.config) result.set_is_coverage(False) result.make_empty_result_file( "No test device is found") return package_name, ability_name = self._get_package_and_ability_name( suite_file) self.package_name = package_name self.ability_name = ability_name self.config.test_hap_out_path = \ "/data/data/%s/files/" % self.package_name self.config.device.connector_command("shell hilog -r") self.hilog = get_device_log_file( request.config.report_path, request.config.device.__get_serial__() + "_" + request. get_module_name(), "device_hilog") hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755) with os.fdopen(hilog_open, "a") as hilog_file_pipe: self.config.device.device_log_collector.add_log_address(None, self.hilog) _, self.hilog_proc = self.config.device.device_log_collector.\ start_catch_device_log(hilog_file_pipe=hilog_file_pipe) self._init_jsunit_test() self._run_jsunit(suite_file, self.hilog) hilog_file_pipe.flush() self.generate_console_output(self.hilog, request) xml_path = os.path.join( request.config.report_path, "result", '.'.join((request.get_module_name(), "xml"))) shutil.move(xml_path, self.result) update_xml(suite_file, self.result) finally: self.config.device.device_log_collector.remove_log_address(None, self.hilog) self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) def _init_jsunit_test(self): self.config.device.connector_command("target mount") self.config.device.execute_shell_command( "rm -rf %s" % self.config.target_test_path) self.config.device.execute_shell_command( "mkdir -p %s" % self.config.target_test_path) self.config.device.execute_shell_command( "mount -o rw,remount,rw /") def _run_jsunit(self, suite_file, device_log_file): filename = os.path.basename(suite_file) _, suffix_name = os.path.splitext(filename) resource_manager = ResourceManager() resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) if suffix_name == ".hap": json_file_path = suite_file.replace(".hap", ".json") if os.path.exists(json_file_path): timeout = self._get_json_shell_timeout(json_file_path) else: timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) else: timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) main_result = self._install_hap(suite_file) result = ResultManager(suite_file, self.config) if main_result: self._execute_hapfile_jsunittest() try: status = False actiontime = JS_TIMEOUT times = CYCLE_TIMES if timeout: actiontime = timeout times = 1 with open(device_log_file, "r", encoding='utf-8', errors='ignore') as file_read_pipe: for i in range(0, times): if status: break else: time.sleep(float(actiontime)) start_time = int(time.time()) while True: data = file_read_pipe.readline() if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1: LOG.info("execute testcase successfully.") status = True break if int(time.time()) - start_time > 5: break finally: _lock_screen(self.config.device) self._uninstall_hap(self.package_name) else: self.result = result.get_test_results("Error: install hap failed") LOG.error("Error: install hap failed") resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) def generate_console_output(self, device_log_file, request): result_message = self.read_device_log(device_log_file) report_name = request.get_module_name() parsers = get_plugin( Plugin.PARSER, CommonParserType.jsunit) if parsers: parsers = parsers[:1] for listener in request.listeners: listener.device_sn = self.config.device.device_sn parser_instances = [] for parser in parsers: parser_instance = parser.__class__() parser_instance.suites_name = report_name parser_instance.suite_name = report_name parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) process_command_ret(result_message, handler) def read_device_log(self, device_log_file): result_message = "" with open(device_log_file, "r", encoding='utf-8', errors='ignore') as file_read_pipe: while True: data = file_read_pipe.readline() if not data: break # only filter JSApp log if data.lower().find(_ACE_LOG_MARKER) != -1: result_message += data if data.find("[end] run suites end") != -1: break return result_message def _execute_hapfile_jsunittest(self): _unlock_screen(self.config.device) _unlock_device(self.config.device) try: return_message = self.start_hap_execute() except (ExecuteTerminate, DeviceError) as exception: return_message = str(exception.args) return return_message def _install_hap(self, suite_file): message = self.config.device.connector_command("install %s" % suite_file) message = str(message).rstrip() if message == "" or "success" in message: return_code = True if message != "": LOG.info(message) else: return_code = False if message != "": LOG.warning(message) _sleep_according_to_result(return_code) return return_code def start_hap_execute(self): try: command = "aa start -d 123 -a %s.MainAbility -b %s" \ % (self.package_name, self.package_name) self.start_time = time.time() result_value = self.config.device.execute_shell_command( command, timeout=TIME_OUT) if "success" in str(result_value).lower(): LOG.info("execute %s's testcase success. result value=%s" % (self.package_name, result_value)) else: LOG.info("execute %s's testcase failed. result value=%s" % (self.package_name, result_value)) _sleep_according_to_result(result_value) return_message = result_value except (ExecuteTerminate, DeviceError) as exception: return_message = exception.args return return_message def _uninstall_hap(self, package_name): return_message = self.config.device.execute_shell_command( "bm uninstall -n %s" % package_name) _sleep_according_to_result(return_message) return return_message @staticmethod def _get_acts_test_para(testcase, testlevel, testtype, target_test_path, suite_file, filename): if "actstest" == testtype[0]: test_para = (" --actstest_out_format=json" " --actstest_out=%s%s.json") % ( target_test_path, filename) return test_para if "" != testcase and "" == testlevel: test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) elif "" == testcase and "" != testlevel: level_para = get_level_para_string(testlevel) test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) else: test_para = "" return test_para @staticmethod def _get_hats_test_para(testcase, testlevel, testtype, target_test_path, suite_file, filename): if "hatstest" == testtype[0]: test_hats_para = (" --hatstest_out_format=json" " --hatstest_out=%s%s.json") % ( target_test_path, filename) return test_hats_para if "" != testcase and "" == testlevel: test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) elif "" == testcase and "" != testlevel: level_para = get_level_para_string(testlevel) test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) else: test_hats_para = "" return test_hats_para @classmethod def _get_json_shell_timeout(cls, json_filepath): test_timeout = 0 try: with open(json_filepath, 'r') as json_file: data_dic = json.load(json_file) if not data_dic: return test_timeout else: if "driver" in data_dic.keys(): driver_dict = data_dic.get("driver") if driver_dict and "test-timeout" in driver_dict.keys(): test_timeout = int(driver_dict["shell-timeout"]) / 1000 return test_timeout except JSONDecodeError: return test_timeout finally: print(" get json shell timeout finally") @staticmethod def _get_package_and_ability_name(hap_filepath): package_name = "" ability_name = "" if os.path.exists(hap_filepath): filename = os.path.basename(hap_filepath) # unzip the hap file hap_bak_path = os.path.abspath(os.path.join( os.path.dirname(hap_filepath), "%s.bak" % filename)) zf_desc = zipfile.ZipFile(hap_filepath) try: zf_desc.extractall(path=hap_bak_path) except RuntimeError as error: print("Unzip error: ", hap_bak_path) zf_desc.close() # verify config.json file app_profile_path = os.path.join(hap_bak_path, "config.json") if not os.path.exists(app_profile_path): print("file %s not exist" % app_profile_path) return package_name, ability_name if os.path.isdir(app_profile_path): print("%s is a folder, and not a file" % app_profile_path) return package_name, ability_name # get package_name and ability_name value load_dict = {} with open(app_profile_path, 'r') as load_f: load_dict = json.load(load_f) profile_list = load_dict.values() for profile in profile_list: package_name = profile.get("package") if not package_name: continue abilities = profile.get("abilities") for abilitie in abilities: abilities_name = abilitie.get("name") if abilities_name.startswith("."): ability_name = package_name + abilities_name[ abilities_name.find("."):] else: ability_name = abilities_name break break # delete hap_bak_path if os.path.exists(hap_bak_path): shutil.rmtree(hap_bak_path) else: print("file %s not exist" % hap_filepath) return package_name, ability_name @Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) class OHRustTestDriver(IDriver): def __init__(self): self.result = "" self.error_message = "" self.config = None def __check_environment__(self, device_options): pass def __check_config__(self, config): pass def __execute__(self, request): try: LOG.debug("Start to execute open harmony rust test") self.config = request.config self.config.device = request.config.environment.devices[0] self.config.target_test_path = DEFAULT_TEST_PATH suite_file = request.root.source.source_file LOG.debug("Testsuite filepath:{}".format(suite_file)) if not suite_file: LOG.error("test source '{}' not exists".format( request.root.source.source_string)) return result_save_path = get_result_savepath(suite_file, self.config.report_path) self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) self.config.device.set_device_report_path(request.config.report_path) self.config.device.device_log_collector.start_hilog_task() self._init_oh_rust() self._run_oh_rust(suite_file, request) except Exception as exception: self.error_message = exception if not getattr(exception, "error_no", ""): setattr(exception, "error_no", "03409") LOG.exception(self.error_message, exc_info=False, error_no="03409") finally: serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) log_tar_file_name = "{}_{}".format( request.get_module_name(), str(serial).replace(":", "_")) self.config.device.device_log_collector.stop_hilog_task( log_tar_file_name, module_name=request.get_module_name()) xml_path = os.path.join( request.config.report_path, "result", '.'.join((request.get_module_name(), "xml"))) shutil.move(xml_path, self.result) self.result = check_result_report( request.config.report_path, self.result, self.error_message) update_xml(request.root.source.source_file, self.result) def _init_oh_rust(self): self.config.device.connector_command("target mount") self.config.device.execute_shell_command( "mount -o rw,remount,rw /") def _run_oh_rust(self, suite_file, request=None): self.config.device.push_file(suite_file, self.config.target_test_path) self.config.device.execute_shell_command( "hilog -d %s" % (os.path.join(self.config.target_test_path, os.path.basename(suite_file))) ) resource_manager = ResourceManager() resource_data_dict, resource_dir = \ resource_manager.get_resource_data_dic(suite_file) resource_manager.process_preparer_data(resource_data_dict, resource_dir, self.config.device) for listener in request.listeners: listener.device_sn = self.config.device.device_sn parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) if parsers: parsers = parsers[:1] parser_instances = [] for parser in parsers: parser_instance = parser.__class__() parser_instance.suite_name = request.get_module_name() parser_instance.listeners = request.listeners parser_instances.append(parser_instance) handler = ShellHandler(parser_instances) if self.config.coverage: command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format( self.config.target_test_path, os.path.basename(suite_file)) else: command = "cd {}; chmod +x *; ./{}".format( self.config.target_test_path, os.path.basename(suite_file)) self.config.device.execute_shell_command( command, timeout=TIME_OUT, receiver=handler, retry=0) if self.config.coverage: result = ResultManager(suite_file, self.config) result.obtain_coverage_data() resource_manager.process_cleaner_data(resource_data_dict, resource_dir, self.config.device) def __result__(self): return self.result if os.path.exists(self.result) else ""