• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
import json
import re
import subprocess
import threading
import time

import pytest

from utils import *
22
# Index, within a parsed row's integer list, that the checks below read as the
# row's PSS total (used for the "Total" and "Graph" rows).
PSS_TOTAL_INDEX = 0
# Index, within the parsed "Total" row, that CheckTotalPss adds on top of the
# per-category sum (named after the SwapPss column).
SWAP_PSS_INDEX = 6
# Number of integers CheckDataLength requires in every row it inspects.
COLUMN_NUM = 10
26
def ParseMemoryOutput(output):
    """Parse memory-table text into ``{row label: [int, ...]}``.

    Each line of *output* is handled independently:

    * lines without any digit are ignored;
    * the row label is the first run of letter/dot words on the line
      (words separated by whitespace are kept together, e.g. "native heap");
    * every run of digits on the line becomes one integer of the row.

    Lines that contain digits but no label (for example a purely numeric
    line) are skipped instead of raising ``AttributeError``.

    Args:
        output: Text to parse; lines are separated by "\\n".

    Returns:
        dict mapping each label to the list of integers found on its line.
        When a label occurs more than once, the last line wins.
    """
    key_pattern = re.compile(r'\s*([a-zA-Z.]+(?:\s+[a-zA-Z.]+)*)\s*')
    number_pattern = re.compile(r'\d+')

    memory_data = {}
    for line in output.split("\n"):
        numbers = number_pattern.findall(line)
        if not numbers:
            continue
        key_match = key_pattern.search(line)
        if key_match is None:
            # Digits but no alphabetic label: nothing to key this row on.
            continue
        memory_data[key_match.group(1).strip()] = [int(val) for val in numbers]
    return memory_data
35
def Check64bitUnsignedOverflow(memory_data):
    """Tell whether a value falls outside the unsigned 64-bit range.

    Args:
        memory_data: Anything accepted by ``int()`` (typically a decimal
            string captured from command output).

    Returns:
        True when the value is negative or >= 2**64, otherwise False
        (always a real bool, never None).

    Raises:
        ValueError: If *memory_data* cannot be converted to an integer.
    """
    value = int(memory_data)
    return value < 0 or value >= (1 << 64)
40
def ParseMemoryUsageOverflowOutput(output):
    """Scan ``name(pid=N): M kB`` entries for out-of-range memory values.

    Every line of *output* is searched for entries of that shape; each
    captured memory figure is passed to ``Check64bitUnsignedOverflow``.
    On the first figure reported as overflowing, the service name and pid
    are printed and scanning stops.

    Args:
        output: Text to scan; lines are separated by "\\n".

    Returns:
        False as soon as one entry overflows, True when none does.
    """
    usage_pattern = re.compile(r"([\w\.\-:]+?)\(pid=(\d+)\):\s*(\d+)\s+kB")
    for text_line in output.split("\n"):
        # The pattern itself requires digits, so digit-free lines simply
        # produce no matches.
        for found in usage_pattern.finditer(text_line):
            name, process_id, usage_kb = found.groups()
            if not Check64bitUnsignedOverflow(usage_kb):
                continue
            print(f"Service Name: {name}")
            print(f"PID: {process_id}")
            print("The memory usage of this process exceeds the maximum value.")
            return False

    return True
56
@print_check_result
def CheckTotalPss(memory_data):
    """Check that the "Total" row equals the category sum plus its swap column.

    The first integer of every listed category row that is present in
    *memory_data* is summed; the result plus ``Total[SWAP_PSS_INDEX]`` must
    equal ``Total[PSS_TOTAL_INDEX]``.

    Args:
        memory_data: Mapping of row label to list of integers, as produced
            by ParseMemoryOutput.

    Returns:
        True when the equality holds, False otherwise.

    Raises:
        KeyError: If there is no "Total" row.
    """
    pss_rows = (
        "GL", "Graph", "guard", "native heap", "AnonPage other", "stack",
        ".so", ".ttf", "dev", "dmabuf", "FilePage other",
    )
    component_total = 0
    for row_name, columns in memory_data.items():
        if row_name in pss_rows:
            component_total += columns[0]
    total_row = memory_data["Total"]
    return total_row[PSS_TOTAL_INDEX] == (component_total + total_row[SWAP_PSS_INDEX])
61
@print_check_result
def CheckDataLength(memory_data):
    """Assert that each expected row carries exactly COLUMN_NUM integers.

    Rows are checked in a fixed order; a missing row counts as an empty
    list and therefore fails its assertion.

    Args:
        memory_data: Mapping of row label to list of integers.

    Returns:
        True when every assertion passes.

    Raises:
        AssertionError: For the first row whose length differs from
            COLUMN_NUM; the message names that row.
    """
    # (row key in memory_data, label used in the assertion message)
    expected_rows = (
        ("GL", "GL"),
        ("Graph", "Graph"),
        ("guard", "Guard"),
        ("native heap", "native heap"),
        ("AnonPage other", "AnonPage other"),
        ("stack", "stack"),
        (".so", ".so"),
        ("dev", "dev"),
        ("FilePage other", "FilePage other"),
        ("Total", "Total"),
    )
    for row_key, label in expected_rows:
        columns = memory_data.get(row_key, [])
        assert len(columns) == COLUMN_NUM, f"{label} memory data error"
    return True
85
@print_check_result
def CheckDmaGraphMem(memory_data):
    """Check that the first "Dma" value equals the "Graph" PSS value.

    Args:
        memory_data: Mapping of row label to list of integers.

    Returns:
        True when ``Dma[0] == Graph[PSS_TOTAL_INDEX]``, False otherwise.

    Raises:
        KeyError: If the "Dma" or "Graph" row is absent.
    """
    dma_value = memory_data["Dma"][0]
    graph_value = memory_data["Graph"][PSS_TOTAL_INDEX]
    return dma_value == graph_value
89
def CheckHidumperMemoryWithPidOutput(output):
    """Parse per-process memory output and run all row checks on it.

    The checks run in order (Dma/Graph, total PSS, row lengths) and stop
    at the first one that reports a falsy result.

    Args:
        output: Raw text to parse with ParseMemoryOutput.

    Returns:
        True when every check passes, False otherwise.
    """
    parsed = ParseMemoryOutput(output)
    for check in (CheckDmaGraphMem, CheckTotalPss, CheckDataLength):
        if not check(parsed):
            return False
    return True
94
def CheckHidumperMemoryWithoutPidOutput(output):
    """Check that the ``Graph(N kB)`` and ``DMA(N kB)`` figures agree.

    Args:
        output: Text expected to contain both summary entries.

    Returns:
        True when both entries are present and carry the same number;
        False when they differ or when either entry is missing (a missing
        entry is reported as a failed check rather than an AttributeError).
    """
    graph_match = re.search(r"Graph\((\d+) kB\)", output)
    dma_match = re.search(r"DMA\((\d+) kB\)", output)
    if graph_match is None or dma_match is None:
        return False
    return int(graph_match.group(1)) == int(dma_match.group(1))
99
def CheckHidumperMemoryUsagebySizeOutput(output):
    """Return the overflow-scan verdict for *output*.

    Thin wrapper that forwards to ParseMemoryUsageOverflowOutput and hands
    back its boolean result unchanged.
    """
    return ParseMemoryUsageOverflowOutput(output)
103
def CheckHidumperGraphAndDmaOutput(output):
    """Compare the per-row Graph sum in the redirect file with the DMA total.

    The DMA total is taken from the ``DMA(N kB)`` entry in *output*. The
    rows are read from ``{OUTPUT_PATH}/hidumper_redirect.txt``:

    * the line containing "Total Memory Usage by PID:" is skipped once;
    * scanning stops at the first blank line;
    * a row only counts when it has more than 15 whitespace-separated
      tokens, because tokens 13 and 15 are read as the Graph and Dma
      values (shorter rows are skipped instead of raising IndexError);
    * a row whose token 13 contains "Dma" or token 15 contains "PurgPin"
      is skipped (presumably the column-title row);
    * token 13 is added to the sum only when it equals token 15.

    Args:
        output: Command output containing the ``DMA(N kB)`` summary.

    Returns:
        True when the accumulated sum equals the DMA total; False when it
        differs or when *output* has no ``DMA(N kB)`` entry.
    """
    dma_match = re.search(r"DMA\((\d+) kB\)", output)
    if dma_match is None:
        return False
    dma_total = int(dma_match.group(1))

    file_path = f"{OUTPUT_PATH}/hidumper_redirect.txt"
    with open(file_path, 'r') as file:
        lines = file.readlines()

    header_skipped = False
    sum_graph_value = 0
    for line in lines:
        if not header_skipped and "Total Memory Usage by PID:" in line:
            header_skipped = True
            continue
        if line.strip() == '':
            break
        parts = line.split()
        if len(parts) <= 15:
            # Not enough columns to hold both the Graph and Dma values.
            continue
        graph_value = parts[13]
        dma_value = parts[15]
        if "Dma" in graph_value or "PurgPin" in dma_value:
            continue
        if graph_value == dma_value:
            sum_graph_value += int(graph_value)

    return sum_graph_value == dma_total
130
def CheckHidumperPurgSumAndPurgPinOutput(output):
    """Cross-check the purgeable totals in *output* against /proc/meminfo.

    "Total PurgSum" must equal Active(purg) + Inactive(purg) and
    "Total PurgPin" must equal Pined(purg), where the meminfo values are
    read by running ``hdc shell cat /proc/meminfo``.

    Args:
        output: Command output containing the "Total PurgSum:" and
            "Total PurgPin:" entries.

    Returns:
        True when both equalities hold, False otherwise.

    Raises:
        AttributeError: If any expected entry is missing from its text.
        subprocess.CalledProcessError: If the hdc command fails.
    """
    def _kb(pattern, text):
        # Pull the captured kB figure out of *text* as an int.
        return int(re.search(pattern, text).group(1))

    purg_sum = _kb(r"Total PurgSum:(\d+) kB", output)
    purg_pin = _kb(r"Total PurgPin:(\d+) kB", output)

    meminfo = subprocess.check_output('hdc shell cat /proc/meminfo', shell=True, text=True, encoding="utf-8")
    active_kb = _kb(r"Active\(purg\):\s*(\d+) kB", meminfo)
    inactive_kb = _kb(r"Inactive\(purg\):\s*(\d+) kB", meminfo)
    pinned_kb = _kb(r"Pined\(purg\):\s*(\d+) kB", meminfo)

    sum_matches = purg_sum == active_kb + inactive_kb
    pin_matches = purg_pin == pinned_kb
    return sum_matches and pin_matches
141
def CheckHidumperMemoryValueOutput(output):
    """Run the Graph/DMA check and the PurgSum/PurgPin check on *output*.

    Both checks are always executed (the second is not skipped when the
    first fails); the result is their logical AND.
    """
    results = (
        CheckHidumperGraphAndDmaOutput(output),
        CheckHidumperPurgSumAndPurgPinOutput(output),
    )
    return results[0] and results[1]
147
# Task function executed concurrently by the worker threads.
def ThreadTask(thread_name, delay):
    """Run ``hidumper --mem`` on the device, redirecting into a per-thread file.

    The device-side output goes to ``/data/log/hidumper/<thread_name>``.
    Start and finish messages are printed around the call.

    Args:
        thread_name: Name used in the messages and as the output file name.
        delay: Accepted for the caller's convenience; not used here.

    Raises:
        subprocess.CalledProcessError: If the hdc command exits non-zero.
    """
    print(f"Thread {thread_name} starting")
    device_command = "hidumper --mem > /data/log/hidumper/" + thread_name
    host_command = 'hdc shell "' + device_command + '"'
    # check_output is used so the command's stdout is captured, not echoed;
    # the captured text itself is not needed.
    subprocess.check_output(host_command, shell=True, text=True, encoding="utf-8")
    print(f"Thread {thread_name} finished")
154
def IsRequestErrorInOutput():
    """Look for "request error" in the Thread-0..Thread-5 dump files.

    Each ``/data/log/hidumper/Thread-<i>`` file is read through
    ``hdc shell cat``. For the first file containing "request error", the
    output of ``lsof |grep Thread-<i>`` is printed and asserted not to
    mention that file name; the search then stops.

    Returns:
        1 when a file containing "request error" was found, otherwise 0.

    Raises:
        AssertionError: If the lsof output still lists the matching file.
        subprocess.CalledProcessError: If an hdc command exits non-zero.
    """
    for index in range(6):
        thread_file = "Thread-" + str(index)
        dumped = subprocess.check_output(
            f"hdc shell \"cat /data/log/hidumper/{thread_file}\"",
            shell=True, text=True, encoding="utf-8")
        if "request error" not in dumped:
            continue
        lsof_output = subprocess.check_output(
            f"hdc shell \"lsof |grep {thread_file}\"",
            shell=True, text=True, encoding="utf-8")
        print(f"output:{lsof_output}\n")
        assert thread_file not in lsof_output
        return 1
    return 0
168
class TestHidumperMemory:
    """Tests for the ``hidumper --mem`` command family.

    The tests rely on the CheckCmd / CheckCmdRedirect / CheckCmdZip helpers
    and other names brought in by ``from utils import *``, and on ``hdc``
    commands issued through ``subprocess``.
    """

    @pytest.mark.L0
    def test_memory_all(self):
        """Graph and DMA must agree in the ``hidumper --mem`` summary."""
        cmd = "hidumper --mem"
        tmp_cmd = "OPT:mem SUB_OPT:"
        checker = CheckHidumperMemoryWithoutPidOutput
        # Verify the command-line output.
        CheckCmd(cmd, checker, tmp_cmd)
        # Verify the redirected command-line output.
        CheckCmdRedirect(cmd, checker, None, tmp_cmd)
        # Verify the command-line output written to a zip file.
        CheckCmdZip(cmd, checker)

    @pytest.mark.L0
    def test_memory_usage_size(self):
        """Per-process sizes, purgeable totals and Graph/DMA must be sane."""
        cmd = "hidumper --mem"
        tmp_cmd = "OPT:mem SUB_OPT:"
        # Verify the memory size of every process.
        CheckCmd(cmd, CheckHidumperMemoryUsagebySizeOutput, tmp_cmd)
        # Verify Total PurgSum / Total PurgPin.
        CheckCmdRedirect(cmd, CheckHidumperMemoryValueOutput, None, tmp_cmd)
        # Verify the command-line output.
        CheckCmd(cmd, CheckHidumperMemoryWithoutPidOutput, tmp_cmd)
        # Verify the redirected command-line output.
        CheckCmdRedirect(cmd, CheckHidumperMemoryWithoutPidOutput, None, tmp_cmd)
        # Verify the command-line output written to a zip file.
        CheckCmdZip(cmd, CheckHidumperMemoryWithoutPidOutput)

    @pytest.mark.L0
    def test_memory_pid(self):
        """Per-process memory output of render_service must pass all checks."""
        target_pid = GetPidByProcessName("render_service")
        cmd = f"hidumper --mem {target_pid}"
        tmp_cmd = "OPT:mem SUB_OPT:"
        checker = CheckHidumperMemoryWithPidOutput
        # Verify the command-line output.
        CheckCmd(cmd, checker, tmp_cmd)
        # Verify the redirected command-line output.
        CheckCmdRedirect(cmd, checker, None, tmp_cmd)
        # Verify the command-line output written to a zip file.
        CheckCmdZip(cmd, checker)

    @pytest.mark.L3
    def test_memory_error_pid(self):
        """Out-of-range and negative pids must produce the expected errors."""
        tmp_cmd = "OPT:mem SUB_OPT:"
        # Verify the command-line output.
        no_process_msg = "hidumper: No such process: 2147483647\nhidumper: option pid missed. 2"
        CheckCmd(
            "hidumper --mem 2147483647;hidumper --mem -2147483647",
            lambda output: no_process_msg in output,
            tmp_cmd,
        )
        pid_missed_msg = "hidumper: option pid missed. 2"
        CheckCmd(
            "hidumper --mem 2147483648;hidumper --mem -2147483648",
            lambda output: pid_missed_msg in output,
            tmp_cmd,
        )

    @pytest.mark.L1
    def test_memory_note(self):
        """Memory output of the Note app's render process must pass all checks."""
        if not IsOpenHarmonyVersion():
            pytest.skip("this testcase is only support in OH")

        # Wake up the screen.
        subprocess.check_call("hdc shell power-shell wakeup", shell=True)
        # Keep the screen always on.
        subprocess.check_call("hdc shell power-shell setmode 602", shell=True)
        time.sleep(3)
        # Unlock the screen.
        subprocess.check_call("hdc shell uinput -T -g 100 100 500 500", shell=True)
        time.sleep(3)
        # Launch the Note application.
        subprocess.check_call("hdc shell aa start -a MainAbility -b com.ohos.note", shell=True)
        time.sleep(3)
        pidof_note = subprocess.check_output(
            "hdc shell pidof com.ohos.note", shell=True, text=True, encoding="utf-8")
        note_pid = pidof_note.strip('\n')
        assert len(note_pid) != 0
        # Create a new note.
        subprocess.check_call("hdc shell uinput -S -c 620 100", shell=True)
        time.sleep(3)
        pidof_render = subprocess.check_output(
            "hdc shell pidof com.ohos.note:render", shell=True, text=True, encoding="utf-8")
        note_render_pid = pidof_render.strip('\n')
        assert len(note_render_pid) != 0

        cmd = f"hidumper --mem {note_render_pid}"
        # Verify the command-line output.
        CheckCmd(cmd, CheckHidumperMemoryWithPidOutput)
        # Verify the redirected command-line output.
        CheckCmdRedirect(cmd, CheckHidumperMemoryWithPidOutput)
        # Verify the command-line output written to a zip file.
        CheckCmdZip(cmd, CheckHidumperMemoryWithPidOutput)

    @pytest.mark.L1
    def test_memory_mutilthread(self):
        """Six concurrent dumps must leave a "request error" in one output."""
        # Thread list.
        workers = []
        # Create and start the threads.
        for index in range(6):
            worker = threading.Thread(target=ThreadTask, args=(f"Thread-{index}", index))
            workers.append(worker)
            worker.start()
        # Wait for all threads to finish.
        for worker in workers:
            worker.join()
        # Verify that the fd of a command beyond the request limit of 5
        # contains the "request error" message.
        assert IsRequestErrorInOutput()
265
266