• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# This file is for rom analyzation of lite/small devices.
17
18import sys
19import argparse
20import json
21import logging
22import os
23from typing import *
24import copy
25import preprocess
26from time import time
27from concurrent.futures import ThreadPoolExecutor, Future
28from threading import RLock
29import collections
30
31from config import result_dict, collector_config, configs, \
32    project_path, sub_com_dict, product_name, recollect_gn, baseline, unit_adapt
33from pkgs.basic_tool import BasicTool, unit_adaptive
34from pkgs.gn_common_tool import GnCommonTool
35from pkgs.simple_excel_writer import SimpleExcelWriter
36from pkgs.rom_ram_baseline_collector import RomRamBaselineCollector
37from misc import gn_lineno_collect
38
39
40"""
411. 先收集BUILD.gn中的target信息
422. 然后根据编译产物到1中进行搜索,匹配其所属的部件
43
44对于ohos开头的template,主要根据其component字段和subsystem_name字段来归数其部件;同时也要考虑install_dir字段
45对于gn原生的template,主要根据bundle.json中的字段来归属其部件
46
47对于找不到的,可以模糊匹配,如,有产物libxxx,则可以在所有的BUILD.gn中搜索xxx,并设置一个阀值予以过滤
48"""
49
50
class RomAnalysisTool:
    @classmethod
    def collect_gn_info(cls):
        """Scan all BUILD.gn files concurrently and dump the collected info to gn_info_file."""
        logging.info("start scanning BUILD.gn")
        # Run every configured collector in its own worker thread.
        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
            futures: List[Future] = [pool.submit(task) for task in collector_config]
            # Wait for every collector; .result() also re-raises worker exceptions.
            for future in futures:
                future.result()
        gn_info_file = configs["gn_info_file"]
        with open(gn_info_file, 'w', encoding='utf-8') as output:
            json.dump(result_dict, output, indent=4)
64
65    @classmethod
66    def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None:
67        """
68        :top_dir 顶层目录,不会变化
69        :rela_path 最顶层的值为空
70        :sub_path 一般是a/b/c这种形式
71        :dir_list 相对于原始top目录的子目录的全路径
72        example:
73        /
74        |-a
75        |-b
76        |-c
77        |-|-d
78        |-|-e
79        |-|-f
80        |-|-|-g
81        |-|-|-h
82        top_dir: /
83        rela_path: ""
84        sub_path: c/e
85        dir_list: [c]
86        => [c/d, c/f], assume 'a' and 'b' has been removed from dir_list
87        """
88        if (not sub_path) or (os.sep not in sub_path):
89            return
90        # 将其他目录添加到dir_list
91        t, sub_sub_path = sub_path.split(os.sep, 1)  # 如果是c/e,分割成c,e
92        t = os.path.join(rela_path, t)
93        if t in dir_list:
94            dir_list.remove(t)
95        sub_sub_dir_list = os.listdir(os.path.join(top_dir, t))
96        for ssdl in sub_sub_dir_list:
97            if os.path.join(rela_path, sub_path) != os.path.join(t, ssdl):
98                dir_list.append(os.path.join(t, ssdl))
99        if not sub_sub_dir_list:
100            return
101        cls._add_rest_dir(top_dir, t, sub_sub_path, dir_list)
102
103    @classmethod
104    def _find_files(cls, product_name: str) -> Dict[str, List[str]]:
105        product_dir: Dict[str, Dict] = configs[product_name]["product_dir"]
106        if not product_name:
107            logging.error(
108                f"product_name '{product_name}' not found in the config.yaml")
109            exit(1)
110        product_path_dit: Dict[str, str] = dict()  # 存储编译产物的类型及目录
111        root_dir = product_dir.get("root")
112        root_dir = os.path.join(project_path, root_dir)
113        relative_dir: Dict[str, str] = product_dir.get("relative")
114        if not relative_dir:
115            logging.warning(
116                f"'relative_dir' of {product_name} not found in the config.yaml")
117            exit(1)
118        # 除了so a hap bin外的全部归到etc里面
119        for k, v in relative_dir.items():
120            product_path_dit[k] = os.path.join(root_dir, v)
121        # 查找编译产物信息
122        # product_dict格式: {"so": ["a.so", "b.so"]}
123        product_dict: Dict[str, List[str]] = dict()  # 存储编译产物的名称
124        for k, v in product_path_dit.items():
125            if not os.path.exists(v):
126                logging.warning(f"dir '{v}' not exist")
127            product_dict[k] = BasicTool.find_files_with_pattern(v)  # v是全路径
128        if product_dir.get("rest"):
129            rest_dir_list: List[str] = os.listdir(
130                root_dir)  # 除了配置在relative下之外的所有剩余目录,全部归到etc下
131            for v in relative_dir.values():
132                if v in rest_dir_list:
133                    rest_dir_list.remove(v)
134            for v in relative_dir.values():
135                if os.sep in v:
136                    cls._add_rest_dir(root_dir, str(), v, rest_dir_list)
137            if "etc" not in product_dict.keys():
138                product_dict["etc"] = list()
139            for r in rest_dir_list:
140                product_dict["etc"].extend(
141                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
142        return product_dict
143
144    @classmethod
145    def collect_product_info(cls, product_name: str):
146        logging.info("start scanning compile products")
147        product_dict: Dict[str, List[str]] = cls._find_files(product_name)
148        with open(configs[product_name]["product_infofile"], 'w', encoding='utf-8') as f:
149            json.dump(product_dict, f, indent=4)
150        return product_dict
151
152    @classmethod
153    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict, com_size_baseline: str = str()):
154        size = unit.get("size")
155        if not rom_size_dict.get("size"):  # 总大小
156            rom_size_dict["size"] = 0
157        if not rom_size_dict.get(sub):  # 子系统大小
158            rom_size_dict[sub]: Dict[str, Dict] = dict()
159            rom_size_dict[sub]["size"] = 0
160            rom_size_dict[sub]["count"] = 0
161
162        if not rom_size_dict.get(sub).get(com):  # 部件
163            rom_size_dict.get(sub)[com] = dict()
164            rom_size_dict[sub][com]["filelist"] = list()
165            rom_size_dict[sub][com]["size"] = 0
166            rom_size_dict[sub][com]["count"] = 0
167
168        if (sub != "NOTFOUND" and sub != "UNDEFINED" and com != "NOTFOUND" and com != "UNDEFINED") \
169                and (not rom_size_dict.get(sub).get(com).get("baseline")) and baseline:
170            rom_size_dict[sub][com]["baseline"] = com_size_baseline
171
172        rom_size_dict[sub][com]["filelist"].append(unit)
173        rom_size_dict[sub][com]["size"] += size
174        rom_size_dict[sub][com]["count"] += 1
175        rom_size_dict[sub]["size"] += size
176        rom_size_dict[sub]["count"] += 1
177        rom_size_dict["size"] += size
178
179    @classmethod
180    def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str] = tuple()) -> Tuple[str, str, str]:
181        """
182        TODO 应当先遍历gn_info进行匹配
183        直接grep,利用出现次数最多的BUILD.gn去定位subsystem_name和component_name"""
184        logging.info(f"fuzzy match: {file_name}")
185        _, base_name = os.path.split(file_name)
186        if base_name.startswith("lib"):
187            base_name = base_name[3:]
188        if base_name.endswith(".a"):
189            base_name = base_name[:base_name.index(".a")]
190        elif base_name.endswith(".z.so"):
191            base_name = base_name[:base_name.index(".z.so")]
192        elif base_name.endswith(".so"):
193            base_name = base_name[:base_name.index(".so")]
194        exclude_dir = configs["black_list"]
195        tbl = [x for x in exclude_dir if os.sep in x]
196
197        def handler(content: Text) -> List[str]:
198            t = list(filter(lambda y: len(y) > 0, list(
199                map(lambda x: x.strip(), content.split("\n")))))
200            for item in tbl:
201                p = os.path.join(project_path, item)
202                t = list(filter(lambda x: p not in x, t))
203            return t
204        grep_result: List[str] = BasicTool.grep_ern(
205            base_name,
206            project_path,
207            include="BUILD.gn",
208            exclude=tuple(exclude_dir),
209            post_handler=handler)
210        if filter_path_keyword:
211            tmp = list()
212            for gr in grep_result:
213                for item in filter_path_keyword:
214                    if item in gr:
215                        continue
216                    tmp.append(gr)
217            grep_result = tmp
218        if not grep_result:
219            logging.info(f"fuzzy match failed.")
220            return str(), str(), str()
221        gn_dict: Dict[str, int] = collections.defaultdict(int)
222        for g in grep_result:
223            gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep)
224            gn_dict[gn] += 1
225        gn_file, _ = collections.Counter(gn_dict).most_common(1)[0]
226        for k, v in sub_com_dict.items():
227            if gn_file.startswith(k):
228                s = v.get("subsystem")
229                c = v.get("component")
230                logging.info(
231                    f"fuzzy match success: subsystem_name={s}, component_name={c}")
232                return gn_file, s, c
233        logging.info(f"fuzzy match failed.")
234        return str(), str(), str()
235
    @classmethod
    def _save_as_xls(cls, result_dict: Dict, product_name: str, baseline: bool) -> None:
        """Save the analysis result as an xls file.

        Rows are grouped with merged cells: one merged cell per subsystem
        (column 0) and per component (column 1); when `baseline` is True an
        extra merged baseline column (column 2) precedes the file columns.

        :param result_dict: nested {subsystem: {component: {...}}} statistics
        :param product_name: used to look up the output file name in configs
        :param baseline: whether to emit the per-component baseline column
        """
        logging.info("saving as xls...")
        header = ["subsystem_name", "component_name",
                  "output_file", "size(Byte)"]
        if baseline:
            header = ["subsystem_name", "component_name", "baseline",
                      "output_file", "size(Byte)"]
        # Deep copy: the aggregate "size"/"count"/"baseline" keys are deleted
        # below and the caller's dict must stay intact.
        tmp_dict = copy.deepcopy(result_dict)
        excel_writer = SimpleExcelWriter("rom")
        excel_writer.set_sheet_header(headers=header)
        # Row/column bookkeeping for the merged cells (data rows are 1-based
        # because row 0 holds the header).
        subsystem_start_row = 1
        subsystem_end_row = 0
        subsystem_col = 0
        component_start_row = 1
        component_end_row = 0
        component_col = 1
        baseline_col = 2
        del tmp_dict["size"]
        for subsystem_name in tmp_dict.keys():
            subsystem_dict = tmp_dict.get(subsystem_name)
            subsystem_size = subsystem_dict.get("size")  # NOTE(review): captured but unused below
            subsystem_file_count = subsystem_dict.get("count")
            # Remove the aggregates so only component entries remain to iterate.
            del subsystem_dict["count"]
            del subsystem_dict["size"]
            subsystem_end_row += subsystem_file_count

            for component_name in subsystem_dict.keys():
                component_dict: Dict[str, int] = subsystem_dict.get(
                    component_name)
                component_size = component_dict.get("size")  # NOTE(review): captured but unused below
                component_file_count = component_dict.get("count")
                component_baseline = component_dict.get("baseline")
                if component_baseline:
                    del component_dict["baseline"]
                del component_dict["count"]
                del component_dict["size"]
                component_end_row += component_file_count

                # One data row per file belonging to this component.
                for fileinfo in component_dict.get("filelist"):
                    file_name = fileinfo.get("file_name")
                    file_size = fileinfo.get("size")
                    line = [subsystem_name, component_name,
                            file_name, file_size]
                    if baseline:
                        line = [subsystem_name, component_name,
                                component_baseline, file_name, file_size]
                    excel_writer.append_line(line)
                # Merge the component (and optional baseline) cells over the
                # rows just written.
                excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
                                         component_name)
                if baseline:
                    excel_writer.write_merge(component_start_row, baseline_col, component_end_row, baseline_col,
                                             component_baseline)
                component_start_row = component_end_row + 1
            # Merge the subsystem cell over all rows of its components.
            excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
                                     subsystem_name)
            subsystem_start_row = subsystem_end_row + 1
        output_name: str = configs[product_name]["output_name"]
        output_name = output_name.replace(".json", ".xls")
        excel_writer.save(output_name)
        logging.info("save as xls success.")
297
298    @classmethod
299    def _result_unit_adaptive(cls, result_dict: Dict[str, Dict]) -> None:
300        total_size = unit_adaptive(result_dict["size"])
301        del result_dict["size"]
302        for subsystem_name, subsystem_info in result_dict.items():
303            sub_size = unit_adaptive(subsystem_info["size"])
304            count = subsystem_info["count"]
305            del subsystem_info["size"]
306            del subsystem_info["count"]
307            for component_name, component_info in subsystem_info.items():
308                component_info["size"] = unit_adaptive(component_info["size"])
309            subsystem_info["size"] = sub_size
310            subsystem_info["count"] = count
311        result_dict["size"] = total_size
312
313    @classmethod
314    def _match_manual_configured(cls, manual_config_info: Dict[str, Dict], compiled_files: Dict[str, List], compiled_root_path: str, result_dict: Dict[str, Dict]) -> None:
315        for file_path, file_info in manual_config_info.items():
316            full_path = os.path.join(
317                project_path, compiled_root_path, file_path)
318            if not os.path.isfile(full_path):
319                logging.warning(f"config error: {file_path} is not a file.")
320                continue
321            file_info["size"] = os.path.getsize(full_path)
322            file_info["file_name"] = full_path
323            cls._put(file_info["subsystem"],
324                     file_info["component"], file_info, result_dict)
325            for _, v in compiled_files.items():
326                if full_path not in v:
327                    continue
328                index = v.index(full_path)
329                del v[index]
330                break
331
    @classmethod
    def _iterate_all_template_type(cls, type_list: List[str], gn_info: Dict, gn_info_file: str, base_name: str, rom_ram_baseline: Dict, rom_size_dict: Dict, f: str, size: int):
        """Attribute one product file to a subsystem/component.

        Tries the gn templates in type_list in order; the first template whose
        products contain base_name wins and the file is accumulated into
        rom_size_dict. If no template matches, falls back to fuzzy matching,
        and finally to the NOTFOUND bucket.

        :param type_list: gn template names to query, in priority order
        :param gn_info: parsed gn info (template -> product name -> info dict)
        :param gn_info_file: gn info file name (used in log messages only)
        :param base_name: base name of the product file
        :param rom_ram_baseline: baseline info keyed by subsystem/component
        :param rom_size_dict: (in/out) accumulated rom statistics
        :param f: full path of the product file
        :param size: file size in bytes
        """
        find_flag = False
        component_rom_baseline = None
        for tn in type_list:  # tn example: ohos_shared_library
            if find_flag:  # already matched by an earlier template; no need to keep searching
                break
            output_dict: Dict[str, Dict] = gn_info.get(
                tn)  # all possible products of this template
            if not output_dict:
                logging.warning(
                    f"'{tn}' not found in the {gn_info_file}")
                continue
            d = output_dict.get(base_name)
            if not d:
                continue
            d["size"] = size
            d["file_name"] = f.replace(project_path, "")
            if rom_ram_baseline.get(d["subsystem_name"]) and rom_ram_baseline.get(d["subsystem_name"]).get(d["component_name"]):
                # NOTE(review): this branch reads the "rom" key while the fuzzy
                # branch below reads "baseline" — confirm which key the
                # RomRamBaselineCollector output actually provides.
                component_rom_baseline = rom_ram_baseline.get(
                    d["subsystem_name"]).get(d["component_name"]).get("rom")
            cls._put(d["subsystem_name"],
                     d["component_name"], d, rom_size_dict, component_rom_baseline)
            find_flag = True
        if not find_flag:  # none of the listed templates matched: try fuzzy matching
            # fuzzy match
            psesudo_gn, sub, com = cls._fuzzy_match(f)
            if sub and com:
                if rom_ram_baseline.get(sub) and rom_ram_baseline.get(sub).get(com):
                    component_rom_baseline = rom_ram_baseline.get(
                        sub).get(com).get("baseline")
                cls._put(sub, com, {
                    "subsystem_name": sub,
                    "component_name": com,
                    "psesudo_gn_path": psesudo_gn,
                    "description": "fuzzy match",
                    "file_name": f.replace(project_path, ""),
                    "size": size,
                }, rom_size_dict, component_rom_baseline)
                find_flag = True
        if not find_flag:  # still unmatched: attribute to NOTFOUND
            cls._put("NOTFOUND", "NOTFOUND", {
                "file_name": f.replace(project_path, ""),
                "size": size,
            }, rom_size_dict)
377
378    @classmethod
379    def _subsystem_component_for_all_product_file(cls, product_dict: Dict[str, List[str]], query_order: Dict[str, List[str]],
380                                                  gn_info: Dict, gn_info_file: str, rom_ram_baseline: Dict, rom_size_dict: Dict):
381        for t, l in product_dict.items():
382            for f in l:  # 遍历所有文件
383                if os.path.isdir(f):
384                    continue
385                type_list = query_order.get(t)
386                _, base_name = os.path.split(f)
387                size = os.path.getsize(f)
388                if not type_list:
389                    logging.warning(
390                        f"'{t}' not found in query_order of the config.yaml")
391                    break
392                cls._iterate_all_template_type(
393                    type_list, gn_info, gn_info_file, base_name, rom_ram_baseline, rom_size_dict, f, size)
394
    @classmethod
    def analysis(cls, product_name: str, product_dict: Dict[str, List[str]]):
        """analysis the rom of lite/small product

        Args:
            product_name (str): product name configured in the yaml
            product_dict (Dict[str, List[str]]): result dict of compiled product file
                format:
                    "bin":[...],
                    "so":[...]
                    ...
        """
        logging.info("start analyzing...")
        # Collect the per-component rom/ram baselines and persist them (written
        # to the current working directory) for later inspection.
        rom_ram_baseline: Dict[str, Dict] = RomRamBaselineCollector.collect(
            project_path)
        with open("rom_ram_baseline.json", 'w', encoding='utf-8') as f:
            json.dump(rom_ram_baseline, f, indent=4)
        gn_info_file = configs["gn_info_file"]  # filename to save gn_info
        with open(gn_info_file, 'r', encoding='utf-8') as f:
            gn_info = json.load(f)
        query_order: Dict[str, List[str]
                          ] = configs[product_name]["query_order"]  # query order of the gn template to be matched
        query_order["etc"] = configs["target_type"]  # "etc" files search every template type
        rom_size_dict: Dict = dict()
        # Manually configured files are attributed first and removed from
        # product_dict so the automatic matching does not count them twice.
        if "manual_config" in configs[product_name].keys():
            cls._match_manual_configured(
                configs[product_name]["manual_config"], product_dict, configs[product_name]["product_dir"]["root"], rom_size_dict)
        cls._subsystem_component_for_all_product_file(
            product_dict, query_order, gn_info, gn_info_file, rom_ram_baseline, rom_size_dict)
        if unit_adapt:
            # Convert raw byte counts to human-readable units before saving.
            cls._result_unit_adaptive(rom_size_dict)
        with open(configs[product_name]["output_name"], 'w', encoding='utf-8') as f:
            json.dump(rom_size_dict, f, indent=4)
        cls._save_as_xls(rom_size_dict, product_name, baseline)
        logging.info("success")
430
431
def main():
    """Entry point: optionally re-collect gn info, then scan the products and analyze them."""
    if recollect_gn:
        RomAnalysisTool.collect_gn_info()
    product_info: Dict[str, List[str]] = RomAnalysisTool.collect_product_info(
        product_name)
    RomAnalysisTool.analysis(product_name, product_info)


if __name__ == "__main__":
    main()
442