#!/usr/bin/env python3
# Copyright © 2020 - 2022 Collabora Ltd.
# Authors:
#   Tomeu Vizoso <tomeu.vizoso@collabora.com>
#   David Heidelberg <david.heidelberg@collabora.com>
#
# For the dependencies, see requirements.txt
# SPDX-License-Identifier: MIT

10"""
11Helper script to restrict running only required CI jobs
12and show the job(s) logs.
13"""
14
import argparse
import re
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
from subprocess import check_output, CalledProcessError
from typing import TYPE_CHECKING, Iterable, Literal, Optional

import gitlab
import gitlab.v4.objects
from colorama import Fore, Style
from gitlab_common import (
    GITLAB_URL,
    TOKEN_DIR,
    get_gitlab_pipeline_from_url,
    get_gitlab_project,
    get_token_from_default_dir,
    pretty_duration,
    read_token,
    wait_for_pipeline,
)
from gitlab_gql import GitlabGQL, create_job_needs_dag, filter_dag, print_dag

if TYPE_CHECKING:
    from gitlab_gql import Dag

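# Poll intervals, in seconds, between refreshes of the job log and of the job list.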
REFRESH_WAIT_LOG = 10
REFRESH_WAIT_JOBS = 6

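# OSC 8 terminal escape sequences, used to print clickable hyperlinks in supporting terminals.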
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"

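# Terminal color per GitLab job status; an empty string keeps the default color.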
STATUS_COLORS = {
    "created": "",
    "running": Fore.BLUE,
    "success": Fore.GREEN,
    "failed": Fore.RED,
    "canceled": Fore.MAGENTA,
    "manual": "",
    "pending": "",
    "skipped": "",
}

COMPLETED_STATUSES = ["success", "failed"]


def print_job_status(job, new_status=False) -> None:
    """Print a colored job status with a link to the job."""
    if job.status == "canceled":
        return

    if new_status and job.status == "created":
        return

    if job.duration:
        duration = job.duration
    elif job.started_at:
        duration = time.perf_counter() - time.mktime(job.started_at.timetuple())

    print(
        STATUS_COLORS[job.status]
        + "🞋 job "
        + URL_START
        + f"{job.web_url}\a{job.name}"
        + URL_END
        + (f" has new status: {job.status}" if new_status else f" :: {job.status}")
        + (f" ({pretty_duration(duration)})" if job.started_at else "")
        + Style.RESET_ALL
    )


def pretty_wait(sec: int) -> None:
    """Show a per-second countdown while waiting."""
    for val in range(sec, 0, -1):
        print(f"⏲  {val} seconds", end="\r")
        time.sleep(1)


def monitor_pipeline(
    project,
    pipeline,
    target_jobs_regex: re.Pattern,
    dependencies,
    force_manual: bool,
    stress: int,
) -> tuple[Optional[int], Optional[int]]:
    """Monitor the pipeline's jobs and delegate canceling of unwanted ones."""
    statuses: dict[str, str] = defaultdict(str)
    target_statuses: dict[str, str] = defaultdict(str)
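    # Per job name, count how many times each final status was seen during stress testing.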
    stress_status_counter = defaultdict(lambda: defaultdict(int))
    target_id = None

    while True:
        deps_failed = []
        to_cancel = []
        for job in pipeline.jobs.list(all=True, sort="desc"):
            # target jobs
            if target_jobs_regex.fullmatch(job.name):
                target_id = job.id

                if stress and job.status in COMPLETED_STATUSES:
                    if (
                        stress < 0
                        or sum(stress_status_counter[job.name].values()) < stress
                    ):
                        job = enable_job(project, pipeline, job, "retry", force_manual)
                        stress_status_counter[job.name][job.status] += 1
                else:
                    job = enable_job(project, pipeline, job, "target", force_manual)

                print_job_status(job, job.status != target_statuses[job.name])
                target_statuses[job.name] = job.status
                continue

            # all jobs
            if job.status != statuses[job.name]:
                print_job_status(job, True)
                statuses[job.name] = job.status

            # run dependencies and cancel the rest
            if job.name in dependencies:
                job = enable_job(project, pipeline, job, "dep", True)
                if job.status == "failed":
                    deps_failed.append(job.name)
            else:
                to_cancel.append(job)

        cancel_jobs(project, to_cancel)

        if stress:
            enough = True
            for job_name, status in stress_status_counter.items():
                print(
                    f"{job_name}\tsucc: {status['success']}; "
                    f"fail: {status['failed']}; "
                    f"total: {sum(status.values())} of {stress}",
                    flush=False,
                )
                if stress < 0 or sum(status.values()) < stress:
                    enough = False

            if not enough:
                pretty_wait(REFRESH_WAIT_JOBS)
                continue

        print("---------------------------------", flush=False)

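        # Exactly one target job and it is running: return its ID so the caller can tail its log.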
        if len(target_statuses) == 1 and {"running"}.intersection(
            target_statuses.values()
        ):
            return target_id, None

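        # All targets have settled (nothing running or pending) and at least one failed.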
        if (
            {"failed"}.intersection(target_statuses.values())
            and not {"running", "pending"}.intersection(target_statuses.values())
        ):
            return None, 1

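        # A target ended up skipped while nothing is running or pending,
        # usually because one of its dependencies failed.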
        if (
            {"skipped"}.intersection(target_statuses.values())
            and not {"running", "pending"}.intersection(target_statuses.values())
        ):
            print(
                Fore.RED,
                "Target in skipped state, aborting. Failed dependencies:",
                deps_failed,
                Fore.RESET,
            )
            return None, 1

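        # Every target finished successfully (or is still manual): report success.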
188        if {"success", "manual"}.issuperset(target_statuses.values()):
189            return None, 0
190
191        pretty_wait(REFRESH_WAIT_JOBS)
192
193
def get_pipeline_job(
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job_id: int,
) -> gitlab.v4.objects.ProjectPipelineJob:
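    """Return the job with the given ID from the pipeline's job list."""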
    pipeline_jobs = pipeline.jobs.list(all=True)
    return [j for j in pipeline_jobs if j.id == job_id][0]


def enable_job(
    project: gitlab.v4.objects.Project,
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job: gitlab.v4.objects.ProjectPipelineJob,
    action_type: Literal["target", "dep", "retry"],
    force_manual: bool,
) -> gitlab.v4.objects.ProjectPipelineJob:
    """Play a manual job or retry a completed one, returning the refreshed job."""
    if (
        (job.status in ["success", "failed"] and action_type != "retry")
        or (job.status == "manual" and not force_manual)
        or job.status in ["skipped", "running", "created", "pending"]
    ):
        return job

    pjob = project.jobs.get(job.id, lazy=True)

    if job.status in ["success", "failed", "canceled"]:
        new_job = pjob.retry()
        job = get_pipeline_job(pipeline, new_job["id"])
    else:
        pjob.play()
        job = get_pipeline_job(pipeline, pjob.id)

    if action_type == "target":
        jtype = "🞋 "
    elif action_type == "retry":
        jtype = "↻"
    else:
        jtype = "(dependency)"

    print(Fore.MAGENTA + f"{jtype} job {job.name} manually enabled" + Style.RESET_ALL)

    return job


def cancel_job(project, job) -> None:
    """Cancel a single GitLab job."""
    if job.status in [
        "canceled",
        "success",
        "failed",
        "skipped",
    ]:
        return
    pjob = project.jobs.get(job.id, lazy=True)
    pjob.cancel()
    print(f"♲ {job.name}", end=" ")


def cancel_jobs(project, to_cancel) -> None:
    """Cancel unwanted GitLab jobs in parallel."""
    if not to_cancel:
        return

    with ThreadPoolExecutor(max_workers=6) as exe:
        part = partial(cancel_job, project)
        exe.map(part, to_cancel)
    print()


def print_log(project, job_id) -> None:
    """Print the job log, polling until the job completes."""
    printed_lines = 0
    while True:
        job = project.jobs.get(job_id)

        # GitLab's REST API doesn't offer pagination for logs, so we have to refetch it all
        lines = job.trace().decode("raw_unicode_escape").splitlines()
        for line in lines[printed_lines:]:
            print(line)
        printed_lines = len(lines)

        if job.status in COMPLETED_STATUSES:
            print(Fore.GREEN + f"Job finished: {job.web_url}" + Style.RESET_ALL)
            return
        pretty_wait(REFRESH_WAIT_LOG)


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Tool to trigger a subset of container jobs "
        + "and monitor the progress of a test job",
        epilog="Example: mesa-monitor.py --rev $(git rev-parse HEAD) "
        + '--target ".*traces" ',
    )
    parser.add_argument(
        "--target",
        metavar="target-job",
        help="Target job regex. For multiple targets, separate with pipe | character",
        required=True,
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--token",
        metavar="token",
        type=str,
        default=get_token_from_default_dir(),
        help="Use the provided GitLab token or token file, "
             f"otherwise it's read from {TOKEN_DIR / 'gitlab-token'}",
    )
    parser.add_argument(
        "--force-manual", action="store_true", help="Force jobs marked as manual"
    )
    parser.add_argument(
        "--stress",
        default=0,
        type=int,
        help="Stress-test the target job(s): number of repetitions, or -1 for infinite.",
    )
    parser.add_argument(
        "--project",
        default="mesa",
        help="GitLab project in the format <user>/<project> or just <project>",
    )

    mutex_group1 = parser.add_mutually_exclusive_group()
    mutex_group1.add_argument(
        "--rev", default="HEAD", metavar="revision", help="repository git revision (default: HEAD)"
    )
    mutex_group1.add_argument(
        "--pipeline-url",
        help="URL of the pipeline to use, instead of auto-detecting it.",
    )
    mutex_group1.add_argument(
        "--mr",
        type=int,
        help="ID of a merge request; the latest pipeline in that MR will be used.",
    )

    args = parser.parse_args()

    # argparse doesn't support groups inside add_mutually_exclusive_group(),
    # which means we can't just put `--project` and `--rev` in a group together,
    # so we have to do this by hand instead.
    if args.pipeline_url and args.project != parser.get_default("project"):
        # weird phrasing, but it matches the error add_mutually_exclusive_group() gives
        parser.error("argument --project: not allowed with argument --pipeline-url")

    return args


def print_detected_jobs(
    target_dep_dag: "Dag", dependency_jobs: Iterable[str], target_jobs: Iterable[str]
) -> None:
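    """Pretty-print the detected target jobs and their dependency jobs."""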
    def print_job_set(color: str, kind: str, job_set: Iterable[str]):
        print(
            color + f"Running {len(job_set)} {kind} jobs: ",
            "\n",
            ", ".join(sorted(job_set)),
            Fore.RESET,
            "\n",
        )

    print(Fore.YELLOW + "Detected target job and its dependencies:", "\n")
    print_dag(target_dep_dag)
    print_job_set(Fore.MAGENTA, "dependency", dependency_jobs)
    print_job_set(Fore.BLUE, "target", target_jobs)


def find_dependencies(token: str | None,
                      target_jobs_regex: re.Pattern,
                      project_path: gitlab.v4.objects.Project,
                      iid: int) -> set[str]:
    """
    Find the dependencies of the target jobs in a GitLab pipeline.

    This function uses the GitLab GraphQL API to fetch the job dependency graph
    of a pipeline, filters the graph to only include the target jobs and their
    dependencies, and returns the names of these jobs.

    Args:
        token (str | None): The GitLab API token. If None, the API is accessed without
                            authentication.
        target_jobs_regex (re.Pattern): A regex pattern to match the names of the target jobs.
        project_path (gitlab.v4.objects.Project): The GitLab project; its
                            path_with_namespace is used to query the GraphQL API.
        iid (int): The internal ID of the pipeline.

    Returns:
        set[str]: A set of the names of the target jobs and their dependencies.

    Raises:
        SystemExit: If no target jobs are found in the pipeline.
    """
    gql_instance = GitlabGQL(token=token)
    dag = create_job_needs_dag(
        gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
    )

    target_dep_dag = filter_dag(dag, target_jobs_regex)
    if not target_dep_dag:
        print(Fore.RED + "The job(s) were not found in the pipeline." + Fore.RESET)
        sys.exit(1)

    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
    target_jobs = set(target_dep_dag.keys())
    print_detected_jobs(target_dep_dag, dependency_jobs, target_jobs)
    return target_jobs.union(dependency_jobs)


if __name__ == "__main__":
    try:
        t_start = time.perf_counter()

        args = parse_args()

        token = read_token(args.token)

        gl = gitlab.Gitlab(url=GITLAB_URL,
                           private_token=token,
                           retry_transient_errors=True)

        REV: str = args.rev

        if args.pipeline_url:
            pipe, cur_project = get_gitlab_pipeline_from_url(gl, args.pipeline_url)
            REV = pipe.sha
        else:
            mesa_project = gl.projects.get("mesa/mesa")
            projects = [mesa_project]
            if args.mr:
                REV = mesa_project.mergerequests.get(args.mr).sha
            else:
                REV = check_output(['git', 'rev-parse', REV]).decode('ascii').strip()

                if args.rev == 'HEAD':
                    try:
                        branch_name = check_output([
                            'git', 'symbolic-ref', '-q', 'HEAD',
                        ]).decode('ascii').strip()
                    except CalledProcessError:
                        branch_name = ""

                    # Ignore detached heads
                    if branch_name:
                        tracked_remote = check_output([
                            'git', 'for-each-ref', '--format=%(upstream)',
                            branch_name,
                        ]).decode('ascii').strip()

                        # Ignore local branches that do not track any remote
                        if tracked_remote:
                            remote_rev = check_output([
                                'git', 'rev-parse', tracked_remote,
                            ]).decode('ascii').strip()

                            if REV != remote_rev:
                                print(
                                    f"Local HEAD commit {REV[:10]} differs from "
                                    f"tracked remote HEAD commit {remote_rev[:10]}"
                                )
                                print("Did you forget to `git push`?")

                projects.append(get_gitlab_project(gl, args.project))
            (pipe, cur_project) = wait_for_pipeline(projects, REV)

        print(f"Revision: {REV}")
        print(f"Pipeline: {pipe.web_url}")

        target = '|'.join(args.target)
        target = target.strip()

        deps = set()
        print("🞋 job: " + Fore.BLUE + target + Style.RESET_ALL)

        # Implicitly include `parallel:` jobs
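        # e.g. a target regex of "foo" also matches the parallel job "foo 1/2"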
        target = f'({target})' + r'( \d+/\d+)?'

        target_jobs_regex = re.compile(target)

        deps = find_dependencies(
            token=token,
            target_jobs_regex=target_jobs_regex,
            iid=pipe.iid,
            project_path=cur_project
        )
        target_job_id, ret = monitor_pipeline(
            cur_project, pipe, target_jobs_regex, deps, args.force_manual, args.stress
        )

        if target_job_id:
            print_log(cur_project, target_job_id)

        t_end = time.perf_counter()
        spent_minutes = (t_end - t_start) / 60
        print(f"⏲ Duration of script execution: {spent_minutes:0.1f} minutes")

        sys.exit(ret)
    except KeyboardInterrupt:
        sys.exit(1)