#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import subprocess
import threading
import time

from xdevice import convert_mac
from xdevice import ConfigConst
from xdevice import FilePermission
from xdevice import ParamError
from xdevice import get_config_value
from xdevice import platform_logger
from ohos.config.config_manager import OHOSUserConfigManager
from ohos.error import ErrorMessage

LOG = platform_logger("DriverConstant")
TIME_OUT = 300 * 1000


def get_xml_output(config, json_config):
    """Return the effective "xml-output" setting as a lower-case string.

    The first value given in ``config.testargs`` wins; otherwise the
    driver section of ``json_config`` is consulted, and ``"false"`` is used
    when neither source provides a value.
    """
    cli_value = config.testargs.get("xml-output")
    if cli_value:
        xml_output = cli_value[0]
    else:
        # Look the driver setting up once and fall back to "false".
        xml_output = get_config_value(
            'xml-output', json_config.get_driver(), False) or "false"
    return str(xml_output).lower()


def get_nfs_server(request):
    """Return the "NfsServer" entry of the "testcases/server" user config.

    Raises:
        ParamError: if the user config holds no such entry.
    """
    config_manager = OHOSUserConfigManager(
        config_file=request.get(ConfigConst.configfile, ""),
        env=request.get(ConfigConst.test_environment, ""))
    remote_info = config_manager.get_user_config("testcases/server",
                                                 filter_name="NfsServer")
    if not remote_info:
        err_msg = ErrorMessage.Config.Code_0302018
        LOG.error(err_msg)
        raise ParamError(err_msg)
    return remote_info


def init_remote_server(lite_instance, request=None):
    """Copy the "testcases/server" user config onto ``lite_instance``.

    Sets ``linux_host``, ``linux_port`` and ``linux_directory`` from the
    ``ip``, ``port`` and ``dir`` keys of the configured server.

    Args:
        lite_instance: object that receives the three attributes.
        request: mapping-like object queried with ``get``; ``None`` is
            treated as an empty request, i.e. the same empty-string
            defaults that are used for missing keys.

    Raises:
        ParamError: if no server is configured.
    """
    if request is None:
        # The signature advertises None as the default, so do not crash
        # with AttributeError on ``None.get``.
        request = {}
    config_manager = OHOSUserConfigManager(
        config_file=request.get(ConfigConst.configfile, ""),
        env=request.get(ConfigConst.test_environment, ""))
    linux_dict = config_manager.get_user_config("testcases/server")

    if not linux_dict:
        raise ParamError(ErrorMessage.Config.Code_0302019)

    setattr(lite_instance, "linux_host", linux_dict.get("ip"))
    setattr(lite_instance, "linux_port", linux_dict.get("port"))
    setattr(lite_instance, "linux_directory", linux_dict.get("dir"))


def _escape_xml_attr(text):
    """Escape ``text`` for use inside a double-quoted XML attribute."""
    text = str(text)
    # '&' must be handled first, otherwise the ampersands of the entities
    # inserted by the later replacements would be escaped a second time.
    for raw, entity in (("&", "&amp;"),
                        ("\"", "&quot;"),
                        ("<", "&lt;"),
                        (">", "&gt;")):
        text = text.replace(raw, entity)
    return text


def create_empty_result_file(filepath, filename, error_message):
    """Write a placeholder result XML for a suite that produced no report.

    Nothing is written when ``filepath`` already exists. The generated
    document contains one ``testsuite`` element with zero tests,
    ``unavailable="1"`` and ``error_message`` as its ``message`` attribute.

    Args:
        filepath: path of the XML file to create.
        filename: suite file name; for ``.hap`` files only the part before
            the first dot is used as the suite name.
        error_message: any object; converted with ``str`` and XML-escaped.
    """
    error_message = _escape_xml_attr(error_message)
    if filename.endswith(".hap"):
        filename = filename.split(".")[0]
    suite_name = _escape_xml_attr(filename)
    if os.path.exists(filepath):
        return

    file_open = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                        FilePermission.mode_755)
    with os.fdopen(file_open, "w") as file_desc:
        time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
                                   time.localtime())
        file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        file_desc.write('<testsuites tests="0" failures="0" '
                        'disabled="0" errors="0" timestamp="%s" '
                        'time="0" name="AllTests">\n' % time_stamp)
        file_desc.write(
            '  <testsuite name="%s" tests="0" failures="0" '
            'disabled="0" errors="0" time="0.0" '
            'unavailable="1" message="%s">\n' %
            (suite_name, error_message))
        file_desc.write('  </testsuite>\n')
        file_desc.write('</testsuites>\n')
        file_desc.flush()


def get_result_savepath(testsuit_path, result_rootpath):
    """Return (and create) the local directory for a suite's results.

    The directory of ``testsuit_path`` is searched for a ``tests`` path
    component. When it is found and at least two more components follow,
    everything after the first of them is mirrored below
    ``<result_rootpath>/result``; in every other case
    ``<result_rootpath>/result`` itself is used.
    """
    findkey = "%stests%s" % (os.sep, os.sep)
    filedir, _ = os.path.split(testsuit_path)
    result_path = os.path.join(result_rootpath, "result")

    pos = filedir.find(findkey)
    if pos != -1:
        subpath = filedir[pos + len(findkey):]
        pos1 = subpath.find(os.sep)
        if pos1 != -1:
            # Drop the first component below "tests".
            subpath = subpath[pos1 + len(os.sep):]
            result_path = os.path.join(result_rootpath, "result", subpath)

    if not os.path.exists(result_path):
        # exist_ok closes the race with another thread creating the same
        # directory between the check and the call.
        os.makedirs(result_path, exist_ok=True)

    LOG.info("Result save path = %s", result_path)
    return result_path


class ResultManager(object):
    """Collect result and coverage files of one test suite from a device."""

    def __init__(self, testsuit_path, result_rootpath, device,
                 device_testpath):
        self.testsuite_path = testsuit_path
        self.result_rootpath = result_rootpath
        self.device = device
        self.device_testpath = device_testpath
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable pulling of coverage data."""
        self.is_coverage = is_coverage

    def get_test_results(self, error_message=""):
        """Pull the suite's result file and return its local path.

        When no result file could be obtained, a placeholder file carrying
        ``error_message`` is created instead. Coverage data is pulled as
        well when coverage is enabled.
        """
        # Get test result files
        filepath = self.obtain_test_result_file()
        if not os.path.exists(filepath):
            create_empty_result_file(filepath, self.testsuite_name,
                                     error_message)

        # Get coverage data files
        if self.is_coverage:
            self.obtain_coverage_data()

        return filepath

    def obtain_test_result_file(self):
        """Pull the result XML from the device and return the local path.

        For ``.hap`` suites the device file ``testcase_result.xml`` is
        preferred, with ``report.xml`` as fallback; other suites use
        ``<suite name>.xml``. A missing device file is only logged, so the
        returned path may not exist.
        """
        result_savepath = get_result_savepath(self.testsuite_path,
                                              self.result_rootpath)
        if self.testsuite_path.endswith('.hap'):
            filepath = os.path.join(result_savepath, "%s.xml" % str(
                self.testsuite_name).split(".")[0])

            remote_result_name = ""
            for candidate in ("testcase_result.xml", "report.xml"):
                if self.device.is_file_exist(
                        os.path.join(self.device_testpath, candidate)):
                    remote_result_name = candidate
                    break

            if remote_result_name:
                self.device.pull_file(
                    os.path.join(self.device_testpath, remote_result_name),
                    filepath)
            else:
                LOG.error("%s no report file", self.device_testpath)

        else:
            filepath = os.path.join(result_savepath, "%s.xml" %
                                    self.testsuite_name)
            remote_result_file = os.path.join(self.device_testpath,
                                              "%s.xml" % self.testsuite_name)

            if self.device.is_file_exist(remote_result_file):
                self.device.pull_file(remote_result_file, result_savepath)
            else:
                LOG.error("%s not exists", remote_result_file)
        return filepath

    def is_exist_target_in_device(self, path, target):
        """Return True if ``ls -l path | grep target`` output has target."""
        command = "ls -l %s | grep %s" % (path, target)
        stdout_info = self.device.execute_shell_command(command)
        # Empty or missing output means "not found" rather than an error.
        return bool(stdout_info) and target in stdout_info

    def obtain_coverage_data(self):
        """Pull ``jacoco.exec`` and the ``obj`` entry from the device.

        Files are stored below ``<result_rootpath>/../coverage/data``;
        entries that are not present on the device are skipped.
        """
        java_cov_path = os.path.abspath(
            os.path.join(self.result_rootpath, "..", "coverage/data/exec"))
        dst_target_name = "%s.exec" % self.testsuite_name
        src_target_name = "jacoco.exec"
        if self.is_exist_target_in_device(self.device_testpath,
                                          src_target_name):
            os.makedirs(java_cov_path, exist_ok=True)
            self.device.pull_file(
                os.path.join(self.device_testpath, src_target_name),
                os.path.join(java_cov_path, dst_target_name))

        cxx_cov_path = os.path.abspath(
            os.path.join(self.result_rootpath, "..", "coverage/data/cxx",
                         self.testsuite_name))
        target_name = "obj"
        if self.is_exist_target_in_device(self.device_testpath, target_name):
            os.makedirs(cxx_cov_path, exist_ok=True)
            src_file = os.path.join(self.device_testpath, target_name)
            self.device.pull_file(src_file, cxx_cov_path)


class JavaThread(threading.Thread):
    """Thread that runs a command after an optional, cancellable delay.

    The command's stdout is appended to ``log_path`` and, when a handler
    is given, each decoded line is also passed to ``handler.__read__``.
    """

    def __init__(self, name, log_path, command, execute_jar_path,
                 handler=None, wait_time=0):
        super().__init__()
        self.name = name
        self.log_path = log_path
        self.command = command
        self.handler = handler
        self.wait_time = wait_time
        self.jar_path = execute_jar_path
        self.jar_process = None
        self.finished = threading.Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run_java_test(self):
        """Run the command in ``jar_path`` and stream its stdout."""
        LOG.info('start to run java testcase')
        LOG.debug('run java command: {}'.format(convert_mac(self.command)))
        self.jar_process = subprocess.Popen(self.command,
                                            stdout=subprocess.PIPE,
                                            cwd=self.jar_path)
        try:
            jar_log_open = os.open(self.log_path,
                                   os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                   FilePermission.mode_755)
            # The descriptor is write-only, so open the stream in plain
            # append mode.
            with os.fdopen(jar_log_open, "a") as file_data:
                for line in iter(self.jar_process.stdout.readline, b''):
                    # A single undecodable byte must not abort the reader
                    # and leave the process running with nobody draining
                    # its pipe.
                    line = line.decode('GBK', errors='replace')
                    file_data.write(line)
                    if self.handler:
                        self.handler.__read__(line.replace('\r', ''))
        finally:
            # Release the pipe even if logging or the handler failed.
            self.jar_process.stdout.close()
        self.jar_process.wait()

    def run(self):
        """Wait ``wait_time`` seconds, then run unless cancelled."""
        self.finished.wait(self.wait_time)
        if not self.finished.is_set():
            self.run_java_test()
        self.finished.set()

    def stop(self):
        """Kill the running process, if any."""
        LOG.info("Stop java process")
        self.kill_proc_and_subproc(self.jar_process)

    def kill_proc_and_subproc(self, proc):
        """ Stops a process started and subprocess
        started by process by kill_proc_and_subproc.

        Best effort: failures are logged at debug level and never raised.
        """
        if proc is None:
            LOG.debug("kill subprocess skipped: no process was started")
            return
        try:
            if os.name == "nt":
                # /T also terminates the children of the process.
                subprocess.Popen(
                    ["taskkill", "/T", "/F", "/PID", str(proc.pid)],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    shell=False)
            else:
                # taskkill only exists on Windows.
                proc.kill()
        except Exception as _err:
            LOG.debug("kill subprocess exception error:{}".format(_err))