1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2024 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import pytest 17import re 18import threading 19import time 20import json 21from utils import * 22 23PSS_TOTAL_INDEX = 0 24SWAP_PSS_INDEX = 6 25COLUMN_NUM = 10 26 27def ParseMemoryOutput(output): 28 memory_data = {} 29 for line in output.split("\n"): 30 if re.search(r"\d", line) is None: 31 continue 32 key = re.search(r'\s*([a-zA-Z.]+(?:\s+[a-zA-Z.]+)*)\s*', line).group(1).strip(); 33 memory_data[key] = [int(val) for val in re.findall(r'\d+', line)] 34 return memory_data 35 36def Check64bitUnsignedOverflow(memory_data): 37 value = int(memory_data) 38 if value < 0 or value >= (1 << 64): 39 return True 40 41def ParseMemoryUsageOverflowOutput(output): 42 for line in output.split("\n"): 43 if re.search(r"\d", line) is None: 44 continue 45 pattern = r"([\w\.\-:]+?)\(pid=(\d+)\):\s*(\d+)\s+kB" 46 matches = re.findall(pattern, line) 47 for match in matches: 48 service_name, pid, memory_usage = match 49 if Check64bitUnsignedOverflow(memory_usage): 50 print(f"Service Name: {service_name}") 51 print(f"PID: {pid}") 52 print(f"The memory usage of this process exceeds the maximum value.") 53 return False 54 55 return True 56 57@print_check_result 58def CheckTotalPss(memory_data): 59 pss_sum = sum([val[0] for key,val in memory_data.items() if key in ["GL", "Graph", "guard", "native heap", "AnonPage other", "stack", ".so", ".ttf", "dev", "dmabuf", "FilePage other", "Ashmem"]]) 60 return memory_data["Total"][PSS_TOTAL_INDEX] == (pss_sum + memory_data["Total"][SWAP_PSS_INDEX]) 61 62@print_check_result 63def CheckDataLength(memory_data): 64 GL_mem = memory_data.get("GL", []) 65 assert len(GL_mem) == COLUMN_NUM, "GL memory data error" 66 Graph_mem = memory_data.get("Graph", []) 67 assert len(Graph_mem) == COLUMN_NUM, "Graph memory data error" 68 guard_mem = memory_data.get("guard", []) 69 assert len(guard_mem) == COLUMN_NUM, "Guard memory data error" 70 native_heap_mem = memory_data.get("native heap", []) 71 assert len(native_heap_mem) == COLUMN_NUM, "native heap memory data error" 72 AnonPage_other = memory_data.get("AnonPage other", []) 73 assert len(AnonPage_other) == COLUMN_NUM, "AnonPage other memory data error" 74 stack_mem = memory_data.get("stack", []) 75 assert len(stack_mem) == COLUMN_NUM, "stack memory data error" 76 so_mem = memory_data.get(".so", []) 77 assert len(so_mem) == COLUMN_NUM, ".so memory data error" 78 dev_mem = memory_data.get("dev", []) 79 assert len(dev_mem) == COLUMN_NUM, "dev memory data error" 80 FilePage_other = memory_data.get("FilePage other", []) 81 assert len(FilePage_other) == COLUMN_NUM, "FilePage other memory data error" 82 ashmem = memory_data.get("Ashmem", []) 83 assert len(ashmem) == COLUMN_NUM, "Ashmem memory data error" 84 Total_mem = memory_data.get("Total", []) 85 assert len(Total_mem) == COLUMN_NUM, "Total memory data error" 86 return True 87 88@print_check_result 89def CheckDmaGraphMem(memory_data): 90 return memory_data["Dma"][0] == memory_data["Graph"][PSS_TOTAL_INDEX] 91 92def CheckRenderServiceAshmemInfo(memory_data): 93 if IsOpenHarmonyVersion(): 94 pytest.skip("this testcase is only support in HO") 95 return True 96 else: 97 output = subprocess.check_output(f'hdc shell cat /proc/ashmem_process_info', shell=True, text=True, encoding="utf-8") 98 #Total ashmem of [render_service] virtual size is 33778396, physical size is 33734656 99 match = re.search(r'Total ashmem of \[render_service\] virtual size is \d+, physical size is (\d+) ', output) 100 if match: 101 render_service_ashmem = int(match.group(1)) / 1024 102 print(f"render_service_ashmem: {render_service_ashmem}") 103 ashmem = memory_data["Ashmem"][0] 104 print(f"memory_data ashmem: {ashmem}") 105 return memory_data["Ashmem"][0] == render_service_ashmem 106 else: 107 print(f"not find render_service_ashmem") 108 return False 109 110def CheckHidumperMemoryWithPidOutput(output): 111 memory_data = ParseMemoryOutput(output) 112 ret = all(check(memory_data) for check in [CheckDmaGraphMem, CheckTotalPss, CheckDataLength, CheckRenderServiceAshmemInfo]) 113 return ret 114 115def CheckHidumperMemoryWithoutPidOutput(output): 116 graph = re.search(r"Graph\((\d+) kB\)", output).group(1) 117 dma = re.search(r"DMA\((\d+) kB\)", output).group(1) 118 return int(graph) == int(dma) 119 120def CheckHidumperMemoryUsagebySizeOutput(output): 121 memory_data = ParseMemoryUsageOverflowOutput(output) 122 return memory_data 123 124def CheckHidumperGraphAndDmaOutput(output): 125 graph = re.search(r"Graph\((\d+) kB\)", output).group(1) 126 dma = re.search(r"DMA\((\d+) kB\)", output).group(1) 127 128 file_path = f"{OUTPUT_PATH}/hidumper_redirect.txt" 129 with open(file_path, 'r') as file: 130 lines = file.readlines() 131 132 header_skipped = False 133 sum_graph_value = 0 134 for line in lines: 135 if not header_skipped and "Total Memory Usage by PID:" in line: 136 header_skipped = True 137 continue 138 if line.strip() == '': 139 break 140 parts = line.split() 141 if len(parts) > 7: 142 graph_value = parts[13] 143 dma_value = parts[15] 144 if "Dma" in graph_value or "PurgPin" in dma_value: 145 continue 146 if graph_value == dma_value: 147 sum_graph_value += int(graph_value) 148 149 return sum_graph_value == int(dma) 150 151def CheckHidumperPurgSumAndPurgPinOutput(output): 152 purgSum = re.search(r"Total PurgSum:(\d+) kB", output).group(1) 153 purgPin = re.search(r"Total PurgPin:(\d+) kB", output).group(1) 154 155 output = subprocess.check_output(f'hdc shell cat /proc/meminfo', shell=True, text=True, encoding="utf-8") 156 active = re.search(r"Active\(purg\):\s*(\d+) kB", output).group(1) 157 inactive = re.search(r"Inactive\(purg\):\s*(\d+) kB", output).group(1) 158 pined = re.search(r"Pined\(purg\):\s*(\d+) kB", output).group(1) 159 160 return int(purgSum) == int(active) + int(inactive) and int(purgPin) == int(pined) 161 162def CheckHidumperMemoryValueOutput(output): 163 graph_dma_value = CheckHidumperGraphAndDmaOutput(output) 164 purgSum_purgPin_value = CheckHidumperPurgSumAndPurgPinOutput(output) 165 166 return graph_dma_value and purgSum_purgPin_value 167 168# 要并发执行的任务函数 169def ThreadTask(thread_name, delay): 170 print(f"Thread {thread_name} starting") 171 command = "hidumper --mem > /data/log/hidumper/" + thread_name 172 output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8") 173 print(f"Thread {thread_name} finished") 174 175def IsRequestErrorInOutput(): 176 flag = 0 177 for i in range(6): 178 command = "cat /data/log/hidumper/Thread-" + str(i) 179 output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8") 180 if "request error" in output: 181 command = "lsof |grep Thread-" + str(i) 182 output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8") 183 print(f"output:{output}\n") 184 assert "Thread-" + str(i) not in output 185 flag = 1 186 break 187 return flag 188 189class TestHidumperMemory: 190 @pytest.mark.L0 191 def test_memory_all(self): 192 command = f"hidumper --mem" 193 hidumperTmpCmd = "OPT:mem SUB_OPT:" 194 # 校验命令行输出 195 CheckCmd(command, CheckHidumperMemoryWithoutPidOutput, hidumperTmpCmd) 196 # 校验命令行重定向输出 197 CheckCmdRedirect(command, CheckHidumperMemoryWithoutPidOutput, None, hidumperTmpCmd) 198 # 校验命令行输出到zip文件 199 CheckCmdZip(command, CheckHidumperMemoryWithoutPidOutput) 200 201 @pytest.mark.L0 202 def test_memory_usage_size(self): 203 command = f"hidumper --mem" 204 hidumperTmpCmd = "OPT:mem SUB_OPT:" 205 # 校验所有进程的内存大小 206 CheckCmd(command, CheckHidumperMemoryUsagebySizeOutput, hidumperTmpCmd) 207 # 校验Total PurgSum/Total PurgPin 208 CheckCmdRedirect(command, CheckHidumperMemoryValueOutput, None, hidumperTmpCmd) 209 # 校验命令行输出 210 CheckCmd(command, CheckHidumperMemoryWithoutPidOutput, hidumperTmpCmd) 211 # 校验命令行重定向输出 212 CheckCmdRedirect(command, CheckHidumperMemoryWithoutPidOutput, None, hidumperTmpCmd) 213 # 校验命令行输出到zip文件 214 CheckCmdZip(command, CheckHidumperMemoryWithoutPidOutput) 215 216 @pytest.mark.L0 217 def test_memory_pid(self): 218 processName = "render_service" 219 pid = GetPidByProcessName(processName) 220 command = f"hidumper --mem {pid}" 221 hidumperTmpCmd = "OPT:mem SUB_OPT:" 222 # 校验命令行输出 223 CheckCmd(command, CheckHidumperMemoryWithPidOutput, hidumperTmpCmd) 224 # 校验命令行重定向输出 225 CheckCmdRedirect(command, CheckHidumperMemoryWithPidOutput, None, hidumperTmpCmd) 226 # 校验命令行输出到zip文件 227 CheckCmdZip(command, CheckHidumperMemoryWithPidOutput) 228 229 @pytest.mark.L3 230 def test_memory_error_pid(self): 231 command = f"hidumper --mem 2147483647;hidumper --mem -2147483647" 232 hidumperTmpCmd = "OPT:mem SUB_OPT:" 233 # 校验命令行输出 234 CheckCmd(command, lambda output : "hidumper: No such process: 2147483647\nhidumper: option pid missed. 2" in output, hidumperTmpCmd) 235 command = f"hidumper --mem 2147483648;hidumper --mem -2147483648" 236 CheckCmd(command, lambda output : "hidumper: option pid missed. 2" in output, hidumperTmpCmd) 237 238 @pytest.mark.L1 239 def test_memory_note(self): 240 if not IsOpenHarmonyVersion(): 241 pytest.skip("this testcase is only support in OH") 242 else: 243 # 唤醒屏幕 244 subprocess.check_call("hdc shell power-shell wakeup", shell=True) 245 # 设置屏幕常亮 246 subprocess.check_call("hdc shell power-shell setmode 602", shell=True) 247 time.sleep(3) 248 # 解锁屏幕 249 subprocess.check_call("hdc shell uinput -T -g 100 100 500 500", shell=True) 250 time.sleep(3) 251 # 启动备忘录应用 252 subprocess.check_call("hdc shell aa start -a MainAbility -b com.ohos.note", shell=True) 253 time.sleep(3) 254 output = subprocess.check_output(f"hdc shell pidof com.ohos.note", shell=True, text=True, encoding="utf-8") 255 note_pid = output.strip('\n') 256 assert len(note_pid) != 0 257 # 新建备忘录 258 subprocess.check_call("hdc shell uinput -S -c 620 100", shell=True) 259 time.sleep(3) 260 output = subprocess.check_output(f"hdc shell pidof com.ohos.note:render", shell=True, text=True, encoding="utf-8") 261 note_render_pid = output.strip('\n') 262 assert len(note_render_pid) != 0 263 command = f"hidumper --mem {note_render_pid}" 264 # 校验命令行输出 265 CheckCmd(command, CheckHidumperMemoryWithPidOutput) 266 # 校验命令行重定向输出 267 CheckCmdRedirect(command, CheckHidumperMemoryWithPidOutput) 268 # 校验命令行输出到zip文件 269 CheckCmdZip(command, CheckHidumperMemoryWithPidOutput) 270 271 @pytest.mark.L1 272 def test_memory_mutilthread(self): 273 # 创建线程列表 274 threads = [] 275 # 创建线程并启动 276 for i in range(6): 277 t = threading.Thread(target=ThreadTask, args=("Thread-{}".format(i), i)) 278 threads.append(t) 279 t.start() 280 # 等待所有线程完成 281 for t in threads: 282 t.join() 283 # 校验超过请求数5的命令对应的fd中包含request error信息 284 assert IsRequestErrorInOutput() 285 286