• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Checks for consistency of jobs between different GitHub workflows.
2
3Any job with a specific `sync-tag` must match all other jobs with the same `sync-tag`.
4"""
5
6from __future__ import annotations
7
8import argparse
9import itertools
10import json
11from collections import defaultdict
12from enum import Enum
13from pathlib import Path
14from typing import Any, Iterable, NamedTuple
15
16from yaml import dump, load
17
18
# Prefer the fast C-based safe YAML loader when it is available; otherwise
# fall back to the pure-Python safe loader.
20try:
21    from yaml import CSafeLoader as Loader
22except ImportError:
23    from yaml import SafeLoader as Loader  # type: ignore[assignment, misc]
24
25
class LintSeverity(str, Enum):
    """Severity levels that can be attached to a :class:`LintMessage`.

    The class mixes in ``str``, so every member compares equal to (and is
    usable as) its lowercase string value.
    """

    # Declaration order is the iteration order of the enum; keep it stable.
    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
31
32
class LintMessage(NamedTuple):
    """A single lint finding.

    Instances are converted with ``_asdict()`` and printed as one JSON object
    per line, so the field names below are the keys of the emitted JSON and
    the field order is the key order.
    """

    # Location of the finding; each part may be None when unknown.
    path: str | None
    line: int | None
    char: int | None

    # Identification of the check that produced the finding.
    code: str
    severity: LintSeverity
    name: str

    # Optional original/replacement text and a human-readable explanation.
    original: str | None
    replacement: str | None
    description: str | None
43
44
def glob_yamls(path: Path) -> Iterable[Path]:
    """Return every ``.yml`` and ``.yaml`` file found recursively under ``path``.

    All ``.yml`` matches are yielded first, followed by all ``.yaml`` matches.
    """
    patterns = ("**/*.yml", "**/*.yaml")
    # Unpacking creates both glob iterators up front, in pattern order.
    return itertools.chain(*(path.glob(pattern) for pattern in patterns))
47
48
def load_yaml(path: Path) -> Any:
    """Parse the YAML document stored at ``path`` and return the result.

    Parsing uses the module-level ``Loader``, which is bound to one of
    PyYAML's *safe* loaders (``CSafeLoader`` or ``SafeLoader``).

    Args:
        path: Location of the YAML file to read.

    Returns:
        Whatever the loader produces for the document (typically a ``dict``
        for a workflow file; ``None`` for an empty file).
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII content parses the same way on every machine.
    with open(path, encoding="utf-8") as f:
        return load(f, Loader)
52
53
def is_workflow(yaml: Any) -> bool:
    """Return True when a parsed YAML document has a non-null ``jobs`` entry.

    Args:
        yaml: A parsed YAML document. Anything without a ``.get`` method
            (``None`` from an empty file, a top-level list, a scalar, ...)
            is treated as "not a workflow" instead of raising.

    Returns:
        ``True`` if ``yaml.get("jobs")`` exists and is not ``None``,
        ``False`` otherwise.
    """
    try:
        jobs = yaml.get("jobs")
    except AttributeError:
        # Not a mapping-like document, so it cannot be a workflow.
        return False
    return jobs is not None
56
57
def print_lint_message(path: Path, job: dict[str, Any], sync_tag: str) -> None:
    """Print one lint error, as a single JSON line on stdout, for a job.

    Args:
        path: Workflow file that contains the offending job.
        job: Mapping whose first key is the job id; only that key is used.
        sync_tag: The sync tag whose other jobs this job failed to match.

    The reported line is the first line of the file whose content (ignoring
    leading whitespace) starts with ``"<job_id>:"``. If no such line exists
    (for example because the key is quoted in the file), the message is
    still emitted, with ``line`` set to ``None``.
    """
    job_id = next(iter(job))
    job_key = f"{job_id}:"

    # Default to "unknown" so a missing textual match cannot leave the
    # variable unbound.
    line_number = None
    with open(path, encoding="utf-8") as f:
        for number, line in enumerate(f, start=1):
            # Anchor the match at the start of the key: a plain substring
            # test would also hit longer keys such as "linux-<job_id>:".
            if line.lstrip().startswith(job_key):
                line_number = number
                break

    lint_message = LintMessage(
        path=str(path),
        line=line_number,
        char=None,
        code="WORKFLOWSYNC",
        severity=LintSeverity.ERROR,
        name="workflow-inconsistency",
        original=None,
        replacement=None,
        description=f"Job doesn't match other jobs with sync-tag: '{sync_tag}'",
    )
    # Flush immediately so each finding reaches the consumer as it is found.
    print(json.dumps(lint_message._asdict()), flush=True)
78
79
def main() -> None:
    """Lint the workflow files named on the command line.

    Jobs are grouped by their ``with.sync-tag`` value. Within each group the
    YAML dump of every job (keyed by its job id) must be identical to the
    dump of the group's last-collected job, which serves as the baseline.
    Each mismatching job is reported via ``print_lint_message``; the baseline
    job is reported once, right after the first mismatch in its group.
    """
    parser = argparse.ArgumentParser(
        description="workflow consistency linter.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument(
        "filenames",
        nargs="+",
        help="paths to lint",
    )
    args = parser.parse_args()

    # Go through the provided files, aggregating jobs with the same sync tag
    tag_to_jobs = defaultdict(list)
    for path in args.filenames:
        workflow = load_yaml(Path(path))

        # Skip documents that are not workflows (empty file, no "jobs" key,
        # non-mapping top level) instead of crashing on them.
        if not isinstance(workflow, dict) or not is_workflow(workflow):
            continue

        for job_id, job in workflow["jobs"].items():
            try:
                sync_tag = job["with"]["sync-tag"]
            except (KeyError, TypeError):
                # No sync tag, or the job / its "with" entry is not a
                # mapping: nothing to compare for this job.
                continue

            # "if" is allowed to differ between jobs (triggering conditions
            # may differ on pull vs. trunk, say), so drop it before comparing.
            job.pop("if", None)

            # The same is true for ['with']['test-matrix'].
            job["with"].pop("test-matrix", None)

            tag_to_jobs[sync_tag].append((path, {job_id: job}))

    # For each sync tag, check that all the jobs have the same code.
    for sync_tag, path_and_jobs in tag_to_jobs.items():
        # The last job collected for the tag is the baseline.
        *others, (baseline_path, baseline_dict) = path_and_jobs
        baseline_str = dump(baseline_dict)

        printed_baseline = False

        for path, job_dict in others:
            if dump(job_dict) == baseline_str:
                continue

            print_lint_message(path, job_dict, sync_tag)

            # Report the baseline only once per sync tag.
            if not printed_baseline:
                print_lint_message(baseline_path, baseline_dict, sync_tag)
                printed_baseline = True


if __name__ == "__main__":
    main()
130