#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import xml.etree.ElementTree as ElementTree
from xdevice import platform_logger
from xdevice import DeviceTestType
from core.constants import ConfigFileConst

LOG = platform_logger("ResourceManager")


##############################################################################
##############################################################################

class ResourceManager(object):
    """Locate and apply the resource configuration XML of a test suite.

    The XML is searched for a ``<target>`` element matching the test suite
    file name; its ``environment``, ``preparer`` and ``cleaner`` children
    are flattened into lists of attribute dictionaries which the
    ``process_*`` methods then execute against a device.
    """

    # Separator between source and destination in push/pull option values.
    _TRANSFER_SEPARATOR = "->"

    def __init__(self):
        pass

    def get_resource_data_dic(self, testsuit_filepath):
        """Return ``(data_dic, resource_dir)`` for a test suite file.

        ``data_dic`` is the parsed resource data for the target named after
        the test suite file (without extension); ``resource_dir`` is the
        absolute directory containing the resource XML. When no resource
        XML exists, ``({}, "")`` is returned.
        """
        target_name, _ = self._get_file_name_extension(testsuit_filepath)
        xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
        if not os.path.exists(xml_filepath):
            return {}, ""

        data_dic = self.get_resource_data(xml_filepath, target_name)
        resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
        return data_dic, resource_dir

    def get_resource_data(self, xml_filepath, target_name):
        """Parse ``xml_filepath`` for ``target_name``; ``{}`` if missing."""
        if not os.path.exists(xml_filepath):
            return {}
        return self._parse_resource_test_xml_file(xml_filepath, target_name)

    @staticmethod
    def _collect_option_attribs(parent_node, tag):
        """Return the attribs of child ``tag`` followed by its options.

        The result is ``[]`` when ``parent_node`` has no ``tag`` child.
        """
        attrib_list = []
        section = parent_node.find(tag)
        # Compare against None: an Element without children is falsy, so a
        # plain truth test would drop a childless section.
        if section is not None:
            attrib_list.append(section.attrib)
            attrib_list.extend(
                option.attrib for option in section.findall("option"))
        return attrib_list

    def _parse_resource_test_xml_file(self, filepath, targetname):
        """Build the resource dictionary for ``targetname``.

        Returns ``{}`` when no matching target exists. Otherwise the
        dictionary has the keys ``nodeattrib``, ``environment``,
        ``preparer`` and ``cleaner``, each mapping to a list of attribute
        dictionaries.
        """
        data_dic = {}

        node = self.find_node_by_target(filepath, targetname)
        # Explicit None check: a target without child elements is falsy but
        # still carries attributes (e.g. "timeout") that must be reported.
        if node is None:
            return data_dic

        environment_data_list = []
        env_node = node.find("environment")
        if env_node is not None:
            environment_data_list.append(env_node.attrib)
            for device_element in env_node.findall("device"):
                environment_data_list.append(device_element.attrib)
                environment_data_list.extend(
                    option.attrib
                    for option in device_element.findall("option"))

        data_dic["nodeattrib"] = [node.attrib]
        data_dic["environment"] = environment_data_list
        data_dic["preparer"] = self._collect_option_attribs(node, "preparer")
        data_dic["cleaner"] = self._collect_option_attribs(node, "cleaner")
        return data_dic

    @staticmethod
    def find_node_by_target(file_path, targe_tname):
        """Return the first ``<target>`` element matching ``targe_tname``.

        A target matches when its ``name`` attribute equals
        ``targe_tname`` or is a prefix of it. Returns ``None`` when the
        file does not exist, cannot be parsed, or has no matching target.
        """
        node = None
        try:
            if os.path.exists(file_path):
                root = ElementTree.parse(file_path).getroot()
                # Element.getiterator() was removed in Python 3.9; iter()
                # is the supported equivalent.
                for target in root.iter("target"):
                    name = target.attrib.get("name")
                    if name is None:
                        # startswith(None) would raise TypeError.
                        continue
                    if name == targe_tname or targe_tname.startswith(name):
                        node = target
                        break
        except ElementTree.ParseError as xml_exception:
            # The exception is formatted into the message; concatenating
            # its ``args`` tuple to a str would raise TypeError here.
            LOG.error("resource_test.xml parsing failed. %s"
                      % (xml_exception,))
        return node

    ##########################################################################
    ##########################################################################

    @classmethod
    def _get_file_name_extension(cls, filepath):
        """Return ``(filename, extension)`` of the last path component."""
        _, fullname = os.path.split(filepath)
        filename, ext = os.path.splitext(fullname)
        LOG.debug("file path:{}".format(filepath))
        return filename, ext

    @classmethod
    def get_dir_name(cls, dir_path):
        """Return the last component of an existing directory path.

        Returns ``""`` when ``dir_path`` is not a directory, ends with
        ``"."``, or consists of a single component.
        """
        dir_name = ""
        if os.path.isdir(dir_path) and dir_path[-1] != ".":
            dir_name_list = dir_path.rstrip(os.sep).split(os.sep)
            if len(dir_name_list) > 1:
                dir_name = dir_name_list[-1]
        return dir_name

    @classmethod
    def _split_transfer_value(cls, transfer_value):
        """Split ``"src -> dst"`` into stripped ``(src, dst)``.

        Returns ``None`` when the separator is absent, so callers can skip
        the entry instead of slicing the value at a bogus position.
        """
        left, separator, right = transfer_value.partition(
            cls._TRANSFER_SEPARATOR)
        if not separator:
            return None
        return left.strip(), right.strip()

    def process_resource_file(self, resource_dir, preparer_list, device):
        """Execute each named entry of ``preparer_list`` on ``device``.

        Entries without a ``name`` key are ignored. ``push`` and ``pull``
        values have the form ``"src -> dst"`` with ``src`` resolved
        against ``resource_dir``; ``shell`` values are run as shell
        commands; any other name is sent as ``"<name> <value>"`` through
        ``device.connector_command``.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            action = item["name"]
            if action == "push":
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    LOG.error("invalid push value, missing '->': %s"
                              % item["value"])
                    continue
                src = os.path.join(resource_dir, paths[0])
                dst = paths[1]
                src = src.replace("/", os.sep)
                dir_name = self.get_dir_name(src)
                if dir_name != "":
                    # Pushing a directory: recreate it under dst.
                    dst = dst.rstrip("/") + "/" + dir_name
                device.execute_shell_command("mkdir -p %s" % dst)
                device.push_file(src, dst)
                device.execute_shell_command("hilog -d %s" % (
                    os.path.join(dst, os.path.basename(src))))
            elif action == "pull":
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    LOG.error("invalid pull value, missing '->': %s"
                              % item["value"])
                    continue
                src = os.path.join(resource_dir, paths[0])
                dst = paths[1]
                device.pull_file(src, dst)
            elif action == "shell":
                device.execute_shell_command(item["value"].strip())
            else:
                command = (action + " " + item["value"]).strip()
                device.connector_command(command)

    def lite_process_resource_file(self, resource_dir, preparer_list):
        """Execute each named entry of ``preparer_list`` for a lite device.

        ``push`` copies ``src`` (resolved against ``resource_dir``) to
        ``dst`` on the local file system; ``pull`` copies ``dst`` back to
        ``src``. Any other name is executed as ``"<name> <value>"`` on
        ``self.lite_device``.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            action = item["name"]
            if action in ("push", "pull"):
                paths = self._split_transfer_value(item["value"])
                if paths is None:
                    LOG.error("invalid %s value, missing '->': %s"
                              % (action, item["value"]))
                    continue
                src = os.path.join(resource_dir, paths[0])
                dst = paths[1]
                if action == "push":
                    shutil.copy(src, dst)
                else:
                    shutil.copyfile(dst, src)
            else:
                command = (action + " " + item["value"]).strip()
                # NOTE(review): lite_device is not assigned anywhere in
                # this class; presumably the caller sets it before use -
                # confirm.
                self.lite_device.execute_command_with_timeout(
                    command, case_type=DeviceTestType.lite_cpp_test)

    @classmethod
    def get_env_data(cls, environment_list):
        """Group environment options by device type.

        ``environment_list`` is a flat list of attribute dictionaries. An
        entry with a ``type`` key starts a new device; entries with a
        ``name`` key contribute ``name -> value`` to the options collected
        so far. Returns ``{device_type: {name: value, ...}}``.
        """
        env_data_dic = {}
        device_name = ""
        option_dic = {}

        for item in environment_list:
            if "type" in item:
                if device_name != "":
                    # Flush the options gathered for the previous device.
                    env_data_dic[device_name] = option_dic.copy()
                    option_dic.clear()
                device_name = item["type"]

            if "name" in item:
                option_dic[item["name"]] = item["value"]

        if device_name != "":
            env_data_dic[device_name] = option_dic.copy()
        LOG.debug("get environment data finish")
        return env_data_dic

    @staticmethod
    def get_resource_xml_file_path(test_suit_file_path):
        """Return the resource XML path for a test suite, or ``""``.

        Walks up from the directory of ``test_suit_file_path`` until a
        directory containing ``resource`` is found. The search gives up at
        a directory ending in ``<sep>tests``, at a file system root, or
        when the path cannot be shortened any further.
        """
        current_dir = os.path.dirname(test_suit_file_path)
        while True:
            if current_dir.endswith(os.sep + "tests"):
                current_dir = ""
                break
            if current_dir == "/" or current_dir.endswith(":\\"):
                current_dir = ""
                break
            if os.path.exists(os.path.join(current_dir, "resource")):
                break
            parent_dir = os.path.dirname(current_dir)
            if parent_dir == current_dir:
                # dirname() reached a fixed point (e.g. "" for a relative
                # path); without this check the loop would never end.
                current_dir = ""
                break
            current_dir = parent_dir

        if current_dir != "":
            xml_filepath = os.path.join(
                current_dir,
                "resource",
                ConfigFileConst.RESOURCECONFIG_FILEPATH)
            if not os.path.exists(xml_filepath):
                xml_filepath = os.path.join(
                    current_dir,
                    "resource",
                    ConfigFileConst.CASE_RESOURCE_FILEPATH)
        else:
            xml_filepath = ""
        LOG.info("xml_filepath = %s" % xml_filepath)
        return xml_filepath

    @classmethod
    def get_nodeattrib_data(cls, data_dic):
        """Return the target's ``timeout`` attribute, or ``""`` if absent."""
        curr_timeout = ""
        if "nodeattrib" in data_dic:
            LOG.info("++++++++++++++nodeattrib+++++++++++++++")
            nodeattrib_list = data_dic["nodeattrib"]
            if len(nodeattrib_list) != 0:
                node_item_dic = nodeattrib_list[0]
                if "timeout" in node_item_dic:
                    curr_timeout = node_item_dic["timeout"]
        return curr_timeout

    def get_environment_data(self, data_dic):
        """Return per-device options from ``data_dic["environment"]``."""
        env_data_dic = {}
        if "environment" in data_dic:
            LOG.info("++++++++++++++environment+++++++++++++++")
            env_data_dic = self.get_env_data(data_dic["environment"])
        return env_data_dic

    def process_preparer_data(self, data_dic, resource_dir, device):
        """Run the ``preparer`` entries of ``data_dic`` on ``device``."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            self.process_resource_file(
                resource_dir, data_dic["preparer"], device)

    def lite_process_preparer_data(self, data_dic, resource_dir):
        """Run the ``preparer`` entries of ``data_dic`` for a lite device."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            self.lite_process_resource_file(
                resource_dir, data_dic["preparer"])

    def process_cleaner_data(self, data_dic, resource_dir, device):
        """Run the ``cleaner`` entries of ``data_dic`` on ``device``."""
        if "cleaner" in data_dic:
            LOG.info("++++++++++++++cleaner+++++++++++++++")
            self.process_resource_file(
                resource_dir, data_dic["cleaner"], device)


##############################################################################
##############################################################################