• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import xml.etree.ElementTree as ElementTree
22from xdevice import platform_logger
23from xdevice import DeviceTestType
24from core.constants import ConfigFileConst
25
26LOG = platform_logger("ResourceManager")
27DEFAULT_TIMEOUT = "300"
28
29
30##############################################################################
31##############################################################################
32
33class ResourceManager(object):
34    def __init__(self):
35        pass
36
37    def get_resource_data_dic(self, testsuit_filepath):
38        resource_dir = ""
39        data_dic = {}
40
41        target_name, _ = self._get_file_name_extension(testsuit_filepath)
42        xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
43        if not os.path.exists(xml_filepath):
44            return data_dic, resource_dir
45
46        data_dic = self.get_resource_data(xml_filepath, target_name)
47        resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
48        return data_dic, resource_dir
49
50    def get_resource_data(self, xml_filepath, target_name):
51        data_dic = {}
52        if os.path.exists(xml_filepath):
53            data_dic = self._parse_resource_test_xml_file(
54                xml_filepath, target_name)
55        return data_dic
56
57    def _parse_resource_test_xml_file(self, filepath, targetname):
58        data_dic = {}
59
60        node = self.find_node_by_target(filepath, targetname)
61        if node:
62            target_attrib_list = []
63            target_attrib_list.append(node.attrib)
64            environment_data_list = []
65            env_node = node.find("environment")
66            if env_node:
67                environment_data_list.append(env_node.attrib)
68                for element in env_node.findall("device"):
69                    environment_data_list.append(element.attrib)
70                    for option_element in element.findall("option"):
71                        environment_data_list.append(option_element.attrib)
72
73            preparer_data_list = []
74            pre_node = node.find("preparer")
75            if pre_node:
76                preparer_data_list.append(pre_node.attrib)
77                for element in pre_node.findall("option"):
78                    preparer_data_list.append(element.attrib)
79
80            cleaner_data_list = []
81            clr_node = node.find("cleaner")
82            if clr_node:
83                cleaner_data_list.append(clr_node.attrib)
84                for element in clr_node.findall("option"):
85                    cleaner_data_list.append(element.attrib)
86
87            data_dic["nodeattrib"] = target_attrib_list
88            data_dic["environment"] = environment_data_list
89            data_dic["preparer"] = preparer_data_list
90            data_dic["cleaner"] = cleaner_data_list
91
92        return data_dic
93
94    @staticmethod
95    def find_node_by_target(file_path, targe_tname):
96        node = None
97        try:
98            if os.path.exists(file_path):
99                tree = ElementTree.parse(file_path)
100                root = tree.getroot()
101                targets = root.getiterator("target")
102                for target in targets:
103                    curr_dic = target.attrib
104                    if curr_dic.get("name") == targe_tname or \
105                            targe_tname.startswith(curr_dic.get("name")):
106                        node = target
107                        break
108        except ElementTree.ParseError as xml_exception:
109            LOG.error("resource_test.xml parsing failed." +
110                      xml_exception.args)
111        return node
112
113    ##########################################################################
114    ##########################################################################
115
116    @classmethod
117    def _get_file_name_extension(cls, filepath):
118        _, fullname = os.path.split(filepath)
119        filename, ext = os.path.splitext(fullname)
120        LOG.debug("file path:{}".format(filepath))
121        return filename, ext
122
123    @classmethod
124    def get_dir_name(cls, dir_path):
125        dir_name = ""
126        if os.path.isdir(dir_path) and dir_path[-1] != ".":
127            dir_name_list = dir_path.rstrip(os.sep).split(os.sep)
128            if len(dir_name_list) > 1:
129                dir_name = dir_name_list[-1]
130        return dir_name
131
132    def process_resource_file(self, resource_dir, preparer_list, device):
133        for item in preparer_list:
134            if "name" not in item.keys():
135                continue
136
137            if item["name"] == "push":
138                push_value = item["value"]
139                find_key = "->"
140                pos = push_value.find(find_key)
141                src = os.path.join(resource_dir, push_value[0:pos].strip())
142                dst = push_value[pos + len(find_key):len(push_value)].strip()
143                src = src.replace("/", os.sep)
144                dir_name = self.get_dir_name(src)
145                if dir_name != "":
146                    dst = dst.rstrip("/") + "/" + dir_name
147                device.execute_shell_command("mkdir -p %s" % dst)
148                device.push_file(src, dst)
149            elif item["name"] == "pull":
150                push_value = item["value"]
151                find_key = "->"
152                pos = push_value.find(find_key)
153                src = os.path.join(resource_dir, push_value[0:pos].strip())
154                dst = push_value[pos + len(find_key):len(push_value)].strip()
155                device.pull_file(src, dst)
156            elif item["name"] == "shell":
157                command = item["value"].strip()
158                device.execute_shell_command(command)
159            else:
160                command = item["name"] + " " + item["value"]
161                command = command.strip()
162                device.hdc_command(command)
163
164    def lite_process_resource_file(self, resource_dir, preparer_list):
165        for item in preparer_list:
166            if "name" not in item.keys():
167                continue
168
169            if item["name"] == "push":
170                copy_value = item["value"]
171                find_key = "->"
172                pos = copy_value.find(find_key)
173                src = os.path.join(resource_dir, copy_value[0:pos].strip())
174                dst = copy_value[pos + len(find_key):len(copy_value)].strip()
175                shutil.copy(src, dst)
176
177            elif item["name"] == "pull":
178                copy_value = item["value"]
179                find_key = "->"
180                pos = copy_value.find(find_key)
181                src = os.path.join(resource_dir, copy_value[0:pos].strip())
182                dst = copy_value[pos + len(find_key):len(copy_value)].strip()
183                shutil.copyfile(dst, src)
184            else:
185                command = item["name"] + " " + item["value"]
186                command = command.strip()
187                self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test)
188
189    @classmethod
190    def get_env_data(cls, environment_list):
191        env_data_dic = {}
192        device_name = ""
193        option_dic = {}
194
195        for item in environment_list:
196            if "type" in item.keys():
197                if device_name != "":
198                    temp_dic = option_dic.copy()
199                    env_data_dic[device_name] = temp_dic
200                    device_name = ""
201                    option_dic.clear()
202                device_name = item["type"]
203
204            if "name" in item.keys():
205                name = item["name"]
206                value = item["value"]
207                option_dic[name] = value
208
209        if device_name != "":
210            temp_dic = option_dic.copy()
211            env_data_dic[device_name] = temp_dic
212            device_name = ""
213            option_dic.clear()
214        LOG.debug("get environment data finish")
215        return env_data_dic
216
217    @staticmethod
218    def get_resource_xml_file_path(test_suit_file_path):
219        current_dir = os.path.dirname(test_suit_file_path)
220        while True:
221            if current_dir.endswith(os.sep + "tests"):
222                current_dir = ""
223                break
224            if current_dir == "/" or current_dir.endswith(":\\"):
225                current_dir = ""
226                break
227            if os.path.exists(os.path.join(current_dir, "resource")):
228                break
229            current_dir = os.path.dirname(current_dir)
230
231        if current_dir != "":
232            xml_filepath = os.path.join(
233                current_dir,
234                "resource",
235                ConfigFileConst.RESOURCECONFIG_FILEPATH)
236            if not os.path.exists(xml_filepath):
237                xml_filepath = os.path.join(
238                    current_dir,
239                    "resource",
240                    ConfigFileConst.CASE_RESOURCE_FILEPATH)
241        else:
242            xml_filepath = ""
243        LOG.info("xml_filepath = %s" % xml_filepath)
244        return xml_filepath
245
246    @classmethod
247    def get_nodeattrib_data(cls, data_dic):
248        curr_timeout = DEFAULT_TIMEOUT
249        if "nodeattrib" in data_dic.keys():
250            LOG.info("++++++++++++++nodeattrib+++++++++++++++")
251            nodeattrib_list = data_dic["nodeattrib"]
252            if len(nodeattrib_list) != 0:
253                node_item_dic = nodeattrib_list[0]
254                if "timeout" in node_item_dic:
255                    curr_timeout = node_item_dic["timeout"]
256        return curr_timeout
257
258    def get_environment_data(self, data_dic):
259        env_data_dic = {}
260        if "environment" in data_dic.keys():
261            LOG.info("++++++++++++++environment+++++++++++++++")
262            environment_list = data_dic["environment"]
263            env_data_dic = self.get_env_data(environment_list)
264        return env_data_dic
265
266    def process_preparer_data(self, data_dic, resource_dir, device):
267        if "preparer" in data_dic.keys():
268            LOG.info("++++++++++++++preparer+++++++++++++++")
269            preparer_list = data_dic["preparer"]
270            self.process_resource_file(resource_dir, preparer_list, device)
271        return
272
273    def lite_process_preparer_data(self, data_dic, resource_dir):
274        if "preparer" in data_dic.keys():
275            LOG.info("++++++++++++++preparer+++++++++++++++")
276            preparer_list = data_dic["preparer"]
277            self.lite_process_resource_file(resource_dir, preparer_list)
278        return
279
280    def process_cleaner_data(self, data_dic, resource_dir, device):
281        if "cleaner" in data_dic.keys():
282            LOG.info("++++++++++++++cleaner+++++++++++++++")
283            cleaner_list = data_dic["cleaner"]
284            self.process_resource_file(resource_dir, cleaner_list, device)
285        return
286
287
288##############################################################################
289##############################################################################
290