1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (c) 2022 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16# This file is to implement the rom analyzation of standard device. 17# 18 19import argparse 20import copy 21import glob 22import json 23import os 24import re 25import sys 26import subprocess 27import typing 28import xml.dom.minidom as dom 29from pprint import pprint 30 31from pkgs.simple_excel_writer import SimpleExcelWriter 32 33debug = True if sys.gettrace() else False 34 35 36class HDCTool: 37 @classmethod 38 def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool: 39 """ 40 验证hdc是否可用 41 True:可用 42 False:不可用 43 """ 44 cp = subprocess.run(["hdc"], capture_output=True) 45 stdout = str(cp.stdout) 46 stderr = str(cp.stderr) 47 return verify_str in stdout or verify_str in stderr 48 49 @classmethod 50 def verify_device(cls, device_num: str) -> bool: 51 """ 52 验证设备是否已经连接 53 True:已连接 54 False:未连接 55 """ 56 cp = subprocess.run(["hdc", "list", "targets"], capture_output=True) 57 stdout = str(cp.stdout) 58 stderr = str(cp.stderr) 59 return device_num in stderr or device_num in stdout 60 61 62 @classmethod 63 def exec(cls, args: list, output_from: str = "stdout"): 64 cp = subprocess.run(args, capture_output=True) 65 if output_from == "stdout": 66 return cp.stdout.decode() 67 elif output_from == "stderr": 68 return cp.stderr.decode() 69 else: 70 print("error: 'output_from' must be stdout or stdin") 71 72 73def delete_values_from_dict(target_dict: typing.Dict, key_list: typing.Iterable): 74 for k in key_list: 75 if k not in target_dict.keys(): 76 continue 77 del target_dict[k] 78 79 80class RamAnalyzer: 81 @classmethod 82 def __hidumper_mem_line_process(cls, content: typing.Text) -> typing.List[typing.Text]: 83 """ 84 将hidumper的拥有的数据行进行分割,得到 85 [pid, name, pss, vss, rss, uss]格式的list 86 """ 87 trival_pattern = re.compile(r"kB|\(.*\)(?#去除单位kB以及小括号内的任意数据,包括小括号)") 88 content = re.sub(trival_pattern, "", content) 89 blank_pattern = re.compile(r"\s+(?#匹配一个或多个空格)") 90 return re.sub(blank_pattern, ' ', content.strip()).split() 91 92 __ss_dict: typing.Dict[str, int] = { 93 "Pss": 2, 94 "Vss": 3, 95 "Rss": 4, 96 "Uss": 5 97 } 98 99 @classmethod 100 def __parse_hidumper_mem(cls, content: typing.Text, device_num: str, ss: str = "Pss") -> typing.Dict[ 101 typing.Text, int]: 102 """ 103 解析:hidumper --meme的结果 104 返回{process_name: pss}形式的字典 105 '248 samgr 1464(0 in SwapPss) kB 15064 kB 6928 kB 1072 kB\r' 106 """ 107 108 def find_full_process_name(hname: str) -> str: 109 for lname in __process_name_list: 110 if lname.startswith(hname): 111 return lname 112 113 def process_ps_ef(content: str) -> list: 114 line_list = content.strip().split("\n")[1:] 115 process_name_list = list() 116 for line in line_list: 117 process_name = line.split()[7] 118 if process_name.startswith('['): 119 # 内核进程 120 continue 121 process_name_list.append(process_name) 122 return process_name_list 123 124 if ss not in cls.__ss_dict.keys(): 125 print("error: {} is not a valid parameter".format(ss)) 126 return dict() 127 output = content.split('\n') 128 process_pss_dict = dict() 129 __process_name_list: typing.List[str] = process_ps_ef( 130 HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"])) 131 for line in output: 132 if "Total Memory Usage by Size" in line: 133 break 134 if line.isspace(): 135 continue 136 processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(line) 137 if not processed or not processed[0].isnumeric(): # 如果第一列不是数字(pid),就过 138 continue 139 name = processed[1] # 否则的话就取名字,和对应的size 140 size = int(processed[cls.__ss_dict.get(ss)]) 141 process_pss_dict[find_full_process_name(name)] = size 142 return process_pss_dict 143 144 @classmethod 145 def process_hidumper_info(cls, device_num: str, ss:str) -> typing.Dict[str, int]: 146 """ 147 处理进程名与对应进程大小 148 """ 149 150 def exec_once() -> typing.Dict[str, int]: 151 stdout = HDCTool.exec(["hdc", "-t", device_num, "shell", "hidumper", "--mem"]) 152 name_size_dict = cls.__parse_hidumper_mem(stdout, device_num, ss) 153 return name_size_dict 154 155 if not HDCTool.verify_hdc(): 156 print("error: Command 'hdc' not found") 157 return dict() 158 if not HDCTool.verify_device(device_num): 159 print("error: {} is inaccessible or not found".format(device_num)) 160 return dict() 161 162 return exec_once() 163 164 @classmethod 165 def __parse_process_xml(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]): 166 """ 167 解析xml文件,结存存入 result_dict中,格式:{process_name: os_list} 168 其中,so_list中是so的base_name 169 """ 170 if not (os.path.isfile(file_path) and file_path.endswith(".xml")): 171 print("warning: {} not exist or not a xml file".format(file_path)) 172 return 173 doc = dom.parse(file_path) 174 info = doc.getElementsByTagName("info")[0] 175 process = info.getElementsByTagName("process")[0] 176 process_name = process.childNodes[0].data 177 result_dict[process_name] = list() 178 libs = info.getElementsByTagName("loadlibs")[0].getElementsByTagName("libpath") 179 for lib in libs: 180 so = lib.childNodes[0].data 181 result_dict.get(process_name).append(os.path.split(so)[-1]) 182 if debug: 183 print(process_name, " ", so) 184 185 @classmethod 186 def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, str]]: 187 """ 188 利用rom_analyzer.py的分析结果,重组成 189 {file_base_name: {"subsystem_name":subsystem_name, "component_name":component_name}} 190 的形式 191 """ 192 with open(rom_result_json, 'r', encoding='utf-8') as f: 193 rom_info_dict = json.load(f) 194 elf_info_dict: typing.Dict[str, typing.Dict[str, str]] = dict() 195 for subsystem_name in rom_info_dict.keys(): 196 sub_val_dict: typing.Dict[str, typing.Any] = rom_info_dict.get(subsystem_name) 197 delete_values_from_dict(sub_val_dict, ["size", "file_count"]) 198 for component_name in sub_val_dict.keys(): 199 component_val_dict: typing.Dict[str, str] = sub_val_dict.get(component_name) 200 delete_values_from_dict(component_val_dict, ["size", "file_count"]) 201 for file_name, size in component_val_dict.items(): 202 file_basename: str = os.path.split(file_name)[-1] 203 elf_info_dict[file_basename] = { 204 "subsystem_name": subsystem_name, 205 "component_name": component_name, 206 "size": size 207 } 208 return elf_info_dict 209 210 @classmethod 211 def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict): 212 """ 213 解析cfg,因为有的cfg会拉起xml中的进程,所以也可能会去解析xml 214 """ 215 with open(cfg_path, 'r', encoding='utf-8') as f: 216 cfg_dict = json.loads(f.read()) 217 services = cfg_dict.get("services") 218 if services is None: 219 print("warning: 'services' not in {}".format(cfg_path)) 220 return 221 for service in services: 222 process_name = service.get("name") 223 first, *path_list = service.get("path") 224 if first.endswith("sa_main"): 225 # 由sa_main去来起进程 226 xml_base_name = os.path.split(path_list[0])[-1] 227 cls.__parse_process_xml(os.path.join(profile_path, xml_base_name), result_dict) 228 else: 229 # 直接执行 230 if result_dict.get(process_name) is None: 231 result_dict[process_name] = list() 232 result_dict.get(process_name).append(os.path.split(first)[-1]) 233 234 @classmethod 235 def get_process_so_relationship(cls, xml_path: str, cfg_path: str, profile_path: str) -> typing.Dict[ 236 str, typing.List[str]]: 237 """ 238 从out/{product_name}/packages/phone/sa_profile/merged_sa查找xml文件并处理得到进程与so的对应关系 239 """ 240 # 从merged_sa里面收集 241 xml_list = glob.glob(xml_path + os.sep + "*[.]xml", recursive=True) 242 process_elf_dict: typing.Dict[str, typing.List[str]] = dict() 243 for xml in xml_list: 244 if debug: 245 print("parsing: ", xml) 246 try: 247 cls.__parse_process_xml(xml, process_elf_dict) 248 except: 249 print("parse '{}' failed".format(xml)) 250 finally: 251 ... 252 # 从system/etc/init/*.cfg中收集,如果是sa_main拉起的,则从system/profile/*.xml中进行解析 253 cfg_list = glob.glob(cfg_path + os.sep + "*[.]cfg", recursive=True) 254 for cfg in cfg_list: 255 if debug: 256 print("parsing: ", cfg) 257 try: 258 cls.__parse_process_cfg(cfg, profile_path, process_elf_dict) 259 except: 260 print("parse '{}' failed".format(cfg)) 261 finally: 262 ... 263 return process_elf_dict 264 265 @classmethod 266 def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: str): 267 """ 268 保存结果到excel中 269 进程名:{ 270 "size": xxx, 271 子系统名:{ 272 部件名:{ 273 二进制文件: xxx, 274 ... 275 } 276 } 277 } 278 """ 279 tmp_dict = copy.deepcopy(data_dict) 280 writer = SimpleExcelWriter("ram_info") 281 writer.set_sheet_header( 282 ["process_name", "process_size({}, KB)".format(ss), "subsystem_name","component_name", "elf_name", "elf_size(KB)"]) 283 process_start_r = 1 284 process_end_r = 0 285 process_c = 0 286 subsystem_c = 2 287 subsystem_start_r = 1 288 subsystem_end_r = 0 289 process_size_c = 1 290 component_start_r = 1 291 component_end_r = 0 292 component_c = 3 293 for process_name in tmp_dict.keys(): 294 process_val_dict: typing.Dict[str, typing.Dict[str, int]] = tmp_dict.get(process_name) 295 process_size = process_val_dict.get("size") 296 delete_values_from_dict(process_val_dict, ["size"]) 297 for subsystem_name, subsystem_val_dict in process_val_dict.items(): # 遍历subsystem 298 for component_name, component_val_dict in subsystem_val_dict.items(): # 遍历component 299 elf_count_of_component = len(component_val_dict) 300 for elf_name, size in component_val_dict.items(): # 遍里elf 301 writer.append_line([process_name, process_size, subsystem_name, component_name, elf_name, "%.2f" % (size / 1024)]) 302 component_end_r += elf_count_of_component 303 subsystem_end_r += elf_count_of_component 304 # 重写component 305 writer.write_merge(component_start_r, component_c, component_end_r, 306 component_c, component_name) 307 component_start_r = component_end_r + 1 308 process_end_r += elf_count_of_component 309 writer.write_merge(subsystem_start_r, subsystem_c, subsystem_end_r, subsystem_c, subsystem_name) 310 subsystem_start_r = subsystem_end_r+1 311 writer.write_merge(process_start_r, process_c, process_end_r, process_c, process_name) 312 writer.write_merge(process_start_r, process_size_c, process_end_r, process_size_c, process_size) 313 process_start_r = process_end_r + 1 314 writer.save(filename) 315 316 @classmethod 317 def find_elf_size_from_rom_result(cls, service_name: str, subsystem_name: str, component_name: str, 318 evaluator: typing.Callable, rom_result_dict: typing.Dict[str, typing.Dict]) -> \ 319 typing.Tuple[ 320 bool, str, str, int]: 321 """ 322 全局查找进程的相关elf文件 323 subsystem_name与component_name可明确指定,或为*以遍历整个dict 324 evaluator:评估elf文件的从phone下面开始的路径与service_name的关系,评判如何才是找到了 325 returns: 是否查找到,elf文件名,部件名,size 326 """ 327 subsystem_name_list = [subsystem_name] if subsystem_name != "*" else rom_result_dict.keys() 328 for sn in subsystem_name_list: 329 sub_val_dict = rom_result_dict.get(sn) 330 component_name_list = [component_name] if component_name != '*' else sub_val_dict.keys() 331 for cn in component_name_list: 332 if cn == "size" or cn == "file_count": 333 continue 334 component_val_dict: typing.Dict[str, int] = sub_val_dict.get(cn) 335 for k, v in component_val_dict.items(): 336 if k == "size" or k == "file_count": 337 continue 338 if not evaluator(service_name, k): 339 continue 340 return True, os.path.split(k)[-1],sn, cn, v 341 return False, str(), str(), str(), int() 342 343 @classmethod 344 def analysis(cls, cfg_path: str, xml_path: str, rom_result_json: str, device_num: str, 345 output_file: str, ss: str, output_excel: bool): 346 """ 347 process size subsystem/component so so_size 348 """ 349 if not HDCTool.verify_hdc(): 350 print("error: Command 'hdc' not found") 351 return 352 if not HDCTool.verify_device(device_num): 353 print("error: {} is inaccessible or not found".format(device_num)) 354 return 355 with open(rom_result_json, 'r', encoding='utf-8') as f: 356 rom_result_dict: typing.Dict = json.loads(f.read()) 357 # 从rom的分析结果中将需要的elf信息重组 358 so_info_dict: typing.Dict[ 359 str, typing.Dict[str["component_name|subsystem_name|size"], str]] = cls.get_elf_info_from_rom_result( 360 rom_result_json) 361 process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(xml_path, cfg_path, 362 profile_path) 363 process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(device_num, ss) 364 result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict() 365 366 def get(key: typing.Any, dt: typing.Dict[str, typing.Any]): 367 for k, v in dt.items(): 368 if k.startswith(key) or (len(v) > 0 and key == v[0]): 369 # 要么uinput_inject的对应key为mmi_uinput_inject。对于此类特殊处理,即:如果service_name找不到,但是直接执行的bin等于这个名字,也认为找到 370 return v 371 372 for process_name, process_size in process_size_dict.items(): # 从进程出发 373 # 如果部件是init,特殊处理 374 if process_name == "init": 375 _, elf,_, _, size = cls.find_elf_size_from_rom_result(process_name, "startup", "init", 376 lambda x, y: os.path.split(y)[ 377 -1].lower() == x.lower(), 378 rom_result_dict) 379 result_dict[process_name] = dict() 380 result_dict[process_name]["size"] = process_size 381 result_dict[process_name]["startup"] = dict() 382 result_dict[process_name]["startup"]["init"] = dict() 383 result_dict[process_name]["startup"]["init"][elf if len(elf) != 0 else "UNKNOWN"] = size 384 continue 385 # 如果是hap,特殊处理 386 if (process_name.startswith("com.") or process_name.startswith("ohos.")): 387 _, hap_name, subsystem_name, component_name, size = cls.find_elf_size_from_rom_result(process_name, "*", "*", 388 lambda x, y: len( 389 y.split( 390 '/')) >= 3 and x.lower().startswith( 391 y.split('/')[2].lower()), 392 rom_result_dict) 393 result_dict[process_name] = dict() 394 result_dict[process_name]["size"] = process_size 395 result_dict[process_name][subsystem_name] = dict() 396 result_dict[process_name][subsystem_name][component_name] = dict() 397 result_dict[process_name][subsystem_name][component_name][hap_name if len(hap_name) != 0 else "UNKNOWN"] = size 398 continue 399 so_list: list = get(process_name, process_elf_dict) # 得到进程相关的elf文件list 400 if so_list is None: 401 print("warning: process '{}' not found in .xml or .cfg".format(process_name)) 402 result_dict[process_name] = dict() 403 result_dict[process_name]["size"] = process_size 404 result_dict[process_name]["UNKNOWN"] = dict() 405 result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = dict() 406 result_dict[process_name]["UNKNOWN"]["UNKNOWN"]["UNKNOWN"] = int() 407 continue 408 result_dict[process_name] = dict() 409 result_dict[process_name]["size"] = process_size 410 for so in so_list: 411 unit = so_info_dict.get(so) 412 if unit is None: 413 result_dict[process_name]["UNKNOWN"] = dict() 414 result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = dict() 415 result_dict[process_name]["UNKNOWN"]["UNKNOWN"][so] = int() 416 print("warning: '{}' in {} not found in json from rom analysis result".format(so, process_name)) 417 continue 418 component_name = unit.get("component_name") 419 subsystem_name = unit.get("subsystem_name") 420 so_size = unit.get("size") 421 if result_dict.get(process_name).get(subsystem_name) is None: 422 result_dict[process_name][subsystem_name] = dict() 423 if result_dict.get(process_name).get(subsystem_name).get(component_name) is None: 424 result_dict[process_name][subsystem_name][component_name] = dict() 425 result_dict[process_name][subsystem_name][component_name][so] = so_size 426 base_dir, _ = os.path.split(output_file) 427 if len(base_dir) != 0 and not os.path.isdir(base_dir): 428 os.makedirs(base_dir, exist_ok=True) 429 with open(output_file + ".json", 'w', encoding='utf-8') as f: 430 f.write(json.dumps(result_dict, indent=4)) 431 if output_excel: 432 cls.__save_result_as_excel(result_dict, output_file + ".xls", ss) 433 434 435def get_args(): 436 VERSION = 1.0 437 parser = argparse.ArgumentParser( 438 description="analyze ram size of component" 439 ) 440 parser.add_argument("-v", "-version", action="version", 441 version=f"version {VERSION}") 442 parser.add_argument("-x", "--xml_path", type=str, required=True, 443 help="path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile") 444 parser.add_argument("-c", "--cfg_path", type=str, required=True, 445 help="path of cfg files. eg: -c ./cfgs/") 446 parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json", 447 help="json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json." 448 "eg: -j ./demo/rom_analysis_result.json") 449 parser.add_argument("-n", "--device_num", type=str, required=True, 450 help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800") 451 parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str, 452 help="base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result") 453 parser.add_argument("-e", "--excel", type=bool, default=False, 454 help="if output result as excel, default: False. eg: -e True") 455 args = parser.parse_args() 456 return args 457 458 459if __name__ == '__main__': 460 args = get_args() 461 cfg_path = args.cfg_path 462 profile_path = args.xml_path 463 rom_result = args.rom_result 464 device_num = args.device_num 465 output_filename = args.output_filename 466 output_excel = args.excel 467 RamAnalyzer.analysis(cfg_path, profile_path, rom_result, 468 device_num=device_num, output_file=output_filename, ss="Pss", output_excel=output_excel) 469