#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import importlib
import os
import threading
import time

from common_utils import run_cmd


# Maps remote_url -> the text fetched from "<remote_url>.sha256".
# Every read and write goes through _cache_lock.
remote_sha256_cache = dict()
_cache_lock = threading.Lock()


def import_rich_module():
    """Create a ``rich.progress.Progress`` instance laid out for downloads.

    ``rich.progress`` is loaded through ``importlib`` when this function is
    called, not when this module is imported.

    Returns:
        A ``Progress`` object with filename, bar, percentage, downloaded
        size, transfer speed and remaining-time columns.
    """
    module = importlib.import_module("rich.progress")
    progress = module.Progress(
        module.TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
        module.BarColumn(bar_width=None),
        "[progress.percentage]{task.percentage:>3.1f}%",
        "•",
        module.DownloadColumn(),
        "•",
        module.TransferSpeedColumn(),
        "•",
        module.TimeRemainingColumn(),
    )
    return progress


def check_sha256_by_mark(remote_url, unzip_dir: str, unzip_filename: str) -> tuple:
    """Check for an "extraction finished" mark file matching the remote hash.

    The mark file name is: remote file hash + '.' + unzip_filename + '.mark'.

    Args:
        remote_url: URL of the remote archive; its hash is read from
            ``<remote_url>.sha256`` via ``get_remote_sha256``.
        unzip_dir: Directory in which the mark file is looked up.
        unzip_filename: Name embedded in the mark file name.

    Returns:
        A ``(exists, mark_file_path)`` tuple: whether the mark file exists,
        and the full path that was checked.
    """
    remote_sha256 = get_remote_sha256(remote_url)
    mark_file_name = remote_sha256 + '.' + unzip_filename + '.mark'
    mark_file_path = os.path.join(unzip_dir, mark_file_name)
    return os.path.exists(mark_file_path), mark_file_path


def file_sha256(file_path):
    """Return the hex SHA-256 digest of the file at ``file_path``."""
    sha = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Read in chunks so large files are never held in memory at once.
        while chunk := f.read(8192):
            sha.update(chunk)
    return sha.hexdigest()


def check_sha256(remote_url: str, local_file: str) -> bool:
    """Check whether a local file's SHA-256 matches the remote file's.

    The hash from ``<remote_url>.sha256`` is tried first. If it does not
    equal the local digest, the hash listed in the sibling
    ``SHASUMS256.txt`` is tried instead.

    Args:
        remote_url: URL of the remote file.
        local_file: Path of the downloaded local file.

    Returns:
        True if either remote hash equals the local file's digest.
    """
    remote_sha256 = get_remote_sha256(remote_url)
    local_sha256 = file_sha256(local_file)
    if remote_sha256 != local_sha256:
        # Any mismatch (including a missing .sha256 file) lands here.
        print(
            "remote file {}.sha256 is not found, begin check SHASUMS256.txt".format(
                remote_url
            )
        )
        remote_sha256 = obtain_sha256_by_sha_sums256(remote_url)
    return remote_sha256 == local_sha256


def get_remote_sha256(remote_url: str) -> str:
    """Fetch the hash stored in the remote ``<remote_url>.sha256`` file.

    Results are cached per URL in ``remote_sha256_cache``.

    Args:
        remote_url: URL of the remote file (without the ``.sha256`` suffix).

    Returns:
        The first element returned by ``run_cmd`` for the curl call, cached
        and returned unmodified.
    """
    start_time = time.time()
    with _cache_lock:  # check the cache under the lock
        if remote_url in remote_sha256_cache:
            return remote_sha256_cache[remote_url]

    # The slow network request runs outside the lock. Two threads asking for
    # the same uncached URL may therefore both fetch it; the last one wins.
    # The argv is built as a list so a URL containing whitespace stays one
    # argument.
    # NOTE(review): "-k" disables TLS certificate verification, so the fetched
    # hash is not authenticated — confirm this is intended.
    # NOTE(review): curl runs without "-f", so an HTTP error body would be
    # cached as if it were a hash — verify against run_cmd and the servers.
    check_sha256_cmd = ["curl", "-s", "-k", f"{remote_url}.sha256"]
    remote_sha256, _, _ = run_cmd(check_sha256_cmd)

    with _cache_lock:  # update the cache under the lock
        remote_sha256_cache[remote_url] = remote_sha256
    cost_time = time.time() - start_time
    remote_file_name = os.path.basename(remote_url)
    print(f"get remote sha256 for {remote_file_name} end, cost time: {cost_time}")
    return remote_sha256


def obtain_sha256_by_sha_sums256(remote_url: str) -> str:
    """Look up the file's SHA-256 in the remote ``SHASUMS256.txt``.

    ``SHASUMS256.txt`` is fetched from the same directory as ``remote_url``.
    A line whose file-name field equals the URL's base name exactly is
    preferred. If no such line exists, the last line that merely contains the
    base name is used, so a similarly named entry (e.g. ``name.tar.gz.sig``)
    cannot shadow the real one.

    Args:
        remote_url: URL of the remote file.

    Returns:
        The hash string from the matching line, or None when no line
        mentions the file.
    """
    sha_sums256 = "SHASUMS256.txt"
    sha_sums256_path = os.path.join(os.path.dirname(remote_url), sha_sums256)
    file_name = os.path.basename(remote_url)
    # List-form argv keeps a URL containing whitespace as a single argument.
    data_sha_sums256, _, _ = run_cmd(["curl", "-s", "-k", sha_sums256_path])

    fallback_sha256 = None
    for line in data_sha_sums256.split("\n"):
        if file_name not in line:
            continue
        fields = line.split()
        if not fields:
            continue
        # Checksum lists use "<hash>  <name>" or "<hash> *<name>" (binary mode).
        if len(fields) >= 2 and fields[-1].lstrip("*") == file_name:
            return fields[0]
        fallback_sha256 = fields[0]
    return fallback_sha256
def get_local_path(download_root: str, remote_url: str):
    """Build the local download path for a remote URL.

    The local file name is the MD5 hex digest of ``remote_url + '\\n'``
    followed by '.' and the remote file's base name.

    Args:
        download_root: Directory the file is placed under.
        remote_url: URL of the remote file.

    Returns:
        ``<download_root>/<md5 of url>.<remote file name>``.
    """
    remote_file_name = os.path.basename(remote_url)
    # The trailing newline is part of the hashed text; dropping it would
    # change every generated file name.
    remote_url_md5_value = hashlib.md5((remote_url + '\n').encode()).hexdigest()
    local_path = os.path.join(download_root, '{}.{}'.format(remote_url_md5_value, remote_file_name))
    return local_path


def extract_compress_files_and_gen_mark(source_file: str, unzip_dir: str, mark_file_path: str):
    """Extract an archive and, on success, write the "extraction done" mark file.

    Mark file name convention used by callers:
    remote file hash + '.' + unzip_filename + '.mark'.

    Supported archive suffixes: ``.zip``, ``.tar.gz`` / ``.tgz``, ``.tar.xz``
    and ``.tar``. For any other suffix, or when the extraction command exits
    non-zero, a message is printed and no mark file is written.

    Args:
        source_file: Path of the archive to extract.
        unzip_dir: Destination directory; created if it does not exist.
        mark_file_path: Path of the mark file to create after a successful
            extraction.

    Returns:
        None in every case.
    """
    if not os.path.exists(unzip_dir):
        os.makedirs(unzip_dir, exist_ok=True)

    # ".tar.gz" and ".tar.xz" end with their compression suffix rather than
    # ".tar", so they never fall into the plain ".tar" branch.
    if source_file.endswith(".zip"):
        command = ["unzip", "-qq", "-o", "-d", unzip_dir, source_file]
    elif source_file.endswith((".tar.gz", ".tgz")):
        command = ["tar", "-xzf", source_file, "-C", unzip_dir]
    elif source_file.endswith(".tar.xz"):
        command = ["tar", "-xJf", source_file, "-C", unzip_dir]
    elif source_file.endswith(".tar"):
        command = ["tar", "-xvf", source_file, "-C", unzip_dir]
    else:
        print("暂不支持解压此类型压缩文件!")
        return

    _, err, retcode = run_cmd(command)
    if retcode != 0:
        print("解压失败,错误信息:", err)
        return

    # The mark file is written only after a successful extraction, so its
    # presence means the extraction ran to completion.
    with open(mark_file_path, "w") as f:
        f.write("0")