• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2
3from __future__ import print_function
4
5import argparse
6import logging
7import multiprocessing
8import subprocess
9import sys
10
11import common
12from autotest_lib.server import frontend
13from autotest_lib.site_utils.lib import infra
14
# Command executed on every target server by _update_server.
DEPLOY_SERVER_LOCAL = '/usr/local/autotest/site_utils/deploy_server_local.py'
# Number of worker threads created by _update_in_parallel.
POOL_SIZE = 124
17
18
19def _filter_servers(servers):
20    """Filter a set of servers to those that should be deployed to."""
21    non_push_roles = {'devserver', 'crash_server', 'reserve'}
22    for s in servers:
23        if s['status'] == 'repair_required':
24            continue
25        if s['status'] == 'backup':
26            continue
27        if set(s['roles']) & non_push_roles:
28            continue
29        yield s
30
31
def discover_servers(afe):
    """Look up the servers that should be updated.

    Fetches the server list with the 'get_servers' RPC and keeps only the
    entries accepted by _filter_servers, i.e. it leaves out servers that
    need repair, backup servers, and servers with roles that are not pushed
    to.

    @param afe: Server to contact with RPC requests.

    @returns: A set of server hostnames.
    """
    # A single record in the RPC result looks like this:
    # {
    #     'hostname': 'server1',
    #     'status': 'backup',
    #     'roles': ['drone', 'scheduler'],
    #     'attributes': {'max_processes': 300}
    # }
    all_servers = frontend.AFE(server=afe).run('get_servers')

    hostnames = set()
    for server in _filter_servers(all_servers):
        hostnames.add(server['hostname'])
    return hostnames
54
55
56def _parse_arguments(args):
57    """Parse command line arguments.
58
59    @param args: The command line arguments to parse. (usually sys.argv[1:])
60
61    @returns A tuple of (argparse.Namespace populated with argument values,
62                         list of extra args to pass to deploy_server_local).
63    """
64    parser = argparse.ArgumentParser(
65            formatter_class=argparse.RawDescriptionHelpFormatter,
66            description='Run deploy_server_local on a bunch of servers. Extra '
67                        'arguments will be passed through.',
68            epilog=('Update all servers:\n'
69                    '  deploy_server.py -x --afe cautotest\n'
70                    '\n'
71                    'Update one server:\n'
72                    '  deploy_server.py <server> -x\n'
73                    ))
74
75    parser.add_argument('-x', action='store_true',
76                        help='Actually perform actions. If not supplied, '
77                             'script does nothing.')
78    parser.add_argument('--afe',
79            help='The AFE server used to get servers from server_db,'
80                 'e.g, cautotest. Used only if no SERVER specified.')
81    parser.add_argument('servers', action='store', nargs='*', metavar='SERVER')
82
83    return parser.parse_known_args()
84
85
def _update_server(server, extra_args=None):
    """Run deploy_server_local for given server.

    @param server: hostname to update.
    @param extra_args: args to be passed in to deploy_server_local. None
                       (the default) means no extra args.

    @return: A tuple of (server, success, output), where:
             server: Name of the server.
             success: True if update succeeds, False otherwise.
             output: A string of the deploy_server_local script output
                     including any errors.
    """
    # None replaces the former mutable `[]` default, which would have been
    # shared between calls.
    if extra_args is None:
        extra_args = []

    # NOTE(review): the args are joined without shell quoting; presumably
    # callers only pass simple flags -- confirm before passing user input.
    cmd = ('%s %s' %
           (DEPLOY_SERVER_LOCAL, ' '.join(extra_args)))
    success = False
    try:
        output = infra.execute_command(server, cmd)
        success = True
    except subprocess.CalledProcessError as e:
        # A failing command is reported through the return value rather than
        # raised, so the caller can collect the output of every server.
        output = e.output

    return server, success, output
108
def _update_in_parallel(servers, extra_args=None):
    """Update a group of servers in parallel.

    While the updates are running, a progress report listing the servers
    that have not finished yet is printed every 20 seconds.

    @param servers: A list of servers to update.
    @param extra_args: args to be passed in to deploy_server_local. None
                       (the default) means no extra args.

    @returns A dictionary from server names that failed to the output
             of the update script.
    """
    # `import multiprocessing` alone does not guarantee that the
    # `multiprocessing.pool` submodule has been loaded, so import it here.
    from multiprocessing import pool as mp_pool

    if extra_args is None:
        extra_args = []

    # Servers whose update has returned. The workers are threads, so a plain
    # list is shared with them and no multiprocessing.Manager is needed;
    # list.append is atomic in CPython.
    finished_servers = []

    def do_server(server):
        """Update one server and record that it is no longer running."""
        try:
            return _update_server(server, extra_args)
        finally:
            finished_servers.append(server)

    # The update actions run in parallel. If any update failed, we should wait
    # for other running updates being finished. Abort in the middle of an update
    # may leave the server in a bad state.
    thread_pool = mp_pool.ThreadPool(POOL_SIZE)
    try:
        results = thread_pool.map_async(do_server, servers)
        thread_pool.close()

        # Track the updating progress for current group of servers.
        while not results.ready():
            incomplete_servers = sorted(set(servers) - set(finished_servers))
            print('Not finished yet. %d servers in this group. '
                '%d servers are still running:\n%s\n' %
                (len(servers), len(incomplete_servers), incomplete_servers))
            # Check the progress every 20s
            results.wait(20)

        # After update finished, parse the result.
        failures = {}
        for server, success, output in results.get():
            if not success:
                failures[server] = output

        return failures

    finally:
        thread_pool.terminate()
        thread_pool.join()
154
155
def main(args):
    """Entry point to deploy_server.py

    Updates the servers named on the command line, or, when none are named,
    the servers discovered through the AFE given with --afe. Without -x
    nothing is updated and only the planned work is printed.

    @param args: The command line arguments to parse. (usually sys.argv)

    @returns The system exit code: 0 on success or dry run, 1 if no servers
             could be determined or any update failed.
    """
    options, extra_args = _parse_arguments(args[1:])
    # Remove all the handlers from the root logger to get rid of the handlers
    # introduced by the import packages.
    logging.getLogger().handlers = []
    logging.basicConfig(level=logging.DEBUG)

    servers = options.servers
    if not servers:
        if not options.afe:
            print('No servers or afe specified. Aborting')
            return 1
        print('Retrieving servers from %s..' % options.afe)
        servers = discover_servers(options.afe)
        print('Retrieved servers were: %s' % (servers,))

    if not options.x:
        print('Doing nothing because -x was not supplied.')
        # Report the resolved server list; options.servers is empty when the
        # servers were discovered through --afe.
        print('servers: %s' % (servers,))
        print('extra args for deploy_server_local: %s' % extra_args)
        return 0

    failures = _update_in_parallel(servers, extra_args)

    if not failures:
        print('Completed all updates successfully.')
        return 0

    # Sorted so that the report and the retry command are deterministic.
    failed_servers = sorted(failures)

    print('The following servers failed, with the following output:')
    for server in failed_servers:
        print('======== %s ========' % server)
        print(failures[server])

    print('The servers that failed were:')
    print('\n'.join(failed_servers))
    print('\n\nTo retry on failed servers, run the following command:')
    retry_cmd = [args[0], '-x'] + failed_servers + list(extra_args)
    print(' '.join(retry_cmd))
    return 1
201
202
203
# Run as a script: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
206