1# -*- coding:utf-8 -*- 2import traceback 3import uuid 4import sys 5import subprocess 6import os 7import time 8import re 9import shutil 10import random 11import platform 12import socket 13 14from core.base import BaseApp, dec_stepmsg 15from util.file_locker import FileLock 16from util.log_info import logger 17from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 18from aw.Download.Download import * 19from aw.Common.Constant import CONSTANT 20from aw.Common.Common import getFileName 21from aw.ExtractFile.ExtractFile import * 22from aw.Common.Common import getHostIp, copyFile, copyDirectory 23 24total_time = "" 25lock_suffix = CONSTANT.File.LOCK_SUFFIX 26suc_file = CONSTANT.File.SUC_FILE 27failed_file = CONSTANT.File.FAILED_FILE 28REBOOT_TIMEOUT = 20000000 29 30 31class liteOsUpgrade_RK3568(BaseApp): 32 ''' 33 @author: cwx1076044 34 ''' 35 36 def __init__(self, param_file): 37 super().__init__(param_file) 38 self.param_List = ["upgrade_upgradeLocation", "sn"] 39 40 @dec_stepmsg("hongmeng RK3568 flash") 41 def excute(self): 42 ''' 43 #=================================================================================== 44 # @Method: excute(self) 45 # @Precondition: none 46 # @Func: 升级执行入口 47 # @PostStatus: none 48 # @eg: excute() 49 # @return: True or Flase 50 #=================================================================================== 51 ''' 52 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app") 53 54 # # 执行下载 55 # try: 56 # if not self.download(): 57 # CONSTANT.ENVERRMESSAGE = "image download fail" 58 # logger.printLog(CONSTANT.ENVERRMESSAGE) 59 # return 98 60 # except Exception as e: 61 # logger.error(e) 62 # #raise e 63 # return 98 64 65 # 执行升级 66 try: 67 return_code = self.upgrade() 68 if not return_code: 69 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 70 logger.printLog(CONSTANT.ENVERRMESSAGE) 71 return False 72 if return_code == 98: 73 return 98 74 if return_code == 99: 75 return 99 76 return True 77 except Exception as e: 78 logger.error(e) 79 raise e 80 81 @dec_stepmsg("upgrade") 82 @timeout(3600) 83 def upgrade(self): 84 ''' 85 #=================================================================================== 86 # @Method: upgrade(self) 87 # @Precondition: none 88 # @Func: 升级相关业务逻辑 89 # @PostStatus: none 90 # @eg: upgrade() 91 # @return: True or Flase 92 #=================================================================================== 93 ''' 94 global local_image_path, loader_tool_path, sn, LocationID, test_num, system_type 95 system_type = platform.system() 96 hostname = socket.gethostname() 97 ipaddress = socket.gethostbyname(hostname) 98 logger.printLog("******系统ip为:%s ******" % ipaddress) 99 logger.printLog("******系统为:%s ******" % system_type) 100 if system_type == "Windows": 101 lock_file = r'C:/deviceupgrade/task.lock' 102 else: 103 lock_file = '/home/openharmony/deviceupgrade/task.lock' 104 # 如果上一个任务没执行完成,不往下继续执行 105 if not is_can_exec(lock_file): 106 return False 107 version_savepath = self.params_dict.get("img_path") 108 upgrade_test_type = self.params_dict.get("UpgradeTestType") 109 sn = self.params_dict.get("sn") 110 LocationID = self.params_dict.get("LocationID") 111 test_num = self.params_dict.get("test_num") 112 pr_url = self.params_dict.get("pr_url") 113 logFilePath = self.logFilePath 114 logger.info(logFilePath) 115 if system_type == "Windows": 116 r = logFilePath.rfind("\\") 117 else: 118 r = logFilePath.rfind("/") 119 report_path = logFilePath[:r] 120 logger.info(report_path) 121 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 122 logger.info(scriptpath) 123 local_image_path = os.path.join(version_savepath) 124 logger.info(local_image_path) 125 # 判断是否存在smoke_check.json文件 126 smoke_check_file = os.path.join(local_image_path, 'smoke_check.json') 127 if os.path.exists(smoke_check_file): 128 logger.info("smoke_check.json file exist, return True directly, file path is %s" % smoke_check_file) 129 delete_file_lock(lock_file) 130 return True 131 if system_type == "Windows": 132 loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool.exe") 133 else: 134 loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool") 135 logger.info(loader_tool_path) 136 mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py") 137 archive_path = os.path.join(version_savepath) 138 if not self.check_devices_mode(): 139 check_devices_cmd = "hdc list targets" 140 f = send_times(check_devices_cmd) 141 logger.info(f) 142 delete_file_lock(lock_file) 143 if not f or "Empty" in f: 144 logger.error("No devices found,please check the device.") 145 return False 146 else: 147 logger.info("3568 board is connected.") 148 return self.check_devices_mode() 149 else: 150 # 下載鏡像 151 upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path) 152 h = sendCmd(upgrde_loader_cmd) 153 logger.info(h) 154 if "Upgrade loader ok" not in h: 155 logger.error("Download MiniLoaderAll.bin Fail!") 156 delete_file_lock(lock_file) 157 return False 158 else: 159 logger.printLog("Download MiniLoaderAll.bin Success!") 160 # time.sleep(3) 161 write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path) 162 j = sendCmd(write_gpt_cmd) 163 logger.info(j) 164 if "Write gpt ok" not in j: 165 logger.error("Failed to execute the parameter.txt") 166 delete_file_lock(lock_file) 167 return False 168 else: 169 logger.printLog("Successfully executed parameter.txt.") 170 # time.sleep(5) 171 download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % ( 172 loader_tool_path, LocationID, local_image_path, local_image_path) 173 k = sendCmd(download_uboot_cmd) 174 logger.info(k) 175 if "Download image ok" not in k: 176 logger.error("Failed to download the uboot.image!") 177 delete_file_lock(lock_file) 178 if self.check_devices_mode(): 179 return 98 180 return False 181 else: 182 logger.printLog("The uboot.image downloaded successfully!") 183 # time.sleep(5) 184 if not self.flash_version(): 185 delete_file_lock(lock_file) 186 return False 187 reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID) 188 reboot_result = sendCmd(reboot_devices_cmd) 189 logger.info(reboot_result) 190 time.sleep(60) 191 # try: 192 # if upgrade_test_type != "mini_system_test": 193 # if not start_cmd(sn): 194 # if self.check_devices_mode(): 195 # return 98 196 # return False 197 # except Exception as t: 198 # logger.info(t) 199 # if self.check_devices_mode(): 200 # return 98 201 # return False 202 # time.sleep(10) 203 if "Reset Device OK" not in reboot_result: 204 logger.error("Failed to reboot the board!") 205 delete_file_lock(lock_file) 206 return False 207 else: 208 logger.info("Reboot successfully!") 209 # os.system("hdc -t %s shell reboot" % sn) 210 # time.sleep(30) 211 # os.system("hdc -t %s shell write_updater user_factory_reset;reboot updater " % sn) 212 # time.sleep(40) 213 # os.system("hdc -t %s shell set persist.usb.setting.gadget_conn_prompt false" % sn) 214 delete_file_lock(lock_file) 215 logger.printLog("******下载完成,升级成功,开始进行冒烟测试******") 216 # hdc_kill() 217 # os.system("hdc_std -t %s shell hilog -w start" % sn) 218 # os.system("hdc_std -t %s shell hilog -w start -t kmsg" % sn) 219 if upgrade_test_type == "null": 220 return True 221 # 临时安装 222 # self.install_third_packages() 223 224 screenshot_path = os.path.join(local_image_path, "screenshot") 225 script_path = os.path.join(screenshot_path, 'new_script') 226 logger.info(script_path) 227 # py_path = os.path.join(script_path, "main.py") 228 py_file = "main.py" 229 new_report_path = os.path.join(report_path, "result") 230 logger.info(new_report_path) 231 time_sleep = random.randint(3, 7) 232 time.sleep(time_sleep) 233 try: 234 if not os.path.exists(new_report_path): 235 os.mkdir(new_report_path) 236 except Exception as e: 237 logger.error(e) 238 return 98 239 if upgrade_test_type == "mini_system_test": 240 save_path = os.path.join(new_report_path) 241 if exec_cmd(mini_path, sn, save_path, archive_path) == 98: 242 return 98 243 return True 244 245 if not upgrade_test_type or upgrade_test_type == "smoke_test": 246 # 进到工程目录 247 cur_path = os.getcwd() 248 os.chdir(script_path) 249 test_return = cmd_test(script_path, py_file, sn, test_num, new_report_path, pr_url) 250 # 执行完回到原来的目录 251 os.chdir(cur_path) 252 if test_return == 1: 253 return True 254 if test_return == 98: 255 return 98 256 if test_return == 99: 257 return 99 258 else: 259 return False 260 261 # def install_third_packages(self): 262 # try: 263 # logger.debug('python -m pip list') 264 # rst = subprocess.run('python -m pip list', capture_output=True, shell=True, encoding='utf-8', timeout=30) 265 # logger.debug(rst) 266 # logger.debug('python -m pip install pytest -U') 267 # rst = subprocess.run('python -m pip install pytest -U', capture_output=True, shell=True, encoding='utf-8', timeout=600) 268 # logger.debug(rst) 269 # logger.debug('python -m pip uninstall pytest-testreport -y') 270 # rst = subprocess.run('python -m pip uninstall pytest-testreport -y', capture_output=True, shell=True, encoding='utf-8', timeout=600) 271 # logger.debug(rst) 272 # logger.debug('python -m pip install pytest-html -U') 273 # rst = subprocess.run('python -m pip install pytest-html -U', capture_output=True, shell=True, encoding='utf-8', timeout=600) 274 # logger.debug(rst) 275 # logger.debug('python -m pip list') 276 # rst = subprocess.run('python -m pip list', capture_output=True, shell=True, encoding='utf-8', timeout=30) 277 # logger.debug(rst) 278 # except: 279 # logger.error(traceback.format_exc()) 280 281 @timeout(1000) 282 def flash_version(self): 283 partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"] 284 for i in partList: 285 if not os.path.exists("%s/%s.img" % (local_image_path, i)): 286 logger.printLog("%s.img is not exist, ignore" % i) 287 continue 288 loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i) 289 p = sendCmd(loadcmd) 290 logger.info(p) 291 # time.sleep(5) 292 if "Download image ok" not in p: 293 logger.info("try download %s again!" % i) 294 time.sleep(1) 295 second_cmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i) 296 f = sendCmd(second_cmd) 297 logger.info(f) 298 if "Download image ok" not in f: 299 logger.printLog("Failed to download the %s.img!" % i) 300 if self.check_devices_mode(): 301 return 98 302 else: 303 return False 304 return True 305 else: 306 logger.printLog("The %s.img downloaded successfully!" % i) 307 return True 308 309 @timeout(120) 310 def check_devices_mode(self): 311 check_times = 0 312 while check_times < 5: 313 check_mode_cmd = "%s LD" % loader_tool_path 314 g = sendCmd(check_mode_cmd) 315 logger.info(g) 316 # time.sleep(40) 317 if "LocationID=%s Mode=Loader" % LocationID in g: 318 logger.info("3568 board has entered the Loader mode successfully!") 319 return True 320 else: 321 # if test_num != "2/2": 322 # hdc_kill() 323 os.system("hdc -t %s shell reboot loader" % sn) 324 time.sleep(8) 325 check_times += 1 326 logger.error("Failed to enter the loader mode!") 327 return False 328 329 # @dec_stepmsg("download") 330 # @timeout(360) 331 # def download(self): 332 # ''' 333 # #=================================================================================== 334 # # @Method: download(self) 335 # # @Precondition: none 336 # # @Func: 构建下载到本地的路径,执行相应包的下载 337 # # @PostStatus: none 338 # # @eg: download() 339 # # @return: True or Flase 340 # #=================================================================================== 341 # ''' 342 # global version_savepath, version_name 343 # dir_path = CONSTANT.Path.getDirPath() 344 # if self.params_dict.get("pbiid"): 345 # version_path = self.params_dict.get("pbiid") 346 # version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 347 # version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 348 # else: 349 # version_path = self.params_dict.get("upgrade_upgradeLocation") 350 # version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 351 # version_savepath = os.path.join(dir_path, version_name, "img") 352 # # 执行img下载 353 # 354 # if self.params_dict.get("isDownload") == "True": 355 # logger.printLog("不需要做下载,直接返回") 356 # return True 357 # 358 # import hashlib 359 # save_file_str = version_path.replace("/", "").replace("\\", "") 360 # save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 361 # logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name)) 362 # save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 363 # logger.info(version_savepath) 364 # logger.info(save_path_file) 365 # t = version_savepath[:-4] 366 # logger.info(t) 367 # if not self.excutedown(version_path, version_savepath, save_path_file, False): 368 # logger.info("download again!") 369 # try: 370 # if os.path.exists(t): 371 # shutil.rmtree(t) 372 # logger.info("remove dir succeed") 373 # if os.path.exists(save_path_file): 374 # logger.info("remove file succeed") 375 # os.remove(save_path_file) 376 # except Exception as p: 377 # logger.error(p) 378 # raise Exception(p) 379 # time.sleep(15) 380 # if not self.excutedown(version_path, version_savepath, save_path_file, False): 381 # logger.error("download img fail") 382 # return False 383 # 384 # # 保存本地版本路径给devicetest去版本路径下取用例 385 # saveVersion(save_path_file, version_savepath) 386 # return True 387 # 388 # def excutedown(self, source_path, download_dir, suc_mark, is_file): 389 # ''' 390 # #=================================================================================== 391 # # @Method: excutedown(source_path, download_dir, suc_mark, is_file) 392 # # @Precondition: none 393 # # @Func: 执行下载动作 394 # # @PostStatus: none 395 # # @Param: source_path:资源文件路径 396 # # download_dir:文件下载到本地的文件夹路径 397 # # is_file:是否是文件 398 # # @eg: excutedown("xxxx", "D:\\local\\image", suc_mark, Flase) 399 # # @return: True or Flase 400 # #=================================================================================== 401 # ''' 402 # failed_mark = os.path.join(download_dir, failed_file) 403 # lock_path = os.path.join(download_dir, lock_suffix) 404 # file_lock = FileLock() 405 # 406 # if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 407 # return True 408 # try: 409 # nowtime = get_now_time_str_info() 410 # logger.printLog("%s Downloading, please wait" % nowtime) 411 # file_lock.lockFile(lock_path) 412 # ret = "" 413 # logger.info("Get lock. Start to ") 414 # try: 415 # if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 416 # ret = downloadByBitComet(source_path, download_dir, os_method) 417 # elif source_path.startswith('\\\\'): 418 # ret = downloadByCopy(source_path, download_dir, is_file) 419 # elif self.params_dict.get("pbiid"): 420 # ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", 421 # self.params_dict.get("pbiid")) 422 # elif source_path.startswith("http"): 423 # ret = run_download(source_path, download_dir) 424 # except Exception as f: 425 # logger.error(f) 426 # 427 # if source_path.endswith(".zip"): 428 # zip_name = os.path.basename(source_path) 429 # ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 430 # if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 431 # if source_path.startswith("http") and ("file_id=" in source_path): 432 # if source_path.endswith(".tar.gz"): 433 # zip_name = source_path.split('=')[-1] 434 # else: 435 # zip_name = "out.tar.gz" 436 # else: 437 # zip_name = os.path.basename(source_path) 438 # ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 439 # nowtime = get_now_time_str_info() 440 # logger.printLog("%s download to %s end" % (nowtime, download_dir)) 441 # 442 # if not ret: 443 # with open(failed_mark, "a+") as fp: 444 # fp.write("") 445 # return ret 446 # except Exception as e: 447 # logger.printLog(e) 448 # #raise Exception(e) 449 # finally: 450 # file_lock.releaseFile() 451 452 453@timeout(30) 454def hdc_kill(): 455 logger.info("kill the process") 456 os.system("hdc kill") 457 time.sleep(2) 458 logger.info("start the process") 459 os.system("hdc -l5 start") 460 # time.sleep(10) 461 462 463def sendCmd(mycmd): 464 result = "".join(os.popen(mycmd).readlines()) 465 return result 466 467 468def send_times(mycmd): 469 times = 0 470 outcome = sendCmd(mycmd) 471 while times < 3: 472 if not outcome or "Empty" in outcome: 473 times += 1 474 time.sleep(3) 475 else: 476 time.sleep(3) 477 return outcome 478 return outcome 479 480 481@timeout(180) 482def start_cmd(sn): 483 try: 484 os.system("hdc -l5 start") 485 # power_cmd = "hdc -t %s shell \"power-shell setmode 602\"" % sn 486 hilog_cmd = "hdc -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn 487 # logger.info(power_cmd) 488 logger.info(hilog_cmd) 489 # power_result = sendCmd(power_cmd) 490 # logger.info(power_result) 491 # if not power_result: 492 # return False 493 # number = 0 494 # while "Set Mode Success" not in power_result and number < 30: 495 # time.sleep(4) 496 # power_result = sendCmd(power_cmd) 497 # logger.info(power_result) 498 # number += 1 499 # if number >= 20: 500 # logger.error("Set mode failed") 501 # return False 502 hilog_result = sendCmd(hilog_cmd) 503 logger.info(hilog_result) 504 return True 505 except Exception as e: 506 logger.error(e) 507 return False 508 509 510@timeout(900) 511def cmd_test(screenshot_path, py_file, device_num, test_num, new_report_path, pr): 512 global total_time 513 save_screenshot_path = os.path.join(new_report_path, "screenshot_result") 514 logger.info(save_screenshot_path) 515 time_sleep = random.randint(1, 5) 516 time.sleep(time_sleep) 517 try: 518 if not os.path.exists(save_screenshot_path): 519 os.mkdir(save_screenshot_path) 520 logger.info(save_screenshot_path) 521 base_screenshot_path = os.path.join(new_report_path, "screenshot_base") 522 if not os.path.exists(base_screenshot_path): 523 os.mkdir(base_screenshot_path) 524 logger.info(base_screenshot_path) 525 except Exception as e: 526 logger.error(e) 527 return 98 528 # config_path = os.path.join(screenshot_path, "app_capture_screen_test_config.json") 529 py_cmd = "python {} --device_num {} --test_num {} --save_path {} --pr {}".format(py_file, device_num, test_num, save_screenshot_path, pr) 530 time1 = time.time() 531 result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, screenshot_path) 532 time2 = time.time() 533 total_time = int(time2 - time1) 534 logger.info("total_time: %s" % total_time) 535 if result == 1: 536 return True 537 if result == 98: 538 return 98 539 if result == 99: 540 return 99 541 else: 542 return False 543 544 545@timeout(900) 546def outCmd(cmd, save_screenshot_path, base_screenshot_path, script_path): 547 logger.info("cmd is: %s" % cmd) 548 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8', errors='ignore', universal_newlines=True) 549 # p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, errors='ignore', universal_newlines=True) 550 curline = p.stdout.readline() 551 # list_png_name = [] 552 try: 553 while "End of check" not in curline: 554 curline = p.stdout.readline() 555 logger.info(curline) 556 if 'SmokeTest: End of check, test succeeded!' in curline: 557 return True 558 # if "abnarmal" in curline: 559 # png_name = curline.split(" ")[3].split(".")[0] 560 # list_png_name.append(png_name) 561 if "SmokeTest find some fatal problems" in curline: 562 logger.error("SmokeTest find some fatal problems!") 563 return 99 564 except Exception as e: 565 logger.error(e) 566 logger.error("execute smoke_test.py failed!") 567 return 99 568 # l = list(set(list_png_name)) 569 # if l: 570 # logger.error(l) 571 try: 572 resource_path = os.path.join(script_path, 'resource') 573 for jpeg_file in os.listdir(resource_path): 574 if jpeg_file.endswith('jpeg'): 575 result = os.path.join(resource_path, jpeg_file) 576 base = os.path.join(base_screenshot_path, jpeg_file) 577 shutil.copy(result, base) 578 except Exception as t: 579 logger.info(t) 580 p.wait() 581 logger.info("p.returncode %s" % p.returncode) 582 if p.returncode == 0: 583 logger.info("screenshot check is ok!") 584 return True 585 if p.returncode == 101: 586 logger.error("device disconnection, please check the device!") 587 return False 588 logger.error("screenshot test failed, check the %s" % save_screenshot_path) 589 return 98 590 591 592@timeout(1000) 593def exec_cmd(mini_path, sn, save_path, archive_path): 594 cmd = "python %s --device_num %s --save_path %s --archive_path %s" % (mini_path, sn, save_path, archive_path) 595 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk") 596 curline = p.stdout.readline() 597 try: 598 while "End of check" not in curline: 599 curline = p.stdout.readline() 600 logger.info(curline) 601 except Exception as e: 602 logger.error(e) 603 p.wait() 604 logger.info("p.returncode %s" % p.returncode) 605 if p.returncode == 0: 606 logger.info("mini_system_test is ok!") 607 return True 608 logger.error("mini_system_test failed!") 609 return 98 610 611@timeout(1000) 612def is_can_exec(lock_file): 613 """ 614 判断升级是否可以执行 615 @param lock_file: 文件路径 616 """ 617 lock_duration = 10 * 60 # 10分钟(以秒为单位) 618 if os.getenv('wait_time') is not None: 619 lock_duration = int(os.getenv('wait_time')) 620 # 检查锁文件 621 if os.path.exists(lock_file): 622 # 获取锁文件的创建时间 623 lock_time = os.path.getmtime(lock_file) 624 current_time = time.time() 625 # 判断锁是否超时 626 if (current_time - lock_time) < 30: 627 return True 628 if (current_time - lock_time) < lock_duration: 629 logger.error("ask is already running. Exiting.") 630 return False 631 else: 632 logger.warning("ask running time is more than %s second, can exec" % lock_duration) 633 delete_file_lock(lock_file) 634 create_file_lock(lock_file) 635 return True 636 else: 637 logger.info("no tasks are being executed") 638 create_file_lock(lock_file) 639 return True 640 641def create_file_lock(lock_file): 642 """ 643 创建文件锁 644 @param lock_file: 文件路径 645 """ 646 directory = os.path.dirname(lock_file) 647 if not os.path.exists(directory): 648 os.makedirs(directory) 649 logger.info("create_file_lock") 650 with open(lock_file, 'w') as f: 651 f.write('locked, please can not delete') 652def delete_file_lock(lock_file): 653 """ 654 删除文件 655 @param lock_file: 文件路径 656 """ 657 logger.info("delete_file_lock") 658 if os.path.exists(lock_file): 659 os.remove(lock_file)