#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import subprocess
import json
import xml.etree.ElementTree as ET


# Servers that are selected by test-suite name ("-ts") and that are
# deliberately not selected by component name ("-tp").
_TESTSUITE_SERVERS = ("dinput", "softbus_server")


def get_config_ip(filepath):
    """Read the ip, port and sn of the "usb-hdc" device from an XML config.

    Every ``environment/device`` element under the document root whose
    ``type`` attribute equals ``"usb-hdc"`` is read; the text of each of its
    child elements is collected by tag name.  Values accumulate across
    matching devices, so a later device overrides the tags it defines.

    Args:
        filepath: Path of the XML configuration file.

    Returns:
        A ``(ip, port, sn)`` tuple of strings.  Each item is ``""`` when the
        file does not exist, cannot be parsed, or does not define the value.
    """
    ip_config = ""
    sn = ""
    port = ""
    try:
        data_dic = {}
        if os.path.exists(filepath):
            root = ET.parse(filepath).getroot()
            for node in root.findall("environment/device"):
                # .get() so that a <device> without a "type" attribute is
                # skipped instead of raising KeyError.
                if node.attrib.get("type") != "usb-hdc":
                    continue
                for sub in node:
                    data_dic[sub.tag] = sub.text if sub.text else ""
                ip_config = data_dic.get("ip", "")
                sn = data_dic.get("sn", "")
                port = data_dic.get("port", "")
    except ET.ParseError as xml_exception:
        # Best effort: a malformed file is reported and yields empty values.
        print("occurs exception:{}".format(xml_exception.args))

    return ip_config, port, sn


def get_sn_list(command):
    """Run a shell command and return its non-empty output lines.

    Args:
        command: Command line passed to the shell.  It is executed with
            ``shell=True``, so it must come from a trusted source.

    Returns:
        A list with one entry per non-blank line of the command's standard
        output, stripped of surrounding whitespace and with tabs replaced by
        single spaces.
    """
    # Run the device SN query command and collect its output.
    # communicate() drains stdout and stderr together and waits for the
    # child, so a full stderr pipe cannot block it and no handles are leaked.
    with subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        strout, _ = proc.communicate()
    if isinstance(strout, bytes):
        strout = strout.decode("utf-8", "ignore")

    device_sn_list = []
    for line in strout.split("\n"):
        line = line.strip().replace('\t', ' ')
        if line != "":
            device_sn_list.append(line)

    return device_sn_list


def get_all_part_service():
    """Load the three lookup tables from ``system_part_service.json``.

    The file is looked up relative to the current working directory: the
    part of the path before ``/test/testfwk/developer_test`` is taken as the
    root, and the file is expected at
    ``<root>/test/testfwk/developer_test/localCoverage/resident_service/``.

    Returns:
        A ``(system_info_dict, services_component_dict, component_gcda_dict)``
        tuple taken from the same-named top-level keys of the JSON file, or
        three empty dicts when the file does not exist.

    Raises:
        KeyError: If the file exists but lacks one of the three keys.
    """
    current_path = os.getcwd()
    root_path = current_path.split("/test/testfwk/developer_test")[0]
    developer_path = os.path.join(root_path, "test/testfwk/developer_test")
    system_part_service_path = os.path.join(
        developer_path,
        "localCoverage/resident_service/system_part_service.json")
    if os.path.exists(system_part_service_path):
        with open(system_part_service_path, "r",
                  encoding="utf-8") as system_text:
            system_text_json = json.load(system_text)
        system_info_dict = system_text_json["system_info_dict"]
        services_component_dict = system_text_json["services_component_dict"]
        component_gcda_dict = system_text_json["component_gcda_dict"]
        return system_info_dict, services_component_dict, component_gcda_dict
    print("{} not exists.".format(system_part_service_path))
    return {}, {}, {}


def get_system_dict_to_server_name(server_name: str, system_info_dict):
    """Return the system that owns *server_name*, reduced to that server.

    Args:
        server_name: Name of the server to look for.
        system_info_dict: Mapping of system name to its collection of
            server names.

    Returns:
        ``{system: [server_name]}`` for the first system whose server
        collection contains *server_name*, or ``{}`` when none does.
    """
    for system, server_list in system_info_dict.items():
        if server_name in server_list:
            return {system: [server_name]}
    return {}


def _option_value(command, flag):
    """Return the first space-delimited token after the last *flag*."""
    return command.split(flag)[-1].split(" ")[0]


def _single_server_selection(server_name, entry, system_info_dict,
                             component_gcda_dict):
    """Build the three result dicts for one selected server."""
    return (
        get_system_dict_to_server_name(server_name, system_info_dict),
        {server_name: [entry]},
        {server_name: component_gcda_dict.get(server_name)},
    )


def get_server_dict(command):
    """Narrow the service lookup tables to what *command* targets.

    The tables come from :func:`get_all_part_service`.  Exactly one option
    is honoured, checked in this order:

    * ``-ts <testsuite>``: selects ``dinput`` or ``softbus_server`` (in that
      order) when the suite is listed under that server.
    * ``-tp <component>``: selects the first server listing the component;
      if that server is ``dinput`` or ``softbus_server`` nothing is
      selected.
    * ``-ss <system>``: selects every server of the named system.

    Args:
        command: The command line string to inspect.

    Returns:
        A ``(system_info_dict, services_component_dict, component_gcda_dict)``
        tuple restricted to the selection.  All three are empty when nothing
        matches, except that ``-ss`` always maps the system name to its
        (possibly empty) server list.
    """
    system_info_dict, services_component_dict, component_gcda_dict = \
        get_all_part_service()
    system_info_dict_after = {}
    services_component_dict_after = {}
    component_gcda_dict_after = {}

    if " -ts " in command:
        # Parsed like -tp/-ss so that trailing options or a repeated flag do
        # not end up in (or break the unpacking of) the suite name.
        testsuite = _option_value(command, " -ts ")
        for candidate in _TESTSUITE_SERVERS:
            # "or []": the server may be absent from the table, e.g. when
            # the JSON file was not found and the table is empty.
            if testsuite in (services_component_dict.get(candidate) or []):
                return _single_server_selection(
                    candidate, testsuite, system_info_dict,
                    component_gcda_dict)
    elif " -tp " in command:
        component_name = _option_value(command, " -tp ")
        for server, component_list in services_component_dict.items():
            if component_name in (component_list or []):
                # Only the first match counts; the test-suite servers are
                # never selected through a component name.
                if server in _TESTSUITE_SERVERS:
                    break
                return _single_server_selection(
                    server, component_name, system_info_dict,
                    component_gcda_dict)
    elif " -ss " in command:
        system_name = _option_value(command, " -ss ")
        server_list = system_info_dict.get(system_name) or []
        system_info_dict_after = {system_name: server_list}
        for server_name in server_list:
            services_component_dict_after[server_name] = \
                services_component_dict.get(server_name)
            component_gcda_dict_after[server_name] = \
                component_gcda_dict.get(server_name)

    return (system_info_dict_after, services_component_dict_after,
            component_gcda_dict_after)