#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2022 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect its log back."""


import argparse
import contextlib
import pathlib
import re
import sys
import time
import traceback
import urllib.parse
import xmlrpc.client
from datetime import datetime, timedelta
from os import getenv
from typing import Any, Optional

import lavacli
import yaml
from lava.exceptions import (
    MesaCIException,
    MesaCIKnownIssueException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import (
    CONSOLE_LOG,
    GitlabSection,
    LogFollower,
    LogSectionType,
    fatal_err,
    hide_sensitive_data,
    print_log,
)
from lavacli.utils import loader

# Timeout in seconds to decide whether the device running the dispatched LAVA
# job has hung, based on the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5 * 60))

# How many seconds the script should wait before trying a new polling iteration
# to check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10))

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2))

# How many attempts should be made when a timeout happens during LAVA device boot.
NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 3))


def generate_lava_yaml(args):
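    """Build the LAVA job definition as a YAML string.

    The job deploys the kernel and rootfs over TFTP/NFS, boots over NFS and
    runs a single inline 'mesa' test whose steps come from the first-stage
    init script plus the per-job setup assembled below.
    """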
    # General metadata and permissions (plus, inexplicably, kernel arguments)
    values = {
        'job_name': 'mesa: {}'.format(args.pipeline_info),
        'device_type': args.device_type,
        'visibility': { 'group': [ args.visibility_group ] },
        'priority': 75,
        'context': {
            'extra_nfsroot_args': ' init=/init rootwait usbcore.quirks=0bda:8153:k'
        },
        "timeouts": {
            "job": {"minutes": args.job_timeout},
            "action": {"minutes": 3},
            "actions": {
                "depthcharge-action": {
                    "minutes": 3 * NUMBER_OF_ATTEMPTS_LAVA_BOOT,
                }
            }
        },
    }

    if args.lava_tags:
        values['tags'] = args.lava_tags.split(',')

    # URLs to our kernel and rootfs to boot from, both generated by the base
    # container build
    deploy = {
      'timeout': { 'minutes': 10 },
      'to': 'tftp',
      'os': 'oe',
      'kernel': {
        'url': '{}/{}'.format(args.kernel_url_prefix, args.kernel_image_name),
      },
      'nfsrootfs': {
        'url': '{}/lava-rootfs.tgz'.format(args.rootfs_url_prefix),
        'compression': 'gz',
      }
    }
    if args.kernel_image_type:
        deploy['kernel']['type'] = args.kernel_image_type
    if args.dtb:
        deploy['dtb'] = {
          'url': '{}/{}.dtb'.format(args.kernel_url_prefix, args.dtb)
        }

    # always boot over NFS
    boot = {
        "failure_retry": NUMBER_OF_ATTEMPTS_LAVA_BOOT,
        "method": args.boot_method,
        "commands": "nfs",
        "prompts": ["lava-shell:"],
    }

    # skeleton test definition: only declaring each job as a single 'test'
    # since LAVA's test parsing is not useful to us
    run_steps = []
    test = {
      'timeout': { 'minutes': args.job_timeout },
      'failure_retry': 1,
      'definitions': [ {
        'name': 'mesa',
        'from': 'inline',
        'lava-signal': 'kmsg',
        'path': 'inline/mesa.yaml',
        'repository': {
          'metadata': {
            'name': 'mesa',
            'description': 'Mesa test plan',
            'os': [ 'oe' ],
            'scope': [ 'functional' ],
            'format': 'Lava-Test Test Definition 1.0',
          },
          'run': {
            "steps": run_steps
          },
        },
      } ],
    }

    # job execution script:
    #   - inline .gitlab-ci/common/init-stage1.sh
    #   - fetch and unpack per-pipeline build artifacts from build job
    #   - fetch and unpack per-job environment from lava-submit.sh
    #   - exec .gitlab-ci/common/init-stage2.sh

    with open(args.first_stage_init, 'r') as init_sh:
      run_steps += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ]

    if args.jwt_file:
        with open(args.jwt_file) as jwt_file:
            run_steps += [
                "set +x",
                f'echo -n "{jwt_file.read()}" > "{args.jwt_file}"  # HIDEME',
                "set -x",
                f'echo "export CI_JOB_JWT_FILE={args.jwt_file}" >> /set-job-env-vars.sh',
            ]
    else:
        run_steps += [
            "echo Could not find jwt file, disabling MINIO requests...",
            "sed -i '/MINIO_RESULTS_UPLOAD/d' /set-job-env-vars.sh",
        ]

    run_steps += [
      'mkdir -p {}'.format(args.ci_project_dir),
      'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.build_url, args.ci_project_dir),
      'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),

      # Sleep a bit to give bash time to dump the shell xtrace messages into
      # the console, which may otherwise interleave with LAVA_SIGNAL_STARTTC
      # on some devices like a618.
      'sleep 1',

      # Use the CI job name as the test-case name; it may help LAVA farm
      # maintainers with monitoring
      f"lava-test-case 'mesa-ci_{args.mesa_job_name}' --shell /init-stage2.sh",
    ]

    values['actions'] = [
      { 'deploy': deploy },
      { 'boot': boot },
      { 'test': test },
    ]

    return yaml.dump(values, width=10000000)


def setup_lava_proxy():
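    """Create an XML-RPC proxy to the LAVA server from the default lavacli
    configuration, embedding the username and token in the URI."""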
    config = lavacli.load_config("default")
    uri, usr, tok = (config.get(key) for key in ("uri", "username", "token"))
    uri_obj = urllib.parse.urlparse(uri)
    uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path)
    transport = lavacli.RequestsTransport(
        uri_obj.scheme,
        config.get("proxy"),
        config.get("timeout", 120.0),
        config.get("verify_ssl_cert", True),
    )
    proxy = xmlrpc.client.ServerProxy(
        uri_str, allow_none=True, transport=transport)

    print_log("Proxy for {} created.".format(config['uri']))

    return proxy


def _call_proxy(fn, *args):
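    """Call an XML-RPC method, retrying ProtocolErrors up to 60 times with a
    15-second pause between attempts; Faults are treated as fatal."""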
    retries = 60
    for n in range(1, retries + 1):
        try:
            return fn(*args)
        except xmlrpc.client.ProtocolError as err:
            if n == retries:
                traceback.print_exc()
                fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
            else:
                time.sleep(15)
        except xmlrpc.client.Fault as err:
            traceback.print_exc()
            fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))


class LAVAJob:
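    """Thin wrapper around a LAVA job: submission, cancellation, incremental
    log fetching and parsing of the hwci result line."""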
    COLOR_STATUS_MAP = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    def __init__(self, proxy, definition):
        self.job_id = None
        self.proxy = proxy
        self.definition = definition
        self.last_log_line = 0
        self.last_log_time = None
        self.is_finished = False
        self.status = "created"

    def heartbeat(self):
        self.last_log_time = datetime.now()
        self.status = "running"

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def submit(self):
        try:
            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
        except MesaCIException:
            return False
        return True

    def cancel(self):
        if self.job_id:
            self.proxy.scheduler.jobs.cancel(self.job_id)

    def is_started(self) -> bool:
        waiting_states = ["Submitted", "Scheduling", "Scheduled"]
        job_state: dict[str, str] = _call_proxy(
            self.proxy.scheduler.job_state, self.job_id
        )
        return job_state["job_state"] not in waiting_states

    def _load_log_from_data(self, data) -> list[str]:
        lines = []
        # When there is no new log data, the YAML is empty
        if loaded_lines := yaml.load(str(data), Loader=loader(False)):
            lines = loaded_lines
            self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        try:
            (finished, data) = _call_proxy(
                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
            )
            self.is_finished = finished
            return self._load_log_from_data(data)

        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(
        self, lava_lines: list[dict[str, str]]
    ) -> list[dict[str, str]]:
        """Use the console log to check whether the job completed successfully.
        Returns the list of log lines up to and including the result line."""

        last_line = None  # Print all lines. lines[:None] == lines[:]

        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: (pass|fail)", line):
                self.is_finished = True
                self.status = result.group(1)

                last_line = idx + 1
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]


def find_exception_from_metadata(metadata, job_id):
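    """Raise a retryable MesaCIException for known LAVA infrastructure, job or
    validation errors found in the result metadata; otherwise return the
    metadata unchanged."""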
    if "result" not in metadata or metadata["result"] != "fail":
        return
    if "error_type" in metadata:
        error_type = metadata["error_type"]
        if error_type == "Infrastructure":
            raise MesaCIException(
                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
            )
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate, or
            # when the job definition is malformed. As we always validate the
            # jobs, only the former is likely to happen, e.g. when some LAVA
            # action times out more often than the job definition allows.
            raise MesaCIException(
                f"LAVA job {job_id} failed with JobError "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
    if "case" in metadata and metadata["case"] == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )
    return metadata

def find_lava_error(job) -> None:
    # Look for infrastructure errors and retry if we see them.
    results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = yaml.load(results_yaml, Loader=loader(False))
    for res in results:
        metadata = res["metadata"]
        find_exception_from_metadata(metadata, job.job_id)

    # If we reach this far, it means that the job ended without a hwci script
    # result and no LAVA infrastructure problem was found.
    job.status = "fail"

def show_job_data(job):
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
    ):
        show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
        for field, value in show.items():
            print("{}\t: {}".format(field, value))


def fetch_logs(job, max_idle_time, log_follower) -> None:
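    """Fetch and print a new chunk of the job log, raising MesaCITimeoutError
    if the device has produced no output for longer than max_idle_time."""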
    # Poll to check for new logs, assuming that a prolonged period of
    # silence means that the device has died and we should try it again
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} has not responded for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )

    time.sleep(LOG_POLLING_TIME_SEC)

    # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        raise MesaCIParseException

    if log_follower.feed(new_log_lines):
        # If we received non-empty log data, we know the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse the job result once the log has reached its final sections.
    # Depending on how much payload the scheduler.jobs.logs RPC returns, the
    # log may already be in the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)

    for line in parsed_lines:
        print_log(line)


def follow_job_execution(job):
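    """Submit the job, wait for it to leave the queue, then stream its log
    until it finishes and check that it ended with a pass/fail result."""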
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err

    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    print_log(f"Job {job.job_id} started.")

    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
    with LogFollower(current_section=gl) as lf:
        # Start checking the job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, lf)

    show_job_data(job)

    # Mesa developers expect a simple pass/fail job result. If that is not
    # what we got, a LAVA infrastructure error probably happened.
    if job.status not in ["pass", "fail"]:
        find_lava_error(job)

def print_job_final_status(job):
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
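    """Run follow_job_execution, retrying on Mesa CI exceptions.

    The loop makes NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1 attempts in total
    (the initial run plus the retries) before giving up with MesaCIRetryError.
    """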
    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    for attempt_no in range(1, retry_count + 2):
        job = LAVAJob(proxy, job_definition)
        try:
            follow_job_execution(job)
            return job
        except MesaCIKnownIssueException as found_issue:
            print_log(found_issue)
            job.status = "canceled"
        except MesaCIException as mesa_exception:
            print_log(mesa_exception)
            job.cancel()
        except KeyboardInterrupt as e:
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            job.cancel()
            raise e
        finally:
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )
            print_job_final_status(job)

    raise MesaCIRetryError(
        f"{CONSOLE_LOG['BOLD']}"
        f"{CONSOLE_LOG['FG_RED']}"
        "Job failed after exceeding the maximum of "
        f"{retry_count} retries."
        f"{CONSOLE_LOG['RESET']}",
        retry_count=retry_count,
    )

def treat_mesa_job_name(args):
    # Spaces in the Mesa job name break the lava-test-case command, so keep
    # only the part before the first space.
    args.mesa_job_name = args.mesa_job_name.split(" ")[0]

def main(args):
    proxy = setup_lava_proxy()

    job_definition = generate_lava_yaml(args)

    if args.dump_yaml:
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))
    job = LAVAJob(proxy, job_definition)

    if errors := job.validate():
        fatal_err(f"Error in LAVA job definition: {errors}")
    print_log("LAVA job definition validated successfully")

    if args.validate_only:
        return

    finished_job = retriable_follow_job(proxy, job_definition)
    exit_code = 0 if finished_job.status == "pass" else 1
    sys.exit(exit_code)

def create_parser():
    parser = argparse.ArgumentParser("LAVA job submitter")

    parser.add_argument("--pipeline-info")
    parser.add_argument("--rootfs-url-prefix")
    parser.add_argument("--kernel-url-prefix")
    parser.add_argument("--build-url")
    parser.add_argument("--job-rootfs-overlay-url")
    parser.add_argument("--job-timeout", type=int)
    parser.add_argument("--first-stage-init")
    parser.add_argument("--ci-project-dir")
    parser.add_argument("--device-type")
    parser.add_argument("--dtb", nargs='?', default="")
    parser.add_argument("--kernel-image-name")
    parser.add_argument("--kernel-image-type", nargs='?', default="")
    parser.add_argument("--boot-method")
    parser.add_argument("--lava-tags", nargs='?', default="")
    parser.add_argument("--jwt-file", type=pathlib.Path)
    parser.add_argument("--validate-only", action='store_true')
    parser.add_argument("--dump-yaml", action='store_true')
    parser.add_argument("--visibility-group")
    parser.add_argument("--mesa-job-name")

    return parser

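# Illustrative invocation (argument values below are hypothetical; in CI the
# real arguments are assembled by lava-submit.sh):
#   ./lava_job_submitter.py \
#       --dump-yaml \
#       --pipeline-info "$CI_JOB_NAME: $CI_PIPELINE_URL" \
#       --device-type "$DEVICE_TYPE" \
#       --boot-method "$BOOT_METHOD" \
#       --kernel-url-prefix "$KERNEL_URL_PREFIX" \
#       --rootfs-url-prefix "$ROOTFS_URL_PREFIX" \
#       --build-url "$BUILD_URL" \
#       --job-rootfs-overlay-url "$JOB_ROOTFS_OVERLAY_URL" \
#       --first-stage-init .gitlab-ci/common/init-stage1.sh \
#       --ci-project-dir "$CI_PROJECT_DIR" \
#       --job-timeout "$JOB_TIMEOUT" \
#       --mesa-job-name "$CI_JOB_NAME"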

if __name__ == "__main__":
    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
    # more buffering
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)

    parser = create_parser()

    parser.set_defaults(func=main)
    args = parser.parse_args()
    treat_mesa_job_name(args)
    args.func(args)