1# -*- coding:utf-8 -*- 2import traceback 3import uuid 4import sys 5import subprocess 6import os 7import time 8import re 9import shutil 10import random 11import platform 12import socket 13 14from core.base import BaseApp, dec_stepmsg 15from util.file_locker import FileLock 16from util.log_info import logger 17from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 18from aw.Download.Download import * 19from aw.Common.Constant import CONSTANT 20from aw.Common.Common import getFileName 21from aw.ExtractFile.ExtractFile import * 22from aw.Common.Common import getHostIp, copyFile, copyDirectory 23 24total_time = "" 25lock_suffix = CONSTANT.File.LOCK_SUFFIX 26suc_file = CONSTANT.File.SUC_FILE 27failed_file = CONSTANT.File.FAILED_FILE 28REBOOT_TIMEOUT = 20000000 29 30 31class liteOsUpgrade_RK3568(BaseApp): 32 ''' 33 @author: cwx1076044 34 ''' 35 36 def __init__(self, param_file): 37 super().__init__(param_file) 38 self.param_List = ["upgrade_upgradeLocation", "sn"] 39 40 @dec_stepmsg("hongmeng RK3568 flash") 41 def excute(self): 42 ''' 43 #=================================================================================== 44 # @Method: excute(self) 45 # @Precondition: none 46 # @Func: 升级执行入口 47 # @PostStatus: none 48 # @eg: excute() 49 # @return: True or Flase 50 #=================================================================================== 51 ''' 52 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app") 53 54 # # 执行下载 55 # try: 56 # if not self.download(): 57 # CONSTANT.ENVERRMESSAGE = "image download fail" 58 # logger.printLog(CONSTANT.ENVERRMESSAGE) 59 # return 98 60 # except Exception as e: 61 # logger.error(e) 62 # #raise e 63 # return 98 64 65 # 执行升级 66 try: 67 return_code = self.upgrade() 68 if not return_code: 69 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 70 logger.printLog(CONSTANT.ENVERRMESSAGE) 71 return False 72 if return_code == 98: 73 return 98 74 if return_code == 99: 75 return 99 76 return True 77 except Exception as e: 78 logger.error(e) 79 raise e 80 81 @dec_stepmsg("upgrade") 82 @timeout(3600) 83 def upgrade(self): 84 ''' 85 #=================================================================================== 86 # @Method: upgrade(self) 87 # @Precondition: none 88 # @Func: 升级相关业务逻辑 89 # @PostStatus: none 90 # @eg: upgrade() 91 # @return: True or Flase 92 #=================================================================================== 93 ''' 94 global local_image_path, loader_tool_path, sn, LocationID, test_num, system_type 95 system_type = platform.system() 96 hostname = socket.gethostname() 97 ipaddress = socket.gethostbyname(hostname) 98 logger.printLog("******系统ip为:%s ******" % ipaddress) 99 logger.printLog("******系统为:%s ******" % system_type) 100 if system_type == "Windows": 101 lock_file = r'C:/deviceupgrade/task.lock' 102 else: 103 lock_file = '/home/openharmony/deviceupgrade/task.lock' 104 # 如果上一个任务没执行完成,不往下继续执行 105 if not is_can_exec(lock_file): 106 return False 107 version_savepath = self.params_dict.get("img_path") 108 upgrade_test_type = self.params_dict.get("UpgradeTestType") 109 sn = self.params_dict.get("sn") 110 LocationID = self.params_dict.get("LocationID") 111 test_num = self.params_dict.get("test_num") 112 pr_url = self.params_dict.get("pr_url") 113 logFilePath = self.logFilePath 114 logger.info(logFilePath) 115 if system_type == "Windows": 116 r = logFilePath.rfind("\\") 117 else: 118 r = logFilePath.rfind("/") 119 report_path = logFilePath[:r] 120 logger.info(report_path) 121 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 122 logger.info(scriptpath) 123 local_image_path = os.path.join(version_savepath) 124 logger.info(local_image_path) 125 # 判断是否存在smoke_check.json文件 126 smoke_check_file = os.path.join(local_image_path, 'smoke_check.json') 127 if os.path.exists(smoke_check_file): 128 logger.info("smoke_check.json file exist, return True directly, file path is %s" % smoke_check_file) 129 delete_file_lock(lock_file) 130 return True 131 if system_type == "Windows": 132 loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool.exe") 133 else: 134 loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool") 135 logger.info(loader_tool_path) 136 mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py") 137 archive_path = os.path.join(version_savepath) 138 if not self.check_devices_mode(): 139 check_devices_cmd = "hdc list targets" 140 f = send_times(check_devices_cmd) 141 logger.info(f) 142 delete_file_lock(lock_file) 143 if not f or "Empty" in f: 144 logger.error("No devices found,please check the device.") 145 return False 146 else: 147 logger.info("3568 board is connected.") 148 return self.check_devices_mode() 149 else: 150 # 下載鏡像 151 upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path) 152 h = sendCmd(upgrde_loader_cmd) 153 logger.info(h) 154 if "Upgrade loader ok" not in h: 155 logger.error("Download MiniLoaderAll.bin Fail!") 156 delete_file_lock(lock_file) 157 return False 158 else: 159 logger.printLog("Download MiniLoaderAll.bin Success!") 160 # time.sleep(3) 161 write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path) 162 j = sendCmd(write_gpt_cmd) 163 logger.info(j) 164 if "Write gpt ok" not in j: 165 logger.error("Failed to execute the parameter.txt") 166 delete_file_lock(lock_file) 167 return False 168 else: 169 logger.printLog("Successfully executed parameter.txt.") 170 # time.sleep(5) 171 download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % ( 172 loader_tool_path, LocationID, local_image_path, local_image_path) 173 k = sendCmd(download_uboot_cmd) 174 logger.info(k) 175 if "Download image ok" not in k: 176 logger.error("Failed to download the uboot.image!") 177 delete_file_lock(lock_file) 178 if self.check_devices_mode(): 179 return 98 180 return False 181 else: 182 logger.printLog("The uboot.image downloaded successfully!") 183 # time.sleep(5) 184 if not self.flash_version(): 185 delete_file_lock(lock_file) 186 return False 187 reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID) 188 reboot_result = sendCmd(reboot_devices_cmd) 189 logger.info(reboot_result) 190 time.sleep(50) 191 # try: 192 # if upgrade_test_type != "mini_system_test": 193 # if not start_cmd(sn): 194 # if self.check_devices_mode(): 195 # return 98 196 # return False 197 # except Exception as t: 198 # logger.info(t) 199 # if self.check_devices_mode(): 200 # return 98 201 # return False 202 # time.sleep(10) 203 if "Reset Device OK" not in reboot_result: 204 logger.error("Failed to reboot the board!") 205 delete_file_lock(lock_file) 206 return False 207 else: 208 logger.info("Reboot successfully!") 209 #os.system("hdc -t %s shell set persist.usb.setting.gadget_conn_prompt false" % sn) 210 delete_file_lock(lock_file) 211 logger.printLog("******下载完成,升级成功,开始进行冒烟测试******") 212 hdc_kill() 213 # os.system("hdc_std -t %s shell hilog -w start" % sn) 214 # os.system("hdc_std -t %s shell hilog -w start -t kmsg" % sn) 215 if upgrade_test_type == "null": 216 return True 217 # 临时安装 218 # self.install_third_packages() 219 220 screenshot_path = os.path.join(local_image_path, "screenshot") 221 script_path = os.path.join(screenshot_path, 'new_script') 222 logger.info(script_path) 223 # py_path = os.path.join(script_path, "main.py") 224 py_file = "main.py" 225 new_report_path = os.path.join(report_path, "result") 226 logger.info(new_report_path) 227 time_sleep = random.randint(3, 7) 228 time.sleep(time_sleep) 229 try: 230 if not os.path.exists(new_report_path): 231 os.mkdir(new_report_path) 232 except Exception as e: 233 logger.error(e) 234 return 98 235 if upgrade_test_type == "mini_system_test": 236 save_path = os.path.join(new_report_path) 237 if exec_cmd(mini_path, sn, save_path, archive_path) == 98: 238 return 98 239 return True 240 241 if not upgrade_test_type or upgrade_test_type == "smoke_test": 242 # 进到工程目录 243 cur_path = os.getcwd() 244 os.chdir(script_path) 245 test_return = cmd_test(script_path, py_file, sn, test_num, new_report_path, pr_url) 246 # 执行完回到原来的目录 247 os.chdir(cur_path) 248 if test_return == 1: 249 return True 250 if test_return == 98: 251 return 98 252 if test_return == 99: 253 return 99 254 else: 255 return False 256 257 # def install_third_packages(self): 258 # try: 259 # logger.debug('python -m pip list') 260 # rst = subprocess.run('python -m pip list', capture_output=True, shell=True, encoding='utf-8', timeout=30) 261 # logger.debug(rst) 262 # logger.debug('python -m pip install pytest -U') 263 # rst = subprocess.run('python -m pip install pytest -U', capture_output=True, shell=True, encoding='utf-8', timeout=600) 264 # logger.debug(rst) 265 # logger.debug('python -m pip uninstall pytest-testreport -y') 266 # rst = subprocess.run('python -m pip uninstall pytest-testreport -y', capture_output=True, shell=True, encoding='utf-8', timeout=600) 267 # logger.debug(rst) 268 # logger.debug('python -m pip install pytest-html -U') 269 # rst = subprocess.run('python -m pip install pytest-html -U', capture_output=True, shell=True, encoding='utf-8', timeout=600) 270 # logger.debug(rst) 271 # logger.debug('python -m pip list') 272 # rst = subprocess.run('python -m pip list', capture_output=True, shell=True, encoding='utf-8', timeout=30) 273 # logger.debug(rst) 274 # except: 275 # logger.error(traceback.format_exc()) 276 277 @timeout(1000) 278 def flash_version(self): 279 partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"] 280 for i in partList: 281 if not os.path.exists("%s/%s.img" % (local_image_path, i)): 282 logger.printLog("%s.img is not exist, ignore" % i) 283 continue 284 loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i) 285 p = sendCmd(loadcmd) 286 logger.info(p) 287 # time.sleep(5) 288 if "Download image ok" not in p: 289 logger.info("try download %s again!" % i) 290 time.sleep(1) 291 second_cmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i) 292 f = sendCmd(second_cmd) 293 logger.info(f) 294 if "Download image ok" not in f: 295 logger.printLog("Failed to download the %s.img!" % i) 296 if self.check_devices_mode(): 297 return 98 298 else: 299 return False 300 return True 301 else: 302 logger.printLog("The %s.img downloaded successfully!" % i) 303 return True 304 305 @timeout(120) 306 def check_devices_mode(self): 307 check_times = 0 308 while check_times < 5: 309 check_mode_cmd = "%s LD" % loader_tool_path 310 g = sendCmd(check_mode_cmd) 311 logger.info(g) 312 # time.sleep(40) 313 if "LocationID=%s Mode=Loader" % LocationID in g: 314 logger.info("3568 board has entered the Loader mode successfully!") 315 return True 316 else: 317 # if test_num != "2/2": 318 # hdc_kill() 319 os.system("hdc -t %s shell reboot loader" % sn) 320 time.sleep(8) 321 check_times += 1 322 logger.error("Failed to enter the loader mode!") 323 return False 324 325 # @dec_stepmsg("download") 326 # @timeout(360) 327 # def download(self): 328 # ''' 329 # #=================================================================================== 330 # # @Method: download(self) 331 # # @Precondition: none 332 # # @Func: 构建下载到本地的路径,执行相应包的下载 333 # # @PostStatus: none 334 # # @eg: download() 335 # # @return: True or Flase 336 # #=================================================================================== 337 # ''' 338 # global version_savepath, version_name 339 # dir_path = CONSTANT.Path.getDirPath() 340 # if self.params_dict.get("pbiid"): 341 # version_path = self.params_dict.get("pbiid") 342 # version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 343 # version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 344 # else: 345 # version_path = self.params_dict.get("upgrade_upgradeLocation") 346 # version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 347 # version_savepath = os.path.join(dir_path, version_name, "img") 348 # # 执行img下载 349 # 350 # if self.params_dict.get("isDownload") == "True": 351 # logger.printLog("不需要做下载,直接返回") 352 # return True 353 # 354 # import hashlib 355 # save_file_str = version_path.replace("/", "").replace("\\", "") 356 # save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 357 # logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name)) 358 # save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 359 # logger.info(version_savepath) 360 # logger.info(save_path_file) 361 # t = version_savepath[:-4] 362 # logger.info(t) 363 # if not self.excutedown(version_path, version_savepath, save_path_file, False): 364 # logger.info("download again!") 365 # try: 366 # if os.path.exists(t): 367 # shutil.rmtree(t) 368 # logger.info("remove dir succeed") 369 # if os.path.exists(save_path_file): 370 # logger.info("remove file succeed") 371 # os.remove(save_path_file) 372 # except Exception as p: 373 # logger.error(p) 374 # raise Exception(p) 375 # time.sleep(15) 376 # if not self.excutedown(version_path, version_savepath, save_path_file, False): 377 # logger.error("download img fail") 378 # return False 379 # 380 # # 保存本地版本路径给devicetest去版本路径下取用例 381 # saveVersion(save_path_file, version_savepath) 382 # return True 383 # 384 # def excutedown(self, source_path, download_dir, suc_mark, is_file): 385 # ''' 386 # #=================================================================================== 387 # # @Method: excutedown(source_path, download_dir, suc_mark, is_file) 388 # # @Precondition: none 389 # # @Func: 执行下载动作 390 # # @PostStatus: none 391 # # @Param: source_path:资源文件路径 392 # # download_dir:文件下载到本地的文件夹路径 393 # # is_file:是否是文件 394 # # @eg: excutedown("xxxx", "D:\\local\\image", suc_mark, Flase) 395 # # @return: True or Flase 396 # #=================================================================================== 397 # ''' 398 # failed_mark = os.path.join(download_dir, failed_file) 399 # lock_path = os.path.join(download_dir, lock_suffix) 400 # file_lock = FileLock() 401 # 402 # if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 403 # return True 404 # try: 405 # nowtime = get_now_time_str_info() 406 # logger.printLog("%s Downloading, please wait" % nowtime) 407 # file_lock.lockFile(lock_path) 408 # ret = "" 409 # logger.info("Get lock. Start to ") 410 # try: 411 # if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 412 # ret = downloadByBitComet(source_path, download_dir, os_method) 413 # elif source_path.startswith('\\\\'): 414 # ret = downloadByCopy(source_path, download_dir, is_file) 415 # elif self.params_dict.get("pbiid"): 416 # ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", 417 # self.params_dict.get("pbiid")) 418 # elif source_path.startswith("http"): 419 # ret = run_download(source_path, download_dir) 420 # except Exception as f: 421 # logger.error(f) 422 # 423 # if source_path.endswith(".zip"): 424 # zip_name = os.path.basename(source_path) 425 # ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 426 # if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 427 # if source_path.startswith("http") and ("file_id=" in source_path): 428 # if source_path.endswith(".tar.gz"): 429 # zip_name = source_path.split('=')[-1] 430 # else: 431 # zip_name = "out.tar.gz" 432 # else: 433 # zip_name = os.path.basename(source_path) 434 # ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 435 # nowtime = get_now_time_str_info() 436 # logger.printLog("%s download to %s end" % (nowtime, download_dir)) 437 # 438 # if not ret: 439 # with open(failed_mark, "a+") as fp: 440 # fp.write("") 441 # return ret 442 # except Exception as e: 443 # logger.printLog(e) 444 # #raise Exception(e) 445 # finally: 446 # file_lock.releaseFile() 447 448 449@timeout(30) 450def hdc_kill(): 451 logger.info("kill the process") 452 os.system("hdc kill") 453 time.sleep(2) 454 logger.info("start the process") 455 os.system("hdc -l5 start") 456 # time.sleep(10) 457 458 459def sendCmd(mycmd): 460 result = "".join(os.popen(mycmd).readlines()) 461 return result 462 463 464def send_times(mycmd): 465 times = 0 466 outcome = sendCmd(mycmd) 467 while times < 3: 468 if not outcome or "Empty" in outcome: 469 times += 1 470 time.sleep(3) 471 else: 472 time.sleep(3) 473 return outcome 474 return outcome 475 476 477@timeout(180) 478def start_cmd(sn): 479 try: 480 os.system("hdc -l5 start") 481 # power_cmd = "hdc -t %s shell \"power-shell setmode 602\"" % sn 482 hilog_cmd = "hdc -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn 483 # logger.info(power_cmd) 484 logger.info(hilog_cmd) 485 # power_result = sendCmd(power_cmd) 486 # logger.info(power_result) 487 # if not power_result: 488 # return False 489 # number = 0 490 # while "Set Mode Success" not in power_result and number < 30: 491 # time.sleep(4) 492 # power_result = sendCmd(power_cmd) 493 # logger.info(power_result) 494 # number += 1 495 # if number >= 20: 496 # logger.error("Set mode failed") 497 # return False 498 hilog_result = sendCmd(hilog_cmd) 499 logger.info(hilog_result) 500 return True 501 except Exception as e: 502 logger.error(e) 503 return False 504 505 506@timeout(900) 507def cmd_test(screenshot_path, py_file, device_num, test_num, new_report_path, pr): 508 global total_time 509 save_screenshot_path = os.path.join(new_report_path, "screenshot_result") 510 logger.info(save_screenshot_path) 511 time_sleep = random.randint(1, 5) 512 time.sleep(time_sleep) 513 try: 514 if not os.path.exists(save_screenshot_path): 515 os.mkdir(save_screenshot_path) 516 logger.info(save_screenshot_path) 517 base_screenshot_path = os.path.join(new_report_path, "screenshot_base") 518 if not os.path.exists(base_screenshot_path): 519 os.mkdir(base_screenshot_path) 520 logger.info(base_screenshot_path) 521 except Exception as e: 522 logger.error(e) 523 return 98 524 # config_path = os.path.join(screenshot_path, "app_capture_screen_test_config.json") 525 py_cmd = "python {} --device_num {} --test_num {} --save_path {} --pr {}".format(py_file, device_num, test_num, save_screenshot_path, pr) 526 time1 = time.time() 527 result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, screenshot_path) 528 time2 = time.time() 529 total_time = int(time2 - time1) 530 logger.info("total_time: %s" % total_time) 531 if result == 1: 532 return True 533 if result == 98: 534 return 98 535 if result == 99: 536 return 99 537 else: 538 return False 539 540 541@timeout(900) 542def outCmd(cmd, save_screenshot_path, base_screenshot_path, script_path): 543 logger.info("cmd is: %s" % cmd) 544 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8', errors='ignore', universal_newlines=True) 545 # p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, errors='ignore', universal_newlines=True) 546 curline = p.stdout.readline() 547 # list_png_name = [] 548 try: 549 while "End of check" not in curline: 550 curline = p.stdout.readline() 551 logger.info(curline) 552 if 'SmokeTest: End of check, test succeeded!' in curline: 553 return True 554 # if "abnarmal" in curline: 555 # png_name = curline.split(" ")[3].split(".")[0] 556 # list_png_name.append(png_name) 557 if "SmokeTest find some fatal problems" in curline: 558 logger.error("SmokeTest find some fatal problems!") 559 return 99 560 except Exception as e: 561 logger.error(e) 562 logger.error("execute smoke_test.py failed!") 563 return 99 564 # l = list(set(list_png_name)) 565 # if l: 566 # logger.error(l) 567 try: 568 resource_path = os.path.join(script_path, 'resource') 569 for jpeg_file in os.listdir(resource_path): 570 if jpeg_file.endswith('jpeg'): 571 result = os.path.join(resource_path, jpeg_file) 572 base = os.path.join(base_screenshot_path, jpeg_file) 573 shutil.copy(result, base) 574 except Exception as t: 575 logger.info(t) 576 p.wait() 577 logger.info("p.returncode %s" % p.returncode) 578 if p.returncode == 0: 579 logger.info("screenshot check is ok!") 580 return True 581 if p.returncode == 101: 582 logger.error("device disconnection, please check the device!") 583 return False 584 logger.error("screenshot test failed, check the %s" % save_screenshot_path) 585 return 98 586 587 588@timeout(1000) 589def exec_cmd(mini_path, sn, save_path, archive_path): 590 cmd = "python %s --device_num %s --save_path %s --archive_path %s" % (mini_path, sn, save_path, archive_path) 591 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk") 592 curline = p.stdout.readline() 593 try: 594 while "End of check" not in curline: 595 curline = p.stdout.readline() 596 logger.info(curline) 597 except Exception as e: 598 logger.error(e) 599 p.wait() 600 logger.info("p.returncode %s" % p.returncode) 601 if p.returncode == 0: 602 logger.info("mini_system_test is ok!") 603 return True 604 logger.error("mini_system_test failed!") 605 return 98 606 607@timeout(1000) 608def is_can_exec(lock_file): 609 """ 610 判断升级是否可以执行 611 @param lock_file: 文件路径 612 """ 613 lock_duration = 10 * 60 # 10分钟(以秒为单位) 614 if os.getenv('wait_time') is not None: 615 lock_duration = int(os.getenv('wait_time')) 616 # 检查锁文件 617 if os.path.exists(lock_file): 618 # 获取锁文件的创建时间 619 lock_time = os.path.getmtime(lock_file) 620 current_time = time.time() 621 # 判断锁是否超时 622 if (current_time - lock_time) < 30: 623 return True 624 if (current_time - lock_time) < lock_duration: 625 logger.error("ask is already running. Exiting.") 626 return False 627 else: 628 logger.warning("ask running time is more than %s second, can exec" % lock_duration) 629 delete_file_lock(lock_file) 630 create_file_lock(lock_file) 631 return True 632 else: 633 logger.info("no tasks are being executed") 634 create_file_lock(lock_file) 635 return True 636 637def create_file_lock(lock_file): 638 """ 639 创建文件锁 640 @param lock_file: 文件路径 641 """ 642 directory = os.path.dirname(lock_file) 643 if not os.path.exists(directory): 644 os.makedirs(directory) 645 logger.info("create_file_lock") 646 with open(lock_file, 'w') as f: 647 f.write('locked, please can not delete') 648def delete_file_lock(lock_file): 649 """ 650 删除文件 651 @param lock_file: 文件路径 652 """ 653 logger.info("delete_file_lock") 654 if os.path.exists(lock_file): 655 os.remove(lock_file)