• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import stat
22import subprocess
23import time
24from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
25
26
27def logger(content, level):
28    """
29    日志打印
30    :param content:日志内容
31    :param level: 日志等级
32    :return:
33    """
34    create_time = "{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
35    print("[{}] [{}] [{}]".format(create_time, level, content))
36
37
38def json_parse(json_file):
39    """
40    json文件解析为json对象
41    :param json_file:json文件
42    :return:json对象
43    """
44    if os.path.exists(json_file):
45        with open(json_file, "r") as jf:
46            return json.load(jf)
47
48    logger("{} not exist.".format(json_file), "ERROR")
49    return {}
50
51
52def get_product_name(root_path):
53    """
54    从ohos_config.json中获取编译产物路径
55    :param root_path: ohos_config.json所在的目录
56    :return: 编译产量生成的路径
57    """
58    ohos_config = os.path.join(root_path, "out", "ohos_config.json")
59    json_obj = json_parse(ohos_config)
60    if json_obj:
61        product_name = json_obj["out_path"].split("out")[1].strip("/")
62        return product_name
63
64    logger("{} not exist.".format(ohos_config), "ERROR")
65    return ""
66
67
68def get_target_cpu(root_path):
69    """
70    从ohos_config.json中获取编译cpu
71    :param root_path: ohos_config.json所在的目录
72    :return: 编译产量生成的路径
73    """
74    ohos_config = os.path.join(root_path, "out", "ohos_config.json")
75    json_obj = json_parse(ohos_config)
76    if json_obj:
77        target_cpu = json_obj["target_cpu"]
78        return target_cpu
79
80    logger("{} not exist.".format(ohos_config), "ERROR")
81    return ""
82
83
84def shell_command(command_list: list):
85    """
86    命令行执行命令
87    :param command_list:命令参数列表
88    :return:
89    """
90    process = Popen(command_list, stdout=PIPE, stderr=STDOUT)
91    try:
92        outs, errs = process.communicate(timeout=900)
93    except TimeoutExpired:
94        process.kill()
95        outs, errs = process.communicate()
96    logger(outs.decode("utf-8").strip(), "INFO")
97
98    return errs, process.returncode
99
100
101def hdc_command(device_ip, device_port, device_sn, command):
102    """
103    hdc对远程映射的设备执行命令
104    :param device_ip:远程映射的ip
105    :param device_port:hdc端口
106    :param device_sn:设备sn号
107    :param command:
108    :return:
109    """
110    connect_cmd = "hdc -s {}:{} -t {} ".format(device_ip, device_port, device_sn)
111    cmd = connect_cmd + command
112    cmd_list = cmd.split(" ")
113    logger(cmd_list, "INFO")
114    _, exitcode = shell_command(cmd_list)
115    return exitcode
116
117
118def coverage_command(command):
119    """
120    coverage_command
121    :param command:
122    :return:
123    """
124    proc = subprocess.Popen(command, shell=True)
125    try:
126        proc.communicate()
127    except subprocess.TimeoutExpired:
128        proc.kill()
129        proc.terminate()
130
131
132def tree_find_file_endswith(path, suffix, file_list=None):
133    """
134    获取目录下所有以指定字符串结尾的文件
135    :param path: 需要遍历的目录
136    :param suffix: 后缀
137    :param file_list:
138    :return:
139    """
140    for f in os.listdir(path):
141        full_path = os.path.join(path, f)
142        if os.path.isfile(full_path) and full_path.endswith(suffix):
143            file_list.append(full_path)
144        if os.path.isdir(full_path):
145            tree_find_file_endswith(full_path, suffix, file_list)
146    return file_list
147
148
149class FoundationServer:
150    """
151    foundation拆分的进程和其对应的so之间的对应关系
152    """
153    lib_dict = {
154        "ams": ["libabilityms.z.so", "libdataobsms.z.so", "libupms.z.so", "libappms.z.so"],
155        "bms": ["libbms.z.so"],
156        "call": ["libtel_call_manager.z.so"],
157        "dms": ["libdistributed_ability_manager_svr.z.so"],
158        "fms": ["libfms.z.so"],
159        "notification": ["libcesfwk_services.z.so", "libans.z.so"],
160        "power": ["libbatteryservice.z.so", "libdisplaymgrservice.z.so", "libpowermgrservice.z.so",
161                  "libthermalservice.z.so", "libbatterystats_service.z.so"],
162        "state": ["libtel_state_registry.z.so"],
163        "wms": ["libwms.z.so"],
164        "theme": ["libscreenlock_server.z.so"]
165    }
166
167
168def is_elffile(filepath: str) -> bool:
169    """
170    判断文件是否二进制文件
171    :param filepath:
172    :return: bool
173    """
174    if not os.path.exists(filepath):
175        logger("{} not exists.".format(filepath), "ERROR")
176        return False
177
178    try:
179        file_states = os.stat(filepath)
180        file_mode = file_states[stat.ST_MODE]
181        if not stat.S_ISREG(file_mode):
182            return False
183        with open(filepath, "rb") as f:
184            header = (bytearray(f.read(4)[1:4])).decode(encoding="utf-8")
185            if header in ["ELF"]:
186                return True
187    except UnicodeDecodeError as e:
188        logger(e, "ERROR")
189
190    return False
191