• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
# This file is for ROM analysis of lite/small devices.
17
18import sys
19import argparse
20import json
21import logging
22import os
23from typing import *
24import copy
25import preprocess
26from time import time
27from concurrent.futures import ThreadPoolExecutor, Future
28from threading import RLock
29import collections
30
31from config import result_dict, collector_config, configs, \
32    project_path, sub_com_dict, product_name, recollect_gn
33from pkgs.basic_tool import BasicTool
34from pkgs.gn_common_tool import GnCommonTool
35from pkgs.simple_excel_writer import SimpleExcelWriter
36from misc import gn_lineno_collect
37
38
"""
1. 先收集BUILD.gn中的target信息
2. 然后根据编译产物到1中进行搜索,匹配其所属的部件

对于ohos开头的template,主要根据其component字段和subsystem_name字段来归属其部件;同时也要考虑install_dir字段
对于gn原生的template,主要根据bundle.json中的字段来归属其部件

对于找不到的,可以模糊匹配,如,有产物libxxx,则可以在所有的BUILD.gn中搜索xxx,并设置一个阈值予以过滤
"""
48
49
50class RomAnalysisTool:
51    @classmethod
52    def collect_gn_info(cls):
53        logging.info("start scanning BUILD.gn")
54        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
55            future_list: List[Future] = list()
56            for c in collector_config:
57                future_list.append(pool.submit(c))
58            for f in future_list:
59                f.result()
60        gn_info_file = configs["gn_info_file"]
61        with open(gn_info_file, 'w', encoding='utf-8') as f:
62            json.dump(result_dict, f, indent=4)
63
64    @classmethod
65    def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None:
66        """
67        :top_dir 顶层目录,不会变化
68        :rela_path 最顶层的值为空
69        :sub_path 一般是a/b/c这种形式
70        :dir_list 相对于原始top目录的子目录的全路径
71        example:
72        /
73        |-a
74        |-b
75        |-c
76        |-|-d
77        |-|-e
78        |-|-f
79        |-|-|-g
80        |-|-|-h
81        top_dir: /
82        rela_path: ""
83        sub_path: c/e
84        dir_list: [c]
85        => [c/d, c/f], assume 'a' and 'b' has been removed from dir_list
86        """
87        if (not sub_path) or (os.sep not in sub_path):
88            return
89        # 将其他目录添加到dir_list
90        t, sub_sub_path = sub_path.split(os.sep, 1)   # 如果是c/e,分割成c,e
91        t = os.path.join(rela_path, t)
92        if t in dir_list:
93            dir_list.remove(t)
94        sub_sub_dir_list = os.listdir(os.path.join(top_dir, t))
95        for ssdl in sub_sub_dir_list:
96            if os.path.join(rela_path, sub_path) != os.path.join(t, ssdl):
97                dir_list.append(os.path.join(t, ssdl))
98        if not sub_sub_dir_list:
99            return
100        cls._add_rest_dir(top_dir, t, sub_sub_path, dir_list)
101
102    @classmethod
103    def _find_files(cls, product_name: str) -> Dict[str, List[str]]:
104        product_dir: Dict[str, Dict] = configs[product_name]["product_dir"]
105        if not product_name:
106            logging.error(
107                f"product_name '{product_name}' not found in the config.yaml")
108            exit(1)
109        product_path_dit: Dict[str, str] = dict()   # 存储编译产物的类型及目录
110        root_dir = product_dir.get("root")
111        root_dir = os.path.join(project_path, root_dir)
112        relative_dir: Dict[str, str] = product_dir.get("relative")
113        if not relative_dir:
114            logging.warning(
115                f"'relative_dir' of {product_name} not found in the config.yaml")
116            exit(1)
117        # 除了so a hap bin外的全部归到etc里面
118        for k, v in relative_dir.items():
119            product_path_dit[k] = os.path.join(root_dir, v)
120        # 查找编译产物信息
121        # product_dict格式: {"so": ["a.so", "b.so"]}
122        product_dict: Dict[str, List[str]] = dict()  # 存储编译产物的名称
123        for k, v in product_path_dit.items():
124            if not os.path.exists(v):
125                logging.warning(f"dir '{v}' not exist")
126            product_dict[k] = BasicTool.find_files_with_pattern(v)  # v是全路径
127        if product_dir.get("rest"):
128            rest_dir_list: List[str] = os.listdir(
129                root_dir)  # 除了配置在relative下之外的所有剩余目录,全部归到etc下
130            for v in relative_dir.values():
131                if v in rest_dir_list:
132                    rest_dir_list.remove(v)
133            for v in relative_dir.values():
134                if os.sep in v:
135                    cls._add_rest_dir(root_dir, str(), v, rest_dir_list)
136            if "etc" not in product_dict.keys():
137                product_dict["etc"] = list()
138            for r in rest_dir_list:
139                product_dict["etc"].extend(
140                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
141        return product_dict
142
143    @classmethod
144    def collect_product_info(cls, product_name: str):
145        logging.info("start scanning compile products")
146        product_dict: Dict[str, List[str]] = cls._find_files(product_name)
147        with open(configs[product_name]["product_infofile"], 'w', encoding='utf-8') as f:
148            json.dump(product_dict, f, indent=4)
149        return product_dict
150
151    @classmethod
152    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict):
153        size = unit.get("size")
154        if not rom_size_dict.get("size"):   # 总大小
155            rom_size_dict["size"] = 0
156        if not rom_size_dict.get(sub):  # 子系统大小
157            rom_size_dict[sub]: Dict[str, Dict] = dict()
158            rom_size_dict[sub]["size"] = 0
159            rom_size_dict[sub]["count"] = 0
160
161        if not rom_size_dict.get(sub).get(com):  # 部件
162            rom_size_dict.get(sub)[com] = dict()
163            rom_size_dict[sub][com]["filelist"] = list()
164            rom_size_dict[sub][com]["size"] = 0
165            rom_size_dict[sub][com]["count"] = 0
166
167        rom_size_dict[sub][com]["filelist"].append(unit)
168        rom_size_dict[sub][com]["size"] += size
169        rom_size_dict[sub][com]["count"] += 1
170        rom_size_dict[sub]["size"] += size
171        rom_size_dict[sub]["count"] += 1
172        rom_size_dict["size"] += size
173
174    @classmethod
175    def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str] = tuple()) -> Tuple[str, str, str]:
176        """
177        直接grep,利用出现次数最多的BUILD.gn去定位subsystem_name和component_name"""
178        logging.info(f"fuzzy match: {file_name}")
179        _, base_name = os.path.split(file_name)
180        if base_name.startswith("lib"):
181            base_name = base_name[3:]
182        if base_name.endswith(".a"):
183            base_name = base_name[:base_name.index(".a")]
184        elif base_name.endswith(".z.so"):
185            base_name = base_name[:base_name.index(".z.so")]
186        elif base_name.endswith(".so"):
187            base_name = base_name[:base_name.index(".so")]
188        exclude_dir = configs["black_list"]
189        tbl = [x for x in exclude_dir if os.sep in x]
190
191        def handler(content: Text) -> List[str]:
192            t = list(filter(lambda y: len(y) > 0, list(
193                map(lambda x: x.strip(), content.split("\n")))))
194            for item in tbl:
195                p = os.path.join(project_path, item)
196                t = list(filter(lambda x: p not in x, t))
197            return t
198        grep_result: List[str] = BasicTool.grep_ern(
199            base_name,
200            project_path,
201            include="BUILD.gn",
202            exclude=tuple(exclude_dir),
203            post_handler=handler)
204        if filter_path_keyword:
205            tmp = list()
206            for gr in grep_result:
207                for item in filter_path_keyword:
208                    if item in gr:
209                        continue
210                    tmp.append(gr)
211            grep_result = tmp
212        if not grep_result:
213            logging.info(f"fuzzy match failed.")
214            return str(), str(), str()
215        gn_dict: Dict[str, int] = collections.defaultdict(int)
216        for g in grep_result:
217            gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep)
218            gn_dict[gn] += 1
219        gn_file, _ = collections.Counter(gn_dict).most_common(1)[0]
220        for k, v in sub_com_dict.items():
221            if gn_file.startswith(k):
222                s = v.get("subsystem")
223                c = v.get("component")
224                logging.info(
225                    f"fuzzy match success: subsystem_name={s}, component_name={c}")
226                return gn_file, s, c
227        logging.info(f"fuzzy match failed.")
228        return str(), str(), str()
229
230    @classmethod
231    def _save_as_xls(cls, result_dict: Dict, product_name: str) -> None:
232        logging.info("saving as xls...")
233        header = ["subsystem_name", "component_name",
234                  "output_file", "size(Byte)"]
235        tmp_dict = copy.deepcopy(result_dict)
236        excel_writer = SimpleExcelWriter("rom")
237        excel_writer.set_sheet_header(headers=header)
238        subsystem_start_row = 1
239        subsystem_end_row = 0
240        subsystem_col = 0
241        component_start_row = 1
242        component_end_row = 0
243        component_col = 1
244        del tmp_dict["size"]
245        for subsystem_name in tmp_dict.keys():
246            subsystem_dict = tmp_dict.get(subsystem_name)
247            subsystem_size = subsystem_dict.get("size")
248            subsystem_file_count = subsystem_dict.get("count")
249            del subsystem_dict["count"]
250            del subsystem_dict["size"]
251            subsystem_end_row += subsystem_file_count
252
253            for component_name in subsystem_dict.keys():
254                component_dict: Dict[str, int] = subsystem_dict.get(
255                    component_name)
256                component_size = component_dict.get("size")
257                component_file_count = component_dict.get("count")
258                del component_dict["count"]
259                del component_dict["size"]
260                component_end_row += component_file_count
261
262                for fileinfo in component_dict.get("filelist"):
263                    file_name = fileinfo.get("file_name")
264                    file_size = fileinfo.get("size")
265                    excel_writer.append_line(
266                        [subsystem_name, component_name, file_name, file_size])
267                excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
268                                         component_name)
269                component_start_row = component_end_row + 1
270            excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
271                                     subsystem_name)
272            subsystem_start_row = subsystem_end_row + 1
273        output_name: str = configs[product_name]["output_name"]
274        output_name = output_name.replace(".json", ".xls")
275        excel_writer.save(output_name)
276        logging.info("save as xls success.")
277
278    @ classmethod
279    def analysis(cls, product_name: str, product_dict: Dict[str, List[str]]):
280        logging.info("start analyzing...")
281        gn_info_file = configs["gn_info_file"]
282        with open(gn_info_file, 'r', encoding='utf-8') as f:
283            gn_info = json.load(f)
284        query_order: Dict[str, List[str]
285                          ] = configs[product_name]["query_order"]
286        query_order["etc"] = configs["target_type"] # etc会查找所有的template
287        rom_size_dict: Dict = dict()
288        for t, l in product_dict.items():
289            for f in l:  # 遍历所有文件
290                if os.path.isdir(f):
291                    continue
292                find_flag = False
293                type_list = query_order.get(t)
294                _, base_name = os.path.split(f)
295                size = os.path.getsize(f)
296                if not type_list:
297                    logging.warning(
298                        f"'{t}' not found in query_order of the config.yaml")
299                    break
300                for tn in type_list:    # tn example: ohos_shared_library
301                    if find_flag:   # 如果已经在前面的template中找到了,后面的就不必再查找
302                        break
303                    output_dict: Dict[str, Dict] = gn_info.get(
304                        tn)  # 这个模板对应的所有可能编译产物
305                    if not output_dict:
306                        logging.warning(
307                            f"'{tn}' not found in the {gn_info_file}")
308                        continue
309                    d = output_dict.get(base_name)
310                    if not d:
311                        continue
312                    d["size"] = size
313                    d["file_name"] = f.replace(project_path, "")
314                    cls._put(d["subsystem_name"],
315                             d["component_name"], d, rom_size_dict)
316                    find_flag = True
317                if not find_flag:   # 如果指定序列中的template都没有查找到,则模糊匹配
318                    # fuzzy match
319                    psesudo_gn, sub, com = cls._fuzzy_match(f)
320                    if sub and com:
321                        cls._put(sub, com, {
322                            "subsystem_name": sub,
323                            "component_name": com,
324                            "psesudo_gn_path": psesudo_gn,
325                            "description": "fuzzy match",
326                            "file_name": f.replace(project_path, ""),
327                            "size": size,
328                        }, rom_size_dict)
329                        find_flag = True
330                if not find_flag:   # 模糊匹配都没有匹配到的,归属到NOTFOUND
331                    cls._put("NOTFOUND", "NOTFOUND", {
332                        "file_name": f.replace(project_path, ""),
333                        "size": size,
334                    }, rom_size_dict)
335        with open(configs[product_name]["output_name"], 'w', encoding='utf-8') as f:
336            json.dump(rom_size_dict, f, indent=4)
337        cls._save_as_xls(rom_size_dict, product_name)
338        logging.info("success")
339
340
def main():
    """Run the whole ROM analysis for the configured ``product_name``.

    BUILD.gn information is re-collected first when ``recollect_gn`` is set;
    then the build products are scanned and analysed.
    """
    if recollect_gn:
        RomAnalysisTool.collect_gn_info()
    products: Dict[str, List[str]] = \
        RomAnalysisTool.collect_product_info(product_name)
    RomAnalysisTool.analysis(product_name, products)
347
348
# Script entry point: run the analysis only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
351