#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import xml.etree.ElementTree as ET
from dataclasses import dataclass

from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_local_ip
from _core.constants import ConfigConst

__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")


@dataclass
class ConfigFileConst(object):
    """Constants describing the user configuration file."""

    # File name searched for inside each candidate "config" directory.
    userconfig_filepath = "user_config.xml"


class UserConfigManager(object):
    """Read-only accessor for the XML user configuration.

    The XML root element is stored in ``self.config_content``; every getter
    below queries that tree.
    """

    def __init__(self, config_file="", env=""):
        """Load the configuration tree.

        Args:
            config_file: path of the XML file to parse. When empty, the
                file ``user_config.xml`` is searched for in the ``config``
                sub-directory of ``Variables.exec_dir``,
                ``Variables.top_dir`` and ``Variables.res_dir``, in that
                order.
            env: XML text. When non-empty it is parsed directly and
                ``config_file`` is ignored.

        Raises:
            ParamError: the configuration file cannot be found, or the XML
                cannot be parsed.
        """
        # Imported here rather than at module level, as in the original.
        from xdevice import Variables

        # Always defined, so the lookups and error messages below can never
        # fail with AttributeError when no configuration file was located.
        self.file_path = ""
        try:
            if env:
                self.config_content = ET.fromstring(env)
            else:
                if config_file:
                    self.file_path = config_file
                else:
                    search_dirs = [
                        os.path.join(Variables.exec_dir, "config"),
                        os.path.join(Variables.top_dir, "config"),
                        os.path.join(Variables.res_dir, "config"),
                    ]
                    for search_dir in search_dirs:
                        candidate = os.path.abspath(os.path.join(
                            search_dir, ConfigFileConst.userconfig_filepath))
                        if os.path.exists(candidate):
                            self.file_path = candidate
                            break

                LOG.debug("User config path: %s" % self.file_path)
                if not self.file_path:
                    # No explicit path and nothing found in any candidate
                    # directory: report the file name that was searched for.
                    raise ParamError(
                        "%s not found" % ConfigFileConst.userconfig_filepath,
                        error_no="00115")
                if not os.path.exists(self.file_path):
                    raise ParamError("%s not found" % self.file_path,
                                     error_no="00115")
                self.config_content = ET.parse(self.file_path).getroot()

        except SyntaxError as error:
            # ET.ParseError is a subclass of SyntaxError.
            if env:
                raise ParamError(
                    "Parse environment parameter fail! Error: %s"
                    % error.args,
                    error_no="00115") from error
            raise ParamError(
                "Parse %s fail! Error: %s" % (self.file_path, error.args),
                error_no="00115") from error

    def get_user_config_list(self, tag_name):
        """Return ``{child tag: child text}`` for root children ``tag_name``.

        All direct children of the root whose tag equals ``tag_name`` are
        merged into one dict; later entries overwrite earlier ones. Text may
        be ``None`` for empty elements.
        """
        data_dic = {}
        for child in self.config_content:
            if child.tag != tag_name:
                continue
            for sub in child:
                data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return ``value`` with leading and trailing whitespace removed."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return False (and log a warning) when ``items`` has duplicates."""
        if len(set(items)) != len(items):
            LOG.warning("Find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ';'-separated string into stripped, non-empty items.

        Returns an empty list when the items contain duplicates.
        """
        stripped = (self.remove_strip(item)
                    for item in input_string.split(';'))
        config_list = [item for item in stripped if item]
        if config_list and not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the serial numbers listed in ``input_string``.

        An empty or ``None`` input yields an empty list.
        """
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Return ``{"ip": ..., "port": ...}`` from the ``remote`` node.

        Both values are empty strings unless both are configured and
        non-empty. An ip equal to ``get_local_ip()`` is replaced by
        ``"127.0.0.1"``.
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip")
        remote_port = data_dic.get("port")

        # A remote endpoint is only meaningful when both parts are present.
        if not remote_ip or not remote_port:
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_testcases_dir_config(self):
        """Return the text of ``testcases/dir``, or ``""`` when not set."""
        data_dic = self.get_user_config_list("testcases")
        return data_dic.get("dir") or ""

    def get_user_config(self, target_name, filter_name=None):
        """Return ``{child tag: child text}`` for nodes at ``target_name``.

        Args:
            target_name: path passed to ``findall`` on the root element.
            filter_name: when given, only nodes whose ``label`` attribute
                equals this value are used.

        Returns:
            Merged dict of the children of all selected nodes; missing text
            is reported as ``""``.
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text if sub.text else ""
        return data_dic

    def get_node_attr(self, target_name, attr_name):
        """Return attribute ``attr_name`` of the first ``target_name`` node.

        Returns ``None`` when the node does not exist or does not carry the
        attribute.
        """
        node = self.config_content.find(target_name)
        if node is None:
            # find() returns None when nothing matches.
            return None
        return node.attrib.get(attr_name)

    def get_com_device(self, target_name):
        """Collect the ``type="com"`` devices found at ``target_name``.

        Returns:
            A list with one entry per device. Each entry is a list whose
            first item is the node's attribute dict, followed either by one
            dict of the node's non-``serial`` children that have text, or,
            when there are none, by one dict per ``serial`` child.
        """
        devices = []

        for node in self.config_content.findall(target_name):
            # Nodes without a "type" attribute are skipped, not fatal.
            if node.get("type") != "com":
                continue

            device = [node.attrib]

            # get remote device
            remote_dic = {
                sub.tag: sub.text for sub in node
                if sub.text is not None and sub.tag != "serial"
            }
            if remote_dic:
                if remote_dic.get("ip", "") == get_local_ip():
                    remote_dic["ip"] = "127.0.0.1"
                device.append(remote_dic)
                devices.append(device)
                continue

            # get local device
            for serial in node.findall("serial"):
                device.append({
                    sub.tag: "" if sub.text is None else sub.text
                    for sub in serial
                })
            devices.append(device)
        return devices

    def get_device(self, target_name):
        """Return the first ``usb-hdc``/``usb-adb`` device at ``target_name``.

        Returns:
            Dict with ``usb_type`` plus one entry per child element
            (missing text becomes ``""``), or ``None`` when no such device
            exists. An ip equal to ``get_local_ip()`` is replaced by
            ``"127.0.0.1"``.
        """
        for node in self.config_content.findall(target_name):
            # Nodes without a "type" attribute are skipped, not fatal.
            usb_type = node.get("type")
            if usb_type not in ("usb-hdc", "usb-adb"):
                continue
            data_dic = {"usb_type": usb_type}
            for sub in node:
                data_dic[sub.tag] = "" if sub.text is None else sub.text
            if data_dic.get("ip", "") == get_local_ip():
                data_dic["ip"] = "127.0.0.1"
            return data_dic
        return None

    def get_testcases_dir(self):
        """Return the absolute test case directory.

        A configured relative path is resolved against
        ``Variables.exec_dir``; without configuration the result is
        ``<exec_dir>/testcases``.
        """
        from xdevice import Variables
        testcases_dir = self.get_testcases_dir_config()
        if not testcases_dir:
            return os.path.abspath(
                os.path.join(Variables.exec_dir, "testcases"))
        if os.path.isabs(testcases_dir):
            return testcases_dir
        return os.path.abspath(
            os.path.join(Variables.exec_dir, testcases_dir))

    def get_resource_path(self):
        """Return the absolute resource directory.

        A configured relative ``resource/dir`` is resolved against
        ``Variables.exec_dir``; without configuration the result is
        ``<exec_dir>/resource``.
        """
        from xdevice import Variables
        resource_dir = self.get_user_config_list("resource").get("dir")
        if not resource_dir:
            return os.path.abspath(
                os.path.join(Variables.exec_dir, "resource"))
        if os.path.isabs(resource_dir):
            return resource_dir
        return os.path.abspath(
            os.path.join(Variables.exec_dir, resource_dir))

    def get_log_level(self):
        """Return the log levels configured under ``loglevel``.

        When the node has neither a ``console`` nor a ``file`` child, its
        own text is reported as the ``console`` level; otherwise every child
        is reported by tag. Returns ``{}`` when the node is absent.

        NOTE: values go through ``str(...)``, so an element without text is
        reported as the string ``"None"``.
        """
        data_dic = {}
        node = self.config_content.find("loglevel")
        if node is None:
            return data_dic
        if node.find("console") is None and node.find("file") is None:
            # neither loglevel/console nor loglevel/file exists
            data_dic["console"] = str(node.text).strip()
        else:
            for child in node:
                data_dic[child.tag] = str(child.text).strip()
        return data_dic

    def get_device_log_status(self):
        """Return the device log settings configured under ``devicelog``.

        When the node has an enable or dir child, every child is reported
        by tag. Otherwise the node's own text is used as the enable value
        and defaults are supplied for the remaining keys. Returns ``{}``
        when the node is absent.

        NOTE: values go through ``str(...)``, so an element without text is
        reported as the string ``"None"``.
        """
        data_dic = {}
        node = self.config_content.find("devicelog")
        if node is None:
            return data_dic
        has_enable = node.find(ConfigConst.tag_enable) is not None
        has_dir = node.find(ConfigConst.tag_dir) is not None
        if has_enable or has_dir:
            for child in node:
                data_dic[child.tag] = str(child.text).strip()
        else:
            data_dic[ConfigConst.tag_enable] = str(node.text).strip()
            data_dic[ConfigConst.tag_dir] = None
            data_dic[ConfigConst.tag_loglevel] = "INFO"
            data_dic[ConfigConst.tag_clear] = "TRUE"
        return data_dic

    def environment_enable(self):
        """Return True when ``environment`` exists and has child elements.

        The check is spelled out with ``is not None`` and ``len()`` because
        truth-testing an Element is deprecated: an Element without children
        currently tests as false, which is exactly the condition made
        explicit here.
        """
        environment = self.config_content.find("environment")
        return environment is not None and len(environment) > 0