1#!/usr/bin/env python3 2# 3# Copyright (C) 2020 - 2023 Collabora Limited 4# Authors: 5# Gustavo Padovan <gustavo.padovan@collabora.com> 6# Guilherme Gallo <guilherme.gallo@collabora.com> 7# 8# SPDX-License-Identifier: MIT 9 10"""Send a job to LAVA, track it and collect log back""" 11 12import contextlib 13import json 14import pathlib 15import sys 16import time 17from collections import defaultdict 18from dataclasses import dataclass, field, fields 19from datetime import datetime, timedelta, UTC 20from os import environ, getenv 21from typing import Any, Optional, Self 22 23import fire 24from lavacli.utils import flow_yaml as lava_yaml 25 26from lava.exceptions import ( 27 MesaCIException, 28 MesaCIFatalException, 29 MesaCIRetriableException, 30 MesaCIParseException, 31 MesaCIRetryError, 32 MesaCITimeoutError, 33) 34from lava.utils import ( 35 CONSOLE_LOG, 36 GitlabSection, 37 LAVAJob, 38 LAVAJobDefinition, 39 LogFollower, 40 LogSectionType, 41 call_proxy, 42 fatal_err, 43 hide_sensitive_data, 44 print_log, 45 setup_lava_proxy, 46) 47from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS 48 49# Initialize structural logging with a defaultdict, it can be changed for more 50# sophisticated dict-like data abstractions. 51STRUCTURAL_LOG = defaultdict(list) 52 53try: 54 from structured_logger import StructuredLogger 55except ImportError as e: 56 print_log( 57 f"Could not import StructuredLogger library: {e}. " 58 "Falling back to defaultdict based structured logger." 59 ) 60 61# Timeout in seconds to decide if the device from the dispatched LAVA job has 62# hung or not due to the lack of new log output. 63DEVICE_HANGING_TIMEOUT_SEC = int(getenv("DEVICE_HANGING_TIMEOUT_SEC", 5 * 60)) 64 65# How many seconds the script should wait before try a new polling iteration to 66# check if the dispatched LAVA job is running or waiting in the job queue. 67WAIT_FOR_DEVICE_POLLING_TIME_SEC = int( 68 getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1) 69) 70 71# How many seconds the script will wait to let LAVA finalize the job and give 72# the final details. 73WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5)) 74WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int( 75 getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6) 76) 77 78# How many seconds to wait between log output LAVA RPC calls. 79LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5)) 80 81# How many retries should be made when a timeout happen. 82NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int( 83 getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2) 84) 85 86CI_JOB_TIMEOUT_SEC = int(getenv("CI_JOB_TIMEOUT", 3600)) 87# How many seconds the script will wait to let LAVA run the job and give the final details. 88EXPECTED_JOB_DURATION_SEC = int(getenv("EXPECTED_JOB_DURATION_SEC", 60 * 10)) 89# CI_JOB_STARTED is given by GitLab CI/CD in UTC timezone by default. 90CI_JOB_STARTED_AT_RAW = getenv("CI_JOB_STARTED_AT", "") 91CI_JOB_STARTED_AT: datetime = ( 92 datetime.fromisoformat(CI_JOB_STARTED_AT_RAW) 93 if CI_JOB_STARTED_AT_RAW 94 else datetime.now(tz=UTC) 95) 96 97 98def raise_exception_from_metadata(metadata: dict, job_id: int) -> None: 99 """ 100 Investigate infrastructure errors from the job metadata. 101 If it finds an error, raise it as MesaCIRetriableException. 102 """ 103 if "result" not in metadata or metadata["result"] != "fail": 104 return 105 if "error_type" in metadata: 106 error_type: str = metadata["error_type"] 107 error_msg: str = metadata.get("error_msg", "") 108 full_err_msg: str = error_type if not error_msg else f"{error_type}: {error_msg}" 109 if error_type == "Job": 110 # This happens when LAVA assumes that the job cannot terminate or 111 # with mal-formed job definitions. As we are always validating the 112 # jobs, only the former is probable to happen. E.g.: When some LAVA 113 # action timed out more times than expected in job definition. 114 raise MesaCIRetriableException( 115 f"LAVA job {job_id} failed with {full_err_msg}. Retry." 116 "(possible LAVA timeout misconfiguration/bug). Retry." 117 ) 118 if error_type: 119 raise MesaCIRetriableException( 120 f"LAVA job {job_id} failed with error type: {full_err_msg}. Retry." 121 ) 122 if "case" in metadata and metadata["case"] == "validate": 123 raise MesaCIRetriableException( 124 f"LAVA job {job_id} failed validation (possible download error). Retry." 125 ) 126 127 128def raise_lava_error(job) -> None: 129 # Look for infrastructure errors, raise them, and retry if we see them. 130 results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id) 131 results = lava_yaml.load(results_yaml) 132 for res in results: 133 metadata = res["metadata"] 134 raise_exception_from_metadata(metadata, job.job_id) 135 136 # If we reach this far, it means that the job ended without hwci script 137 # result and no LAVA infrastructure problem was found 138 job.status = "fail" 139 140 141 142def fetch_logs(job, max_idle_time, log_follower) -> None: 143 is_job_hanging(job, max_idle_time) 144 145 time.sleep(LOG_POLLING_TIME_SEC) 146 new_log_lines = fetch_new_log_lines(job) 147 parsed_lines = parse_log_lines(job, log_follower, new_log_lines) 148 149 for line in parsed_lines: 150 print_log(line) 151 152 153def is_job_hanging(job, max_idle_time): 154 # Poll to check for new logs, assuming that a prolonged period of 155 # silence means that the device has died and we should try it again 156 if datetime.now(tz=UTC) - job.last_log_time > max_idle_time: 157 max_idle_time_min = max_idle_time.total_seconds() / 60 158 159 raise MesaCITimeoutError( 160 f"{CONSOLE_LOG['FG_BOLD_YELLOW']}" 161 f"LAVA job {job.job_id} unresponsive for {max_idle_time_min} " 162 "minutes; retrying the job." 163 f"{CONSOLE_LOG['RESET']}", 164 timeout_duration=max_idle_time, 165 ) 166 167 168def parse_log_lines(job, log_follower, new_log_lines): 169 if log_follower.feed(new_log_lines): 170 # If we had non-empty log data, we can assure that the device is alive. 171 job.heartbeat() 172 parsed_lines = log_follower.flush() 173 174 # Only parse job results when the script reaches the end of the logs. 175 # Depending on how much payload the RPC scheduler.jobs.logs get, it may 176 # reach the LAVA_POST_PROCESSING phase. 177 if log_follower.current_section.type in ( 178 LogSectionType.TEST_CASE, 179 LogSectionType.LAVA_POST_PROCESSING, 180 ): 181 parsed_lines = job.parse_job_result_from_log(parsed_lines) 182 return parsed_lines 183 184 185def fetch_new_log_lines(job): 186 # The XMLRPC binary packet may be corrupted, causing a YAML scanner error. 187 # Retry the log fetching several times before exposing the error. 188 for _ in range(5): 189 with contextlib.suppress(MesaCIParseException): 190 new_log_lines = job.get_logs() 191 break 192 else: 193 raise MesaCIParseException 194 return new_log_lines 195 196 197def submit_job(job): 198 try: 199 job.submit() 200 except Exception as mesa_ci_err: 201 raise MesaCIRetriableException( 202 f"Could not submit LAVA job. Reason: {mesa_ci_err}" 203 ) from mesa_ci_err 204 205 206def wait_for_job_get_started(job, attempt_no): 207 print_log(f"Waiting for job {job.job_id} to start.") 208 while not job.is_started(): 209 current_job_duration_sec: int = int( 210 (datetime.now(tz=UTC) - CI_JOB_STARTED_AT).total_seconds() 211 ) 212 remaining_time_sec: int = max(0, CI_JOB_TIMEOUT_SEC - current_job_duration_sec) 213 if remaining_time_sec < EXPECTED_JOB_DURATION_SEC: 214 job.cancel() 215 raise MesaCIFatalException( 216 f"{CONSOLE_LOG['FG_BOLD_YELLOW']}" 217 f"Job {job.job_id} only has {remaining_time_sec} seconds " 218 "remaining to run, but it is expected to take at least " 219 f"{EXPECTED_JOB_DURATION_SEC} seconds." 220 f"{CONSOLE_LOG['RESET']}", 221 ) 222 time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC) 223 job.refresh_log() 224 print_log(f"Job {job.job_id} started.") 225 226 227def bootstrap_log_follower(main_test_case, timestamp_relative_to) -> LogFollower: 228 start_section = GitlabSection( 229 id="dut_boot", 230 header="Booting hardware device", 231 type=LogSectionType.LAVA_BOOT, 232 start_collapsed=True, 233 suppress_end=True, # init-stage2 prints the end for us 234 timestamp_relative_to=timestamp_relative_to, 235 ) 236 print(start_section.start()) 237 return LogFollower( 238 starting_section=start_section, 239 main_test_case=main_test_case, 240 timestamp_relative_to=timestamp_relative_to 241 ) 242 243 244def follow_job_execution(job, log_follower): 245 with log_follower: 246 max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC) 247 # Start to check job's health 248 job.heartbeat() 249 while not job.is_finished: 250 fetch_logs(job, max_idle_time, log_follower) 251 structural_log_phases(job, log_follower) 252 253 # Mesa Developers expect to have a simple pass/fail job result. 254 # If this does not happen, it probably means a LAVA infrastructure error 255 # happened. 256 if job.status not in ["pass", "fail"]: 257 raise_lava_error(job) 258 259 # LogFollower does some cleanup after the early exit (trigger by 260 # `hwci: pass|fail` regex), let's update the phases after the cleanup. 261 structural_log_phases(job, log_follower) 262 263 264def structural_log_phases(job, log_follower): 265 phases: dict[str, Any] = { 266 s.header.split(" - ")[0]: { 267 k: str(getattr(s, k)) for k in ("start_time", "end_time") 268 } 269 for s in log_follower.section_history 270 } 271 job.log["dut_job_phases"] = phases 272 273 274def print_job_final_status(job, timestamp_relative_to): 275 job.refresh_log() 276 if job.status == "running": 277 job.status = "hung" 278 279 colour = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"]) 280 with GitlabSection( 281 "job_data", 282 f"Hardware job info for {job.status} job", 283 type=LogSectionType.LAVA_POST_PROCESSING, 284 start_collapsed=True, 285 colour=colour, 286 timestamp_relative_to=timestamp_relative_to, 287 ): 288 wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES 289 while not job.is_post_processed() and wait_post_processing_retries > 0: 290 # Wait a little until LAVA finishes processing metadata 291 time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC) 292 wait_post_processing_retries -= 1 293 294 if not job.is_post_processed(): 295 waited_for_sec: int = ( 296 WAIT_FOR_LAVA_POST_PROCESSING_RETRIES 297 * WAIT_FOR_LAVA_POST_PROCESSING_SEC 298 ) 299 print_log( 300 "Timed out waiting for LAVA post-processing after " 301 f"{waited_for_sec} seconds. Printing incomplete information " 302 "anyway." 303 ) 304 305 details: dict[str, str] = job.show() 306 for field, value in details.items(): 307 print(f"{field:<15}: {value}") 308 job.refresh_log() 309 310 311def execute_job_with_retries( 312 proxy, job_definition, retry_count, jobs_log, main_test_case, 313 timestamp_relative_to 314) -> Optional[LAVAJob]: 315 last_failed_job = None 316 for attempt_no in range(1, retry_count + 2): 317 # Need to get the logger value from its object to enable autosave 318 # features, if AutoSaveDict is enabled from StructuredLogging module 319 jobs_log.append({}) 320 job_log = jobs_log[-1] 321 job = LAVAJob(proxy, job_definition, job_log) 322 STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no 323 try: 324 job_log["submitter_start_time"] = datetime.now(tz=UTC).isoformat() 325 submit_job(job) 326 queue_section = GitlabSection( 327 id="dut_queue", 328 header="Waiting for hardware device to become available", 329 type=LogSectionType.LAVA_QUEUE, 330 start_collapsed=False, 331 timestamp_relative_to=timestamp_relative_to 332 ) 333 with queue_section as section: 334 wait_for_job_get_started(job, attempt_no) 335 log_follower: LogFollower = bootstrap_log_follower( 336 main_test_case, timestamp_relative_to 337 ) 338 follow_job_execution(job, log_follower) 339 return job 340 341 except (MesaCIException, KeyboardInterrupt) as exception: 342 job.handle_exception(exception) 343 344 finally: 345 print_job_final_status(job, timestamp_relative_to) 346 # If LAVA takes too long to post process the job, the submitter 347 # gives up and proceeds. 348 job_log["submitter_end_time"] = datetime.now(tz=UTC).isoformat() 349 last_failed_job = job 350 print_log( 351 f"{CONSOLE_LOG['BOLD']}" 352 f"Finished executing LAVA job in the attempt #{attempt_no}" 353 f"{CONSOLE_LOG['RESET']}" 354 ) 355 if job.exception and not isinstance(job.exception, MesaCIRetriableException): 356 break 357 358 return last_failed_job 359 360 361def retriable_follow_job( 362 proxy, job_definition, main_test_case, timestamp_relative_to 363) -> LAVAJob: 364 number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION 365 366 last_attempted_job = execute_job_with_retries( 367 proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"], 368 main_test_case, timestamp_relative_to 369 ) 370 371 if last_attempted_job.exception is not None: 372 # Infra failed in all attempts 373 raise MesaCIRetryError( 374 f"{CONSOLE_LOG['BOLD']}" 375 f"{CONSOLE_LOG['FG_RED']}" 376 "Job failed after it exceeded the number of " 377 f"{number_of_retries} retries." 378 f"{CONSOLE_LOG['RESET']}", 379 retry_count=number_of_retries, 380 last_job=last_attempted_job, 381 ) 382 383 return last_attempted_job 384 385 386@dataclass 387class PathResolver: 388 def __post_init__(self): 389 for field in fields(self): 390 value = getattr(self, field.name) 391 if not value: 392 continue 393 if field.type == pathlib.Path: 394 value = pathlib.Path(value) 395 setattr(self, field.name, value.resolve()) 396 397 398@dataclass 399class LAVAJobSubmitter(PathResolver): 400 boot_method: str 401 device_type: str 402 farm: str 403 job_timeout_min: int # The job timeout in minutes 404 dtb_filename: str = None 405 dump_yaml: bool = False # Whether to dump the YAML payload to stdout 406 first_stage_init: str = None 407 jwt_file: pathlib.Path = None 408 kernel_image_name: str = None 409 kernel_image_type: str = "" 410 kernel_url_prefix: str = None 411 kernel_external: str = None 412 lava_tags: str | tuple[str, ...] = () # Comma-separated LAVA tags for the job 413 mesa_job_name: str = "mesa_ci_job" 414 pipeline_info: str = "" 415 rootfs_url: str = None 416 validate_only: bool = False # Whether to only validate the job, not execute it 417 visibility_group: str = None # Only affects LAVA farm maintainers 418 structured_log_file: pathlib.Path = None # Log file path with structured LAVA log 419 ssh_client_image: str = None # x86_64 SSH client image to follow the job's output 420 project_name: str = None # Project name to be used in the job name 421 starting_section: str = None # GitLab section used to start 422 job_submitted_at: [str | datetime] = None 423 __structured_log_context = contextlib.nullcontext() # Structured Logger context 424 _overlays: dict = field(default_factory=dict, init=False) 425 426 def __post_init__(self) -> Self: 427 super().__post_init__() 428 # Remove mesa job names with spaces, which breaks the lava-test-case command 429 self.mesa_job_name = self.mesa_job_name.split(" ")[0] 430 431 if self.structured_log_file: 432 self.__structured_log_context = StructuredLoggerWrapper(self).logger_context() 433 434 if self.job_submitted_at: 435 self.job_submitted_at = datetime.fromisoformat(self.job_submitted_at) 436 self.proxy = setup_lava_proxy() 437 438 return self 439 440 def append_overlay( 441 self, compression: str, name: str, path: str, url: str, format: str = "tar" 442 ) -> Self: 443 """ 444 Append an overlay to the LAVA job definition. 445 446 Args: 447 compression (str): The compression type of the overlay (e.g., "gz", "xz"). 448 name (str): The name of the overlay. 449 path (str): The path where the overlay should be applied. 450 url (str): The URL from where the overlay can be downloaded. 451 format (str, optional): The format of the overlay (default is "tar"). 452 453 Returns: 454 Self: The instance of LAVAJobSubmitter with the overlay appended. 455 """ 456 self._overlays[name] = { 457 "compression": compression, 458 "format": format, 459 "path": path, 460 "url": url, 461 } 462 return self 463 464 def print(self) -> Self: 465 """ 466 Prints the dictionary representation of the instance and returns the instance itself. 467 468 Returns: 469 Self: The instance of the class. 470 """ 471 print(self.__dict__) 472 return self 473 474 def __prepare_submission(self) -> str: 475 # Overwrite the timeout for the testcases with the value offered by the 476 # user. The testcase running time should be at least 4 times greater than 477 # the other sections (boot and setup), so we can safely ignore them. 478 # If LAVA fails to stop the job at this stage, it will fall back to the 479 # script section timeout with a reasonable delay. 480 GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta( 481 minutes=self.job_timeout_min 482 ) 483 484 job_definition = LAVAJobDefinition(self).generate_lava_job_definition() 485 486 if self.dump_yaml: 487 self.dump_job_definition(job_definition) 488 489 validation_job = LAVAJob(self.proxy, job_definition) 490 if errors := validation_job.validate(): 491 fatal_err(f"Error in LAVA job definition: {errors}") 492 493 return job_definition 494 495 @classmethod 496 def is_under_ci(cls): 497 ci_envvar: str = getenv("CI", "false") 498 return ci_envvar.lower() == "true" 499 500 def dump_job_definition(self, job_definition) -> None: 501 with GitlabSection( 502 "yaml_dump", 503 "LAVA job definition (YAML)", 504 type=LogSectionType.LAVA_BOOT, 505 start_collapsed=True, 506 ): 507 print(hide_sensitive_data(job_definition)) 508 509 def submit(self) -> None: 510 """ 511 Prepares and submits the LAVA job. 512 If `validate_only` is True, it validates the job without submitting it. 513 If the job finishes with a non-pass status or encounters an exception, 514 the program exits with a non-zero return code. 515 """ 516 job_definition: str = self.__prepare_submission() 517 518 if self.validate_only: 519 return 520 521 if self.starting_section: 522 gl = GitlabSection( 523 id=self.starting_section, 524 header="Preparing to submit job for scheduling", 525 type=LogSectionType.LAVA_SUBMIT, 526 start_collapsed=True, 527 timestamp_relative_to=self.job_submitted_at, 528 ) 529 gl.start() 530 print(gl.end()) 531 532 with self.__structured_log_context: 533 last_attempt_job = None 534 try: 535 last_attempt_job = retriable_follow_job( 536 self.proxy, job_definition, 537 f'{self.project_name}_{self.mesa_job_name}', 538 self.job_submitted_at) 539 540 except MesaCIRetryError as retry_exception: 541 last_attempt_job = retry_exception.last_job 542 543 except Exception as exception: 544 STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception) 545 raise exception 546 547 finally: 548 self.finish_script(last_attempt_job) 549 550 def finish_script(self, last_attempt_job): 551 if not last_attempt_job: 552 # No job was run, something bad happened 553 STRUCTURAL_LOG["job_combined_status"] = "script_crash" 554 current_exception = str(sys.exc_info()[1]) 555 STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception 556 print(f"Interrupting the script. Reason: {current_exception}") 557 raise SystemExit(1) 558 559 STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status 560 STRUCTURAL_LOG["job_exit_code"] = last_attempt_job.exit_code 561 562 if last_attempt_job.status != "pass": 563 raise SystemExit(last_attempt_job.exit_code) 564 565 566class StructuredLoggerWrapper: 567 def __init__(self, submitter: LAVAJobSubmitter) -> None: 568 self.__submitter: LAVAJobSubmitter = submitter 569 570 def _init_logger(self): 571 STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags 572 STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type 573 STRUCTURAL_LOG["farm"] = self.__submitter.farm 574 STRUCTURAL_LOG["job_combined_fail_reason"] = None 575 STRUCTURAL_LOG["job_combined_status"] = "not_submitted" 576 STRUCTURAL_LOG["job_exit_code"] = None 577 STRUCTURAL_LOG["dut_attempt_counter"] = 0 578 579 # Initialize dut_jobs list to enable appends 580 STRUCTURAL_LOG["dut_jobs"] = [] 581 582 @contextlib.contextmanager 583 def _simple_logger_context(self): 584 log_file = pathlib.Path(self.__submitter.structured_log_file) 585 log_file.parent.mkdir(parents=True, exist_ok=True) 586 try: 587 # Truncate the file 588 log_file.write_text("") 589 yield 590 finally: 591 log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2)) 592 593 def logger_context(self): 594 context = contextlib.nullcontext() 595 try: 596 global STRUCTURAL_LOG 597 STRUCTURAL_LOG = StructuredLogger( 598 self.__submitter.structured_log_file, truncate=True 599 ).data 600 except NameError: 601 context = self._simple_logger_context() 602 603 self._init_logger() 604 return context 605 606 607if __name__ == "__main__": 608 # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us -> 609 # GitLab runner -> GitLab primary -> user, safe to say we don't need any 610 # more buffering 611 sys.stdout.reconfigure(line_buffering=True) 612 sys.stderr.reconfigure(line_buffering=True) 613 614 fire.Fire(LAVAJobSubmitter) 615