• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import importlib
17import os
18from common_utils import run_cmd
19import threading
20import hashlib
21import time
22
23
24remote_sha256_cache = dict()
25_cache_lock = threading.Lock()
26
27
28def import_rich_module():
29    module = importlib.import_module("rich.progress")
30    progress = module.Progress(
31        module.TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
32        module.BarColumn(bar_width=None),
33        "[progress.percentage]{task.percentage:>3.1f}%",
34        "•",
35        module.DownloadColumn(),
36        "•",
37        module.TransferSpeedColumn(),
38        "•",
39        module.TimeRemainingColumn(),
40    )
41    return progress
42
43
44def check_sha256_by_mark(remote_url, unzip_dir: str, unzip_filename: str) -> tuple:
45    """
46    检查是否存在和远程文件哈希值匹配的解压完成标记文件
47    标记文件名:远程文件哈希值 + '.' + unzip_filename + '.mark'
48    """
49
50
51    remote_sha256 = get_remote_sha256(remote_url)
52    mark_file_name = remote_sha256 + '.' + unzip_filename + '.mark'
53    mark_file_path = os.path.join(unzip_dir, mark_file_name)
54    return os.path.exists(mark_file_path), mark_file_path
55
56
57def file_sha256(file_path):
58    sha = hashlib.sha256()
59    with open(file_path, "rb") as f:
60        while chunk := f.read(8192):  # 分块读取大文件,避免内存问题
61            sha.update(chunk)
62    return sha.hexdigest()
63
64
65def check_sha256(remote_url: str, local_file: str) -> bool:
66    """
67    检查本地文件的SHA256值是否与远程文件一致
68    """
69
70    remote_sha256 = get_remote_sha256(remote_url)
71    local_sha256 = file_sha256(local_file)
72    if remote_sha256 != local_sha256:
73        print(
74            "remote file {}.sha256 is not found, begin check SHASUMS256.txt".format(
75                remote_url
76            )
77        )
78        remote_sha256 = obtain_sha256_by_sha_sums256(remote_url)
79    return remote_sha256 == local_sha256
80
81
82def get_remote_sha256(remote_url: str) -> str:
83    """
84    从远程.sha256文件中获取哈希值
85    """
86    start_time = time.time()
87    with _cache_lock:  # 加锁检查缓存
88        if remote_url in remote_sha256_cache:
89            return remote_sha256_cache[remote_url]
90
91    # 在锁外执行耗时操作(如网络请求)
92    check_sha256_cmd = f"curl -s -k {remote_url}.sha256"
93    remote_sha256, _, _ = run_cmd(check_sha256_cmd.split())
94
95    with _cache_lock:  # 加锁更新缓存
96        remote_sha256_cache[remote_url] = remote_sha256
97    endtime = time.time()
98    cost_time = endtime - start_time
99    remote_file_name = os.path.basename(remote_url)
100    print(f"get remote sha256 for {remote_file_name} end, cost time: {cost_time}")
101    return remote_sha256
102
103
104def obtain_sha256_by_sha_sums256(remote_url: str) -> str:
105    """
106    从远程的SHASUMS256.txt中获取SHA256值
107    """
108    sha_sums256 = "SHASUMS256.txt"
109    sha_sums256_path = os.path.join(os.path.dirname(remote_url), sha_sums256)
110    file_name = os.path.basename(remote_url)
111    cmd = "curl -s -k " + sha_sums256_path
112    data_sha_sums256, _, _ = run_cmd(cmd.split())
113    remote_sha256 = None
114    for line in data_sha_sums256.split("\n"):
115        if file_name in line:
116            remote_sha256 = line.split(" ")[0]
117    return remote_sha256
118
119
120def get_local_path(download_root: str, remote_url: str):
121    """根据远程URL生成本地路径,本地文件名为url的MD5值+远程文件名
122    """
123    remote_file_name = os.path.basename(remote_url)
124    remote_url_md5_value = hashlib.md5((remote_url + '\n').encode()).hexdigest()
125    local_path = os.path.join(download_root, '{}.{}'.format(remote_url_md5_value, remote_file_name))
126    return local_path
127
128
129def extract_compress_files_and_gen_mark(source_file: str, unzip_dir: str, mark_file_path: str):
130    """
131    解压缩文件并生成解压完成标记文件
132    标记文件名:远程文件哈希值 + '.' + unzip_filename + '.mark'
133    """
134    if not os.path.exists(unzip_dir):
135        os.makedirs(unzip_dir, exist_ok=True)
136    if source_file.endswith(".zip"):
137        command = ["unzip", "-qq", "-o", "-d", unzip_dir, source_file]
138    elif source_file.endswith(".tar.gz"):
139        command = ["tar", "-xzf", source_file, "-C", unzip_dir]
140    elif source_file.endswith(".tar.xz"):
141        command = ["tar", "-xJf", source_file, "-C", unzip_dir]
142    elif source_file.endswith(".tar"):
143        command = ["tar", "-xvf", source_file, "-C", unzip_dir]
144    else:
145        print("暂不支持解压此类型压缩文件!")
146        return
147
148    _, err, retcode = run_cmd(command)
149    if retcode != 0:
150        print("解压失败,错误信息:", err)
151        return
152    else:
153        with open(mark_file_path, "w") as f:
154            f.write("0")
155