1#!/usr/bin/env python3 2# 3# Copyright (C) 2020, 2021 Collabora Limited 4# Author: Gustavo Padovan <gustavo.padovan@collabora.com> 5# 6# Permission is hereby granted, free of charge, to any person obtaining a 7# copy of this software and associated documentation files (the "Software"), 8# to deal in the Software without restriction, including without limitation 9# the rights to use, copy, modify, merge, publish, distribute, sublicense, 10# and/or sell copies of the Software, and to permit persons to whom the 11# Software is furnished to do so, subject to the following conditions: 12# 13# The above copyright notice and this permission notice (including the next 14# paragraph) shall be included in all copies or substantial portions of the 15# Software. 16# 17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 20# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 23# SOFTWARE. 24 25"""Send a job to LAVA, track it and collect log back""" 26 27import argparse 28import lavacli 29import os 30import sys 31import time 32import traceback 33import urllib.parse 34import xmlrpc 35import yaml 36 37from datetime import datetime, timedelta 38from lavacli.utils import loader 39 40# Timeout in minutes to decide if the device from the dispatched LAVA job has 41# hung or not due to the lack of new log output. 42DEVICE_HANGING_TIMEOUT_MIN = 5 43 44# How many seconds the script should wait before try a new polling iteration to 45# check if the dispatched LAVA job is running or waiting in the job queue. 46WAIT_FOR_DEVICE_POLLING_TIME_SEC = 10 47 48# How many seconds to wait between log output LAVA RPC calls. 49LOG_POLLING_TIME_SEC = 5 50 51# How many retries should be made when a timeout happen. 52NUMBER_OF_RETRIES_TIMEOUT_DETECTION = 2 53 54 55def print_log(msg): 56 print("{}: {}".format(datetime.now(), msg)) 57 58def fatal_err(msg): 59 print_log(msg) 60 sys.exit(1) 61 62def generate_lava_yaml(args): 63 # General metadata and permissions, plus also inexplicably kernel arguments 64 values = { 65 'job_name': 'mesa: {}'.format(args.pipeline_info), 66 'device_type': args.device_type, 67 'visibility': { 'group': [ args.visibility_group ] }, 68 'priority': 75, 69 'context': { 70 'extra_nfsroot_args': ' init=/init rootwait minio_results={}'.format(args.job_artifacts_base) 71 }, 72 'timeouts': { 73 'job': { 74 'minutes': args.job_timeout 75 } 76 }, 77 } 78 79 if args.lava_tags: 80 values['tags'] = args.lava_tags.split(',') 81 82 # URLs to our kernel rootfs to boot from, both generated by the base 83 # container build 84 deploy = { 85 'timeout': { 'minutes': 10 }, 86 'to': 'tftp', 87 'os': 'oe', 88 'kernel': { 89 'url': '{}/{}'.format(args.base_system_url_prefix, args.kernel_image_name), 90 }, 91 'nfsrootfs': { 92 'url': '{}/lava-rootfs.tgz'.format(args.base_system_url_prefix), 93 'compression': 'gz', 94 } 95 } 96 if args.kernel_image_type: 97 deploy['kernel']['type'] = args.kernel_image_type 98 if args.dtb: 99 deploy['dtb'] = { 100 'url': '{}/{}.dtb'.format(args.base_system_url_prefix, args.dtb) 101 } 102 103 # always boot over NFS 104 boot = { 105 'timeout': { 'minutes': 25 }, 106 'method': args.boot_method, 107 'commands': 'nfs', 108 'prompts': ['lava-shell:'], 109 } 110 111 # skeleton test definition: only declaring each job as a single 'test' 112 # since LAVA's test parsing is not useful to us 113 test = { 114 'timeout': { 'minutes': args.job_timeout }, 115 'failure_retry': 1, 116 'definitions': [ { 117 'name': 'mesa', 118 'from': 'inline', 119 'path': 'inline/mesa.yaml', 120 'repository': { 121 'metadata': { 122 'name': 'mesa', 123 'description': 'Mesa test plan', 124 'os': [ 'oe' ], 125 'scope': [ 'functional' ], 126 'format': 'Lava-Test Test Definition 1.0', 127 }, 128 'parse': { 129 'pattern': r'hwci: (?P<test_case_id>\S*):\s+(?P<result>(pass|fail))' 130 }, 131 'run': { 132 }, 133 }, 134 } ], 135 } 136 137 # job execution script: 138 # - inline .gitlab-ci/common/init-stage1.sh 139 # - fetch and unpack per-pipeline build artifacts from build job 140 # - fetch and unpack per-job environment from lava-submit.sh 141 # - exec .gitlab-ci/common/init-stage2.sh 142 init_lines = [] 143 with open(args.first_stage_init, 'r') as init_sh: 144 init_lines += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ] 145 init_lines += [ 146 'mkdir -p {}'.format(args.ci_project_dir), 147 'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.mesa_build_url, args.ci_project_dir), 148 'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url), 149 'set +x', 150 'export CI_JOB_JWT="{}"'.format(args.jwt), 151 'set -x', 152 'exec /init-stage2.sh', 153 ] 154 test['definitions'][0]['repository']['run']['steps'] = init_lines 155 156 values['actions'] = [ 157 { 'deploy': deploy }, 158 { 'boot': boot }, 159 { 'test': test }, 160 ] 161 162 return yaml.dump(values, width=10000000) 163 164 165def setup_lava_proxy(): 166 config = lavacli.load_config("default") 167 uri, usr, tok = (config.get(key) for key in ("uri", "username", "token")) 168 uri_obj = urllib.parse.urlparse(uri) 169 uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path) 170 transport = lavacli.RequestsTransport( 171 uri_obj.scheme, 172 config.get("proxy"), 173 config.get("timeout", 120.0), 174 config.get("verify_ssl_cert", True), 175 ) 176 proxy = xmlrpc.client.ServerProxy( 177 uri_str, allow_none=True, transport=transport) 178 179 print_log("Proxy for {} created.".format(config['uri'])) 180 181 return proxy 182 183 184def _call_proxy(fn, *args): 185 retries = 60 186 for n in range(1, retries + 1): 187 try: 188 return fn(*args) 189 except xmlrpc.client.ProtocolError as err: 190 if n == retries: 191 traceback.print_exc() 192 fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg)) 193 else: 194 time.sleep(15) 195 pass 196 except xmlrpc.client.Fault as err: 197 traceback.print_exc() 198 fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode)) 199 200 201def get_job_results(proxy, job_id, test_suite, test_case): 202 # Look for infrastructure errors and retry if we see them. 203 results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id) 204 results = yaml.load(results_yaml, Loader=loader(False)) 205 for res in results: 206 metadata = res['metadata'] 207 if not 'result' in metadata or metadata['result'] != 'fail': 208 continue 209 if 'error_type' in metadata and metadata['error_type'] == "Infrastructure": 210 print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id)) 211 return False 212 if 'case' in metadata and metadata['case'] == "validate": 213 print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id)) 214 return False 215 216 results_yaml = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case) 217 results = yaml.load(results_yaml, Loader=loader(False)) 218 if not results: 219 fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case)) 220 221 print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result'])) 222 if results[0]['result'] != 'pass': 223 fatal_err("FAIL") 224 225 return True 226 227def wait_until_job_is_started(proxy, job_id): 228 print_log(f"Waiting for job {job_id} to start.") 229 current_state = "Submitted" 230 waiting_states = ["Submitted", "Scheduling", "Scheduled"] 231 while current_state in waiting_states: 232 job_state = _call_proxy(proxy.scheduler.job_state, job_id) 233 current_state = job_state["job_state"] 234 235 time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC) 236 print_log(f"Job {job_id} started.") 237 238def follow_job_execution(proxy, job_id): 239 line_count = 0 240 finished = False 241 last_time_logs = datetime.now() 242 while not finished: 243 (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, line_count) 244 logs = yaml.load(str(data), Loader=loader(False)) 245 if logs: 246 # Reset the timeout 247 last_time_logs = datetime.now() 248 for line in logs: 249 print("{} {}".format(line["dt"], line["msg"])) 250 251 line_count += len(logs) 252 253 else: 254 time_limit = timedelta(minutes=DEVICE_HANGING_TIMEOUT_MIN) 255 if datetime.now() - last_time_logs > time_limit: 256 print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id)) 257 return False 258 259 # `proxy.scheduler.jobs.logs` does not block, even when there is no 260 # new log to be fetched. To avoid dosing the LAVA dispatcher 261 # machine, let's add a sleep to save them some stamina. 262 time.sleep(LOG_POLLING_TIME_SEC) 263 264 return True 265 266def show_job_data(proxy, job_id): 267 show = _call_proxy(proxy.scheduler.jobs.show, job_id) 268 for field, value in show.items(): 269 print("{}\t: {}".format(field, value)) 270 271 272def validate_job(proxy, job_file): 273 try: 274 return _call_proxy(proxy.scheduler.jobs.validate, job_file, True) 275 except: 276 return False 277 278def submit_job(proxy, job_file): 279 return _call_proxy(proxy.scheduler.jobs.submit, job_file) 280 281 282def main(args): 283 proxy = setup_lava_proxy() 284 285 yaml_file = generate_lava_yaml(args) 286 287 if args.dump_yaml: 288 censored_args = args 289 censored_args.jwt = "jwt-hidden" 290 print(generate_lava_yaml(censored_args)) 291 292 if args.validate_only: 293 ret = validate_job(proxy, yaml_file) 294 if not ret: 295 fatal_err("Error in LAVA job definition") 296 print("LAVA job definition validated successfully") 297 return 298 299 retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION 300 301 while retry_count >= 0: 302 job_id = submit_job(proxy, yaml_file) 303 304 print_log("LAVA job id: {}".format(job_id)) 305 306 wait_until_job_is_started(proxy, job_id) 307 308 if not follow_job_execution(proxy, job_id): 309 print_log(f"Job {job_id} has timed out. Cancelling it.") 310 # Cancel the job as it is considered unreachable by Mesa CI. 311 proxy.scheduler.jobs.cancel(job_id) 312 313 retry_count -= 1 314 continue 315 316 show_job_data(proxy, job_id) 317 318 if get_job_results(proxy, job_id, "0_mesa", "mesa") == True: 319 break 320 321 322if __name__ == '__main__': 323 # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us -> 324 # GitLab runner -> GitLab primary -> user, safe to say we don't need any 325 # more buffering 326 sys.stdout.reconfigure(line_buffering=True) 327 sys.stderr.reconfigure(line_buffering=True) 328 parser = argparse.ArgumentParser("LAVA job submitter") 329 330 parser.add_argument("--pipeline-info") 331 parser.add_argument("--base-system-url-prefix") 332 parser.add_argument("--mesa-build-url") 333 parser.add_argument("--job-rootfs-overlay-url") 334 parser.add_argument("--job-artifacts-base") 335 parser.add_argument("--job-timeout", type=int) 336 parser.add_argument("--first-stage-init") 337 parser.add_argument("--ci-project-dir") 338 parser.add_argument("--device-type") 339 parser.add_argument("--dtb", nargs='?', default="") 340 parser.add_argument("--kernel-image-name") 341 parser.add_argument("--kernel-image-type", nargs='?', default="") 342 parser.add_argument("--boot-method") 343 parser.add_argument("--lava-tags", nargs='?', default="") 344 parser.add_argument("--jwt") 345 parser.add_argument("--validate-only", action='store_true') 346 parser.add_argument("--dump-yaml", action='store_true') 347 parser.add_argument("--visibility-group") 348 349 parser.set_defaults(func=main) 350 args = parser.parse_args() 351 args.func(args) 352