• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import subprocess
21import json
22import xml.etree.ElementTree as ET
23
24
def get_config_ip(filepath):
    """Read ip, port and sn of the "usb-hdc" device from an XML config file.

    The values are taken from the text of the child elements (``<ip>``,
    ``<port>``, ``<sn>``) of every ``environment/device`` element whose
    ``type`` attribute is ``"usb-hdc"``. Child tags are collected into one
    dictionary shared by all matching devices, so a later matching device
    overrides the tags it defines and keeps the others from earlier devices.

    Args:
        filepath: Path of the XML configuration file.

    Returns:
        A tuple ``(ip, port, sn)`` of strings. Each item is ``""`` when the
        file does not exist, cannot be parsed, or holds no such value.
    """
    ip_config = ""
    sn = ""
    port = ""
    if not os.path.exists(filepath):
        return ip_config, port, sn
    try:
        data_dic = {}
        root = ET.parse(filepath).getroot()
        for node in root.findall("environment/device"):
            # Use .get(): a <device> without a "type" attribute is simply
            # not a usb-hdc device and must not abort the lookup.
            if node.attrib.get("type") != "usb-hdc":
                continue
            for sub in node:
                # Empty elements have text None; normalise to "".
                data_dic[sub.tag] = sub.text if sub.text else ""
            ip_config = data_dic.get("ip", "")
            sn = data_dic.get("sn", "")
            port = data_dic.get("port", "")
    except ET.ParseError as xml_exception:
        print("occurs exception:{}".format(xml_exception.args))

    return ip_config, port, sn
46
47
def get_config_ip_info(filepath):
    """Read ip, port and sn from the ``<info>`` child of the "usb-hdc" device.

    The first ``environment/device`` element whose ``type`` attribute is
    ``"usb-hdc"`` is selected, and the ``ip``, ``port`` and ``sn``
    attributes of its direct ``<info>`` children are read (the last
    ``<info>`` wins when there are several).

    Args:
        filepath: Path of the XML configuration file.

    Returns:
        A tuple ``(ip, port, sn)`` of strings. All items are ``""`` when the
        file does not exist, cannot be parsed, or contains no usb-hdc device.
        If an ``<info>`` element lacks one of the attributes, a message is
        printed and the values read so far are returned.
    """
    ip_config = ""
    sn = ""
    port = ""
    if not os.path.exists(filepath):
        return ip_config, port, sn
    try:
        root = ET.parse(filepath).getroot()
        # Pick the usb-hdc device explicitly; with no match there is nothing
        # to read (previously the last device of any type was used, or the
        # loop variable was unbound when the file had no devices at all).
        hdc_device = next(
            (node for node in root.findall("environment/device")
             if node.attrib.get("type") == "usb-hdc"),
            None,
        )
        # Compare with None: an Element without children is falsy.
        if hdc_device is None:
            return ip_config, port, sn
        for info in hdc_device.findall("info"):
            ip_config = info.attrib["ip"]
            port = info.attrib["port"]
            sn = info.attrib["sn"]
    except ET.ParseError as xml_exception:
        print("occurs exception:{}".format(xml_exception.args))
    except KeyError as err:
        print(f"Key error: {err}")

    return ip_config, port, sn
73
74
def get_sn_list(command):
    """Run a shell command and return its non-empty output lines.

    Args:
        command: Command line string, executed through the shell.

    Returns:
        A list with one entry per non-blank line of the command's standard
        output. Each line is stripped of surrounding whitespace and has its
        tab characters replaced by single spaces.
    """
    device_sn_list = []
    # Run the device serial-number query command and collect its output.
    # communicate() closes stdin, drains stdout and stderr together (so a
    # chatty stderr cannot block the child) and waits for the process; the
    # context manager closes the pipes afterwards.
    with subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
        strout, _ = proc.communicate()
    if isinstance(strout, bytes):
        strout = strout.decode("utf-8", "ignore")
    for line in strout.split("\n"):
        line = line.strip().replace('\t', ' ')
        if line != "":
            device_sn_list.append(line)

    return device_sn_list
89
90
def get_all_part_service():
    """Load the system/service/component mappings from system_part_service.json.

    The file is looked up at
    ``<root>/test/testfwk/developer_test/local_coverage/resident_service/``,
    where ``<root>`` is the part of the current working directory that
    precedes ``/test/testfwk/developer_test``.

    Returns:
        A tuple ``(system_info_dict, services_component_dict,
        component_gcda_dict)`` taken from the JSON keys of the same names,
        or three empty dictionaries (after printing a message) when the
        file does not exist.

    Raises:
        KeyError: If the file exists but lacks one of the three keys.
    """
    current_path = os.getcwd()
    root_path = current_path.split("/test/testfwk/developer_test")[0]
    developer_path = os.path.join(root_path, "test/testfwk/developer_test")
    system_part_service_path = os.path.join(
        developer_path, "local_coverage/resident_service/system_part_service.json")
    if os.path.exists(system_part_service_path):
        with open(system_part_service_path, "r", encoding="utf-8") as system_text:
            system_text_json = json.load(system_text)
        system_info_dict = system_text_json["system_info_dict"]
        services_component_dict = system_text_json["services_component_dict"]
        component_gcda_dict = system_text_json["component_gcda_dict"]
        return system_info_dict, services_component_dict, component_gcda_dict
    # print() does not interpolate extra arguments; format the path explicitly.
    print("%s not exists." % system_part_service_path)
    return {}, {}, {}
106
107
def get_system_dict_to_server_name(server_name: str, system_info_dict):
    """Return a one-entry mapping from the owning system to ``server_name``.

    Args:
        server_name: Name of the service to look for.
        system_info_dict: Mapping of system name to a collection of service
            names.

    Returns:
        ``{system: [server_name]}`` for the first system (in iteration
        order) whose collection contains ``server_name``, or ``{}`` when no
        system contains it.
    """
    # Lazy generator: stops at the first system that lists the service.
    owners = (
        {owner: [server_name]}
        for owner, members in system_info_dict.items()
        if server_name in members
    )
    return next(owners, {})
116
117
def get_server_dict(command):
    """Narrow the part/service mappings to what a run command targets.

    The full mappings come from ``get_all_part_service()``. Exactly one of
    the following options is honoured, checked in this order:

    * ``-ts``: everything after `` -ts `` is treated as the test suite name.
      It is looked up in the component lists of the "dinput" and
      "softbus_server" services (in that order).
    * ``-tp``: the token after `` -tp `` is a component name. The first
      service whose component list contains it is selected, unless that
      service is "dinput" or "softbus_server", in which case nothing is
      selected.
    * ``-ss``: the token after `` -ss `` is a system name; all of its
      services are selected.

    Args:
        command: The command line string to inspect.

    Returns:
        A tuple ``(system_info_dict, services_component_dict,
        component_gcda_dict)`` restricted to the selection. All three are
        empty dictionaries when nothing matches.
    """
    system_info_dict, services_component_dict, component_gcda_dict = get_all_part_service()
    system_info_dict_after = {}
    services_component_dict_after = {}
    component_gcda_dict_after = {}
    # Services addressed by test suite (-ts) and skipped by the -tp lookup.
    suite_servers = ("dinput", "softbus_server")
    server_name = None

    if " -ts " in command:
        # [-1] instead of tuple unpacking: a repeated flag must not raise.
        testsuite = command.split(" -ts ")[-1]
        for candidate in suite_servers:
            # "or []": the service may be absent, e.g. when the JSON file
            # is missing and get_all_part_service() returned empty dicts.
            if testsuite in (services_component_dict.get(candidate) or []):
                services_component_dict_after = {candidate: [testsuite]}
                server_name = candidate
                break
    elif " -tp " in command:
        component_name = command.split(" -tp ")[-1].split(" ")[0]
        for server, component_list in services_component_dict.items():
            if component_name in (component_list or []):
                # The search stops at the first hit even when that hit is
                # one of the suite-only services and is therefore ignored.
                if server not in suite_servers:
                    services_component_dict_after = {server: [component_name]}
                    server_name = server
                break
    elif " -ss " in command:
        system_name = command.split(" -ss ")[-1].split(" ")[0]
        server_list = system_info_dict.get(system_name) or []
        system_info_dict_after = {system_name: server_list}
        # A separate loop name keeps server_name None, so the single-service
        # step below is skipped for this branch.
        for server in server_list:
            services_component_dict_after[server] = services_component_dict.get(server)
            component_gcda_dict_after[server] = component_gcda_dict.get(server)

    if server_name:
        # Shared tail of the -ts / -tp branches: one service was selected.
        system_info_dict_after = get_system_dict_to_server_name(server_name, system_info_dict)
        component_gcda_dict_after = {
            server_name: component_gcda_dict.get(server_name)
        }
    return system_info_dict_after, services_component_dict_after, component_gcda_dict_after
171