1# Copyright (C) 2020 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Module to check updates from crates.io.""" 15 16import json 17import os 18from pathlib import Path 19import re 20import shutil 21import tempfile 22import urllib.request 23from typing import IO 24 25import archive_utils 26from base_updater import Updater 27import git_utils 28# pylint: disable=import-error 29import metadata_pb2 # type: ignore 30import updater_utils 31 32LIBRARY_NAME_PATTERN: str = r"([-\w]+)" 33 34ALPHA_BETA_PATTERN: str = r"^.*[0-9]+\.[0-9]+\.[0-9]+-(alpha|beta).*" 35 36ALPHA_BETA_RE: re.Pattern = re.compile(ALPHA_BETA_PATTERN) 37 38"""Match both x.y.z and x.y.z+a.b.c which is used by some Vulkan binding libraries""" 39VERSION_PATTERN: str = r"([0-9]+)\.([0-9]+)\.([0-9]+)(\+([0-9]+)\.([0-9]+)\.([0-9]+))?" 40 41VERSION_RE: re.Pattern = re.compile(VERSION_PATTERN) 42 43CRATES_IO_ARCHIVE_URL_PATTERN: str = (r"^https:\/\/static.crates.io\/crates\/" + 44 LIBRARY_NAME_PATTERN + "/" + 45 LIBRARY_NAME_PATTERN + "-" + 46 "(.*?)" + ".crate") 47 48CRATES_IO_ARCHIVE_URL_RE: re.Pattern = re.compile(CRATES_IO_ARCHIVE_URL_PATTERN) 49 50DESCRIPTION_PATTERN: str = r"^description *= *(\".+\")" 51 52DESCRIPTION_RE: re.Pattern = re.compile(DESCRIPTION_PATTERN) 53 54 55class CratesUpdater(Updater): 56 """Updater for crates.io packages.""" 57 58 UPSTREAM_REMOTE_NAME: str = "update_origin" 59 download_url: str 60 package: str 61 package_dir: str 62 temp_file: IO 63 64 def is_supported_url(self) -> bool: 65 match = CRATES_IO_ARCHIVE_URL_RE.match(self._old_identifier.value) 66 if match is None: 67 return False 68 self.package = match.group(1) 69 return True 70 71 def setup_remote(self) -> None: 72 url = "https://crates.io/api/v1/crates/" + self.package 73 with urllib.request.urlopen(url) as request: 74 data = json.loads(request.read().decode()) 75 homepage = data["crate"]["repository"] 76 remotes = git_utils.list_remotes(self._proj_path) 77 current_remote_url = None 78 for name, url in remotes.items(): 79 if name == self.UPSTREAM_REMOTE_NAME: 80 current_remote_url = url 81 82 if current_remote_url is not None and current_remote_url != homepage: 83 git_utils.remove_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME) 84 current_remote_url = None 85 86 if current_remote_url is None: 87 git_utils.add_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME, homepage) 88 89 branch = git_utils.detect_default_branch(self._proj_path, 90 self.UPSTREAM_REMOTE_NAME) 91 git_utils.fetch(self._proj_path, self.UPSTREAM_REMOTE_NAME, branch) 92 93 def _get_version_numbers(self, version: str) -> tuple[int, int, int]: 94 match = VERSION_RE.match(version) 95 if match is not None: 96 return ( 97 int(match.group(1)), 98 int(match.group(2)), 99 int(match.group(3)), 100 ) 101 return (0, 0, 0) 102 103 def _is_newer_version(self, prev_version: str, prev_id: int, 104 check_version: str, check_id: int): 105 """Return true if check_version+id is newer than prev_version+id.""" 106 return ((self._get_version_numbers(check_version), check_id) > 107 (self._get_version_numbers(prev_version), prev_id)) 108 109 def _find_latest_non_test_version(self) -> None: 110 url = f"https://crates.io/api/v1/crates/{self.package}/versions" 111 with urllib.request.urlopen(url) as request: 112 data = json.loads(request.read().decode()) 113 last_id = 0 114 self._new_identifier.version = "" 115 for v in data["versions"]: 116 version = v["num"] 117 if (not v["yanked"] and not ALPHA_BETA_RE.match(version) and 118 self._is_newer_version( 119 self._new_identifier.version, last_id, version, int(v["id"]))): 120 last_id = int(v["id"]) 121 self._new_identifier.version = version 122 self.download_url = "https://crates.io" + v["dl_path"] 123 124 def check(self) -> None: 125 """Checks crates.io and returns whether a new version is available.""" 126 url = "https://crates.io/api/v1/crates/" + self.package 127 with urllib.request.urlopen(url) as request: 128 data = json.loads(request.read().decode()) 129 self._new_identifier.version = data["crate"]["max_version"] 130 # Skip d.d.d-{alpha,beta}* versions 131 if ALPHA_BETA_RE.match(self._new_identifier.version): 132 print(f"Ignore alpha or beta release:{self.package}-{self._new_identifier.version}.") 133 self._find_latest_non_test_version() 134 else: 135 url = url + "/" + self._new_identifier.version 136 with urllib.request.urlopen(url) as request: 137 data = json.loads(request.read().decode()) 138 self.download_url = "https://crates.io" + data["version"]["dl_path"] 139 140 def set_new_version_to_old(self): 141 super().refresh_without_upgrading() 142 # A shortcut to use the static download path. 143 self.download_url = f"https://static.crates.io/crates/{self.package}/" \ 144 f"{self.package}-{self._new_identifier.version}.crate" 145 146 def update(self) -> None: 147 """Updates the package. 148 149 Has to call check() before this function. 150 """ 151 try: 152 temporary_dir = archive_utils.download_and_extract(self.download_url) 153 self.package_dir = archive_utils.find_archive_root(temporary_dir) 154 self.temp_file = tempfile.NamedTemporaryFile() 155 updater_utils.replace_package(self.package_dir, self._proj_path, 156 self.temp_file.name) 157 self.check_for_errors() 158 finally: 159 urllib.request.urlcleanup() 160 161 def rollback(self) -> bool: 162 # Only rollback if we have already swapped, 163 # which we denote by writing to this file. 164 if os.fstat(self.temp_file.fileno()).st_size > 0: 165 tmp_dir = tempfile.TemporaryDirectory() 166 shutil.move(self._proj_path, tmp_dir.name) 167 shutil.move(self.package_dir, self._proj_path) 168 shutil.move(Path(tmp_dir.name) / self.package, self.package_dir) 169 return True 170 return False 171 172 def update_metadata(self, metadata: metadata_pb2.MetaData) -> metadata_pb2: 173 """Updates METADATA content.""" 174 # copy only HOMEPAGE url, and then add new ARCHIVE url. 175 updated_metadata = super().update_metadata(metadata) 176 for identifier in updated_metadata.third_party.identifier: 177 if identifier.version: 178 identifier.value = f"https://static.crates.io/crates/" \ 179 f"{updated_metadata.name}/"\ 180 f"{updated_metadata.name}" \ 181 f"-{self.latest_identifier.version}.crate" 182 break 183 # copy description from Cargo.toml to METADATA 184 cargo_toml = os.path.join(self.project_path, "Cargo.toml") 185 description = self._get_cargo_description(cargo_toml) 186 if description and description != updated_metadata.description: 187 print("New METADATA description:", description) 188 updated_metadata.description = description 189 return updated_metadata 190 191 def check_for_errors(self) -> None: 192 # Check for .rej patches from failing to apply patches. 193 # If this has too many false positives, we could either 194 # check if the files are modified by patches or somehow 195 # track which files existed before the patching. 196 rejects = list(self._proj_path.glob('**/*.rej')) 197 if len(rejects) > 0: 198 print(f"Error: Found patch reject files: {str(rejects)}") 199 self._has_errors = True 200 201 def _toml2str(self, line: str) -> str: 202 """Convert a quoted toml string to a Python str without quotes.""" 203 if line.startswith("\"\"\""): 204 return "" # cannot handle broken multi-line description 205 # TOML string escapes: \b \t \n \f \r \" \\ (no unicode escape) 206 line = line[1:-1].replace("\\\\", "\n").replace("\\b", "") 207 line = line.replace("\\t", " ").replace("\\n", " ").replace("\\f", " ") 208 line = line.replace("\\r", "").replace("\\\"", "\"").replace("\n", "\\") 209 # replace a unicode quotation mark, used in the libloading crate 210 return line.replace("’", "'").strip() 211 212 def _get_cargo_description(self, cargo_toml: str) -> str: 213 """Return the description in Cargo.toml or empty string.""" 214 if os.path.isfile(cargo_toml) and os.access(cargo_toml, os.R_OK): 215 with open(cargo_toml, "r", encoding="utf-8") as toml_file: 216 for line in toml_file: 217 match = DESCRIPTION_RE.match(line) 218 if match: 219 return self._toml2str(match.group(1)) 220 return "" 221