• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import xml.etree.ElementTree as ET
21from dataclasses import dataclass
22
23from _core.exception import ParamError
24from _core.logger import platform_logger
25from _core.utils import get_local_ip
26
27__all__ = ["UserConfigManager"]
28LOG = platform_logger("ConfigManager")
29
30
@dataclass
class ConfigFileConst:
    """Names of the configuration files used by the config manager."""

    # Base name of the user configuration file; UserConfigManager joins it
    # onto each candidate "config" directory when no explicit path is given.
    userconfig_filepath = "user_config.xml"
34
35
class UserConfigManager(object):
    """Read-only accessor for the ``user_config.xml`` settings.

    The parsed XML root element is kept in ``self.config_content``; every
    ``get_*`` method reads from that element and returns plain Python values
    (strings, dicts and lists).
    """

    def __init__(self, config_file="", env=""):
        """Load the user configuration from a string or from a file.

        Args:
            config_file: path of the XML file to parse. When empty, the
                "config" directory under ``Variables.exec_dir``,
                ``Variables.top_dir`` and ``Variables.res_dir`` is searched
                (in that order) for ``user_config.xml``.
            env: XML document passed as a string. When non-empty it takes
                precedence over ``config_file`` and no file is read.

        Raises:
            ParamError: with ``error_no="00115"`` when the file cannot be
                found or when the XML cannot be parsed.
        """
        # Always defined, so that the "not found" and "parse failed" paths
        # can reference it even when nothing was located.
        self.file_path = ""

        if env:
            try:
                self.config_content = ET.fromstring(env)
            except SyntaxError as error:
                # ET.ParseError is a subclass of SyntaxError.
                raise ParamError(
                    "Parse environment parameter fail! Error: %s" % error.args,
                    error_no="00115") from error
            return

        self.file_path = config_file or self._find_default_config()
        LOG.debug("User config path: %s" % self.file_path)
        if not self.file_path or not os.path.exists(self.file_path):
            missing = self.file_path or ConfigFileConst.userconfig_filepath
            raise ParamError("%s not found" % missing, error_no="00115")

        try:
            self.config_content = ET.parse(self.file_path).getroot()
        except SyntaxError as error:
            raise ParamError(
                "Parse %s fail! Error: %s" % (self.file_path, error.args),
                error_no="00115") from error

    @staticmethod
    def _find_default_config():
        """Return the first existing default config file path, or ""."""
        # Imported inside the function, as the original code did.
        # NOTE(review): presumably this avoids a circular import - confirm.
        from xdevice import Variables
        base_dirs = (Variables.exec_dir, Variables.top_dir, Variables.res_dir)
        for base_dir in base_dirs:
            candidate = os.path.abspath(os.path.join(
                base_dir, "config", ConfigFileConst.userconfig_filepath))
            if os.path.exists(candidate):
                return candidate
        return ""

    @staticmethod
    def _resolve_dir(configured_dir, default_name):
        """Resolve a configured directory against ``Variables.exec_dir``.

        An absolute ``configured_dir`` is returned unchanged; a relative one
        is joined onto ``Variables.exec_dir``; an empty one falls back to
        ``default_name`` under ``Variables.exec_dir``.
        """
        from xdevice import Variables
        if configured_dir and os.path.isabs(configured_dir):
            return configured_dir
        return os.path.abspath(os.path.join(
            Variables.exec_dir, configured_dir or default_name))

    def get_user_config_list(self, tag_name):
        """Return ``{child tag: child text}`` for top-level ``tag_name`` nodes.

        Only direct children of the root are matched. When several nodes
        share ``tag_name`` their entries are merged, later ones winning.
        The text of an empty element is ``None``.
        """
        data_dic = {}
        for child in self.config_content:
            if tag_name == child.tag:
                for sub in child:
                    data_dic[sub.tag] = sub.text
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return ``value`` without leading and trailing whitespace."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return True when ``items`` has no duplicates, else warn and False."""
        if len(set(items)) != len(items):
            LOG.warning("Find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ';'-separated string into stripped, non-empty items.

        Returns an empty list when the items contain duplicates.
        """
        config_list = [
            item for item in map(self.remove_strip, input_string.split(';'))
            if item]
        if not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Return the serial numbers listed in a ';'-separated string.

        An empty/None input, or an input containing duplicated entries,
        yields an empty list.
        """
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Return ``{"ip": ..., "port": ...}`` from the ``remote`` node.

        Both values are "" unless both ``ip`` and ``port`` are configured
        and non-empty. An ip equal to ``get_local_ip()`` is replaced by
        "127.0.0.1".
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""
        if not (remote_ip and remote_port):
            # A half-configured remote is treated as not configured at all.
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_testcases_dir_config(self):
        """Return the text of ``testcases/dir``, or "" when absent or empty."""
        return self.get_user_config_list("testcases").get("dir") or ""

    def get_user_config(self, target_name, filter_name=None):
        """Return ``{child tag: child text}`` for nodes matching a path.

        Args:
            target_name: ElementTree path evaluated from the root element.
            filter_name: when given, only nodes whose ``label`` attribute
                equals this value contribute.

        Returns:
            A dict merged over all matching nodes (later nodes win); the
            text of an empty element is stored as "".
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text or ""
        return data_dic

    def get_node_attr(self, target_name, attr_name):
        """Return attribute ``attr_name`` of the first ``target_name`` node.

        Returns None when the node does not exist or lacks the attribute.
        """
        node = self.config_content.find(target_name)
        if node is None:
            # find() returns None for a missing node; do not dereference it.
            return None
        return node.attrib.get(attr_name)

    def get_com_device(self, target_name):
        """Collect the ``type="com"`` devices found under ``target_name``.

        Returns:
            A list with one entry per com device. Each entry is a list whose
            first item is the node's attribute dict, followed by either one
            dict of the node's non-``serial`` children that have text (with
            a local ip mapped to "127.0.0.1"), or, when there are no such
            children, one dict per ``serial`` child.
        """
        devices = []

        for node in self.config_content.findall(target_name):
            # Nodes without a "type" attribute are skipped, not an error.
            if node.attrib.get("type") != "com":
                continue

            device = [node.attrib]

            # get remote device
            data_dic = {}
            for sub in node:
                if sub.text is not None and sub.tag != "serial":
                    data_dic[sub.tag] = sub.text
            if data_dic:
                if data_dic.get("ip", "") == get_local_ip():
                    data_dic["ip"] = "127.0.0.1"
                device.append(data_dic)
                devices.append(device)
                continue

            # get local device
            for serial in node.findall("serial"):
                device.append({sub.tag: sub.text or "" for sub in serial})
            devices.append(device)
        return devices

    def get_device(self, target_name):
        """Return the settings of the first usb-hdc/usb-adb device node.

        Returns:
            A dict holding ``usb_type`` (the node's ``type`` attribute) plus
            one entry per child element (empty text stored as ""), with a
            local ip mapped to "127.0.0.1"; None when no such node exists.
        """
        for node in self.config_content.findall(target_name):
            usb_type = node.attrib.get("type")
            if usb_type not in ("usb-hdc", "usb-adb"):
                continue
            data_dic = {"usb_type": usb_type}
            for sub in node:
                data_dic[sub.tag] = sub.text or ""
            if data_dic.get("ip", "") == get_local_ip():
                data_dic["ip"] = "127.0.0.1"
            return data_dic
        return None

    def get_testcases_dir(self):
        """Return the testcases directory.

        An absolute configured directory is returned as is; a relative one
        is resolved against ``Variables.exec_dir``; without configuration the
        result is ``<exec_dir>/testcases``.
        """
        return self._resolve_dir(self.get_testcases_dir_config(), "testcases")

    def get_resource_path(self):
        """Return the resource directory.

        Same resolution rules as ``get_testcases_dir``, using the
        ``resource/dir`` setting and the default name "resource".
        """
        resource_dir = self.get_user_config_list("resource").get("dir")
        return self._resolve_dir(resource_dir, "resource")

    def get_log_level(self):
        """Return the configured log levels as a dict.

        When ``loglevel`` has neither a ``console`` nor a ``file`` child, its
        own text is reported under "console"; otherwise every child is
        reported under its tag. Returns {} when ``loglevel`` is absent.
        Values are stripped; an empty element yields the string "None"
        because its text is passed through ``str()``.
        """
        data_dic = {}
        node = self.config_content.find("loglevel")
        if node is None:
            return data_dic
        if node.find("console") is None and node.find("file") is None:
            # neither loglevel/console nor loglevel/file exists
            data_dic["console"] = str(node.text).strip()
        else:
            for child in node:
                data_dic[child.tag] = str(child.text).strip()
        return data_dic

    def get_device_log_status(self):
        """Return the stripped text of ``devicelog``, or None when absent.

        An empty ``devicelog`` element yields the string "None" because its
        text is passed through ``str()``.
        """
        node = self.config_content.find("devicelog")
        if node is None:
            return None
        return str(node.text).strip()
251
252