• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright © 2020 - 2022 Collabora Ltd.
3# Authors:
4#   Tomeu Vizoso <tomeu.vizoso@collabora.com>
5#   David Heidelberg <david.heidelberg@collabora.com>
6#   Guilherme Gallo <guilherme.gallo@collabora.com>
7#
8# SPDX-License-Identifier: MIT
9'''Shared functions between the scripts.'''
10
11import logging
12import os
13import re
14import time
15from functools import cache
16from pathlib import Path
17
# Base URL of the GitLab instance; pipeline URLs are matched against it in
# get_gitlab_pipeline_from_url().
GITLAB_URL = "https://gitlab.freedesktop.org"
# Directory that holds the "gitlab-token" file: $XDG_CONFIG_HOME when it is set
# and non-empty, otherwise ~/.config.
TOKEN_DIR = Path(os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config")

# Known GitLab token prefixes: https://docs.gitlab.com/ee/security/token_overview.html#token-prefixes
# Maps a human-readable token type to the prefix such tokens start with;
# validate_gitlab_token() uses it to recognise the kind of token it was given.
TOKEN_PREFIXES: dict[str, str] = {
    "Personal access token": "glpat-",
    "OAuth Application Secret": "gloas-",
    "Deploy token": "gldt-",
    "Runner authentication token": "glrt-",
    "CI/CD Job token": "glcbt-",
    "Trigger token": "glptt-",
    "Feed token": "glft-",
    "Incoming mail token": "glimt-",
    "GitLab Agent for Kubernetes token": "glagent-",
    "SCIM Tokens": "glsoat-",
}
34
35
@cache
def print_once(*args, **kwargs):
    """
    Print a message only the first time it is requested.

    The call is memoised on its arguments, so repeating it with the same
    positional and keyword arguments prints nothing. All arguments are
    forwarded to print() and must be hashable for the memoisation to work.
    """
    return print(*args, **kwargs)
40
41
def pretty_duration(seconds):
    """
    Format a duration given in seconds as a compact string.

    The value is rounded to whole seconds *before* it is split into hours,
    minutes and seconds. Rounding only at formatting time lets a fractional
    remainder round up to 60 without carrying over (e.g. 119.7 would be
    rendered as "1m60s" instead of "2m00s").

    :param seconds: duration in seconds (int or float)
    :return: "<H>h<MM>m<SS>s", "<M>m<SS>s" or "<S>s"; leading units that are
             zero are omitted
    """
    total = round(seconds)
    hours, rem = divmod(total, 3600)
    minutes, secs = divmod(rem, 60)
    if hours:
        return f"{hours}h{minutes:02d}m{secs:02d}s"
    if minutes:
        return f"{minutes}m{secs:02d}s"
    return f"{secs}s"
51
52
def get_gitlab_pipeline_from_url(gl, pipeline_url) -> tuple:
    """
    Resolve a pipeline url into its pipeline and project objects
    :param gl: Gitlab object
    :param pipeline_url: string with a url to a pipeline, which must look like
                         <GITLAB_URL>/<namespace/project>/-/pipelines/<id>
    :return: ProjectPipeline, Project objects
    :raises AssertionError: if the url does not match the expected pattern
    """
    pattern = rf"^{re.escape(GITLAB_URL)}/(.*)/-/pipelines/([0-9]+)$"
    parsed = re.match(pattern, pipeline_url)
    if parsed is None:
        raise AssertionError(f"url {pipeline_url} doesn't follow the pattern {pattern}")

    # Group 1 is the full project path, group 2 the numeric pipeline id.
    project = gl.projects.get(parsed.group(1))
    pipeline = project.pipelines.get(parsed.group(2))
    return pipeline, project
68
69
def get_gitlab_project(glab, name: str):
    """
    Look up a gitlab project by name.

    A name containing "/" is used as the full project path. A bare name is
    looked up under the authenticated user's namespace, which requires
    authenticating first to learn the username.
    """
    if "/" not in name:
        glab.auth()
        name = f"{glab.user.username}/{name}"
    return glab.projects.get(name)
79
80
def get_token_from_default_dir() -> str:
    """
    Return the default location of the GitLab token file.

    The location is TOKEN_DIR / "gitlab-token", made absolute with symlinks
    resolved. The file is not required to exist: Path.resolve() is non-strict
    by default and does not raise FileNotFoundError for a missing file, so
    callers have to check for the file themselves.

    Returns:
        str: The path to the GitLab token file (not its contents).
    """
    # Deliberately non-strict: handle_direct_token() compares against this
    # path exactly when the default token file is missing.
    return str((TOKEN_DIR / "gitlab-token").resolve())
99
100
def validate_gitlab_token(token: str) -> bool:
    """
    Check whether a string looks like a prefixed GitLab token.

    The token must start with one of the prefixes in TOKEN_PREFIXES, and the
    text after the prefix must begin with 20 or more characters from
    [0-9a-zA-Z_-], optionally preceded by a "<word>-" segment.

    Returns:
        bool: True if the token passes both checks, False otherwise.
    """
    # Match against recognised token prefixes
    found = next(
        (
            (token_type, prefix)
            for token_type, prefix in TOKEN_PREFIXES.items()
            if token.startswith(prefix)
        ),
        None,
    )
    if found is None:
        return False

    token_type, prefix = found
    logging.info(f"Found probable token type: {token_type}")

    remainder = token[len(prefix):]
    if remainder == "":
        return False

    # Basic validation of the token suffix based on:
    # https://gitlab.com/gitlab-org/gitlab/-/blob/master/gems/gitlab-secret_detection/lib/gitleaks.toml
    return re.match(r"(\w+-)?[0-9a-zA-Z_\-]{20,64}", remainder) is not None
119
120
121def get_token_from_arg(token_arg: str | Path | None) -> str | None:
122    if not token_arg:
123        logging.info("No token provided.")
124        return None
125
126    token_path = Path(token_arg)
127    if token_path.is_file():
128        return read_token_from_file(token_path)
129
130    return handle_direct_token(token_path, token_arg)
131
132
def read_token_from_file(token_path: Path) -> str:
    """Return the contents of the token file, stripped of surrounding whitespace."""
    with token_path.open() as token_file:
        contents = token_file.read()
    logging.info(f"Token read from file: {token_path}")
    return contents.strip()
137
138
def handle_direct_token(token_path: Path, token_arg: str | Path) -> str | None:
    """
    Handle a token argument that is not an existing file.

    If the argument is the default token file location, there is no token to
    use: a warning is logged and None is returned. Anything else is taken to
    be the token itself and returned as a string.
    """
    default_path = Path(get_token_from_default_dir())
    if token_path != default_path:
        logging.info("Token provided directly as an argument.")
        return str(token_arg)

    logging.warning(
        f"The default token file {token_path} was not found. "
        "Please provide a token file or a token directly via --token arg."
    )
    return None
148
149
def read_token(token_arg: str | Path | None) -> str | None:
    """
    Obtain the token from the argument and warn if it looks invalid.

    Validation is advisory only: a token that fails it is still returned,
    after warnings have been logged. None is returned when no token could be
    obtained.
    """
    token = get_token_from_arg(token_arg)
    if not token or validate_gitlab_token(token):
        return token

    warnings = (
        "The provided token is either an old token or does not seem to "
        "be a valid token.",
        "Newer tokens are the ones created from a Gitlab 14.5+ instance.",
        "See https://about.gitlab.com/releases/2021/11/22/"
        "gitlab-14-5-released/"
        "#new-gitlab-access-token-prefix-and-detection",
    )
    for message in warnings:
        logging.warning(message)
    return token
160
161
def wait_for_pipeline(projects, sha: str, timeout=None):
    """
    Poll the given projects until a pipeline for the commit appears in Gitlab.

    The projects are queried in order, and the whole round is repeated once
    per second. Progress is reported on stdout.

    :param projects: sequence of project objects to search; it is iterated
                     once per polling round
    :param sha: commit hash the pipeline must belong to
    :param timeout: maximum number of seconds to wait; a falsy value (None
                    or 0) means wait indefinitely
    :return: (pipeline, project) for the first pipeline found, or
             (None, None) if the timeout expired
    """
    project_names = [project.path_with_namespace for project in projects]
    # Flush so the message is visible before the first (blocking) query.
    print(f"⏲ for the pipeline to appear in {project_names}..", end="", flush=True)
    # Monotonic clock: the elapsed time must not be affected by adjustments
    # of the system clock while waiting.
    start_time = time.monotonic()
    while True:
        for project in projects:
            pipelines = project.pipelines.list(sha=sha)
            if pipelines:
                print("", flush=True)
                return (pipelines[0], project)
        print("", end=".", flush=True)
        if timeout and time.monotonic() - start_time > timeout:
            print(" not found", flush=True)
            return (None, None)
        time.sleep(1)
178