1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import shutil 21import xml.etree.ElementTree as ElementTree 22from xdevice import platform_logger 23from xdevice import DeviceTestType 24from core.constants import ConfigFileConst 25 26LOG = platform_logger("ResourceManager") 27 28 29############################################################################## 30############################################################################## 31 32class ResourceManager(object): 33 def __init__(self): 34 pass 35 36 def get_resource_data_dic(self, testsuit_filepath): 37 resource_dir = "" 38 data_dic = {} 39 40 target_name, _ = self._get_file_name_extension(testsuit_filepath) 41 xml_filepath = self.get_resource_xml_file_path(testsuit_filepath) 42 if not os.path.exists(xml_filepath): 43 return data_dic, resource_dir 44 45 data_dic = self.get_resource_data(xml_filepath, target_name) 46 resource_dir = os.path.abspath(os.path.dirname(xml_filepath)) 47 return data_dic, resource_dir 48 49 def get_resource_data(self, xml_filepath, target_name): 50 data_dic = {} 51 if os.path.exists(xml_filepath): 52 data_dic = self._parse_resource_test_xml_file( 53 xml_filepath, target_name) 54 return data_dic 55 56 def _parse_resource_test_xml_file(self, filepath, targetname): 57 data_dic = {} 58 59 node = self.find_node_by_target(filepath, targetname) 60 if node: 61 target_attrib_list = [] 62 target_attrib_list.append(node.attrib) 63 environment_data_list = [] 64 env_node = node.find("environment") 65 if env_node: 66 environment_data_list.append(env_node.attrib) 67 for element in env_node.findall("device"): 68 environment_data_list.append(element.attrib) 69 for option_element in element.findall("option"): 70 environment_data_list.append(option_element.attrib) 71 72 preparer_data_list = [] 73 pre_node = node.find("preparer") 74 if pre_node: 75 preparer_data_list.append(pre_node.attrib) 76 for element in pre_node.findall("option"): 77 preparer_data_list.append(element.attrib) 78 79 cleaner_data_list = [] 80 clr_node = node.find("cleaner") 81 if clr_node: 82 cleaner_data_list.append(clr_node.attrib) 83 for element in clr_node.findall("option"): 84 cleaner_data_list.append(element.attrib) 85 86 data_dic["nodeattrib"] = target_attrib_list 87 data_dic["environment"] = environment_data_list 88 data_dic["preparer"] = preparer_data_list 89 data_dic["cleaner"] = cleaner_data_list 90 91 return data_dic 92 93 @staticmethod 94 def find_node_by_target(file_path, targe_tname): 95 node = None 96 try: 97 if os.path.exists(file_path): 98 tree = ElementTree.parse(file_path) 99 root = tree.getroot() 100 targets = root.getiterator("target") 101 for target in targets: 102 curr_dic = target.attrib 103 if curr_dic.get("name") == targe_tname or \ 104 targe_tname.startswith(curr_dic.get("name")): 105 node = target 106 break 107 except ElementTree.ParseError as xml_exception: 108 LOG.error("resource_test.xml parsing failed." + 109 xml_exception.args) 110 return node 111 112 ########################################################################## 113 ########################################################################## 114 115 @classmethod 116 def _get_file_name_extension(cls, filepath): 117 _, fullname = os.path.split(filepath) 118 filename, ext = os.path.splitext(fullname) 119 LOG.debug("file path:{}".format(filepath)) 120 return filename, ext 121 122 @classmethod 123 def get_dir_name(cls, dir_path): 124 dir_name = "" 125 if os.path.isdir(dir_path) and dir_path[-1] != ".": 126 dir_name_list = dir_path.rstrip(os.sep).split(os.sep) 127 if len(dir_name_list) > 1: 128 dir_name = dir_name_list[-1] 129 return dir_name 130 131 def process_resource_file(self, resource_dir, preparer_list, device): 132 for item in preparer_list: 133 if "name" not in item.keys(): 134 continue 135 136 if item["name"] == "push": 137 push_value = item["value"] 138 find_key = "->" 139 pos = push_value.find(find_key) 140 src = os.path.join(resource_dir, push_value[0:pos].strip()) 141 dst = push_value[pos + len(find_key):len(push_value)].strip() 142 src = src.replace("/", os.sep) 143 dir_name = self.get_dir_name(src) 144 if dir_name != "": 145 dst = dst.rstrip("/") + "/" + dir_name 146 device.execute_shell_command("mkdir -p %s" % dst) 147 device.push_file(src, dst) 148 elif item["name"] == "pull": 149 push_value = item["value"] 150 find_key = "->" 151 pos = push_value.find(find_key) 152 src = os.path.join(resource_dir, push_value[0:pos].strip()) 153 dst = push_value[pos + len(find_key):len(push_value)].strip() 154 device.pull_file(src, dst) 155 elif item["name"] == "shell": 156 command = item["value"].strip() 157 device.execute_shell_command(command) 158 else: 159 command = item["name"] + " " + item["value"] 160 command = command.strip() 161 device.connector_command(command) 162 163 def lite_process_resource_file(self, resource_dir, preparer_list): 164 for item in preparer_list: 165 if "name" not in item.keys(): 166 continue 167 168 if item["name"] == "push": 169 copy_value = item["value"] 170 find_key = "->" 171 pos = copy_value.find(find_key) 172 src = os.path.join(resource_dir, copy_value[0:pos].strip()) 173 dst = copy_value[pos + len(find_key):len(copy_value)].strip() 174 shutil.copy(src, dst) 175 176 elif item["name"] == "pull": 177 copy_value = item["value"] 178 find_key = "->" 179 pos = copy_value.find(find_key) 180 src = os.path.join(resource_dir, copy_value[0:pos].strip()) 181 dst = copy_value[pos + len(find_key):len(copy_value)].strip() 182 shutil.copyfile(dst, src) 183 else: 184 command = item["name"] + " " + item["value"] 185 command = command.strip() 186 self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test) 187 188 @classmethod 189 def get_env_data(cls, environment_list): 190 env_data_dic = {} 191 device_name = "" 192 option_dic = {} 193 194 for item in environment_list: 195 if "type" in item.keys(): 196 if device_name != "": 197 temp_dic = option_dic.copy() 198 env_data_dic[device_name] = temp_dic 199 device_name = "" 200 option_dic.clear() 201 device_name = item["type"] 202 203 if "name" in item.keys(): 204 name = item["name"] 205 value = item["value"] 206 option_dic[name] = value 207 208 if device_name != "": 209 temp_dic = option_dic.copy() 210 env_data_dic[device_name] = temp_dic 211 device_name = "" 212 option_dic.clear() 213 LOG.debug("get environment data finish") 214 return env_data_dic 215 216 @staticmethod 217 def get_resource_xml_file_path(test_suit_file_path): 218 current_dir = os.path.dirname(test_suit_file_path) 219 while True: 220 if current_dir.endswith(os.sep + "tests"): 221 current_dir = "" 222 break 223 if current_dir == "/" or current_dir.endswith(":\\"): 224 current_dir = "" 225 break 226 if os.path.exists(os.path.join(current_dir, "resource")): 227 break 228 current_dir = os.path.dirname(current_dir) 229 230 if current_dir != "": 231 xml_filepath = os.path.join( 232 current_dir, 233 "resource", 234 ConfigFileConst.RESOURCECONFIG_FILEPATH) 235 if not os.path.exists(xml_filepath): 236 xml_filepath = os.path.join( 237 current_dir, 238 "resource", 239 ConfigFileConst.CASE_RESOURCE_FILEPATH) 240 else: 241 xml_filepath = "" 242 LOG.info("xml_filepath = %s" % xml_filepath) 243 return xml_filepath 244 245 @classmethod 246 def get_nodeattrib_data(cls, data_dic): 247 curr_timeout = "" 248 if "nodeattrib" in data_dic.keys(): 249 LOG.info("++++++++++++++nodeattrib+++++++++++++++") 250 nodeattrib_list = data_dic["nodeattrib"] 251 if len(nodeattrib_list) != 0: 252 node_item_dic = nodeattrib_list[0] 253 if "timeout" in node_item_dic: 254 curr_timeout = node_item_dic["timeout"] 255 return curr_timeout 256 257 def get_environment_data(self, data_dic): 258 env_data_dic = {} 259 if "environment" in data_dic.keys(): 260 LOG.info("++++++++++++++environment+++++++++++++++") 261 environment_list = data_dic["environment"] 262 env_data_dic = self.get_env_data(environment_list) 263 return env_data_dic 264 265 def process_preparer_data(self, data_dic, resource_dir, device): 266 if "preparer" in data_dic.keys(): 267 LOG.info("++++++++++++++preparer+++++++++++++++") 268 preparer_list = data_dic["preparer"] 269 self.process_resource_file(resource_dir, preparer_list, device) 270 return 271 272 def lite_process_preparer_data(self, data_dic, resource_dir): 273 if "preparer" in data_dic.keys(): 274 LOG.info("++++++++++++++preparer+++++++++++++++") 275 preparer_list = data_dic["preparer"] 276 self.lite_process_resource_file(resource_dir, preparer_list) 277 return 278 279 def process_cleaner_data(self, data_dic, resource_dir, device): 280 if "cleaner" in data_dic.keys(): 281 LOG.info("++++++++++++++cleaner+++++++++++++++") 282 cleaner_list = data_dic["cleaner"] 283 self.process_resource_file(resource_dir, cleaner_list, device) 284 return 285 286 287############################################################################## 288############################################################################## 289