• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Tool to validate code in prod branch before pushing to lab.
8
9The script runs push_to_prod suite to verify code in prod branch is ready to be
10pushed. Link to design document:
11https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit
12
13To verify if prod branch can be pushed to lab, run following command in
14chromeos-staging-master2.hot server:
15/usr/local/autotest/site_utils/test_push.py -e someone@company.com
16
17The script uses latest gandof stable build as test build by default.
18
19"""
20
21from __future__ import absolute_import
22from __future__ import division
23from __future__ import print_function
24
25import argparse
26import ast
27import datetime
28import getpass
29import multiprocessing
30import os
31import re
32import subprocess
33import sys
34import time
35import traceback
36from six.moves import urllib
37
38import common
39try:
40    from autotest_lib.frontend import setup_django_environment
41    from autotest_lib.frontend.afe import models
42    from autotest_lib.frontend.afe import rpc_utils
43except ImportError:
44    # Unittest may not have Django database configured and will fail to import.
45    pass
46from autotest_lib.client.common_lib import global_config
47from autotest_lib.client.common_lib import priorities
48from autotest_lib.client.common_lib.cros import retry
49from autotest_lib.frontend.afe import rpc_client_lib
50from autotest_lib.server import constants
51from autotest_lib.server import site_utils
52from autotest_lib.server import utils
53from autotest_lib.server.cros import provision
54from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
55from autotest_lib.site_utils import test_push_common
56
AUTOTEST_DIR = common.autotest_dir
CONFIG = global_config.global_config

# RPC clients for the AFE (scheduler) and TKO (test results) databases, with
# automatic retry on transient failures.
AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)

MAIL_FROM = 'chromeos-test@google.com'
# Matches a ChromeOS build version such as R54-8743.25.0.  Raw string avoids
# Python's invalid-escape-sequence warning for '\d'; the pattern value is
# unchanged.
BUILD_REGEX = r'R[\d]+-[\d]+\.[\d]+\.[\d]+'
RUN_SUITE_COMMAND = 'run_suite.py'
PUSH_TO_PROD_SUITE = 'push_to_prod'
DUMMY_SUITE = 'dummy'
DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
# Minimum number of Ready DUTs required per platform, as (platform, count).
DEFAULT_NUM_DUTS = (
        ('gandof', 4),
        ('quawks', 2),
)

# Extracts the suite job id from a run_suite output line.
SUITE_JOB_START_INFO_REGEX = (r'^.*Created suite job:.*'
                              r'tab_id=view_job&object_id=(\d+)$')

URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)

# Some test could be extra / missing or have mismatched results for various
# reasons. Add such test in this list and explain the reason.
_IGNORED_TESTS = [
    # test_push uses a stable image build to test, which is quite behind ToT.
    # The following expectations are correct at ToT, but need to be ignored
    # until stable image is recent enough.

    # TODO(pprabhu): Remove once R70 is stable.
    'dummy_Fail.RetrySuccess',
    'dummy_Fail.RetryFail',
]

# Multiprocessing proxy objects that are used to share data between background
# suite-running processes and main process. The multiprocessing-compatible
# versions are initialized in _main.
_run_suite_output = []
_all_suite_ids = []

DEFAULT_SERVICE_RESPAWN_LIMIT = 2
100
101
class TestPushException(Exception):
    """Raised when validation of the prod branch push fails."""
105
@retry.retry(TestPushException, timeout_min=5, delay_sec=30)
def check_dut_inventory(required_num_duts, pool):
    """Verify the pool has enough Ready DUTs for each required platform.

    @param required_num_duts: a dict specifying the number of DUT each platform
                              requires in order to finish push tests.
    @param pool: the pool used by test_push.
    @raise TestPushException: if number of DUTs are less than the requirement.
    """
    print('Checking DUT inventory...')
    pool_label = constants.Labels.POOL_PREFIX + pool
    ready_hosts = [h for h in AFE.run('get_hosts', status='Ready', locked=False)
                   if pool_label in h.get('labels', [])]
    # Tally the number of Ready hosts per platform.
    current_inventory = {}
    for host in ready_hosts:
        platform = host['platform']
        current_inventory[platform] = current_inventory.get(platform, 0) + 1
    shortages = []
    for platform, req_num in required_num_duts.items():
        curr_num = current_inventory.get(platform, 0)
        if curr_num < req_num:
            shortages.append(
                    '\nRequire %d %s DUTs in pool: %s, only %d are Ready'
                    ' now' % (req_num, platform, pool, curr_num))
    if shortages:
        raise TestPushException('Not enough DUTs to run push tests. %s' %
                                ''.join(shortages))
130
131
def powerwash_dut_to_test_repair(hostname, timeout):
    """Powerwash dut to test repair workflow.

    Schedules a platform_Powerwash job on the DUT, polls TKO every 10 seconds
    for results until `timeout` elapses, verifies the results against the
    expected powerwash outcomes, then reverifies the host so the subsequent
    verify failure triggers repair.

    @param hostname: hostname of the dut.
    @param timeout: seconds of the powerwash test to hit timeout.
    @raise TestPushException: if DUT fail to run the test.
    """
    # Look up the platform_Powerwash control file and schedule it directly
    # through the RPC layer at SUPER priority so it runs ahead of other jobs.
    t = models.Test.objects.get(name='platform_Powerwash')
    c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path))
    job_id = rpc_utils.create_job_common(
             'powerwash', priority=priorities.Priority.SUPER,
             control_type='Server', control_file=c, hosts=[hostname])

    # An empty TKO status list means the job has not logged results yet;
    # abort the job if it does not produce results before the deadline.
    end = time.time() + timeout
    while not TKO.get_job_test_statuses_from_db(job_id):
        if time.time() >= end:
            AFE.run('abort_host_queue_entries', job=job_id)
            raise TestPushException(
                'Powerwash test on %s timeout after %ds, abort it.' %
                (hostname, timeout))
        time.sleep(10)
    verify_test_results(job_id,
                        test_push_common.EXPECTED_TEST_RESULTS_POWERWASH)
    # Kick off verify, verify will fail and a repair should be triggered.
    AFE.reverify_hosts(hostnames=[hostname])
157
158
def reverify_all_push_duts():
    """Schedule a reverify on every DUT known to the AFE."""
    print('Reverifying all DUTs.')
    AFE.reverify_hosts(hostnames=[host.hostname for host in AFE.get_hosts()])
164
165
def parse_arguments(argv):
    """Parse arguments for test_push tool.

    @param argv   Argument vector, as for `sys.argv`, including the
                  command name in `argv[0]`.
    @return: Parsed arguments.

    """
    parser = argparse.ArgumentParser(prog=argv[0])
    parser.add_argument('-b', '--board', dest='board', default='gandof',
                        help='Default is gandof.')
    parser.add_argument('-sb', '--shard_board', dest='shard_board',
                        default='quawks',
                        help='Default is quawks.')
    # Help text fixed: "stale" -> "stable", example board "gandolf" -> the
    # actual default board "gandof".
    parser.add_argument('-i', '--build', dest='build', default=None,
                        help='Default is the latest stable build of given '
                             'board. Must be a stable build, otherwise AU test '
                             'will fail. (ex: gandof-release/R54-8743.25.0)')
    parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
                        help='Default is the latest stable build of given '
                             'board. Must be a stable build, otherwise AU test '
                             'will fail.')
    parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
                        default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
                        help='Time in mins to wait before abort the jobs we '
                             'are waiting on. Only for the asynchronous suites '
                             'triggered by create_and_return flag.')
    parser.add_argument('-ud', '--num_duts', dest='num_duts',
                        default=dict(DEFAULT_NUM_DUTS),
                        type=ast.literal_eval,
                        help="Python dict literal that specifies the required"
                        " number of DUTs for each board. E.g {'gandof':4}")
    parser.add_argument('-c', '--continue_on_failure', action='store_true',
                        dest='continue_on_failure',
                        help='All tests continue to run when there is failure')
    parser.add_argument('-sl', '--service_respawn_limit', type=int,
                        default=DEFAULT_SERVICE_RESPAWN_LIMIT,
                        help='If a service crashes more than this, the test '
                             'push is considered failed.')

    arguments = parser.parse_args(argv[1:])

    # Fall back to the latest stable build for any build not given explicitly.
    version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE)
    if not arguments.build:
        arguments.build = version_map.get_image_name(arguments.board)
    if not arguments.shard_build:
        arguments.shard_build = version_map.get_image_name(
            arguments.shard_board)
    return arguments
217
218
def do_run_suite(suite_name, arguments, use_shard=False,
                 create_and_return=False):
    """Call run_suite to run a suite job, and return the suite job id.

    The script waits the suite job to finish before returning the suite job id.
    Also it will echo the run_suite output to stdout.

    @param suite_name: Name of a suite, e.g., dummy.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.

    @return: Suite job ID.

    @raise TestPushException: if the suite job ID cannot be parsed from the
                              run_suite output, or an asynchronous suite
                              times out.
    """
    if use_shard:
        board = arguments.shard_board
        build = arguments.shard_build
    else:
        board = arguments.board
        build = arguments.build

    # Remove cros-version label to force provision.
    hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
                          locked=False)
    for host in hosts:
        labels_to_remove = [
                l for l in host.labels
                if l.startswith(provision.CROS_VERSION_PREFIX)]
        if labels_to_remove:
            AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)

        # Test repair work flow on shards, powerwash test will timeout after 7m.
        if use_shard and not create_and_return:
            powerwash_dut_to_test_repair(host.hostname, timeout=420)

    current_dir = os.path.dirname(os.path.realpath(__file__))
    cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
           '-s', suite_name,
           '-b', board,
           '-i', build,
           '-p', arguments.pool,
           '--minimum_duts', str(arguments.num_duts[board])]
    if create_and_return:
        cmd += ['-c']

    suite_job_id = None

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    # Echo run_suite output line by line, scanning each line for the suite
    # job id.
    while True:
        line = proc.stdout.readline()

        # Break when run_suite process completed.  (Fixed `!= None` to the
        # idiomatic identity check `is not None`.)
        if not line and proc.poll() is not None:
            break
        print(line.rstrip())
        _run_suite_output.append(line.rstrip())

        if not suite_job_id:
            m = re.match(SUITE_JOB_START_INFO_REGEX, line)
            if m and m.group(1):
                suite_job_id = int(m.group(1))
                _all_suite_ids.append(suite_job_id)

    if not suite_job_id:
        raise TestPushException('Failed to retrieve suite job ID.')

    # If create_and_return specified, wait for the suite to finish.
    if create_and_return:
        end = time.time() + arguments.timeout_min * 60
        while not AFE.get_jobs(id=suite_job_id, finished=True):
            if time.time() < end:
                time.sleep(10)
            else:
                AFE.run('abort_host_queue_entries', job=suite_job_id)
                raise TestPushException(
                        'Asynchronous suite triggered by create_and_return '
                        'flag has timed out after %d mins. Aborting it.' %
                        arguments.timeout_min)

    print('Suite job %s is completed.' % suite_job_id)
    return suite_job_id
304
305
def check_dut_image(build, suite_job_id):
    """Confirm all DUTs used for the suite are imaged to expected build.

    @param build: Expected build to be imaged.
    @param suite_job_id: job ID of the suite job.
    @raise TestPushException: If a DUT does not have expected build imaged.
    """
    print('Checking image installed in DUTs...')
    # Collect the distinct hostnames that served the suite's child jobs.
    hostnames = set()
    for job in models.Job.objects.filter(parent_job_id=suite_job_id):
        hqe = models.HostQueueEntry.objects.filter(job_id=job.id)[0]
        hostnames.add(hqe.host.hostname)
    for hostname in hostnames:
        found_build = site_utils.get_build_from_afe(hostname, AFE)
        if found_build != build:
            raise TestPushException('DUT is not imaged properly. Host %s has '
                                    'build %s, while build %s is expected.' %
                                    (hostname, found_build, build))
325
326
def test_suite(suite_name, expected_results, arguments, use_shard=False,
               create_and_return=False):
    """Call run_suite to start a suite job and verify results.

    @param suite_name: Name of a suite, e.g., dummy
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
    """
    suite_job_id = do_run_suite(suite_name, arguments, use_shard,
                                create_and_return)

    # hqe.host_id for jobs running in a shard is not synced back to the master
    # db, so DUT build verification is only possible for non-shard runs.
    if not use_shard:
        check_dut_image(arguments.build, suite_job_id)

    # Compare the suite's actual results against the expected ones.
    verify_test_results(suite_job_id, expected_results)
350
351
def verify_test_results(job_id, expected_results):
    """Verify the test results with the expected results.

    @param job_id: id of the running jobs. For suite job, it is suite_job_id.
    @param expected_results: A dictionary of test name to test result.
    @raise TestPushException: If verify fails.
    """
    print('Comparing test results...')
    views = site_utils.get_test_views_from_tko(job_id, TKO)
    problems = test_push_common.summarize_push(views, expected_results,
                                               _IGNORED_TESTS)

    # Confirm that the log page for this job actually loads.
    job_name = '%s-%s' % (job_id, getpass.getuser())
    log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
    try:
        urllib.request.urlopen(log_link).read()
    except urllib.error.URLError:
        problems.append('Failed to load page for link to log: %s.' % log_link)

    if problems:
        raise TestPushException('\n'.join(problems))
374
def test_suite_wrapper(queue, suite_name, expected_results, arguments,
                       use_shard=False, create_and_return=False):
    """Wrapper to call test_suite. Handle exception and pipe it to parent
    process.

    @param queue: Queue to save exception to be accessed by parent process.
    @param suite_name: Name of a suite, e.g., dummy
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
    """
    try:
        test_suite(suite_name, expected_results, arguments, use_shard,
                   create_and_return)
    except Exception:
        # A raw traceback object is not picklable, so ship an extracted
        # summary across the process boundary instead of the full exc_info.
        exc_type, exc_value, exc_tb = sys.exc_info()
        queue.put((exc_type, exc_value, traceback.extract_tb(exc_tb)))
395
396
def check_queue(queue):
    """Re-raise the first exception reported on the queue, if any.

    @param queue: Queue used to store exception for parent process to access.
    @raise: Any exception found in the queue.
    """
    if queue.empty():
        return
    exc_type, exc_value, stacktrace = queue.get()
    # The original traceback object cannot cross the process boundary, so
    # print the extracted summary before re-raising.
    print('Original stack trace of the exception:\n%s' % stacktrace)
    raise exc_type(exc_value)
409
410
def _run_test_suites(arguments):
    """Run the actual tests that comprise the test_push.

    Starts the push_to_prod suite and an asynchronous dummy suite in two
    background processes, then polls until both finish, re-raising any
    exception a worker reported through the shared queue.

    @param arguments: Parsed command line arguments.
    @raise: Any exception raised by a worker process (via check_queue).
    """
    # Use daemon flag will kill child processes when parent process fails.
    use_daemon = not arguments.continue_on_failure
    queue = multiprocessing.Queue()

    push_to_prod_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, PUSH_TO_PROD_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS, arguments))
    push_to_prod_suite.daemon = use_daemon
    push_to_prod_suite.start()

    # suite test with --create_and_return flag
    asynchronous_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, DUMMY_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS_DUMMY,
                  arguments, True, True))
    asynchronous_suite.daemon = True
    asynchronous_suite.start()

    # Poll for reported failures while either worker is still running; the
    # final check_queue catches an exception reported just before exit.
    while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
        check_queue(queue)
        time.sleep(5)
    check_queue(queue)
    push_to_prod_suite.join()
    asynchronous_suite.join()
439
440
def check_service_crash(respawn_limit, start_time):
    """Check whether scheduler or host_scheduler crash during testing.

    Since the testing push is kicked off at the beginning of a given hour, the
    way to check whether a service is crashed is to check whether the times of
    the service being respawn during testing push is over the respawn_limit.

    Note: re-indented to the file's 4-space convention; logic unchanged.

    @param respawn_limit: The maximum number of times the service is allowed to
                          be respawn.
    @param start_time: The time that testing push is kicked off.
    @raise TestPushException: if a service was respawned more than the limit.
    """
    def _parse(filename_prefix, filename):
        """Helper method to parse the time of the log.

        @param filename_prefix: The prefix of the filename.
        @param filename: The name of the log file.
        """
        return datetime.datetime.strptime(filename[len(filename_prefix):],
                                          "%Y-%m-%d-%H.%M.%S")

    services = ['scheduler', 'host_scheduler']
    logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
    curr_time = datetime.datetime.now()

    error_msg = ''
    for service in services:
        log_prefix = '%s.log.' % service
        # Each respawn produces a timestamped log file; count the ones created
        # during the push window.
        respawn_count = sum(
                1 for l in logs if l.startswith(log_prefix)
                and start_time <= _parse(log_prefix, l) <= curr_time)

        if respawn_count > respawn_limit:
            error_msg += ('%s has been respawned %s times during testing push '
                          'at %s. It is very likely crashed. Please check!\n' %
                          (service, respawn_count,
                           start_time.strftime("%Y-%m-%d-%H")))
    if error_msg:
        raise TestPushException(error_msg)
478
479
# Message printed when every staging test passes; points at the manual
# push-to-prod procedure.
_SUCCESS_MSG = """
All staging tests completed successfully.

Instructions for pushing to prod are available at
https://goto.google.com/autotest-to-prod
"""
486
487
def _main(arguments):
    """Run test and promote repo branches if tests succeed.

    @param arguments: command line arguments.
    @raise: whatever the push tests raise; before re-raising, still-running
            suite jobs are aborted unless --continue_on_failure was given.
    """

    # TODO Use chromite.lib.parallel.Manager instead, to workaround the
    # too-long-tmp-path problem.
    mpmanager = multiprocessing.Manager()
    # These are globals used by other functions in this module to communicate
    # back from worker processes.
    global _run_suite_output
    _run_suite_output = mpmanager.list()
    global _all_suite_ids
    _all_suite_ids = mpmanager.list()

    try:
        start_time = datetime.datetime.now()
        reverify_all_push_duts()
        time.sleep(15) # Wait for the verify test to start.
        check_dut_inventory(arguments.num_duts, arguments.pool)
        _run_test_suites(arguments)
        check_service_crash(arguments.service_respawn_limit, start_time)
        print(_SUCCESS_MSG)
    except Exception:
        # Abort running jobs unless flagged to continue when there is a failure.
        if not arguments.continue_on_failure:
            for suite_id in _all_suite_ids:
                if AFE.get_jobs(id=suite_id, finished=False):
                    AFE.run('abort_host_queue_entries', job=suite_id)
        raise
519
520
def main():
    """Entry point: parse command-line arguments and run the push tests."""
    _main(parse_arguments(sys.argv))
525
526
# Standard entry-point guard so the module can be imported without running.
if __name__ == '__main__':
    main()
529