1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2024 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import os 17import zipfile 18import subprocess 19import re 20import time 21import json 22 23OUTPUT_PATH = "testModule/output" 24 25def checkFile(filePath, checkFunction): 26 """ 27 description: 首先检查文件是否存在,然后检查文件内容是否符合要求 28 file_path: str, the path of the file to be checked 29 check_function: 校验函数,校验file_path文件内容是否符合要求,是则返回True,否则返回False 30 """ 31 assert os.path.exists(filePath), f"The file {filePath} does not exist." 32 with open(filePath, "r", encoding="utf-8") as f: 33 output = f.read() 34 return checkFunction(output) 35 36def checkZipFile(zipFilePath, checkFunction): 37 """ 38 description: 首先检查zip文件是否存在,然后解压文件,并检查解压后的log.txt文件内容是否符合要求 39 check_function: 校验函数,校验解压后的log.txt文件内容是否符合要求,是则返回True,否则返回False 40 """ 41 assert os.path.exists(zipFilePath), f"The file {zipFilePath} does not exist." 42 with zipfile.ZipFile(zipFilePath, 'r') as zip_ref: 43 # 解压所有文件到指定目录 44 dirname = os.path.dirname(zipFilePath) 45 zip_ref.extractall(dirname) 46 with open(f"{dirname}/log.txt", "r", encoding="utf-8") as f: 47 output = f.read() 48 return checkFunction(output) 49 50def print_check_result(func): 51 def wrapper(*args, **kwargs): 52 ret = func(*args, **kwargs) 53 if (ret): 54 print(f"func {func.__name__} success") 55 else: 56 print(f"func {func.__name__} failed") 57 return ret 58 return wrapper 59 60def GetPidByProcessName(processName): 61 pid = None 62 cmd = f"hdc shell \"pidof {processName}\"" 63 try: 64 pid = subprocess.check_output(cmd, shell=True, encoding="utf-8", text=True) 65 pid = int(pid.strip().split()[0]) 66 except subprocess.CalledProcessError as e: 67 print(f"Command failed: {cmd}\nError: {e}") 68 except Exception as e: 69 print(f"Unexpected error: {e}") 70 return pid 71 72def convert_string_to_matrix(data : str) -> list: 73 """ 74 description: 将字符串转换为矩阵 75 string: str, 字符串 76 """ 77 data = data.strip("-\n") 78 lines = data.split('\n') 79 matrix = [] 80 # 遍历每一行 81 for line in lines: 82 # 如果行是空的,跳过 83 if not line.strip(): 84 continue 85 # 分割每一列,去除空格,并转换为整数 86 row = [int(col.strip()) for col in line.split()] 87 matrix.append(row) 88 return matrix 89 90def CheckCmd(command, checkFunction, hidumperTmpCmd = ""): 91 if len(hidumperTmpCmd) != 0 and IsRootVersion(): 92 hisyseventOutput = GetHisyseventTmpFile() 93 lastWriteDay = GetLastWriteDay() 94 currentTime = GetDate() 95 output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8") 96 assert checkFunction(output) 97 if len(hidumperTmpCmd) != 0 and not IsOpenHarmonyVersion() and IsRootVersion(): 98 JudgeHisyseventReport(command, hisyseventOutput, hidumperTmpCmd, currentTime, lastWriteDay) 99 100def CheckCmdRedirect(command, checkFunction, filePath = None, hidumperTmpCmd = ""): 101 if len(hidumperTmpCmd) != 0 and IsRootVersion(): 102 hisyseventOutput = GetHisyseventTmpFile() 103 lastWriteDay = GetLastWriteDay() 104 currentTime = GetDate() 105 filePath = f"{OUTPUT_PATH}/hidumper_redirect.txt" if filePath is None else filePath 106 subprocess.check_output(f"hdc shell \"{command}\" > {filePath}", shell=True, text=True, encoding="utf-8") 107 assert checkFile(filePath, checkFunction = checkFunction) 108 if len(hidumperTmpCmd) != 0 and not IsOpenHarmonyVersion() and IsRootVersion(): 109 JudgeHisyseventReport(command, hisyseventOutput, hidumperTmpCmd, currentTime, lastWriteDay) 110 111def CheckCmdZip(command, checkFunction): 112 output = subprocess.check_output(f"hdc shell \"{command} --zip\"", shell=True, text=True, encoding="utf-8") 113 zipSourceFile = re.search("The result is:(.+)", output).group(1).strip() 114 zipTargetFile = f"{OUTPUT_PATH}/" + os.path.basename(zipSourceFile) 115 subprocess.check_output(f"hdc file recv {zipSourceFile} {zipTargetFile}", shell=True, text=True, encoding="utf-8") 116 assert checkZipFile(zipTargetFile, checkFunction = checkFunction) 117 118def IsLogVersion(): 119 output = subprocess.check_output("hdc shell param get const.product.software.version", shell=True, text=True, encoding="utf-8").strip() 120 return "log" in output 121 122def IsRootVersion(): 123 output = subprocess.check_output("hdc shell param get const.debuggable", shell=True, text=True, encoding="utf-8").strip() 124 return output == "1" 125 126def IsOpenHarmonyVersion(): 127 output = subprocess.check_output("hdc shell param get const.product.software.version", shell=True, text=True, encoding="utf-8").strip() 128 return "OpenHarmony" in output 129 130def GetHisyseventTmpFile(): 131 # 获取/data/log/hidumper/hisysevent.tmp文件内容 132 get_hisysevent_tmp_txt = "cat /data/log/hidumper/hisysevent.tmp" 133 output = subprocess.check_output(f"hdc shell \"{get_hisysevent_tmp_txt}\"", shell=True, text=True, encoding="utf-8") 134 if "No such file or directory" in output: 135 return "" 136 output = output.strip('\n') 137 return output 138 139def GetDate(): 140 formatted_time = subprocess.check_output(f"hdc shell date +%Y-%m-%d\ %H:%M:%S", shell=True, text=True, encoding="utf-8") 141 formatted_time = formatted_time.strip('\n') 142 return formatted_time 143 144def GetDateArray(formatted_time): 145 date_part = formatted_time.split(' ')[0] # 先获取日期部分,按空格分割取第一个元素 146 dateArray = date_part.split('-') # 再按 - 分割日期部分 147 return dateArray 148 149def GetLastWriteDay(): 150 lastWriteTime = subprocess.check_output(f"hdc shell stat -c %y /data/log/hidumper/hisysevent.tmp", shell=True, text=True, encoding="utf-8") 151 if "No such file or directory" in lastWriteTime: 152 return "" 153 lastWriteTime = lastWriteTime.strip(' ') 154 lastWriteDay = GetDateArray(lastWriteTime)[2] 155 return lastWriteDay 156 157def UpdateDay(): 158 currentTime = GetDate() 159 year = GetDateArray(currentTime)[0] 160 month = GetDateArray(currentTime)[1] 161 day = GetDateArray(currentTime)[2] 162 newDay = str(int(day) + 1) 163 date = year + "-" + month + "-" + newDay 164 dateCmd = f"hdc shell date \"{date}\"" 165 output = subprocess.check_output(dateCmd, shell=True, encoding="utf-8", text=True) 166 assert "00:00:00" in output 167 168# 校验是否上报hisysevent 169def JudgeHisyseventReport(command, output, hidumperTmpCmd, currentTime, lastWriteDay): 170 currentDay = GetDateArray(currentTime)[2] 171 # 执行hisysevent命令 172 hisyseventCmd = f"hisysevent -l -S \"{currentTime}\" |grep CMD_USAGE" 173 hisyseventOutput = subprocess.check_output(f"hdc shell \"{hisyseventCmd}\"", shell=True, text=True, encoding="utf-8") 174 if output == "": 175 print(f"hisysevent.tmp is not exist") 176 return 177 hisyseventFile = GetHisyseventTmpFile() 178 hisyseventFileArray = hisyseventFile.split('\n') 179 count = sum(string == hidumperTmpCmd for string in hisyseventFileArray) 180 assert count == 1 181 182def GetPathByAttribute(tree, key, value): 183 attributes = tree['attributes'] 184 if attributes is None: 185 print("tree contains no attributes") 186 return None 187 path = [] 188 if attributes.get(key) == value: 189 return path 190 for index, child in enumerate(tree['children']): 191 child_path = path + [index] 192 result = GetPathByAttribute(child, key, value) 193 if result is not None: 194 return child_path + result 195 return None 196 197def GetElementByPath(tree, path): 198 if len(path) == 1: 199 return tree['children'][path[0]] 200 return GetElementByPath(tree['children'][path[0]], path[1:]) 201 202def GetLocationByText(tree, text): 203 path = GetPathByAttribute(tree, "text", text) 204 if path is None or len(path) == 0: 205 print(f"text not find in layout file") 206 element = GetElementByPath(tree, path) 207 locations = element['attributes']['bounds'].replace('[', '').replace(']', ' ').replace(',',' ').strip().split() 208 return int((int(locations[0]) + int(locations[2])) / 2), int((int(locations[1]) + int(locations[3])) / 2) 209 210def GetLayoutTree(): 211 output = subprocess.check_output("hdc shell uitest dumpLayout", text=True) 212 path = output.strip().split(":")[-1] 213 output = subprocess.check_output(f"hdc file recv {path} {OUTPUT_PATH}/layout.json") 214 with open(f"{OUTPUT_PATH}/layout.json", encoding="utf-8") as f: 215 tree = json.load(f) 216 return tree 217 218def TouchButtonByText(text): 219 layoutTree = GetLayoutTree() 220 location = GetLocationByText(layoutTree, text) 221 output = subprocess.check_output(f"hdc shell uitest uiInput click {location[0]} {location[1]}") 222 223def CloseProcess(processName): 224 result = subprocess.run('hdc shell ps -ef', stdout=subprocess.PIPE) 225 output = result.stdout.decode('utf-8') 226 227 for line in output.splitlines(): 228 if processName in line: 229 parts = line.split() 230 pid = parts[1] 231 subprocess.run(['hdc', 'shell', 'kill', '-9', pid])