#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect the log back"""

import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from os import environ, getenv, path
from typing import Any, Optional

import fire
from lavacli.utils import flow_yaml as lava_yaml

from lava.exceptions import (
    MesaCIException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import (
    CONSOLE_LOG,
    GitlabSection,
    LAVAJob,
    LAVAJobDefinition,
    LogFollower,
    LogSectionType,
    call_proxy,
    fatal_err,
    hide_sensitive_data,
    print_log,
    setup_lava_proxy,
)
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS

# Initialize structured logging with a defaultdict; it can be swapped for a
# more sophisticated dict-like data abstraction.
STRUCTURAL_LOG = defaultdict(list)

try:
    from ci.structured_logger import StructuredLogger
except ImportError as e:
    print_log(
        f"Could not import StructuredLogger library: {e}. "
        "Falling back to defaultdict-based structured logger."
    )

# Timeout in seconds to decide whether the device from the dispatched LAVA job
# has hung, based on the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("DEVICE_HANGING_TIMEOUT_SEC", 5 * 60))

# How many seconds the script should wait before trying a new polling
# iteration to check whether the dispatched LAVA job is running or still
# waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6)
)

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)


def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors from the job metadata.
    If an error is found, raise it as a MesaCIException.
    """
    if "result" not in metadata or metadata["result"] != "fail":
        return
    if "error_type" in metadata:
        error_type = metadata["error_type"]
        if error_type == "Infrastructure":
            raise MesaCIException(
                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
            )
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # the job definition is malformed. As we always validate the jobs,
            # only the former is likely to happen, e.g. when some LAVA action
            # times out more often than the job definition allows.
            raise MesaCIException(
                f"LAVA job {job_id} failed with JobError "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
    if "case" in metadata and metadata["case"] == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )
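
# Illustrative sketch (hypothetical values): a result entry whose metadata is
# shaped like the one below would make raise_exception_from_metadata() raise
# a MesaCIException, which the retry machinery further down catches.
#
#   metadata = {"result": "fail", "error_type": "Infrastructure"}
#   raise_exception_from_metadata(metadata, job_id=1234)
#   # -> MesaCIException: LAVA job 1234 failed with Infrastructure Error. Retry.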


def raise_lava_error(job) -> None:
    # Look for infrastructure errors, raise them, and retry if we see them.
    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = lava_yaml.load(results_yaml)
    for res in results:
        metadata = res["metadata"]
        raise_exception_from_metadata(metadata, job.job_id)

    # If we get this far, the job ended without a hwci script result and no
    # LAVA infrastructure problem was found.
    job.status = "fail"


def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
    ):
        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and wait_post_processing_retries > 0:
            # Wait a little until LAVA finishes processing metadata
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            wait_post_processing_retries -= 1

        if not job.is_post_processed():
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            print_log(
                f"Waited for {waited_for_sec} seconds "
                "for LAVA to post-process the job, but it hasn't finished yet. "
                "Dumping its info anyway."
            )

        details: dict[str, str] = job.show()
        for field, value in details.items():
            print(f"{field:<15}: {value}")
        job.refresh_log()


def fetch_logs(job, max_idle_time, log_follower) -> None:
    is_job_hanging(job, max_idle_time)

    time.sleep(LOG_POLLING_TIME_SEC)
    new_log_lines = fetch_new_log_lines(job)
    parsed_lines = parse_log_lines(job, log_follower, new_log_lines)

    for line in parsed_lines:
        print_log(line)


def is_job_hanging(job, max_idle_time):
    # Poll to check for new logs, assuming that a prolonged period of silence
    # means that the device has died and we should retry it.
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} has not responded for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )


def parse_log_lines(job, log_follower, new_log_lines):
    if log_follower.feed(new_log_lines):
        # If we had non-empty log data, we can be sure the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the scheduler.jobs.logs RPC returns, it
    # may already have reached the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)
    return parsed_lines
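
# The next function uses Python's for/else retry idiom: a successful `break`
# skips the `else` branch, while exhausting every attempt falls through to
# `else` and raises. A minimal sketch of the same pattern (hypothetical
# `may_fail` helper):
#
#   for _ in range(5):
#       with contextlib.suppress(ValueError):
#           value = may_fail()
#           break
#   else:
#       raise ValueError("all attempts failed")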


def fetch_new_log_lines(job):
    # The XML-RPC binary packet may be corrupted, causing a YAML scanner
    # error. Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        raise MesaCIParseException
    return new_log_lines


def submit_job(job):
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err


def wait_for_job_get_started(job):
    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")


def bootstrap_log_follower() -> LogFollower:
    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    return LogFollower(starting_section=gl)


def follow_job_execution(job, log_follower):
    with log_follower:
        max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start to check the job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa developers expect a simple pass/fail job result. If this is not
    # available, it probably means a LAVA infrastructure error happened.
    if job.status not in ["pass", "fail"]:
        raise_lava_error(job)

    # LogFollower does some cleanup after the early exit (triggered by the
    # `hwci: pass|fail` regex), so update the phases after the cleanup.
    structural_log_phases(job, log_follower)


def structural_log_phases(job, log_follower):
    phases: dict[str, Any] = {
        s.header.split(" - ")[0]: {
            k: str(getattr(s, k)) for k in ("start_time", "end_time")
        }
        for s in log_follower.section_history
    }
    job.log["dut_job_phases"] = phases


def print_job_final_status(job):
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )

    job.refresh_log()
    show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")
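
# Attempt accounting for the retry loop below: `range(1, retry_count + 2)`
# yields retry_count + 1 total attempts, i.e. the initial run plus
# retry_count retries. With the default NUMBER_OF_RETRIES_TIMEOUT_DETECTION
# of 2:
#
#   >>> list(range(1, 2 + 2))
#   [1, 2, 3]  # attempt numbers: first try + two retries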


def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
    last_failed_job = None
    for attempt_no in range(1, retry_count + 2):
        # Append an empty dict and fetch it back from the list, so that
        # writes to job_log go through the logger object; this enables the
        # autosave features when AutoSaveDict from the StructuredLogging
        # module is in use.
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
        try:
            job_log["submitter_start_time"] = datetime.now().isoformat()
            submit_job(job)
            wait_for_job_get_started(job)
            log_follower: LogFollower = bootstrap_log_follower()
            follow_job_execution(job, log_follower)
            return job

        except (MesaCIException, KeyboardInterrupt) as exception:
            job.handle_exception(exception)

        finally:
            print_job_final_status(job)
            # If LAVA takes too long to post-process the job, the submitter
            # gives up and proceeds.
            job_log["submitter_end_time"] = datetime.now().isoformat()
            last_failed_job = job
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )

    return last_failed_job


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    last_attempted_job = execute_job_with_retries(
        proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
    )

    if last_attempted_job.exception is not None:
        # The infrastructure failed in all attempts
        raise MesaCIRetryError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_RED']}"
            "Job failed after exceeding the maximum of "
            f"{number_of_retries} retries."
            f"{CONSOLE_LOG['RESET']}",
            retry_count=number_of_retries,
            last_job=last_attempted_job,
        )

    return last_attempted_job


@dataclass
class PathResolver:
    def __post_init__(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if not value:
                continue
            if field.type == pathlib.Path:
                value = pathlib.Path(value)
                setattr(self, field.name, value.resolve())
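
# Illustrative sketch (hypothetical subclass): PathResolver converts every
# truthy dataclass field annotated as pathlib.Path into an absolute, resolved
# Path, so string values coming from the CLI become Path objects.
#
#   @dataclass
#   class Example(PathResolver):
#       log: pathlib.Path = None
#
#   Example(log="out/log.json").log  # -> /current/working/dir/out/log.json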


@dataclass
class LAVAJobSubmitter(PathResolver):
    boot_method: str
    ci_project_dir: str
    device_type: str
    job_timeout_min: int  # The job timeout in minutes
    build_url: str = None
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    first_stage_init: str = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    kernel_external: str = None
    lava_tags: str = ""  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url_prefix: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    job_rootfs_overlay_url: str = None
    structured_log_file: pathlib.Path = None  # Log file path with structured LAVA log
    ssh_client_image: str = None  # x86_64 SSH client image to follow the job's output
    project_name: str = None  # Project name to be used in the job name
    __structured_log_context = contextlib.nullcontext()  # Structured Logger context

    def __post_init__(self) -> None:
        super().__post_init__()
        # Strip everything after the first space in the mesa job name, as
        # spaces break the lava-test-case command.
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]
        self.proxy = setup_lava_proxy()

        if not self.structured_log_file:
            return

        self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()

    def __prepare_submission(self) -> str:
        # Overwrite the timeout for the test cases with the value provided by
        # the user. The test case running time should be at least 4 times
        # greater than the other sections (boot and setup), so we can safely
        # ignore them. If LAVA fails to stop the job at this stage, it will
        # fall back to the script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition = LAVAJobDefinition(self).generate_lava_job_definition()

        if self.dump_yaml:
            self.dump_job_definition(job_definition)

        validation_job = LAVAJob(self.proxy, job_definition)
        if errors := validation_job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")
        print_log("LAVA job definition validated successfully")

        return job_definition

    @classmethod
    def is_under_ci(cls):
        ci_envvar: str = getenv("CI", "false")
        return ci_envvar.lower() == "true"

    def dump_job_definition(self, job_definition) -> None:
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))

    def submit(self) -> None:
        """
        Prepares and submits the LAVA job.
        If `validate_only` is True, it validates the job without submitting it.
        If the job finishes with a non-pass status or encounters an exception,
        the program exits with a non-zero return code.
        """
        job_definition: str = self.__prepare_submission()

        if self.validate_only:
            return

        with self.__structured_log_context:
            last_attempt_job = None
            try:
                last_attempt_job = retriable_follow_job(self.proxy, job_definition)

            except MesaCIRetryError as retry_exception:
                last_attempt_job = retry_exception.last_job

            except Exception as exception:
                STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
                raise exception

            finally:
                self.finish_script(last_attempt_job)

    def print_log_artifact_url(self):
        relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
        full_path = f"$ARTIFACTS_BASE_URL/{relative_log_path}"
        artifact_url = path.expandvars(full_path)

        print_log(f"Structured logging data available at: {artifact_url}")

    def finish_script(self, last_attempt_job):
        if self.is_under_ci() and self.structured_log_file:
            self.print_log_artifact_url()

        if not last_attempt_job:
            # No job was run; something bad happened
            STRUCTURAL_LOG["job_combined_status"] = "script_crash"
            current_exception = str(sys.exc_info()[0])
            STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
            raise SystemExit(1)

        STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status

        if last_attempt_job.status != "pass":
            raise SystemExit(1)
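
# Minimal usage sketch (hypothetical values): the same entry point that
# python-fire exposes on the command line can be driven from Python. With
# validate_only=True the job definition is generated and validated against
# the LAVA server, but never submitted.
#
#   submitter = LAVAJobSubmitter(
#       boot_method="u-boot",
#       ci_project_dir="/builds/mesa/mesa",
#       device_type="rk3399-gru-kevin",
#       job_timeout_min=30,
#       validate_only=True,
#   )
#   submitter.submit()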


class StructuredLoggerWrapper:
    def __init__(self, submitter: LAVAJobSubmitter) -> None:
        self.__submitter: LAVAJobSubmitter = submitter

    def _init_logger(self):
        STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
        STRUCTURAL_LOG["dut_attempt_counter"] = 0

        # Initialize the dut_jobs list to enable appends
        STRUCTURAL_LOG["dut_jobs"] = []

    @contextlib.contextmanager
    def _simple_logger_context(self):
        log_file = pathlib.Path(self.__submitter.structured_log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            # Truncate the file
            log_file.write_text("")
            yield
        finally:
            log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))

    def logger_context(self):
        context = contextlib.nullcontext()
        try:
            global STRUCTURAL_LOG
            STRUCTURAL_LOG = StructuredLogger(
                self.__submitter.structured_log_file, truncate=True
            ).data
        except NameError:
            context = self._simple_logger_context()

        self._init_logger()
        return context


if __name__ == "__main__":
    # Given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, it is safe to say we do not
    # need any more buffering.
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
    # The LAVA farm reports datetimes in the UTC timezone, so set it locally
    # for the script run.
    # Setting environ here will not affect the system timezone, as os.environ
    # only lives as long as the script does.
    environ["TZ"] = "UTC"
    time.tzset()

    fire.Fire(LAVAJobSubmitter)
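
# Example invocation (hypothetical flag values): python-fire maps each
# LAVAJobSubmitter field to a command-line flag and each public method to a
# subcommand, so a CI job would call something along these lines:
#
#   ./lava_job_submitter.py \
#       --device-type rk3399-gru-kevin \
#       --boot-method u-boot \
#       --ci-project-dir "$CI_PROJECT_DIR" \
#       --job-timeout-min 30 \
#       --dump-yaml \
#       submit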