• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import pytest
17import re
18import threading
19import time
20import json
21from utils import *
22
23PSS_TOTAL_INDEX = 0
24SWAP_PSS_INDEX = 6
25COLUMN_NUM = 10
26
27def ParseMemoryOutput(output):
28    memory_data = {}
29    for line in output.split("\n"):
30        if re.search(r"\d", line) is None:
31            continue
32        key = re.search(r'\s*([a-zA-Z.]+(?:\s+[a-zA-Z.]+)*)\s*', line).group(1).strip();
33        memory_data[key] = [int(val) for val in re.findall(r'\d+', line)]
34    return memory_data
35
36def Check64bitUnsignedOverflow(memory_data):
37    value = int(memory_data)
38    if value < 0 or value >= (1 << 64):
39        return True
40
41def ParseMemoryUsageOverflowOutput(output):
42    for line in output.split("\n"):
43        if re.search(r"\d", line) is None:
44            continue
45        pattern = r"([\w\.\-:]+?)\(pid=(\d+)\):\s*(\d+)\s+kB"
46        matches = re.findall(pattern, line)
47        for match in matches:
48            service_name, pid, memory_usage = match
49            if Check64bitUnsignedOverflow(memory_usage):
50                print(f"Service Name: {service_name}")
51                print(f"PID: {pid}")
52                print(f"The memory usage of this process exceeds the maximum value.")
53                return False
54
55    return True
56
57@print_check_result
58def CheckTotalPss(memory_data):
59    pss_sum = sum([val[0] for key,val in memory_data.items() if key in ["GL", "Graph", "guard", "native heap", "AnonPage other", "stack", ".so", ".ttf", "dev", "dmabuf", "FilePage other", "Ashmem"]])
60    return memory_data["Total"][PSS_TOTAL_INDEX] == (pss_sum + memory_data["Total"][SWAP_PSS_INDEX])
61
62@print_check_result
63def CheckDataLength(memory_data):
64    GL_mem = memory_data.get("GL", [])
65    assert len(GL_mem) == COLUMN_NUM, "GL memory data error"
66    Graph_mem = memory_data.get("Graph", [])
67    assert len(Graph_mem) == COLUMN_NUM, "Graph memory data error"
68    guard_mem = memory_data.get("guard", [])
69    assert len(guard_mem) == COLUMN_NUM, "Guard memory data error"
70    native_heap_mem = memory_data.get("native heap", [])
71    assert len(native_heap_mem) == COLUMN_NUM, "native heap memory data error"
72    AnonPage_other = memory_data.get("AnonPage other", [])
73    assert len(AnonPage_other) == COLUMN_NUM, "AnonPage other memory data error"
74    stack_mem = memory_data.get("stack", [])
75    assert len(stack_mem) == COLUMN_NUM, "stack memory data error"
76    so_mem = memory_data.get(".so", [])
77    assert len(so_mem) == COLUMN_NUM, ".so memory data error"
78    dev_mem = memory_data.get("dev", [])
79    assert len(dev_mem) == COLUMN_NUM, "dev memory data error"
80    FilePage_other = memory_data.get("FilePage other", [])
81    assert len(FilePage_other) == COLUMN_NUM, "FilePage other memory data error"
82    ashmem = memory_data.get("Ashmem", [])
83    assert len(ashmem) == COLUMN_NUM, "Ashmem memory data error"
84    Total_mem = memory_data.get("Total", [])
85    assert len(Total_mem) == COLUMN_NUM, "Total memory data error"
86    return True
87
88@print_check_result
89def CheckDmaGraphMem(memory_data):
90    return memory_data["Dma"][0] == memory_data["Graph"][PSS_TOTAL_INDEX]
91
92def CheckRenderServiceAshmemInfo(memory_data):
93    if IsOpenHarmonyVersion():
94        pytest.skip("this testcase is only support in HO")
95        return True
96    else:
97        output = subprocess.check_output(f'hdc shell cat /proc/ashmem_process_info', shell=True, text=True, encoding="utf-8")
98        #Total ashmem  of [render_service] virtual size is  33778396, physical size is 33734656
99        match = re.search(r'Total ashmem  of \[render_service\] virtual size is  \d+, physical size is (\d+) ', output)
100        if match:
101            render_service_ashmem = int(match.group(1)) / 1024
102            print(f"render_service_ashmem: {render_service_ashmem}")
103            ashmem = memory_data["Ashmem"][0]
104            print(f"memory_data ashmem: {ashmem}")
105            return memory_data["Ashmem"][0] == render_service_ashmem
106        else:
107            print(f"not find render_service_ashmem")
108            return False
109
110def CheckHidumperMemoryWithPidOutput(output):
111    memory_data = ParseMemoryOutput(output)
112    ret = all(check(memory_data) for check in [CheckDmaGraphMem, CheckTotalPss, CheckDataLength, CheckRenderServiceAshmemInfo])
113    return ret
114
115def CheckHidumperMemoryWithoutPidOutput(output):
116    graph = re.search(r"Graph\((\d+) kB\)", output).group(1)
117    dma = re.search(r"DMA\((\d+) kB\)", output).group(1)
118    return int(graph) == int(dma)
119
120def CheckHidumperMemoryUsagebySizeOutput(output):
121    memory_data = ParseMemoryUsageOverflowOutput(output)
122    return memory_data
123
124def CheckHidumperGraphAndDmaOutput(output):
125    graph = re.search(r"Graph\((\d+) kB\)", output).group(1)
126    dma = re.search(r"DMA\((\d+) kB\)", output).group(1)
127
128    file_path = f"{OUTPUT_PATH}/hidumper_redirect.txt"
129    with open(file_path, 'r') as file:
130        lines = file.readlines()
131
132    header_skipped = False
133    sum_graph_value = 0
134    for line in lines:
135        if not header_skipped and "Total Memory Usage by PID:" in line:
136            header_skipped = True
137            continue
138        if line.strip() == '':
139            break
140        parts = line.split()
141        if len(parts) > 7:
142            graph_value = parts[13]
143            dma_value = parts[15]
144            if "Dma" in graph_value or "PurgPin" in dma_value:
145                continue
146            if graph_value == dma_value:
147                sum_graph_value += int(graph_value)
148
149    return sum_graph_value == int(dma)
150
151def CheckHidumperPurgSumAndPurgPinOutput(output):
152    purgSum = re.search(r"Total PurgSum:(\d+) kB", output).group(1)
153    purgPin = re.search(r"Total PurgPin:(\d+) kB", output).group(1)
154
155    output = subprocess.check_output(f'hdc shell cat /proc/meminfo', shell=True, text=True, encoding="utf-8")
156    active = re.search(r"Active\(purg\):\s*(\d+) kB", output).group(1)
157    inactive = re.search(r"Inactive\(purg\):\s*(\d+) kB", output).group(1)
158    pined = re.search(r"Pined\(purg\):\s*(\d+) kB", output).group(1)
159
160    return int(purgSum) == int(active) + int(inactive) and int(purgPin) == int(pined)
161
162def CheckHidumperMemoryValueOutput(output):
163    graph_dma_value = CheckHidumperGraphAndDmaOutput(output)
164    purgSum_purgPin_value = CheckHidumperPurgSumAndPurgPinOutput(output)
165
166    return graph_dma_value and purgSum_purgPin_value
167
168# 要并发执行的任务函数
169def ThreadTask(thread_name, delay):
170    print(f"Thread {thread_name} starting")
171    command = "hidumper --mem > /data/log/hidumper/" + thread_name
172    output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8")
173    print(f"Thread {thread_name} finished")
174
175def IsRequestErrorInOutput():
176    flag = 0
177    for i in range(6):
178        command = "cat /data/log/hidumper/Thread-" + str(i)
179        output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8")
180        if "request error" in output:
181            command = "lsof |grep Thread-" + str(i)
182            output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8")
183            print(f"output:{output}\n")
184            assert "Thread-" + str(i) not in output
185            flag = 1
186            break
187    return flag
188
189class TestHidumperMemory:
190    @pytest.mark.L0
191    def test_memory_all(self):
192        command = f"hidumper --mem"
193        hidumperTmpCmd = "OPT:mem SUB_OPT:"
194        # 校验命令行输出
195        CheckCmd(command, CheckHidumperMemoryWithoutPidOutput, hidumperTmpCmd)
196        # 校验命令行重定向输出
197        CheckCmdRedirect(command, CheckHidumperMemoryWithoutPidOutput, None, hidumperTmpCmd)
198        # 校验命令行输出到zip文件
199        CheckCmdZip(command, CheckHidumperMemoryWithoutPidOutput)
200
201    @pytest.mark.L0
202    def test_memory_usage_size(self):
203        command = f"hidumper --mem"
204        hidumperTmpCmd = "OPT:mem SUB_OPT:"
205        # 校验所有进程的内存大小
206        CheckCmd(command, CheckHidumperMemoryUsagebySizeOutput, hidumperTmpCmd)
207        # 校验Total PurgSum/Total PurgPin
208        CheckCmdRedirect(command, CheckHidumperMemoryValueOutput, None, hidumperTmpCmd)
209        # 校验命令行输出
210        CheckCmd(command, CheckHidumperMemoryWithoutPidOutput, hidumperTmpCmd)
211        # 校验命令行重定向输出
212        CheckCmdRedirect(command, CheckHidumperMemoryWithoutPidOutput, None, hidumperTmpCmd)
213        # 校验命令行输出到zip文件
214        CheckCmdZip(command, CheckHidumperMemoryWithoutPidOutput)
215
216    @pytest.mark.L0
217    def test_memory_pid(self):
218        processName = "render_service"
219        pid = GetPidByProcessName(processName)
220        command = f"hidumper --mem {pid}"
221        hidumperTmpCmd = "OPT:mem SUB_OPT:"
222        # 校验命令行输出
223        CheckCmd(command, CheckHidumperMemoryWithPidOutput, hidumperTmpCmd)
224        # 校验命令行重定向输出
225        CheckCmdRedirect(command, CheckHidumperMemoryWithPidOutput, None, hidumperTmpCmd)
226        # 校验命令行输出到zip文件
227        CheckCmdZip(command, CheckHidumperMemoryWithPidOutput)
228
229    @pytest.mark.L3
230    def test_memory_error_pid(self):
231        command = f"hidumper --mem 2147483647;hidumper --mem -2147483647"
232        hidumperTmpCmd = "OPT:mem SUB_OPT:"
233        # 校验命令行输出
234        CheckCmd(command, lambda output : "hidumper: No such process: 2147483647\nhidumper: option pid missed. 2" in output, hidumperTmpCmd)
235        command = f"hidumper --mem 2147483648;hidumper --mem -2147483648"
236        CheckCmd(command, lambda output : "hidumper: option pid missed. 2" in output, hidumperTmpCmd)
237
238    @pytest.mark.L1
239    def test_memory_note(self):
240        if not IsOpenHarmonyVersion():
241            pytest.skip("this testcase is only support in OH")
242        else:
243            # 唤醒屏幕
244            subprocess.check_call("hdc shell power-shell wakeup", shell=True)
245            # 设置屏幕常亮
246            subprocess.check_call("hdc shell power-shell setmode 602", shell=True)
247            time.sleep(3)
248            # 解锁屏幕
249            subprocess.check_call("hdc shell uinput -T -g 100 100 500 500", shell=True)
250            time.sleep(3)
251            # 启动备忘录应用
252            subprocess.check_call("hdc shell aa start -a MainAbility -b com.ohos.note", shell=True)
253            time.sleep(3)
254            output = subprocess.check_output(f"hdc shell pidof com.ohos.note", shell=True, text=True, encoding="utf-8")
255            note_pid = output.strip('\n')
256            assert len(note_pid) != 0
257            # 新建备忘录
258            subprocess.check_call("hdc shell uinput -S -c 620 100", shell=True)
259            time.sleep(3)
260            output = subprocess.check_output(f"hdc shell pidof com.ohos.note:render", shell=True, text=True, encoding="utf-8")
261            note_render_pid = output.strip('\n')
262            assert len(note_render_pid) != 0
263            command = f"hidumper --mem {note_render_pid}"
264            # 校验命令行输出
265            CheckCmd(command, CheckHidumperMemoryWithPidOutput)
266            # 校验命令行重定向输出
267            CheckCmdRedirect(command, CheckHidumperMemoryWithPidOutput)
268            # 校验命令行输出到zip文件
269            CheckCmdZip(command, CheckHidumperMemoryWithPidOutput)
270
271    @pytest.mark.L1
272    def test_memory_mutilthread(self):
273        # 创建线程列表
274        threads = []
275        # 创建线程并启动
276        for i in range(6):
277            t = threading.Thread(target=ThreadTask, args=("Thread-{}".format(i), i))
278            threads.append(t)
279            t.start()
280        # 等待所有线程完成
281        for t in threads:
282            t.join()
283        # 校验超过请求数5的命令对应的fd中包含request error信息
284        assert IsRequestErrorInOutput()
285
286