• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import sys
22from dataclasses import dataclass
23from xml.etree import ElementTree
24
25from _core.error import ErrorMessage
26from _core.exception import ParamError
27from _core.logger import platform_logger
28from _core.utils import get_local_ip
29from _core.constants import ConfigConst
30from _core.constants import Cluster
31
32__all__ = ["UserConfigManager"]
33LOG = platform_logger("ConfigManager")
34
35
36def initialize(func):
37
38    def wrapper(self, *args, **kwargs):
39        name = "_" + func.__name__
40        if not hasattr(self, name):
41            setattr(self, name, func(self, *args, **kwargs))
42        return getattr(self, name)
43
44    return wrapper
45
46
47@dataclass
48class ConfigFileConst(object):
49    userconfig_filepath = "user_config.xml"
50
51
52class UserConfigManager(object):
53    def __init__(self, config_file="", env=""):
54        from xdevice import Variables
55        try:
56            if env:
57                self.config_content = ElementTree.fromstring(env)
58            else:
59                if config_file:
60                    self.file_path = config_file
61                else:
62                    user_path = os.path.join(Variables.exec_dir, "config")
63                    top_user_path = os.path.join(Variables.top_dir, "config")
64                    config_path = os.path.join(Variables.res_dir, "config")
65                    paths = [user_path, top_user_path, config_path]
66
67                    for path in paths:
68                        if os.path.exists(os.path.abspath(os.path.join(
69                                path, ConfigFileConst.userconfig_filepath))):
70                            self.file_path = os.path.abspath(os.path.join(
71                                path, ConfigFileConst.userconfig_filepath))
72                            break
73
74                LOG.debug("User config path: %s" % self.file_path)
75                if os.path.exists(self.file_path):
76                    tree = ElementTree.parse(self.file_path)
77                    self.config_content = tree.getroot()
78                else:
79                    raise ParamError(ErrorMessage.UserConfig.Code_0103001)
80
81        except SyntaxError as error:
82            err = ErrorMessage.UserConfig.Code_0103003 if env else ErrorMessage.UserConfig.Code_0103002
83            err_msg = err.format(error)
84            raise ParamError(err_msg) from error
85
86    @property
87    @initialize
88    def environment(self):
89        envs = []
90        ele_environment = self.config_content.find("environment")
91        if ele_environment is None:
92            return envs
93        dev_alias = {}
94        for device in ele_environment.findall("device"):
95            device_type = device.get("type", "").strip()
96            if not device_type or device_type == "com":
97                continue
98            # 设备管理器使用usb_type作为入参,故需将type转换为usb_type
99            data = {"usb_type": device_type, "label": ""}
100            data.update(device.attrib)
101            for info in device.findall("info"):
102                dev = {"ip": "", "port": "", "sn": "", "alias": ""}
103                dev.update(info.attrib)
104                dev.update(data)
105
106                # 去除空格,alias字符转大写
107                new_dev = {}
108                for k, v in dev.items():
109                    if k == "alias":
110                        v = v.upper()
111                    v = v.strip()
112                    new_dev[k] = v
113                dev.update(new_dev)
114
115                # 不允许设备的别名相同
116                sn, alias = dev.get("sn"), dev.get("alias")
117                if alias:
118                    for k, v in dev_alias.items():
119                        if alias == v:
120                            raise ParamError(ErrorMessage.UserConfig.Code_0103004.format(sn, k))
121                dev_alias.update({sn: alias})
122
123                ip = dev.get("ip")
124                if not ip:
125                    dev.update({"ip": "127.0.0.1"})
126                envs.append(dev)
127        return envs
128
129    @property
130    @initialize
131    def testcases(self):
132        return self.get_element_cfg("testcases")
133
134    @property
135    @initialize
136    def resource(self):
137        return self.get_element_cfg("resource")
138
139    @property
140    @initialize
141    def devicelog(self):
142        """
143        <devicelog>
144            <enable>ON</enable>
145            <clear>TRUE</clear>
146            <dir></dir>
147            <loglevel>INFO</loglevel>
148            <hdc>FALSE</hdc>
149        </devicelog>
150        """
151        tag = "devicelog"
152        cfg = self.get_element_cfg(tag)
153
154        # 若是旧配置方式<devicelog>ON</devicelog>,转换为{"enable": "ON"}
155        enable = cfg.get(tag)
156        if enable is not None:
157            cfg.pop(tag)
158            cfg.update({ConfigConst.tag_enable: "ON" if enable.upper() == "ON" else "OFF"})
159
160        # 从自定义参数中更新配置
161        if self.pass_through:
162            user_define = None
163            try:
164                user_define = json.loads(self.pass_through).get("user_define")
165            except ValueError:
166                pass
167            if user_define and isinstance(user_define, dict):
168                device_log = user_define.get(tag)
169                if device_log and isinstance(device_log, dict):
170                    cfg.update(device_log)
171
172        # 默认配置参数
173        data = {
174            ConfigConst.tag_enable: "ON",
175            ConfigConst.tag_clear: "TRUE",
176            ConfigConst.tag_dir: "",
177            ConfigConst.tag_loglevel: "INFO",
178            ConfigConst.tag_hdc: "FALSE"
179        }
180        # 刷新默认配置参数
181        for key in data.keys():
182            value = cfg.get(key)
183            if value:
184                data.update({key: value})
185        return data
186
187    @property
188    @initialize
189    def loglevel(self):
190        data = self.get_element_cfg(ConfigConst.tag_loglevel)
191        level = data.get(ConfigConst.tag_loglevel)
192        if level is not None:
193            data.pop(ConfigConst.tag_loglevel)
194            level = level.upper()
195            level = level if level in ["DEBUG", "INFO"] else ""
196            data.update({"console": level or "INFO"})
197        return data
198
199    @property
200    @initialize
201    def taskargs(self):
202        """
203        <taskargs>
204            <agent_mode></agent_mode>
205            <pass_through></pass_through>
206            <repeat></repeat>
207            <screenshot>false</screenshot>
208            <screenrecorder>false</screenrecorder>
209        </taskargs>
210        """
211        data = self.get_element_cfg("taskargs")
212        pass_through = data.get(ConfigConst.pass_through)
213        if pass_through:
214            user_define = None
215            try:
216                user_define = json.loads(pass_through).get("user_define")
217            except ValueError:
218                pass
219            if user_define:
220                self.update_task_args(data, user_define)
221        return data
222
223    @property
224    @initialize
225    def custom(self):
226        """
227        <custom></custom>
228        """
229        return self.get_element_cfg("custom")
230
231    @property
232    @initialize
233    def cluster(self):
234        """
235        <cluster>
236          <enable>true/false</enable>
237          <service_mode>controller/worker</service_mode>
238          <service_port>8000</service_port>
239          <control_service_url>http://127.0.0.1:8000</control_service_url>
240        </cluster>
241        """
242        cfg = self.get_element_cfg(ConfigConst.cluster)
243        enable = cfg.get(ConfigConst.tag_enable) or "false"
244        service_mode = cfg.get(ConfigConst.service_mode) or Cluster.controller
245        service_port = cfg.get(ConfigConst.service_port) or Cluster.service_port
246        cfg.update({
247            ConfigConst.tag_enable: enable.lower(),
248            ConfigConst.service_mode: service_mode.lower(),
249            ConfigConst.service_port: service_port.lower()
250        })
251        return cfg
252
253    def enable_cluster(self):
254        enable = self.cluster.get(ConfigConst.tag_enable) == "true"
255        if enable:
256            if sys.version_info < (3, 10, 0):
257                raise Exception(ErrorMessage.Cluster.Code_0104026)
258        return enable
259
260    @property
261    @initialize
262    def pass_through(self):
263        return self.taskargs.get(ConfigConst.pass_through)
264
265    def get_element_cfg(self, tag):
266        element = self.config_content.find(tag)
267        return {} if element is None else self.get_element_dict(element)
268
269    @staticmethod
270    def get_element_dict(element, is_top=True):
271        """
272        element: ElementTree.Element, traversal element
273        is_top: bool, when the element has no child and if is the top, result as dict else as text
274        return : dict
275        """
276        if not isinstance(element, ElementTree.Element):
277            raise TypeError("element must be instance of xml.etree.ElementTree.Element")
278        data = element.attrib
279        if len(element) == 0:
280            text = "" if element.text is None else element.text.strip()
281            if is_top:
282                data.update({element.tag: text})
283                data = {k: v.strip() for k, v in data.items()}
284                return data
285            return text
286
287        for sub in element:
288            k, v = sub.tag, UserConfigManager.get_element_dict(sub, is_top=False)
289            # 同一层级存在多个同名tag,数据存为列表
290            if len(element.findall(k)) > 1:
291                value = data.get(k) if k in data else []
292                value.append(v)
293                data.update({k: value})
294            else:
295                data.update({k: v})
296        return data
297
298    def get_wifi_config(self):
299        wifi = self.taskargs.get("wifi")
300        if wifi:
301            return wifi.split(",")
302        wifi = self.custom.get("wifi")
303        # 未配置
304        if wifi is None:
305            return []
306        # 只配置了一个
307        if isinstance(wifi, dict):
308            wifi = [wifi]
309        data = []
310        for info in wifi:
311            if not isinstance(info, dict):
312                continue
313            ssid = info.get("ssid", "")
314            password = info.get("password", "")
315            wifi_type = info.get("type", "")
316            if not ssid:
317                continue
318            if not wifi_type:
319                data.append("{}:{}".format(ssid, password))
320            else:
321                data.append("{}:{}:{}".format(ssid, password, wifi_type))
322        return data
323
324    def update_task_args(self, task_args=None, new_args=None):
325        if task_args is None:
326            task_args = self.taskargs
327        if not isinstance(new_args, dict):
328            return
329        # 目前在用的参数列表
330        known_test_args = [
331            "agent_mode", ConfigConst.repeat, ConfigConst.pass_through,
332            "screenshot", "screenrecorder", ConfigConst.web_resource, "wifi",
333            "install_user0", "ui_adaptive", "kill_uitest"
334        ]
335        # 更新同名参数
336        keys = set(task_args.keys()) & set(new_args.keys()) | set(known_test_args)
337        for key in keys:
338            value = new_args.get(key)
339            if not value or task_args.get(key) == value:
340                continue
341            task_args.update({key: value})
342
343    def get_user_config_list(self, tag_name):
344        data_dic = {}
345        for child in self.config_content:
346            if tag_name == child.tag:
347                for sub in child:
348                    data_dic[sub.tag] = sub.text
349        return data_dic
350
351    @staticmethod
352    def remove_strip(value):
353        return value.strip()
354
355    @staticmethod
356    def _verify_duplicate(items):
357        if len(set(items)) != len(items):
358            LOG.warning("Find duplicate sn config, configuration incorrect")
359            return False
360        return True
361
362    def _handle_str(self, input_string):
363        config_list = map(self.remove_strip, input_string.split(';'))
364        config_list = [item for item in config_list if item]
365        if config_list:
366            if not self._verify_duplicate(config_list):
367                return []
368        return config_list
369
370    def get_sn_list(self, input_string):
371        sn_select_list = []
372        if input_string:
373            sn_select_list = self._handle_str(input_string)
374        return sn_select_list
375
376    def get_remote_config(self):
377        remote_dic = {}
378        data_dic = self.get_user_config_list("remote")
379
380        if "ip" in data_dic.keys() and "port" in data_dic.keys():
381            remote_ip = data_dic.get("ip", "")
382            remote_port = data_dic.get("port", "")
383        else:
384            remote_ip = ""
385            remote_port = ""
386
387        if (not remote_ip) or (not remote_port):
388            remote_ip = ""
389            remote_port = ""
390        if remote_ip == get_local_ip():
391            remote_ip = "127.0.0.1"
392        remote_dic["ip"] = remote_ip
393        remote_dic["port"] = remote_port
394        return remote_dic
395
396    def get_user_config(self, target_name, filter_name=None):
397        data_dic = {}
398        all_nodes = self.config_content.findall(target_name)
399        if not all_nodes:
400            return data_dic
401
402        for node in all_nodes:
403            if filter_name:
404                if node.get('label') != filter_name:
405                    continue
406            for sub in node:
407                data_dic[sub.tag] = sub.text if sub.text else ""
408
409        return data_dic
410
411    def get_testcases_dir(self):
412        from xdevice import Variables
413        testcases_dir = self.testcases.get(ConfigConst.tag_dir)
414        if testcases_dir:
415            if os.path.isabs(testcases_dir):
416                return testcases_dir
417            return os.path.abspath(
418                os.path.join(Variables.exec_dir, testcases_dir))
419        return os.path.abspath(os.path.join(Variables.exec_dir, "testcases"))
420
421    def get_resource_path(self):
422        from xdevice import Variables
423        resource_dir = self.resource.get(ConfigConst.tag_dir)
424        if resource_dir:
425            if os.path.isabs(resource_dir):
426                return resource_dir
427            return os.path.abspath(
428                os.path.join(Variables.exec_dir, resource_dir))
429        return os.path.abspath(os.path.join(Variables.exec_dir, "resource"))
430
431    def environment_enable(self):
432        if self.config_content.find("environment") or \
433                self.config_content.find("environment/device"):
434            return True
435        return False
436
437    @property
438    @initialize
439    def uploadtrack(self):
440        """
441        Below configuring closes uploading track data.
442        <uploadtrack>FALSE</uploadtrack>
443        """
444        tag = "uploadtrack"
445        cfg = self.get_element_cfg(tag)
446        return cfg
447