#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import xml.etree.ElementTree as ET
from dataclasses import dataclass

from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_local_ip


__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")


@dataclass
class ConfigFileConst(object):
    """Constants describing the user configuration file."""

    # File name looked up inside each candidate "config" directory.
    userconfig_filepath = "user_config.xml"


class UserConfigManager(object):
    """Read-only accessor for the XML user configuration.

    The configuration is parsed once in ``__init__`` and kept as an
    ElementTree root element in ``self.config_content``; the ``get_*``
    methods extract individual sections from that tree.
    """

    def __init__(self, config_file="", env=""):
        """Load the configuration from *env* or from a file.

        Args:
            config_file: Path of the XML file to parse. When empty, the
                file named by ``ConfigFileConst.userconfig_filepath`` is
                searched in the ``config`` sub-directory of
                ``Variables.exec_dir``, ``Variables.top_dir`` and
                ``Variables.res_dir``, in that order.
            env: XML document given as a string. When non-empty it takes
                precedence and no file is read.

        Raises:
            ParamError: If the file cannot be found, or if the XML (from
                either source) cannot be parsed.
        """
        # Imported inside the function, as in the original code.
        from xdevice import Variables

        # Always defined, so the error paths below can format it even when
        # the configuration came from ``env`` or no file was located.
        self.file_path = ""
        try:
            if env:
                self.config_content = ET.fromstring(env)
            else:
                if config_file:
                    self.file_path = config_file
                else:
                    search_dirs = [
                        os.path.join(Variables.exec_dir, "config"),
                        os.path.join(Variables.top_dir, "config"),
                        os.path.join(Variables.res_dir, "config"),
                    ]
                    self.file_path = self._find_config_file(search_dirs)

                LOG.debug("user config path: %s" % self.file_path)
                if not os.path.exists(self.file_path):
                    # Fall back to the bare file name when nothing was
                    # located, so the message is never "<empty> not found".
                    missing = (self.file_path
                               or ConfigFileConst.userconfig_filepath)
                    raise ParamError("%s not found" % missing,
                                     error_no="00115")
                self.config_content = ET.parse(self.file_path).getroot()

        except SyntaxError as error:
            # xml.etree.ElementTree.ParseError derives from SyntaxError.
            if env:
                raise ParamError(
                    "Parse environment parameter fail! Error: %s" % error.args,
                    error_no="00115") from error
            raise ParamError(
                "Parse %s fail! Error: %s" % (self.file_path, error.args),
                error_no="00115") from error

    @staticmethod
    def _find_config_file(search_dirs):
        """Return the absolute path of the first existing config file.

        Args:
            search_dirs: Directories to probe, in priority order.

        Returns:
            The absolute path of the first existing file, or ``""`` when
            none of the directories contains it.
        """
        for directory in search_dirs:
            candidate = os.path.abspath(os.path.join(
                directory, ConfigFileConst.userconfig_filepath))
            if os.path.exists(candidate):
                return candidate
        return ""

    def get_user_config_list(self, tag_name):
        """Collect the children of every top-level *tag_name* element.

        Returns:
            A dict mapping each child tag to its text (``None`` when the
            child has no text). If a tag occurs more than once, the last
            occurrence wins.
        """
        data_dic = {}
        for child in self.config_content:
            if child.tag != tag_name:
                continue
            for sub in child:
                data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return *value* with leading and trailing whitespace removed."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return ``False`` (and log a warning) if *items* has duplicates."""
        if len(set(items)) != len(items):
            LOG.warning("find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ``;``-separated string into stripped, non-empty items.

        Returns:
            The list of items, or an empty list when any item is
            duplicated.
        """
        stripped = (self.remove_strip(part)
                    for part in input_string.split(';'))
        config_list = [item for item in stripped if item]
        if config_list and not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the serial numbers listed in *input_string*.

        Returns:
            A list of serial numbers; empty when *input_string* is empty
            or contains duplicates.
        """
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Return the ``remote`` section as ``{"ip": ..., "port": ...}``.

        Both values are ``""`` unless both are configured and non-empty.
        An ip equal to ``get_local_ip()`` is replaced by ``127.0.0.1``.
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""

        # A remote endpoint is only meaningful when both parts are present.
        if not remote_ip or not remote_port:
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_testcases_dir_config(self):
        """Return the configured ``testcases/dir`` text, or ``""``."""
        data_dic = self.get_user_config_list("testcases")
        return data_dic.get("dir") or ""

    def get_user_config(self, target_name, filter_name=None):
        """Collect the children of the nodes matching *target_name*.

        Args:
            target_name: Path passed to ``findall`` on the root element.
            filter_name: When truthy, only nodes whose ``label`` attribute
                equals this value are considered.

        Returns:
            A dict mapping each child tag to its text (``""`` when the
            child has no text); empty when nothing matches.
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text if sub.text else ""
        return data_dic

    def get_node_attr(self, target_name, attr_name):
        """Return attribute *attr_name* of the first *target_name* node.

        Returns:
            The attribute value, or ``None`` when either the node or the
            attribute does not exist.
        """
        node = self.config_content.find(target_name)
        if node is None:
            # find() yields None for a missing node; report "no value"
            # instead of failing on ``None.attrib``.
            return None
        return node.attrib.get(attr_name)

    def get_com_device(self, target_name):
        """Describe every ``com``/``agent`` node matching *target_name*.

        Returns:
            A list with one entry per device. Each entry is a list whose
            first element is the node's attribute dict, followed by either
            one dict of remote settings (taken from the node's non-serial
            children that have text) or one dict per ``serial`` child.
        """
        devices = []

        for node in self.config_content.findall(target_name):
            # .get() so a node without a "type" attribute is skipped
            # rather than raising KeyError.
            if node.attrib.get("type") not in ("com", "agent"):
                continue

            device = [node.attrib]

            # get remote device
            remote_dic = {}
            for sub in node:
                if sub.text is not None and sub.tag != "serial":
                    remote_dic[sub.tag] = sub.text
            if remote_dic:
                if remote_dic.get("ip", "") == get_local_ip():
                    remote_dic["ip"] = "127.0.0.1"
                device.append(remote_dic)
                devices.append(device)
                continue

            # get local device
            for serial in node.findall("serial"):
                device.append({
                    sub.tag: "" if sub.text is None else sub.text
                    for sub in serial
                })
            devices.append(device)
        return devices

    def get_device(self, target_name):
        """Return the settings of the first ``usb-hdc`` node, if any.

        Returns:
            A dict holding ``usb_type`` plus one entry per child element
            (``""`` for children without text), with an ip equal to
            ``get_local_ip()`` replaced by ``127.0.0.1``; ``None`` when no
            matching node has type ``usb-hdc``.
        """
        for node in self.config_content.findall(target_name):
            usb_type = node.attrib.get("type")
            if usb_type != "usb-hdc":
                continue
            data_dic = {"usb_type": usb_type}
            for sub in node:
                data_dic[sub.tag] = "" if sub.text is None else sub.text
            if data_dic.get("ip", "") == get_local_ip():
                data_dic["ip"] = "127.0.0.1"
            return data_dic
        return None

    def get_testcases_dir(self):
        """Return the absolute test case directory.

        A relative configured directory is resolved against
        ``Variables.exec_dir``; without configuration the default is
        ``<exec_dir>/testcases``.
        """
        from xdevice import Variables
        testcases_dir = self.get_testcases_dir_config()
        if not testcases_dir:
            testcases_dir = "testcases"
        elif os.path.isabs(testcases_dir):
            return testcases_dir
        return os.path.abspath(os.path.join(Variables.exec_dir,
                                            testcases_dir))

    def get_resource_path(self):
        """Return the absolute resource directory.

        A relative configured directory is resolved against
        ``Variables.exec_dir``; without configuration the default is
        ``<exec_dir>/resource``.
        """
        from xdevice import Variables
        resource_dir = self.get_user_config_list("resource").get("dir")
        if not resource_dir:
            resource_dir = "resource"
        elif os.path.isabs(resource_dir):
            return resource_dir
        return os.path.abspath(os.path.join(Variables.exec_dir,
                                            resource_dir))

    def get_log_level(self):
        """Return the configured log levels.

        Returns:
            ``{}`` when there is no ``loglevel`` node. When that node has
            neither a ``console`` nor a ``file`` child, its own text is
            reported under ``"console"``; otherwise every child is
            reported under its tag.
        """
        data_dic = {}
        node = self.config_content.find("loglevel")
        if node is None:
            return data_dic

        # NOTE: str() turns a missing text (None) into the string "None";
        # this is kept unchanged from the original behaviour.
        if node.find("console") is None and node.find("file") is None:
            # neither loglevel/console nor loglevel/file exists
            data_dic["console"] = str(node.text).strip()
        else:
            for child in node:
                data_dic[child.tag] = str(child.text).strip()
        return data_dic