#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
from collections import Counter
from dataclasses import dataclass
from functools import wraps
from xml.etree import ElementTree

from _core.error import ErrorMessage
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_local_ip
from _core.constants import ConfigConst

__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")


def initialize(func):
    """Cache the result of ``func`` on the instance after its first call.

    The value is stored in the attribute ``"_" + func.__name__``; every
    later call returns that stored value without calling ``func`` again.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        name = "_" + func.__name__
        if not hasattr(self, name):
            setattr(self, name, func(self, *args, **kwargs))
        return getattr(self, name)

    return wrapper


@dataclass
class ConfigFileConst(object):
    # File name of the user configuration looked up in the config directories.
    userconfig_filepath = "user_config.xml"


class UserConfigManager(object):
    """Read access to the user configuration XML (file or in-memory string)."""

    def __init__(self, config_file="", env=""):
        """Load the configuration.

        :param config_file: path of the configuration file; when empty, the
            "config" directories under ``Variables.exec_dir``,
            ``Variables.top_dir`` and ``Variables.res_dir`` are searched in
            that order for ``ConfigFileConst.userconfig_filepath``.
        :param env: XML text; when given it is parsed instead of any file.
        :raises ParamError: when the configuration file does not exist, or
            when the XML cannot be parsed.
        """
        from xdevice import Variables
        try:
            if env:
                self.config_content = ElementTree.fromstring(env)
            else:
                # Falls back to "" when nothing is found, so the missing file
                # is reported as a ParamError instead of an AttributeError.
                self.file_path = \
                    config_file or self._find_default_config(Variables)
                LOG.debug("User config path: %s" % self.file_path)
                if not os.path.exists(self.file_path):
                    raise ParamError(ErrorMessage.UserConfig.Code_0103001)
                self.config_content = \
                    ElementTree.parse(self.file_path).getroot()

        except SyntaxError as error:
            # ElementTree.ParseError is a subclass of SyntaxError.
            err = ErrorMessage.UserConfig.Code_0103003 if env \
                else ErrorMessage.UserConfig.Code_0103002
            err_msg = err.format(error)
            raise ParamError(err_msg) from error

    @staticmethod
    def _find_default_config(variables):
        """Return the first existing default config file path, or ""."""
        for base in (variables.exec_dir, variables.top_dir,
                     variables.res_dir):
            candidate = os.path.abspath(os.path.join(
                base, "config", ConfigFileConst.userconfig_filepath))
            if os.path.exists(candidate):
                return candidate
        return ""

    @staticmethod
    def _load_user_define(pass_through):
        """Return the "user_define" dict of a pass_through JSON, else None.

        Anything that is not a JSON object holding a dict under
        "user_define" (invalid JSON, non-string input, other JSON types)
        yields None.
        """
        if not pass_through:
            return None
        try:
            parsed = json.loads(pass_through)
        except (TypeError, ValueError):
            return None
        if not isinstance(parsed, dict):
            return None
        user_define = parsed.get("user_define")
        return user_define if isinstance(user_define, dict) else None

    @property
    @initialize
    def environment(self):
        """Return the list of device dicts declared under <environment>.

        One dict is produced per <info> element of every <device> whose
        "type" attribute is non-empty and not "com".

        :raises ParamError: when two devices use the same alias.
        """
        envs = []
        ele_environment = self.config_content.find("environment")
        if ele_environment is None:
            return envs
        # alias -> sn of the device that declared it first
        alias_owner = {}
        for device in ele_environment.findall("device"):
            device_type = device.get("type", "").strip()
            if not device_type or device_type == "com":
                continue
            # The device manager takes usb_type as its parameter, so the
            # "type" attribute is exposed as usb_type as well.
            data = {"usb_type": device_type, "label": ""}
            data.update(device.attrib)
            for info in device.findall("info"):
                dev = {"ip": "", "port": "", "sn": "", "alias": ""}
                dev.update(info.attrib)
                dev.update(data)

                # Strip whitespace everywhere; aliases are upper-cased.
                dev = {
                    key: (value.upper() if key == "alias" else value).strip()
                    for key, value in dev.items()
                }

                # Device aliases must be unique.
                sn, alias = dev.get("sn"), dev.get("alias")
                if alias:
                    if alias in alias_owner:
                        raise ParamError(
                            ErrorMessage.UserConfig.Code_0103004.format(
                                sn, alias_owner[alias]))
                    alias_owner[alias] = sn

                if not dev.get("ip"):
                    dev["ip"] = "127.0.0.1"
                envs.append(dev)
        return envs

    @property
    @initialize
    def testcases(self):
        """Return the <testcases> configuration as a dict."""
        return self.get_element_cfg("testcases")

    @property
    @initialize
    def resource(self):
        """Return the <resource> configuration as a dict."""
        return self.get_element_cfg("resource")

    @property
    @initialize
    def devicelog(self):
        """
        <devicelog>
            <enable>ON</enable>
            <clear>TRUE</clear>
            <dir></dir>
            <loglevel>INFO</loglevel>
            <hdc>FALSE</hdc>
        </devicelog>
        """
        tag = "devicelog"
        cfg = self.get_element_cfg(tag)

        # Legacy form <devicelog>ON</devicelog> becomes {"enable": "ON"}.
        enable = cfg.get(tag)
        if enable is not None:
            cfg.pop(tag)
            cfg[ConfigConst.tag_enable] = \
                "ON" if enable.upper() == "ON" else "OFF"

        # Overlay the settings passed in through the user-defined arguments.
        user_define = self._load_user_define(self.pass_through)
        if user_define:
            device_log = user_define.get(tag)
            if device_log and isinstance(device_log, dict):
                cfg.update(device_log)

        # Default values
        data = {
            ConfigConst.tag_enable: "ON",
            ConfigConst.tag_clear: "TRUE",
            ConfigConst.tag_dir: "",
            ConfigConst.tag_loglevel: "INFO",
            ConfigConst.tag_hdc: "FALSE"
        }
        # Replace a default only when a non-empty value is configured.
        for key in data:
            value = cfg.get(key)
            if value:
                data[key] = value
        return data

    @property
    @initialize
    def loglevel(self):
        """Return the <loglevel> configuration as a dict.

        The plain form <loglevel>DEBUG</loglevel> is converted to
        {"console": level}; any level other than DEBUG or INFO becomes INFO.
        """
        data = self.get_element_cfg(ConfigConst.tag_loglevel)
        level = data.get(ConfigConst.tag_loglevel)
        if level is not None:
            data.pop(ConfigConst.tag_loglevel)
            level = level.upper()
            data["console"] = level if level in ("DEBUG", "INFO") else "INFO"
        return data

    @property
    @initialize
    def taskargs(self):
        """
        <taskargs>
            <agent_mode></agent_mode>
            <pass_through></pass_through>
            <repeat></repeat>
            <screenshot>false</screenshot>
            <screenrecorder>false</screenrecorder>
        </taskargs>
        """
        data = self.get_element_cfg("taskargs")
        user_define = self._load_user_define(
            data.get(ConfigConst.pass_through))
        if user_define:
            self.update_task_args(data, user_define)
        return data

    @property
    @initialize
    def custom(self):
        """
        <custom></custom>
        """
        return self.get_element_cfg("custom")

    @property
    @initialize
    def pass_through(self):
        """Return the pass_through value of <taskargs>, or None."""
        return self.taskargs.get(ConfigConst.pass_through)

    def get_element_cfg(self, tag):
        """Return the first ``tag`` element as a dict, or {} when absent."""
        element = self.config_content.find(tag)
        return {} if element is None else self.get_element_dict(element)

    @staticmethod
    def get_element_dict(element, is_top=True):
        """Convert an element and its descendants to plain Python data.

        :param element: ElementTree.Element to traverse
        :param is_top: bool; for an element without children, return a dict
            (attributes plus {tag: text}, values stripped) when True, or only
            the stripped text when False
        :return: dict, or str for a childless element when is_top is False
        :raises TypeError: when element is not an ElementTree.Element
        """
        if not isinstance(element, ElementTree.Element):
            raise TypeError(
                "element must be instance of xml.etree.ElementTree.Element")
        # Work on a copy: updating element.attrib directly would modify the
        # parsed XML tree and corrupt any later conversion of this element.
        data = dict(element.attrib)
        if len(element) == 0:
            text = "" if element.text is None else element.text.strip()
            if not is_top:
                return text
            data[element.tag] = text
            return {k: v.strip() for k, v in data.items()}

        tag_counts = Counter(sub.tag for sub in element)
        for sub in element:
            key = sub.tag
            value = UserConfigManager.get_element_dict(sub, is_top=False)
            if tag_counts[key] > 1:
                # Several children share this tag: collect them in a list.
                collected = data.get(key)
                if not isinstance(collected, list):
                    collected = []
                    data[key] = collected
                collected.append(value)
            else:
                data[key] = value
        return data

    def get_wifi_config(self):
        """Return the configured wifi entries as a list of strings.

        A "wifi" value in taskargs is split on ","; otherwise the <wifi>
        entries of <custom> are formatted as "ssid:password" or
        "ssid:password:type". Entries without an ssid are skipped.
        """
        wifi = self.taskargs.get("wifi")
        if wifi:
            return wifi.split(",")
        wifi = self.custom.get("wifi")
        # Not configured
        if wifi is None:
            return []
        # Only one entry configured
        if isinstance(wifi, dict):
            wifi = [wifi]
        data = []
        for info in wifi:
            if not isinstance(info, dict):
                continue
            ssid = info.get("ssid", "")
            if not ssid:
                continue
            password = info.get("password", "")
            wifi_type = info.get("type", "")
            if wifi_type:
                data.append("{}:{}:{}".format(ssid, password, wifi_type))
            else:
                data.append("{}:{}".format(ssid, password))
        return data

    def update_task_args(self, task_args=None, new_args=None):
        """Copy non-empty values from ``new_args`` into ``task_args``.

        :param task_args: dict to update in place; ``self.taskargs`` when None
        :param new_args: dict of new values; anything else is ignored
        """
        if task_args is None:
            task_args = self.taskargs
        if not isinstance(new_args, dict):
            return
        # Arguments currently in use
        known_test_args = [
            "agent_mode", ConfigConst.repeat, ConfigConst.pass_through,
            "screenshot", "screenrecorder", ConfigConst.web_resource, "wifi",
            "install_user0", "ui_adaptive", "kill_uitest"
        ]
        # Keys present on both sides, plus every known argument
        keys = (set(task_args) & set(new_args)) | set(known_test_args)
        for key in keys:
            value = new_args.get(key)
            if value and task_args.get(key) != value:
                task_args[key] = value

    def get_user_config_list(self, tag_name):
        """Return {child tag: child text} for top-level ``tag_name`` nodes."""
        data_dic = {}
        for child in self.config_content:
            if tag_name == child.tag:
                for sub in child:
                    data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return ``value`` without leading and trailing whitespace."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return False (and log a warning) when ``items`` has duplicates."""
        if len(set(items)) != len(items):
            LOG.warning("Find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ";" separated string into its non-empty, stripped items.

        Returns an empty list when the items contain duplicates.
        """
        config_list = [
            item for item in map(self.remove_strip, input_string.split(';'))
            if item
        ]
        if config_list and not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the sn values of a ";" separated string ([] when empty)."""
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Return {"ip": ..., "port": ...} read from the <remote> element.

        Both values are "" unless both are configured and non-empty. An ip
        equal to the local ip is replaced by "127.0.0.1".
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""
        # ip and port are only meaningful together.
        if not remote_ip or not remote_port:
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_user_config(self, target_name, filter_name=None):
        """Return {child tag: child text} of the ``target_name`` elements.

        :param target_name: tag of the elements to read
        :param filter_name: when given, only elements whose "label"
            attribute equals it are read
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text if sub.text else ""
        return data_dic

    @staticmethod
    def _resolve_dir(configured, default_name):
        """Resolve a configured directory against ``Variables.exec_dir``.

        An absolute ``configured`` path is returned unchanged; a relative one
        (or ``default_name`` when nothing is configured) is joined to
        ``Variables.exec_dir`` and made absolute.
        """
        from xdevice import Variables
        if configured and os.path.isabs(configured):
            return configured
        return os.path.abspath(
            os.path.join(Variables.exec_dir, configured or default_name))

    def get_testcases_dir(self):
        """Return the testcases directory (default: <exec_dir>/testcases)."""
        return self._resolve_dir(
            self.testcases.get(ConfigConst.tag_dir), "testcases")

    def get_resource_path(self):
        """Return the resource directory (default: <exec_dir>/resource)."""
        return self._resolve_dir(
            self.resource.get(ConfigConst.tag_dir), "resource")

    def environment_enable(self):
        """Return True when <environment> has usable content.

        True when the first "environment" element, or the first
        "environment/device" element, has at least one child element.
        """
        # Explicit None/len checks instead of Element truth-value testing,
        # which is deprecated; the result is the same.
        for path in ("environment", "environment/device"):
            node = self.config_content.find(path)
            if node is not None and len(node) > 0:
                return True
        return False

    @property
    @initialize
    def uploadtrack(self):
        """
        Below configuring closes uploading track data.
        <uploadtrack>FALSE</uploadtrack>
        """
        tag = "uploadtrack"
        cfg = self.get_element_cfg(tag)
        return cfg