#!/usr/bin/python2 # # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Tool to validate code in prod branch before pushing to lab. The script runs push_to_prod suite to verify code in prod branch is ready to be pushed. Link to design document: https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit To verify if prod branch can be pushed to lab, run following command in chromeos-staging-master2.hot server: /usr/local/autotest/site_utils/test_push.py -e someone@company.com The script uses latest gandof stable build as test build by default. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import argparse import ast import datetime import getpass import multiprocessing import os import re import subprocess import sys import time import traceback from six.moves import urllib import common try: from autotest_lib.frontend import setup_django_environment from autotest_lib.frontend.afe import models from autotest_lib.frontend.afe import rpc_utils except ImportError: # Unittest may not have Django database configured and will fail to import. pass from autotest_lib.client.common_lib import global_config from autotest_lib.client.common_lib import priorities from autotest_lib.client.common_lib.cros import retry from autotest_lib.frontend.afe import rpc_client_lib from autotest_lib.server import constants from autotest_lib.server import site_utils from autotest_lib.server import utils from autotest_lib.server.cros import provision from autotest_lib.server.cros.dynamic_suite import frontend_wrappers from autotest_lib.site_utils import test_push_common AUTOTEST_DIR=common.autotest_dir CONFIG = global_config.global_config AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2) TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10) MAIL_FROM = 'chromeos-test@google.com' BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+' RUN_SUITE_COMMAND = 'run_suite.py' PUSH_TO_PROD_SUITE = 'push_to_prod' DUMMY_SUITE = 'dummy' DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30 IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server') DEFAULT_NUM_DUTS = ( ('gandof', 4), ('quawks', 2), ) SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*' 'tab_id=view_job&object_id=(\d+)$') URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str) URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str) # Some test could be extra / missing or have mismatched results for various # reasons. Add such test in this list and explain the reason. _IGNORED_TESTS = [ # test_push uses a stable image build to test, which is quite behind ToT. # The following expectations are correct at ToT, but need to be ignored # until stable image is recent enough. # TODO(pprabhu): Remove once R70 is stable. 'dummy_Fail.RetrySuccess', 'dummy_Fail.RetryFail', ] # Multiprocessing proxy objects that are used to share data between background # suite-running processes and main process. The multiprocessing-compatible # versions are initialized in _main. _run_suite_output = [] _all_suite_ids = [] DEFAULT_SERVICE_RESPAWN_LIMIT = 2 class TestPushException(Exception): """Exception to be raised when the test to push to prod failed.""" pass @retry.retry(TestPushException, timeout_min=5, delay_sec=30) def check_dut_inventory(required_num_duts, pool): """Check DUT inventory for each board in the pool specified.. @param required_num_duts: a dict specifying the number of DUT each platform requires in order to finish push tests. @param pool: the pool used by test_push. @raise TestPushException: if number of DUTs are less than the requirement. """ print('Checking DUT inventory...') pool_label = constants.Labels.POOL_PREFIX + pool hosts = AFE.run('get_hosts', status='Ready', locked=False) hosts = [h for h in hosts if pool_label in h.get('labels', [])] platforms = [host['platform'] for host in hosts] current_inventory = {p : platforms.count(p) for p in platforms} error_msg = '' for platform, req_num in required_num_duts.items(): curr_num = current_inventory.get(platform, 0) if curr_num < req_num: error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready' ' now' % (req_num, platform, pool, curr_num)) if error_msg: raise TestPushException('Not enough DUTs to run push tests. %s' % error_msg) def powerwash_dut_to_test_repair(hostname, timeout): """Powerwash dut to test repair workflow. @param hostname: hostname of the dut. @param timeout: seconds of the powerwash test to hit timeout. @raise TestPushException: if DUT fail to run the test. """ t = models.Test.objects.get(name='platform_Powerwash') c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path)) job_id = rpc_utils.create_job_common( 'powerwash', priority=priorities.Priority.SUPER, control_type='Server', control_file=c, hosts=[hostname]) end = time.time() + timeout while not TKO.get_job_test_statuses_from_db(job_id): if time.time() >= end: AFE.run('abort_host_queue_entries', job=job_id) raise TestPushException( 'Powerwash test on %s timeout after %ds, abort it.' % (hostname, timeout)) time.sleep(10) verify_test_results(job_id, test_push_common.EXPECTED_TEST_RESULTS_POWERWASH) # Kick off verify, verify will fail and a repair should be triggered. AFE.reverify_hosts(hostnames=[hostname]) def reverify_all_push_duts(): """Reverify all the push DUTs.""" print('Reverifying all DUTs.') hosts = [h.hostname for h in AFE.get_hosts()] AFE.reverify_hosts(hostnames=hosts) def parse_arguments(argv): """Parse arguments for test_push tool. @param argv Argument vector, as for `sys.argv`, including the command name in `argv[0]`. @return: Parsed arguments. """ parser = argparse.ArgumentParser(prog=argv[0]) parser.add_argument('-b', '--board', dest='board', default='gandof', help='Default is gandof.') parser.add_argument('-sb', '--shard_board', dest='shard_board', default='quawks', help='Default is quawks.') parser.add_argument('-i', '--build', dest='build', default=None, help='Default is the latest stale build of given ' 'board. Must be a stable build, otherwise AU test ' 'will fail. (ex: gandolf-release/R54-8743.25.0)') parser.add_argument('-si', '--shard_build', dest='shard_build', default=None, help='Default is the latest stable build of given ' 'board. Must be a stable build, otherwise AU test ' 'will fail.') parser.add_argument('-p', '--pool', dest='pool', default='bvt') parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int, default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB, help='Time in mins to wait before abort the jobs we ' 'are waiting on. Only for the asynchronous suites ' 'triggered by create_and_return flag.') parser.add_argument('-ud', '--num_duts', dest='num_duts', default=dict(DEFAULT_NUM_DUTS), type=ast.literal_eval, help="Python dict literal that specifies the required" " number of DUTs for each board. E.g {'gandof':4}") parser.add_argument('-c', '--continue_on_failure', action='store_true', dest='continue_on_failure', help='All tests continue to run when there is failure') parser.add_argument('-sl', '--service_respawn_limit', type=int, default=DEFAULT_SERVICE_RESPAWN_LIMIT, help='If a service crashes more than this, the test ' 'push is considered failed.') arguments = parser.parse_args(argv[1:]) # Get latest stable build as default build. version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE) if not arguments.build: arguments.build = version_map.get_image_name(arguments.board) if not arguments.shard_build: arguments.shard_build = version_map.get_image_name( arguments.shard_board) return arguments def do_run_suite(suite_name, arguments, use_shard=False, create_and_return=False): """Call run_suite to run a suite job, and return the suite job id. The script waits the suite job to finish before returning the suite job id. Also it will echo the run_suite output to stdout. @param suite_name: Name of a suite, e.g., dummy. @param arguments: Arguments for run_suite command. @param use_shard: If true, suite is scheduled for shard board. @param create_and_return: If True, run_suite just creates the suite, print the job id, then finish immediately. @return: Suite job ID. """ if use_shard: board = arguments.shard_board build = arguments.shard_build else: board = arguments.board build = arguments.build # Remove cros-version label to force provision. hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board, locked=False) for host in hosts: labels_to_remove = [ l for l in host.labels if l.startswith(provision.CROS_VERSION_PREFIX)] if labels_to_remove: AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove) # Test repair work flow on shards, powerwash test will timeout after 7m. if use_shard and not create_and_return: powerwash_dut_to_test_repair(host.hostname, timeout=420) current_dir = os.path.dirname(os.path.realpath(__file__)) cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND), '-s', suite_name, '-b', board, '-i', build, '-p', arguments.pool, '--minimum_duts', str(arguments.num_duts[board])] if create_and_return: cmd += ['-c'] suite_job_id = None proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while True: line = proc.stdout.readline() # Break when run_suite process completed. if not line and proc.poll() != None: break print(line.rstrip()) _run_suite_output.append(line.rstrip()) if not suite_job_id: m = re.match(SUITE_JOB_START_INFO_REGEX, line) if m and m.group(1): suite_job_id = int(m.group(1)) _all_suite_ids.append(suite_job_id) if not suite_job_id: raise TestPushException('Failed to retrieve suite job ID.') # If create_and_return specified, wait for the suite to finish. if create_and_return: end = time.time() + arguments.timeout_min * 60 while not AFE.get_jobs(id=suite_job_id, finished=True): if time.time() < end: time.sleep(10) else: AFE.run('abort_host_queue_entries', job=suite_job_id) raise TestPushException( 'Asynchronous suite triggered by create_and_return ' 'flag has timed out after %d mins. Aborting it.' % arguments.timeout_min) print('Suite job %s is completed.' % suite_job_id) return suite_job_id def check_dut_image(build, suite_job_id): """Confirm all DUTs used for the suite are imaged to expected build. @param build: Expected build to be imaged. @param suite_job_id: job ID of the suite job. @raise TestPushException: If a DUT does not have expected build imaged. """ print('Checking image installed in DUTs...') job_ids = [job.id for job in models.Job.objects.filter(parent_job_id=suite_job_id)] hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0] for job_id in job_ids] hostnames = set([hqe.host.hostname for hqe in hqes]) for hostname in hostnames: found_build = site_utils.get_build_from_afe(hostname, AFE) if found_build != build: raise TestPushException('DUT is not imaged properly. Host %s has ' 'build %s, while build %s is expected.' % (hostname, found_build, build)) def test_suite(suite_name, expected_results, arguments, use_shard=False, create_and_return=False): """Call run_suite to start a suite job and verify results. @param suite_name: Name of a suite, e.g., dummy @param expected_results: A dictionary of test name to test result. @param arguments: Arguments for run_suite command. @param use_shard: If true, suite is scheduled for shard board. @param create_and_return: If True, run_suite just creates the suite, print the job id, then finish immediately. """ suite_job_id = do_run_suite(suite_name, arguments, use_shard, create_and_return) # Confirm all DUTs used for the suite are imaged to expected build. # hqe.host_id for jobs running in shard is not synced back to master db, # therefore, skip verifying dut build for jobs running in shard. build_expected = arguments.build if not use_shard: check_dut_image(build_expected, suite_job_id) # Verify test results are the expected results. verify_test_results(suite_job_id, expected_results) def verify_test_results(job_id, expected_results): """Verify the test results with the expected results. @param job_id: id of the running jobs. For suite job, it is suite_job_id. @param expected_results: A dictionary of test name to test result. @raise TestPushException: If verify fails. """ print('Comparing test results...') test_views = site_utils.get_test_views_from_tko(job_id, TKO) summary = test_push_common.summarize_push(test_views, expected_results, _IGNORED_TESTS) # Test link to log can be loaded. job_name = '%s-%s' % (job_id, getpass.getuser()) log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name) try: urllib.request.urlopen(log_link).read() except urllib.error.URLError: summary.append('Failed to load page for link to log: %s.' % log_link) if summary: raise TestPushException('\n'.join(summary)) def test_suite_wrapper(queue, suite_name, expected_results, arguments, use_shard=False, create_and_return=False): """Wrapper to call test_suite. Handle exception and pipe it to parent process. @param queue: Queue to save exception to be accessed by parent process. @param suite_name: Name of a suite, e.g., dummy @param expected_results: A dictionary of test name to test result. @param arguments: Arguments for run_suite command. @param use_shard: If true, suite is scheduled for shard board. @param create_and_return: If True, run_suite just creates the suite, print the job id, then finish immediately. """ try: test_suite(suite_name, expected_results, arguments, use_shard, create_and_return) except Exception: # Store the whole exc_info leads to a PicklingError. except_type, except_value, tb = sys.exc_info() queue.put((except_type, except_value, traceback.extract_tb(tb))) def check_queue(queue): """Check the queue for any exception being raised. @param queue: Queue used to store exception for parent process to access. @raise: Any exception found in the queue. """ if queue.empty(): return exc_info = queue.get() # Raise the exception with original backtrace. print('Original stack trace of the exception:\n%s' % exc_info[2]) raise exc_info[0](exc_info[1]) def _run_test_suites(arguments): """Run the actual tests that comprise the test_push.""" # Use daemon flag will kill child processes when parent process fails. use_daemon = not arguments.continue_on_failure queue = multiprocessing.Queue() push_to_prod_suite = multiprocessing.Process( target=test_suite_wrapper, args=(queue, PUSH_TO_PROD_SUITE, test_push_common.EXPECTED_TEST_RESULTS, arguments)) push_to_prod_suite.daemon = use_daemon push_to_prod_suite.start() # suite test with --create_and_return flag asynchronous_suite = multiprocessing.Process( target=test_suite_wrapper, args=(queue, DUMMY_SUITE, test_push_common.EXPECTED_TEST_RESULTS_DUMMY, arguments, True, True)) asynchronous_suite.daemon = True asynchronous_suite.start() while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive(): check_queue(queue) time.sleep(5) check_queue(queue) push_to_prod_suite.join() asynchronous_suite.join() def check_service_crash(respawn_limit, start_time): """Check whether scheduler or host_scheduler crash during testing. Since the testing push is kicked off at the beginning of a given hour, the way to check whether a service is crashed is to check whether the times of the service being respawn during testing push is over the respawn_limit. @param respawn_limit: The maximum number of times the service is allowed to be respawn. @param start_time: The time that testing push is kicked off. """ def _parse(filename_prefix, filename): """Helper method to parse the time of the log. @param filename_prefix: The prefix of the filename. @param filename: The name of the log file. """ return datetime.datetime.strptime(filename[len(filename_prefix):], "%Y-%m-%d-%H.%M.%S") services = ['scheduler', 'host_scheduler'] logs = os.listdir('%s/logs/' % AUTOTEST_DIR) curr_time = datetime.datetime.now() error_msg = '' for service in services: log_prefix = '%s.log.' % service respawn_count = sum(1 for l in logs if l.startswith(log_prefix) and start_time <= _parse(log_prefix, l) <= curr_time) if respawn_count > respawn_limit: error_msg += ('%s has been respawned %s times during testing push at %s. ' 'It is very likely crashed. Please check!\n' % (service, respawn_count, start_time.strftime("%Y-%m-%d-%H"))) if error_msg: raise TestPushException(error_msg) _SUCCESS_MSG = """ All staging tests completed successfully. Instructions for pushing to prod are available at https://goto.google.com/autotest-to-prod """ def _main(arguments): """Run test and promote repo branches if tests succeed. @param arguments: command line arguments. """ # TODO Use chromite.lib.parallel.Manager instead, to workaround the # too-long-tmp-path problem. mpmanager = multiprocessing.Manager() # These are globals used by other functions in this module to communicate # back from worker processes. global _run_suite_output _run_suite_output = mpmanager.list() global _all_suite_ids _all_suite_ids = mpmanager.list() try: start_time = datetime.datetime.now() reverify_all_push_duts() time.sleep(15) # Wait for the verify test to start. check_dut_inventory(arguments.num_duts, arguments.pool) _run_test_suites(arguments) check_service_crash(arguments.service_respawn_limit, start_time) print(_SUCCESS_MSG) except Exception: # Abort running jobs unless flagged to continue when there is a failure. if not arguments.continue_on_failure: for suite_id in _all_suite_ids: if AFE.get_jobs(id=suite_id, finished=False): AFE.run('abort_host_queue_entries', job=suite_id) raise def main(): """Entry point.""" arguments = parse_arguments(sys.argv) _main(arguments) if __name__ == '__main__': main()