• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21from dataclasses import dataclass
22from xml.etree import ElementTree
23
24from _core.error import ErrorMessage
25from _core.exception import ParamError
26from _core.logger import platform_logger
27from _core.utils import get_local_ip
28from _core.constants import ConfigConst
29
30__all__ = ["UserConfigManager"]
31LOG = platform_logger("ConfigManager")
32
33
def initialize(func):
    """Decorate a method so that it is evaluated once per instance.

    The first call runs ``func`` and stores its return value on the instance
    under the attribute ``"_" + func.__name__``. Every later call returns the
    stored value and ignores its arguments.

    Args:
        func: method taking the instance as its first positional argument.

    Returns:
        The caching wrapper; it keeps the name and docstring of ``func``.
    """
    # Local import keeps this block independent of the module import list.
    from functools import wraps

    cache_attr = "_" + func.__name__

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not hasattr(self, cache_attr):
            setattr(self, cache_attr, func(self, *args, **kwargs))
        return getattr(self, cache_attr)

    return wrapper
43
44
@dataclass
class ConfigFileConst:
    """Constants naming the configuration files used by this module."""

    # File name of the user configuration file.
    userconfig_filepath = "user_config.xml"
48
49
50class UserConfigManager(object):
51    def __init__(self, config_file="", env=""):
52        from xdevice import Variables
53        try:
54            if env:
55                self.config_content = ElementTree.fromstring(env)
56            else:
57                if config_file:
58                    self.file_path = config_file
59                else:
60                    user_path = os.path.join(Variables.exec_dir, "config")
61                    top_user_path = os.path.join(Variables.top_dir, "config")
62                    config_path = os.path.join(Variables.res_dir, "config")
63                    paths = [user_path, top_user_path, config_path]
64
65                    for path in paths:
66                        if os.path.exists(os.path.abspath(os.path.join(
67                                path, ConfigFileConst.userconfig_filepath))):
68                            self.file_path = os.path.abspath(os.path.join(
69                                path, ConfigFileConst.userconfig_filepath))
70                            break
71
72                LOG.debug("User config path: %s" % self.file_path)
73                if os.path.exists(self.file_path):
74                    tree = ElementTree.parse(self.file_path)
75                    self.config_content = tree.getroot()
76                else:
77                    raise ParamError(ErrorMessage.UserConfig.Code_0103001)
78
79        except SyntaxError as error:
80            err = ErrorMessage.UserConfig.Code_0103003 if env else ErrorMessage.UserConfig.Code_0103002
81            err_msg = err.format(error)
82            raise ParamError(err_msg) from error
83
84    @property
85    @initialize
86    def environment(self):
87        envs = []
88        ele_environment = self.config_content.find("environment")
89        if ele_environment is None:
90            return envs
91        dev_alias = {}
92        for device in ele_environment.findall("device"):
93            device_type = device.get("type", "").strip()
94            if not device_type or device_type == "com":
95                continue
96            # 设备管理器使用usb_type作为入参,故需将type转换为usb_type
97            data = {"usb_type": device_type, "label": ""}
98            data.update(device.attrib)
99            for info in device.findall("info"):
100                dev = {"ip": "", "port": "", "sn": "", "alias": ""}
101                dev.update(info.attrib)
102                dev.update(data)
103
104                # 去除空格,alias字符转大写
105                new_dev = {}
106                for k, v in dev.items():
107                    if k == "alias":
108                        v = v.upper()
109                    v = v.strip()
110                    new_dev[k] = v
111                dev.update(new_dev)
112
113                # 不允许设备的别名相同
114                sn, alias = dev.get("sn"), dev.get("alias")
115                if alias:
116                    for k, v in dev_alias.items():
117                        if alias == v:
118                            raise ParamError(ErrorMessage.UserConfig.Code_0103004.format(sn, k))
119                dev_alias.update({sn: alias})
120
121                ip = dev.get("ip")
122                if not ip:
123                    dev.update({"ip": "127.0.0.1"})
124                envs.append(dev)
125        return envs
126
127    @property
128    @initialize
129    def testcases(self):
130        return self.get_element_cfg("testcases")
131
132    @property
133    @initialize
134    def resource(self):
135        return self.get_element_cfg("resource")
136
137    @property
138    @initialize
139    def devicelog(self):
140        """
141        <devicelog>
142            <enable>ON</enable>
143            <clear>TRUE</clear>
144            <dir></dir>
145            <loglevel>INFO</loglevel>
146            <hdc>FALSE</hdc>
147        </devicelog>
148        """
149        tag = "devicelog"
150        cfg = self.get_element_cfg(tag)
151
152        # 若是旧配置方式<devicelog>ON</devicelog>,转换为{"enable": "ON"}
153        enable = cfg.get(tag)
154        if enable is not None:
155            cfg.pop(tag)
156            cfg.update({ConfigConst.tag_enable: "ON" if enable.upper() == "ON" else "OFF"})
157
158        # 从自定义参数中更新配置
159        if self.pass_through:
160            user_define = None
161            try:
162                user_define = json.loads(self.pass_through).get("user_define")
163            except ValueError:
164                pass
165            if user_define and isinstance(user_define, dict):
166                device_log = user_define.get(tag)
167                if device_log and isinstance(device_log, dict):
168                    cfg.update(device_log)
169
170        # 默认配置参数
171        data = {
172            ConfigConst.tag_enable: "ON",
173            ConfigConst.tag_clear: "TRUE",
174            ConfigConst.tag_dir: "",
175            ConfigConst.tag_loglevel: "INFO",
176            ConfigConst.tag_hdc: "FALSE"
177        }
178        # 刷新默认配置参数
179        for key in data.keys():
180            value = cfg.get(key)
181            if value:
182                data.update({key: value})
183        return data
184
185    @property
186    @initialize
187    def loglevel(self):
188        data = self.get_element_cfg(ConfigConst.tag_loglevel)
189        level = data.get(ConfigConst.tag_loglevel)
190        if level is not None:
191            data.pop(ConfigConst.tag_loglevel)
192            level = level.upper()
193            level = level if level in ["DEBUG", "INFO"] else ""
194            data.update({"console": level or "INFO"})
195        return data
196
197    @property
198    @initialize
199    def taskargs(self):
200        """
201        <taskargs>
202            <agent_mode></agent_mode>
203            <pass_through></pass_through>
204            <repeat></repeat>
205            <screenshot>false</screenshot>
206            <screenrecorder>false</screenrecorder>
207        </taskargs>
208        """
209        data = self.get_element_cfg("taskargs")
210        pass_through = data.get(ConfigConst.pass_through)
211        if pass_through:
212            user_define = None
213            try:
214                user_define = json.loads(pass_through).get("user_define")
215            except ValueError:
216                pass
217            if user_define:
218                self.update_task_args(data, user_define)
219        return data
220
221    @property
222    @initialize
223    def custom(self):
224        """
225        <custom></custom>
226        """
227        return self.get_element_cfg("custom")
228
229    @property
230    @initialize
231    def pass_through(self):
232        return self.taskargs.get(ConfigConst.pass_through)
233
234    def get_element_cfg(self, tag):
235        element = self.config_content.find(tag)
236        return {} if element is None else self.get_element_dict(element)
237
238    @staticmethod
239    def get_element_dict(element, is_top=True):
240        """
241        element: ElementTree.Element, traversal element
242        is_top: bool, when the element has no child and if is the top, result as dict else as text
243        return : dict
244        """
245        if not isinstance(element, ElementTree.Element):
246            raise TypeError("element must be instance of xml.etree.ElementTree.Element")
247        data = element.attrib
248        if len(element) == 0:
249            text = "" if element.text is None else element.text.strip()
250            if is_top:
251                data.update({element.tag: text})
252                data = {k: v.strip() for k, v in data.items()}
253                return data
254            return text
255
256        for sub in element:
257            k, v = sub.tag, UserConfigManager.get_element_dict(sub, is_top=False)
258            # 同一层级存在多个同名tag,数据存为列表
259            if len(element.findall(k)) > 1:
260                value = data.get(k) if k in data else []
261                value.append(v)
262                data.update({k: value})
263            else:
264                data.update({k: v})
265        return data
266
267    def get_wifi_config(self):
268        wifi = self.taskargs.get("wifi")
269        if wifi:
270            return wifi.split(",")
271        wifi = self.custom.get("wifi")
272        # 未配置
273        if wifi is None:
274            return []
275        # 只配置了一个
276        if isinstance(wifi, dict):
277            wifi = [wifi]
278        data = []
279        for info in wifi:
280            if not isinstance(info, dict):
281                continue
282            ssid = info.get("ssid", "")
283            password = info.get("password", "")
284            wifi_type = info.get("type", "")
285            if not ssid:
286                continue
287            if not wifi_type:
288                data.append("{}:{}".format(ssid, password))
289            else:
290                data.append("{}:{}:{}".format(ssid, password, wifi_type))
291        return data
292
293    def update_task_args(self, task_args=None, new_args=None):
294        if task_args is None:
295            task_args = self.taskargs
296        if not isinstance(new_args, dict):
297            return
298        # 目前在用的参数列表
299        known_test_args = [
300            "agent_mode", ConfigConst.repeat, ConfigConst.pass_through,
301            "screenshot", "screenrecorder", ConfigConst.web_resource, "wifi",
302            "install_user0", "ui_adaptive", "kill_uitest"
303        ]
304        # 更新同名参数
305        keys = set(task_args.keys()) & set(new_args.keys()) | set(known_test_args)
306        for key in keys:
307            value = new_args.get(key)
308            if not value or task_args.get(key) == value:
309                continue
310            task_args.update({key: value})
311
312    def get_user_config_list(self, tag_name):
313        data_dic = {}
314        for child in self.config_content:
315            if tag_name == child.tag:
316                for sub in child:
317                    data_dic[sub.tag] = sub.text
318        return data_dic
319
320    @staticmethod
321    def remove_strip(value):
322        return value.strip()
323
324    @staticmethod
325    def _verify_duplicate(items):
326        if len(set(items)) != len(items):
327            LOG.warning("Find duplicate sn config, configuration incorrect")
328            return False
329        return True
330
331    def _handle_str(self, input_string):
332        config_list = map(self.remove_strip, input_string.split(';'))
333        config_list = [item for item in config_list if item]
334        if config_list:
335            if not self._verify_duplicate(config_list):
336                return []
337        return config_list
338
339    def get_sn_list(self, input_string):
340        sn_select_list = []
341        if input_string:
342            sn_select_list = self._handle_str(input_string)
343        return sn_select_list
344
345    def get_remote_config(self):
346        remote_dic = {}
347        data_dic = self.get_user_config_list("remote")
348
349        if "ip" in data_dic.keys() and "port" in data_dic.keys():
350            remote_ip = data_dic.get("ip", "")
351            remote_port = data_dic.get("port", "")
352        else:
353            remote_ip = ""
354            remote_port = ""
355
356        if (not remote_ip) or (not remote_port):
357            remote_ip = ""
358            remote_port = ""
359        if remote_ip == get_local_ip():
360            remote_ip = "127.0.0.1"
361        remote_dic["ip"] = remote_ip
362        remote_dic["port"] = remote_port
363        return remote_dic
364
365    def get_user_config(self, target_name, filter_name=None):
366        data_dic = {}
367        all_nodes = self.config_content.findall(target_name)
368        if not all_nodes:
369            return data_dic
370
371        for node in all_nodes:
372            if filter_name:
373                if node.get('label') != filter_name:
374                    continue
375            for sub in node:
376                data_dic[sub.tag] = sub.text if sub.text else ""
377
378        return data_dic
379
380    def get_testcases_dir(self):
381        from xdevice import Variables
382        testcases_dir = self.testcases.get(ConfigConst.tag_dir)
383        if testcases_dir:
384            if os.path.isabs(testcases_dir):
385                return testcases_dir
386            return os.path.abspath(
387                os.path.join(Variables.exec_dir, testcases_dir))
388        return os.path.abspath(os.path.join(Variables.exec_dir, "testcases"))
389
390    def get_resource_path(self):
391        from xdevice import Variables
392        resource_dir = self.resource.get(ConfigConst.tag_dir)
393        if resource_dir:
394            if os.path.isabs(resource_dir):
395                return resource_dir
396            return os.path.abspath(
397                os.path.join(Variables.exec_dir, resource_dir))
398        return os.path.abspath(os.path.join(Variables.exec_dir, "resource"))
399
400    def environment_enable(self):
401        if self.config_content.find("environment") or \
402                self.config_content.find("environment/device"):
403            return True
404        return False
405
406    @property
407    @initialize
408    def uploadtrack(self):
409        """
410        Below configuring closes uploading track data.
411        <uploadtrack>FALSE</uploadtrack>
412        """
413        tag = "uploadtrack"
414        cfg = self.get_element_cfg(tag)
415        return cfg
416