• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import os
17import zipfile
18import subprocess
19import re
20import time
21import json
22
# Host-side directory (relative path) that receives redirected command output
# and files pulled from the device with `hdc file recv`.
OUTPUT_PATH = "testModule/output"
24
def checkFile(filePath, checkFunction):
    """
    description: make sure the file exists, then validate its content
    filePath: str, the path of the file to be checked
    checkFunction: validator that receives the whole file content (read as
        UTF-8 text) and returns True when it is acceptable, False otherwise
    return: whatever checkFunction returns
    raises: AssertionError when filePath does not exist
    """
    fileExists = os.path.exists(filePath)
    assert fileExists, f"The file {filePath} does not exist."
    with open(filePath, mode="r", encoding="utf-8") as handle:
        content = handle.read()
    # The validator runs after the file has been closed.
    return checkFunction(content)
35
def checkZipFile(zipFilePath, checkFunction):
    """
    description: make sure the zip file exists, extract it into the directory
        that contains it, then validate the extracted log.txt
    zipFilePath: str, the path of the zip file to be checked
    checkFunction: validator that receives the content of the extracted
        log.txt (read as UTF-8 text) and returns True when it is acceptable,
        False otherwise
    return: whatever checkFunction returns
    raises: AssertionError when zipFilePath does not exist
    """
    assert os.path.exists(zipFilePath), f"The file {zipFilePath} does not exist."
    # dirname is "" when zipFilePath is a bare file name. Joining with
    # os.path.join keeps the log path relative ("log.txt") instead of turning
    # it into the absolute "/log.txt".
    dirname = os.path.dirname(zipFilePath)
    with zipfile.ZipFile(zipFilePath, 'r') as zip_ref:
        # Extract every member next to the archive.
        zip_ref.extractall(dirname or os.curdir)
    with open(os.path.join(dirname, "log.txt"), "r", encoding="utf-8") as f:
        output = f.read()
    return checkFunction(output)
49
def print_check_result(func):
    """
    description: decorator that prints whether the wrapped check succeeded
    func: callable whose return value is interpreted as a success flag
    return: a wrapper that calls func, prints "func <name> success" when the
        result is truthy and "func <name> failed" otherwise, and returns the
        result unchanged
    """
    # Imported locally so the decorator does not depend on the module's
    # import block.
    import functools

    # functools.wraps keeps __name__/__doc__ of the decorated function, so
    # introspection and test reports show the real function, not "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        ret = func(*args, **kwargs)
        outcome = "success" if ret else "failed"
        print(f"func {func.__name__} {outcome}")
        return ret
    return wrapper
59
def GetPidByProcessName(processName):
    """
    description: look up the pid of a process on the device with `pidof`
    processName: str, name handed to `pidof` through `hdc shell`
    return: int, the first pid printed by the command, or None when the
        command fails or its output cannot be parsed as a pid
    """
    pid = None
    cmd = f"hdc shell \"pidof {processName}\""
    try:
        # Keep the raw text in its own variable: if parsing fails below, pid
        # must stay None instead of holding the unparsed command output.
        rawOutput = subprocess.check_output(cmd, shell=True, encoding="utf-8", text=True)
        pid = int(rawOutput.strip().split()[0])
    except subprocess.CalledProcessError as e:
        print(f"Command failed: {cmd}\nError: {e}")
    except Exception as e:
        # Best-effort lookup: empty or non-numeric output ends up here.
        print(f"Unexpected error: {e}")
    return pid
71
def convert_string_to_matrix(data: str) -> list:
    """
    description: convert a block of text into a matrix of integers
    data: str, text whose non-blank lines hold whitespace-separated integers;
        leading and trailing '-' and newline characters are stripped first
    return: list of rows, each row being the list of integers of one line
    """
    trimmed = data.strip("-\n")
    return [
        [int(cell) for cell in row.split()]
        for row in trimmed.split('\n')
        if row.strip()  # blank lines do not produce a row
    ]
89
def CheckCmd(command, checkFunction, hidumperTmpCmd = ""):
    """
    description: run a command through `hdc shell` and assert on its output
    command: str, command line executed on the device
    checkFunction: validator that receives the command output as text and
        must return a truthy value
    hidumperTmpCmd: str, optional; when non-empty and the device is a root
        build, the hisysevent.tmp state is captured before the command runs
        and, on non-OpenHarmony builds, JudgeHisyseventReport is called
        afterwards with this value
    raises: AssertionError when checkFunction rejects the output;
        subprocess.CalledProcessError when the hdc invocation fails
    """
    # Ask the device once and reuse the answer. Querying again after the
    # command could give a different result and leave the three locals below
    # unbound when the report is judged.
    trackReport = len(hidumperTmpCmd) != 0 and IsRootVersion()
    if trackReport:
        hisyseventOutput = GetHisyseventTmpFile()
        lastWriteDay = GetLastWriteDay()
        currentTime = GetDate()
    output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8")
    assert checkFunction(output)
    if trackReport and not IsOpenHarmonyVersion():
        JudgeHisyseventReport(command, hisyseventOutput, hidumperTmpCmd, currentTime, lastWriteDay)
99
def CheckCmdRedirect(command, checkFunction, filePath = None, hidumperTmpCmd = ""):
    """
    description: run a command through `hdc shell`, redirect its output to a
        host-side file and assert on the content of that file
    command: str, command line executed on the device
    checkFunction: validator that receives the redirected file content and
        must return a truthy value
    filePath: str, optional; target of the redirection, defaults to
        "<OUTPUT_PATH>/hidumper_redirect.txt"
    hidumperTmpCmd: str, optional; when non-empty and the device is a root
        build, the hisysevent.tmp state is captured before the command runs
        and, on non-OpenHarmony builds, JudgeHisyseventReport is called
        afterwards with this value
    raises: AssertionError when the file is missing or rejected by
        checkFunction; subprocess.CalledProcessError when the command fails
    """
    # Ask the device once and reuse the answer. Querying again after the
    # command could give a different result and leave the three locals below
    # unbound when the report is judged.
    trackReport = len(hidumperTmpCmd) != 0 and IsRootVersion()
    if trackReport:
        hisyseventOutput = GetHisyseventTmpFile()
        lastWriteDay = GetLastWriteDay()
        currentTime = GetDate()
    filePath = f"{OUTPUT_PATH}/hidumper_redirect.txt" if filePath is None else filePath
    # The redirection is performed by the host shell, not on the device.
    subprocess.check_output(f"hdc shell \"{command}\" > {filePath}", shell=True, text=True, encoding="utf-8")
    assert checkFile(filePath, checkFunction = checkFunction)
    if trackReport and not IsOpenHarmonyVersion():
        JudgeHisyseventReport(command, hisyseventOutput, hidumperTmpCmd, currentTime, lastWriteDay)
110
def CheckCmdZip(command, checkFunction):
    """
    description: run a command with the --zip option through `hdc shell`,
        pull the resulting archive into OUTPUT_PATH and validate it with
        checkZipFile
    command: str, command line executed on the device (" --zip" is appended)
    checkFunction: validator handed to checkZipFile
    raises: AssertionError when the command output does not announce the
        archive path or when checkZipFile fails;
        subprocess.CalledProcessError when an hdc invocation fails
    """
    output = subprocess.check_output(f"hdc shell \"{command} --zip\"", shell=True, text=True, encoding="utf-8")
    # The device-side archive path is announced after "The result is:".
    match = re.search("The result is:(.+)", output)
    # Fail with the actual output instead of an AttributeError on None.
    assert match is not None, f"No zip path reported by '{command} --zip', output was: {output}"
    zipSourceFile = match.group(1).strip()
    zipTargetFile = f"{OUTPUT_PATH}/" + os.path.basename(zipSourceFile)
    subprocess.check_output(f"hdc file recv {zipSourceFile} {zipTargetFile}", shell=True, text=True, encoding="utf-8")
    assert checkZipFile(zipTargetFile, checkFunction = checkFunction)
117
def IsLogVersion():
    """
    description: report whether the device parameter
        const.product.software.version contains the substring "log"
    return: bool
    """
    query = "hdc shell param get const.product.software.version"
    version = subprocess.check_output(query, shell=True, text=True, encoding="utf-8")
    return "log" in version.strip()
121
def IsRootVersion():
    """
    description: report whether the device parameter const.debuggable is "1"
    return: bool
    """
    query = "hdc shell param get const.debuggable"
    debuggable = subprocess.check_output(query, shell=True, text=True, encoding="utf-8")
    return debuggable.strip() == "1"
125
def IsOpenHarmonyVersion():
    """
    description: report whether the device parameter
        const.product.software.version contains the substring "OpenHarmony"
    return: bool
    """
    softwareVersion = subprocess.check_output(
        "hdc shell param get const.product.software.version",
        shell=True,
        text=True,
        encoding="utf-8",
    ).strip()
    return "OpenHarmony" in softwareVersion
129
def GetHisyseventTmpFile():
    """
    description: read /data/log/hidumper/hisysevent.tmp from the device
    return: str, the file content without leading/trailing newlines, or ""
        when the command output contains "No such file or directory"
    """
    # Dump the content of /data/log/hidumper/hisysevent.tmp
    catCmd = "cat /data/log/hidumper/hisysevent.tmp"
    content = subprocess.check_output(f"hdc shell \"{catCmd}\"", shell=True, text=True, encoding="utf-8")
    return "" if "No such file or directory" in content else content.strip('\n')
138
def GetDate():
    """
    description: read the current device time through `hdc shell date`
    return: str, the command output formatted as "%Y-%m-%d %H:%M:%S" with
        leading/trailing newlines removed
    """
    # Raw string: the backslash escapes the space for the host shell so the
    # whole format reaches `date` as one argument. Written as r"..." because
    # "\ " is an invalid escape sequence in a normal string literal.
    dateCmd = r"hdc shell date +%Y-%m-%d\ %H:%M:%S"
    formatted_time = subprocess.check_output(dateCmd, shell=True, text=True, encoding="utf-8")
    return formatted_time.strip('\n')
143
def GetDateArray(formatted_time):
    """
    description: split the date part of a timestamp into its fields
    formatted_time: str, e.g. "2024-05-06 12:34:56"; everything before the
        first space is treated as the date
    return: list of str, the date part split on '-'
    """
    date_text, _, _ = formatted_time.partition(' ')
    return date_text.split('-')
148
def GetLastWriteDay():
    """
    description: read the day field of the last-write date that `stat -c %y`
        prints for /data/log/hidumper/hisysevent.tmp on the device
    return: str, the third '-'-separated field of the date part, or "" when
        the command output contains "No such file or directory"
    """
    statCmd = "hdc shell stat -c %y /data/log/hidumper/hisysevent.tmp"
    statOutput = subprocess.check_output(statCmd, shell=True, text=True, encoding="utf-8")
    if "No such file or directory" in statOutput:
        return ""
    dateFields = GetDateArray(statOutput.strip(' '))
    return dateFields[2]
156
def UpdateDay():
    """
    description: move the device date forward to the day after its current
        date by running `hdc shell date "<YYYY-MM-DD>"`
    raises: AssertionError when the command output does not contain
        "00:00:00"; subprocess.CalledProcessError when an hdc call fails
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import datetime

    currentTime = GetDate()
    year, month, day = (int(field) for field in GetDateArray(currentTime)[:3])
    # Calendar arithmetic instead of "day + 1": the latter yields invalid
    # dates such as the 32nd at the end of a month. isoformat() renders the
    # result zero-padded as YYYY-MM-DD.
    nextDay = datetime.date(year, month, day) + datetime.timedelta(days=1)
    dateCmd = f"hdc shell date \"{nextDay.isoformat()}\""
    output = subprocess.check_output(dateCmd, shell=True, encoding="utf-8", text=True)
    assert "00:00:00" in output
167
# Verify whether the hisysevent was reported
def JudgeHisyseventReport(command, output, hidumperTmpCmd, currentTime, lastWriteDay):
    """
    description: assert that hisysevent.tmp on the device contains exactly
        one line equal to hidumperTmpCmd
    command: not used by this function
    output: str, content of hisysevent.tmp captured before the command ran;
        when it is "", a message is printed and nothing is asserted
    hidumperTmpCmd: str, the line expected exactly once in hisysevent.tmp
    currentTime: str, timestamp passed to `hisysevent -l -S`
    lastWriteDay: not used by this function
    raises: AssertionError when the line count differs from 1
    """
    # The two results below are not consulted afterwards. The statements are
    # kept so the same calls are issued (and may fail) as before.
    _currentDay = GetDateArray(currentTime)[2]
    # Run the hisysevent query on the device
    queryCmd = f"hisysevent -l -S \"{currentTime}\" |grep CMD_USAGE"
    subprocess.check_output(f"hdc shell \"{queryCmd}\"", shell=True, text=True, encoding="utf-8")
    if output == "":
        print("hisysevent.tmp is not exist")
        return
    recordedLines = GetHisyseventTmpFile().split('\n')
    assert recordedLines.count(hidumperTmpCmd) == 1
181
def GetPathByAttribute(tree, key, value):
    """
    description: depth-first search for the first node whose attribute key
        equals value
    tree: dict with an 'attributes' entry (dict or None) and a 'children' list
    key: attribute name to look up
    value: attribute value to match
    return: list of child indexes leading from tree to the matching node
        ([] when tree itself matches), or None when nothing matches or
        tree['attributes'] is None
    """
    attrs = tree['attributes']
    if attrs is None:
        print("tree contains no attributes")
        return None
    if attrs.get(key) == value:
        return []
    for position, subtree in enumerate(tree['children']):
        tail = GetPathByAttribute(subtree, key, value)
        if tail is not None:
            # Prefix the child's index to the path found below it.
            return [position] + tail
    return None
196
def GetElementByPath(tree, path):
    """
    description: follow a list of child indexes through a layout tree
    tree: dict whose 'children' entry is a list of nested nodes
    path: sequence of int, child index to take at each level; an empty path
        designates tree itself
    return: the node reached at the end of path
    raises: IndexError / KeyError when path does not exist in tree
    """
    # An iterative walk also covers the empty path, which is what
    # GetPathByAttribute returns when the root node is the match.
    node = tree
    for index in path:
        node = node['children'][index]
    return node
201
def GetLocationByText(tree, text):
    """
    description: compute the center point of the first node whose "text"
        attribute equals text
    tree: layout tree (nested dicts with 'attributes' and 'children')
    text: str, value of the "text" attribute to look for
    return: (x, y) tuple of int, center of the node's 'bounds' attribute,
        which is parsed as "[left,top][right,bottom]"
    raises: ValueError when no node carries the requested text
    """
    path = GetPathByAttribute(tree, "text", text)
    if path is None:
        # Stop here with a clear message; continuing would only crash later
        # on the missing path.
        raise ValueError(f"text {text!r} not found in layout file")
    # An empty path means the root node itself is the match.
    element = GetElementByPath(tree, path) if path else tree
    bounds = element['attributes']['bounds']
    numbers = bounds.replace('[', '').replace(']', ' ').replace(',', ' ').split()
    left, top, right, bottom = (int(number) for number in numbers[:4])
    return int((left + right) / 2), int((top + bottom) / 2)
209
def GetLayoutTree():
    """
    description: dump the current UI layout on the device, pull the dump to
        "<OUTPUT_PATH>/layout.json" and load it
    return: the parsed JSON content of the layout dump
    raises: subprocess.CalledProcessError when an hdc invocation fails
    """
    # Argument lists run without a shell on every platform. A single command
    # string without shell=True is treated as the program name on POSIX.
    output = subprocess.check_output(["hdc", "shell", "uitest", "dumpLayout"], text=True)
    # The device-side path is whatever follows the last ':' of the output.
    # Surrounding whitespace is stripped because the path is now passed as
    # one argument.
    path = output.strip().split(":")[-1].strip()
    localFile = f"{OUTPUT_PATH}/layout.json"
    subprocess.check_output(["hdc", "file", "recv", path, localFile])
    with open(localFile, encoding="utf-8") as f:
        tree = json.load(f)
    return tree
217
def TouchButtonByText(text):
    """
    description: click the center of the UI element whose text equals text
    text: str, value of the "text" attribute of the element to click
    raises: subprocess.CalledProcessError when the hdc invocation fails
    """
    layoutTree = GetLayoutTree()
    x, y = GetLocationByText(layoutTree, text)
    # Argument list instead of a command string: a string without shell=True
    # is treated as the program name on POSIX.
    subprocess.check_output(["hdc", "shell", "uitest", "uiInput", "click", str(x), str(y)])
222
def CloseProcess(processName):
    """
    description: send `kill -9` to every process whose `ps -ef` line on the
        device contains processName
    processName: str, substring searched for in each line of the ps output
    """
    # Argument list instead of a command string: a string without shell=True
    # is treated as the program name on POSIX.
    result = subprocess.run(['hdc', 'shell', 'ps', '-ef'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8', errors='replace')

    for line in output.splitlines():
        if processName not in line:
            continue
        parts = line.split()
        if len(parts) < 2:
            # No pid column on this line; nothing to kill.
            continue
        # The pid is taken from the second column of the ps output.
        subprocess.run(['hdc', 'shell', 'kill', '-9', parts[1]])