#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import json
import stat
import subprocess
import time
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired

# First four bytes (e_ident[0:4]) of every ELF file.
_ELF_MAGIC = b"\x7fELF"


def logger(content, level):
    """
    Print one log line to stdout as "[timestamp] [level] [content]".

    :param content: log content (any object; formatted with str.format)
    :param level: log level label, e.g. "INFO" or "ERROR"
    :return: None
    """
    create_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    print("[{}] [{}] [{}]".format(create_time, level, content))


def json_parse(json_file):
    """
    Parse a JSON file into a Python object.

    :param json_file: path of the JSON file
    :return: the parsed object, or an empty dict (after logging an error)
             when the file does not exist
    """
    if os.path.exists(json_file):
        # JSON text is UTF-8; do not depend on the locale default encoding.
        with open(json_file, "r", encoding="utf-8") as jf:
            return json.load(jf)

    logger("{} not exist.".format(json_file), "ERROR")
    return {}


def get_product_name(root_path):
    """
    Read the build output location from <root_path>/out/ohos_config.json.

    The value returned is the part of the "out_path" entry that follows the
    last path component named exactly "out", with surrounding "/" removed.

    :param root_path: directory that contains out/ohos_config.json
    :return: path of the build product relative to the "out" directory, or
             "" when the config is missing/empty or holds no usable out_path
    """
    ohos_config = os.path.join(root_path, "out", "ohos_config.json")
    json_obj = json_parse(ohos_config)
    if not json_obj:
        logger("{} not exist.".format(ohos_config), "ERROR")
        return ""

    out_path = json_obj.get("out_path", "")
    # Split on path components rather than on the substring "out": a
    # substring split breaks for paths such as "/home/scout/ohos/out/rk3568"
    # or for a product directory named "router".
    parts = out_path.split("/")
    if "out" not in parts:
        logger("no 'out' directory in out_path '{}' of {}.".format(out_path, ohos_config), "ERROR")
        return ""

    last_out = len(parts) - 1 - parts[::-1].index("out")
    return "/".join(parts[last_out + 1:]).strip("/")


def get_target_cpu(root_path):
    """
    Read the build target cpu from <root_path>/out/ohos_config.json.

    :param root_path: directory that contains out/ohos_config.json
    :return: value of the "target_cpu" entry, or "" when the config is
             missing/empty or has no such entry
    """
    ohos_config = os.path.join(root_path, "out", "ohos_config.json")
    json_obj = json_parse(ohos_config)
    if not json_obj:
        logger("{} not exist.".format(ohos_config), "ERROR")
        return ""

    target_cpu = json_obj.get("target_cpu", "")
    if not target_cpu:
        logger("target_cpu not found in {}.".format(ohos_config), "ERROR")
    return target_cpu


def shell_command(command_list: list, timeout=900):
    """
    Run a command (without a shell) and log its combined output.

    :param command_list: argv list of the command to execute
    :param timeout: seconds to wait before the process is killed
    :return: tuple (errs, returncode); errs is always None because stderr is
             redirected into stdout
    """
    process = Popen(command_list, stdout=PIPE, stderr=STDOUT)
    try:
        outs, errs = process.communicate(timeout=timeout)
    except TimeoutExpired:
        logger("{} timed out after {}s, killed.".format(command_list, timeout), "ERROR")
        process.kill()
        # Second communicate() reaps the killed process and drains the pipe.
        outs, errs = process.communicate()
    # Tool output is not guaranteed to be valid UTF-8; never fail on logging.
    logger(outs.decode("utf-8", errors="replace").strip(), "INFO")

    return errs, process.returncode


def hdc_command(device_ip, device_port, device_sn, command):
    """
    Run an hdc command against a remotely mapped device.

    :param device_ip: ip of the remote hdc server
    :param device_port: hdc server port
    :param device_sn: device serial number
    :param command: hdc sub-command and its arguments, whitespace separated
    :return: exit code of the hdc process
    """
    cmd_list = ["hdc", "-s", "{}:{}".format(device_ip, device_port), "-t", str(device_sn)]
    # split() without a separator collapses repeated/leading/trailing
    # whitespace, so no empty-string arguments are handed to hdc.
    cmd_list.extend(command.split())
    logger(cmd_list, "INFO")
    _, exitcode = shell_command(cmd_list)
    return exitcode


def coverage_command(command, timeout=None):
    """
    Run a command line through the shell and wait for it to finish.

    NOTE: the command is executed with shell=True, so it must come from a
    trusted source.

    :param command: command line string
    :param timeout: optional seconds to wait; None (default) waits forever
    :return: None
    """
    proc = subprocess.Popen(command, shell=True)
    try:
        proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child so it does not linger as a zombie.
        proc.communicate()


def tree_find_file_endswith(path, suffix, file_list=None):
    """
    Recursively collect all files under a directory whose path ends with
    the given suffix.

    :param path: directory to traverse
    :param suffix: required ending of the file path
    :param file_list: optional list to append results to; a new list is
                      created when omitted
    :return: the list holding the full paths of all matching files
    """
    if file_list is None:
        file_list = []
    for f in os.listdir(path):
        full_path = os.path.join(path, f)
        if os.path.isfile(full_path) and full_path.endswith(suffix):
            file_list.append(full_path)
        if os.path.isdir(full_path):
            tree_find_file_endswith(full_path, suffix, file_list)
    return file_list


class FoundationServer:
    """
    Mapping between the processes split out of foundation and the shared
    libraries that belong to each of them.
    """
    lib_dict = {
        "ams": ["libabilityms.z.so", "libdataobsms.z.so", "libupms.z.so", "libappms.z.so"],
        "bms": ["libbms.z.so"],
        "call": ["libtel_call_manager.z.so"],
        "dms": ["libdistributed_ability_manager_svr.z.so"],
        "fms": ["libfms.z.so"],
        "notification": ["libcesfwk_services.z.so", "libans.z.so"],
        "power": ["libbatteryservice.z.so", "libdisplaymgrservice.z.so", "libpowermgrservice.z.so",
                  "libthermalservice.z.so", "libbatterystats_service.z.so"],
        "state": ["libtel_state_registry.z.so"],
        "wms": ["libwms.z.so"],
        "theme": ["libscreenlock_server.z.so"]
    }


def is_elffile(filepath: str) -> bool:
    """
    Tell whether a path is a regular file that starts with the ELF magic.

    :param filepath: path of the file to check
    :return: True only for a regular file whose first four bytes are
             0x7f 'E' 'L' 'F'; False otherwise (a missing path or a read
             error is logged)
    """
    if not os.path.exists(filepath):
        logger("{} not exists.".format(filepath), "ERROR")
        return False

    try:
        if not stat.S_ISREG(os.stat(filepath).st_mode):
            return False
        with open(filepath, "rb") as f:
            # Compare raw bytes, including the leading 0x7f; decoding the
            # header as text is unnecessary and fails on most binaries.
            return f.read(len(_ELF_MAGIC)) == _ELF_MAGIC
    except OSError as e:
        logger(e, "ERROR")

    return False