#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2022 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect the log back."""


import argparse
import contextlib
import pathlib
import re
import sys
import time
import traceback
import urllib.parse
import xmlrpc.client
from datetime import datetime, timedelta
from os import getenv
from typing import Any, Optional

import lavacli
import yaml
from lava.exceptions import (
    MesaCIException,
    MesaCIKnownIssueException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import (
    CONSOLE_LOG,
    GitlabSection,
    LogFollower,
    LogSectionType,
    fatal_err,
    hide_sensitive_data,
    print_log,
)
from lavacli.utils import loader

# Timeout in seconds to decide whether the device running the dispatched LAVA
# job has hung, based on the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60))

# How many seconds the script should wait before trying a new polling iteration
# to check whether the dispatched LAVA job is running or still waiting in the
# job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10))

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2))

# How many attempts should be made when a timeout happens during LAVA device boot.
NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 3))
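
# All of the timeout/retry knobs above are read from the environment, so a
# pipeline can tune them per job without editing this script. A minimal sketch
# of how that could look in a GitLab CI job definition (the job name and values
# below are purely illustrative):
#
#   some-lava-test-job:
#     variables:
#       LAVA_DEVICE_HANGING_TIMEOUT_SEC: "600"
#       LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT: "2"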


def generate_lava_yaml(args):
    # General metadata and permissions, plus (inexplicably) kernel arguments
    values = {
        'job_name': 'mesa: {}'.format(args.pipeline_info),
        'device_type': args.device_type,
        'visibility': { 'group': [ args.visibility_group ] },
        'priority': 75,
        'context': {
            'extra_nfsroot_args': ' init=/init rootwait usbcore.quirks=0bda:8153:k'
        },
        "timeouts": {
            "job": {"minutes": args.job_timeout},
            "action": {"minutes": 3},
            "actions": {
                "depthcharge-action": {
                    "minutes": 3 * NUMBER_OF_ATTEMPTS_LAVA_BOOT,
                }
            }
        },
    }

    if args.lava_tags:
        values['tags'] = args.lava_tags.split(',')

    # URLs to our kernel and rootfs to boot from, both generated by the base
    # container build
    deploy = {
        'timeout': { 'minutes': 10 },
        'to': 'tftp',
        'os': 'oe',
        'kernel': {
            'url': '{}/{}'.format(args.kernel_url_prefix, args.kernel_image_name),
        },
        'nfsrootfs': {
            'url': '{}/lava-rootfs.tgz'.format(args.rootfs_url_prefix),
            'compression': 'gz',
        }
    }
    if args.kernel_image_type:
        deploy['kernel']['type'] = args.kernel_image_type
    if args.dtb:
        deploy['dtb'] = {
            'url': '{}/{}.dtb'.format(args.kernel_url_prefix, args.dtb)
        }

    # always boot over NFS
    boot = {
        "failure_retry": NUMBER_OF_ATTEMPTS_LAVA_BOOT,
        "method": args.boot_method,
        "commands": "nfs",
        "prompts": ["lava-shell:"],
    }

    # skeleton test definition: only declaring each job as a single 'test'
    # since LAVA's test parsing is not useful to us
    run_steps = []
    test = {
        'timeout': { 'minutes': args.job_timeout },
        'failure_retry': 1,
        'definitions': [ {
            'name': 'mesa',
            'from': 'inline',
            'lava-signal': 'kmsg',
            'path': 'inline/mesa.yaml',
            'repository': {
                'metadata': {
                    'name': 'mesa',
                    'description': 'Mesa test plan',
                    'os': [ 'oe' ],
                    'scope': [ 'functional' ],
                    'format': 'Lava-Test Test Definition 1.0',
                },
                'run': {
                    "steps": run_steps
                },
            },
        } ],
    }

    # job execution script:
    #   - inline .gitlab-ci/common/init-stage1.sh
    #   - fetch and unpack per-pipeline build artifacts from the build job
    #   - fetch and unpack the per-job environment from lava-submit.sh
    #   - exec .gitlab-ci/common/init-stage2.sh

    with open(args.first_stage_init, 'r') as init_sh:
        run_steps += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ]

    if args.jwt_file:
        with open(args.jwt_file) as jwt_file:
            run_steps += [
                "set +x",
                f'echo -n "{jwt_file.read()}" > "{args.jwt_file}" # HIDEME',
                "set -x",
                f'echo "export CI_JOB_JWT_FILE={args.jwt_file}" >> /set-job-env-vars.sh',
            ]
    else:
        run_steps += [
            "echo Could not find jwt file, disabling MINIO requests...",
            "sed -i '/MINIO_RESULTS_UPLOAD/d' /set-job-env-vars.sh",
        ]

    run_steps += [
        'mkdir -p {}'.format(args.ci_project_dir),
        'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.build_url, args.ci_project_dir),
        'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),

        # Sleep a bit to give bash time to dump its shell xtrace messages into
        # the console, which may otherwise interleave with LAVA_SIGNAL_STARTTC
        # on some devices such as the a618.
        'sleep 1',

        # Use the CI job name as the test-case name; it may help LAVA farm
        # maintainers with monitoring.
        f"lava-test-case 'mesa-ci_{args.mesa_job_name}' --shell /init-stage2.sh",
    ]

    values['actions'] = [
        { 'deploy': deploy },
        { 'boot': boot },
        { 'test': test },
    ]

    return yaml.dump(values, width=10000000)
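
# For reference, the job definition returned above has roughly this shape once
# dumped to YAML (heavily abridged sketch; the actual values depend entirely on
# the arguments passed to this script):
#
#   job_name: 'mesa: <pipeline info>'
#   device_type: <device type>
#   visibility: {group: [<visibility group>]}
#   priority: 75
#   timeouts: {job: {minutes: <job timeout>}, ...}
#   actions:
#     - deploy: {to: tftp, kernel: {url: ...}, nfsrootfs: {url: ...}, ...}
#     - boot: {method: <boot method>, commands: nfs, prompts: ['lava-shell:'], ...}
#     - test: {definitions: [{name: mesa, from: inline, ...}], ...}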


def setup_lava_proxy():
    config = lavacli.load_config("default")
    uri, usr, tok = (config.get(key) for key in ("uri", "username", "token"))
    uri_obj = urllib.parse.urlparse(uri)
    uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path)
    transport = lavacli.RequestsTransport(
        uri_obj.scheme,
        config.get("proxy"),
        config.get("timeout", 120.0),
        config.get("verify_ssl_cert", True),
    )
    proxy = xmlrpc.client.ServerProxy(
        uri_str, allow_none=True, transport=transport)

    print_log("Proxy for {} created.".format(config['uri']))

    return proxy


def _call_proxy(fn, *args):
    retries = 60
    for n in range(1, retries + 1):
        try:
            return fn(*args)
        except xmlrpc.client.ProtocolError as err:
            if n == retries:
                traceback.print_exc()
                fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
            else:
                time.sleep(15)
        except xmlrpc.client.Fault as err:
            traceback.print_exc()
            fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))


class LAVAJob:
    COLOR_STATUS_MAP = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    def __init__(self, proxy, definition):
        self.job_id = None
        self.proxy = proxy
        self.definition = definition
        self.last_log_line = 0
        self.last_log_time = None
        self.is_finished = False
        self.status = "created"

    def heartbeat(self):
        self.last_log_time = datetime.now()
        self.status = "running"

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def submit(self):
        try:
            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
        except MesaCIException:
            return False
        return True

    def cancel(self):
        if self.job_id:
            self.proxy.scheduler.jobs.cancel(self.job_id)

    def is_started(self) -> bool:
        waiting_states = ["Submitted", "Scheduling", "Scheduled"]
        job_state: dict[str, str] = _call_proxy(
            self.proxy.scheduler.job_state, self.job_id
        )
        return job_state["job_state"] not in waiting_states

    def _load_log_from_data(self, data) -> list[str]:
        lines = []
        # When there is no new log data, the YAML is empty
        if loaded_lines := yaml.load(str(data), Loader=loader(False)):
            lines = loaded_lines
            self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        try:
            (finished, data) = _call_proxy(
                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
            )
            self.is_finished = finished
            return self._load_log_from_data(data)

        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(
        self, lava_lines: list[dict[str, str]]
    ) -> list[dict[str, str]]:
        """Use the console log to check whether the job has completed
        successfully or not. Returns the list of log lines up to and including
        the result line."""

        last_line = None  # Print all lines. lines[:None] == lines[:]

        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: (pass|fail)", line):
                self.is_finished = True
                self.status = result.group(1)

                last_line = idx + 1
                # We have reached the end of the log here; the hwci script has finished.
                break
        return lava_lines[:last_line]
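
# A rough sketch of how LAVAJob is meant to be driven; this mirrors what main()
# and follow_job_execution()/fetch_logs() below actually do, with error
# handling and log following omitted:
#
#   job = LAVAJob(proxy, job_definition)
#   if errors := job.validate():
#       fatal_err(f"Error in LAVA job definition: {errors}")
#   job.submit()
#   while not job.is_finished:
#       lines = job.get_logs()
#       job.parse_job_result_from_log(lines)  # sets status on "hwci: mesa: pass|fail"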


def find_exception_from_metadata(metadata, job_id):
    if "result" not in metadata or metadata["result"] != "fail":
        return
    if "error_type" in metadata:
        error_type = metadata["error_type"]
        if error_type == "Infrastructure":
            raise MesaCIException(
                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
            )
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # when the job definition is malformed. As we always validate the
            # jobs, only the former is likely to happen, e.g. when some LAVA
            # action timed out more times than the job definition expected.
            raise MesaCIException(
                f"LAVA job {job_id} failed with JobError "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
    if "case" in metadata and metadata["case"] == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )
    return metadata


def find_lava_error(job) -> None:
    # Look for infrastructure errors and retry if we see them.
    results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = yaml.load(results_yaml, Loader=loader(False))
    for res in results:
        metadata = res["metadata"]
        find_exception_from_metadata(metadata, job.job_id)

    # If we get this far, the job ended without an hwci script result and no
    # LAVA infrastructure problem was found.
    job.status = "fail"
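
# For reference, find_lava_error() above walks the LAVA results YAML and hands
# each entry's "metadata" mapping to find_exception_from_metadata(). An
# illustrative (made-up) entry showing only the keys inspected here:
#
#   - metadata:
#       result: fail
#       error_type: Infrastructure   # or "Job"
#       case: validate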


def show_job_data(job):
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
    ):
        show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
        for field, value in show.items():
            print("{}\t: {}".format(field, value))


def fetch_logs(job, max_idle_time, log_follower) -> None:
    # Poll to check for new logs, assuming that a prolonged period of silence
    # means that the device has died and we should retry it.
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )

    time.sleep(LOG_POLLING_TIME_SEC)

    # The XML-RPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        raise MesaCIParseException

    if log_follower.feed(new_log_lines):
        # If we got non-empty log data, we can assume that the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the scheduler.jobs.logs RPC call returns,
    # the log may already have reached the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)

    for line in parsed_lines:
        print_log(line)


def follow_job_execution(job):
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err

    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    print_log(f"Job {job.job_id} started.")

    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
    with LogFollower(current_section=gl) as lf:
        # Start checking the job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, lf)

    show_job_data(job)

    # Mesa developers expect a simple pass/fail job result. Anything else
    # probably means that a LAVA infrastructure error happened.
    if job.status not in ["pass", "fail"]:
        find_lava_error(job)


def print_job_final_status(job):
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    # retry_count retries means retry_count + 1 attempts in total, e.g. with
    # the default of 2 retries the loop runs attempts #1, #2 and #3.
    for attempt_no in range(1, retry_count + 2):
        job = LAVAJob(proxy, job_definition)
        try:
            follow_job_execution(job)
            return job
        except MesaCIKnownIssueException as found_issue:
            print_log(found_issue)
            job.status = "canceled"
        except MesaCIException as mesa_exception:
            print_log(mesa_exception)
            job.cancel()
        except KeyboardInterrupt as e:
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            job.cancel()
            raise e
        finally:
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )
            print_job_final_status(job)

    raise MesaCIRetryError(
        f"{CONSOLE_LOG['BOLD']}"
        f"{CONSOLE_LOG['FG_RED']}"
        "Job failed after exceeding the maximum of "
        f"{retry_count} retries."
        f"{CONSOLE_LOG['RESET']}",
        retry_count=retry_count,
    )


def treat_mesa_job_name(args):
    # Keep only the part of the mesa job name before the first space, since
    # spaces break the lava-test-case command.
    args.mesa_job_name = args.mesa_job_name.split(" ")[0]


def main(args):
    proxy = setup_lava_proxy()

    job_definition = generate_lava_yaml(args)

    if args.dump_yaml:
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))
    job = LAVAJob(proxy, job_definition)

    if errors := job.validate():
        fatal_err(f"Error in LAVA job definition: {errors}")
    print_log("LAVA job definition validated successfully")

    if args.validate_only:
        return

    finished_job = retriable_follow_job(proxy, job_definition)
    exit_code = 0 if finished_job.status == "pass" else 1
    sys.exit(exit_code)


def create_parser():
    parser = argparse.ArgumentParser("LAVA job submitter")

    parser.add_argument("--pipeline-info")
    parser.add_argument("--rootfs-url-prefix")
    parser.add_argument("--kernel-url-prefix")
    parser.add_argument("--build-url")
    parser.add_argument("--job-rootfs-overlay-url")
    parser.add_argument("--job-timeout", type=int)
    parser.add_argument("--first-stage-init")
    parser.add_argument("--ci-project-dir")
    parser.add_argument("--device-type")
    parser.add_argument("--dtb", nargs='?', default="")
    parser.add_argument("--kernel-image-name")
    parser.add_argument("--kernel-image-type", nargs='?', default="")
    parser.add_argument("--boot-method")
    parser.add_argument("--lava-tags", nargs='?', default="")
    parser.add_argument("--jwt-file", type=pathlib.Path)
    parser.add_argument("--validate-only", action='store_true')
    parser.add_argument("--dump-yaml", action='store_true')
    parser.add_argument("--visibility-group")
    parser.add_argument("--mesa-job-name")

    return parser


if __name__ == "__main__":
    # Given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, it is safe to say we do not need
    # any more buffering.
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)

    parser = create_parser()

    parser.set_defaults(func=main)
    args = parser.parse_args()
    treat_mesa_job_name(args)
    args.func(args)
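
# Example invocation (sketch only; every value below is illustrative and would
# normally be filled in by the lava-submit step of the GitLab CI pipeline):
#
#   ./lava_job_submitter.py \
#       --pipeline-info "my-project/my-pipeline 1234" \
#       --device-type sun50i-h6-pine-h64 \
#       --boot-method u-boot \
#       --kernel-image-name Image \
#       --kernel-url-prefix https://example.com/kernel \
#       --rootfs-url-prefix https://example.com/rootfs \
#       --build-url https://example.com/mesa-build.tar.gz \
#       --job-rootfs-overlay-url https://example.com/job-overlay.tar.gz \
#       --job-timeout 30 \
#       --first-stage-init .gitlab-ci/common/init-stage1.sh \
#       --ci-project-dir /builds/mesa/mesa \
#       --visibility-group "public" \
#       --mesa-job-name my-test-job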