1''' 2# Copyright (c) 2023 Huawei Device Co., Ltd. 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6 7# http://www.apache.org/licenses/LICENSE-2.0 8 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14''' 15 16import datetime 17import json 18import os 19import re 20import shutil 21import stat 22import subprocess 23import sys 24import time 25 26import paramiko 27 28import LinuxContains 29 30 31install_fail_dict = {} 32install_success_dict = {} 33total_hap_num = 0 34hap_install_success = 0 35hap_install_fail = 0 36special_num = 0 37special_hap_list = [] 38depend_config_map = dict 39 40CONFIG_PATH='D:\\window_manager_config.xml' 41FINGER_PRINT = '8E93863FC32EE238060BF69A9B37E2608FFFB21F93C862DD511CBAC9F30024B5' 42repeat_time = 0 43snapshot = 1 44 45 46def exec_cmd(cmd): 47 f = os.popen("hdc shell " + "\"" + cmd + "\"") 48 # print(cmd) 49 text = f.read() 50 f.close() 51 52 53 return text 54 55 56def exec_cmd_path(cmd, path): 57 f = os.popen("hdc shell " + "\"" + cmd + "\" >> " + path) 58 # print(cmd) 59 text = f.read() 60 f.close() 61 return text 62 63 64def exec_cmd_simple(cmd): 65 f = os.popen(cmd) 66 # print(cmd) 67 text = f.read() 68 f.close() 69 return text 70 71def exists(file_path): 72 return os.path.exists(file_path) 73 74 75def get_haps(local_dir): 76 files = os.listdir(local_dir) 77 hap_list = [] 78 for file in files: 79 if "_signed" in file: 80 hap_list.append(file) 81 return hap_list 82 83 84def install_hap(hap, hap_path, base_dir): 85 # print("install " + hap_path + "\\" + hap) 86 if not exists(os.path.join(hap_path, hap)): 87 return 88 install_res = exec_cmd_simple("hdc install -r {}".format(hap_path + "/" + hap)) 89 # print(install_res) 90 if not install_res.__contains__("msg:install bundle successfully."): 91 install_res = install_res.replace("\nAppMod finish\n\n", "") 92 install_fail_dict[hap] = install_res 93 else: 94 exec_cmd_simple("echo install {0} success! >> {1}".format(hap, "{0}/auto_test.log".format(base_dir))) 95 install_success_dict[hap] = install_res 96 time.sleep(2) 97 98 99def install_depend_haps(curr_haps, hap_path, base_dir): 100 for hap in curr_haps: 101 if not depend_config_map.get(hap): 102 continue 103 depend_haps = depend_config_map.get(hap) 104 for depend_hap in depend_haps: 105 install_hap(depend_hap, hap_path, base_dir) 106 install_depend_haps(depend_haps, hap_path, base_dir) 107 108 109def get_test_bundle(begin_bundles): 110 end_bundles = get_all_bundles() 111 return list(set(end_bundles) - set(begin_bundles)) 112 113 114def install_haps(local_hap, hap_path, base_dir): 115 for hap in local_hap: 116 install_hap(hap, hap_path, base_dir) 117 118 119def start_log(): 120 cmd_clean = "rm -r /data/log/hilog/*" 121 cmd_start = "hilog -w start -l 1M" 122 exec_cmd(cmd_clean) 123 exec_cmd(cmd_start) 124 125 126def stop_log(): 127 cmd_stop = "hilog -w stop" 128 exec_cmd(cmd_stop) 129 130 131def recv_log(local_log): 132 # 输出日志结果 133 file_path = local_log 134 cmd = "hdc file recv /data/log/hilog/ {}".format(file_path) 135 exec_cmd_simple(cmd) 136 137 138def test_install_hap_with_error_snapshot(uninstall_bundles, base_dir): 139 with open(LinuxContains.FA_MODAL_AND_LOWER_CASE_LIST,'r',encoding='utf-8') as fp: 140 FA_Python_Obj=json.load(fp) 141 is_stage_model=0 142 for key,value in FA_Python_Obj.items(): 143 for bundle_name in uninstall_bundles: 144 # print("key: "+key) 145 bundle_name=bundle_name.strip() 146 if bundle_name==key: 147 is_stage_model=is_stage_model+1 148 if is_stage_model == 0: 149 cmd = r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest /ets/TestRunner/OpenHarmonyTestRunner -s timeout 30000000 " 150 else: 151 cmd= r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000 " 152 path = "{0}/auto_test.log".format(base_dir) 153 if len(uninstall_bundles) != 1: 154 exec_cmd_simple("echo uninstall_bundles:{0}, >> {1}".format(uninstall_bundles, path)) 155 for bundle in uninstall_bundles: 156 bundle = bundle.strip() 157 exec_cmd_simple("echo ================start {0} ui test================{1} >> {2}".format(bundle, 158 datetime.datetime.now().strftime( 159 '%Y-%m-%d %H:%M:%S'), 160 path)) 161 # print("test " + bundle) 162 tmp_cmd = cmd.format(bundle) 163 # print(tmp_cmd) 164 p = subprocess.Popen(tmp_cmd, shell=True, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, 165 stderr=subprocess.PIPE, encoding="utf-8") 166 current = "-1" 167 auto_log = open("{0}/auto_test.log".format(base_dir), mode='a') 168 while True: 169 line = p.stdout.readline() 170 auto_log.writelines(line) 171 if line and "current" in line: 172 nums = re.findall(r"\d+", line) 173 if (len(nums) == 0): 174 current = "0" 175 else: 176 current = nums[0] 177 if line and "stack" in line: 178 exec_cmd_simple( 179 "hdc shell snapshot_display -f /data/snapshot/{0}_{1}.jpeg".format(bundle, current)) 180 if line == '' and p.poll() != None: 181 break 182 exec_cmd_simple("hdc file recv /data/snapshot/. {}/snapshot".format(base_dir)) 183 exec_cmd_simple("hdc shell rm -rf /data/snapshot/*") 184 auto_log.flush() 185 auto_log.close() 186 time.sleep(5) 187 188 189def test_install_hap(test_bundles, base_dir): 190 cmd = r"aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000" 191 path = "{0}/auto_test.log".format(base_dir) 192 for bundle in test_bundles: 193 bundle = bundle.strip() 194 if bundle == 'ohos.samples.launcher': 195 cmd_launcher = 'hdc shell aa start -b ohos.samples.launcher -a MainAbility' 196 os.system(cmd_launcher) 197 exec_cmd_simple("echo ================start {0} ui test================{1} >> {2}".format(bundle, 198 datetime.datetime.now().strftime( 199 '%Y-%m-%d %H:%M:%S'), 200 path)) 201 # print("test " + bundle) 202 tmp_cmd = cmd.format(bundle) 203 exec_cmd_path(tmp_cmd, path) 204 time.sleep(5) 205 206 207def uninstall_bundles(install_bundles): 208 for bundle in install_bundles: 209 # print("uninstall " + bundle) 210 uninstall_res = exec_cmd_simple("hdc uninstall {}".format(bundle)) 211 212 213def clear_dir(dir_path): 214 if os.path.exists(dir_path): 215 shutil.rmtree(dir_path) 216 os.makedirs(dir_path) 217 218 219def clear_file(file_path): 220 if os.path.exists(file_path): 221 os.remove(file_path) 222 os.system(r"type nul>{}".format(file_path)) 223 224 225def get_all_bundles(): 226 bundles = exec_cmd("bm dump -a") 227 bundles = bundles.splitlines() 228 del bundles[0] 229 return bundles 230 231 232def batch_install(haps, base_dir): 233 start_log() 234 exec_cmd_simple("hdc shell power-shell setmode 602") 235 # exec_cmd_simple("hdc shell setenforce 0") 236 exec_cmd_simple("hdc shell param set persist.ace.testmode.enabled 1") 237 exec_cmd_simple("hdc shell mkdir /data/snapshot") 238 limit = 2 239 count = 0 240 time = 0 241 cur_batch_hap = [] 242 special_haps = LinuxContains.SPECIAL_HAP.split(";") 243 target_paths = LinuxContains.TARGET_PATH.split(";") 244 begin_bundles = get_all_bundles() 245 while time <= repeat_time: 246 for hap in haps: 247 248 isTargetPath = False 249 for target_path in target_paths: 250 if hap.startswith(target_path): 251 isTargetPath = True 252 if not isTargetPath: 253 continue 254 255 isSpecialSkip = False 256 for special_hap in special_haps: 257 if special_hap in hap: 258 global special_num 259 isSpecialSkip = True 260 special_hap_list.append(hap) 261 special_num = special_num + 1 262 break 263 if isSpecialSkip: 264 continue 265 266 cur_batch_hap.append(hap) 267 count = count + 1 268 if count == limit: 269 install_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 270 test_bundles = get_test_bundle(begin_bundles) 271 install_depend_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 272 all_install_bundles = get_test_bundle(begin_bundles) 273 if snapshot == 0: 274 test_install_hap(test_bundles, base_dir) 275 else: 276 test_install_hap_with_error_snapshot(test_bundles, base_dir) 277 count = 0 278 uninstall_bundles(all_install_bundles) 279 cur_batch_hap.clear() 280 if len(cur_batch_hap) != 0: 281 install_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 282 test_bundles = get_test_bundle(begin_bundles) 283 install_depend_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 284 all_install_bundles = get_test_bundle(begin_bundles) 285 if snapshot == 0: 286 test_install_hap(test_bundles, base_dir) 287 else: 288 test_install_hap_with_error_snapshot(test_bundles, base_dir) 289 uninstall_bundles(all_install_bundles) 290 cur_batch_hap.clear() 291 time += 1 292 stop_log() 293 recv_log(base_dir + "/") 294 295 296def handle_test_log(base_dir): 297 file = open("{0}/auto_test.log".format(base_dir), encoding='utf-8', errors='ignore') 298 p_num = 0 299 fail_num = 0 300 success_num = 0 301 test_exp_num = 0 302 test_pass_num = 0 303 test_fail_num = 0 304 test_error_num = 0 305 died_num = 0 306 fail_dict = [] 307 died_dict = [] 308 curr_name = "" 309 has_result = 1 310 for x in file: 311 if x.startswith(r"================start"): 312 if (has_result == 0): 313 fail_num = fail_num + 1 314 fail_dict.append(curr_name) 315 else: 316 has_result = 0 317 p_num = p_num + 1 318 curr_name = x.split(" ")[1] 319 if x.startswith(r"OHOS_REPORT_RESULT"): 320 has_result = 1 321 nums = re.findall(r"\d+", x) 322 if len(nums) == 4 or len(nums) == 5: 323 if nums[0] == nums[3] and int(nums[0]) != 0: 324 success_num = success_num + 1 325 else: 326 fail_num = fail_num + 1 327 fail_dict.append(curr_name) 328 test_exp_num = test_exp_num + int(nums[0]) 329 test_fail_num = test_fail_num + int(nums[1]) 330 test_error_num = test_error_num + int(nums[2]) 331 test_pass_num = test_pass_num + int(nums[3]) 332 else: 333 fail_num = fail_num + 1 334 fail_dict.append(curr_name) 335 elif x.__contains__("App died"): 336 has_result = 1 337 died_num = died_num + 1 338 died_dict.append(curr_name) 339 340 file.close() 341 342 error_log = open("{0}/auto_test.log".format(base_dir), mode='a') 343 error_log.writelines( 344 "共完成测试项目 {0}个,成功{1}个,失败{2}个,异常中止(App died){3}个,特殊应用跳过{4}个\n".format( 345 int(p_num + special_num / 2), success_num, fail_num, died_num, int(special_num / 2))) 346 347 error_log.writelines( 348 "共完成测试用例 {0}个,成功{1}个,失败{2}个,错误{3}个\n".format(test_exp_num, test_pass_num, test_fail_num, 349 test_error_num)) 350 print("successNum:{0} failNum:{1} diedNum:{2} specialNum:{3} testNum:{4} testSuccessNum:{5} testFailNum:{6} testErrorNum:{7}".format(success_num, 351 fail_num, died_num, int(special_num / 2), test_exp_num, test_pass_num, test_fail_num, test_error_num)) 352 if len(fail_dict) > 0: 353 error_log.writelines("失败工程BundleName如下:\n") 354 for x in fail_dict: 355 error_log.writelines(" " + x + "\n") 356 if len(died_dict) > 0: 357 error_log.writelines("异常中止(App died)工程BundleName如下:\n") 358 for x in died_dict: 359 error_log.writelines(" " + x + "\n") 360 error_log.writelines("安装失败项目数量:{0}\n".format(len(install_fail_dict))) 361 for i in install_fail_dict: 362 error_log.writelines('{0} : {1}'.format(i, install_fail_dict[i])) 363 if len(special_hap_list) > 0: 364 error_log.writelines("特殊安装跳过Hap数量:{0}\n".format(special_num)) 365 for i in special_hap_list: 366 error_log.writelines(i + "\n") 367 error_log.flush() 368 error_log.close() 369 if fail_num != 0 or died_num != 0 or len(install_fail_dict) != 0: 370 print ('test failed !!') 371 else: 372 return ('test success !!') 373 374def init_out(): 375 base_dir = sys.path[0] 376 out_path = "{0}/ui_test_out/{1}".format(base_dir, datetime.datetime.now().strftime('%Y%m%d')) 377 print(out_path) 378 clear_dir(out_path) 379 clear_dir("{0}/errorLog".format(out_path)) 380 clear_dir("{0}/successLog".format(out_path)) 381 clear_dir("{0}/SampleSignHap".format(out_path)) 382 clear_file("{0}/auto_test.log".format(out_path)) 383 clear_dir("{0}/hilog".format(out_path)) 384 clear_dir("{0}/snapshot".format(out_path)) 385 return out_path 386 387 388def add_permission(bundle_name, finger_print): 389 KEY_NAME = 'install_list' 390 f = open(LinuxContains.INSTALL_LIST_CAPABILITY, 'r') 391 if bundle_name in f.read(): 392 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f: 393 old_data = json.load(f) 394 # print(bundle_name + " 已存在,需要修改权限") 395 for i in old_data[KEY_NAME]: 396 if i['bundleName'] == bundle_name: 397 i['app_signature'][0] = finger_print 398 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f: 399 json.dump(old_data, f) 400 401 else: 402 # 追加权限 403 permission_fields = { 404 "bundleName": bundle_name, 405 "app_signature": [finger_print], 406 "allowAppUsePrivilegeExtension": True 407 } 408 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f: 409 old_data = json.load(f) 410 # 这里就是那个列表List 411 old_data[KEY_NAME].append(permission_fields) 412 # print(bundle_name + " 不存在,需要追加权限") 413 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f: 414 json.dump(old_data, f) 415 416 417def pull_list(): 418 cmd_pull = "hdc shell mount -o rw,remount / & hdc file recv /system/etc/app/install_list_capability.json" 419 os.system(cmd_pull) 420 421 422def push_list(): 423 cmd_push = "hdc shell mount -o rw,remount / & hdc file send install_list_capability.json /system/etc/app/install_list_capability.json & hdc shell chmod 777 /system/etc/app/install_list_capability.json & hdc shell reboot" 424 # print(cmd_push) 425 os.system(cmd_push) 426 time.sleep(40) # 等待重启 427 cmd_unlock = 'hdc shell power-shell wakeup & hdc shell uinput -T -m 100 200 100 900 600 & hdc shell power-shell setmode 602' # 亮屏并解锁屏幕 428 # print("设置屏幕常亮") 429 # print(cmd_unlock) 430 os.system(cmd_unlock) 431 432 433def load_config_to_dict(config_file): 434 with open(config_file, encoding='utf-8') as a: 435 # 读取文件 436 global depend_config_map 437 depend_config_map = json.load(a) 438 # print(type (depend_config_map)) 439 440 441def replace_enable(file,old_str,new_str): 442 with open(file, "r", encoding="utf-8") as f1,open("%s.bak" % file, "w", encoding="utf-8") as f2: 443 for line in f1: 444 if old_str in line: 445 line = line.replace(old_str, new_str) 446 f2.write(line) 447 os.remove(file) 448 os.rename("%s.bak" % file, file) 449 450 451def pull_config(): 452 cmd='hdc shell mount -o rw,remount / & hdc file recv system/etc/window/resources/window_manager_config.xml {0}'.format(LinuxContains.SAVE_XML_PATH) 453 os.system(cmd) 454 455 456def push_config(): 457 cmd='hdc file send {0}/window_manager_config.xml system/etc/window/resources/window_manager_config.xml'.format(LinuxContains.SAVE_XML_PATH) 458 os.system(cmd) 459 460 461if __name__ == '__main__': 462 load_config_to_dict(LinuxContains.COMBIN_CONFIG) 463 # print(depend_config_map) 464 465 # 创建ui_test_out文件夹及子文件夹 466 out_base = init_out() 467 # pull_config() 468 # replace_enable(CONFIG_PATH,'decor enable="false"','decor enable="true"') 469 # push_config() 470 471 # 拉install_list_capability.json 472 pull_list() 473 474 # 特殊安装应用,添加权限 475 with open(LinuxContains.SPECIAL_LIST,'r',encoding='utf-8') as fp: 476 python_obj=json.load(fp) 477 for key,value in python_obj.items(): 478 add_permission(value[0],FINGER_PRINT) 479 480 # 推install_list_capability.json,重启解锁 481 push_list() 482 # sftp_from_remote("{0}\\SampleSignHap".format(out_base), "{0}\\errorLog".format(out_base), 483 # "{0}\\successLog".format(out_base)) 484 haps = get_haps(LinuxContains.SIGN_HAP_PATH) 485 total_hap_num = len(haps) / 2 486 batch_install(haps, out_base) 487 handle_test_log(out_base)