• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
# This file implements the RAM analysis of a standard device.
17#
18
19import argparse
20import copy
21import glob
22import json
23import os
24import re
25import sys
26import subprocess
27import typing
28import xml.dom.minidom as dom
29from pprint import pprint
30
31from pkgs.simple_excel_writer import SimpleExcelWriter
32
# True when a trace function is installed via sys.settrace (for example by a debugger);
# enables the extra progress output printed by the parsers in this file.
debug = bool(sys.gettrace())
34
35
class HDCTool:
    """Helpers that drive the ``hdc`` command-line tool through ``subprocess``."""

    @classmethod
    def __run_captured(cls, args: list) -> typing.Optional[typing.Tuple[str, str]]:
        """Run ``args`` and return the decoded ``(stdout, stderr)`` pair.

        Returns None when the executable cannot be started (for example when it
        is not on PATH), so that the verify_* helpers can report "unavailable"
        instead of propagating an exception.
        """
        try:
            cp = subprocess.run(args, capture_output=True)
        except OSError:
            return None
        return cp.stdout.decode(errors="replace"), cp.stderr.decode(errors="replace")

    @classmethod
    def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool:
        """Check whether the ``hdc`` command is usable.

        Args:
            verify_str: marker text expected in the output of a bare ``hdc`` call.

        Returns:
            True if ``hdc`` could be started and ``verify_str`` occurs in its
            stdout or stderr, False otherwise.
        """
        outputs = cls.__run_captured(["hdc"])
        if outputs is None:
            return False
        return any(verify_str in text for text in outputs)

    @classmethod
    def verify_device(cls, device_num: str) -> bool:
        """Check whether a device is listed by ``hdc list targets``.

        Args:
            device_num: serial number of the device to look for.

        Returns:
            True if ``device_num`` occurs in the command's stdout or stderr,
            False otherwise (including when ``hdc`` cannot be started).
        """
        if not device_num:
            # An empty string is a substring of every output and would always "match".
            return False
        outputs = cls.__run_captured(["hdc", "list", "targets"])
        if outputs is None:
            return False
        return any(device_num in text for text in outputs)

    @classmethod
    def exec(cls, args: list, output_from: str = "stdout") -> typing.Optional[str]:
        """Run a command and return one of its output streams as text.

        Args:
            args: the command and its arguments, passed to ``subprocess.run``.
            output_from: which stream to return, "stdout" or "stderr".

        Returns:
            The decoded content of the selected stream (undecodable bytes are
            replaced), or None if ``output_from`` is not a valid stream name;
            in that case an error is printed and the command is not run.
        """
        if output_from not in ("stdout", "stderr"):
            print("error: 'output_from' must be stdout or stderr")
            return None
        cp = subprocess.run(args, capture_output=True)
        raw = cp.stdout if output_from == "stdout" else cp.stderr
        return raw.decode(errors="replace")
71
72
def delete_values_from_dict(target_dict: typing.Dict, key_list: typing.Iterable):
    """Remove every key listed in ``key_list`` from ``target_dict`` in place.

    Keys that are not present in ``target_dict`` are silently ignored.
    """
    for unwanted_key in key_list:
        target_dict.pop(unwanted_key, None)
78
79
class RamAnalyzer:
    """Attribute the memory of each device process to subsystems, components and files.

    Process sizes come from ``hidumper --mem`` on the device, the process-to-binary
    relationship from the xml/cfg files of the build output, and the ownership and
    size of each binary from the JSON result of the rom analysis.
    """

    # Removes the unit "kB" and any parenthesised text (parentheses included).
    __trivial_pattern = re.compile(r"kB|\(.*\)(?#去除单位kB以及小括号内的任意数据,包括小括号)")

    # Column index of each memory metric in a split hidumper row:
    # [pid, name, pss, vss, rss, uss]
    __ss_dict: typing.Dict[str, int] = {
        "Pss": 2,
        "Vss": 3,
        "Rss": 4,
        "Uss": 5
    }

    # Bookkeeping keys of the rom analysis result that are not components or files.
    __summary_keys = ("size", "file_count")

    @classmethod
    def __hidumper_mem_line_process(cls, content: typing.Text) -> typing.List[typing.Text]:
        """Split one row of hidumper output into its columns.

        Units and parenthesised text are dropped first, so a process row becomes
        ``[pid, name, pss, vss, rss, uss]``.
        """
        # str.split() without a separator already trims both ends and collapses
        # runs of whitespace, so no separate normalisation pass is needed.
        return cls.__trivial_pattern.sub("", content).split()

    @classmethod
    def __parse_hidumper_mem(cls, content: typing.Text, device_num: str,
                             ss: str = "Pss") -> typing.Dict[typing.Text, int]:
        """Parse the output of ``hidumper --mem``.

        Example of a process row:
        '248  	samgr              1464(0 in SwapPss) kB    15064 kB     6928 kB     1072 kB\\r'

        Args:
            content: raw text printed by ``hidumper --mem``.
            device_num: device serial, used to query ``ps -ef`` for full process names.
            ss: which metric to extract; one of "Pss", "Vss", "Rss", "Uss".

        Returns:
            ``{process_name: size}``; an empty dict if ``ss`` is not a valid metric.
        """

        def process_ps_ef(ps_output: str) -> typing.List[str]:
            """Collect the 8th column of every ``ps -ef`` row, skipping '[...]' entries."""
            names = list()
            for row in ps_output.strip().split("\n")[1:]:  # first row is the header
                fields = row.split()
                if len(fields) < 8:
                    # blank or truncated row: there is no command column to read
                    continue
                if fields[7].startswith('['):
                    # kernel process
                    continue
                names.append(fields[7])
            return names

        if ss not in cls.__ss_dict:
            print("error: {} is not a valid parameter".format(ss))
            return dict()
        size_index = cls.__ss_dict[ss]
        full_name_list: typing.List[str] = process_ps_ef(
            HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"]))

        def find_full_process_name(hname: str) -> str:
            """Return the first ``ps`` name starting with ``hname``, or ``hname`` itself."""
            for lname in full_name_list:
                if lname.startswith(hname):
                    return lname
            # Fall back to the hidumper name: a None key would collapse all unmatched
            # processes into a single entry and break the string checks in analysis().
            return hname

        process_size_dict = dict()
        for line in content.split('\n'):
            if "Total Memory Usage by Size" in line:
                break
            processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(line)
            if not processed or not processed[0].isnumeric():
                # rows that do not start with a pid carry no process data
                continue
            try:
                name = processed[1]
                size = int(processed[size_index])
            except (IndexError, ValueError):
                print("warning: skip malformed hidumper line: {}".format(line.strip()))
                continue
            process_size_dict[find_full_process_name(name)] = size
        return process_size_dict

    @classmethod
    def process_hidumper_info(cls, device_num: str, ss: str) -> typing.Dict[str, int]:
        """Collect ``{process_name: size}`` from the device.

        Args:
            device_num: serial number of the target device.
            ss: metric to report; one of "Pss", "Vss", "Rss", "Uss".

        Returns:
            The parsed sizes, or an empty dict if hdc or the device is unavailable.
        """
        if not HDCTool.verify_hdc():
            print("error: Command 'hdc' not found")
            return dict()
        if not HDCTool.verify_device(device_num):
            print("error: {} is inaccessible or not found".format(device_num))
            return dict()
        stdout = HDCTool.exec(["hdc", "-t", device_num, "shell", "hidumper", "--mem"])
        return cls.__parse_hidumper_mem(stdout, device_num, ss)

    @classmethod
    def __parse_process_xml(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]):
        """Parse one profile xml file into ``result_dict`` as ``{process_name: so_list}``.

        ``so_list`` holds the base names of the ``libpath`` entries under ``loadlibs``.
        An existing entry for the same process is replaced. Missing ``info``,
        ``process`` or ``loadlibs`` elements raise IndexError to the caller.
        """
        if not (os.path.isfile(file_path) and file_path.endswith(".xml")):
            print("warning: {} not exist or not a xml file".format(file_path))
            return
        doc = dom.parse(file_path)
        info = doc.getElementsByTagName("info")[0]
        process = info.getElementsByTagName("process")[0]
        process_name = process.childNodes[0].data
        so_list: typing.List[str] = list()
        result_dict[process_name] = so_list
        libs = info.getElementsByTagName("loadlibs")[0].getElementsByTagName("libpath")
        for lib in libs:
            if not lib.childNodes:
                # an empty <libpath/> names no library; skip it instead of
                # abandoning the remaining entries of this file
                continue
            so = lib.childNodes[0].data
            so_list.append(os.path.split(so)[-1])
            if debug:
                print(process_name, " ", so)

    @classmethod
    def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, typing.Any]]:
        """Regroup the rom analysis result by file base name.

        Args:
            rom_result_json: path of the JSON file holding the rom analysis result,
                shaped as ``{subsystem: {component: {file_path: size}}}`` with
                additional "size"/"file_count" summary keys, which are ignored.

        Returns:
            ``{file_base_name: {"subsystem_name": ..., "component_name": ..., "size": ...}}``
        """
        with open(rom_result_json, 'r', encoding='utf-8') as f:
            rom_info_dict = json.load(f)
        elf_info_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict()
        for subsystem_name, sub_val_dict in rom_info_dict.items():
            for component_name, component_val_dict in sub_val_dict.items():
                if component_name in cls.__summary_keys:
                    continue
                for file_name, size in component_val_dict.items():
                    if file_name in cls.__summary_keys:
                        continue
                    elf_info_dict[os.path.split(file_name)[-1]] = {
                        "subsystem_name": subsystem_name,
                        "component_name": component_name,
                        "size": size
                    }
        return elf_info_dict

    @classmethod
    def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict):
        """Parse one init cfg file into ``result_dict``.

        A service started through ``sa_main`` is resolved via the xml file named by
        its first argument (looked up under ``profile_path``); any other service is
        recorded as running the base name of its executable.
        """
        with open(cfg_path, 'r', encoding='utf-8') as f:
            cfg_dict = json.load(f)
        services = cfg_dict.get("services")
        if services is None:
            print("warning: 'services' not in {}".format(cfg_path))
            return
        for service in services:
            process_name = service.get("name")
            path = service.get("path")
            if not path:
                # without a command line there is nothing to attribute; keep going
                # so the remaining services of this file are still processed
                print("warning: service '{}' in {} has no 'path'".format(process_name, cfg_path))
                continue
            first, *path_list = path
            if first.endswith("sa_main"):
                # the process is launched by sa_main, described by an xml profile
                if not path_list:
                    print("warning: service '{}' in {} gives no xml to sa_main".format(process_name, cfg_path))
                    continue
                xml_base_name = os.path.split(path_list[0])[-1]
                cls.__parse_process_xml(os.path.join(profile_path, xml_base_name), result_dict)
            else:
                # the executable is run directly
                result_dict.setdefault(process_name, list()).append(os.path.split(first)[-1])

    @classmethod
    def get_process_so_relationship(cls, xml_path: str, cfg_path: str,
                                    profile_path: str) -> typing.Dict[str, typing.List[str]]:
        """Build ``{process_name: [binary base names]}`` from xml and cfg files.

        Args:
            xml_path: directory whose ``*.xml`` files are parsed directly.
            cfg_path: directory whose ``*.cfg`` files are parsed.
            profile_path: directory searched for the xml of services launched by sa_main.

        Returns:
            The collected mapping. A file that fails to parse is reported and skipped.
        """
        process_elf_dict: typing.Dict[str, typing.List[str]] = dict()
        # Only direct children are matched: the pattern contains no "**".
        for xml in glob.glob(os.path.join(xml_path, "*[.]xml")):
            if debug:
                print("parsing: ", xml)
            try:
                cls.__parse_process_xml(xml, process_elf_dict)
            except Exception as err:  # best effort: one broken file must not stop the scan
                print("parse '{}' failed: {}".format(xml, err))
        for cfg in glob.glob(os.path.join(cfg_path, "*[.]cfg")):
            if debug:
                print("parsing: ", cfg)
            try:
                cls.__parse_process_cfg(cfg, profile_path, process_elf_dict)
            except Exception as err:  # best effort: one broken file must not stop the scan
                print("parse '{}' failed: {}".format(cfg, err))
        return process_elf_dict

    @classmethod
    def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: str):
        """Write the analysis result to an excel file.

        ``data_dict`` is shaped as::

            process_name: {
                "size": xxx,
                subsystem_name: {
                    component_name: {
                        binary_file: size,
                        ...
                    }
                }
            }

        One line is appended per binary file; the process, process size, subsystem
        and component cells are then merged over the rows they span.
        """
        tmp_dict = copy.deepcopy(data_dict)  # "size" is popped below; keep the caller's dict intact
        writer = SimpleExcelWriter("ram_info")
        writer.set_sheet_header(
            ["process_name", "process_size({}, KB)".format(ss), "subsystem_name", "component_name", "elf_name",
             "elf_size(KB)"])
        process_c = 0
        process_size_c = 1
        subsystem_c = 2
        component_c = 3
        # Index of the last data row written so far; data rows are numbered from 1
        # (presumably row 0 holds the header - confirm against SimpleExcelWriter).
        last_row = 0
        for process_name, process_val_dict in tmp_dict.items():
            process_size = process_val_dict.pop("size", None)
            process_start_r = last_row + 1
            for subsystem_name, subsystem_val_dict in process_val_dict.items():
                subsystem_start_r = last_row + 1
                for component_name, component_val_dict in subsystem_val_dict.items():
                    component_start_r = last_row + 1
                    for elf_name, size in component_val_dict.items():
                        writer.append_line(
                            [process_name, process_size, subsystem_name, component_name, elf_name,
                             "%.2f" % (size / 1024)])
                    last_row += len(component_val_dict)
                    # rewrite the component cell merged over all of its rows
                    writer.write_merge(component_start_r, component_c, last_row, component_c, component_name)
                writer.write_merge(subsystem_start_r, subsystem_c, last_row, subsystem_c, subsystem_name)
            writer.write_merge(process_start_r, process_c, last_row, process_c, process_name)
            writer.write_merge(process_start_r, process_size_c, last_row, process_size_c, process_size)
        writer.save(filename)

    @classmethod
    def find_elf_size_from_rom_result(cls, service_name: str, subsystem_name: str, component_name: str,
                                      evaluator: typing.Callable, rom_result_dict: typing.Dict[str, typing.Dict]) -> \
            typing.Tuple[bool, str, str, str, int]:
        """Search the rom analysis result for the file that belongs to a process.

        Args:
            service_name: name of the process to look up.
            subsystem_name: subsystem to search, or "*" to search all of them.
            component_name: component to search, or "*" to search all of them.
            evaluator: ``evaluator(service_name, file_path)`` returns a truthy value
                when ``file_path`` (a key of the component dict) is the wanted file.
            rom_result_dict: the loaded rom analysis result.

        Returns:
            ``(found, file_base_name, subsystem_name, component_name, size)``;
            ``(False, "", "", "", 0)`` when nothing matches, including when the
            requested subsystem or component is absent from ``rom_result_dict``.
        """
        subsystem_name_list = [subsystem_name] if subsystem_name != "*" else rom_result_dict.keys()
        for sn in subsystem_name_list:
            sub_val_dict = rom_result_dict.get(sn)
            if not isinstance(sub_val_dict, dict):
                continue
            component_name_list = [component_name] if component_name != '*' else sub_val_dict.keys()
            for cn in component_name_list:
                if cn in cls.__summary_keys:
                    continue
                component_val_dict = sub_val_dict.get(cn)
                if not isinstance(component_val_dict, dict):
                    continue
                for file_path, size in component_val_dict.items():
                    if file_path in cls.__summary_keys:
                        continue
                    if evaluator(service_name, file_path):
                        return True, os.path.split(file_path)[-1], sn, cn, size
        return False, str(), str(), str(), int()

    @classmethod
    def analysis(cls, cfg_path: str, xml_path: str, rom_result_json: str, device_num: str,
                 output_file: str, ss: str, output_excel: bool):
        """Run the whole analysis and write the result files.

        The result maps each process to its size and, per subsystem and component,
        to the files it runs or loads with their sizes.

        Args:
            cfg_path: directory of the init ``*.cfg`` files.
            xml_path: directory of the profile ``*.xml`` files; also used to resolve
                the xml of services launched by sa_main.
            rom_result_json: path of the rom analysis result JSON.
            device_num: serial number of the target device.
            output_file: output path without extension; ``.json`` (and ``.xls``) is appended.
            ss: metric to report; one of "Pss", "Vss", "Rss", "Uss".
            output_excel: also write an excel file when True.
        """
        if not HDCTool.verify_hdc():
            print("error: Command 'hdc' not found")
            return
        if not HDCTool.verify_device(device_num):
            print("error: {} is inaccessible or not found".format(device_num))
            return
        with open(rom_result_json, 'r', encoding='utf-8') as f:
            rom_result_dict: typing.Dict = json.load(f)
        # regroup the elf information of the rom analysis result by file name
        so_info_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = cls.get_elf_info_from_rom_result(
            rom_result_json)
        # The profile directory is the xml directory; pass it explicitly rather than
        # relying on a module-level variable that only exists when run as a script.
        process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(
            xml_path, cfg_path, xml_path)
        process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(device_num, ss)
        result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict()

        def find_elf_list(key: typing.Any, dt: typing.Dict[str, typing.Any]):
            """Return the value whose key starts with ``key`` or whose first item equals ``key``."""
            for k, v in dt.items():
                # The second test covers processes such as uinput_inject, whose entry is
                # keyed mmi_uinput_inject: the directly executed binary carries the name.
                if k.startswith(key) or (len(v) > 0 and key == v[0]):
                    return v
            return None

        def is_same_basename(name: str, file_path: str) -> bool:
            return os.path.split(file_path)[-1].lower() == name.lower()

        def is_hap_of_process(name: str, file_path: str) -> bool:
            # compares the third path segment of the file with the start of the process name
            parts = file_path.split('/')
            return len(parts) >= 3 and name.lower().startswith(parts[2].lower())

        for process_name, process_size in process_size_dict.items():  # start from the processes
            entry: typing.Dict[str, typing.Any] = {"size": process_size}
            result_dict[process_name] = entry
            # init is handled specially
            if process_name == "init":
                _, elf, _, _, size = cls.find_elf_size_from_rom_result(
                    process_name, "startup", "init", is_same_basename, rom_result_dict)
                entry["startup"] = {"init": {elf if len(elf) != 0 else "UNKNOWN": size}}
                continue
            # hap processes are handled specially
            if process_name.startswith(("com.", "ohos.")):
                _, hap_name, subsystem_name, component_name, size = cls.find_elf_size_from_rom_result(
                    process_name, "*", "*", is_hap_of_process, rom_result_dict)
                # an unmatched hap yields empty names; label them like every other unknown
                entry[subsystem_name or "UNKNOWN"] = {
                    component_name or "UNKNOWN": {hap_name or "UNKNOWN": size}
                }
                continue
            so_list = find_elf_list(process_name, process_elf_dict)  # files related to the process
            if so_list is None:
                print("warning: process '{}' not found in .xml or .cfg".format(process_name))
                entry["UNKNOWN"] = {"UNKNOWN": {"UNKNOWN": int()}}
                continue
            for so in so_list:
                unit = so_info_dict.get(so)
                if unit is None:
                    # accumulate: several unknown files of one process must all be kept
                    entry.setdefault("UNKNOWN", dict()).setdefault("UNKNOWN", dict())[so] = int()
                    print("warning: '{}' in {} not found in json from rom analysis result".format(so, process_name))
                    continue
                component_name = unit.get("component_name")
                subsystem_name = unit.get("subsystem_name")
                so_size = unit.get("size")
                entry.setdefault(subsystem_name, dict()).setdefault(component_name, dict())[so] = so_size
        base_dir, _ = os.path.split(output_file)
        if len(base_dir) != 0 and not os.path.isdir(base_dir):
            os.makedirs(base_dir, exist_ok=True)
        with open(output_file + ".json", 'w', encoding='utf-8') as f:
            f.write(json.dumps(result_dict, indent=4))
        if output_excel:
            cls.__save_result_as_excel(result_dict, output_file + ".xls", ss)
433
434
def get_args():
    """Parse the command-line arguments of the RAM analyzer.

    Returns:
        The ``argparse.Namespace`` with the attributes ``xml_path``, ``cfg_path``,
        ``rom_result``, ``device_num``, ``output_filename`` and ``excel``.
    """
    version = 1.0

    def str_to_bool(value: str) -> bool:
        """Convert a command-line word to bool.

        ``type=bool`` cannot be used for this: bool("False") is True because
        every non-empty string is truthy.
        """
        lowered = value.strip().lower()
        if lowered in ("true", "t", "yes", "y", "1"):
            return True
        if lowered in ("false", "f", "no", "n", "0", ""):
            return False
        raise argparse.ArgumentTypeError("expected a boolean value, got '{}'".format(value))

    parser = argparse.ArgumentParser(
        description="analyze ram size of component"
    )
    # "-version" is kept for backward compatibility; "--version" is the conventional spelling.
    parser.add_argument("-v", "-version", "--version", action="version",
                        version=f"version {version}")
    parser.add_argument("-x", "--xml_path", type=str, required=True,
                        help="path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile")
    parser.add_argument("-c", "--cfg_path", type=str, required=True,
                        help="path of cfg files. eg: -c ./cfgs/")
    parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json",
                        help="json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json. "
                             "eg: -j ./demo/rom_analysis_result.json")
    parser.add_argument("-n", "--device_num", type=str, required=True,
                        help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
    parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
                        help="base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result")
    # nargs="?" with const=True lets a bare "-e" switch the excel output on.
    parser.add_argument("-e", "--excel", type=str_to_bool, nargs="?", const=True, default=False,
                        help="if output result as excel, default: False. eg: -e True")
    args = parser.parse_args()
    return args
457
458
if __name__ == '__main__':
    # Script entry point: read the command line, then run the analysis with the Pss metric.
    args = get_args()
    cfg_path, profile_path = args.cfg_path, args.xml_path
    rom_result, device_num = args.rom_result, args.device_num
    output_filename, output_excel = args.output_filename, args.excel
    RamAnalyzer.analysis(
        cfg_path=cfg_path,
        xml_path=profile_path,
        rom_result_json=rom_result,
        device_num=device_num,
        output_file=output_filename,
        ss="Pss",
        output_excel=output_excel,
    )
469