1# Copyright (C) 2020 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Module to check updates from crates.io.""" 15 16import json 17import os 18# pylint: disable=g-importing-member 19from pathlib import Path 20import re 21import shutil 22import tempfile 23import urllib.request 24from typing import IO 25 26import archive_utils 27from base_updater import Updater 28# pylint: disable=import-error 29import metadata_pb2 # type: ignore 30import updater_utils 31 32LIBRARY_NAME_PATTERN: str = (r"([-\w]+)") 33 34ALPHA_BETA_PATTERN: str = (r"^.*[0-9]+\.[0-9]+\.[0-9]+-(alpha|beta).*") 35 36ALPHA_BETA_RE: re.Pattern = re.compile(ALPHA_BETA_PATTERN) 37 38VERSION_PATTERN: str = (r"([0-9]+)\.([0-9]+)\.([0-9]+)") 39 40VERSION_MATCHER: re.Pattern = re.compile(VERSION_PATTERN) 41 42CRATES_IO_ARCHIVE_URL_PATTERN: str = (r"^https:\/\/static.crates.io\/crates\/" + 43 LIBRARY_NAME_PATTERN + "/" + 44 LIBRARY_NAME_PATTERN + "-" + 45 VERSION_PATTERN + ".crate") 46 47CRATES_IO_ARCHIVE_URL_RE: re.Pattern = re.compile(CRATES_IO_ARCHIVE_URL_PATTERN) 48 49DESCRIPTION_PATTERN: str = (r"^description *= *(\".+\")") 50 51DESCRIPTION_MATCHER: re.Pattern = re.compile(DESCRIPTION_PATTERN) 52 53 54class CratesUpdater(Updater): 55 """Updater for crates.io packages.""" 56 57 download_url: str 58 package: str 59 package_dir: str 60 temp_file: IO 61 62 def is_supported_url(self) -> bool: 63 match = CRATES_IO_ARCHIVE_URL_RE.match(self._old_url.value) 64 if match is None: 65 return False 66 self.package = match.group(1) 67 return True 68 69 def _get_version_numbers(self, version: str) -> tuple[int, int, int]: 70 match = VERSION_MATCHER.match(version) 71 if match is not None: 72 return ( 73 int(match.group(1)), 74 int(match.group(2)), 75 int(match.group(3)), 76 ) 77 return (0, 0, 0) 78 79 def _is_newer_version(self, prev_version: str, prev_id: int, 80 check_version: str, check_id: int): 81 """Return true if check_version+id is newer than prev_version+id.""" 82 return ((self._get_version_numbers(check_version), check_id) > 83 (self._get_version_numbers(prev_version), prev_id)) 84 85 def _find_latest_non_test_version(self) -> None: 86 url = f"https://crates.io/api/v1/crates/{self.package}/versions" 87 with urllib.request.urlopen(url) as request: 88 data = json.loads(request.read().decode()) 89 last_id = 0 90 self._new_ver = "" 91 for v in data["versions"]: 92 version = v["num"] 93 if (not v["yanked"] and not ALPHA_BETA_RE.match(version) and 94 self._is_newer_version( 95 self._new_ver, last_id, version, int(v["id"]))): 96 last_id = int(v["id"]) 97 self._new_ver = version 98 self.download_url = "https://crates.io" + v["dl_path"] 99 100 def check(self) -> None: 101 """Checks crates.io and returns whether a new version is available.""" 102 url = "https://crates.io/api/v1/crates/" + self.package 103 with urllib.request.urlopen(url) as request: 104 data = json.loads(request.read().decode()) 105 self._new_ver = data["crate"]["max_version"] 106 # Skip d.d.d-{alpha,beta}* versions 107 if ALPHA_BETA_RE.match(self._new_ver): 108 print(f"Ignore alpha or beta release: {self.package}-{self._new_ver}.") 109 self._find_latest_non_test_version() 110 else: 111 url = url + "/" + self._new_ver 112 with urllib.request.urlopen(url) as request: 113 data = json.loads(request.read().decode()) 114 self.download_url = "https://crates.io" + data["version"]["dl_path"] 115 116 def use_current_as_latest(self): 117 Updater.use_current_as_latest(self) 118 # A shortcut to use the static download path. 119 self.download_url = f"https://static.crates.io/crates/{self.package}/" \ 120 f"{self.package}-{self._new_ver}.crate" 121 122 def update(self, skip_post_update: bool) -> None: 123 """Updates the package. 124 125 Has to call check() before this function. 126 """ 127 try: 128 temporary_dir = archive_utils.download_and_extract(self.download_url) 129 self.package_dir = archive_utils.find_archive_root(temporary_dir) 130 self.temp_file = tempfile.NamedTemporaryFile() 131 updater_utils.replace_package(self.package_dir, self._proj_path, 132 self.temp_file.name) 133 self.check_for_errors() 134 finally: 135 urllib.request.urlcleanup() 136 137 def rollback(self) -> bool: 138 # Only rollback if we have already swapped, 139 # which we denote by writing to this file. 140 if os.fstat(self.temp_file.fileno()).st_size > 0: 141 tmp_dir = tempfile.TemporaryDirectory() 142 shutil.move(self._proj_path, tmp_dir.name) 143 shutil.move(self.package_dir, self._proj_path) 144 shutil.move(Path(tmp_dir.name) / self.package, self.package_dir) 145 return True 146 return False 147 148 # pylint: disable=no-self-use 149 def update_metadata(self, metadata: metadata_pb2.MetaData, 150 full_path: Path) -> None: 151 """Updates METADATA content.""" 152 # copy only HOMEPAGE url, and then add new ARCHIVE url. 153 new_url_list = [] 154 for url in metadata.third_party.url: 155 if url.type == metadata_pb2.URL.HOMEPAGE: 156 new_url_list.append(url) 157 new_url = metadata_pb2.URL() 158 new_url.type = metadata_pb2.URL.ARCHIVE 159 new_url.value = f"https://static.crates.io/crates/{metadata.name}/" \ 160 f"{metadata.name}-{metadata.third_party.version}.crate" 161 new_url_list.append(new_url) 162 del metadata.third_party.url[:] 163 metadata.third_party.url.extend(new_url_list) 164 # copy description from Cargo.toml to METADATA 165 cargo_toml = os.path.join(full_path, "Cargo.toml") 166 description = self._get_cargo_description(cargo_toml) 167 if description and description != metadata.description: 168 print("New METADATA description:", description) 169 metadata.description = description 170 171 def check_for_errors(self) -> None: 172 # Check for .rej patches from failing to apply patches. 173 # If this has too many false positives, we could either 174 # check if the files are modified by patches or somehow 175 # track which files existed before the patching. 176 rejects = list(self._proj_path.glob('**/*.rej')) 177 if len(rejects) > 0: 178 print(f"Error: Found patch reject files: {str(rejects)}") 179 self._has_errors = True 180 # Check for Cargo errors embedded in Android.bp. 181 # Note that this should stay in sync with cargo2android.py. 182 with open(f'{self._proj_path}/Android.bp', 'r') as bp_file: 183 for line in bp_file: 184 if line.strip() == "Errors in cargo.out:": 185 print("Error: Found Cargo errors in Android.bp") 186 self._has_errors = True 187 return 188 189 def _toml2str(self, line: str) -> str: 190 """Convert a quoted toml string to a Python str without quotes.""" 191 if line.startswith("\"\"\""): 192 return "" # cannot handle broken multi-line description 193 # TOML string escapes: \b \t \n \f \r \" \\ (no unicode escape) 194 line = line[1:-1].replace("\\\\", "\n").replace("\\b", "") 195 line = line.replace("\\t", " ").replace("\\n", " ").replace("\\f", " ") 196 line = line.replace("\\r", "").replace("\\\"", "\"").replace("\n", "\\") 197 # replace a unicode quotation mark, used in the libloading crate 198 return line.replace("’", "'").strip() 199 200 def _get_cargo_description(self, cargo_toml: str) -> str: 201 """Return the description in Cargo.toml or empty string.""" 202 if os.path.isfile(cargo_toml) and os.access(cargo_toml, os.R_OK): 203 with open(cargo_toml, "r") as toml_file: 204 for line in toml_file: 205 match = DESCRIPTION_MATCHER.match(line) 206 if match: 207 return self._toml2str(match.group(1)) 208 return "" 209