#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
import re
import shutil
import time
import platform
import zipfile
import stat
from dataclasses import dataclass
from xdevice import DeviceTestType
from xdevice import DeviceLabelType
from xdevice import ExecuteTerminate
from xdevice import DeviceError
from xdevice import ShellHandler
from xdevice import IDriver
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import get_plugin
from xdevice_extension._core.constants import CommonParserType
from xdevice_extension._core.environment.dmlib import process_command_ret
from core.utils import get_decode
from core.utils import get_fuzzer_path
from core.config.resource_manager import ResourceManager
from core.config.config_manager import FuzzerConfigManager
__all__ = [
"CppTestDriver",
"JSUnitTestDriver",
"disable_keyguard",
"GTestConst"]
LOG = platform_logger("Drivers")
DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
TIME_OUT = 900 * 1000
##############################################################################
##############################################################################
class DisplayOutputReceiver:
def __init__(self):
self.output = ""
self.unfinished_line = ""
def _process_output(self, output, end_mark="\n"):
content = output
if self.unfinished_line:
content = "".join((self.unfinished_line, content))
self.unfinished_line = ""
lines = content.split(end_mark)
if content.endswith(end_mark):
return lines[:-1]
else:
self.unfinished_line = lines[-1]
return lines[:-1]
def __read__(self, output):
self.output = "%s%s" % (self.output, output)
lines = self._process_output(output)
for line in lines:
line = line.strip()
if line:
LOG.info(get_decode(line))
def __error__(self, message):
pass
def __done__(self, result_code="", message=""):
pass
@dataclass
class GTestConst(object):
exec_para_filter = "--gtest_filter"
exec_para_level = "--gtest_testsize"
def get_device_log_file(report_path, serial=None, log_name="device_log"):
from xdevice import Variables
log_path = os.path.join(report_path, Variables.report_vars.log_dir)
os.makedirs(log_path, exist_ok=True)
serial = serial or time.time_ns()
device_file_name = "{}_{}.log".format(log_name, serial)
device_log_file = os.path.join(log_path, device_file_name)
return device_log_file
def get_level_para_string(level_string):
level_list = list(set(level_string.split(",")))
level_para_string = ""
for item in level_list:
if not item.isdigit():
continue
item = item.strip(" ")
level_para_string += ("Level%s," % item)
level_para_string = level_para_string.strip(",")
return level_para_string
def get_result_savepath(testsuit_path, result_rootpath):
findkey = os.sep + "tests" + os.sep
filedir, _ = os.path.split(testsuit_path)
pos = filedir.find(findkey)
if -1 != pos:
subpath = filedir[pos + len(findkey):]
pos1 = subpath.find(os.sep)
if -1 != pos1:
subpath = subpath[pos1 + len(os.sep):]
result_path = os.path.join(result_rootpath, "result", subpath)
else:
result_path = os.path.join(result_rootpath, "result")
else:
result_path = os.path.join(result_rootpath, "result")
if not os.path.exists(result_path):
os.makedirs(result_path)
LOG.info("result_savepath = " + result_path)
return result_path
# all testsuit common Unavailable test result xml
def _create_empty_result_file(filepath, filename, error_message):
error_message = str(error_message)
error_message = error_message.replace("\"", "")
error_message = error_message.replace("<", "")
error_message = error_message.replace(">", "")
error_message = error_message.replace("&", "")
if filename.endswith(".hap"):
filename = filename.split(".")[0]
if not os.path.exists(filepath):
with open(filepath, "w", encoding='utf-8') as file_desc:
time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime())
file_desc.write('\n')
file_desc.write(
'\n' % time_stamp)
file_desc.write(
' \n' %
(filename, error_message))
file_desc.write(' \n')
file_desc.write('\n')
return
def _unlock_screen(device):
device.execute_shell_command("svc power stayon true")
time.sleep(1)
def _unlock_device(device):
device.execute_shell_command("input keyevent 82")
time.sleep(1)
device.execute_shell_command("wm dismiss-keyguard")
time.sleep(1)
def _lock_screen(device):
device.execute_shell_command("svc power stayon false")
time.sleep(1)
def disable_keyguard(device):
_unlock_screen(device)
_unlock_device(device)
def _sleep_according_to_result(result):
if result:
time.sleep(1)
def _create_fuzz_crash_file(filepath, filename):
if not os.path.exists(filepath):
with open(filepath, "w", encoding='utf-8') as file_desc:
time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime())
file_desc.write('\n')
file_desc.write(
'\n' % time_stamp)
file_desc.write(
' \n' % filename)
file_desc.write(
' \n' % (filename, filename))
file_desc.write(
' \n')
file_desc.write(' \n')
file_desc.write(' \n')
file_desc.write(' \n')
file_desc.write('\n')
return
def _create_fuzz_pass_file(filepath, filename):
if not os.path.exists(filepath):
with open(filepath, "w", encoding='utf-8') as file_desc:
time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime())
file_desc.write('\n')
file_desc.write(
'\n' % time_stamp)
file_desc.write(
' \n' % filename)
file_desc.write(
' \n' % (filename, filename))
file_desc.write(' \n')
file_desc.write('\n')
return
def _create_fuzz_result_file(filepath, filename, error_message):
error_message = str(error_message)
error_message = error_message.replace("\"", "")
error_message = error_message.replace("<", "")
error_message = error_message.replace(">", "")
error_message = error_message.replace("&", "")
if "AddressSanitizer" in error_message:
LOG.error("FUZZ TEST CRASH")
_create_fuzz_crash_file(filepath, filename)
elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
error_message, re.M) is not None:
LOG.info("FUZZ TEST PASS")
_create_fuzz_pass_file(filepath, filename)
else:
LOG.error("FUZZ TEST UNAVAILABLE")
_create_empty_result_file(filepath, filename, error_message)
return
##############################################################################
##############################################################################
class ResultManager(object):
def __init__(self, testsuit_path, config):
self.testsuite_path = testsuit_path
self.config = config
self.result_rootpath = self.config.report_path
self.device = self.config.device
if testsuit_path.endswith(".hap"):
self.device_testpath = self.config.test_hap_out_path
else:
self.device_testpath = self.config.target_test_path
self.testsuite_name = os.path.basename(self.testsuite_path)
self.is_coverage = False
def set_is_coverage(self, is_coverage):
self.is_coverage = is_coverage
def get_test_results(self, error_message=""):
# Get test result files
filepath = self.obtain_test_result_file()
if "fuzztest" == self.config.testtype[0]:
LOG.info("create fuzz test report")
_create_fuzz_result_file(filepath, self.testsuite_name,
error_message)
return filepath
if not os.path.exists(filepath):
_create_empty_result_file(filepath, self.testsuite_name,
error_message)
if "benchmark" == self.config.testtype[0]:
self._obtain_benchmark_result()
# Get coverage data files
if self.is_coverage:
self.obtain_coverage_data()
return filepath
def _obtain_benchmark_result(self):
benchmark_root_dir = os.path.abspath(
os.path.join(self.result_rootpath, "benchmark"))
benchmark_dir = os.path.abspath(
os.path.join(benchmark_root_dir,
self.get_result_sub_save_path(),
self.testsuite_name))
if not os.path.exists(benchmark_dir):
os.makedirs(benchmark_dir)
LOG.info("benchmark_dir = %s" % benchmark_dir)
self.device.pull_file(os.path.join(self.device_testpath,
"%s.json" % self.testsuite_name), benchmark_dir)
if not os.path.exists(os.path.join(benchmark_dir,
"%s.json" % self.testsuite_name)):
os.rmdir(benchmark_dir)
return benchmark_dir
def get_result_sub_save_path(self):
find_key = os.sep + "benchmark" + os.sep
file_dir, _ = os.path.split(self.testsuite_path)
pos = file_dir.find(find_key)
subpath = ""
if -1 != pos:
subpath = file_dir[pos + len(find_key):]
LOG.info("subpath = " + subpath)
return subpath
def obtain_test_result_file(self):
result_save_path = get_result_savepath(self.testsuite_path,
self.result_rootpath)
result_file_path = os.path.join(result_save_path,
"%s.xml" % self.testsuite_name)
result_josn_file_path = os.path.join(result_save_path,
"%s.json" % self.testsuite_name)
if self.testsuite_path.endswith('.hap'):
remote_result_file = os.path.join(self.device_testpath,
"testcase_result.xml")
remote_json_result_file = os.path.join(self.device_testpath,
"%s.json" % self.testsuite_name)
else:
remote_result_file = os.path.join(self.device_testpath,
"%s.xml" % self.testsuite_name)
remote_json_result_file = os.path.join(self.device_testpath,
"%s.json" % self.testsuite_name)
if self.device.is_file_exist(remote_result_file):
self.device.pull_file(remote_result_file, result_file_path)
elif self.device.is_file_exist(remote_json_result_file):
self.device.pull_file(remote_json_result_file,
result_josn_file_path)
result_file_path = result_josn_file_path
else:
LOG.error("%s not exist", remote_result_file)
return result_file_path
def make_empty_result_file(self, error_message=""):
result_savepath = get_result_savepath(self.testsuite_path,
self.result_rootpath)
result_filepath = os.path.join(result_savepath, "%s.xml" %
self.testsuite_name)
if not os.path.exists(result_filepath):
_create_empty_result_file(result_filepath,
self.testsuite_name, error_message)
def is_exist_target_in_device(self, path, target):
if platform.system() == "Windows":
command = '\"ls -l %s | grep %s\"' % (path, target)
else:
command = "ls -l %s | grep %s" % (path, target)
check_result = False
stdout_info = self.device.execute_shell_command(command)
if stdout_info != "" and stdout_info.find(target) != -1:
check_result = True
return check_result
def obtain_coverage_data(self):
cov_root_dir = os.path.abspath(os.path.join(
self.result_rootpath,
"..",
"coverage",
"data",
"exec"))
target_name = "obj"
cxx_cov_path = os.path.abspath(os.path.join(
self.result_rootpath,
"..",
"coverage",
"data",
"cxx",
self.testsuite_name))
if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
if not os.path.exists(cxx_cov_path):
os.makedirs(cxx_cov_path)
src_file = os.path.join(DEFAULT_TEST_PATH, target_name)
self.device.pull_file(src_file, cxx_cov_path, is_create=True)
##############################################################################
##############################################################################
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
class CppTestDriver(IDriver):
"""
CppTest is a Test that runs a native test package on given device.
"""
# test driver config
config = None
result = ""
def __check_environment__(self, device_options):
if len(device_options) == 1 and device_options[0].label is None:
return True
if len(device_options) != 1 or \
device_options[0].label != DeviceLabelType.phone:
return False
return True
def __check_config__(self, config):
pass
def __result__(self):
return self.result if os.path.exists(self.result) else ""
def __execute__(self, request):
try:
self.config = request.config
self.config.target_test_path = DEFAULT_TEST_PATH
self.config.device = request.config.environment.devices[0]
suite_file = request.root.source.source_file
LOG.debug("Testsuite FilePath: %s" % suite_file)
if not suite_file:
LOG.error("test source '%s' not exists" %
request.root.source.source_string)
return
if not self.config.device:
result = ResultManager(suite_file, self.config)
result.set_is_coverage(False)
result.make_empty_result_file(
"No test device is found. ")
return
serial = request.config.device.__get_serial__()
device_log_file = get_device_log_file(
request.config.report_path,
serial)
with open(device_log_file, "a", encoding="UTF-8") as file_pipe:
self.config.device.start_catch_device_log(file_pipe)
self._init_gtest()
self._run_gtest(suite_file)
finally:
self.config.device.stop_catch_device_log()
def _init_gtest(self):
self.config.device.hdc_command("target mount")
self.config.device.execute_shell_command(
"rm -rf %s" % self.config.target_test_path)
self.config.device.execute_shell_command(
"mkdir -p %s" % self.config.target_test_path)
self.config.device.execute_shell_command(
"mount -o rw,remount,rw /")
if "fuzztest" == self.config.testtype[0]:
self.config.device.execute_shell_command(
"mkdir -p %s" % os.path.join(self.config.target_test_path,
"corpus"))
def _run_gtest(self, suite_file):
from xdevice import Variables
filename = os.path.basename(suite_file)
test_para = self._get_test_para(self.config.testcase,
self.config.testlevel,
self.config.testtype,
self.config.target_test_path,
filename)
is_coverage_test = True if self.config.coverage else False
# push testsuite file
self.config.device.push_file(suite_file, self.config.target_test_path)
self._push_corpus_if_exist(filename)
# push resource files
resource_manager = ResourceManager()
resource_data_dic, resource_dir = \
resource_manager.get_resource_data_dic(suite_file)
resource_manager.process_preparer_data(resource_data_dic, resource_dir,
self.config.device)
# execute testcase
if not self.config.coverage:
command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
self.config.target_test_path,
filename,
filename,
test_para)
else:
coverage_outpath = self.config.coverage_outpath
strip_num = len(coverage_outpath.split(os.sep)) - 1
command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \
"GCOV_PREFIX_STRIP=%s ./%s %s" % \
(self.config.target_test_path,
filename,
str(strip_num),
filename,
test_para)
result = ResultManager(suite_file, self.config)
result.set_is_coverage(is_coverage_test)
try:
# get result
display_receiver = DisplayOutputReceiver()
self.config.device.execute_shell_command(
command,
receiver=display_receiver,
timeout=TIME_OUT,
retry=0)
return_message = display_receiver.output
except (ExecuteTerminate, DeviceError) as exception:
return_message = str(exception.args)
self.result = result.get_test_results(return_message)
resource_manager.process_cleaner_data(resource_data_dic,
resource_dir,
self.config.device)
def _push_corpus_if_exist(self, filename):
if "fuzztest" == self.config.testtype[0]:
corpus_path = os.path.join(get_fuzzer_path(filename), "corpus")
self.config.device.push_file(corpus_path,
os.path.join(self.config.target_test_path, "corpus"))
@staticmethod
def _get_test_para(testcase,
testlevel,
testtype,
target_test_path,
filename):
if "benchmark" == testtype[0]:
test_para = (" --benchmark_out_format=json"
" --benchmark_out=%s%s.json") % (
target_test_path, filename)
return test_para
if "" != testcase and "" == testlevel:
test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
elif "" == testcase and "" != testlevel:
level_para = get_level_para_string(testlevel)
test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
else:
test_para = ""
if "fuzztest" == testtype[0]:
cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
filename), "project.xml")).get_fuzzer_config("fuzztest")
LOG.info("config list :%s" % str(cfg_list))
test_para += "corpus -max_len=" + cfg_list[0] + \
" -max_total_time=" + cfg_list[1] + \
" -rss_limit_mb=" + cfg_list[2]
return test_para
##############################################################################
##############################################################################
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
class JSUnitTestDriver(IDriver):
"""
JSUnitTestDriver is a Test that runs a native test package on given device.
"""
def __init__(self):
self.config = None
self.result = ""
self.start_time = None
self.ability_name = ""
self.package_name = ""
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def __result__(self):
return self.result if os.path.exists(self.result) else ""
def __execute__(self, request):
try:
LOG.info("developertest driver")
self.result = os.path.join(
request.config.report_path, "result",
'.'.join((request.get_module_name(), "xml")))
self.config = request.config
self.config.target_test_path = DEFAULT_TEST_PATH
self.config.device = request.config.environment.devices[0]
suite_file = request.root.source.source_file
if not suite_file:
LOG.error("test source '%s' not exists" %
request.root.source.source_string)
return
if not self.config.device:
result = ResultManager(suite_file, self.config)
result.set_is_coverage(False)
result.make_empty_result_file(
"No test device is found")
return
package_name, ability_name = self._get_package_and_ability_name(
suite_file)
self.package_name = package_name
self.ability_name = ability_name
self.config.test_hap_out_path = \
"/data/data/%s/files/" % self.package_name
self.config.device.hdc_command("shell hilog -r")
hilog = get_device_log_file(
request.config.report_path,
request.config.device.__get_serial__() + "_" + request.
get_module_name(),
"device_hilog")
hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
0o755)
with os.fdopen(hilog_open, "a") as hilog_file_pipe:
self.config.device.start_catch_device_log(hilog_file_pipe)
self._init_jsunit_test()
self._run_jsunit(suite_file)
hilog_file_pipe.flush()
self.generate_console_output(hilog, request)
finally:
self.config.device.stop_catch_device_log()
def _init_jsunit_test(self):
self.config.device.hdc_command("target mount")
self.config.device.execute_shell_command(
"rm -rf %s" % self.config.target_test_path)
self.config.device.execute_shell_command(
"mkdir -p %s" % self.config.target_test_path)
self.config.device.execute_shell_command(
"mount -o rw,remount,rw /")
def _run_jsunit(self, suite_file):
filename = os.path.basename(suite_file)
resource_manager = ResourceManager()
resource_data_dic, resource_dir = \
resource_manager.get_resource_data_dic(suite_file)
resource_manager.process_preparer_data(resource_data_dic, resource_dir,
self.config.device)
main_result = self._install_hap(suite_file)
result = ResultManager(suite_file, self.config)
if main_result:
self._execute_hapfile_jsunittest()
self._uninstall_hap(self.package_name)
else:
self.result = result.get_test_results("Error: install hap failed")
LOG.error("Error: install hap failed")
resource_manager.process_cleaner_data(resource_data_dic, resource_dir,
self.config.device)
def generate_console_output(self, device_log_file, request):
result_message = self.read_device_log(device_log_file)
report_name = request.get_module_name()
parsers = get_plugin(
Plugin.PARSER, CommonParserType.jsunit)
if parsers:
parsers = parsers[:1]
for listener in request.listeners:
listener.device_sn = self.config.device.device_sn
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = report_name
parser_instance.suite_name = report_name
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
process_command_ret(result_message, handler)
def read_device_log(self, device_log_file):
device_log_file_open = os.open(device_log_file, os.O_RDONLY,
stat.S_IWUSR | stat.S_IRUSR)
result_message = ""
with os.fdopen(device_log_file_open, "r", encoding='utf-8') \
as file_read_pipe:
while True:
data = file_read_pipe.readline()
if not data or not data.strip():
break
# only filter JSApp log
if data.find("JSApp:") != -1:
result_message += data
if data.find("[end] run suites end") != -1:
break
return result_message
def _execute_hapfile_jsunittest(self):
_unlock_screen(self.config.device)
_unlock_device(self.config.device)
try:
return_message = self.start_hap_activity()
except (ExecuteTerminate, DeviceError) as exception:
return_message = str(exception.args)
_lock_screen(self.config.device)
return return_message
def _install_hap(self, suite_file):
message = self.config.device.hdc_command("install %s" % suite_file)
message = str(message).rstrip()
if message == "" or "success" in message:
return_code = True
if message != "":
LOG.info(message)
else:
return_code = False
if message != "":
LOG.warning(message)
_sleep_according_to_result(return_code)
return return_code
def start_hap_activity(self):
try:
command = "aa start -d 123 -a %s.MainAbility -b %s" \
% (self.package_name, self.package_name)
self.start_time = time.time()
result_value = self.config.device.execute_shell_command(
command, timeout=TIME_OUT)
if "success" in str(result_value).lower():
LOG.info("execute %s's testcase success. result value=%s"
% (self.package_name, result_value))
time.sleep(30)
else:
LOG.info("execute %s's testcase failed. result value=%s"
% (self.package_name, result_value))
_sleep_according_to_result(result_value)
return_message = result_value
except (ExecuteTerminate, DeviceError) as exception:
return_message = exception.args
return return_message
def _uninstall_hap(self, package_name):
return_message = self.config.device.execute_shell_command(
"bm uninstall -n %s" % package_name)
_sleep_according_to_result(return_message)
return return_message
@staticmethod
def _get_package_and_ability_name(hap_filepath):
package_name = ""
ability_name = ""
if os.path.exists(hap_filepath):
filename = os.path.basename(hap_filepath)
#unzip the hap file
hap_bak_path = os.path.abspath(os.path.join(
os.path.dirname(hap_filepath),
"%s.bak" % filename))
zf_desc = zipfile.ZipFile(hap_filepath)
try:
zf_desc.extractall(path=hap_bak_path)
except RuntimeError as error:
print(error)
zf_desc.close()
#verify config.json file
app_profile_path = os.path.join(hap_bak_path, "config.json")
if not os.path.exists(app_profile_path):
print("file %s not exist" % app_profile_path)
return package_name, ability_name
if os.path.isdir(app_profile_path):
print("%s is a folder, and not a file" % app_profile_path)
return package_name, ability_name
#get package_name and ability_name value
load_dict = {}
with open(app_profile_path, 'r') as load_f:
load_dict = json.load(load_f)
profile_list = load_dict.values()
for profile in profile_list:
package_name = profile.get("package")
if not package_name:
continue
abilities = profile.get("abilities")
for abilitie in abilities:
abilities_name = abilitie.get("name")
if abilities_name.startswith("."):
ability_name = package_name + abilities_name[
abilities_name.find("."):]
else:
ability_name = abilities_name
break
break
#delete hap_bak_path
if os.path.exists(hap_bak_path):
shutil.rmtree(hap_bak_path)
else:
print("file %s not exist" % hap_filepath)
return package_name, ability_name