# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ast import parse
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime

# Exit status used for every failed check in this script.
FAILURE_EXIT_CODE = 99
# Upper bound, in MiB, applied to each checked image directory.
PARTITION_SIZE_LIMIT_MB = 50
# Number of attempts EnterCmd makes before giving up on a command.
CMD_ATTEMPTS = 2
# Seconds EnterCmd waits for a command to finish before treating it as failed.
CMD_TIMEOUT_SECONDS = 25


def _fail(*messages):
    """Log each message, log the standard failure trailer, and exit with 99."""
    for message in messages:
        PrintToLog(message)
    PrintToLog("End of check, test failed!")
    sys.exit(FAILURE_EXIT_CODE)


def _parse_pids(output):
    """Return the whitespace-separated PIDs in *output* as a list of strings.

    An empty list is returned when *output* is empty/None or when any token
    is not an integer (for example an error message printed by the command).
    """
    if not output:
        return []
    pidlist = output.split()
    try:
        for token in pidlist:
            int(token)
    except ValueError:
        return []
    return pidlist


def _report_pid_problem(pname, problem, fatal):
    """Report a PID anomaly for *pname*; abort the run when *fatal* is true."""
    if fatal:
        _fail("ERROR: pid of %s %s" % (pname, problem),
              "SmokeTest find some fatal problems!")
    PrintToLog("WARNNING: pid of %s %s" % (pname, problem))


def GetDirSize(dir_path):
    """Return the total size in bytes of all regular files under *dir_path*.

    Symbolic links are skipped. The total is also logged in MiB. If
    *dir_path* does not exist the failure is logged and the script exits
    with status 99.
    """
    if not os.path.exists(dir_path):
        _fail("\n\nERROR: %s, dir are not exist!!!\n" % dir_path)
    size = 0
    for root, _dirs, files in os.walk(dir_path):
        for name in files:
            file_path = os.path.join(root, name)
            if not os.path.islink(file_path):
                size += os.path.getsize(file_path)
    PrintToLog('total size: {:.2f}M'.format(size / 1024 / 1024))
    return size


def PrintToLog(str):
    """Print a timestamped message to stdout and append it to the log file.

    The log file is ``L2_mini_test_<device_num>.log`` inside
    ``args.save_path``; both values are read from the module-level ``args``.
    The parameter keeps its original name (``str``) for compatibility.
    """
    message = "[{}] {}".format(datetime.datetime.now(), str)
    print(message)
    log_path = os.path.join(
        args.save_path, 'L2_mini_test_{}.log'.format(args.device_num))
    # Write through the file handle directly instead of swapping sys.stdout,
    # so stdout cannot be left redirected if the write raises.
    with open(log_path, mode='a', encoding='utf-8') as log_file:
        print(message, file=log_file)


def EnterCmd(mycmd, waittime=0, printresult=1):
    """Run *mycmd*, retrying once on failure, and return its decoded stdout.

    Args:
        mycmd: command line passed unchanged to ``subprocess.Popen``.
        waittime: seconds to sleep after the command; 0 means no wait.
        printresult: when 1, the command (and an equivalent ``ping`` delay)
            is appended to ``mini_test_<device_num>.bat`` and the command
            and its output are logged.

    Returns:
        The command's stdout decoded as UTF-8 (falling back to GBK with
        undecodable bytes dropped), the string ``'retry failed again'`` when
        every attempt failed, or ``None`` when *mycmd* is empty.
    """
    if mycmd == "":
        return None
    global CmdRetryCnt
    CmdRetryCnt = 1
    result = 'retry failed again'
    for _attempt in range(CMD_ATTEMPTS):
        p = None
        try:
            p = subprocess.Popen(mycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            raw, _unused_err = p.communicate(timeout=CMD_TIMEOUT_SECONDS)
        except Exception as e:
            result = 'retry failed again'
            PrintToLog(e)
            CmdRetryCnt += 1
            # Popen itself may have raised (e.g. executable not found), in
            # which case there is no child process to kill.
            if p is not None:
                p.kill()
            continue
        try:
            result = raw.decode(encoding="utf-8")
        except UnicodeDecodeError:
            result = raw.decode('gbk', errors='ignore')
        break

    record_commands = printresult == 1
    bat_path = os.path.join(
        args.save_path, 'mini_test_{}.bat'.format(args.device_num))
    if record_commands:
        with open(bat_path, mode='a', encoding='utf-8') as cmd_file:
            cmd_file.write(mycmd + '\n')
        PrintToLog(mycmd)
        PrintToLog(result)
        sys.stdout.flush()
    if waittime != 0:
        time.sleep(waittime)
        if record_commands:
            # Mirror the delay in the recorded batch file.
            with open(bat_path, mode='a', encoding='utf-8') as cmd_file:
                cmd_file.write("ping -n {} 127.0.0.1>null\n".format(waittime))
    return result


def EnterShellCmd(shellcmd, waittime=0, printresult=1):
    """Run *shellcmd* on the device ``args.device_num`` via ``hdc_std shell``.

    Returns whatever ``EnterCmd`` returns, or ``None`` when *shellcmd* is
    empty.
    """
    if shellcmd == "":
        return None
    cmd = "hdc_std -t {} shell \"{}\"".format(args.device_num, shellcmd)
    return EnterCmd(cmd, waittime, printresult)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='manual to this script')
    parser.add_argument('--save_path', type=str, default='D:\\DeviceTestTools\\screenshot')
    parser.add_argument('--device_num', type=str, default='null')
    # Raw string: same value as before, without invalid escape sequences.
    parser.add_argument('--archive_path', type=str,
                        default=r'Z:\workspace\ohos_L2\ohos\out\rk3568\packages\phone')
    args = parser.parse_args()

    if args.device_num == 'null':
        # No device given: use the first entry listed by hdc_std.
        result = EnterCmd("hdc_std list targets", 1, 0)
        print(result)
        targets = result.split()
        if not targets:
            _fail("\n\nERROR: no device found by 'hdc_std list targets'!!!\n")
        args.device_num = targets[0]

    PrintToLog("\n\n########## First check key processes start ##############")
    lose_process = []
    process_pid = {}

    two_check_process_list = ['huks_service', 'hilogd', 'hdf_devmgr', 'samgr', 'foundation', 'accesstoken_ser']
    other_process_list = ['softbus_server', 'deviceauth_service']

    for pname in two_check_process_list:
        pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
        pidlist = _parse_pids(pids)
        if pidlist:
            process_pid[pname] = pidlist
        else:
            lose_process.append(pname)

    # The remaining processes are only looked up by name in the ps listing.
    all_p = EnterShellCmd("ps -elf")
    for pname in other_process_list:
        if pname not in all_p:
            lose_process.append(pname)

    if lose_process:
        _fail("\n\nERROR: %s, These processes are not exist!!!\n" % lose_process)
    PrintToLog("First processes check is ok\n")

    time.sleep(10)

    # key processes second check, and cmp to first check
    PrintToLog("\n\n########## Second check key processes start ##############")
    second_check_lose_process = []
    # for pname in two_check_process_list + other_process_list:
    for pname in two_check_process_list:
        pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
        pidlist = _parse_pids(pids)
        if not pidlist or pname not in process_pid:
            second_check_lose_process.append(pname)
            continue
        fatal = pname in two_check_process_list
        if process_pid[pname] != pidlist:
            # A changed PID means the process restarted between the checks.
            _report_pid_problem(pname, "is different the first check", fatal)
        elif len(pidlist) != 1:
            _report_pid_problem(pname, "is not only one", fatal)

    if second_check_lose_process:
        _fail("ERROR: pid of %s is not exist" % second_check_lose_process,
              "SmokeTest find some fatal problems!")
    PrintToLog("Second processes check is ok\n")

    for partition in ("system", "data", "updater", "vendor"):
        target_dir = os.path.normpath(os.path.join(args.archive_path, partition))
        PrintToLog(target_dir)
        ret_size = GetDirSize(target_dir) / 1024 / 1024
        PrintToLog('Size of {} is :{:.2f}M'.format(partition, ret_size))
        if ret_size > PARTITION_SIZE_LIMIT_MB:
            _fail('ERROR: Size of {}({:.2f}M) is over the upper limit({}M)'.format(
                partition, ret_size, PARTITION_SIZE_LIMIT_MB))

    PrintToLog("All testcase is ok")
    PrintToLog("End of check, test succeeded!")
    sys.exit(0)