#!/usr/bin/python from __future__ import print_function import argparse import logging import multiprocessing import os import subprocess import sys import time import common from autotest_lib.server import frontend from autotest_lib.site_utils.lib import infra DEPLOY_SERVER_LOCAL = ('/usr/local/autotest/site_utils/deploy_server_local.py') POOL_SIZE = 124 PUSH_ORDER = {'database': 0, 'database_slave': 0, 'drone': 1, 'shard': 1, 'golo_proxy': 1, 'sentinel': 1, 'afe': 2, 'scheduler': 2, 'host_scheduler': 2, 'suite_scheduler': 2} def discover_servers(afe, server_filter=set()): """Discover the in-production servers to update. @param afe: Server to contact with RPC requests. @param server_filter: A set of servers to get status for. @returns: A list of a list of tuple of (server_name, server_status, roles). The list is sorted by the order to be updated. Servers in the same sublist can be pushed together. """ # Example server details.... # { # 'hostname': 'server1', # 'status': 'backup', # 'roles': ['drone', 'scheduler'], # 'attributes': {'max_processes': 300} # } rpc = frontend.AFE(server=afe) servers = rpc.run('get_servers') # Do not update servers that need repair, and filter the server list by # given server_filter if needed. servers = [s for s in servers if (s['status'] != 'repair_required' and (not server_filter or s['hostname'] in server_filter))] # Do not update reserve, devserver or crash_server (not YET supported). servers = [s for s in servers if 'devserver' not in s['roles'] and 'crash_server' not in s['roles'] and 'reserve' not in s['roles']] sorted_servers = [] for i in range(max(PUSH_ORDER.values()) + 1): sorted_servers.append([]) servers_with_unknown_order = [] for server in servers: info = (server['hostname'], server['status'], server['roles']) try: order = min([PUSH_ORDER[r] for r in server['roles'] if r in PUSH_ORDER]) sorted_servers[order].append(info) except ValueError: # All roles are not indexed in PUSH_ORDER. servers_with_unknown_order.append(info) # Push all servers with unknown roles together. if servers_with_unknown_order: sorted_servers.append(servers_with_unknown_order) found_servers = set([s['hostname'] for s in servers]) # Inject the servers passed in by user but not found in server database. extra_servers = [] for server in server_filter - found_servers: extra_servers.append((server, 'unknown', ['unknown'])) if extra_servers: sorted_servers.append(extra_servers) return sorted_servers def parse_arguments(args): """Parse command line arguments. @param args: The command line arguments to parse. (usually sys.argv[1:]) @returns An argparse.Namespace populated with argument values. """ parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description='Command to update an entire autotest installation.', epilog=('Update all servers:\n' ' deploy_server.py\n' '\n' 'Update one server:\n' ' deploy_server.py \n' '\n' 'Send arguments to remote deploy_server_local.py:\n' ' deploy_server.py -- --dryrun\n' '\n' 'See what arguments would be run on specified servers:\n' ' deploy_server.py --dryrun --' ' --skip-update\n')) parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help='Log all deploy script output.') parser.add_argument('--continue', action='store_true', dest='cont', help='Continue to the next server on failure.') parser.add_argument('--afe', required=True, help='What is the main server for this installation? (cautotest).') parser.add_argument('--update_push_servers', action='store_true', help='Indicate to update test_push servers.') parser.add_argument('--force_update', action='store_true', help='Force to run update commands for afe, tko, build_externals') parser.add_argument('--dryrun', action='store_true', help='Don\'t actually run remote commands.') parser.add_argument('--logfile', action='store', default='/tmp/deployment.log', help='Path to the file to save the deployment log to. Default is ' '/tmp/deployment.log') parser.add_argument('args', nargs=argparse.REMAINDER, help=(', ... -- , ...')) results = parser.parse_args(args) # We take the args list and further split it down. Everything before -- # is a server name, and everything after it is an argument to pass along # to deploy_server_local.py. # # This: # server_a, server_b -- --dryrun --skip-report # # Becomes: # args.servers['server_a', 'server_b'] # args.args['--dryrun', '--skip-report'] try: local_args_index = results.args.index('--') + 1 except ValueError: # If -- isn't present, they are all servers. results.servers = results.args results.args = [] else: # Split arguments. results.servers = results.args[:local_args_index-1] results.args = results.args[local_args_index:] return results def update_server(inputs): """Deploy for given server. @param inputs: Inputs for the update action, including: server: Name of the server to update. status: Status of the server. options: Options for the update. @return: A tuple of (server, success, output), where: server: Name of the server to be updated. sucess: True if update succeeds, False otherwise. output: A string of the deploy_server_local script output including any errors. """ start = time.time() server = inputs['server'] status = inputs['status'] # Shared list to record the finished server. finished_servers = inputs['finished_servers'] options = inputs['options'] print('Updating server %s...' % server) if status == 'backup': extra_args = ['--skip-service-status'] else: extra_args = [] cmd = ('%s %s' % (DEPLOY_SERVER_LOCAL, ' '.join(options.args + extra_args))) output = '%s: %s' % (server, cmd) success = True if not options.dryrun: for i in range(5): try: print('[%s/5] Try to update server %s' % (i, server)) output = infra.execute_command(server, cmd) finished_servers.append(server) break except subprocess.CalledProcessError as e: print('%s: Command failed with error: %s' % (server, e)) success = False output = e.output print('Time used to update server %s: %s' % (server, time.time()-start)) return server, success, output def update_in_parallel(servers, options): """Update a group of servers in parallel. @param servers: A list of tuple of (server_name, server_status, roles). @param options: Options for the push. @returns A list of servers that failed to update. """ # Create a list to record all the finished servers. manager = multiprocessing.Manager() finished_servers = manager.list() args = [] for server, status, _ in servers: args.append({'server': server, 'status': status, 'finished_servers': finished_servers, 'options': options}) # The update actions run in parallel. If any update failed, we should wait # for other running updates being finished. Abort in the middle of an update # may leave the server in a bad state. pool = multiprocessing.pool.ThreadPool(POOL_SIZE) try: failed_servers = [] results = pool.map_async(update_server, args) pool.close() # Track the updating progress for current group of servers. incomplete_servers = set() server_names = set([s[0] for s in servers]) while not results.ready(): incomplete_servers = server_names - set(finished_servers) print('Not finished yet. %d servers in this group. ' '%d servers are still running:\n%s\n' % (len(servers), len(incomplete_servers), incomplete_servers)) # Check the progress every 1 mins results.wait(60) # After update finished, parse the result. for server, success, output in results.get(): if options.dryrun: print('Dry run, updating server %s is skipped.' % server) else: if success: msg = ('Successfully updated server %s.\n' % server) if options.verbose: print(output) print() else: msg = ('Failed to update server %s.\nError: %s' % (server, output.strip())) print(msg) failed_servers.append(server) # Write the result into logfile. with open(options.logfile, 'a') as f: f.write(msg) finally: pool.terminate() pool.join() return failed_servers def main(args): """Main routine that drives all the real work. @param args: The command line arguments to parse. (usually sys.argv) @returns The system exit code. """ options = parse_arguments(args[1:]) # Remove all the handlers from the root logger to get rid of the handlers # introduced by the import packages. logging.getLogger().handlers = [] logging.basicConfig(level=logging.DEBUG if options.verbose else logging.INFO) print('Retrieving server status...') sorted_servers = discover_servers(options.afe, set(options.servers or [])) # Display what we plan to update. print('Will update (in this order):') i = 1 for servers in sorted_servers: print('%s Group %d (%d servers) %s' % ('='*30, i, len(servers), '='*30)) for server, status, roles in servers: print('\t%-36s:\t%s\t%s' % (server, status, roles)) i += 1 print() if os.path.exists(options.logfile): os.remove(options.logfile) print ('Start updating, push logs of every server will be saved ' 'at %s' % options.logfile) failed = [] skipped = [] for servers in sorted_servers: if not failed or options.cont: failed += update_in_parallel(servers, options) else: skipped.extend(s[0] for s in servers) # Only include server name. if failed: print('Errors updating:') for server in failed: print(' %s' % server) print() print('To retry:') print(' %s %s' % (str(args[0]), str(' '.join(failed + skipped)))) # Exit with error. return 1 if __name__ == '__main__': sys.exit(main(sys.argv))