• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# This file is to implement the rom analyzation of standard device.
17#
18
19import argparse
20import copy
21import glob
22import json
23import os
24import re
25import sys
26import subprocess
27import typing
28import xml.dom.minidom as dom
29from typing import Dict
30from pprint import pprint
31from pkgs.basic_tool import unit_adaptive
32
33from pkgs.simple_excel_writer import SimpleExcelWriter
34
35debug = True if sys.gettrace() else False
36
37
38class HDCTool:
39    @classmethod
40    def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool:
41        """
42        验证hdc是否可用
43        True:可用
44        False:不可用
45        """
46        cp = subprocess.run(["hdc", "--help"], capture_output=True)
47        stdout = str(cp.stdout)
48        stderr = str(cp.stderr)
49        return verify_str in stdout or verify_str in stderr
50
51    @classmethod
52    def verify_device(cls, device_num: str) -> bool:
53        """
54        验证设备是否已经连接
55        True:已连接
56        False:未连接
57        """
58        cp = subprocess.run(["hdc", "list", "targets"], capture_output=True)
59        stdout = str(cp.stdout)
60        stderr = str(cp.stderr)
61        return device_num in stderr or device_num in stdout
62
63    @classmethod
64    def exec(cls, args: list, output_from: str = "stdout"):
65        cp = subprocess.run(args, capture_output=True)
66        if output_from == "stdout":
67            return cp.stdout.decode()
68        elif output_from == "stderr":
69            return cp.stderr.decode()
70        else:
71            print("error: 'output_from' must be stdout or stdin")
72
73
74def delete_values_from_dict(target_dict: typing.Dict, key_list: typing.Iterable):
75    for k in key_list:
76        if k not in target_dict.keys():
77            continue
78        del target_dict[k]
79
80
81class RamAnalyzer:
82    @classmethod
83    def __hidumper_mem_line_process(cls, content: typing.Text) -> typing.List[typing.Text]:
84        """
85        将hidumper的拥有的数据行进行分割,得到
86        [pid, name, pss, vss, rss, uss]格式的list
87        """
88        trival_pattern = re.compile(r"kB|\(.*\)(?#去除单位kB以及小括号内的任意数据,包括小括号)")
89        content = re.sub(trival_pattern, "", content)
90        blank_pattern = re.compile(r"\s+(?#匹配一个或多个空格)")
91        return re.sub(blank_pattern, ' ', content.strip()).split()
92
93    __ss_dict: typing.Dict[str, int] = {
94        "Pss": 2,
95        "Vss": 3,
96        "Rss": 4,
97        "Uss": 5
98    }
99
100    @classmethod
101    def __parse_hidumper_mem(cls, content: typing.Text, device_num: str, ss: str = "Pss") -> typing.Dict[
102        typing.Text, int]:
103        """
104        解析:hidumper --meme的结果
105        返回{process_name: pss}形式的字典
106        '248  	samgr              1464(0 in SwapPss) kB    15064 kB     6928 kB     1072 kB\r'
107        """
108
109        def find_full_process_name(hname: str) -> str:
110            for lname in __process_name_list:
111                if lname.startswith(hname):
112                    return lname
113            return str()
114
115        def process_ps_ef(content: str) -> list:
116            line_list = content.strip().split("\n")[1:]
117            process_name_list = list()
118            for line in line_list:
119                process_name = line.split()[7]
120                if process_name.startswith('['):
121                    continue
122                process_name_list.append(process_name)
123            return process_name_list
124
125        if ss not in cls.__ss_dict.keys():
126            print("error: {} is not a valid parameter".format(ss))
127            return dict()
128        output = content.split('\n')
129        process_pss_dict = dict()
130        __process_name_list: typing.List[str] = process_ps_ef(
131            HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"]))
132        for line in output:
133            if "Total Memory Usage by Size" in line:
134                break
135            if line.isspace():
136                continue
137            processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(
138                line)
139            # 如果第一列不是数字(pid),就过
140            if not processed or not processed[0].isnumeric():
141                continue
142            name = processed[1]  # 否则的话就取名字,和对应的size
143            size = int(processed[cls.__ss_dict.get(ss)]) * \
144                   1024  # kilo byte to byte
145            full_process_name = find_full_process_name(name)
146            if not full_process_name:
147                print(
148                    f"warning: process \"{full_process_name}\" not found in the result of command \"ps -ef\"")
149                continue
150            process_pss_dict[full_process_name] = size
151        return process_pss_dict
152
153    @classmethod
154    def process_hidumper_info(cls, device_num: str, ss: str) -> typing.Dict[str, int]:
155        """
156        处理进程名与对应进程大小
157        """
158
159        def exec_once() -> typing.Dict[str, int]:
160            stdout = HDCTool.exec(
161                ["hdc", "-t", device_num, "shell", "hidumper", "--mem"])
162            name_size_dict = cls.__parse_hidumper_mem(stdout, device_num, ss)
163            return name_size_dict
164
165        if not HDCTool.verify_hdc():
166            print("error: Command 'hdc' not found")
167            return dict()
168        if not HDCTool.verify_device(device_num):
169            print("error: {} is inaccessible or not found".format(device_num))
170            return dict()
171
172        return exec_once()
173
174    @classmethod
175    def __parse_process_json(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]):
176        """
177        解析json文件,结存存入 result_dict中,格式:{process_name: os_list}
178        其中,so_list中是so的base_name
179        """
180        if not (os.path.isfile(file_path) and file_path.endswith(".json")):
181            print("warning: {} not exist or not a json file".format(file_path))
182            return
183        with open(file_path, 'r', encoding='utf-8') as f:
184            j_content: typing.Dict[str, typing.Any] = json.load(f)
185        if "process" not in j_content.keys() or "systemability" not in j_content.keys():
186            print(
187                f"warning: {file_path} has no field 'process' or 'systemability'")
188            return
189        process_name: str = j_content.get("process")
190        elf_list: typing.List[str] = list()
191        for sa in j_content.get("systemability"):
192            libpath: str = sa.get("libpath")
193            if not libpath:
194                continue
195            elf_list.append(libpath)
196        result_dict[process_name] = elf_list
197
198    @classmethod
199    def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, str]]:
200        """
201        利用rom_analyzer.py的分析结果,重组成
202        {file_base_name: {"subsystem_name":subsystem_name, "component_name":component_name}}
203        的形式
204        """
205        with open(rom_result_json, 'r', encoding='utf-8') as f:
206            rom_info_dict = json.load(f)
207        elf_info_dict: typing.Dict[str, typing.Dict[str, str]] = dict()
208        for subsystem_name in rom_info_dict.keys():
209            sub_val_dict: typing.Dict[str, typing.Any] = rom_info_dict.get(
210                subsystem_name)
211            delete_values_from_dict(sub_val_dict, ["size", "file_count"])
212            for component_name in sub_val_dict.keys():
213                component_val_dict: typing.Dict[str, str] = sub_val_dict.get(
214                    component_name)
215                delete_values_from_dict(component_val_dict, [
216                    "size", "file_count"])
217                for file_name, size in component_val_dict.items():
218                    file_basename: str = os.path.split(file_name)[-1]
219                    elf_info_dict[file_basename] = {
220                        "subsystem_name": subsystem_name,
221                        "component_name": component_name,
222                        "size": size
223                    }
224
225        return elf_info_dict
226
227    @classmethod
228    def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict):
229        """
230        解析cfg,因为有的cfg会拉起xml中的进程,所以也可能会去解析xml
231        """
232        with open(cfg_path, 'r', encoding='utf-8') as f:
233            cfg_dict = json.loads(f.read())
234        services = cfg_dict.get("services")
235        if services is None:
236            print("warning: 'services' not in {}".format(cfg_path))
237            return
238        for service in services:
239            process_name = service.get("name")
240            first, *path_list = service.get("path")
241            if first.endswith("sa_main"):
242                # 由sa_main去来起进程
243                xml_base_name = os.path.split(path_list[0])[-1]
244                cls.__parse_process_json(os.path.join(
245                    profile_path, xml_base_name), result_dict)
246            else:
247                # 直接执行
248                if result_dict.get(process_name) is None:
249                    result_dict[process_name] = list()
250                result_dict.get(process_name).append(os.path.split(first)[-1])
251
252    @classmethod
253    def get_process_so_relationship(cls, cfg_path: str, profile_path: str) -> typing.Dict[
254        str, typing.List[str]]:
255        """
256        parse the relationship between process and elf file
257        """
258        # 从merged_sa里面收集
259        process_elf_dict: typing.Dict[str, typing.List[str]] = dict()
260        cfg_list = glob.glob(cfg_path + os.sep + "*.cfg", recursive=True)
261        for cfg in cfg_list:
262            if debug:
263                print("parsing: ", cfg)
264            try:
265                cls.__parse_process_cfg(cfg, profile_path, process_elf_dict)
266            except:
267                print("parse '{}' failed".format(cfg))
268            finally:
269                ...
270        return process_elf_dict
271
272    @classmethod
273    def __inside_save_result_as_excel(cls, baseline_file, subsystem_name, component_name, component_size,
274                                      component_baseline, process_name, process_size, elf_name, elf_size):
275        if baseline_file:
276            return [subsystem_name, component_name, component_size,
277                    component_baseline, process_name, process_size, elf_name, elf_size]
278        else:
279            return [subsystem_name, component_name, component_size,
280                    process_name, process_size, elf_name, elf_size]
281
282    @classmethod
283    def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: str, baseline_file: str, unit_adapt: bool):
284        """
285        保存结果到excel中
286        子系统:{
287            "size": 1234,
288            部件:{
289                "size":123,
290                "base_line":124,
291                进程:{
292                    "size":12,
293                    "elf":{
294                        "elf_file_1":elf_size,
295                        ...
296                    }
297                }
298            }
299        }
300        """
301        tmp_dict = copy.deepcopy(data_dict)
302        writer = SimpleExcelWriter("ram_info")
303        header_unit = "" if unit_adapt else ", Byte"
304        header = [
305            "subsystem_name", "component_name", f"component_size(ram{header_unit})", "process_name",
306            f"process_size({ss}{header_unit})", "elf", f"elf_size{'' if unit_adapt else '(Byte)'}"
307        ]
308        if baseline_file:
309            header = [
310                "subsystem_name", "component_name", f"component_size(ram{header_unit})", "baseline", "process_name",
311                f"process_size({ss}{header_unit})", "elf", f"elf_size{'' if unit_adapt else '(Byte)'}"
312            ]
313        writer.set_sheet_header(header)
314        subsystem_c = 0
315        subsystem_start_r = 1
316        subsystem_end_r = 0
317
318        component_c = 1
319        component_start_r = 1
320        component_end_r = 0
321        component_size_c = 2
322        baseline_c = 3
323
324        process_start_r = 1
325        process_end_r = 0
326        process_c = 4
327        process_size_c = 5
328        if not baseline_file:
329            process_c -= 1
330            process_size_c -= 1
331        for subsystem_name, subsystem_info in tmp_dict.items():
332            subsystem_size = subsystem_info.get("size")
333            if subsystem_size:
334                del subsystem_info["size"]
335            for component_name, component_info in subsystem_info.items():
336                component_size = component_info.get("size")
337                component_baseline = component_info.get("baseline")
338                if "size" in component_info.keys():
339                    del component_info["size"]
340                if "baseline" in component_info.keys():
341                    del component_info["baseline"]
342                for process_name, process_info in component_info.items():
343                    process_size = process_info.get("size")
344                    elf_info = process_info.get("elf")
345                    for elf_name, elf_size in elf_info.items():
346                        line = cls.__inside_save_result_as_excel(baseline_file, subsystem_name, component_name,
347                                                                 component_size,
348                                                                 component_baseline, process_name, process_size,
349                                                                 elf_name, elf_size)
350                        writer.append_line(line)
351                    elf_count = len(elf_info)
352                    process_end_r += elf_count
353                    component_end_r += elf_count
354                    subsystem_end_r += elf_count
355                    writer.write_merge(
356                        process_start_r, process_c, process_end_r, process_c, process_name)
357                    writer.write_merge(
358                        process_start_r, process_size_c, process_end_r, process_size_c, process_size)
359                    process_start_r = process_end_r + 1
360                writer.write_merge(component_start_r, component_c,
361                                   component_end_r, component_c, component_name)
362                writer.write_merge(component_start_r, component_size_c,
363                                   component_end_r, component_size_c, component_size)
364                if baseline_file:
365                    writer.write_merge(component_start_r, baseline_c,
366                                       component_end_r, baseline_c, component_baseline)
367                component_start_r = component_end_r + 1
368            writer.write_merge(subsystem_start_r, subsystem_c,
369                               subsystem_end_r, subsystem_c, subsystem_name)
370            subsystem_start_r = subsystem_end_r + 1
371        writer.save(filename)
372
373    @classmethod
374    def find_elf_size_from_rom_result(cls, service_name: str, subsystem_name: str, component_name: str,
375                                      evaluator: typing.Callable, rom_result_dict: typing.Dict[str, typing.Dict]) -> \
376            typing.Tuple[
377                bool, str, str, int]:
378        """
379        全局查找进程的相关elf文件
380        subsystem_name与component_name可明确指定,或为*以遍历整个dict
381        evaluator:评估elf文件的从phone下面开始的路径与service_name的关系,评判如何才是找到了
382        returns: 是否查找到,elf文件名,部件名,size
383        """
384        subsystem_name_list = [
385            subsystem_name] if subsystem_name != "*" else rom_result_dict.keys()
386        for sn in subsystem_name_list:
387            sub_val_dict = rom_result_dict.get(sn)
388            component_name_list = [
389                component_name] if component_name != '*' else sub_val_dict.keys()
390            for cn in component_name_list:
391                if cn == "size" or cn == "file_count":
392                    continue
393                component_val_dict: typing.Dict[str,
394                int] = sub_val_dict.get(cn)
395                for k, v in component_val_dict.items():
396                    if k == "size" or k == "file_count":
397                        continue
398                    if not evaluator(service_name, k):
399                        continue
400                    return True, os.path.split(k)[-1], sn, cn, v
401        return False, str(), str(), str(), int()
402
403    @classmethod
404    def add_baseline(self, refactored_result_dict: Dict, baseline_file: str) -> None:
405        with open(baseline_file, 'r', encoding='utf-8') as f:
406            baseline_dict = json.load(f)
407        for subsystem_name, subsystem_info in refactored_result_dict.items():
408            for component_name, component_info in subsystem_info.items():
409                if component_name == "size":
410                    continue
411                if not baseline_dict.get(subsystem_name):
412                    continue
413                if not baseline_dict[subsystem_name].get(component_name):
414                    continue
415                component_info["baseline"] = baseline_dict[subsystem_name][component_name].get(
416                    "ram")
417
418    @classmethod
419    def inside_refactored_result_unit_adaptive(cls, process_info):
420        for elf_name, elf_size in process_info["elf"].items():
421            process_info["elf"][elf_name] = unit_adaptive(elf_size)
422        return process_info
423
424    @classmethod
425    def refactored_result_unit_adaptive(cls, result_dict: Dict[str, Dict]) -> None:
426        for subsystem_name, subsystem_info in result_dict.items():
427            sub_size = unit_adaptive(subsystem_info["size"])
428            del subsystem_info["size"]
429            for component_name, component_info in subsystem_info.items():
430                com_size = unit_adaptive(component_info["size"])
431                del component_info["size"]
432                for process_name, process_info in component_info.items():
433                    pro_size = unit_adaptive(process_info["size"])
434                    del process_info["size"]
435                    process_info = cls.inside_refactored_result_unit_adaptive(process_info)
436                    process_info["size"] = pro_size
437                component_info["size"] = com_size
438            subsystem_info["size"] = sub_size
439
440    @classmethod
441    def result_process1(cls, result_dict, process_name, process_size, elf, size):
442        result_dict[process_name] = dict()
443        result_dict[process_name]["size"] = process_size
444        result_dict[process_name]["startup"] = dict()
445        result_dict[process_name]["startup"]["init"] = dict()
446        result_dict[process_name]["startup"]["init"][elf if len(
447            elf) != 0 else "UNKNOWN"] = size
448        return result_dict
449
450    @classmethod
451    def result_process2(cls, result_dict, process_name, subsystem_name, process_size, component_name, hap_name, size):
452        result_dict[process_name] = dict()
453        result_dict[process_name]["size"] = process_size
454        result_dict[process_name][subsystem_name] = dict()
455        result_dict[process_name][subsystem_name][component_name] = dict()
456        result_dict[process_name][subsystem_name][component_name][hap_name if len(
457            hap_name) != 0 else "UNKNOWN"] = size
458        return result_dict
459
460    @classmethod
461    def result_process3(cls, result_dict, process_name, process_size):
462        result_dict[process_name] = dict()
463        result_dict[process_name]["size"] = process_size
464        result_dict[process_name]["UNKNOWN"] = dict()
465        result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = dict()
466        result_dict[process_name]["UNKNOWN"]["UNKNOWN"]["UNKNOWN"] = int()
467        return result_dict
468
469    @classmethod
470    def result_process4(cls, result_dict, process_size_dict, rom_result_dict, process_elf_dict, so_info_dict):
471        def get(key: typing.Any, dt: typing.Dict[str, typing.Any]):
472            for k, v in dt.items():
473                if k.startswith(key) or (len(v) > 0 and key == v[0]):
474                    # 要么uinput_inject的对应key为mmi_uinput_inject。对于此类特殊处理,即:如果service_name找不到,但是直接执行的bin等于这个名字,也认为找到
475                    return v
476
477        for process_name, process_size in process_size_dict.items():  # 从进程出发
478            if not process_name:
479                print("warning: an empty 'process_name' has been found.")
480                continue
481            # 如果部件是init,特殊处理
482            if process_name == "init":
483                _, elf, _, _, size = cls.find_elf_size_from_rom_result(process_name, "startup", "init",
484                                                                       lambda x, y: os.path.split(y)[
485                                                                                        -1].lower() == x.lower(),
486                                                                       rom_result_dict)
487                result_dict = cls.result_process1(result_dict, process_name, process_size, elf, size)
488                continue
489            # 如果是hap,特殊处理
490            if (process_name.startswith("com.") or process_name.startswith("ohos.")):
491                _, hap_name, subsystem_name, component_name, size = cls.find_elf_size_from_rom_result(process_name, "*",
492                                                                                                      "*",
493                                                                                                      lambda x, y: len(
494                                                                                                          y.split(
495                                                                                '/')) >= 3 and x.lower().startswith(
496                                                                                                    y.split('/')[
497                                                                                                        2].lower()),
498                                                                                                      rom_result_dict)
499                result_dict = cls.result_process2(result_dict, process_name, subsystem_name, process_size,
500                                                  component_name, hap_name, size)
501                continue
502            # 得到进程相关的elf文件list
503            so_list: list = get(process_name, process_elf_dict)
504            if so_list is None:
505                print("warning: process '{}' not found in .json or .cfg".format(
506                    process_name))
507                result_dict = cls.result_process3(result_dict, process_name, process_size)
508                continue
509            result_dict[process_name] = dict()
510            result_dict[process_name]["size"] = process_size
511            for so in so_list:
512                unit = so_info_dict.get(so)
513                if unit is None:
514                    result_dict[process_name]["UNKNOWN"] = dict()
515                    result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = dict()
516                    result_dict[process_name]["UNKNOWN"]["UNKNOWN"][so] = int()
517                    print("warning: '{}' in {} not found in json from rom analysis result".format(
518                        so, process_name))
519                    continue
520                component_name = unit.get("component_name")
521                subsystem_name = unit.get("subsystem_name")
522                so_size = unit.get("size")
523                if result_dict.get(process_name).get(subsystem_name) is None:
524                    result_dict[process_name][subsystem_name] = dict()
525                if result_dict.get(process_name).get(subsystem_name).get(component_name) is None:
526                    result_dict[process_name][subsystem_name][component_name] = dict()
527                result_dict[process_name][subsystem_name][component_name][so] = so_size
528        return result_dict
529
530    @classmethod
531    def analysis(cls, cfg_path: str, json_path: str, rom_result_json: str, device_num: str,
532                 output_file: str, ss: str, output_excel: bool, baseline_file: str, unit_adapt: bool):
533        """
534        process size subsystem/component so so_size
535        """
536        if not HDCTool.verify_hdc():
537            print("error: Command 'hdc' not found")
538            return
539        if not HDCTool.verify_device(device_num):
540            print("error: {} is inaccessible or not found".format(device_num))
541            return
542        with open(rom_result_json, 'r', encoding='utf-8') as f:
543            rom_result_dict: typing.Dict = json.loads(f.read())
544        # 从rom的分析结果中将需要的elf信息重组
545        so_info_dict: typing.Dict[
546            str, typing.Dict[str["component_name|subsystem_name|size"], str]] = cls.get_elf_info_from_rom_result(
547            rom_result_json)
548        process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(cfg_path,
549                                                                                               json_path)
550        process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(
551            device_num, ss)
552        result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict()
553        result_dict = cls.result_process4(result_dict, process_size_dict, rom_result_dict, process_elf_dict,
554                                          so_info_dict)
555        base_dir, _ = os.path.split(output_file)
556        if len(base_dir) != 0 and not os.path.isdir(base_dir):
557            os.makedirs(base_dir, exist_ok=True)
558        with open(output_file + ".json", 'w', encoding='utf-8') as f:
559            json.dump(result_dict, f, indent=4)
560        refactored_result: Dict[str, Dict] = refacotr_result(result_dict)
561        if unit_adapt:
562            cls.refactored_result_unit_adaptive(refactored_result)
563        if baseline_file:
564            cls.add_baseline(refactored_result, baseline_file)
565        with open(f"refactored_{output_file}.json", 'w', encoding='utf-8') as f:
566            json.dump(refactored_result, f, indent=4)
567        if output_excel:
568            cls.__save_result_as_excel(
569                refactored_result, output_file + ".xls", ss, baseline_file, unit_adapt)
570
571
572def inside_refacotr_result(component_info, refactored_ram_dict, subsystem_name, component_name, process_name,
573                           process_size):
574    for elf_name, elf_size in component_info.items():
575        if not refactored_ram_dict.get(subsystem_name):
576            refactored_ram_dict[subsystem_name] = dict()
577            refactored_ram_dict[subsystem_name]["size"] = 0
578        if not refactored_ram_dict[subsystem_name].get(component_name):
579            refactored_ram_dict[subsystem_name][component_name] = dict(
580            )
581            refactored_ram_dict[subsystem_name][component_name]["size"] = 0
582        refactored_ram_dict[subsystem_name][component_name][process_name] = dict(
583        )
584        refactored_ram_dict[subsystem_name][component_name][process_name]["size"] = process_size
585        refactored_ram_dict[subsystem_name][component_name][process_name]["elf"] = dict(
586        )
587        refactored_ram_dict[subsystem_name][component_name][process_name]["elf"][elf_name] = elf_size
588        refactored_ram_dict[subsystem_name]["size"] += process_size
589        refactored_ram_dict[subsystem_name][component_name]["size"] += process_size
590    return refactored_ram_dict
591
592
593def refacotr_result(ram_result: Dict[str, Dict]) -> Dict[str, Dict]:
594    refactored_ram_dict: Dict[str, Dict] = dict()
595    for process_name, process_info in ram_result.items():
596        process_size = process_info.get("size")
597        del process_info["size"]
598        for subsystem_name, subsystem_info in process_info.items():
599            for component_name, component_info in subsystem_info.items():
600                refactored_ram_dict = inside_refacotr_result(component_info, refactored_ram_dict, subsystem_name,
601                                                             component_name, process_name, process_size)
602    return refactored_ram_dict
603
604
605def get_args():
606    VERSION = 1.0
607    parser = argparse.ArgumentParser(
608        description="analyze ram size of component"
609    )
610    parser.add_argument("-v", "-version", action="version",
611                        version=f"version {VERSION}")
612    parser.add_argument("-s", "--json_path", type=str, required=True,
613                        help="path of sa json file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile")
614    parser.add_argument("-c", "--cfg_path", type=str, required=True,
615                        help="path of cfg files. eg: -c ./cfgs/")
616    parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json",
617                        help="json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json."
618                             "eg: -j ./demo/rom_analysis_result.json")
619    parser.add_argument("-n", "--device_num", type=str, required=True,
620                        help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
621    parser.add_argument("-b", "--baseline_file", type=str, default="",
622                        help="baseline file of rom and ram generated by rom analysis.")
623    parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
624                        help="base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result")
625    parser.add_argument("-u", "--unit_adaptive",
626                        action="store_true", help="unit adaptive")
627    parser.add_argument("-e", "--excel", type=bool, default=False,
628                        help="if output result as excel, default: False. eg: -e True")
629    args = parser.parse_args()
630    return args
631
632
633def abspath(path: str) -> str:
634    return os.path.abspath(os.path.expanduser(path))
635
636
637if __name__ == '__main__':
638    args = get_args()
639    cfg_path_name = abspath(args.cfg_path)
640    profile_path_name = abspath(args.json_path)
641    rom_result = args.rom_result
642    device = args.device_num
643    output_filename = args.output_filename
644    baseline = args.baseline_file
645    output_excel_path = args.excel
646    unit_adaptiv = args.unit_adaptive
647    RamAnalyzer.analysis(cfg_path_name, profile_path_name, rom_result,
648                         device_num=device, output_file=output_filename, ss="Pss", output_excel=output_excel_path,
649                         baseline_file=baseline, unit_adapt=unit_adaptiv)
650