#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import xml.etree.ElementTree as ElementTree
from xdevice import platform_logger
from xdevice import DeviceTestType
from core.constants import ConfigFileConst

LOG = platform_logger("ResourceManager")
DEFAULT_TIMEOUT = "300"


##############################################################################
##############################################################################

class ResourceManager(object):
    """Locate and interpret the resource XML that belongs to a test suite.

    The XML is searched for a ``<target>`` element matching the test suite
    name; its ``environment``, ``preparer`` and ``cleaner`` children are
    flattened into lists of attribute dictionaries which the ``process_*``
    methods then turn into device or local file operations.
    """

    def __init__(self):
        pass

    def get_resource_data_dic(self, testsuit_filepath):
        """Return ``(data_dic, resource_dir)`` for a test suite file.

        :param testsuit_filepath: path of the test suite file.
        :return: the parsed resource data (see
            ``_parse_resource_test_xml_file``) and the absolute directory
            containing the resource XML.  ``({}, "")`` is returned when no
            resource XML exists for the suite.
        """
        resource_dir = ""
        data_dic = {}

        target_name, _ = self._get_file_name_extension(testsuit_filepath)
        xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
        if not os.path.exists(xml_filepath):
            return data_dic, resource_dir

        data_dic = self.get_resource_data(xml_filepath, target_name)
        resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
        return data_dic, resource_dir

    def get_resource_data(self, xml_filepath, target_name):
        """Parse ``xml_filepath`` for ``target_name``.

        :return: the parsed data dictionary, or ``{}`` when the file does
            not exist or no matching target is found.
        """
        if not os.path.exists(xml_filepath):
            return {}
        return self._parse_resource_test_xml_file(xml_filepath, target_name)

    def _parse_resource_test_xml_file(self, filepath, targetname):
        """Flatten the matching ``<target>`` node into attribute lists.

        :return: ``{}`` when no target matches, otherwise a dictionary with
            the keys ``nodeattrib``, ``environment``, ``preparer`` and
            ``cleaner``; each value is a list of element attribute dicts.
        """
        data_dic = {}

        node = self.find_node_by_target(filepath, targetname)
        # Compare against None explicitly: the truth value of an Element
        # is False when it has no children, which would wrongly discard a
        # target (or section) that only carries attributes.
        if node is None:
            return data_dic

        environment_data_list = []
        env_node = node.find("environment")
        if env_node is not None:
            environment_data_list.append(env_node.attrib)
            for device_element in env_node.findall("device"):
                environment_data_list.append(device_element.attrib)
                environment_data_list.extend(
                    option_element.attrib
                    for option_element in device_element.findall("option"))

        data_dic["nodeattrib"] = [node.attrib]
        data_dic["environment"] = environment_data_list
        data_dic["preparer"] = self._collect_option_attribs(node, "preparer")
        data_dic["cleaner"] = self._collect_option_attribs(node, "cleaner")
        return data_dic

    @staticmethod
    def _collect_option_attribs(parent_node, tag):
        """Return the attribs of child ``tag`` followed by its ``option``s.

        An empty list is returned when ``parent_node`` has no ``tag`` child.
        """
        attrib_list = []
        section_node = parent_node.find(tag)
        if section_node is not None:
            attrib_list.append(section_node.attrib)
            attrib_list.extend(
                option_element.attrib
                for option_element in section_node.findall("option"))
        return attrib_list

    @staticmethod
    def find_node_by_target(file_path, targe_tname):
        """Return the first ``<target>`` whose name matches ``targe_tname``.

        A target matches when its ``name`` attribute equals ``targe_tname``
        or is a prefix of it.  Targets without a ``name`` attribute are
        skipped.

        :return: the matching element, or ``None`` when the file does not
            exist, cannot be parsed (the error is logged) or contains no
            matching target.
        """
        node = None
        try:
            if os.path.exists(file_path):
                root = ElementTree.parse(file_path).getroot()
                # Element.getiterator() was removed in Python 3.9.
                for target in root.iter("target"):
                    name = target.attrib.get("name")
                    if name is None:
                        # str.startswith(None) would raise TypeError.
                        continue
                    if name == targe_tname or targe_tname.startswith(name):
                        node = target
                        break
        except ElementTree.ParseError as xml_exception:
            LOG.error("resource_test.xml parsing failed. %s"
                      % str(xml_exception))
        return node

    ##########################################################################
    ##########################################################################

    @classmethod
    def _get_file_name_extension(cls, filepath):
        """Split ``filepath`` into ``(base name without extension, ext)``."""
        _, fullname = os.path.split(filepath)
        filename, ext = os.path.splitext(fullname)
        LOG.debug("file path:{}".format(filepath))
        return filename, ext

    @classmethod
    def get_dir_name(cls, dir_path):
        """Return the last component of an existing directory path.

        ``""`` is returned when ``dir_path`` is not a directory, ends with
        ``"."`` or consists of a single component.
        """
        dir_name = ""
        if os.path.isdir(dir_path) and dir_path[-1] != ".":
            dir_name_list = dir_path.rstrip(os.sep).split(os.sep)
            if len(dir_name_list) > 1:
                dir_name = dir_name_list[-1]
        return dir_name

    @staticmethod
    def _split_transfer_value(value):
        """Split a ``"src -> dst"`` value into stripped ``(src, dst)``.

        :return: the tuple, or ``None`` (after logging an error) when the
            ``->`` separator is missing, so callers can skip the entry
            instead of operating on mis-sliced paths.
        """
        src, separator, dst = value.partition("->")
        if not separator:
            LOG.error("invalid resource value, missing '->': %s" % value)
            return None
        return src.strip(), dst.strip()

    def process_resource_file(self, resource_dir, preparer_list, device):
        """Execute preparer/cleaner entries against ``device``.

        Entries without a ``name`` key are ignored.  Depending on ``name``:

        * ``push``  - ``value`` is ``"src -> dst"``; ``src`` is resolved
          against ``resource_dir``, the destination directory is created
          with ``mkdir -p`` and ``device.push_file`` is called.  When
          ``src`` is a local directory its name is appended to ``dst``.
        * ``pull``  - ``value`` is ``"src -> dst"``; ``device.pull_file`` is
          called with ``src`` resolved against ``resource_dir``.
        * ``shell`` - ``value`` is passed to
          ``device.execute_shell_command``.
        * anything else - ``"<name> <value>"`` is passed to
          ``device.hdc_command``.

        ``push``/``pull`` entries lacking the ``->`` separator are logged
        and skipped.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            name = item["name"]
            if name == "push":
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    continue
                src = os.path.join(resource_dir, paths[0])
                src = src.replace("/", os.sep)
                dst = paths[1]
                dir_name = self.get_dir_name(src)
                if dir_name != "":
                    dst = dst.rstrip("/") + "/" + dir_name
                device.execute_shell_command("mkdir -p %s" % dst)
                device.push_file(src, dst)
            elif name == "pull":
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    continue
                src = os.path.join(resource_dir, paths[0])
                device.pull_file(src, paths[1])
            elif name == "shell":
                device.execute_shell_command(item["value"].strip())
            else:
                command = (name + " " + item["value"]).strip()
                device.hdc_command(command)

    def lite_process_resource_file(self, resource_dir, preparer_list):
        """Execute preparer entries for a lite device using local copies.

        Entries without a ``name`` key are ignored.  Depending on ``name``:

        * ``push`` - ``shutil.copy`` from ``resource_dir/src`` to ``dst``.
        * ``pull`` - ``shutil.copyfile`` from ``dst`` to
          ``resource_dir/src``.
        * anything else - ``"<name> <value>"`` is run through
          ``self.lite_device.execute_command_with_timeout``.

        ``lite_device`` is not set by this class; it has to be assigned by
        the caller.  When it is absent the command is logged and skipped
        instead of failing with ``AttributeError``.  ``push``/``pull``
        entries lacking the ``->`` separator are logged and skipped.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            name = item["name"]
            if name == "push":
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    continue
                shutil.copy(os.path.join(resource_dir, paths[0]), paths[1])
            elif name == "pull":
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    continue
                shutil.copyfile(paths[1],
                                os.path.join(resource_dir, paths[0]))
            else:
                command = (name + " " + item["value"]).strip()
                lite_device = getattr(self, "lite_device", None)
                if lite_device is None:
                    LOG.error("lite device is not set, skip command: %s"
                              % command)
                    continue
                lite_device.execute_command_with_timeout(
                    command, case_type=DeviceTestType.lite_cpp_test)

    @classmethod
    def get_env_data(cls, environment_list):
        """Group environment options by device type.

        ``environment_list`` is walked in order: an item with a ``type``
        key starts a new device, and every item with a ``name`` key
        contributes ``name -> value`` to the options collected so far.

        :return: ``{device_type: {option_name: option_value}}``.
        """
        env_data_dic = {}
        device_name = ""
        option_dic = {}

        for item in environment_list:
            if "type" in item:
                if device_name != "":
                    # Close the previous device before starting the next.
                    env_data_dic[device_name] = option_dic
                    option_dic = {}
                device_name = item["type"]

            if "name" in item:
                option_dic[item["name"]] = item["value"]

        if device_name != "":
            env_data_dic[device_name] = option_dic
        LOG.debug("get environment data finish")
        return env_data_dic

    @staticmethod
    def get_resource_xml_file_path(test_suit_file_path):
        """Find the resource XML for a test suite by walking up the tree.

        Starting at the directory of ``test_suit_file_path``, parent
        directories are visited until one contains a ``resource`` entry.
        The search is abandoned when a directory ending in ``/tests`` or a
        filesystem root is reached.

        :return: the path of the resource config file inside that
            ``resource`` directory (falling back to the case resource file
            when the former does not exist), or ``""`` when no ``resource``
            directory was found.
        """
        resource_parent = ""
        current_dir = os.path.dirname(test_suit_file_path)
        # An empty path (relative input exhausted) ends the search; the
        # original loop never terminated here because dirname("") == "".
        while current_dir:
            if current_dir.endswith(os.sep + "tests"):
                break
            if current_dir == "/" or current_dir.endswith(":\\"):
                break
            if os.path.exists(os.path.join(current_dir, "resource")):
                resource_parent = current_dir
                break
            parent_dir = os.path.dirname(current_dir)
            if parent_dir == current_dir:
                # Reached a root that the checks above do not recognise.
                break
            current_dir = parent_dir

        if resource_parent != "":
            xml_filepath = os.path.join(
                resource_parent,
                "resource",
                ConfigFileConst.RESOURCECONFIG_FILEPATH)
            if not os.path.exists(xml_filepath):
                xml_filepath = os.path.join(
                    resource_parent,
                    "resource",
                    ConfigFileConst.CASE_RESOURCE_FILEPATH)
        else:
            xml_filepath = ""
        LOG.info("xml_filepath = %s" % xml_filepath)
        return xml_filepath

    @classmethod
    def get_nodeattrib_data(cls, data_dic):
        """Return the target's ``timeout`` attribute or ``DEFAULT_TIMEOUT``.

        The timeout is read from the first entry of
        ``data_dic["nodeattrib"]`` when that key and entry exist.
        """
        curr_timeout = DEFAULT_TIMEOUT
        if "nodeattrib" in data_dic:
            LOG.info("++++++++++++++nodeattrib+++++++++++++++")
            nodeattrib_list = data_dic["nodeattrib"]
            if nodeattrib_list:
                curr_timeout = nodeattrib_list[0].get("timeout", curr_timeout)
        return curr_timeout

    def get_environment_data(self, data_dic):
        """Return ``get_env_data`` of ``data_dic["environment"]`` or ``{}``."""
        env_data_dic = {}
        if "environment" in data_dic:
            LOG.info("++++++++++++++environment+++++++++++++++")
            env_data_dic = self.get_env_data(data_dic["environment"])
        return env_data_dic

    def process_preparer_data(self, data_dic, resource_dir, device):
        """Run the ``preparer`` entries of ``data_dic`` against ``device``."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            self.process_resource_file(
                resource_dir, data_dic["preparer"], device)

    def lite_process_preparer_data(self, data_dic, resource_dir):
        """Run the ``preparer`` entries of ``data_dic`` for a lite device."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            self.lite_process_resource_file(
                resource_dir, data_dic["preparer"])

    def process_cleaner_data(self, data_dic, resource_dir, device):
        """Run the ``cleaner`` entries of ``data_dic`` against ``device``."""
        if "cleaner" in data_dic:
            LOG.info("++++++++++++++cleaner+++++++++++++++")
            self.process_resource_file(
                resource_dir, data_dic["cleaner"], device)


##############################################################################
##############################################################################