• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Provides patch utilities for PATCHES.json file handling."""
6
7import collections
8import contextlib
9import dataclasses
10import json
11from pathlib import Path
12import re
13import subprocess
14import sys
15from typing import Any, Dict, IO, Iterable, List, Optional, Tuple, Union
16
17
# Line `patch` prints before the per-hunk results of a file: "checking file X"
# on a --dry-run, "patching file X" on a real run. Group 1 is the file path.
CHECKED_FILE_RE = re.compile(r"^(?:checking|patching) file\s+(.*)$")
# Line `patch` prints for a hunk it could not apply. Group 1 is the hunk id.
HUNK_FAILED_RE = re.compile(r"^Hunk #(\d+) FAILED at.*")
# Unified-diff hunk header "@@ -start[,len] +start[,len] @@". The ",len" part
# is optional in the format (omitted means a length of 1), so groups 2 and 4
# may be None.
HUNK_HEADER_RE = re.compile(
    r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@"
)
# A line holding only "--" plus optional whitespace, e.g. the "-- " separator
# that precedes the version trailer in `git format-patch` output.
HUNK_END_RE = re.compile(r"^--\s*$")
# "+++ a/path" or "+++ b/path" header introducing a patched file. Group 1 is
# the path with the "a/" or "b/" prefix removed.
PATCH_SUBFILE_HEADER_RE = re.compile(r"^\+\+\+ [ab]/(.*)$")


@contextlib.contextmanager
def atomic_write(fp: Union[Path, str], mode="w", *args, **kwargs):
    """Write to a filepath atomically.

    This works by a temp file swap, created with a .tmp suffix in
    the same directory briefly until being moved over the desired
    filepath. If the body of the `with` statement raises, the temp
    file is removed and the destination is left untouched.

    Args:
      fp: Filepath to open.
      mode: File mode; can be 'w', 'wb'. Default 'w'.
      *args: Passed to Path.open as nargs.
      **kwargs: Passed to Path.open as kwargs.

    Yields:
      The open temp file object to write to.

    Raises:
      ValueError when the mode is invalid.
    """
    if isinstance(fp, str):
        fp = Path(fp)
    if mode not in ("w", "wb"):
        raise ValueError(f"mode {mode} not accepted")
    temp_fp = fp.with_suffix(fp.suffix + ".tmp")
    try:
        with temp_fp.open(mode, *args, **kwargs) as f:
            yield f
    except BaseException:
        # Also reached for exceptions raised by the caller's `with` body
        # (they are re-raised at the yield). Clean up, then propagate.
        if temp_fp.is_file():
            temp_fp.unlink()
        raise
    # replace() overwrites an existing destination on every platform, whereas
    # rename() raises FileExistsError on Windows when the target exists.
    temp_fp.replace(fp)


@dataclasses.dataclass
class Hunk:
    """Represents a patch Hunk."""

    # Hunk ID for the current file; starts at 1 and restarts for each file.
    hunk_id: int
    # Start line and length taken from the "-start,len" half of the header.
    orig_start: int
    orig_hunk_len: int
    # Start line and length taken from the "+start,len" half of the header.
    patch_start: int
    patch_hunk_len: int
    # 1-based line number of this hunk's "@@" header within the patch file.
    patch_hunk_lineno_begin: int
    # 1-based line number of the line just before the next hunk header or
    # "--" end marker; None when neither was seen after this hunk.
    patch_hunk_lineno_end: Optional[int]


def parse_patch_stream(patch_stream: IO[str]) -> Dict[str, List[Hunk]]:
    """Parse a patch file-like into Hunks.

    Hunk headers that use the short unified-diff form (a bare start line
    with no ",len", meaning a length of 1) are recognised too, so hunk ids
    stay aligned with the per-file hunk numbers that `patch` reports.

    Args:
      patch_stream: A IO stream formatted like a git patch file.

    Returns:
      A dictionary mapping filenames to lists of Hunks present
      in the patch stream.

    Raises:
      RuntimeError: a file header carries an empty path, or a hunk header
        appears before any file header.
    """

    def _hunk_len(raw: Optional[str]) -> int:
        # An omitted length in a unified-diff header means exactly one line.
        return 1 if raw is None else int(raw)

    current_filepath = None
    current_hunk_id = 0
    current_hunk = None
    out = collections.defaultdict(list)
    # lineno is 0-based here; the stored line numbers are 1-based.
    for lineno, line in enumerate(patch_stream.readlines()):
        subfile_header = PATCH_SUBFILE_HEADER_RE.match(line)
        if subfile_header:
            current_filepath = subfile_header.group(1)
            if not current_filepath:
                raise RuntimeError("Could not get file header in patch stream")
            # Need to reset the hunk id, as it's per-file.
            current_hunk_id = 0
            continue
        hunk_header = HUNK_HEADER_RE.match(line)
        if hunk_header:
            if not current_filepath:
                raise RuntimeError(
                    "Parsed hunk before file header in patch stream"
                )
            if current_hunk:
                # Already parsing a hunk; it ends on the line before this
                # header, whose 1-based number equals the 0-based lineno.
                current_hunk.patch_hunk_lineno_end = lineno
            current_hunk_id += 1
            orig_start, orig_len, patch_start, patch_len = hunk_header.groups()
            current_hunk = Hunk(
                hunk_id=current_hunk_id,
                orig_start=int(orig_start),
                orig_hunk_len=_hunk_len(orig_len),
                patch_start=int(patch_start),
                patch_hunk_len=_hunk_len(patch_len),
                patch_hunk_lineno_begin=lineno + 1,
                patch_hunk_lineno_end=None,
            )
            out[current_filepath].append(current_hunk)
            continue
        if current_hunk and HUNK_END_RE.match(line):
            current_hunk.patch_hunk_lineno_end = lineno
    return out
119
120
def parse_failed_patch_output(text: str) -> Dict[str, List[int]]:
    """Collect the ids of hunks reported as FAILED, grouped by file.

    The text is scanned line by line. A line matching CHECKED_FILE_RE
    selects the file that later failures belong to; a line matching
    HUNK_FAILED_RE records that hunk id under the selected file.

    Args:
      text: Output text to scan, lines separated by a newline character.

    Returns:
      A mapping from file name to the list of failed hunk ids, in the
      order they were reported.

    Raises:
      ValueError: a failed hunk is reported before any file line.
    """
    hunk_ids_by_file = collections.defaultdict(list)
    file_in_progress = None
    for output_line in text.split("\n"):
        file_match = CHECKED_FILE_RE.match(output_line)
        if file_match is not None:
            file_in_progress = file_match.group(1)
            continue
        failure = HUNK_FAILED_RE.match(output_line)
        if failure is None:
            continue
        if not file_in_progress:
            raise ValueError("Input stream was not parsable")
        hunk_ids_by_file[file_in_progress].append(int(failure.group(1)))
    return hunk_ids_by_file
136
137
@dataclasses.dataclass(frozen=True)
class PatchResult:
    """Outcome of one attempt to apply a patch.

    An instance is truthy exactly when `succeeded` is True.
    """

    # Whether the patch applied.
    succeeded: bool
    # Hunks that failed, keyed by file name; empty unless provided.
    failed_hunks: Dict[str, List[Hunk]] = dataclasses.field(
        default_factory=dict
    )

    def __bool__(self):
        return self.succeeded

    def failure_info(self) -> str:
        """Describe the failed hunks as text, or "" if the patch succeeded.

        Each file contributes a "<file>:" line, one "Lines A to B" line per
        failed hunk (B is orig_start + orig_hunk_len), and a dashed rule.
        """
        if self.succeeded:
            return ""
        pieces = []
        for file, hunks in self.failed_hunks.items():
            pieces.append(f"{file}:\n")
            pieces.extend(
                f"Lines {h.orig_start} to {h.orig_start + h.orig_hunk_len}\n"
                for h in hunks
            )
            pieces.append("--------------------\n")
        return "".join(pieces)
160
161
@dataclasses.dataclass
class PatchEntry:
    """Object mapping of an entry of PATCHES.json."""

    # Storage location for the patches; rel_patch_path is resolved against
    # it. Must be an existing directory.
    workdir: Path
    metadata: Optional[Dict[str, Any]]
    platforms: Optional[List[str]]
    rel_patch_path: str
    # Optional "from"/"until" bounds; see can_patch_version() and is_old().
    version_range: Optional[Dict[str, Optional[int]]]
    # Cache for parsed_hunks(). It has no annotation, so it is a plain class
    # attribute and not a dataclass field (not part of __init__ or asdict).
    _parsed_hunks = None

    def __post_init__(self):
        if not self.workdir.is_dir():
            raise ValueError(f"workdir {self.workdir} is not a directory")

    @classmethod
    def from_dict(cls, workdir: Path, data: Dict[str, Any]):
        """Instantiate from a dictionary.

        Dictionary must have at least the following key:

          {
            'rel_patch_path': '<relative patch path to workdir>',
          }

        The optional keys 'metadata', 'platforms' and 'version_range'
        default to None when absent.

        Returns:
          A new PatchEntry.

        Raises:
          KeyError: 'rel_patch_path' is missing.
          ValueError: workdir is not a directory.
        """
        return cls(
            workdir,
            data.get("metadata"),
            data.get("platforms"),
            data["rel_patch_path"],
            data.get("version_range"),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert back to a JSON-serializable dictionary.

        Keys are inserted in the order metadata, platforms (only when
        non-empty, sorted), rel_patch_path, version_range.
        """
        out: Dict[str, Any] = {
            "metadata": self.metadata,
        }
        if self.platforms:
            # To match patch_sync, only serialized when
            # non-empty and non-null.
            out["platforms"] = sorted(self.platforms)
        out.update(
            {
                "rel_patch_path": self.rel_patch_path,
                "version_range": self.version_range,
            }
        )
        return out

    def parsed_hunks(self) -> Dict[str, List[Hunk]]:
        """Return the hunks of the patch file, keyed by patched file name."""
        # Minor caching here because IO is slow. Compare against None rather
        # than truthiness so a patch with no hunks is also cached.
        if self._parsed_hunks is None:
            with self.patch_path().open(encoding="utf-8") as f:
                self._parsed_hunks = parse_patch_stream(f)
        return self._parsed_hunks

    def patch_path(self) -> Path:
        """Return the patch file location: workdir joined with the rel path."""
        return self.workdir / self.rel_patch_path

    def can_patch_version(self, svn_version: int) -> bool:
        """Is this patch meant to apply to `svn_version`?

        True when there is no version range, or when
        from <= svn_version < until. A missing/None "from" counts as 0 and
        a missing/None "until" as unbounded.
        """
        # Sometimes the key is there, but it's set to None.
        if not self.version_range:
            return True
        from_v = self.version_range.get("from") or 0
        until_v = self.version_range.get("until")
        if until_v is None:
            until_v = sys.maxsize
        return from_v <= svn_version < until_v

    def is_old(self, svn_version: int) -> bool:
        """Is this patch old compared to `svn_version`?

        True only when an "until" bound exists and svn_version >= until.
        """
        if not self.version_range:
            return False
        until_v = self.version_range.get("until")
        # Sometimes the key is there, but it's set to None.
        if until_v is None:
            until_v = sys.maxsize
        return svn_version >= until_v

    def apply(
        self, root_dir: Path, extra_args: Optional[List[str]] = None
    ) -> PatchResult:
        """Apply a patch to a given directory.

        Args:
          root_dir: Directory handed to `patch -d`.
          extra_args: Additional arguments appended to the `patch` command.

        Returns:
          A truthy PatchResult when `patch` exits with status 0; otherwise
          a falsy one listing the hunks that `patch` reported as failed.

        Raises:
          RuntimeError: the patch file does not exist.
        """
        # Cmd to apply a patch in the src unpack path.
        abs_patch_path = self.patch_path().absolute()
        if not abs_patch_path.is_file():
            raise RuntimeError(
                f"Cannot apply: patch {abs_patch_path} is not a file"
            )
        cmd = [
            "patch",
            "-d",
            root_dir.absolute(),
            "-f",
            "-p1",
            "--no-backup-if-mismatch",
            "-i",
            abs_patch_path,
        ] + (extra_args or [])
        try:
            subprocess.run(
                cmd, encoding="utf-8", check=True, stdout=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            parsed_hunks = self.parsed_hunks()
            failed_hunks_id_dict = parse_failed_patch_output(e.stdout)
            failed_hunks = {}
            for path, failed_hunk_ids in failed_hunks_id_dict.items():
                wanted_ids = set(failed_hunk_ids)
                # .get() keeps the lookup from adding entries to the cached
                # mapping when `patch` names a file we did not parse.
                failed_hunks[path] = [
                    hunk
                    for hunk in parsed_hunks.get(path, [])
                    if hunk.hunk_id in wanted_ids
                ]
            return PatchResult(succeeded=False, failed_hunks=failed_hunks)
        return PatchResult(succeeded=True)

    def test_apply(self, root_dir: Path) -> PatchResult:
        """Dry run applying a patch to a given directory."""
        return self.apply(root_dir, ["--dry-run"])

    def title(self) -> str:
        """Return metadata["title"], or "" when there is no such value."""
        if not self.metadata:
            return ""
        return self.metadata.get("title", "")
294
295
@dataclasses.dataclass(frozen=True)
class PatchInfo:
    """Holds info for a round of patch applications."""

    # The str-typed lists below are legacy; patch lists should probably
    # hold PatchEntry objects instead.
    applied_patches: List[PatchEntry]
    failed_patches: List[PatchEntry]
    # Legacy: can be deleted once legacy code is removed.
    non_applicable_patches: List[str]
    # Legacy: can be deleted once legacy code is removed.
    disabled_patches: List[str]
    # Legacy: can be deleted once legacy code is removed.
    removed_patches: List[str]
    # Legacy: can be deleted once legacy code is removed.
    modified_metadata: Optional[str]

    def _asdict(self):
        """Return the fields as a dictionary via dataclasses.asdict."""
        as_mapping = dataclasses.asdict(self)
        return as_mapping
315
316
def json_to_patch_entries(workdir: Path, json_fd: IO[str]) -> List[PatchEntry]:
    """Load a JSON stream and build one PatchEntry per element.

    Args:
      workdir: Directory passed to every PatchEntry as its workdir.
      json_fd: Open stream whose JSON content is iterated element by
        element; each element is handed to PatchEntry.from_dict.

    Examples:
      >>> f = open('PATCHES.json')
      >>> patch_entries = json_to_patch_entries(Path(), f)
    """
    entries = []
    for raw_entry in json.load(json_fd):
        entries.append(PatchEntry.from_dict(workdir, raw_entry))
    return entries
325
326
def _print_failed_patch(pe: PatchEntry, failed_hunks: Dict[str, List[Hunk]]):
    """Report a failing PatchEntry and its failed hunks on stderr.

    Prints a headline with the patch path and title, then for each file a
    "<file>:" line followed by one line per hunk giving the hunk's line
    span inside the patch file.

    Args:
      pe: A PatchEntry that failed.
      failed_hunks: Hunks for pe which failed as dict:
        filepath: [Hunk...]
    """

    def emit(message: str) -> None:
        print(message, file=sys.stderr)

    emit(f"Could not apply {pe.rel_patch_path}: {pe.title()}")
    for fp, hunks in failed_hunks.items():
        emit(f"{fp}:")
        for h in hunks:
            span = f"l:{h.patch_hunk_lineno_begin}...{h.patch_hunk_lineno_end}"
            emit(f"- {pe.rel_patch_path} {span}")
344
345
def apply_all_from_json(
    svn_version: int,
    llvm_src_dir: Path,
    patches_json_fp: Path,
    continue_on_failure: bool = False,
) -> PatchInfo:
    """Attempt to apply some patches to a given LLVM source tree.

    The patches are read from a PATCHES.json file and tried one by one,
    in file order, via apply_single_patch_entry.

    Args:
      svn_version: LLVM Subversion revision to patch.
      llvm_src_dir: llvm-project root-level source directory to patch.
      patches_json_fp: Filepath to the PATCHES.json file; its parent
        directory is used as the workdir of every entry.
      continue_on_failure: Skip any patches which failed to apply,
        rather than throw an Exception.

    Returns:
      A PatchInfo listing the applied, failed and skipped
      (non_applicable_patches) entries.

    Raises:
      RuntimeError: a patch failed and continue_on_failure is False.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patches = json_to_patch_entries(patches_json_fp.parent, f)
    applied: list = []
    failed: list = []
    skipped: list = []
    for pe in patches:
        did_apply, failed_hunks = apply_single_patch_entry(
            svn_version, llvm_src_dir, pe
        )
        if did_apply:
            applied.append(pe)
        elif failed_hunks is None:
            # Didn't apply, didn't fail, it was skipped.
            skipped.append(pe)
        elif continue_on_failure:
            failed.append(pe)
        else:
            _print_failed_patch(pe, failed_hunks)
            raise RuntimeError(
                f"failed to apply patch {pe.patch_path()}: {pe.title()}"
            )
    return PatchInfo(
        non_applicable_patches=skipped,
        applied_patches=applied,
        failed_patches=failed,
        disabled_patches=[],
        removed_patches=[],
        modified_metadata=None,
    )
395
396
def apply_single_patch_entry(
    svn_version: int,
    llvm_src_dir: Path,
    pe: PatchEntry,
    ignore_version_range: bool = False,
) -> Tuple[bool, Optional[Dict[str, List[Hunk]]]]:
    """Try to apply a single PatchEntry object.

    The patch is dry-run first and only really applied when the dry run
    succeeds.

    Args:
      svn_version: Revision checked against the entry's version range.
      llvm_src_dir: Directory to apply the patch in.
      pe: The entry to apply.
      ignore_version_range: When True, skip the version range check.

    Returns:
      (True, None) when the patch applied; (False, None) when it was
      skipped because of its version range; (False, failed hunk mapping
      from file name to lists of hunks) when applying failed.
    """
    # Don't apply patches outside of the version range.
    in_range = ignore_version_range or pe.can_patch_version(svn_version)
    if not in_range:
        return False, None
    # Dry run first so a failing patch leaves the tree unmodified.
    dry_run = pe.test_apply(llvm_src_dir)
    if not dry_run:
        return False, dry_run.failed_hunks
    # Now actually make changes.
    real_run = pe.apply(llvm_src_dir)
    if real_run:
        return True, None
    # This should be very rare/impossible.
    return False, real_run.failed_hunks
423
424
def is_git_dirty(git_root_dir: Path) -> bool:
    """Return whether the given git directory has uncommitted changes.

    Runs `git ls-files -m --other --exclude-standard` in git_root_dir and
    reports True when it prints anything.

    Raises:
      ValueError: git_root_dir is not a directory.
      subprocess.CalledProcessError: git exits with a non-zero status.
    """
    if not git_root_dir.is_dir():
        raise ValueError(f"git_root_dir {git_root_dir} is not a directory")
    listing = subprocess.run(
        ["git", "ls-files", "-m", "--other", "--exclude-standard"],
        stdout=subprocess.PIPE,
        check=True,
        cwd=git_root_dir,
        encoding="utf-8",
    )
    return listing.stdout != ""
440
441
def clean_src_tree(src_path):
    """Cleans the source tree of the changes made in 'src_path'.

    Runs `git reset HEAD --hard` followed by `git clean -fd` against
    src_path; a non-zero exit of either raises CalledProcessError.
    """
    git_steps = (
        ["reset", "HEAD", "--hard"],
        ["clean", "-fd"],
    )
    for step_args in git_steps:
        subprocess.run(["git", "-C", src_path] + step_args, check=True)
452
453
@contextlib.contextmanager
def git_clean_context(git_root_dir: Path):
    """Cleans up a git directory when the context exits.

    The directory must be clean on entry. On exit, whether or not the
    body raised, clean_src_tree is run on it.

    Raises:
      RuntimeError: git_root_dir already has uncommitted changes.
    """
    already_dirty = is_git_dirty(git_root_dir)
    if already_dirty:
        raise RuntimeError("Cannot setup clean context; git_root_dir is dirty")
    try:
        yield
    finally:
        clean_src_tree(git_root_dir)
463
464
def _write_json_changes(patches: List[Dict[str, Any]], file_io: IO[str]):
    """Write JSON changes to file, does not acquire new file lock.

    The list is dumped with a 4-space indent and terminated by a newline.
    """
    dump_options = {"indent": 4, "separators": (",", ": ")}
    json.dump(patches, file_io, **dump_options)
    # json.dump does not end its output with a newline, so add one.
    file_io.write("\n")
470
471
def update_version_ranges(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Reduce the version ranges of failing patches.

    Patches which fail to apply will have their 'version_range.until'
    field reduced to the passed in svn_version. All entries are then
    written back to patches_json_fp and one line is printed for each
    entry that was changed.

    Modifies the contents of patches_json_fp.

    Args:
      svn_version: LLVM revision number.
      llvm_src_dir: llvm-project directory path.
      patches_json_fp: Filepath to the PATCHES.json file.

    Returns:
      PatchInfo for applied and disabled patches.
    """
    with patches_json_fp.open(encoding="utf-8") as json_in:
        patch_entries = json_to_patch_entries(patches_json_fp.parent, json_in)
    modified_entries, applied_patches = update_version_ranges_with_entries(
        svn_version, llvm_src_dir, patch_entries
    )
    with atomic_write(patches_json_fp, encoding="utf-8") as json_out:
        # The entries were modified in place, so serialize all of them.
        _write_json_changes(
            [entry.to_dict() for entry in patch_entries], json_out
        )
    disabled_paths = []
    for entry in modified_entries:
        print(
            f"Stopped applying {entry.rel_patch_path} ({entry.title()}) "
            f"for r{svn_version}"
        )
        disabled_paths.append(entry.rel_patch_path)
    changed_file = str(patches_json_fp) if modified_entries else None
    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=applied_patches,
        failed_patches=[],
        disabled_patches=disabled_paths,
        removed_patches=[],
        modified_metadata=changed_file,
    )
513
514
def update_version_ranges_with_entries(
    svn_version: int,
    llvm_src_dir: Path,
    patch_entries: Iterable[PatchEntry],
) -> Tuple[List[PatchEntry], List[PatchEntry]]:
    """Test-able helper for update_version_ranges.

    Every entry that is not old for svn_version is dry-run against
    llvm_src_dir. Entries that fail get version_range["until"] set to
    svn_version; entries that pass are really applied. The work happens
    inside git_clean_context(llvm_src_dir).

    Args:
      svn_version: LLVM revision number.
      llvm_src_dir: llvm-project directory path.
      patch_entries: PatchEntry objects to modify.

    Returns:
      Tuple of (modified entries, applied patches)

    Raises:
      RuntimeError: a patch dry ran successfully but then failed to apply.

    Post:
      Modifies patch_entries in place.
    """
    modified_entries: List[PatchEntry] = []
    applied_patches: List[PatchEntry] = []
    remaining = iter(patch_entries)
    with git_clean_context(llvm_src_dir):
        for pe in remaining:
            if pe.is_old(svn_version):
                continue
            if pe.test_apply(llvm_src_dir):
                # We have to actually apply the patch so that future patches
                # will stack properly.
                if not pe.apply(llvm_src_dir).succeeded:
                    raise RuntimeError(
                        "Could not apply patch that dry ran successfully"
                    )
                applied_patches.append(pe)
                continue
            # Dry run failed: stop this patch from applying at svn_version.
            if pe.version_range is None:
                pe.version_range = {}
            pe.version_range["until"] = svn_version
            modified_entries.append(pe)

    return modified_entries, applied_patches
554
555
def remove_old_patches(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Remove patches that don't and will never apply for the future.

    Patches are determined to be "old" via the "is_old" method for
    each patch entry. The remaining entries are written back to
    patches_json_fp and the removed ones are listed on stdout.

    Args:
      svn_version: LLVM SVN version.
      llvm_src_dir: LLVM source directory. Not used to locate patches;
        kept so existing callers keep working.
      patches_json_fp: Location to edit patches on.

    Returns:
      PatchInfo for modified patches.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        patches_list = json.load(f)
    # Patch paths are relative to the directory holding PATCHES.json, as in
    # the other loaders in this file, so use it as the workdir. Using
    # llvm_src_dir made entry construction fail when that directory did not
    # exist, although no patch is applied here.
    workdir = patches_json_fp.parent
    kept_entries: List[PatchEntry] = []
    removed_entries: List[PatchEntry] = []
    for elem in patches_list:
        entry = PatchEntry.from_dict(workdir, elem)
        if entry.is_old(svn_version):
            removed_entries.append(entry)
        else:
            kept_entries.append(entry)
    with atomic_write(patches_json_fp, encoding="utf-8") as f:
        _write_json_changes([entry.to_dict() for entry in kept_entries], f)
    plural_patches = "patch" if len(removed_entries) == 1 else "patches"
    print(f"Removed {len(removed_entries)} old {plural_patches}:")
    for r in removed_entries:
        print(f"- {r.rel_patch_path}: {r.title()}")

    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=[],
        failed_patches=[],
        disabled_patches=[],
        removed_patches=[p.rel_patch_path for p in removed_entries],
        modified_metadata=str(patches_json_fp) if removed_entries else None,
    )
595