• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import xml.etree.ElementTree as ElementTree
22from xdevice import platform_logger
23from xdevice import DeviceTestType
24from core.constants import ConfigFileConst
25
26LOG = platform_logger("ResourceManager")
27
28
29##############################################################################
30##############################################################################
31
class ResourceManager(object):
    """Locate, parse and apply the resource configuration of a test suite.

    The configuration is an XML file stored in a ``resource`` directory
    found in one of the parent directories of the test suite file. Each
    ``<target>`` element may carry ``<environment>``, ``<preparer>`` and
    ``<cleaner>`` children whose attributes are collected into plain
    dictionaries and later turned into device operations.
    """

    # Separator between source and destination in push/pull option values.
    _TRANSFER_SEPARATOR = "->"

    def __init__(self):
        # Device used by lite_process_resource_file for options that are
        # neither "push" nor "pull". It is not provided by this class and
        # has to be assigned by the caller before such options are run.
        self.lite_device = None

    def get_resource_data_dic(self, testsuit_filepath):
        """Return ``(data_dic, resource_dir)`` for a test suite file.

        ``data_dic`` is the parsed configuration of the target named after
        the suite file (file name without extension) and ``resource_dir``
        the absolute directory holding the XML file. ``({}, "")`` is
        returned when no configuration file exists.
        """
        target_name, _ = self._get_file_name_extension(testsuit_filepath)
        xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
        if not os.path.exists(xml_filepath):
            return {}, ""

        data_dic = self.get_resource_data(xml_filepath, target_name)
        resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
        return data_dic, resource_dir

    def get_resource_data(self, xml_filepath, target_name):
        """Parse ``xml_filepath`` for ``target_name``; ``{}`` if it is absent."""
        if not os.path.exists(xml_filepath):
            return {}
        return self._parse_resource_test_xml_file(xml_filepath, target_name)

    def _parse_resource_test_xml_file(self, filepath, targetname):
        """Collect the attribute dictionaries of the matching target.

        Returns a dict with the keys ``nodeattrib``, ``environment``,
        ``preparer`` and ``cleaner`` (each a list of attribute dicts), or
        an empty dict when no target matches.
        """
        data_dic = {}

        node = self.find_node_by_target(filepath, targetname)
        # Compare with None: an Element without children is falsy, so a
        # truth test would drop a target that only carries attributes.
        if node is None:
            return data_dic

        environment_data_list = []
        env_node = node.find("environment")
        if env_node is not None:
            environment_data_list.append(env_node.attrib)
            for device_element in env_node.findall("device"):
                environment_data_list.append(device_element.attrib)
                environment_data_list.extend(
                    option.attrib
                    for option in device_element.findall("option"))

        data_dic["nodeattrib"] = [node.attrib]
        data_dic["environment"] = environment_data_list
        data_dic["preparer"] = self._collect_option_attribs(node, "preparer")
        data_dic["cleaner"] = self._collect_option_attribs(node, "cleaner")
        return data_dic

    @staticmethod
    def _collect_option_attribs(node, tag):
        """Return the attribs of child ``tag`` followed by its options'."""
        section = node.find(tag)
        if section is None:
            return []
        attrib_list = [section.attrib]
        attrib_list.extend(
            option.attrib for option in section.findall("option"))
        return attrib_list

    @staticmethod
    def find_node_by_target(file_path, targe_tname):
        """Return the first ``<target>`` whose name prefixes ``targe_tname``.

        An exact name match is covered by the prefix test. Targets without
        a ``name`` attribute are skipped. ``None`` is returned when the
        file does not exist, cannot be parsed, or holds no matching target.
        """
        if not os.path.exists(file_path):
            return None

        try:
            root = ElementTree.parse(file_path).getroot()
        except ElementTree.ParseError as xml_exception:
            LOG.error("resource_test.xml parsing failed. %s" % xml_exception)
            return None

        # Element.getiterator() was removed in Python 3.9; iter() is the
        # supported equivalent.
        for target in root.iter("target"):
            name = target.attrib.get("name")
            if name is None:
                continue
            if targe_tname.startswith(name):
                return target
        return None

    ##########################################################################
    ##########################################################################

    @classmethod
    def _get_file_name_extension(cls, filepath):
        """Return ``(name, extension)`` of the last component of a path."""
        _, fullname = os.path.split(filepath)
        filename, ext = os.path.splitext(fullname)
        LOG.debug("file path:{}".format(filepath))
        return filename, ext

    @classmethod
    def get_dir_name(cls, dir_path):
        """Return the last component of an existing directory path.

        ``""`` is returned when ``dir_path`` is not a directory, ends with
        ``.``, or consists of a single component.
        """
        if not os.path.isdir(dir_path) or dir_path.endswith("."):
            return ""
        components = dir_path.rstrip(os.sep).split(os.sep)
        return components[-1] if len(components) > 1 else ""

    @classmethod
    def _split_transfer_value(cls, value):
        """Split ``"src -> dst"`` into stripped ``(src, dst)``.

        Returns ``None`` when the separator is missing, instead of slicing
        the value at a bogus position.
        """
        src, separator, dst = value.partition(cls._TRANSFER_SEPARATOR)
        if not separator:
            return None
        return src.strip(), dst.strip()

    def process_resource_file(self, resource_dir, preparer_list, device):
        """Run the preparer/cleaner options against ``device``.

        Items without a ``name`` key are ignored. ``push`` and ``pull``
        expect a ``"src -> dst"`` value with ``src`` relative to
        ``resource_dir``; ``shell`` runs the value as a shell command; any
        other name is sent with its value as a connector command.
        Malformed push/pull values are logged and skipped.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            name = item["name"]
            value = item["value"]
            if name in ("push", "pull"):
                transfer = self._split_transfer_value(value)
                if transfer is None:
                    LOG.error("invalid %s option, expected 'src -> dst': %s"
                              % (name, value))
                    continue
                src = os.path.join(resource_dir, transfer[0])
                dst = transfer[1]
                if name == "push":
                    src = src.replace("/", os.sep)
                    dir_name = self.get_dir_name(src)
                    # When a directory is pushed, the target path gets the
                    # directory name appended.
                    if dir_name != "":
                        dst = dst.rstrip("/") + "/" + dir_name
                    device.execute_shell_command("mkdir -p %s" % dst)
                    device.push_file(src, dst)
                else:
                    device.pull_file(src, dst)
            elif name == "shell":
                device.execute_shell_command(value.strip())
            else:
                device.connector_command((name + " " + value).strip())

    def lite_process_resource_file(self, resource_dir, preparer_list):
        """Run the preparer options for a lite device.

        ``push`` copies ``resource_dir/src`` to ``dst`` and ``pull`` copies
        ``dst`` back to ``resource_dir/src`` on the local file system. Any
        other option is executed on ``self.lite_device``; it is logged and
        skipped when no lite device has been assigned. Malformed push/pull
        values are logged and skipped.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            name = item["name"]
            value = item["value"]
            if name in ("push", "pull"):
                transfer = self._split_transfer_value(value)
                if transfer is None:
                    LOG.error("invalid %s option, expected 'src -> dst': %s"
                              % (name, value))
                    continue
                src = os.path.join(resource_dir, transfer[0])
                dst = transfer[1]
                if name == "push":
                    shutil.copy(src, dst)
                else:
                    shutil.copyfile(dst, src)
            else:
                command = (name + " " + value).strip()
                lite_device = getattr(self, "lite_device", None)
                if lite_device is None:
                    LOG.error("lite device is not set, skip command: %s"
                              % command)
                    continue
                lite_device.execute_command_with_timeout(
                    command, case_type=DeviceTestType.lite_cpp_test)

    @classmethod
    def get_env_data(cls, environment_list):
        """Group option name/value pairs by the device type preceding them.

        An item with a ``type`` key starts a new device; items with a
        ``name`` key add ``name -> value`` to the options collected so far.
        Returns ``{device_type: {option_name: option_value}}``.
        """
        env_data_dic = {}
        device_name = ""
        option_dic = {}

        for item in environment_list:
            if "type" in item:
                if device_name != "":
                    # Close the previous device before starting a new one.
                    env_data_dic[device_name] = option_dic
                    option_dic = {}
                device_name = item["type"]

            if "name" in item:
                option_dic[item["name"]] = item["value"]

        if device_name != "":
            env_data_dic[device_name] = option_dic
        LOG.debug("get environment data finish")
        return env_data_dic

    @staticmethod
    def get_resource_xml_file_path(test_suit_file_path):
        """Return the resource XML path for a test suite file, or ``""``.

        Walks up from the directory of ``test_suit_file_path`` until a
        directory containing ``resource`` is found. The walk gives up at a
        directory named ``tests``, at the file system root, or when the
        path cannot be shortened any further.
        """
        current_dir = os.path.dirname(test_suit_file_path)
        while True:
            if current_dir.endswith(os.sep + "tests"):
                current_dir = ""
                break
            if current_dir == "/" or current_dir.endswith(":\\"):
                current_dir = ""
                break
            if os.path.exists(os.path.join(current_dir, "resource")):
                break
            parent_dir = os.path.dirname(current_dir)
            # dirname() of "" (relative path exhausted) or of a root returns
            # its argument; stop instead of looping forever.
            if parent_dir == current_dir:
                current_dir = ""
                break
            current_dir = parent_dir

        if current_dir != "":
            xml_filepath = os.path.join(
                current_dir,
                "resource",
                ConfigFileConst.RESOURCECONFIG_FILEPATH)
            if not os.path.exists(xml_filepath):
                xml_filepath = os.path.join(
                    current_dir,
                    "resource",
                    ConfigFileConst.CASE_RESOURCE_FILEPATH)
        else:
            xml_filepath = ""
        LOG.info("xml_filepath = %s" % xml_filepath)
        return xml_filepath

    @classmethod
    def get_nodeattrib_data(cls, data_dic):
        """Return the ``timeout`` attribute of the target, or ``""``."""
        curr_timeout = ""
        if "nodeattrib" in data_dic:
            LOG.info("++++++++++++++nodeattrib+++++++++++++++")
            nodeattrib_list = data_dic["nodeattrib"]
            if nodeattrib_list:
                curr_timeout = nodeattrib_list[0].get("timeout", curr_timeout)
        return curr_timeout

    def get_environment_data(self, data_dic):
        """Return the environment options of ``data_dic`` grouped by device."""
        env_data_dic = {}
        if "environment" in data_dic:
            LOG.info("++++++++++++++environment+++++++++++++++")
            env_data_dic = self.get_env_data(data_dic["environment"])
        return env_data_dic

    def process_preparer_data(self, data_dic, resource_dir, device):
        """Run the ``preparer`` options of ``data_dic`` on ``device``."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            self.process_resource_file(
                resource_dir, data_dic["preparer"], device)

    def lite_process_preparer_data(self, data_dic, resource_dir):
        """Run the ``preparer`` options of ``data_dic`` for a lite device."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            self.lite_process_resource_file(
                resource_dir, data_dic["preparer"])

    def process_cleaner_data(self, data_dic, resource_dir, device):
        """Run the ``cleaner`` options of ``data_dic`` on ``device``."""
        if "cleaner" in data_dic:
            LOG.info("++++++++++++++cleaner+++++++++++++++")
            self.process_resource_file(
                resource_dir, data_dic["cleaner"], device)
285
286
287##############################################################################
288##############################################################################
289