#!/usr/bin/python2
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tool to validate code in prod branch before pushing to lab.

The script runs push_to_prod suite to verify code in prod branch is ready to be
pushed. Link to design document:
https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit

To verify if prod branch can be pushed to lab, run following command in
chromeos-staging-master2.hot server:
/usr/local/autotest/site_utils/test_push.py

Run the script with --help for the list of supported options.

The script uses latest gandof stable build as test build by default.

"""

import argparse
import ast
import collections
import datetime
import getpass
import multiprocessing
import os
import re
import subprocess
import sys
import time
import traceback
import urllib2

import common
try:
    from autotest_lib.frontend import setup_django_environment
    from autotest_lib.frontend.afe import models
    from autotest_lib.frontend.afe import rpc_utils
except ImportError:
    # Unittest may not have Django database configured and will fail to import.
    pass
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.server import constants
from autotest_lib.server import site_utils
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.site_utils import test_push_common

AUTOTEST_DIR = common.autotest_dir
CONFIG = global_config.global_config

AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)

MAIL_FROM = 'chromeos-test@google.com'
# Raw strings keep the regex escapes (\d, \.) out of Python's own string
# escape processing.
BUILD_REGEX = r'R[\d]+-[\d]+\.[\d]+\.[\d]+'
RUN_SUITE_COMMAND = 'run_suite.py'
PUSH_TO_PROD_SUITE = 'push_to_prod'
DUMMY_SUITE = 'dummy'
DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
# (board, minimum number of DUTs) pairs; converted to a dict for --num_duts.
DEFAULT_NUM_DUTS = (
    ('gandof', 4),
    ('quawks', 2),
)

# Matches the run_suite output line announcing the suite job; group 1 is the
# suite job id.
SUITE_JOB_START_INFO_REGEX = (r'^.*Created suite job:.*'
                              r'tab_id=view_job&object_id=(\d+)$')

URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)

# Some test could be extra / missing or have mismatched results for various
# reasons. Add such test in this list and explain the reason.
_IGNORED_TESTS = [
    # test_push uses a stable image build to test, which is quite behind ToT.
    # The following expectations are correct at ToT, but need to be ignored
    # until stable image is recent enough.

    # TODO(pprabhu): Remove once R70 is stable.
    'dummy_Fail.RetrySuccess',
    'dummy_Fail.RetryFail',
]

# Multiprocessing proxy objects that are used to share data between background
# suite-running processes and main process. The multiprocessing-compatible
# versions are initialized in _main.
_run_suite_output = []
_all_suite_ids = []

DEFAULT_SERVICE_RESPAWN_LIMIT = 2


class TestPushException(Exception):
    """Exception to be raised when the test to push to prod failed."""
    pass


@retry.retry(TestPushException, timeout_min=5, delay_sec=30)
def check_dut_inventory(required_num_duts, pool):
    """Check DUT inventory for each board in the pool specified.

    Only hosts that are Ready, unlocked and carry the pool label are counted.

    @param required_num_duts: a dict specifying the number of DUT each platform
                              requires in order to finish push tests.
    @param pool: the pool used by test_push.
    @raise TestPushException: if number of DUTs are less than the requirement.
    """
    print('Checking DUT inventory...')
    pool_label = constants.Labels.POOL_PREFIX + pool
    hosts = AFE.run('get_hosts', status='Ready', locked=False)
    hosts = [h for h in hosts if pool_label in h.get('labels', [])]
    # Count hosts per platform in a single pass.
    current_inventory = collections.Counter(
            host['platform'] for host in hosts)
    error_msg = ''
    for platform, req_num in required_num_duts.items():
        curr_num = current_inventory.get(platform, 0)
        if curr_num < req_num:
            error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
                          ' now' % (req_num, platform, pool, curr_num))
    if error_msg:
        raise TestPushException('Not enough DUTs to run push tests. %s' %
                                error_msg)


def powerwash_dut_to_test_repair(hostname, timeout):
    """Powerwash dut to test repair workflow.

    Schedules the platform_Powerwash test on the DUT, waits for its results,
    verifies them, then asks AFE to reverify the host.

    @param hostname: hostname of the dut.
    @param timeout: seconds of the powerwash test to hit timeout.
    @raise TestPushException: if DUT fail to run the test.
    """
    t = models.Test.objects.get(name='platform_Powerwash')
    c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path))
    job_id = rpc_utils.create_job_common(
             'powerwash', priority=priorities.Priority.SUPER,
             control_type='Server', control_file=c, hosts=[hostname])

    # Poll TKO until test statuses for the job show up, or give up and abort
    # the job once the deadline has passed.
    end = time.time() + timeout
    while not TKO.get_job_test_statuses_from_db(job_id):
        if time.time() >= end:
            AFE.run('abort_host_queue_entries', job=job_id)
            raise TestPushException(
                    'Powerwash test on %s timeout after %ds, abort it.' %
                    (hostname, timeout))
        time.sleep(10)
    verify_test_results(job_id,
                        test_push_common.EXPECTED_TEST_RESULTS_POWERWASH)
    # Kick off verify, verify will fail and a repair should be triggered.
    AFE.reverify_hosts(hostnames=[hostname])


def reverify_all_push_duts():
    """Reverify all the push DUTs."""
    print('Reverifying all DUTs.')
    hosts = [h.hostname for h in AFE.get_hosts()]
    AFE.reverify_hosts(hostnames=hosts)


def parse_arguments(argv):
    """Parse arguments for test_push tool.

    When no build is given on the command line, the build (and the shard
    build) defaults to the image name the AFE stable version map reports for
    the corresponding board.

    @param argv   Argument vector, as for `sys.argv`, including the
                  command name in `argv[0]`.
    @return: Parsed arguments.

    """
    parser = argparse.ArgumentParser(prog=argv[0])
    parser.add_argument('-b', '--board', dest='board', default='gandof',
                        help='Default is gandof.')
    parser.add_argument('-sb', '--shard_board', dest='shard_board',
                        default='quawks',
                        help='Default is quawks.')
    parser.add_argument('-i', '--build', dest='build', default=None,
                        help='Default is the latest stable build of given '
                             'board. Must be a stable build, otherwise AU test '
                             'will fail. (ex: gandolf-release/R54-8743.25.0)')
    parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
                        help='Default is the latest stable build of given '
                             'board. Must be a stable build, otherwise AU test '
                             'will fail.')
    parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
                        default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
                        help='Time in mins to wait before abort the jobs we '
                             'are waiting on. Only for the asynchronous suites '
                             'triggered by create_and_return flag.')
    # ast.literal_eval only evaluates Python literals, so the option value
    # cannot execute arbitrary code.
    parser.add_argument('-ud', '--num_duts', dest='num_duts',
                        default=dict(DEFAULT_NUM_DUTS),
                        type=ast.literal_eval,
                        help="Python dict literal that specifies the required"
                        " number of DUTs for each board. E.g {'gandof':4}")
    parser.add_argument('-c', '--continue_on_failure', action='store_true',
                        dest='continue_on_failure',
                        help='All tests continue to run when there is failure')
    parser.add_argument('-sl', '--service_respawn_limit', type=int,
                        default=DEFAULT_SERVICE_RESPAWN_LIMIT,
                        help='If a service crashes more than this, the test '
                             'push is considered failed.')

    arguments = parser.parse_args(argv[1:])

    # Get latest stable build as default build.
    version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE)
    if not arguments.build:
        arguments.build = version_map.get_image_name(arguments.board)
    if not arguments.shard_build:
        arguments.shard_build = version_map.get_image_name(
            arguments.shard_board)
    return arguments


def do_run_suite(suite_name, arguments, use_shard=False,
                 create_and_return=False):
    """Call run_suite to run a suite job, and return the suite job id.

    The script waits the suite job to finish before returning the suite job id.
    Also it will echo the run_suite output to stdout.

    @param suite_name: Name of a suite, e.g., dummy.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.

    @return: Suite job ID.
    @raise TestPushException: if the suite job id cannot be found in the
                              run_suite output, or an asynchronous suite
                              does not finish within arguments.timeout_min.

    """
    if use_shard:
        board = arguments.shard_board
        build = arguments.shard_build
    else:
        board = arguments.board
        build = arguments.build

    # Remove cros-version label to force provision.
    hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
                          locked=False)
    for host in hosts:
        labels_to_remove = [
                l for l in host.labels
                if l.startswith(provision.CROS_VERSION_PREFIX)]
        if labels_to_remove:
            AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)

        # Test repair work flow on shards, powerwash test will timeout after 7m.
        if use_shard and not create_and_return:
            powerwash_dut_to_test_repair(host.hostname, timeout=420)

    current_dir = os.path.dirname(os.path.realpath(__file__))
    # NOTE: arguments.num_duts must contain an entry for the board in use;
    # indexing raises KeyError otherwise.
    cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
           '-s', suite_name,
           '-b', board,
           '-i', build,
           '-p', arguments.pool,
           '--minimum_duts', str(arguments.num_duts[board])]
    if create_and_return:
        cmd += ['-c']

    suite_job_id = None

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    while True:
        line = proc.stdout.readline()

        # Break when run_suite process completed. poll() returns None while
        # the process is still running, so compare by identity: an exit code
        # of 0 must still count as "completed".
        if not line and proc.poll() is not None:
            break
        print(line.rstrip())
        _run_suite_output.append(line.rstrip())

        if not suite_job_id:
            m = re.match(SUITE_JOB_START_INFO_REGEX, line)
            if m and m.group(1):
                suite_job_id = int(m.group(1))
                _all_suite_ids.append(suite_job_id)

    if not suite_job_id:
        raise TestPushException('Failed to retrieve suite job ID.')

    # If create_and_return specified, wait for the suite to finish.
    if create_and_return:
        end = time.time() + arguments.timeout_min * 60
        while not AFE.get_jobs(id=suite_job_id, finished=True):
            if time.time() < end:
                time.sleep(10)
            else:
                AFE.run('abort_host_queue_entries', job=suite_job_id)
                raise TestPushException(
                        'Asynchronous suite triggered by create_and_return '
                        'flag has timed out after %d mins. Aborting it.' %
                        arguments.timeout_min)

    print('Suite job %s is completed.' % suite_job_id)
    return suite_job_id


def check_dut_image(build, suite_job_id):
    """Confirm all DUTs used for the suite are imaged to expected build.

    @param build: Expected build to be imaged.
    @param suite_job_id: job ID of the suite job.
    @raise TestPushException: If a DUT does not have expected build imaged.
    """
    print('Checking image installed in DUTs...')
    job_ids = [job.id for job in
               models.Job.objects.filter(parent_job_id=suite_job_id)]
    # Only the first host queue entry of each child job is inspected.
    hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
            for job_id in job_ids]
    hostnames = set([hqe.host.hostname for hqe in hqes])
    for hostname in hostnames:
        found_build = site_utils.get_build_from_afe(hostname, AFE)
        if found_build != build:
            raise TestPushException('DUT is not imaged properly. Host %s has '
                                    'build %s, while build %s is expected.' %
                                    (hostname, found_build, build))


def test_suite(suite_name, expected_results, arguments, use_shard=False,
               create_and_return=False):
    """Call run_suite to start a suite job and verify results.

    @param suite_name: Name of a suite, e.g., dummy
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
    @raise TestPushException: If the suite fails to run or its results do not
                              match the expectation.
    """
    suite_job_id = do_run_suite(suite_name, arguments, use_shard,
                                create_and_return)

    # Confirm all DUTs used for the suite are imaged to expected build.
    # hqe.host_id for jobs running in shard is not synced back to master db,
    # therefore, skip verifying dut build for jobs running in shard.
    if not use_shard:
        check_dut_image(arguments.build, suite_job_id)

    # Verify test results are the expected results.
    verify_test_results(suite_job_id, expected_results)


def verify_test_results(job_id, expected_results):
    """Verify the test results with the expected results.

    Besides comparing results, this also checks that the link to the job's
    logs can be loaded.

    @param job_id: id of the running jobs. For suite job, it is suite_job_id.
    @param expected_results: A dictionary of test name to test result.
    @raise TestPushException: If verify fails.
    """
    print('Comparing test results...')
    test_views = site_utils.get_test_views_from_tko(job_id, TKO)
    summary = test_push_common.summarize_push(test_views, expected_results,
                                              _IGNORED_TESTS)

    # Test link to log can be loaded.
    job_name = '%s-%s' % (job_id, getpass.getuser())
    log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
    try:
        urllib2.urlopen(log_link).read()
    except urllib2.URLError:
        summary.append('Failed to load page for link to log: %s.' % log_link)

    # Any entry in the summary is a problem to report.
    if summary:
        raise TestPushException('\n'.join(summary))


def test_suite_wrapper(queue, suite_name, expected_results, arguments,
                       use_shard=False, create_and_return=False):
    """Wrapper to call test_suite. Handle exception and pipe it to parent
    process.

    @param queue: Queue to save exception to be accessed by parent process.
    @param suite_name: Name of a suite, e.g., dummy
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
    """
    try:
        test_suite(suite_name, expected_results, arguments, use_shard,
                   create_and_return)
    except Exception:
        # Storing the whole exc_info leads to a PicklingError, so the
        # traceback object is replaced by its extracted (picklable) entries.
        except_type, except_value, tb = sys.exc_info()
        queue.put((except_type, except_value, traceback.extract_tb(tb)))


def check_queue(queue):
    """Check the queue for any exception being raised.

    @param queue: Queue used to store exception for parent process to access.
    @raise: Any exception found in the queue.
    """
    if queue.empty():
        return
    exc_info = queue.get()
    # Print the original backtrace in the usual traceback layout, then raise
    # a new exception of the same type in this process.
    print('Original stack trace of the exception:\n%s' %
          ''.join(traceback.format_list(exc_info[2])))
    raise exc_info[0](exc_info[1])


def _run_test_suites(arguments):
    """Run the actual tests that comprise the test_push.

    Starts the push_to_prod suite and an asynchronous dummy suite (on the
    shard board, with create_and_return) in separate processes, and re-raises
    in this process any exception either of them reports through the queue.

    @param arguments: Parsed command line arguments.
    """
    # Use daemon flag will kill child processes when parent process fails.
    use_daemon = not arguments.continue_on_failure
    queue = multiprocessing.Queue()

    push_to_prod_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, PUSH_TO_PROD_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS, arguments))
    push_to_prod_suite.daemon = use_daemon
    push_to_prod_suite.start()

    # suite test with --create_and_return flag
    asynchronous_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, DUMMY_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS_DUMMY,
                  arguments, True, True))
    # NOTE(review): always a daemon, regardless of continue_on_failure --
    # confirm this asymmetry with push_to_prod_suite is intended.
    asynchronous_suite.daemon = True
    asynchronous_suite.start()

    while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
        check_queue(queue)
        time.sleep(5)
    # Check once more for an exception queued right before a child exited.
    check_queue(queue)
    push_to_prod_suite.join()
    asynchronous_suite.join()


def check_service_crash(respawn_limit, start_time):
    """Check whether scheduler or host_scheduler crash during testing.

    Since the testing push is kicked off at the beginning of a given hour, the way
    to check whether a service is crashed is to check whether the times of the
    service being respawn during testing push is over the respawn_limit.

    @param respawn_limit: The maximum number of times the service is allowed to
                          be respawn.
    @param start_time: The time that testing push is kicked off.
    @raise TestPushException: If any service was respawned more than
                              respawn_limit times since start_time.
    """
    def _parse(filename_prefix, filename):
        """Helper method to parse the time of the log.

        @param filename_prefix: The prefix of the filename.
        @param filename: The name of the log file.
        @return: The datetime encoded in the filename after the prefix.
        """
        return datetime.datetime.strptime(filename[len(filename_prefix):],
                                          "%Y-%m-%d-%H.%M.%S")

    services = ['scheduler', 'host_scheduler']
    logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
    curr_time = datetime.datetime.now()

    error_msg = ''
    for service in services:
        log_prefix = '%s.log.' % service
        # Each log file timestamped within the push window counts as one
        # (re)start of the service.
        respawn_count = sum(1 for l in logs if l.startswith(log_prefix)
                            and start_time <= _parse(log_prefix, l) <= curr_time)

        if respawn_count > respawn_limit:
            error_msg += ('%s has been respawned %s times during testing push at %s. '
                          'It is very likely crashed. Please check!\n' %
                          (service, respawn_count,
                           start_time.strftime("%Y-%m-%d-%H")))
    if error_msg:
        raise TestPushException(error_msg)


_SUCCESS_MSG = """
All staging tests completed successfully.

Instructions for pushing to prod are available at
https://goto.google.com/autotest-to-prod
"""


def _main(arguments):
    """Run test and promote repo branches if tests succeed.

    @param arguments: command line arguments.
    """

    # TODO Use chromite.lib.parallel.Manager instead, to workaround the
    # too-long-tmp-path problem.
    mpmanager = multiprocessing.Manager()
    # These are globals used by other functions in this module to communicate
    # back from worker processes.
    global _run_suite_output
    _run_suite_output = mpmanager.list()
    global _all_suite_ids
    _all_suite_ids = mpmanager.list()

    try:
        start_time = datetime.datetime.now()
        reverify_all_push_duts()
        time.sleep(15) # Wait for the verify test to start.
        check_dut_inventory(arguments.num_duts, arguments.pool)
        _run_test_suites(arguments)
        check_service_crash(arguments.service_respawn_limit, start_time)
        print(_SUCCESS_MSG)
    except Exception:
        # Abort running jobs unless flagged to continue when there is a failure.
        if not arguments.continue_on_failure:
            for suite_id in _all_suite_ids:
                if AFE.get_jobs(id=suite_id, finished=False):
                    AFE.run('abort_host_queue_entries', job=suite_id)
        raise


def main():
    """Entry point."""
    arguments = parse_arguments(sys.argv)
    _main(arguments)


if __name__ == '__main__':
    main()