#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import json
import os
import sys
from collections import Counter
from dataclasses import dataclass
from xml.etree import ElementTree

from _core.error import ErrorMessage
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_local_ip
from _core.constants import ConfigConst
from _core.constants import Cluster

__all__ = ["UserConfigManager"]
LOG = platform_logger("ConfigManager")


def initialize(func):
    """Cache the result of a method on the instance after its first call.

    The value is stored on the instance under ``"_" + func.__name__``; later
    calls return the stored value without invoking ``func`` again.
    """
    name = "_" + func.__name__

    # functools.wraps keeps the wrapped method's name and docstring, so
    # properties built on top of this decorator stay documented.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not hasattr(self, name):
            setattr(self, name, func(self, *args, **kwargs))
        return getattr(self, name)

    return wrapper


@dataclass
class ConfigFileConst(object):
    # File name of the user configuration looked up in the config directories.
    userconfig_filepath = "user_config.xml"


class UserConfigManager(object):
    """Read-only view over the user configuration XML."""

    def __init__(self, config_file="", env=""):
        """Load the configuration from ``env`` or from a config file.

        config_file: str, path of the config file; when empty, the file named
            by ``ConfigFileConst.userconfig_filepath`` is searched for in the
            ``config`` directory under ``Variables.exec_dir``,
            ``Variables.top_dir`` and ``Variables.res_dir``, in that order
        env: str, XML text; when given it is parsed instead of any file
        raise: ParamError if no config file exists or the XML cannot be parsed
        """
        from xdevice import Variables
        # Always define file_path so that a failed lookup reaches the
        # ParamError below instead of raising AttributeError.
        self.file_path = ""
        try:
            if env:
                self.config_content = ElementTree.fromstring(env)
            else:
                if config_file:
                    self.file_path = config_file
                else:
                    search_dirs = [
                        os.path.join(Variables.exec_dir, "config"),
                        os.path.join(Variables.top_dir, "config"),
                        os.path.join(Variables.res_dir, "config"),
                    ]
                    for path in search_dirs:
                        candidate = os.path.abspath(os.path.join(
                            path, ConfigFileConst.userconfig_filepath))
                        if os.path.exists(candidate):
                            self.file_path = candidate
                            break

                LOG.debug("User config path: %s" % self.file_path)
                if os.path.exists(self.file_path):
                    tree = ElementTree.parse(self.file_path)
                    self.config_content = tree.getroot()
                else:
                    raise ParamError(ErrorMessage.UserConfig.Code_0103001)

        except SyntaxError as error:
            err = ErrorMessage.UserConfig.Code_0103003 if env else ErrorMessage.UserConfig.Code_0103002
            err_msg = err.format(error)
            raise ParamError(err_msg) from error

    @property
    @initialize
    def environment(self):
        """Return one dict per ``<environment>/<device>/<info>`` element.

        Devices without a type, or of type "com", are skipped. Each dict
        merges the info attributes with the device attributes (device values
        win), has all values stripped, the alias upper-cased, and "ip"
        defaulting to "127.0.0.1".

        raise: ParamError if two devices use the same alias
        """
        envs = []
        ele_environment = self.config_content.find("environment")
        if ele_environment is None:
            return envs
        # alias -> sn of the device that registered it. Keying by alias keeps
        # every registered alias, even when several devices share an sn.
        alias_owner = {}
        for device in ele_environment.findall("device"):
            device_type = device.get("type", "").strip()
            if not device_type or device_type == "com":
                continue
            # The device manager takes usb_type as its argument, so the
            # type is also exposed as usb_type.
            data = {"usb_type": device_type, "label": ""}
            data.update(device.attrib)
            for info in device.findall("info"):
                dev = {"ip": "", "port": "", "sn": "", "alias": ""}
                dev.update(info.attrib)
                dev.update(data)

                # Strip whitespace; the alias is upper-cased as well.
                dev = {
                    k: (v.upper() if k == "alias" else v).strip()
                    for k, v in dev.items()
                }

                # Device aliases must be unique.
                sn, alias = dev.get("sn"), dev.get("alias")
                if alias:
                    if alias in alias_owner:
                        raise ParamError(
                            ErrorMessage.UserConfig.Code_0103004.format(sn, alias_owner[alias]))
                    alias_owner[alias] = sn

                if not dev.get("ip"):
                    dev["ip"] = "127.0.0.1"
                envs.append(dev)
        return envs

    @property
    @initialize
    def testcases(self):
        """Return the ``<testcases>`` element as a dict."""
        return self.get_element_cfg("testcases")

    @property
    @initialize
    def resource(self):
        """Return the ``<resource>`` element as a dict."""
        return self.get_element_cfg("resource")

    @property
    @initialize
    def devicelog(self):
        """
        Return the device log settings, with defaults for unset values.

        <devicelog>
            <enable>ON</enable>
            <clear>TRUE</clear>
            <dir></dir>
            <loglevel>INFO</loglevel>
            <hdc>FALSE</hdc>
        </devicelog>
        """
        tag = "devicelog"
        cfg = self.get_element_cfg(tag)

        # Legacy form <devicelog>ON</devicelog> is converted to {"enable": "ON"}.
        enable = cfg.get(tag)
        if enable is not None:
            cfg.pop(tag)
            cfg.update({ConfigConst.tag_enable: "ON" if enable.upper() == "ON" else "OFF"})

        # Values in pass_through's "user_define" override the config file.
        user_define = self._load_user_define(self.pass_through)
        if user_define and isinstance(user_define, dict):
            device_log = user_define.get(tag)
            if device_log and isinstance(device_log, dict):
                cfg.update(device_log)

        # Default settings.
        data = {
            ConfigConst.tag_enable: "ON",
            ConfigConst.tag_clear: "TRUE",
            ConfigConst.tag_dir: "",
            ConfigConst.tag_loglevel: "INFO",
            ConfigConst.tag_hdc: "FALSE"
        }
        # Replace a default only when a non-empty value is configured.
        for key in data:
            value = cfg.get(key)
            if value:
                data[key] = value
        return data

    @property
    @initialize
    def loglevel(self):
        """Return the ``<loglevel>`` element as a dict.

        A plain-text level is converted to {"console": level}; anything other
        than "DEBUG" or "INFO" (case-insensitive) becomes "INFO".
        """
        data = self.get_element_cfg(ConfigConst.tag_loglevel)
        level = data.get(ConfigConst.tag_loglevel)
        if level is not None:
            data.pop(ConfigConst.tag_loglevel)
            level = level.upper()
            level = level if level in ["DEBUG", "INFO"] else ""
            data.update({"console": level or "INFO"})
        return data

    @property
    @initialize
    def taskargs(self):
        """
        Return the task arguments, updated from pass_through's "user_define".

        <taskargs>
            <agent_mode></agent_mode>
            <pass_through></pass_through>
            <repeat></repeat>
            <screenshot>false</screenshot>
            <screenrecorder>false</screenrecorder>
        </taskargs>
        """
        data = self.get_element_cfg("taskargs")
        # Read pass_through from data directly: the pass_through property is
        # itself built on taskargs.
        user_define = self._load_user_define(data.get(ConfigConst.pass_through))
        if user_define:
            self.update_task_args(data, user_define)
        return data

    @property
    @initialize
    def custom(self):
        """
        Return the ``<custom>`` element as a dict.

        <custom></custom>
        """
        return self.get_element_cfg("custom")

    @property
    @initialize
    def cluster(self):
        """
        Return the cluster settings with lower-cased enable/mode/port values.

        <cluster>
            <enable>true/false</enable>
            <service_mode>controller/worker</service_mode>
            <service_port>8000</service_port>
            <control_service_url>http://127.0.0.1:8000</control_service_url>
        </cluster>
        """
        cfg = self.get_element_cfg(ConfigConst.cluster)
        enable = cfg.get(ConfigConst.tag_enable) or "false"
        service_mode = cfg.get(ConfigConst.service_mode) or Cluster.controller
        service_port = cfg.get(ConfigConst.service_port) or Cluster.service_port
        cfg.update({
            ConfigConst.tag_enable: enable.lower(),
            ConfigConst.service_mode: service_mode.lower(),
            # str() keeps this working should the default port not be a string.
            ConfigConst.service_port: str(service_port).lower()
        })
        return cfg

    def enable_cluster(self):
        """Return True if the cluster is enabled.

        raise: Exception if enabled on a Python version older than 3.10
        """
        enable = self.cluster.get(ConfigConst.tag_enable) == "true"
        if enable:
            if sys.version_info < (3, 10, 0):
                raise Exception(ErrorMessage.Cluster.Code_0104026)
        return enable

    @property
    @initialize
    def pass_through(self):
        """Return the pass_through value of the task arguments, or None."""
        return self.taskargs.get(ConfigConst.pass_through)

    @staticmethod
    def _load_user_define(pass_through):
        """Return the "user_define" entry of a pass_through JSON string.

        Returns None when pass_through is empty, is not valid JSON, or does
        not decode to a JSON object.
        """
        if not pass_through:
            return None
        try:
            content = json.loads(pass_through)
        except (TypeError, ValueError):
            return None
        if not isinstance(content, dict):
            return None
        return content.get("user_define")

    def get_element_cfg(self, tag):
        """Return the first element matching ``tag`` as a dict, or {} if absent."""
        element = self.config_content.find(tag)
        return {} if element is None else self.get_element_dict(element)

    @staticmethod
    def get_element_dict(element, is_top=True):
        """
        Convert an element into plain Python data.

        element: ElementTree.Element, traversal element
        is_top: bool, when the element has no child and if is the top, result as dict else as text
        return : dict
        raise: TypeError if element is not an ElementTree.Element
        """
        if not isinstance(element, ElementTree.Element):
            raise TypeError("element must be instance of xml.etree.ElementTree.Element")
        # Copy the attributes: updating element.attrib in place would write
        # the child data back into the parsed XML tree.
        data = dict(element.attrib)
        if len(element) == 0:
            text = "" if element.text is None else element.text.strip()
            if is_top:
                data.update({element.tag: text})
                data = {k: v.strip() for k, v in data.items()}
                return data
            return text

        # Count the tags once instead of searching the siblings per child.
        tag_counts = Counter(sub.tag for sub in element)
        for sub in element:
            k, v = sub.tag, UserConfigManager.get_element_dict(sub, is_top=False)
            # Several children with the same tag are collected into a list.
            if tag_counts[k] > 1:
                value = data.get(k)
                if not isinstance(value, list):
                    # First occurrence, or an attribute of the same name,
                    # which the children replace as in the single-tag case.
                    value = []
                value.append(v)
                data.update({k: value})
            else:
                data.update({k: v})
        return data

    def get_wifi_config(self):
        """Return the wifi settings as a list of strings.

        A "wifi" task argument is split on ","; otherwise each ``<wifi>``
        entry under ``<custom>`` is rendered as "ssid:password" or
        "ssid:password:type". Entries without an ssid are skipped.
        """
        wifi = self.taskargs.get("wifi")
        if wifi:
            return wifi.split(",")
        wifi = self.custom.get("wifi")
        # Not configured.
        if wifi is None:
            return []
        # Only one entry is configured.
        if isinstance(wifi, dict):
            wifi = [wifi]
        data = []
        for info in wifi:
            if not isinstance(info, dict):
                continue
            ssid = info.get("ssid", "")
            password = info.get("password", "")
            wifi_type = info.get("type", "")
            if not ssid:
                continue
            if not wifi_type:
                data.append("{}:{}".format(ssid, password))
            else:
                data.append("{}:{}:{}".format(ssid, password, wifi_type))
        return data

    def update_task_args(self, task_args=None, new_args=None):
        """Update ``task_args`` in place with the truthy values of ``new_args``.

        task_args: dict, defaults to self.taskargs
        new_args: dict, anything else is ignored
        Only keys already present in task_args, or listed as known
        arguments below, are taken over.
        """
        if task_args is None:
            task_args = self.taskargs
        if not isinstance(new_args, dict):
            return
        # Arguments currently in use.
        known_test_args = [
            "agent_mode", ConfigConst.repeat, ConfigConst.pass_through,
            "screenshot", "screenrecorder", ConfigConst.web_resource, "wifi",
            "install_user0", "ui_adaptive", "kill_uitest"
        ]
        # Keys shared by both dicts, plus every known argument.
        keys = (set(task_args.keys()) & set(new_args.keys())) | set(known_test_args)
        for key in keys:
            value = new_args.get(key)
            if not value or task_args.get(key) == value:
                continue
            task_args.update({key: value})

    def get_user_config_list(self, tag_name):
        """Return {child tag: child text} for top-level elements named ``tag_name``."""
        data_dic = {}
        for child in self.config_content:
            if tag_name == child.tag:
                for sub in child:
                    data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return ``value`` with surrounding whitespace removed."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return False, and log a warning, if ``items`` contains duplicates."""
        if len(set(items)) != len(items):
            LOG.warning("Find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ";"-separated string into stripped, non-empty items.

        Returns an empty list if the items contain duplicates.
        """
        config_list = map(self.remove_strip, input_string.split(';'))
        config_list = [item for item in config_list if item]
        if config_list:
            if not self._verify_duplicate(config_list):
                return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the sn list parsed from a ";"-separated string."""
        sn_select_list = []
        if input_string:
            sn_select_list = self._handle_str(input_string)
        return sn_select_list

    def get_remote_config(self):
        """Return {"ip": ..., "port": ...} from the ``<remote>`` element.

        Both values are empty unless both are configured; an ip equal to
        get_local_ip() is replaced by "127.0.0.1".
        """
        data_dic = self.get_user_config_list("remote")

        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""
        # Both values are needed; one alone is treated as not configured.
        if (not remote_ip) or (not remote_port):
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_user_config(self, target_name, filter_name=None):
        """Return {child tag: child text} for the elements matching ``target_name``.

        filter_name: str, when given only elements whose "label" attribute
            equals it are used
        """
        data_dic = {}
        all_nodes = self.config_content.findall(target_name)
        if not all_nodes:
            return data_dic

        for node in all_nodes:
            if filter_name:
                if node.get('label') != filter_name:
                    continue
            for sub in node:
                data_dic[sub.tag] = sub.text if sub.text else ""

        return data_dic

    @staticmethod
    def _resolve_exec_dir(configured_dir, default_name):
        """Return an absolute path for a configured directory.

        An absolute ``configured_dir`` is returned unchanged, a relative one
        is resolved against Variables.exec_dir, and an empty one falls back
        to ``default_name`` under Variables.exec_dir.
        """
        from xdevice import Variables
        if configured_dir and os.path.isabs(configured_dir):
            return configured_dir
        return os.path.abspath(
            os.path.join(Variables.exec_dir, configured_dir or default_name))

    def get_testcases_dir(self):
        """Return the test cases directory, "testcases" in exec_dir by default."""
        return self._resolve_exec_dir(
            self.testcases.get(ConfigConst.tag_dir), "testcases")

    def get_resource_path(self):
        """Return the resource directory, "resource" in exec_dir by default."""
        return self._resolve_exec_dir(
            self.resource.get(ConfigConst.tag_dir), "resource")

    def environment_enable(self):
        """Return True if the config has a non-empty environment or a device."""
        # Test explicitly for None and child count: truth-testing an Element
        # is deprecated and depends on the number of its children.
        environment = self.config_content.find("environment")
        if environment is not None and len(environment) > 0:
            return True
        return self.config_content.find("environment/device") is not None

    @property
    @initialize
    def uploadtrack(self):
        """
        Below configuring closes uploading track data.
        <uploadtrack>FALSE</uploadtrack>
        """
        tag = "uploadtrack"
        cfg = self.get_element_cfg(tag)
        return cfg