#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import xml.etree.ElementTree as ET
from dataclasses import dataclass

from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_local_ip

__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")


@dataclass
class ConfigFileConst(object):
    # File name looked up inside each candidate "config" directory.
    userconfig_filepath = "user_config.xml"


class UserConfigManager(object):
    """Read-only accessor for the XML user configuration.

    The configuration is parsed once in ``__init__`` and kept as an
    ``xml.etree.ElementTree.Element`` in ``self.config_content``; every
    ``get_*`` method queries that element.
    """

    def __init__(self, config_file="", env=""):
        """Load the configuration from an XML string or an XML file.

        Args:
            config_file: Path of the configuration file. Ignored when
                ``env`` is given. When empty, the "config" directories
                under ``Variables.exec_dir``, ``Variables.top_dir`` and
                ``Variables.res_dir`` are searched in that order for
                ``ConfigFileConst.userconfig_filepath``.
            env: XML document as a string. Takes precedence over
                ``config_file`` when non-empty.

        Raises:
            ParamError: If the file cannot be found, or if the XML
                (string or file) cannot be parsed.
        """
        from xdevice import Variables

        # Defined up front so that the log line and the error handlers
        # below can always read it, even when no file was located.
        self.file_path = ""
        try:
            if env:
                self.config_content = ET.fromstring(env)
            else:
                if config_file:
                    self.file_path = config_file
                else:
                    user_path = os.path.join(Variables.exec_dir, "config")
                    top_user_path = os.path.join(Variables.top_dir, "config")
                    config_path = os.path.join(Variables.res_dir, "config")
                    paths = [user_path, top_user_path, config_path]

                    for path in paths:
                        candidate = os.path.abspath(os.path.join(
                            path, ConfigFileConst.userconfig_filepath))
                        if os.path.exists(candidate):
                            self.file_path = candidate
                            break

                LOG.debug("User config path: %s" % self.file_path)
                if os.path.exists(self.file_path):
                    tree = ET.parse(self.file_path)
                    self.config_content = tree.getroot()
                else:
                    # Fall back to the bare file name so the message is
                    # still meaningful when the search found nothing.
                    missing = (self.file_path
                               or ConfigFileConst.userconfig_filepath)
                    raise ParamError("%s not found" % missing,
                                     error_no="00115")

        except SyntaxError as error:
            # xml.etree.ElementTree.ParseError is a SyntaxError subclass.
            if env:
                raise ParamError(
                    "Parse environment parameter fail! Error: %s" % error.args,
                    error_no="00115") from error
            else:
                raise ParamError(
                    "Parse %s fail! Error: %s" % (self.file_path, error.args),
                    error_no="00115") from error

    def get_user_config_list(self, tag_name):
        """Return ``{child tag: child text}`` for top-level ``tag_name``.

        All direct children of the root whose tag equals ``tag_name`` are
        merged into one dictionary; later elements overwrite earlier
        ones. Text values are returned as-is and may be ``None``.
        """
        data_dic = {}
        for child in self.config_content:
            if tag_name == child.tag:
                for sub in child:
                    data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return ``value`` without leading and trailing whitespace."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return ``False`` (and log a warning) if ``items`` has repeats."""
        if len(set(items)) != len(items):
            LOG.warning("Find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ';'-separated string into stripped, non-empty items.

        Returns an empty list when the items contain duplicates.
        """
        stripped = (self.remove_strip(item)
                    for item in input_string.split(';'))
        config_list = [item for item in stripped if item]
        if config_list and not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the items parsed from ``input_string``.

        An empty or ``None`` input yields an empty list; see
        ``_handle_str`` for the parsing rules.
        """
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Return ``{"ip": ..., "port": ...}`` from the <remote> element.

        Both values are empty strings unless both are configured with
        non-empty text. An ip equal to ``get_local_ip()`` is replaced by
        "127.0.0.1".
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""

        # The pair is only meaningful when both halves are present.
        if not remote_ip or not remote_port:
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_testcases_dir_config(self):
        """Return the text of <testcases>/<dir>, or "" when unset."""
        data_dic = self.get_user_config_list("testcases")
        return data_dic.get("dir") or ""

    def get_user_config(self, target_name, filter_name=None):
        """Return ``{child tag: child text}`` for elements at a path.

        Args:
            target_name: ElementTree path passed to ``findall`` on the
                configuration root.
            filter_name: When truthy, only elements whose ``label``
                attribute equals this value are used.

        Returns:
            A dictionary merged over all selected elements; missing text
            is reported as "".
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text if sub.text else ""
        return data_dic

    def get_node_attr(self, target_name, attr_name):
        """Return attribute ``attr_name`` of the first ``target_name``.

        Returns ``None`` when the element or the attribute is missing.
        """
        node = self.config_content.find(target_name)
        if node is None:
            return None
        return node.attrib.get(attr_name)

    def get_com_device(self, target_name):
        """Collect the elements at ``target_name`` whose type is "com".

        Each entry of the returned list is itself a list that starts
        with the element's attribute dictionary. If the element has
        children other than <serial> that carry text, one dictionary of
        those children follows (an "ip" equal to ``get_local_ip()`` is
        replaced by "127.0.0.1"). Otherwise one dictionary per <serial>
        child follows, with missing text reported as "".

        Elements without a ``type`` attribute are skipped.
        """
        devices = []

        for node in self.config_content.findall(target_name):
            if node.get("type") != "com":
                continue

            device = [node.attrib]

            # get remote device
            data_dic = {}
            for sub in node:
                if sub.text is not None and sub.tag != "serial":
                    data_dic[sub.tag] = sub.text
            if data_dic:
                if data_dic.get("ip", "") == get_local_ip():
                    data_dic["ip"] = "127.0.0.1"
                device.append(data_dic)
                devices.append(device)
                continue

            # get local device
            for serial in node.findall("serial"):
                data_dic = {}
                for sub in serial:
                    data_dic[sub.tag] = "" if sub.text is None else sub.text
                device.append(data_dic)
            devices.append(device)
        return devices

    def get_device(self, target_name):
        """Return the first "usb-hdc" or "usb-adb" element as a dict.

        The dictionary holds the element's type under "usb_type" plus
        one entry per child (missing text reported as ""). An "ip" equal
        to ``get_local_ip()`` is replaced by "127.0.0.1". Returns
        ``None`` when no such element exists; elements without a
        ``type`` attribute are skipped.
        """
        for node in self.config_content.findall(target_name):
            usb_type = node.get("type")
            if usb_type not in ("usb-hdc", "usb-adb"):
                continue
            data_dic = {"usb_type": usb_type}
            for sub in node:
                data_dic[sub.tag] = "" if sub.text is None else sub.text
            if data_dic.get("ip", "") == get_local_ip():
                data_dic["ip"] = "127.0.0.1"
            return data_dic
        return None

    def get_testcases_dir(self):
        """Return the absolute test case directory.

        A configured relative path is resolved against
        ``Variables.exec_dir``; without configuration the result is
        ``<exec_dir>/testcases``.
        """
        from xdevice import Variables
        testcases_dir = self.get_testcases_dir_config()
        if testcases_dir:
            if os.path.isabs(testcases_dir):
                return testcases_dir
            return os.path.abspath(os.path.join(Variables.exec_dir,
                                                testcases_dir))

        return os.path.abspath(os.path.join(Variables.exec_dir, "testcases"))

    def get_resource_path(self):
        """Return the absolute resource directory.

        A configured relative path is resolved against
        ``Variables.exec_dir``; without configuration the result is
        ``<exec_dir>/resource``.
        """
        from xdevice import Variables
        resource_dir = self.get_user_config_list("resource").get("dir")
        if resource_dir:
            if os.path.isabs(resource_dir):
                return resource_dir
            return os.path.abspath(
                os.path.join(Variables.exec_dir, resource_dir))

        return os.path.abspath(
            os.path.join(Variables.exec_dir, "resource"))

    def get_log_level(self):
        """Return the configured log levels as a dictionary.

        When <loglevel> has neither a <console> nor a <file> child, its
        own text is returned under "console"; otherwise every child is
        returned as ``{tag: text}``. Returns an empty dictionary when
        <loglevel> is absent.
        """
        data_dic = {}
        node = self.config_content.find("loglevel")
        if node is not None:
            if node.find("console") is None and node.find("file") is None:
                # neither loglevel/console nor loglevel/file exists
                # NOTE: str() turns missing text into the string "None";
                # kept as-is because callers may rely on it.
                data_dic.update({"console": str(node.text).strip()})
            else:
                for child in node:
                    data_dic.update({child.tag: str(child.text).strip()})
        return data_dic

    def get_device_log_status(self):
        """Return the stripped text of <devicelog>, or ``None`` if absent.

        An element without text yields the string "None" because the
        text is passed through ``str()``.
        """
        node = self.config_content.find("devicelog")
        if node is not None:
            return str(node.text).strip()
        return None