1#!/usr/bin/python2 2# 3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Tool to validate code in prod branch before pushing to lab. 8 9The script runs push_to_prod suite to verify code in prod branch is ready to be 10pushed. Link to design document: 11https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit 12 13To verify if prod branch can be pushed to lab, run following command in 14chromeos-staging-master2.hot server: 15/usr/local/autotest/site_utils/test_push.py -e someone@company.com 16 17The script uses latest gandof stable build as test build by default. 18 19""" 20 21from __future__ import absolute_import 22from __future__ import division 23from __future__ import print_function 24 25import argparse 26import ast 27import datetime 28import getpass 29import multiprocessing 30import os 31import re 32import subprocess 33import sys 34import time 35import traceback 36from six.moves import urllib 37 38import common 39try: 40 from autotest_lib.frontend import setup_django_environment 41 from autotest_lib.frontend.afe import models 42 from autotest_lib.frontend.afe import rpc_utils 43except ImportError: 44 # Unittest may not have Django database configured and will fail to import. 45 pass 46from autotest_lib.client.common_lib import global_config 47from autotest_lib.client.common_lib import priorities 48from autotest_lib.client.common_lib.cros import retry 49from autotest_lib.frontend.afe import rpc_client_lib 50from autotest_lib.server import constants 51from autotest_lib.server import site_utils 52from autotest_lib.server import utils 53from autotest_lib.server.cros import provision 54from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 55from autotest_lib.site_utils import test_push_common 56 57AUTOTEST_DIR=common.autotest_dir 58CONFIG = global_config.global_config 59 60AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2) 61TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10) 62 63MAIL_FROM = 'chromeos-test@google.com' 64BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+' 65RUN_SUITE_COMMAND = 'run_suite.py' 66PUSH_TO_PROD_SUITE = 'push_to_prod' 67DUMMY_SUITE = 'dummy' 68DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30 69IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server') 70DEFAULT_NUM_DUTS = ( 71 ('gandof', 4), 72 ('quawks', 2), 73) 74 75SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*' 76 'tab_id=view_job&object_id=(\d+)$') 77 78URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str) 79URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str) 80 81# Some test could be extra / missing or have mismatched results for various 82# reasons. Add such test in this list and explain the reason. 83_IGNORED_TESTS = [ 84 # test_push uses a stable image build to test, which is quite behind ToT. 85 # The following expectations are correct at ToT, but need to be ignored 86 # until stable image is recent enough. 87 88 # TODO(pprabhu): Remove once R70 is stable. 89 'dummy_Fail.RetrySuccess', 90 'dummy_Fail.RetryFail', 91] 92 93# Multiprocessing proxy objects that are used to share data between background 94# suite-running processes and main process. The multiprocessing-compatible 95# versions are initialized in _main. 96_run_suite_output = [] 97_all_suite_ids = [] 98 99DEFAULT_SERVICE_RESPAWN_LIMIT = 2 100 101 102class TestPushException(Exception): 103 """Exception to be raised when the test to push to prod failed.""" 104 pass 105 106@retry.retry(TestPushException, timeout_min=5, delay_sec=30) 107def check_dut_inventory(required_num_duts, pool): 108 """Check DUT inventory for each board in the pool specified.. 109 110 @param required_num_duts: a dict specifying the number of DUT each platform 111 requires in order to finish push tests. 112 @param pool: the pool used by test_push. 113 @raise TestPushException: if number of DUTs are less than the requirement. 114 """ 115 print('Checking DUT inventory...') 116 pool_label = constants.Labels.POOL_PREFIX + pool 117 hosts = AFE.run('get_hosts', status='Ready', locked=False) 118 hosts = [h for h in hosts if pool_label in h.get('labels', [])] 119 platforms = [host['platform'] for host in hosts] 120 current_inventory = {p : platforms.count(p) for p in platforms} 121 error_msg = '' 122 for platform, req_num in required_num_duts.items(): 123 curr_num = current_inventory.get(platform, 0) 124 if curr_num < req_num: 125 error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready' 126 ' now' % (req_num, platform, pool, curr_num)) 127 if error_msg: 128 raise TestPushException('Not enough DUTs to run push tests. %s' % 129 error_msg) 130 131 132def powerwash_dut_to_test_repair(hostname, timeout): 133 """Powerwash dut to test repair workflow. 134 135 @param hostname: hostname of the dut. 136 @param timeout: seconds of the powerwash test to hit timeout. 137 @raise TestPushException: if DUT fail to run the test. 138 """ 139 t = models.Test.objects.get(name='platform_Powerwash') 140 c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path)) 141 job_id = rpc_utils.create_job_common( 142 'powerwash', priority=priorities.Priority.SUPER, 143 control_type='Server', control_file=c, hosts=[hostname]) 144 145 end = time.time() + timeout 146 while not TKO.get_job_test_statuses_from_db(job_id): 147 if time.time() >= end: 148 AFE.run('abort_host_queue_entries', job=job_id) 149 raise TestPushException( 150 'Powerwash test on %s timeout after %ds, abort it.' % 151 (hostname, timeout)) 152 time.sleep(10) 153 verify_test_results(job_id, 154 test_push_common.EXPECTED_TEST_RESULTS_POWERWASH) 155 # Kick off verify, verify will fail and a repair should be triggered. 156 AFE.reverify_hosts(hostnames=[hostname]) 157 158 159def reverify_all_push_duts(): 160 """Reverify all the push DUTs.""" 161 print('Reverifying all DUTs.') 162 hosts = [h.hostname for h in AFE.get_hosts()] 163 AFE.reverify_hosts(hostnames=hosts) 164 165 166def parse_arguments(argv): 167 """Parse arguments for test_push tool. 168 169 @param argv Argument vector, as for `sys.argv`, including the 170 command name in `argv[0]`. 171 @return: Parsed arguments. 172 173 """ 174 parser = argparse.ArgumentParser(prog=argv[0]) 175 parser.add_argument('-b', '--board', dest='board', default='gandof', 176 help='Default is gandof.') 177 parser.add_argument('-sb', '--shard_board', dest='shard_board', 178 default='quawks', 179 help='Default is quawks.') 180 parser.add_argument('-i', '--build', dest='build', default=None, 181 help='Default is the latest stale build of given ' 182 'board. Must be a stable build, otherwise AU test ' 183 'will fail. (ex: gandolf-release/R54-8743.25.0)') 184 parser.add_argument('-si', '--shard_build', dest='shard_build', default=None, 185 help='Default is the latest stable build of given ' 186 'board. Must be a stable build, otherwise AU test ' 187 'will fail.') 188 parser.add_argument('-p', '--pool', dest='pool', default='bvt') 189 parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int, 190 default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB, 191 help='Time in mins to wait before abort the jobs we ' 192 'are waiting on. Only for the asynchronous suites ' 193 'triggered by create_and_return flag.') 194 parser.add_argument('-ud', '--num_duts', dest='num_duts', 195 default=dict(DEFAULT_NUM_DUTS), 196 type=ast.literal_eval, 197 help="Python dict literal that specifies the required" 198 " number of DUTs for each board. E.g {'gandof':4}") 199 parser.add_argument('-c', '--continue_on_failure', action='store_true', 200 dest='continue_on_failure', 201 help='All tests continue to run when there is failure') 202 parser.add_argument('-sl', '--service_respawn_limit', type=int, 203 default=DEFAULT_SERVICE_RESPAWN_LIMIT, 204 help='If a service crashes more than this, the test ' 205 'push is considered failed.') 206 207 arguments = parser.parse_args(argv[1:]) 208 209 # Get latest stable build as default build. 210 version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE) 211 if not arguments.build: 212 arguments.build = version_map.get_image_name(arguments.board) 213 if not arguments.shard_build: 214 arguments.shard_build = version_map.get_image_name( 215 arguments.shard_board) 216 return arguments 217 218 219def do_run_suite(suite_name, arguments, use_shard=False, 220 create_and_return=False): 221 """Call run_suite to run a suite job, and return the suite job id. 222 223 The script waits the suite job to finish before returning the suite job id. 224 Also it will echo the run_suite output to stdout. 225 226 @param suite_name: Name of a suite, e.g., dummy. 227 @param arguments: Arguments for run_suite command. 228 @param use_shard: If true, suite is scheduled for shard board. 229 @param create_and_return: If True, run_suite just creates the suite, print 230 the job id, then finish immediately. 231 232 @return: Suite job ID. 233 234 """ 235 if use_shard: 236 board = arguments.shard_board 237 build = arguments.shard_build 238 else: 239 board = arguments.board 240 build = arguments.build 241 242 # Remove cros-version label to force provision. 243 hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board, 244 locked=False) 245 for host in hosts: 246 labels_to_remove = [ 247 l for l in host.labels 248 if l.startswith(provision.CROS_VERSION_PREFIX)] 249 if labels_to_remove: 250 AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove) 251 252 # Test repair work flow on shards, powerwash test will timeout after 7m. 253 if use_shard and not create_and_return: 254 powerwash_dut_to_test_repair(host.hostname, timeout=420) 255 256 current_dir = os.path.dirname(os.path.realpath(__file__)) 257 cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND), 258 '-s', suite_name, 259 '-b', board, 260 '-i', build, 261 '-p', arguments.pool, 262 '--minimum_duts', str(arguments.num_duts[board])] 263 if create_and_return: 264 cmd += ['-c'] 265 266 suite_job_id = None 267 268 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, 269 stderr=subprocess.STDOUT) 270 271 while True: 272 line = proc.stdout.readline() 273 274 # Break when run_suite process completed. 275 if not line and proc.poll() != None: 276 break 277 print(line.rstrip()) 278 _run_suite_output.append(line.rstrip()) 279 280 if not suite_job_id: 281 m = re.match(SUITE_JOB_START_INFO_REGEX, line) 282 if m and m.group(1): 283 suite_job_id = int(m.group(1)) 284 _all_suite_ids.append(suite_job_id) 285 286 if not suite_job_id: 287 raise TestPushException('Failed to retrieve suite job ID.') 288 289 # If create_and_return specified, wait for the suite to finish. 290 if create_and_return: 291 end = time.time() + arguments.timeout_min * 60 292 while not AFE.get_jobs(id=suite_job_id, finished=True): 293 if time.time() < end: 294 time.sleep(10) 295 else: 296 AFE.run('abort_host_queue_entries', job=suite_job_id) 297 raise TestPushException( 298 'Asynchronous suite triggered by create_and_return ' 299 'flag has timed out after %d mins. Aborting it.' % 300 arguments.timeout_min) 301 302 print('Suite job %s is completed.' % suite_job_id) 303 return suite_job_id 304 305 306def check_dut_image(build, suite_job_id): 307 """Confirm all DUTs used for the suite are imaged to expected build. 308 309 @param build: Expected build to be imaged. 310 @param suite_job_id: job ID of the suite job. 311 @raise TestPushException: If a DUT does not have expected build imaged. 312 """ 313 print('Checking image installed in DUTs...') 314 job_ids = [job.id for job in 315 models.Job.objects.filter(parent_job_id=suite_job_id)] 316 hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0] 317 for job_id in job_ids] 318 hostnames = set([hqe.host.hostname for hqe in hqes]) 319 for hostname in hostnames: 320 found_build = site_utils.get_build_from_afe(hostname, AFE) 321 if found_build != build: 322 raise TestPushException('DUT is not imaged properly. Host %s has ' 323 'build %s, while build %s is expected.' % 324 (hostname, found_build, build)) 325 326 327def test_suite(suite_name, expected_results, arguments, use_shard=False, 328 create_and_return=False): 329 """Call run_suite to start a suite job and verify results. 330 331 @param suite_name: Name of a suite, e.g., dummy 332 @param expected_results: A dictionary of test name to test result. 333 @param arguments: Arguments for run_suite command. 334 @param use_shard: If true, suite is scheduled for shard board. 335 @param create_and_return: If True, run_suite just creates the suite, print 336 the job id, then finish immediately. 337 """ 338 suite_job_id = do_run_suite(suite_name, arguments, use_shard, 339 create_and_return) 340 341 # Confirm all DUTs used for the suite are imaged to expected build. 342 # hqe.host_id for jobs running in shard is not synced back to master db, 343 # therefore, skip verifying dut build for jobs running in shard. 344 build_expected = arguments.build 345 if not use_shard: 346 check_dut_image(build_expected, suite_job_id) 347 348 # Verify test results are the expected results. 349 verify_test_results(suite_job_id, expected_results) 350 351 352def verify_test_results(job_id, expected_results): 353 """Verify the test results with the expected results. 354 355 @param job_id: id of the running jobs. For suite job, it is suite_job_id. 356 @param expected_results: A dictionary of test name to test result. 357 @raise TestPushException: If verify fails. 358 """ 359 print('Comparing test results...') 360 test_views = site_utils.get_test_views_from_tko(job_id, TKO) 361 summary = test_push_common.summarize_push(test_views, expected_results, 362 _IGNORED_TESTS) 363 364 # Test link to log can be loaded. 365 job_name = '%s-%s' % (job_id, getpass.getuser()) 366 log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name) 367 try: 368 urllib.request.urlopen(log_link).read() 369 except urllib.error.URLError: 370 summary.append('Failed to load page for link to log: %s.' % log_link) 371 372 if summary: 373 raise TestPushException('\n'.join(summary)) 374 375def test_suite_wrapper(queue, suite_name, expected_results, arguments, 376 use_shard=False, create_and_return=False): 377 """Wrapper to call test_suite. Handle exception and pipe it to parent 378 process. 379 380 @param queue: Queue to save exception to be accessed by parent process. 381 @param suite_name: Name of a suite, e.g., dummy 382 @param expected_results: A dictionary of test name to test result. 383 @param arguments: Arguments for run_suite command. 384 @param use_shard: If true, suite is scheduled for shard board. 385 @param create_and_return: If True, run_suite just creates the suite, print 386 the job id, then finish immediately. 387 """ 388 try: 389 test_suite(suite_name, expected_results, arguments, use_shard, 390 create_and_return) 391 except Exception: 392 # Store the whole exc_info leads to a PicklingError. 393 except_type, except_value, tb = sys.exc_info() 394 queue.put((except_type, except_value, traceback.extract_tb(tb))) 395 396 397def check_queue(queue): 398 """Check the queue for any exception being raised. 399 400 @param queue: Queue used to store exception for parent process to access. 401 @raise: Any exception found in the queue. 402 """ 403 if queue.empty(): 404 return 405 exc_info = queue.get() 406 # Raise the exception with original backtrace. 407 print('Original stack trace of the exception:\n%s' % exc_info[2]) 408 raise exc_info[0](exc_info[1]) 409 410 411def _run_test_suites(arguments): 412 """Run the actual tests that comprise the test_push.""" 413 # Use daemon flag will kill child processes when parent process fails. 414 use_daemon = not arguments.continue_on_failure 415 queue = multiprocessing.Queue() 416 417 push_to_prod_suite = multiprocessing.Process( 418 target=test_suite_wrapper, 419 args=(queue, PUSH_TO_PROD_SUITE, 420 test_push_common.EXPECTED_TEST_RESULTS, arguments)) 421 push_to_prod_suite.daemon = use_daemon 422 push_to_prod_suite.start() 423 424 # suite test with --create_and_return flag 425 asynchronous_suite = multiprocessing.Process( 426 target=test_suite_wrapper, 427 args=(queue, DUMMY_SUITE, 428 test_push_common.EXPECTED_TEST_RESULTS_DUMMY, 429 arguments, True, True)) 430 asynchronous_suite.daemon = True 431 asynchronous_suite.start() 432 433 while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive(): 434 check_queue(queue) 435 time.sleep(5) 436 check_queue(queue) 437 push_to_prod_suite.join() 438 asynchronous_suite.join() 439 440 441def check_service_crash(respawn_limit, start_time): 442 """Check whether scheduler or host_scheduler crash during testing. 443 444 Since the testing push is kicked off at the beginning of a given hour, the way 445 to check whether a service is crashed is to check whether the times of the 446 service being respawn during testing push is over the respawn_limit. 447 448 @param respawn_limit: The maximum number of times the service is allowed to 449 be respawn. 450 @param start_time: The time that testing push is kicked off. 451 """ 452 def _parse(filename_prefix, filename): 453 """Helper method to parse the time of the log. 454 455 @param filename_prefix: The prefix of the filename. 456 @param filename: The name of the log file. 457 """ 458 return datetime.datetime.strptime(filename[len(filename_prefix):], 459 "%Y-%m-%d-%H.%M.%S") 460 461 services = ['scheduler', 'host_scheduler'] 462 logs = os.listdir('%s/logs/' % AUTOTEST_DIR) 463 curr_time = datetime.datetime.now() 464 465 error_msg = '' 466 for service in services: 467 log_prefix = '%s.log.' % service 468 respawn_count = sum(1 for l in logs if l.startswith(log_prefix) 469 and start_time <= _parse(log_prefix, l) <= curr_time) 470 471 if respawn_count > respawn_limit: 472 error_msg += ('%s has been respawned %s times during testing push at %s. ' 473 'It is very likely crashed. Please check!\n' % 474 (service, respawn_count, 475 start_time.strftime("%Y-%m-%d-%H"))) 476 if error_msg: 477 raise TestPushException(error_msg) 478 479 480_SUCCESS_MSG = """ 481All staging tests completed successfully. 482 483Instructions for pushing to prod are available at 484https://goto.google.com/autotest-to-prod 485""" 486 487 488def _main(arguments): 489 """Run test and promote repo branches if tests succeed. 490 491 @param arguments: command line arguments. 492 """ 493 494 # TODO Use chromite.lib.parallel.Manager instead, to workaround the 495 # too-long-tmp-path problem. 496 mpmanager = multiprocessing.Manager() 497 # These are globals used by other functions in this module to communicate 498 # back from worker processes. 499 global _run_suite_output 500 _run_suite_output = mpmanager.list() 501 global _all_suite_ids 502 _all_suite_ids = mpmanager.list() 503 504 try: 505 start_time = datetime.datetime.now() 506 reverify_all_push_duts() 507 time.sleep(15) # Wait for the verify test to start. 508 check_dut_inventory(arguments.num_duts, arguments.pool) 509 _run_test_suites(arguments) 510 check_service_crash(arguments.service_respawn_limit, start_time) 511 print(_SUCCESS_MSG) 512 except Exception: 513 # Abort running jobs unless flagged to continue when there is a failure. 514 if not arguments.continue_on_failure: 515 for suite_id in _all_suite_ids: 516 if AFE.get_jobs(id=suite_id, finished=False): 517 AFE.run('abort_host_queue_entries', job=suite_id) 518 raise 519 520 521def main(): 522 """Entry point.""" 523 arguments = parse_arguments(sys.argv) 524 _main(arguments) 525 526 527if __name__ == '__main__': 528 main() 529