#!/usr/bin/env python3
# Copyright © 2020 - 2022 Collabora Ltd.
# Authors:
#   Tomeu Vizoso <tomeu.vizoso@collabora.com>
#   David Heidelberg <david.heidelberg@collabora.com>
#
# For the dependencies, see requirements.txt
# SPDX-License-Identifier: MIT

"""
Helper script to restrict running only required CI jobs
and show the job(s) logs.
"""
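
# Example invocations (the target regexes and MR number below are illustrative;
# run with --help for all options):
#   mesa-monitor.py --target ".*traces" --rev $(git rev-parse HEAD)
#   mesa-monitor.py --target "zink.*|radv.*" --mr 12345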

import argparse
import re
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
from subprocess import check_output, CalledProcessError
from typing import TYPE_CHECKING, Iterable, Literal, Optional

import gitlab
import gitlab.v4.objects
from colorama import Fore, Style
from gitlab_common import (
    GITLAB_URL,
    TOKEN_DIR,
    get_gitlab_pipeline_from_url,
    get_gitlab_project,
    get_token_from_default_dir,
    pretty_duration,
    read_token,
    wait_for_pipeline,
)
from gitlab_gql import GitlabGQL, create_job_needs_dag, filter_dag, print_dag

if TYPE_CHECKING:
    from gitlab_gql import Dag

REFRESH_WAIT_LOG = 10
REFRESH_WAIT_JOBS = 6

URL_START = "\033]8;;"
URL_END = "\033]8;;\a"
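
# URL_START/URL_END are OSC 8 terminal escape sequences; print_job_status()
# wraps each job name in them so it renders as a clickable hyperlink in
# terminals that support them.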

STATUS_COLORS = {
    "created": "",
    "running": Fore.BLUE,
    "success": Fore.GREEN,
    "failed": Fore.RED,
    "canceled": Fore.MAGENTA,
    "manual": "",
    "pending": "",
    "skipped": "",
}

COMPLETED_STATUSES = ["success", "failed"]


def print_job_status(job, new_status=False) -> None:
    """Print a nicely colored job status with a link to the job."""
    if job.status == "canceled":
        return

    if new_status and job.status == "created":
        return

    duration = 0
    if job.duration:
        duration = job.duration
    elif job.started_at:
        duration = time.perf_counter() - time.mktime(job.started_at.timetuple())

    print(
        STATUS_COLORS[job.status]
        + "🞋 job "
        + URL_START
        + f"{job.web_url}\a{job.name}"
        + URL_END
        + (f" has new status: {job.status}" if new_status else f" :: {job.status}")
        + (f" ({pretty_duration(duration)})" if job.started_at else "")
        + Style.RESET_ALL
    )


def pretty_wait(sec: int) -> None:
    """Show a countdown while waiting, refreshing once per second."""
    for val in range(sec, 0, -1):
        print(f"⏲ {val} seconds", end="\r")
        time.sleep(1)


def monitor_pipeline(
    project,
    pipeline,
    target_jobs_regex: re.Pattern,
    dependencies,
    force_manual: bool,
    stress: int,
) -> tuple[Optional[int], Optional[int]]:
    """Monitor the pipeline and delegate canceling of unwanted jobs."""
    statuses: dict[str, str] = defaultdict(str)
    target_statuses: dict[str, str] = defaultdict(str)
    stress_status_counter = defaultdict(lambda: defaultdict(int))
    target_id = None

    while True:
        deps_failed = []
        to_cancel = []
        for job in pipeline.jobs.list(all=True, sort="desc"):
            # target jobs
            if target_jobs_regex.fullmatch(job.name):
                target_id = job.id

                if stress and job.status in COMPLETED_STATUSES:
                    if (
                        stress < 0
                        or sum(stress_status_counter[job.name].values()) < stress
                    ):
                        job = enable_job(project, pipeline, job, "retry", force_manual)
                        stress_status_counter[job.name][job.status] += 1
                else:
                    job = enable_job(project, pipeline, job, "target", force_manual)

                print_job_status(job, job.status != target_statuses[job.name])
                target_statuses[job.name] = job.status
                continue

            # all jobs
            if job.status != statuses[job.name]:
                print_job_status(job, True)
                statuses[job.name] = job.status

            # run dependencies and cancel the rest
            if job.name in dependencies:
                job = enable_job(project, pipeline, job, "dep", True)
                if job.status == "failed":
                    deps_failed.append(job.name)
            else:
                to_cancel.append(job)

        cancel_jobs(project, to_cancel)

        if stress:
            enough = True
            for job_name, status in stress_status_counter.items():
                print(
                    f"{job_name}\tsucc: {status['success']}; "
                    f"fail: {status['failed']}; "
                    f"total: {sum(status.values())} of {stress}",
                    flush=False,
                )
                if stress < 0 or sum(status.values()) < stress:
                    enough = False

            if not enough:
                pretty_wait(REFRESH_WAIT_JOBS)
                continue

        print("---------------------------------", flush=False)

        if len(target_statuses) == 1 and {"running"}.intersection(
            target_statuses.values()
        ):
            return target_id, None

        if (
            {"failed"}.intersection(target_statuses.values())
            and not {"running", "pending"}.intersection(target_statuses.values())
        ):
            return None, 1

        if (
            {"skipped"}.intersection(target_statuses.values())
            and not {"running", "pending"}.intersection(target_statuses.values())
        ):
            print(
                Fore.RED,
                "Target in skipped state, aborting. Failed dependencies:",
                deps_failed,
                Fore.RESET,
            )
            return None, 1

        if {"success", "manual"}.issuperset(target_statuses.values()):
            return None, 0

        pretty_wait(REFRESH_WAIT_JOBS)


def get_pipeline_job(
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job_id: int,
) -> gitlab.v4.objects.ProjectPipelineJob:
    """Return the pipeline job with the given id."""
    pipeline_jobs = pipeline.jobs.list(all=True)
    return [j for j in pipeline_jobs if j.id == job_id][0]


def enable_job(
    project: gitlab.v4.objects.Project,
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job: gitlab.v4.objects.ProjectPipelineJob,
    action_type: Literal["target", "dep", "retry"],
    force_manual: bool,
) -> gitlab.v4.objects.ProjectPipelineJob:
    """Enable the job, retrying or playing it depending on its status."""
    if (
        (job.status in COMPLETED_STATUSES and action_type != "retry")
        or (job.status == "manual" and not force_manual)
        or job.status in ["skipped", "running", "created", "pending"]
    ):
        return job

    pjob = project.jobs.get(job.id, lazy=True)

    if job.status in ["success", "failed", "canceled"]:
        new_job = pjob.retry()
        job = get_pipeline_job(pipeline, new_job["id"])
    else:
        pjob.play()
        job = get_pipeline_job(pipeline, pjob.id)

    if action_type == "target":
        jtype = "🞋 "
    elif action_type == "retry":
        jtype = "↻"
    else:
        jtype = "(dependency)"

    print(Fore.MAGENTA + f"{jtype} job {job.name} manually enabled" + Style.RESET_ALL)

    return job


def cancel_job(project, job) -> None:
    """Cancel a GitLab job"""
    if job.status in [
        "canceled",
        "success",
        "failed",
        "skipped",
    ]:
        return
    pjob = project.jobs.get(job.id, lazy=True)
    pjob.cancel()
    print(f"♲ {job.name}", end=" ")


def cancel_jobs(project, to_cancel) -> None:
    """Cancel unwanted GitLab jobs"""
    if not to_cancel:
        return

    with ThreadPoolExecutor(max_workers=6) as exe:
        part = partial(cancel_job, project)
        exe.map(part, to_cancel)
    print()


def print_log(project, job_id) -> None:
    """Print the job log, following it until the job completes."""
    printed_lines = 0
    while True:
        job = project.jobs.get(job_id)

        # GitLab's REST API doesn't offer pagination for logs, so we have to
        # refetch the whole trace every time
        lines = job.trace().decode("raw_unicode_escape").splitlines()
        for line in lines[printed_lines:]:
            print(line)
        printed_lines = len(lines)

        if job.status in COMPLETED_STATUSES:
            print(Fore.GREEN + f"Job finished: {job.web_url}" + Style.RESET_ALL)
            return
        pretty_wait(REFRESH_WAIT_LOG)


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Tool to trigger a subset of container jobs "
        + "and monitor the progress of a test job",
        epilog="Example: mesa-monitor.py --rev $(git rev-parse HEAD) "
        + '--target ".*traces" ',
    )
    parser.add_argument(
        "--target",
        metavar="target-job",
        help="Target job regex. For multiple targets, separate them with the pipe (|) character",
        required=True,
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--token",
        metavar="token",
        type=str,
        default=get_token_from_default_dir(),
        help="Use the provided GitLab token or token file, "
        f"otherwise it's read from {TOKEN_DIR / 'gitlab-token'}",
    )
    parser.add_argument(
        "--force-manual", action="store_true", help="Force jobs marked as manual"
    )
    parser.add_argument(
        "--stress",
        default=0,
        type=int,
        help="Stress-test the job(s): number of repetitions, or -1 for infinite.",
    )
    parser.add_argument(
        "--project",
        default="mesa",
        help="GitLab project in the format <user>/<project> or just <project>",
    )

    mutex_group1 = parser.add_mutually_exclusive_group()
    mutex_group1.add_argument(
        "--rev",
        default="HEAD",
        metavar="revision",
        help="repository git revision (default: HEAD)",
    )
    mutex_group1.add_argument(
        "--pipeline-url",
        help="URL of the pipeline to use, instead of auto-detecting it.",
    )
    mutex_group1.add_argument(
        "--mr",
        type=int,
        help="ID of a merge request; the latest pipeline in that MR will be used.",
    )

    args = parser.parse_args()

    # argparse doesn't support groups inside add_mutually_exclusive_group(),
    # which means we can't just put `--project` and `--rev` in a group together;
    # we have to do this by hand instead.
    if args.pipeline_url and args.project != parser.get_default("project"):
        # weird phrasing, but it's the error add_mutually_exclusive_group() gives
        parser.error("argument --project: not allowed with argument --pipeline-url")

    return args


def print_detected_jobs(
    target_dep_dag: "Dag", dependency_jobs: Iterable[str], target_jobs: Iterable[str]
) -> None:
    def print_job_set(color: str, kind: str, job_set: Iterable[str]) -> None:
        print(
            color + f"Running {len(job_set)} {kind} jobs: ",
            "\n",
            ", ".join(sorted(job_set)),
            Fore.RESET,
            "\n",
        )

    print(Fore.YELLOW + "Detected target job and its dependencies:", "\n")
    print_dag(target_dep_dag)
    print_job_set(Fore.MAGENTA, "dependency", dependency_jobs)
    print_job_set(Fore.BLUE, "target", target_jobs)


def find_dependencies(
    token: str | None,
    target_jobs_regex: re.Pattern,
    project: gitlab.v4.objects.Project,
    iid: int,
) -> set[str]:
    """
    Find the dependencies of the target jobs in a GitLab pipeline.

    This function uses the GitLab GraphQL API to fetch the job dependency graph
    of a pipeline, filters the graph to only include the target jobs and their
    dependencies, and returns the names of these jobs.

    Args:
        token (str | None): The GitLab API token. If None, the API is accessed
            without authentication.
        target_jobs_regex (re.Pattern): A regex pattern matching the names of the
            target jobs.
        project (gitlab.v4.objects.Project): The GitLab project containing the
            pipeline.
        iid (int): The internal ID of the pipeline.

    Returns:
        set[str]: A set of the names of the target jobs and their dependencies.

    Raises:
        SystemExit: If no target jobs are found in the pipeline.
    """
    gql_instance = GitlabGQL(token=token)
    dag = create_job_needs_dag(
        gql_instance, {"projectPath": project.path_with_namespace, "iid": iid}
    )

    target_dep_dag = filter_dag(dag, target_jobs_regex)
    if not target_dep_dag:
        print(Fore.RED + "The job(s) were not found in the pipeline." + Fore.RESET)
        sys.exit(1)

    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
    target_jobs = set(target_dep_dag.keys())
    print_detected_jobs(target_dep_dag, dependency_jobs, target_jobs)
    return target_jobs.union(dependency_jobs)


if __name__ == "__main__":
    try:
        t_start = time.perf_counter()

        args = parse_args()

        token = read_token(args.token)

        gl = gitlab.Gitlab(url=GITLAB_URL,
                           private_token=token,
                           retry_transient_errors=True)

        REV: str = args.rev

        if args.pipeline_url:
            pipe, cur_project = get_gitlab_pipeline_from_url(gl, args.pipeline_url)
            REV = pipe.sha
        else:
            mesa_project = gl.projects.get("mesa/mesa")
            projects = [mesa_project]
            if args.mr:
                REV = mesa_project.mergerequests.get(args.mr).sha
            else:
                REV = check_output(['git', 'rev-parse', REV]).decode('ascii').strip()

                if args.rev == 'HEAD':
                    try:
                        branch_name = check_output([
                            'git', 'symbolic-ref', '-q', 'HEAD',
                        ]).decode('ascii').strip()
                    except CalledProcessError:
                        branch_name = ""

                    # Ignore detached heads
                    if branch_name:
                        tracked_remote = check_output([
                            'git', 'for-each-ref', '--format=%(upstream)',
                            branch_name,
                        ]).decode('ascii').strip()

                        # Ignore local branches that do not track any remote
                        if tracked_remote:
                            remote_rev = check_output([
                                'git', 'rev-parse', tracked_remote,
                            ]).decode('ascii').strip()

                            if REV != remote_rev:
                                print(
                                    f"Local HEAD commit {REV[:10]} differs from the "
                                    f"tracked remote HEAD commit {remote_rev[:10]}"
                                )
                                print("Did you forget to `git push`?")

            projects.append(get_gitlab_project(gl, args.project))
            (pipe, cur_project) = wait_for_pipeline(projects, REV)

        print(f"Revision: {REV}")
        print(f"Pipeline: {pipe.web_url}")

        target = '|'.join(args.target)
        target = target.strip()

        print("🞋 job: " + Fore.BLUE + target + Style.RESET_ALL)

        # Implicitly include `parallel:` jobs
        target = f'({target})' + r'( \d+/\d+)?'
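        # e.g. a target of "glcts" will then also match split jobs such as
        # "glcts 1/8" (job names here are illustrative)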

        target_jobs_regex = re.compile(target)

        deps = find_dependencies(
            token=token,
            target_jobs_regex=target_jobs_regex,
            iid=pipe.iid,
            project=cur_project,
        )
        target_job_id, ret = monitor_pipeline(
            cur_project, pipe, target_jobs_regex, deps, args.force_manual, args.stress
        )

        if target_job_id:
            print_log(cur_project, target_job_id)

        t_end = time.perf_counter()
        spent_minutes = (t_end - t_start) / 60
        print(f"⏲ Duration of script execution: {spent_minutes:0.1f} minutes")

        sys.exit(ret)
    except KeyboardInterrupt:
        sys.exit(1)