# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Provides patch utilities for PATCHES.json file handling."""

import collections
import contextlib
import dataclasses
import json
from pathlib import Path
import re
import subprocess
import sys
from typing import Any, Dict, IO, Iterable, List, Optional, Tuple, Union


# `patch` announces each file with "checking file" under --dry-run and with
# "patching file" when it really applies; accept both so failures from either
# mode can be attributed to a file.
CHECKED_FILE_RE = re.compile(r"^(?:checking|patching) file\s+(.*)$")
HUNK_FAILED_RE = re.compile(r"^Hunk #(\d+) FAILED at.*")
# The ",<len>" part of each range is optional in unified diffs; when it is
# omitted the length is 1. Groups 2 and 4 are None in that case.
HUNK_HEADER_RE = re.compile(
    r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@"
)
HUNK_END_RE = re.compile(r"^--\s*$")
PATCH_SUBFILE_HEADER_RE = re.compile(r"^\+\+\+ [ab]/(.*)$")


@contextlib.contextmanager
def atomic_write(fp: Union[Path, str], mode="w", *args, **kwargs):
    """Write to a filepath atomically.

    This works by a temp file swap, created with a .tmp suffix in
    the same directory briefly until being renamed to the desired
    filepath.

    Args:
        fp: Filepath to open.
        mode: File mode; can be 'w', 'wb'. Default 'w'.
        *args: Passed to Path.open as nargs.
        **kwargs: Passed to Path.open as kwargs.

    Yields:
        The open temporary file object to write to.

    Raises:
        ValueError when the mode is invalid.
    """
    if isinstance(fp, str):
        fp = Path(fp)
    if mode not in ("w", "wb"):
        raise ValueError(f"mode {mode} not accepted")
    temp_fp = fp.with_suffix(fp.suffix + ".tmp")
    try:
        with temp_fp.open(mode, *args, **kwargs) as f:
            yield f
    except BaseException:
        # Any failure (including KeyboardInterrupt) must not leave the
        # temp file behind; the destination is left untouched.
        if temp_fp.is_file():
            temp_fp.unlink()
        raise
    # replace() overwrites an existing destination on every platform,
    # whereas rename() fails on Windows when the target already exists.
    temp_fp.replace(fp)


@dataclasses.dataclass
class Hunk:
    """Represents a patch Hunk."""

    hunk_id: int
    """Hunk ID for the current file."""
    # Values taken from the "@@ -a,b +c,d @@" hunk header.
    orig_start: int
    orig_hunk_len: int
    patch_start: int
    patch_hunk_len: int
    # Zero-based index of the line following the hunk header in the stream.
    patch_hunk_lineno_begin: int
    # Zero-based index of the line that terminated the hunk (the next hunk
    # header or the "-- " trailer); None if no terminator was seen.
    patch_hunk_lineno_end: Optional[int]


def parse_patch_stream(patch_stream: IO[str]) -> Dict[str, List[Hunk]]:
    """Parse a patch file-like into Hunks.

    Args:
        patch_stream: A IO stream formatted like a git patch file.

    Returns:
        A dictionary mapping filenames to lists of Hunks present
        in the patch stream.

    Raises:
        RuntimeError: A hunk header appeared before any file header, or
            a file header carried an empty path.
    """

    current_filepath = None
    current_hunk_id = 0
    current_hunk = None
    out = collections.defaultdict(list)
    for lineno, line in enumerate(patch_stream):
        subfile_header = PATCH_SUBFILE_HEADER_RE.match(line)
        if subfile_header:
            current_filepath = subfile_header.group(1)
            if not current_filepath:
                raise RuntimeError("Could not get file header in patch stream")
            # Need to reset the hunk id, as it's per-file.
            current_hunk_id = 0
            continue
        hunk_header = HUNK_HEADER_RE.match(line)
        if hunk_header:
            if not current_filepath:
                raise RuntimeError(
                    "Parsed hunk before file header in patch stream"
                )
            if current_hunk:
                # Already parsing a hunk; this header closes it.
                current_hunk.patch_hunk_lineno_end = lineno
            current_hunk_id += 1
            current_hunk = Hunk(
                hunk_id=current_hunk_id,
                orig_start=int(hunk_header.group(1)),
                # An omitted length means a single line.
                orig_hunk_len=int(hunk_header.group(2) or 1),
                patch_start=int(hunk_header.group(3)),
                patch_hunk_len=int(hunk_header.group(4) or 1),
                patch_hunk_lineno_begin=lineno + 1,
                patch_hunk_lineno_end=None,
            )
            out[current_filepath].append(current_hunk)
            continue
        if current_hunk and HUNK_END_RE.match(line):
            current_hunk.patch_hunk_lineno_end = lineno
    return out


def parse_failed_patch_output(text: str) -> Dict[str, List[int]]:
    """Extract the failed hunk IDs per file from `patch` output.

    Args:
        text: Captured stdout of a failed `patch` invocation.

    Returns:
        A dictionary mapping each reported file name to the list of hunk
        IDs reported as FAILED for it. Files without failures are absent.

    Raises:
        ValueError: A failed hunk was reported before any file line.
    """
    current_file = None
    failed_hunks = collections.defaultdict(list)
    # splitlines() also copes with "\r\n" so names never carry a stray "\r".
    for eline in text.splitlines():
        checked_file_match = CHECKED_FILE_RE.match(eline)
        if checked_file_match:
            current_file = checked_file_match.group(1)
            continue
        failed_match = HUNK_FAILED_RE.match(eline)
        if failed_match:
            if not current_file:
                raise ValueError("Input stream was not parsable")
            hunk_id = int(failed_match.group(1))
            failed_hunks[current_file].append(hunk_id)
    return failed_hunks


@dataclasses.dataclass(frozen=True)
class PatchResult:
    """Result of a patch application."""

    succeeded: bool
    # Maps file path -> hunks that failed to apply; empty on success.
    failed_hunks: Dict[str, List[Hunk]] = dataclasses.field(
        default_factory=dict
    )

    def __bool__(self):
        return self.succeeded

    def failure_info(self) -> str:
        """Return a human-readable summary of the failed hunks.

        Returns:
            An empty string on success; otherwise, for each file, its name,
            one "Lines X to Y" line per failed hunk and a separator line.
        """
        if self.succeeded:
            return ""
        parts = []
        for file, hunks in self.failed_hunks.items():
            parts.append(f"{file}:\n")
            parts.extend(
                f"Lines {h.orig_start} to {h.orig_start + h.orig_hunk_len}\n"
                for h in hunks
            )
            parts.append("--------------------\n")
        return "".join(parts)


@dataclasses.dataclass
class PatchEntry:
    """Object mapping of an entry of PATCHES.json."""

    workdir: Path
    """Storage location for the patches."""
    metadata: Optional[Dict[str, Any]]
    platforms: Optional[List[str]]
    rel_patch_path: str
    version_range: Optional[Dict[str, Optional[int]]]
    # Deliberately unannotated so that it is NOT a dataclass field: it is a
    # class-level default that parsed_hunks() shadows per instance.
    _parsed_hunks = None

    def __post_init__(self):
        if not self.workdir.is_dir():
            raise ValueError(f"workdir {self.workdir} is not a directory")

    @classmethod
    def from_dict(cls, workdir: Path, data: Dict[str, Any]):
        """Instantiate from a dictionary.

        Dictionary must have at least the following key:

            {
                'rel_patch_path': '<relative patch path to workdir>',
            }

        Args:
            workdir: Directory that 'rel_patch_path' is relative to.
            data: One decoded entry of PATCHES.json.

        Returns:
            A new PatchEntry.

        Raises:
            KeyError: 'rel_patch_path' is missing from data.
            ValueError: workdir is not a directory.
        """
        return cls(
            workdir,
            data.get("metadata"),
            data.get("platforms"),
            data["rel_patch_path"],
            data.get("version_range"),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize back to the PATCHES.json entry layout."""
        out: Dict[str, Any] = {
            "metadata": self.metadata,
        }
        if self.platforms:
            # To match patch_sync, only serialized when
            # non-empty and non-null.
            out["platforms"] = sorted(self.platforms)
        out.update(
            {
                "rel_patch_path": self.rel_patch_path,
                "version_range": self.version_range,
            }
        )
        return out

    def parsed_hunks(self) -> Dict[str, List[Hunk]]:
        """Return the hunks of the patch file, keyed by patched file path."""
        # Minor caching here because IO is slow. Compare against None so
        # that a patch which parses to no hunks is still only read once.
        if self._parsed_hunks is None:
            with self.patch_path().open(encoding="utf-8") as f:
                self._parsed_hunks = parse_patch_stream(f)
        return self._parsed_hunks

    def patch_path(self) -> Path:
        """Return the patch file location (workdir / rel_patch_path)."""
        return self.workdir / self.rel_patch_path

    def can_patch_version(self, svn_version: int) -> bool:
        """Is this patch meant to apply to `svn_version`?"""
        # Sometimes the key is there, but it's set to None.
        if not self.version_range:
            return True
        from_v = self.version_range.get("from") or 0
        until_v = self.version_range.get("until")
        if until_v is None:
            until_v = sys.maxsize
        return from_v <= svn_version < until_v

    def is_old(self, svn_version: int) -> bool:
        """Is this patch old compared to `svn_version`?"""
        if not self.version_range:
            return False
        until_v = self.version_range.get("until")
        # Sometimes the key is there, but it's set to None.
        if until_v is None:
            until_v = sys.maxsize
        return svn_version >= until_v

    def apply(
        self, root_dir: Path, extra_args: Optional[List[str]] = None
    ) -> PatchResult:
        """Apply a patch to a given directory.

        Args:
            root_dir: Directory the `patch` tool runs against (-d).
            extra_args: Additional arguments appended to the `patch`
                command line.

        Returns:
            A PatchResult; on failure it carries the hunks that `patch`
            reported as FAILED, grouped by file.

        Raises:
            RuntimeError: The patch file does not exist.
        """
        extra_args = list(extra_args) if extra_args else []
        # Cmd to apply a patch in the src unpack path.
        abs_patch_path = self.patch_path().absolute()
        if not abs_patch_path.is_file():
            raise RuntimeError(
                f"Cannot apply: patch {abs_patch_path} is not a file"
            )
        cmd = [
            "patch",
            "-d",
            root_dir.absolute(),
            "-f",
            "-p1",
            "--no-backup-if-mismatch",
            "-i",
            abs_patch_path,
        ] + extra_args
        try:
            subprocess.run(
                cmd, encoding="utf-8", check=True, stdout=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            parsed_hunks = self.parsed_hunks()
            failed_hunks_id_dict = parse_failed_patch_output(e.stdout)
            failed_hunks = {}
            for path, failed_hunk_ids in failed_hunks_id_dict.items():
                wanted_ids = set(failed_hunk_ids)
                # .get() rather than indexing: the cached mapping is a
                # defaultdict and must not grow entries for unknown paths.
                failed_hunks[path] = [
                    hunk
                    for hunk in parsed_hunks.get(path, [])
                    if hunk.hunk_id in wanted_ids
                ]
            return PatchResult(succeeded=False, failed_hunks=failed_hunks)
        return PatchResult(succeeded=True)

    def test_apply(self, root_dir: Path) -> PatchResult:
        """Dry run applying a patch to a given directory."""
        return self.apply(root_dir, ["--dry-run"])

    def title(self) -> str:
        """Return metadata['title'], or '' when it is not available."""
        if not self.metadata:
            return ""
        return self.metadata.get("title", "")


@dataclasses.dataclass(frozen=True)
class PatchInfo:
    """Holds info for a round of patch applications."""

    # str types are legacy. Patch lists should
    # probably be PatchEntries.
    applied_patches: List[PatchEntry]
    failed_patches: List[PatchEntry]
    # Can be deleted once legacy code is removed.
    # NOTE: apply_all_from_json() stores PatchEntry objects here, not str.
    non_applicable_patches: List[str]
    # Can be deleted once legacy code is removed.
    disabled_patches: List[str]
    # Can be deleted once legacy code is removed.
    removed_patches: List[str]
    # Can be deleted once legacy code is removed.
    modified_metadata: Optional[str]

    def _asdict(self):
        return dataclasses.asdict(self)


def json_to_patch_entries(workdir: Path, json_fd: IO[str]) -> List[PatchEntry]:
    """Convert a json IO object to List[PatchEntry].

    Examples:
        >>> f = open('PATCHES.json')
        >>> patch_entries = json_to_patch_entries(Path(), f)
    """
    return [PatchEntry.from_dict(workdir, d) for d in json.load(json_fd)]


def _print_failed_patch(pe: PatchEntry, failed_hunks: Dict[str, List[Hunk]]):
    """Print information about a single failing PatchEntry.

    Args:
        pe: A PatchEntry that failed.
        failed_hunks: Hunks for pe which failed as dict:
          filepath: [Hunk...]
    """
    print(f"Could not apply {pe.rel_patch_path}: {pe.title()}", file=sys.stderr)
    for fp, hunks in failed_hunks.items():
        print(f"{fp}:", file=sys.stderr)
        for h in hunks:
            print(
                f"- {pe.rel_patch_path} "
                f"l:{h.patch_hunk_lineno_begin}...{h.patch_hunk_lineno_end}",
                file=sys.stderr,
            )


def apply_all_from_json(
    svn_version: int,
    llvm_src_dir: Path,
    patches_json_fp: Path,
    continue_on_failure: bool = False,
) -> PatchInfo:
    """Attempt to apply some patches to a given LLVM source tree.

    This relies on a PATCHES.json file to be the primary way
    the patches are applied.

    Args:
        svn_version: LLVM Subversion revision to patch.
        llvm_src_dir: llvm-project root-level source directory to patch.
        patches_json_fp: Filepath to the PATCHES.json file.
        continue_on_failure: Skip any patches which failed to apply,
          rather than throw an Exception.

    Returns:
        PatchInfo listing the applied, failed and skipped patches.

    Raises:
        RuntimeError: A patch failed and continue_on_failure is False.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patches = json_to_patch_entries(patches_json_fp.parent, f)
    skipped_patches = []
    failed_patches = []
    applied_patches = []
    for pe in patches:
        applied, failed_hunks = apply_single_patch_entry(
            svn_version, llvm_src_dir, pe
        )
        if applied:
            applied_patches.append(pe)
            continue
        if failed_hunks is None:
            # Didn't apply, didn't fail, it was skipped.
            skipped_patches.append(pe)
            continue
        if not continue_on_failure:
            _print_failed_patch(pe, failed_hunks)
            raise RuntimeError(
                f"failed to apply patch {pe.patch_path()}: {pe.title()}"
            )
        failed_patches.append(pe)
    return PatchInfo(
        non_applicable_patches=skipped_patches,
        applied_patches=applied_patches,
        failed_patches=failed_patches,
        disabled_patches=[],
        removed_patches=[],
        modified_metadata=None,
    )


def apply_single_patch_entry(
    svn_version: int,
    llvm_src_dir: Path,
    pe: PatchEntry,
    ignore_version_range: bool = False,
) -> Tuple[bool, Optional[Dict[str, List[Hunk]]]]:
    """Try to apply a single PatchEntry object.

    Args:
        svn_version: LLVM revision the tree is at.
        llvm_src_dir: llvm-project directory to patch.
        pe: The patch to apply.
        ignore_version_range: Apply even if svn_version is outside the
            patch's version range.

    Returns:
        Tuple where the first element indicates whether the patch applied,
        and the second element is a failed hunk mapping from file name to
        lists of hunks (if the patch didn't apply). The second element is
        None both on success and when the patch was skipped.
    """
    # Don't apply patches outside of the version range.
    if not ignore_version_range and not pe.can_patch_version(svn_version):
        return False, None
    # Test first to avoid making changes.
    test_application = pe.test_apply(llvm_src_dir)
    if not test_application:
        return False, test_application.failed_hunks
    # Now actually make changes.
    application_result = pe.apply(llvm_src_dir)
    if not application_result:
        # This should be very rare/impossible.
        return False, application_result.failed_hunks
    return True, None


def is_git_dirty(git_root_dir: Path) -> bool:
    """Return whether the given git directory has uncommitted changes.

    Raises:
        ValueError: git_root_dir is not a directory.
        subprocess.CalledProcessError: The git command failed.
    """
    if not git_root_dir.is_dir():
        raise ValueError(f"git_root_dir {git_root_dir} is not a directory")
    # Lists modified tracked files plus untracked, non-ignored files; any
    # output at all means the tree is dirty.
    cmd = ["git", "ls-files", "-m", "--other", "--exclude-standard"]
    return (
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            check=True,
            cwd=git_root_dir,
            encoding="utf-8",
        ).stdout
        != ""
    )


def clean_src_tree(src_path: Union[Path, str]) -> None:
    """Cleans the source tree of the changes made in 'src_path'.

    Runs a hard reset to HEAD followed by `git clean -fd`, so both tracked
    modifications and untracked files/directories are discarded.
    """

    reset_src_tree_cmd = ["git", "-C", src_path, "reset", "HEAD", "--hard"]

    subprocess.run(reset_src_tree_cmd, check=True)

    clean_src_tree_cmd = ["git", "-C", src_path, "clean", "-fd"]

    subprocess.run(clean_src_tree_cmd, check=True)


@contextlib.contextmanager
def git_clean_context(git_root_dir: Path):
    """Cleans up a git directory when the context exits.

    Raises:
        RuntimeError: The directory is already dirty on entry; cleaning up
            afterwards would destroy pre-existing changes.
    """
    if is_git_dirty(git_root_dir):
        raise RuntimeError("Cannot setup clean context; git_root_dir is dirty")
    try:
        yield
    finally:
        clean_src_tree(git_root_dir)


def _write_json_changes(patches: List[Dict[str, Any]], file_io: IO[str]):
    """Write JSON changes to file, does not acquire new file lock."""
    json.dump(patches, file_io, indent=4, separators=(",", ": "))
    # Need to add a newline as json.dump omits it.
    file_io.write("\n")


def update_version_ranges(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Reduce the version ranges of failing patches.

    Patches which fail to apply will have their 'version_range.until'
    field reduced to the passed in svn_version.

    Modifies the contents of patches_json_fp.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patches_json_fp: Filepath to the PATCHES.json file.

    Returns:
        PatchInfo for applied and disabled patches.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patch_entries = json_to_patch_entries(
            patches_json_fp.parent,
            f,
        )
    modified_entries, applied_patches = update_version_ranges_with_entries(
        svn_version, llvm_src_dir, patch_entries
    )
    with atomic_write(patches_json_fp, encoding="utf-8") as f:
        _write_json_changes([p.to_dict() for p in patch_entries], f)
    for entry in modified_entries:
        print(
            f"Stopped applying {entry.rel_patch_path} ({entry.title()}) "
            f"for r{svn_version}"
        )
    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=applied_patches,
        failed_patches=[],
        disabled_patches=[p.rel_patch_path for p in modified_entries],
        removed_patches=[],
        modified_metadata=str(patches_json_fp) if modified_entries else None,
    )


def update_version_ranges_with_entries(
    svn_version: int,
    llvm_src_dir: Path,
    patch_entries: Iterable[PatchEntry],
) -> Tuple[List[PatchEntry], List[PatchEntry]]:
    """Test-able helper for update_version_ranges.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patch_entries: PatchEntry objects to modify.

    Returns:
        Tuple of (modified entries, applied patches)

    Raises:
        RuntimeError: llvm_src_dir is dirty on entry, or a patch that dry
            ran successfully then failed to apply.

    Post:
        Modifies patch_entries in place.
    """
    modified_entries: List[PatchEntry] = []
    applied_patches: List[PatchEntry] = []
    active_patches = (pe for pe in patch_entries if not pe.is_old(svn_version))
    with git_clean_context(llvm_src_dir):
        for pe in active_patches:
            test_result = pe.test_apply(llvm_src_dir)
            if not test_result:
                if pe.version_range is None:
                    pe.version_range = {}
                pe.version_range["until"] = svn_version
                modified_entries.append(pe)
            else:
                # We have to actually apply the patch so that future patches
                # will stack properly.
                if not pe.apply(llvm_src_dir).succeeded:
                    raise RuntimeError(
                        "Could not apply patch that dry ran successfully"
                    )
                applied_patches.append(pe)

    return modified_entries, applied_patches


def remove_old_patches(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Remove patches that don't and will never apply for the future.

    Patches are determined to be "old" via the "is_old" method for
    each patch entry.

    Args:
        svn_version: LLVM SVN version.
        llvm_src_dir: LLVM source directory. Unused; kept so existing
            callers keep working.
        patches_json_fp: Location to edit patches on.

    Returns:
        PatchInfo for modified patches.
    """
    del llvm_src_dir  # Unused.
    with patches_json_fp.open(encoding="utf-8") as f:
        # Patches live next to PATCHES.json, so that directory is the
        # workdir, exactly as in the other entry points of this module.
        patch_entries = json_to_patch_entries(patches_json_fp.parent, f)
    kept_entries = []
    removed_entries = []
    for entry in patch_entries:
        if entry.is_old(svn_version):
            removed_entries.append(entry)
        else:
            kept_entries.append(entry)
    with atomic_write(patches_json_fp, encoding="utf-8") as f:
        _write_json_changes([entry.to_dict() for entry in kept_entries], f)
    plural_patches = "patch" if len(removed_entries) == 1 else "patches"
    print(f"Removed {len(removed_entries)} old {plural_patches}:")
    for r in removed_entries:
        print(f"- {r.rel_patch_path}: {r.title()}")

    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=[],
        failed_patches=[],
        disabled_patches=[],
        removed_patches=[p.rel_patch_path for p in removed_entries],
        modified_metadata=str(patches_json_fp) if removed_entries else None,
    )