#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
from dataclasses import dataclass
from xml.etree import ElementTree

from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_local_ip
from _core.constants import ConfigConst

__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")


@dataclass
class ConfigFileConst(object):
    # File name looked up inside each candidate "config" directory.
    userconfig_filepath = "user_config.xml"


class UserConfigManager(object):
    """Read-only accessor for the XML user configuration.

    The configuration is parsed once in ``__init__`` and kept in
    ``self.config_content`` (the root ``ElementTree.Element``); every
    ``get_*`` method queries that tree.
    """

    def __init__(self, config_file="", env=""):
        """Parse the user configuration.

        Args:
            config_file: path of the XML file to load. When empty, the
                first existing ``config/user_config.xml`` under
                ``Variables.exec_dir``, ``Variables.top_dir`` and
                ``Variables.res_dir`` (in that order) is used.
            env: XML text; when non-empty it is parsed instead of any file.

        Raises:
            ParamError: the file cannot be found or the XML is malformed.
        """
        # Imported here, as in the original code, rather than at module level.
        from xdevice import Variables

        # Always define file_path so that a failed lookup surfaces as
        # ParamError below instead of an AttributeError.
        self.file_path = ""
        try:
            if env:
                self.config_content = ElementTree.fromstring(env)
            else:
                if config_file:
                    self.file_path = config_file
                else:
                    search_dirs = [
                        os.path.join(Variables.exec_dir, "config"),
                        os.path.join(Variables.top_dir, "config"),
                        os.path.join(Variables.res_dir, "config"),
                    ]
                    for directory in search_dirs:
                        candidate = os.path.abspath(os.path.join(
                            directory, ConfigFileConst.userconfig_filepath))
                        if os.path.exists(candidate):
                            self.file_path = candidate
                            break

                LOG.debug("User config path: %s" % self.file_path)
                if self.file_path and os.path.exists(self.file_path):
                    tree = ElementTree.parse(self.file_path)
                    self.config_content = tree.getroot()
                else:
                    missing = (self.file_path
                               or ConfigFileConst.userconfig_filepath)
                    raise ParamError("%s not found" % missing,
                                     error_no="00115")

        except SyntaxError as error:
            # ElementTree.ParseError is a subclass of SyntaxError.
            if env:
                raise ParamError(
                    "Parse environment parameter fail! Error: %s" % error.args,
                    error_no="00115") from error
            raise ParamError(
                "Parse %s fail! Error: %s" % (self.file_path, error.args),
                error_no="00115") from error

    def get_user_config_list(self, tag_name):
        """Return ``{child_tag: child_text}`` for top-level ``tag_name`` nodes.

        Children of every matching top-level element are merged into one
        dict; later values overwrite earlier ones with the same tag.
        """
        data_dic = {}
        for child in self.config_content:
            if tag_name == child.tag:
                for sub in child:
                    data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def _traversal_element(element, is_top=True):
        """Convert an element subtree into nested dicts.

        element: ElementTree.Element, traversal element
        is_top : bool, if is the top element
        return : dict (or the element text for a non-top leaf)

        Raises:
            TypeError: ``element`` is not an ``ElementTree.Element``.
        """
        if not isinstance(element, ElementTree.Element):
            raise TypeError(
                "element must be instance of xml.etree.ElementTree.Element")
        if len(element) == 0:
            return {element.tag: element.text} if is_top else element.text
        data = {}
        for sub in element:
            # setdefault: the first child with a given tag wins.
            data.setdefault(
                sub.tag,
                UserConfigManager._traversal_element(sub, is_top=False))
        return data

    @staticmethod
    def remove_strip(value):
        """Return ``value`` without leading/trailing whitespace."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return False (and log a warning) when ``items`` has duplicates."""
        if len(set(items)) != len(items):
            LOG.warning("Find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ``;``-separated string into stripped, non-empty items.

        Returns an empty list when the items contain duplicates.
        """
        config_list = [item.strip() for item in input_string.split(';')]
        config_list = [item for item in config_list if item]
        if config_list and not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the list of serial numbers found in ``input_string``."""
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Return ``{"ip": ..., "port": ...}`` from the ``remote`` node.

        Both values are empty strings unless both are configured; an ip
        equal to the local ip is replaced by ``127.0.0.1``.
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""

        # Either both values are usable or neither is.
        if not (remote_ip and remote_port):
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_testcases_dir_config(self):
        """Return the configured ``testcases/dir`` text, or ``""``."""
        data_dic = self.get_user_config_list("testcases")
        return data_dic.get("dir") or ""

    def get_user_config(self, target_name, filter_name=None):
        """Return ``{child_tag: child_text}`` for nodes matching a path.

        Args:
            target_name: path passed to ``Element.findall``.
            filter_name: when truthy, only nodes whose ``label`` attribute
                equals it are used.

        Missing child text is reported as ``""``.
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text if sub.text else ""
        return data_dic

    def get_node_attr(self, target_name, attr_name):
        """Return attribute ``attr_name`` of the first ``target_name`` node.

        Returns None when the node or the attribute does not exist.
        """
        node = self.config_content.find(target_name)
        if node is None:
            return None
        return node.attrib.get(attr_name)

    def get_com_device(self, target_name):
        """Collect the ``type="com"`` device nodes under ``target_name``.

        Each entry is a list whose first item is the node's attribute dict,
        followed by either one dict of the node's non-``serial`` children
        that have text, or, when there are none, one dict per ``serial``
        child.
        """
        devices = []

        for node in self.config_content.findall(target_name):
            # node.get() tolerates nodes without a "type" attribute.
            if node.get("type") != "com":
                continue

            device = [node.attrib]

            # get remote device
            data_dic = {}
            for sub in node:
                if sub.text is not None and sub.tag != "serial":
                    data_dic[sub.tag] = sub.text
            if data_dic:
                if data_dic.get("ip", "") == get_local_ip():
                    data_dic["ip"] = "127.0.0.1"
                device.append(data_dic)
                devices.append(device)
                continue

            # get local device
            for serial in node.findall("serial"):
                data_dic = {}
                for sub in serial:
                    data_dic[sub.tag] = "" if sub.text is None else sub.text
                device.append(data_dic)
            devices.append(device)
        return devices

    def get_device(self, target_name):
        """Return the first ``usb-hdc``/``usb-adb`` device as a dict.

        The dict holds ``usb_type`` plus one entry per child element
        (missing text becomes ``""``); an ip equal to the local ip is
        replaced by ``127.0.0.1``. Returns None when no such node exists.
        """
        for node in self.config_content.findall(target_name):
            # node.get() tolerates nodes without a "type" attribute.
            usb_type = node.get("type")
            if usb_type not in ("usb-hdc", "usb-adb"):
                continue
            data_dic = {"usb_type": usb_type}
            for sub in node:
                data_dic[sub.tag] = "" if sub.text is None else sub.text
            if data_dic.get("ip", "") == get_local_ip():
                data_dic["ip"] = "127.0.0.1"
            return data_dic
        return None

    def get_testcases_dir(self):
        """Return the absolute testcases directory.

        A relative configured path is resolved against
        ``Variables.exec_dir``; without configuration the default is
        ``<exec_dir>/testcases``.
        """
        from xdevice import Variables
        testcases_dir = self.get_testcases_dir_config()
        if testcases_dir:
            if os.path.isabs(testcases_dir):
                return testcases_dir
            return os.path.abspath(
                os.path.join(Variables.exec_dir, testcases_dir))

        return os.path.abspath(os.path.join(Variables.exec_dir, "testcases"))

    def get_resource_path(self):
        """Return the absolute resource directory.

        A relative configured ``resource/dir`` is resolved against
        ``Variables.exec_dir``; without configuration the default is
        ``<exec_dir>/resource``.
        """
        from xdevice import Variables
        resource_dir = self.get_user_config_list("resource").get("dir")
        if resource_dir:
            if os.path.isabs(resource_dir):
                return resource_dir
            return os.path.abspath(
                os.path.join(Variables.exec_dir, resource_dir))

        return os.path.abspath(
            os.path.join(Variables.exec_dir, "resource"))

    def get_resource_conf(self):
        """Return the ``resource`` subtree as nested dicts, or ``{}``."""
        resource = self.config_content.find("resource")
        if resource is None:
            return {}
        return self._traversal_element(resource)

    def get_log_level(self):
        """Return the log level settings from the ``loglevel`` node.

        When the node has neither a ``console`` nor a ``file`` child, its
        own text is reported as the ``console`` level; otherwise every
        child is reported as ``{tag: text}``. Returns ``{}`` without node.
        """
        data_dic = {}
        node = self.config_content.find("loglevel")
        if node is not None:
            if node.find("console") is None and node.find("file") is None:
                # neither loglevel/console nor loglevel/file exists
                data_dic["console"] = str(node.text).strip()
            else:
                for child in node:
                    data_dic[child.tag] = str(child.text).strip()
        return data_dic

    def get_device_log_status(self):
        """Return the device log settings from the ``devicelog`` node.

        When the node has an enable or dir child, every child is reported
        as ``{tag: text}``. Otherwise the node's own text is the enable
        value and dir/loglevel/clear get the defaults None/"INFO"/"TRUE".
        Returns ``{}`` without node.
        """
        data_dic = {}
        node = self.config_content.find("devicelog")
        if node is not None:
            if node.find(ConfigConst.tag_enable) is not None \
                    or node.find(ConfigConst.tag_dir) is not None:
                for child in node:
                    data_dic[child.tag] = str(child.text).strip()
            else:
                # Plain-text form: only the enable flag is given.
                data_dic[ConfigConst.tag_enable] = str(node.text).strip()
                data_dic[ConfigConst.tag_dir] = None
                data_dic[ConfigConst.tag_loglevel] = "INFO"
                data_dic[ConfigConst.tag_clear] = "TRUE"
        return data_dic

    def environment_enable(self):
        """Return True when the ``environment`` node has child elements.

        Written with explicit ``None``/``len`` checks because testing the
        truth value of an ``Element`` is deprecated; the result matches the
        former ``find(...) or find(...)`` expression.
        """
        environment = self.config_content.find("environment")
        return environment is not None and len(environment) > 0