#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import subprocess
import json
import xml.etree.ElementTree as ET


def get_config_ip(filepath):
    """Read ip/port/sn child elements of the "usb-hdc" device from an XML config.

    Looks at every ``environment/device`` element under the document root,
    keeps those whose ``type`` attribute is ``"usb-hdc"`` and collects the
    text of their child elements by tag name.

    Args:
        filepath: Path of the XML configuration file.

    Returns:
        A tuple ``(ip, port, sn)`` of strings. Each item is ``""`` when the
        file does not exist, cannot be parsed, or lacks the matching tag.
    """
    ip_config = ""
    sn = ""
    port = ""
    try:
        data_dic = {}
        if os.path.exists(filepath):
            tree = ET.parse(filepath)
            root = tree.getroot()
            for node in root.findall("environment/device"):
                # A device element without a "type" attribute is skipped
                # instead of raising an uncaught KeyError.
                if node.attrib.get("type") != "usb-hdc":
                    continue
                for sub in node:
                    data_dic[sub.tag] = sub.text if sub.text else ""
            ip_config = data_dic.get("ip", "")
            sn = data_dic.get("sn", "")
            port = data_dic.get("port", "")
    except ET.ParseError as xml_exception:
        print("occurs exception:{}".format(xml_exception.args))

    return ip_config, port, sn


def get_config_ip_info(filepath):
    """Read the ip/port/sn attributes of the "usb-hdc" device's <info> element.

    Args:
        filepath: Path of the XML configuration file.

    Returns:
        A tuple ``(ip, port, sn)`` of strings taken from the attributes of the
        ``info`` child of the ``environment/device`` element whose ``type`` is
        ``"usb-hdc"``. Items stay ``""`` when the file does not exist or cannot
        be parsed; a missing attribute is reported on stdout and the values
        gathered so far are returned.
    """
    ip_config = ""
    sn = ""
    port = ""
    if not os.path.exists(filepath):
        return ip_config, port, sn
    try:
        tree = ET.parse(filepath)
        root = tree.getroot()
        for node in root.findall("environment/device"):
            # Only the usb-hdc device carries the wanted <info>; other device
            # types are skipped, matching the filter used by get_config_ip.
            # NOTE(review): the previous code broke out of the loop on
            # "usb-hdc", which never read that device - confirm the intent.
            if node.attrib["type"] != "usb-hdc":
                continue
            for sub in node:
                if sub.tag != "info":
                    continue
                ip_config = sub.attrib["ip"]
                port = sub.attrib["port"]
                sn = sub.attrib["sn"]
    except ET.ParseError as xml_exception:
        print("occurs exception:{}".format(xml_exception.args))
    except KeyError as err:
        print(f"Key error: {err}")

    return ip_config, port, sn


def get_sn_list(command):
    """Run a shell command and return its non-empty stdout lines.

    Args:
        command: Command line executed through the shell.
            NOTE(review): it runs with ``shell=True``; callers must not build
            this string from untrusted input.

    Returns:
        A list of stdout lines, each stripped of surrounding whitespace and
        with tab characters replaced by spaces. Blank lines are dropped.
    """
    device_sn_list = []
    # Run the device-SN query command and collect its output. communicate()
    # drains stdout and stderr together (reading only stdout could block once
    # the child fills the stderr pipe) and waits for the child; the context
    # manager closes the pipes afterwards.
    with subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
        strout, _ = proc.communicate()
    if isinstance(strout, bytes):
        strout = strout.decode("utf-8", "ignore")
    for line in strout.split("\n"):
        line = line.strip().replace('\t', ' ')
        if line != "":
            device_sn_list.append(line)

    return device_sn_list


def get_all_part_service():
    """Load the three lookup tables from system_part_service.json.

    The file is searched at
    ``<root>/test/testfwk/developer_test/local_coverage/resident_service/``
    where ``<root>`` is the part of the current working directory that
    precedes ``/test/testfwk/developer_test`` (or the whole working directory
    when that marker is absent).

    Returns:
        A tuple ``(system_info_dict, services_component_dict,
        component_gcda_dict)`` read from the JSON keys of the same names, or
        three empty dicts (after printing a message) when the file is missing.

    Raises:
        KeyError: If the JSON file exists but lacks one of the three keys.
    """
    current_path = os.getcwd()
    root_path = current_path.split("/test/testfwk/developer_test")[0]
    developer_path = os.path.join(root_path, "test/testfwk/developer_test")
    system_part_service_path = os.path.join(
        developer_path, "local_coverage/resident_service/system_part_service.json")
    if os.path.exists(system_part_service_path):
        with open(system_part_service_path, "r", encoding="utf-8") as system_text:
            system_text_json = json.load(system_text)
        system_info_dict = system_text_json["system_info_dict"]
        services_component_dict = system_text_json["services_component_dict"]
        component_gcda_dict = system_text_json["component_gcda_dict"]
        return system_info_dict, services_component_dict, component_gcda_dict
    # print() does not apply %-formatting to its arguments, so the path is
    # interpolated explicitly.
    print(f"{system_part_service_path} not exists.")
    return {}, {}, {}


def get_system_dict_to_server_name(server_name: str, system_info_dict):
    """Return ``{system: [server_name]}`` for the first system listing the server.

    Args:
        server_name: Server name to look up.
        system_info_dict: Mapping of system name to a collection of server names.

    Returns:
        A one-entry dict for the first system (in iteration order) whose
        collection contains ``server_name``, or ``{}`` when none does.
    """
    for system, server_list in system_info_dict.items():
        if server_name in server_list:
            system_info_dict_after = {
                system: [server_name]
            }
            return system_info_dict_after
    return {}


def get_server_dict(command):
    """Narrow the part/service tables to what a test command targets.

    The command is inspected for one of three options, in this priority:

    * ``-ts <suite>``: only suites listed under the ``dinput`` or
      ``softbus_server`` services are recognised.
    * ``-tp <component>``: the first service listing the component is
      selected, unless that service is ``dinput`` or ``softbus_server``, in
      which case nothing is selected.
    * ``-ss <system>``: every server of that system is selected.

    Args:
        command: The test command line as a single string.

    Returns:
        A tuple ``(system_info_dict, services_component_dict,
        component_gcda_dict)`` restricted to the selection; all three are
        empty dicts when nothing matches.
    """
    system_info_dict, services_component_dict, component_gcda_dict = get_all_part_service()
    system_info_dict_after = {}
    services_component_dict_after = {}
    component_gcda_dict_after = {}
    server_name = None
    if " -ts " in command:
        # Take the first token after the last " -ts ", as the -tp/-ss branches
        # do; unpacking the split result failed when the marker was repeated.
        testsuite = command.split(" -ts ")[-1].split(" ")[0]
        for candidate in ("dinput", "softbus_server"):
            # "or []" covers a missing key (e.g. the JSON file is absent), for
            # which the membership test would otherwise raise TypeError.
            if testsuite in (services_component_dict.get(candidate) or []):
                services_component_dict_after = {
                    candidate: [testsuite]
                }
                server_name = candidate
                break
        if server_name:
            system_info_dict_after = get_system_dict_to_server_name(server_name, system_info_dict)
            component_gcda_dict_after = {
                server_name: component_gcda_dict.get(server_name)
            }
    elif " -tp " in command:
        component_name = command.split(" -tp ")[-1].split(" ")[0]
        for server, component_list in services_component_dict.items():
            if component_name in component_list:
                # A component owned by dinput/softbus_server ends the search
                # without selecting anything.
                if server in ["dinput", "softbus_server"]:
                    break
                services_component_dict_after = {
                    server: [component_name]
                }
                server_name = server
                break
        if server_name:
            system_info_dict_after = get_system_dict_to_server_name(server_name, system_info_dict)
            component_gcda_dict_after = {
                server_name: component_gcda_dict.get(server_name)
            }
    elif " -ss " in command:
        system_name = command.split(" -ss ")[-1].split(" ")[0]
        server_list = system_info_dict.get(system_name) or []
        system_info_dict_after = {
            system_name: server_list
        }
        for server_name in server_list:
            services_component_dict_after[server_name] = services_component_dict.get(server_name)
            component_gcda_dict_after[server_name] = component_gcda_dict.get(server_name)
    return system_info_dict_after, services_component_dict_after, component_gcda_dict_after