• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import xml.etree.ElementTree as ET
21from dataclasses import dataclass
22
23from _core.exception import ParamError
24from _core.logger import platform_logger
25from _core.utils import get_local_ip
26
27
# Names exported by a star-import of this module.
__all__ = ["UserConfigManager"]
# Module-wide logger, obtained from platform_logger under the name
# "ConfigManager".
LOG = platform_logger("ConfigManager")
30
31
@dataclass
class ConfigFileConst:
    """Holder for configuration file name constants."""

    # File name of the user configuration XML. The attribute carries no
    # type annotation, so it stays a plain class attribute and is not
    # turned into a dataclass field.
    userconfig_filepath = "user_config.xml"
35
36
class UserConfigManager(object):
    """Query helper around a parsed user configuration XML document.

    The root element is stored in ``config_content``; every ``get_*``
    method reads from that element and returns plain dicts, lists or
    strings.
    """

    def __init__(self, config_file="", env=""):
        """Parse the configuration from an XML string or from a file.

        Args:
            config_file: Path of the XML file to load. When empty, the file
                named by ``ConfigFileConst.userconfig_filepath`` is searched
                in the ``config`` directory under ``Variables.exec_dir``,
                ``Variables.top_dir`` and ``Variables.res_dir``, in that
                order.
            env: XML text to parse directly. When non-empty it takes
                precedence and no file is read.

        Raises:
            ParamError: If no configuration file exists at the resolved
                path, or if the XML cannot be parsed.
        """
        # Define the attribute up-front so that neither the debug log nor
        # the error paths can hit AttributeError when no file is located.
        self.file_path = ""
        try:
            if env:
                self.config_content = ET.fromstring(env)
            else:
                self.file_path = \
                    config_file or self._locate_default_config()
                LOG.debug("user config path: %s" % self.file_path)
                if not os.path.exists(self.file_path):
                    # Fall back to the bare file name so the message stays
                    # meaningful when the search found nothing.
                    missing = (self.file_path or
                               ConfigFileConst.userconfig_filepath)
                    raise ParamError("%s not found" % missing,
                                     error_no="00115")
                self.config_content = ET.parse(self.file_path).getroot()
        except SyntaxError as error:
            # ET.ParseError derives from SyntaxError. Format the exception
            # itself rather than its ``args`` tuple, which either printed
            # as a tuple repr or broke the "%" formatting.
            if env:
                raise ParamError(
                    "Parse environment parameter fail! Error: %s" % error,
                    error_no="00115") from error
            raise ParamError(
                "Parse %s fail! Error: %s" % (self.file_path, error),
                error_no="00115") from error

    @staticmethod
    def _locate_default_config():
        """Return the first existing default config path, or ``""``."""
        # Imported lazily: only the file-search path needs Variables.
        from xdevice import Variables
        for base_dir in (Variables.exec_dir, Variables.top_dir,
                         Variables.res_dir):
            candidate = os.path.abspath(os.path.join(
                base_dir, "config", ConfigFileConst.userconfig_filepath))
            if os.path.exists(candidate):
                return candidate
        return ""

    def get_user_config_list(self, tag_name):
        """Collect the children of top-level ``tag_name`` elements.

        Args:
            tag_name: Tag of the direct children of the root to read.

        Returns:
            Dict mapping each grandchild tag to its text (``None`` for an
            empty element). Later duplicates overwrite earlier ones.
        """
        data_dic = {}
        for child in self.config_content:
            if child.tag == tag_name:
                data_dic.update((sub.tag, sub.text) for sub in child)
        return data_dic

    @staticmethod
    def remove_strip(value):
        """Return ``value`` without leading and trailing whitespace."""
        return value.strip()

    @staticmethod
    def _verify_duplicate(items):
        """Return False (and log a warning) if ``items`` has duplicates."""
        if len(set(items)) != len(items):
            LOG.warning("find duplicate sn config, configuration incorrect")
            return False
        return True

    def _handle_str(self, input_string):
        """Split a ``;`` separated string into stripped, non-empty items.

        Returns:
            The list of items, or an empty list if any item is repeated.
        """
        stripped = (self.remove_strip(part)
                    for part in input_string.split(';'))
        config_list = [item for item in stripped if item]
        if not self._verify_duplicate(config_list):
            return []
        return config_list

    def get_sn_list(self, input_string):
        """Parse a ``;`` separated serial-number string into a list.

        Returns:
            List of entries; empty when ``input_string`` is empty or
            contains duplicates.
        """
        if not input_string:
            return []
        return self._handle_str(input_string)

    def get_remote_config(self):
        """Read the ``remote`` section.

        Returns:
            Dict with keys ``"ip"`` and ``"port"``. Both values are ``""``
            unless both are configured and non-empty. An ip equal to
            ``get_local_ip()`` is replaced by ``"127.0.0.1"``.
        """
        data_dic = self.get_user_config_list("remote")
        remote_ip = data_dic.get("ip") or ""
        remote_port = data_dic.get("port") or ""
        # The pair is only meaningful when both parts are present.
        if not (remote_ip and remote_port):
            remote_ip = ""
            remote_port = ""
        if remote_ip == get_local_ip():
            remote_ip = "127.0.0.1"
        return {"ip": remote_ip, "port": remote_port}

    def get_testcases_dir_config(self):
        """Return the text of ``testcases/dir``, or ``""`` when unset."""
        return self.get_user_config_list("testcases").get("dir") or ""

    def get_user_config(self, target_name, filter_name=None):
        """Collect child tag/text pairs of the nodes matching a path.

        Args:
            target_name: ElementTree path evaluated against the root.
            filter_name: When given, only nodes whose ``label`` attribute
                equals this value are read.

        Returns:
            Dict mapping child tag to text, with ``""`` for empty elements.
        """
        data_dic = {}
        for node in self.config_content.findall(target_name):
            if filter_name and node.get('label') != filter_name:
                continue
            for sub in node:
                data_dic[sub.tag] = sub.text or ""
        return data_dic

    def get_node_attr(self, target_name, attr_name):
        """Return an attribute of the first node matching ``target_name``.

        Returns:
            The attribute value, or ``None`` when the node or the attribute
            does not exist.
        """
        node = self.config_content.find(target_name)
        # find() yields None for a missing node; guard before dereferencing.
        if node is None:
            return None
        return node.attrib.get(attr_name)

    def get_com_device(self, target_name):
        """Describe the ``com`` and ``agent`` devices matching a path.

        Each device is a list whose first item is the node's attribute
        dict. For a node with non-empty, non-``serial`` children, one dict
        of those children follows (an ``ip`` equal to ``get_local_ip()``
        becomes ``"127.0.0.1"``). Otherwise one dict per ``serial`` child
        follows, with ``""`` for empty elements.

        Args:
            target_name: ElementTree path evaluated against the root.

        Returns:
            List of device lists; nodes of another type, or without a
            ``type`` attribute, are skipped.
        """
        devices = []
        for node in self.config_content.findall(target_name):
            # .get() so that a node lacking "type" is skipped, not a crash.
            if node.attrib.get("type") not in ("com", "agent"):
                continue

            device = [node.attrib]

            # get remote device
            remote = {sub.tag: sub.text for sub in node
                      if sub.text is not None and sub.tag != "serial"}
            if remote:
                if remote.get("ip", "") == get_local_ip():
                    remote["ip"] = "127.0.0.1"
                device.append(remote)
            else:
                # get local device
                for serial in node.findall("serial"):
                    device.append(
                        {sub.tag: sub.text or "" for sub in serial})
            devices.append(device)
        return devices

    def get_device(self, target_name):
        """Describe the first ``usb-hdc`` device matching a path.

        Args:
            target_name: ElementTree path evaluated against the root.

        Returns:
            Dict with ``"usb_type"`` plus one entry per child element
            (``""`` for empty ones), with an ``ip`` equal to
            ``get_local_ip()`` replaced by ``"127.0.0.1"``; ``None`` when
            no such device exists.
        """
        for node in self.config_content.findall(target_name):
            usb_type = node.attrib.get("type")
            if usb_type != "usb-hdc":
                continue
            data_dic = {"usb_type": usb_type}
            for sub in node:
                data_dic[sub.tag] = sub.text or ""
            if data_dic.get("ip", "") == get_local_ip():
                data_dic["ip"] = "127.0.0.1"
            return data_dic
        return None

    def get_testcases_dir(self):
        """Return the absolute test case directory.

        A relative configured value is resolved against
        ``Variables.exec_dir``; without a configured value the default is
        ``<exec_dir>/testcases``.
        """
        from xdevice import Variables
        testcases_dir = self.get_testcases_dir_config() or "testcases"
        if os.path.isabs(testcases_dir):
            return testcases_dir
        return os.path.abspath(
            os.path.join(Variables.exec_dir, testcases_dir))

    def get_resource_path(self):
        """Return the absolute resource directory.

        A relative configured value is resolved against
        ``Variables.exec_dir``; without a configured value the default is
        ``<exec_dir>/resource``.
        """
        from xdevice import Variables
        resource_dir = \
            self.get_user_config_list("resource").get("dir") or "resource"
        if os.path.isabs(resource_dir):
            return resource_dir
        return os.path.abspath(
            os.path.join(Variables.exec_dir, resource_dir))

    def get_log_level(self):
        """Read the ``loglevel`` section.

        Returns:
            Empty dict when there is no ``loglevel`` element. If it has
            neither a ``console`` nor a ``file`` child, its own text is
            returned under ``"console"``; otherwise every child tag maps
            to its stripped text. Text goes through ``str()``, so an empty
            element yields the string ``"None"``.
        """
        data_dic = {}
        node = self.config_content.find("loglevel")
        if node is None:
            return data_dic
        if node.find("console") is None and node.find("file") is None:
            # neither loglevel/console nor loglevel/file exists
            data_dic["console"] = str(node.text).strip()
        else:
            for child in node:
                data_dic[child.tag] = str(child.text).strip()
        return data_dic
245