• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20from dataclasses import dataclass
21from xml.etree import ElementTree
22
23from _core.exception import ParamError
24from _core.logger import platform_logger
25from _core.utils import get_local_ip
26from _core.constants import ConfigConst
27
28__all__ = ["UserConfigManager"]
29LOG = platform_logger("ConfigManager")
30
31
32@dataclass
33class ConfigFileConst(object):
34    userconfig_filepath = "user_config.xml"
35
36
37class UserConfigManager(object):
38    def __init__(self, config_file="", env=""):
39        from xdevice import Variables
40        try:
41            if env:
42                self.config_content = ElementTree.fromstring(env)
43            else:
44                if config_file:
45                    self.file_path = config_file
46                else:
47                    user_path = os.path.join(Variables.exec_dir, "config")
48                    top_user_path = os.path.join(Variables.top_dir, "config")
49                    config_path = os.path.join(Variables.res_dir, "config")
50                    paths = [user_path, top_user_path, config_path]
51
52                    for path in paths:
53                        if os.path.exists(os.path.abspath(os.path.join(
54                                path, ConfigFileConst.userconfig_filepath))):
55                            self.file_path = os.path.abspath(os.path.join(
56                                path, ConfigFileConst.userconfig_filepath))
57                            break
58
59                LOG.debug("User config path: %s" % self.file_path)
60                if os.path.exists(self.file_path):
61                    tree = ElementTree.parse(self.file_path)
62                    self.config_content = tree.getroot()
63                else:
64                    raise ParamError("%s not found" % self.file_path,
65                                     error_no="00115")
66
67        except SyntaxError as error:
68            if env:
69                raise ParamError(
70                    "Parse environment parameter fail! Error: %s" % error.args,
71                    error_no="00115")
72            else:
73                raise ParamError(
74                    "Parse %s fail! Error: %s" % (self.file_path, error.args),
75                    error_no="00115")
76
77    def get_user_config_list(self, tag_name):
78        data_dic = {}
79        for child in self.config_content:
80            if tag_name == child.tag:
81                for sub in child:
82                    data_dic[sub.tag] = sub.text
83        return data_dic
84
85    @staticmethod
86    def _traversal_element(element, is_top=True):
87        """
88        element: ElementTree.Element, traversal element
89        is_top : bool, if is the top element
90        return : dict
91        """
92        if not isinstance(element, ElementTree.Element):
93            raise TypeError("element must be instance of xml.tree.ElementTree.Element")
94        data = {}
95        if len(element) == 0:
96            return {element.tag: element.text} if is_top else element.text
97        for sub in element:
98            data.setdefault(sub.tag, UserConfigManager._traversal_element(sub, is_top=False))
99        return data
100
101    @staticmethod
102    def remove_strip(value):
103        return value.strip()
104
105    @staticmethod
106    def _verify_duplicate(items):
107        if len(set(items)) != len(items):
108            LOG.warning("Find duplicate sn config, configuration incorrect")
109            return False
110        return True
111
112    def _handle_str(self, input_string):
113        config_list = map(self.remove_strip, input_string.split(';'))
114        config_list = [item for item in config_list if item]
115        if config_list:
116            if not self._verify_duplicate(config_list):
117                return []
118        return config_list
119
120    def get_sn_list(self, input_string):
121        sn_select_list = []
122        if input_string:
123            sn_select_list = self._handle_str(input_string)
124        return sn_select_list
125
126    def get_remote_config(self):
127        remote_dic = {}
128        data_dic = self.get_user_config_list("remote")
129
130        if "ip" in data_dic.keys() and "port" in data_dic.keys():
131            remote_ip = data_dic.get("ip", "")
132            remote_port = data_dic.get("port", "")
133        else:
134            remote_ip = ""
135            remote_port = ""
136
137        if (not remote_ip) or (not remote_port):
138            remote_ip = ""
139            remote_port = ""
140        if remote_ip == get_local_ip():
141            remote_ip = "127.0.0.1"
142        remote_dic["ip"] = remote_ip
143        remote_dic["port"] = remote_port
144        return remote_dic
145
146    def get_testcases_dir_config(self):
147        data_dic = self.get_user_config_list("testcases")
148        if "dir" in data_dic.keys():
149            testcase_dir = data_dic.get("dir", "")
150            if testcase_dir is None:
151                testcase_dir = ""
152        else:
153            testcase_dir = ""
154        return testcase_dir
155
156    def get_user_config(self, target_name, filter_name=None):
157        data_dic = {}
158        all_nodes = self.config_content.findall(target_name)
159        if not all_nodes:
160            return data_dic
161
162        for node in all_nodes:
163            if filter_name:
164                if node.get('label') != filter_name:
165                    continue
166            for sub in node:
167                data_dic[sub.tag] = sub.text if sub.text else ""
168
169        return data_dic
170
171    def get_node_attr(self, target_name, attr_name):
172        nodes = self.config_content.find(target_name)
173        if attr_name in nodes.attrib:
174            return nodes.attrib.get(attr_name)
175        return None
176
177    def get_com_device(self, target_name):
178        devices = []
179
180        for node in self.config_content.findall(target_name):
181            if node.attrib["type"] != "com":
182                continue
183
184            device = [node.attrib]
185
186            # get remote device
187            data_dic = {}
188            for sub in node:
189                if sub.text is not None and sub.tag != "serial":
190                    data_dic[sub.tag] = sub.text
191            if data_dic:
192                if data_dic.get("ip", "") == get_local_ip():
193                    data_dic["ip"] = "127.0.0.1"
194                device.append(data_dic)
195                devices.append(device)
196                continue
197
198            # get local device
199            for serial in node.findall("serial"):
200                data_dic = {}
201                for sub in serial:
202                    if sub.text is None:
203                        data_dic[sub.tag] = ""
204                    else:
205                        data_dic[sub.tag] = sub.text
206                device.append(data_dic)
207            devices.append(device)
208        return devices
209
210    def get_device(self, target_name):
211        for node in self.config_content.findall(target_name):
212            data_dic = {}
213            if node.attrib["type"] != "usb-hdc" and \
214                    node.attrib["type"] != "usb-adb":
215                continue
216            data_dic["usb_type"] = node.attrib["type"]
217            for sub in node:
218                if sub.text is None:
219                    data_dic[sub.tag] = ""
220                else:
221                    data_dic[sub.tag] = sub.text
222            if data_dic.get("ip", "") == get_local_ip():
223                data_dic["ip"] = "127.0.0.1"
224            return data_dic
225        return None
226
227    def get_testcases_dir(self):
228        from xdevice import Variables
229        testcases_dir = self.get_testcases_dir_config()
230        if testcases_dir:
231            if os.path.isabs(testcases_dir):
232                return testcases_dir
233            return os.path.abspath(os.path.join(Variables.exec_dir,
234                                                testcases_dir))
235
236        return os.path.abspath(os.path.join(Variables.exec_dir, "testcases"))
237
238    def get_resource_path(self):
239        from xdevice import Variables
240        data_dic = self.get_user_config_list("resource")
241        if "dir" in data_dic.keys():
242            resource_dir = data_dic.get("dir", "")
243            if resource_dir:
244                if os.path.isabs(resource_dir):
245                    return resource_dir
246                return os.path.abspath(
247                    os.path.join(Variables.exec_dir, resource_dir))
248
249        return os.path.abspath(
250            os.path.join(Variables.exec_dir, "resource"))
251
252    def get_resource_conf(self):
253        resource = self.config_content.find("resource")
254        if resource is None:
255            return {}
256        return self._traversal_element(resource)
257
258    def get_log_level(self):
259        data_dic = {}
260        node = self.config_content.find("loglevel")
261        if node is not None:
262            if node.find("console") is None and node.find("file") is None:
263                # neither loglevel/console nor loglevel/file exists
264                data_dic.update({"console": str(node.text).strip()})
265            else:
266                for child in node:
267                    data_dic.update({child.tag: str(child.text).strip()})
268        return data_dic
269
270    def get_device_log_status(self):
271        data_dic = {}
272        node = self.config_content.find("devicelog")
273        if node is not None:
274            if node.find(ConfigConst.tag_enable) is not None \
275                    or node.find(ConfigConst.tag_dir) is not None:
276                for child in node:
277                    data_dic.update({child.tag: str(child.text).strip()})
278            else:
279                data_dic.update({ConfigConst.tag_enable: str(node.text).strip()})
280                data_dic.update({ConfigConst.tag_dir: None})
281                data_dic.update({ConfigConst.tag_loglevel: "INFO"})
282                data_dic.update({ConfigConst.tag_clear: "TRUE"})
283        return data_dic
284
285    def environment_enable(self):
286        if self.config_content.find("environment") or\
287                self.config_content.find("environment/device"):
288            return True
289        return False
290