#!/usr/bin/python2
"""Run deploy_server_local.py on a group of servers in parallel."""

from __future__ import print_function

import argparse
import logging
import multiprocessing
import subprocess
import sys
from multiprocessing.pool import ThreadPool

import common
from autotest_lib.server import frontend
from autotest_lib.site_utils.lib import infra

DEPLOY_SERVER_LOCAL = ('/usr/local/autotest/site_utils/deploy_server_local.py')
POOL_SIZE = 124


def _filter_servers(servers):
    """Filter a set of servers to those that should be deployed to.

    @param servers: An iterable of server detail dicts, each with at least
                    the keys 'status' and 'roles'.

    @returns: A generator over the server dicts that should be pushed to.
    """
    non_push_roles = {'devserver', 'crash_server', 'reserve'}
    for s in servers:
        if s['status'] == 'repair_required':
            continue
        if s['status'] == 'backup':
            continue
        if set(s['roles']) & non_push_roles:
            continue
        yield s


def discover_servers(afe):
    """Discover the in-production servers to update.

    Returns the set of servers from serverdb that are in production and should
    be updated. This filters out servers in need of repair, or servers of roles
    that are not yet supported by deploy_server / deploy_server_local.

    @param afe: Server to contact with RPC requests.

    @returns: A set of server hostnames.
    """
    # Example server details....
    # {
    #     'hostname': 'server1',
    #     'status': 'backup',
    #     'roles': ['drone', 'scheduler'],
    #     'attributes': {'max_processes': 300}
    # }
    rpc = frontend.AFE(server=afe)
    servers = rpc.run('get_servers')

    return {s['hostname'] for s in _filter_servers(servers)}


def _parse_arguments(args):
    """Parse command line arguments.

    @param args: The command line arguments to parse. (usually sys.argv[1:])

    @returns A tuple of (argparse.Namespace populated with argument values,
             list of extra args to pass to deploy_server_local).
    """
    parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description='Run deploy_server_local on a bunch of servers. Extra '
                        'arguments will be passed through.',
            epilog=('Update all servers:\n'
                    ' deploy_server.py -x --afe cautotest\n'
                    '\n'
                    'Update one server:\n'
                    ' deploy_server.py <server> -x\n'
                    ))

    parser.add_argument('-x', action='store_true',
                        help='Actually perform actions. If not supplied, '
                             'script does nothing.')
    parser.add_argument('--afe',
                        help='The AFE server used to get servers from '
                             'server_db, e.g. cautotest. Used only if no '
                             'SERVER specified.')
    parser.add_argument('servers', action='store', nargs='*', metavar='SERVER')

    # Parse the arguments we were handed rather than implicitly falling back
    # to sys.argv, so callers (and tests) control the input.
    return parser.parse_known_args(args)


def _update_server(server, extra_args=None):
    """Run deploy_server_local for given server.

    @param server: hostname to update.
    @param extra_args: args to be passed in to deploy_server_local.

    @return: A tuple of (server, success, output), where:
             server: Name of the server.
             success: True if update succeeds, False otherwise.
             output: A string of the deploy_server_local script output
                     including any errors.
    """
    # NOTE(review): the extra args are joined without any quoting; confirm
    # how infra.execute_command interprets the command string.
    cmd = ('%s %s' %
           (DEPLOY_SERVER_LOCAL, ' '.join(extra_args or [])))
    success = False
    try:
        output = infra.execute_command(server, cmd)
        success = True
    except subprocess.CalledProcessError as e:
        output = e.output

    return server, success, output


def _update_in_parallel(servers, extra_args=None):
    """Update a group of servers in parallel.

    @param servers: A collection of server hostnames to update.
    @param extra_args: args to be passed in to deploy_server_local.

    @returns A dictionary from server names that failed to the output
             of the update script.
    """
    # Create a list to record all the finished servers.
    manager = multiprocessing.Manager()
    finished_servers = manager.list()

    def do_server(server):
        """Update one server and record it as finished, even on error."""
        try:
            return _update_server(server, extra_args)
        finally:
            finished_servers.append(server)

    # The update actions run in parallel. If any update failed, we should wait
    # for other running updates being finished. Abort in the middle of an update
    # may leave the server in a bad state.
    pool = ThreadPool(POOL_SIZE)
    try:
        results = pool.map_async(do_server, servers)
        pool.close()

        # Track the updating progress for current group of servers.
        all_servers = set(servers)
        while not results.ready():
            incomplete_servers = sorted(all_servers - set(finished_servers))
            print('Not finished yet. %d servers in this group. '
                  '%d servers are still running:\n%s\n' %
                  (len(servers), len(incomplete_servers), incomplete_servers))
            # Check the progress every 20s
            results.wait(20)

        # After update finished, parse the result.
        failures = {}
        for server, success, output in results.get():
            if not success:
                failures[server] = output

        return failures

    finally:
        pool.terminate()
        pool.join()


def main(args):
    """Entry point to deploy_server.py

    @param args: The command line arguments to parse. (usually sys.argv)

    @returns The system exit code.
    """
    options, extra_args = _parse_arguments(args[1:])
    # Remove all the handlers from the root logger to get rid of the handlers
    # introduced by the import packages.
    logging.getLogger().handlers = []
    logging.basicConfig(level=logging.DEBUG)

    servers = options.servers
    if not servers:
        if not options.afe:
            print('No servers or afe specified. Aborting')
            return 1
        print('Retrieving servers from %s..' % options.afe)
        servers = discover_servers(options.afe)
        print('Retrieved servers were: %s' % servers)

    if not options.x:
        print('Doing nothing because -x was not supplied.')
        # Report the resolved server list, which may have come from the AFE
        # rather than from the command line.
        print('servers: %s' % servers)
        print('extra args for deploy_server_local: %s' % extra_args)
        return 0

    failures = _update_in_parallel(servers, extra_args)

    if not failures:
        print('Completed all updates successfully.')
        return 0

    print('The following servers failed, with the following output:')
    for s, o in failures.items():
        print('======== %s ========' % s)
        print(o)

    failed_servers = list(failures)
    print('The servers that failed were:')
    print('\n'.join(failed_servers))
    print('\n\nTo retry on failed servers, run the following command:')
    retry_cmd = [args[0], '-x'] + failed_servers + extra_args
    print(' '.join(retry_cmd))
    return 1


if __name__ == '__main__':
    sys.exit(main(sys.argv))