1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import sys 21 22 23def _init_sys_config(): 24 sys.localcoverage_path = os.path.join(current_path, "..") 25 sys.path.insert(0, sys.localcoverage_path) 26 27 28def get_subsystem_name(partname: str) -> str: 29 """ 30 获取部件所属的子系统名 31 :param partname: 32 :return: 子系统名 33 """ 34 parts_info_json = os.path.join(out_path, "build_configs", "parts_info", "part_subsystem.json") 35 if not os.path.exists(parts_info_json): 36 logger("{} not exists.".format(parts_info_json), "ERROR") 37 return "" 38 json_obj = json_parse(parts_info_json) 39 if json_obj: 40 if partname not in json_obj: 41 logger("{} part not exist in {}".format(partname, parts_info_json), "ERROR") 42 return "" 43 return json_obj[partname] 44 return "" 45 46 47def find_part_so_dest_path(test_part: str) -> str: 48 """ 49 获取指定部件的obj目录 50 :param test_part:部件名称 51 :return:部件obj目录 52 """ 53 parts_info_json = os.path.join(out_path, "build_configs", "parts_info", "parts_path_info.json") 54 if not os.path.exists(parts_info_json): 55 logger("{} not exists.".format(parts_info_json), "ERROR") 56 return "" 57 json_obj = json_parse(parts_info_json) 58 if json_obj: 59 if test_part not in json_obj: 60 logger("{} part not exist in {}.".format(test_part, parts_info_json), "ERROR") 61 return "" 62 path = os.path.join(out_path, "obj", json_obj[test_part]) 63 return path 64 65 return "" 66 67 68def find_subsystem_so_dest_path(sub_system: str) -> list: 69 """ 70 获取指定子系统的obj目录 71 :param sub_system:子系统名 72 :return: 子系统下所有部件obj目录的列表 73 """ 74 subsystem_config_json = os.path.join(out_path, "build_configs", "subsystem_info", "subsystem_build_config.json") 75 if not os.path.exists(subsystem_config_json): 76 logger("{} not exists.".format(subsystem_config_json), "ERROR") 77 return [] 78 79 json_obj = json_parse(subsystem_config_json) 80 if json_obj: 81 if sub_system not in json_obj["subsystem"]: 82 logger("{} not exist in subsystem_build_config.json".format(sub_system), "ERROR") 83 return [] 84 if "path" not in json_obj["subsystem"][sub_system]: 85 logger("{} no path in subsystem_build_config.json".format(sub_system), "ERROR") 86 return [] 87 88 path = list() 89 for s in json_obj["subsystem"][sub_system]["path"]: 90 path.append(os.path.join(out_path, "obj", s)) 91 return path 92 93 return [] 94 95 96def find_so_source_dest(path: str, subsystem_name: str) -> dict: 97 """ 98 获取so和设备里所在目录的对应关系 99 :param path: 子系统obj目录 100 :return: so和设备里所在目录的对应关系dict 101 """ 102 so_dict = dict() 103 json_list = list() 104 if not path: 105 return {} 106 107 json_list = tree_find_file_endswith(path, "_module_info.json", json_list) 108 109 for j in json_list: 110 json_obj = json_parse(j) 111 if "subsystem_name" not in json_obj: 112 continue 113 if json_obj["subsystem_name"] != subsystem_name: 114 continue 115 if "source" not in json_obj or "dest" not in json_obj: 116 logger("{} json file error.".format(j), "ERROR") 117 return {} 118 119 source_path = os.path.join(out_path, json_obj["source"]) 120 if source_path.endswith(".so"): 121 so_dict[source_path] = [tmp for tmp in json_obj["dest"] if ( 122 tmp.startswith("system/") or tmp.startswith("vendor/"))] 123 124 return so_dict 125 126 127def push_coverage_so(so_dict: dict): 128 """ 129 推送so到设备 130 :param so_dict: so和设备里目录对应dict 131 :return: 132 """ 133 if not so_dict: 134 logger("No coverage so to push.", "INFO") 135 return 136 for device in device_sn_list: 137 cmd = "shell mount -o rw,remount /" 138 hdc_command(device_ip, device_port, device, cmd) 139 for source_path, dest_paths in so_dict.items(): 140 if not os.path.exists(source_path): 141 logger("{} not exist.".format(source_path), "ERROR") 142 continue 143 for dest_path in dest_paths: 144 full_dest = os.path.join("/", dest_path) 145 command = "file send {} {}".format(source_path, full_dest) 146 hdc_command(device_ip, device_port, device, command) 147 148 149if __name__ == "__main__": 150 current_path = os.path.abspath(os.path.dirname(__name__)) 151 152 _init_sys_config() 153 from localCoverage.resident_service.public_method import get_config_ip, get_sn_list 154 from localCoverage.utils import get_product_name, hdc_command, tree_find_file_endswith,\ 155 json_parse, logger, is_elffile 156 157 root_path = current_path.split("/test/testfwk/developer_test")[0] 158 out_path = os.path.join(root_path, "out", get_product_name(root_path)) 159 developer_path = os.path.join(root_path, "test", "testfwk", "developer_test") 160 161 # 获取远程映射相关hdc参数 162 device_ip, device_port, device_sn_strs = get_config_ip(os.path.join(developer_path, "config", "user_config.xml")) 163 if not device_port: 164 device_port = "8710" 165 if not device_sn_strs: 166 device_sn_list = get_sn_list("hdc -s {}:{} list targets".format(device_ip, device_port)) 167 else: 168 device_sn_list = device_sn_strs.split(";") 169 170 subsystem_list, testpart_list = [], [] 171 testtype = sys.argv[1] 172 param_list = sys.argv[2:] 173 174 # 入参为ss子系统和tp部件分别处理 175 if testtype == "testpart": 176 for param in param_list: 177 testpart_list.append(param.strip("[").strip("]").strip(",")) 178 for testpart in testpart_list: 179 json_path_list = find_part_so_dest_path(testpart) 180 subsystem = get_subsystem_name(testpart) 181 source_dest_dict = find_so_source_dest(json_path_list, subsystem) 182 push_coverage_so(source_dest_dict) 183 else: 184 for param in param_list: 185 subsystem_list.append(param.strip("[").strip("]").strip(",")) 186 187 if len(subsystem_list) == 1: 188 subsystem = subsystem_list[0] 189 json_path_list = find_subsystem_so_dest_path(subsystem) 190 for json_path in json_path_list: 191 source_dest_dict = find_so_source_dest(json_path, subsystem) 192 push_coverage_so(source_dest_dict) 193