1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import codecs 20import os 21import shutil 22import stat 23import sys 24import time 25 26from devicetest.core.error_message import ErrorMessage 27from devicetest.core.exception import DeviceTestError 28from devicetest.core.variables import get_project_path 29from devicetest.log.logger import DeviceTestLog as log 30 31 32def get_template_path(template_file_path, isdir=None): 33 ''' 34 @summary: Obtains the absolute path of the template screen cap path. 35 @param isdir: Obtain the directory: True; Obtain the file: False; 36 None: Ignore the file type 37 ''' 38 template_file_path = template_file_path.replace("\\", "/") 39 if os.path.isabs(template_file_path) \ 40 and (not isdir and os.path.isfile(template_file_path)): 41 return os.path.abspath(template_file_path) 42 43 # remove first str '/' 44 if not os.path.isfile(template_file_path) and template_file_path.startswith("/"): 45 template_file_path = template_file_path[1:] 46 47 _fol = None 48 if template_file_path.startswith("resource"): 49 path = template_file_path[9:] 50 from xdevice import EnvPool 51 if EnvPool.resource_path is not None: 52 folder = os.path.abspath(EnvPool.resource_path) 53 _fol = travesal_folder(folder, path, isdir) 54 if _fol is None: 55 log.debug("Not found [%s] in env pool path %s, " 56 "continue to find template in resource path." % ( 57 path, folder)) 58 if _fol is None: 59 ecotest_resource_path = getattr(sys, "ecotest_resource_path", "") 60 if ecotest_resource_path is not None: 61 folder = os.path.abspath(ecotest_resource_path) 62 _fol = travesal_folder(folder, path, isdir) 63 if _fol is None: 64 log.debug("Not found [%s] in resource path %s, " 65 "continue to find template in other path." % ( 66 path, folder)) 67 else: 68 _fol = get_resource_path(template_file_path) 69 log.debug("get template path:{}".format(_fol)) 70 return _fol 71 72 73def get_resource_path(resource_file_path, isdir=None): 74 ''' 75 @summary: Obtains the absolute path of the resource file. 76 @param isdir: Obtain the directory: True; Obtain the file: False; 77 None: Ignore the file type 78 ''' 79 resource_file_path = resource_file_path.replace("\\", "/") 80 if os.path.isabs(resource_file_path) \ 81 and ((isdir is None and os.path.exists(resource_file_path)) 82 or (not isdir and os.path.isfile(resource_file_path)) 83 or (isdir and os.path.isdir(resource_file_path))): 84 return os.path.abspath(resource_file_path) 85 86 _fol = None 87 from xdevice import EnvPool 88 if EnvPool.resource_path is not None: 89 folder = os.path.abspath(EnvPool.resource_path) 90 _fol = travesal_folder(folder, resource_file_path, isdir) 91 if _fol is None: 92 log.debug("Not found [%s] in env pool path %s, " 93 "continue to find in project resource path." % ( 94 resource_file_path, folder)) 95 96 if _fol is None: 97 ecotest_resource_path = getattr(sys, "ecotest_resource_path", "") 98 if ecotest_resource_path is not None: 99 folder = os.path.abspath(ecotest_resource_path) 100 _fol = travesal_folder(folder, resource_file_path, isdir) 101 if _fol is None: 102 log.debug("Not found [%s] in ecotest path %s, " 103 "continue to find in suit path." % ( 104 resource_file_path, folder)) 105 106 from devicetest.core.variables import DeccVariable 107 if _fol is None: 108 folder = os.path.abspath(DeccVariable.project.resource_path) 109 _fol = travesal_folder(folder, resource_file_path, isdir) 110 if _fol is None: 111 log.debug("Not found [%s] in product path %s, " 112 "continue to find in project resource path." % ( 113 resource_file_path, folder)) 114 115 if _fol is None: 116 folder = os.path.abspath(DeccVariable.project.test_suite_path) 117 _fol = travesal_folder(folder, resource_file_path, isdir) 118 if _fol is None: 119 log.debug("Not found [%s] in product path %s, " 120 "continue to find in suit resource path." % ( 121 resource_file_path, folder)) 122 123 if _fol is None: 124 folder = os.path.abspath(get_project_path()) 125 _fol = travesal_folder(folder, resource_file_path, isdir) 126 if _fol is None: 127 log.debug("Not found [%s] in product path %s, " 128 "continue to find in project path." % ( 129 resource_file_path, folder)) 130 131 if _fol is None: 132 log.error(ErrorMessage.Error_01102.Message.en, 133 error_no=ErrorMessage.Error_01102.Code) 134 raise DeviceTestError(ErrorMessage.Error_01102.Topic) 135 log.debug("get resource path:{}".format(_fol)) 136 return _fol 137 138 139def travesal_folder(folder, folder_file_path, isdir=False): 140 folder_file = os.path.join(folder, folder_file_path) 141 if (isdir is None and os.path.exists(folder_file)) \ 142 or (not isdir and os.path.isfile(folder_file)) \ 143 or (isdir and os.path.isdir(folder_file)): 144 return os.path.abspath(folder_file) 145 146 if not os.path.exists(folder): 147 return None 148 149 for child in os.listdir(folder): 150 if child == ".svn": 151 continue 152 153 folder_file = os.path.join(folder, child) 154 if os.path.isdir(folder_file): 155 if (isdir is None or isdir) \ 156 and folder_file.endswith(os.sep + folder_file_path): 157 return folder_file 158 else: 159 folder_ret = travesal_folder(folder_file, 160 folder_file_path, isdir) 161 if folder_ret is not None: 162 return folder_ret 163 elif os.path.isfile(folder_file) \ 164 and folder_file.endswith(os.sep + folder_file_path) \ 165 and (isdir is None or not isdir): 166 return folder_file 167 168 return None 169 170 171def os_open_file_write(file_path, content, mode="w"): 172 try: 173 flags = os.O_WRONLY | os.O_CREAT 174 modes = stat.S_IWUSR | stat.S_IRUSR 175 dir_path = os.path.dirname(file_path) 176 if not os.path.isdir(dir_path): 177 os.makedirs(dir_path) 178 with os.fdopen(os.open(file_path, flags, modes), mode) as fout: 179 fout.write(content) 180 181 except Exception as error: 182 log.error(ErrorMessage.Error_01214.Message.en, 183 error_no=ErrorMessage.Error_01214.Code, 184 is_traceback=True) 185 raise DeviceTestError(ErrorMessage.Error_01214.Topic) from error 186 187 188def os_open_file_read(file_path, mode="r"): 189 try: 190 flags = os.O_RDONLY 191 modes = stat.S_IWUSR | stat.S_IRUSR 192 with os.fdopen(os.open(file_path, flags, modes), mode) as fout: 193 return fout.read() 194 195 except FileNotFoundError as error: 196 log.error(ErrorMessage.Error_01216.Message.en, 197 error_no=ErrorMessage.Error_01216.Code, 198 is_traceback=True) 199 raise DeviceTestError(ErrorMessage.Error_01216.Topic) from error 200 201 except Exception as error: 202 log.error(ErrorMessage.Error_01215.Message.en, 203 error_no=ErrorMessage.Error_01215.Code, 204 is_traceback=True) 205 raise DeviceTestError(ErrorMessage.Error_01215.Topic) from error 206 207 208def save_file(file_path, content): 209 os_open_file_write(file_path, content, "wb") 210 211 212def create_dir(create_path): 213 """ 214 Creates a directory if it does not exist already. 215 Args: 216 create_path: The path of the directory to create. 217 """ 218 full_path = os.path.abspath(os.path.expanduser(create_path)) 219 if not os.path.exists(full_path): 220 os.makedirs(full_path, exist_ok=True) # exist_ok=True 221 222 223def to_file_plus(file_path, content, console=False, level="INFO"): 224 """ 225 @summary: Create file, append "content" to file and add timestamp. 226 """ 227 dirname = os.path.dirname(file_path) 228 if not os.path.exists(dirname): 229 # When the to_file_plus method is invoked in a thread, there is a 230 # possibility that the method does not exist when os.path.exists is 231 # executed. However, the directory has been created by the main thread 232 # during directory creation. Therefore, an exception is captured. 233 try: 234 os.makedirs(dirname) 235 except Exception as exception: 236 log.error(exception) 237 from devicetest.log.logger import get_log_line_timestamp 238 timestamp = get_log_line_timestamp() 239 data = "%s %s %s\n" % (timestamp, level, content) 240 if console: 241 print(data[:-2]) 242 srw = codecs.open(file_path, "a", "utf-8") 243 srw.write(data) 244 srw.close() 245 246 247def to_file(filename, content): 248 ''' 249 genenrate files 250 ''' 251 dirname = os.path.dirname(filename) 252 if not os.path.isdir(dirname): 253 os.makedirs(dirname) 254 os_open_file_write(filename, content, "wb") 255 256 257def delfile(filename): 258 try: 259 os.remove(filename) 260 except Exception as exception: 261 log.error(exception) 262 if os.path.isfile(filename): 263 if "nt" in sys.builtin_module_names: 264 os.remove(filename) 265 else: 266 shutil.rmtree(filename) 267 268 for _ in range(5): 269 if os.path.isfile(filename): 270 time.sleep(0.1) 271 continue 272 else: 273 break 274 275 if os.path.isfile(filename): 276 log.error("Delete file %s failed." % filename) 277 278 279def delfolder(dirname): 280 try: 281 shutil.rmtree(dirname) 282 except Exception as _: 283 if os.path.isdir(dirname): 284 shutil.rmtree(dirname) 285 286 for _ in range(5): 287 if os.path.isdir(dirname): 288 time.sleep(0.1) 289 continue 290 else: 291 break 292 293 if os.path.isdir(dirname): 294 log.error("Delete folder %s failed." % dirname) 295 296 297def copy_to_folder(src, des): 298 """Copy a folder and its children or a file to another folder. 299 """ 300 src = os.path.normpath(src) 301 des = os.path.normpath(des) 302 if not os.path.exists(src): 303 log.error("No found [%s]" % src) 304 return 305 if not os.path.exists(des): 306 create_dir(des) 307 if not os.path.isdir(des): 308 log.error("[%s] is not a folder." % des) 309 return 310 311 if not os.path.isdir(src): 312 shutil.copy(src, des) 313 return 314 os.chdir(src) 315 src_file_list = [os.path.join(src, src_file) 316 for src_file in os.listdir(des)] 317 for source in src_file_list: 318 if os.path.isfile(source): 319 shutil.copy(source, des) 320 if os.path.isdir(source): 321 _, src_name = os.path.split(source) 322 shutil.copytree(source, os.path.join(des, src_name)) 323 324 325def delete_file_folder(src): 326 ''' 327 @summary: Delete files or directories. 328 ''' 329 330 if os.path.isfile(src): 331 delfile(src) 332 elif os.path.isdir(src): 333 delfolder(src) 334