• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! /usr/bin/python
2
3# Copyright 2015 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""
8Manage swarming bots.
9
10* Launch bots, e.g. 200 bots:
11    $ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'
12
13* Kill bot 1-200:
14    $ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'
15
16* Check bot 1-200, start if not running:
17    $ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'
18
19* The hierachy of working dir is like
20  WORKING_DIR
21    |-- bot_0
22    |   |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
23    |   |-- bot_config.log
24    |   |-- swarming_bot.log
25    |   |-- swarming_bot.zip
26    |   |-- swarming_bot.pid
27    |-- bot_1
28        |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
29        |-- bot_config.log
30        |-- swarming_bot.log
31        |-- swarming_bot.zip
32        |-- pid
33  Note bot_config.py:get_dimensions() will rely on the the bot number
34  in the path to generate bot id.
35
36* TODO (fdeng):
37    ** Restart a bot given a bot id.
38"""
39import argparse
40import logging
41import logging.handlers
42import os
43import re
44import shutil
45import signal
46import socket
47import subprocess
48import sys
49import threading
50import time
51import urllib
52
53import common
54
55from autotest_lib.client.common_lib import global_config
56
57
58LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
59LOG_FILE_SIZE = 1024 * 5000 # 5000 KB
60LOG_FILE_BACKUPCOUNT = 5
61DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
62        'CROS', "swarming_proxy", default=None)
63ID_RANGE_FMT = r'(\d+)-(\d+)'
64KILL_PROC_TIMEOUT_SECS = 3600 * 3 # 3 hours
65MAX_KILL_PROC_SLEEP_SECS = 60
66
67
68class BotManagementError(Exception):
69    """Raised for any bot management related error."""
70
71
72class PidMisMatchError(BotManagementError):
73    """Raised if pid file doesn't match what's found by pgrep."""
74
75    def __init__(self, known_pid, new_pid):
76        """Initialize.
77
78        @param known_pid: pid in the pid file.
79        @param new_pid: new pid found by pgrep.
80
81        """
82        self.known_pid = known_pid
83        self.new_pid = new_pid
84        msg = 'pid does not match, pid: %s, found %s' % (
85                self.known_pid, self.new_pid)
86        super(PidMisMatchError, self).__init__(msg)
87
88
89class DuplicateBotError(BotManagementError):
90    """Raised when multiple processes are detected for the same bot id."""
91
92
93def get_hostname():
94    return socket.getfqdn().split(u'.', 1)[0]
95
96
97class SwarmingBot(object):
98    """Class represent a swarming bot."""
99
100
101    PID_FILE = 'swarming_bot.pid'
102    BOT_DIR_FORMAT = 'bot_%s'
103    BOT_FILENAME = 'swarming_bot.zip'
104    # Used to search for bot process
105    # The process may bootstrap itself into swarming_bot.1.zip and swarming_bot.2.zip
106    BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'
107
108
109    def __init__(self, bot_id, parent_dir, swarming_proxy,
110                 specify_bot_id=False):
111        """Initialize.
112
113        @param bot_id: An integer.
114        @param bot_dir: The working directory for the bot.
115                        The directory is used to store bot code,
116                        log file, and any file generated by the bot
117                        at run time.
118        @param swarming_proxy: URL to the swarming instance.
119        """
120        self.bot_id = bot_id
121        self.specify_bot_id = specify_bot_id
122        if specify_bot_id:
123            self.bot_id = '%s-%s' % (get_hostname(), str(self.bot_id))
124
125        self.swarming_proxy = swarming_proxy
126        self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
127        self.bot_dir = os.path.join(self.parent_dir,
128                                    self.BOT_DIR_FORMAT % self.bot_id)
129        self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
130        self.pid = None
131        self._refresh_pid()
132        if self.pid is None:
133            logging.debug('[Bot %s] Initialize: bot is not running',
134                          self.bot_id)
135        else:
136            logging.debug('[Bot %s] Initialize: bot is running '
137                          'as process %s', self.bot_id, self.pid)
138
139
140    def _write_pid(self):
141        """Write pid to file"""
142        with open(self.pid_file, 'w') as f:
143            f.write(str(self.pid))
144
145
146    def _cleanup_pid(self):
147        """Cleanup self.pid and pid file."""
148        self.pid = None
149        if os.path.exists(self.pid_file):
150            os.remove(self.pid_file)
151
152
153    def _is_process_running(self):
154        """Check if the process is running."""
155        pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
156        pattern = '%s %s' % (sys.executable, pattern)
157        cmd = ['pgrep', '-f', pattern]
158        logging.debug('[Bot %s] check process running: %s',
159                      self.bot_id, str(cmd))
160        try:
161            output = subprocess.check_output(cmd)
162            pids = output.splitlines()
163            if len(pids) > 1:
164                raise DuplicateBotError('Multiple processes (pid: %s) detected for Bot %s'
165                                        % (str(pids), self.bot_id))
166            pid = int(pids[0])
167            if pid != self.pid:
168                raise PidMisMatchError(self.pid, pid)
169            return True
170        except subprocess.CalledProcessError as e:
171            if e.returncode == 1:
172                return False
173            else:
174                raise
175
176
177    def _refresh_pid(self):
178        """Check process status and update self.pid accordingly."""
179        # Reload pid from pid file.
180        if os.path.exists(self.pid_file):
181            with open(self.pid_file) as f:
182                try:
183                    pid = f.readline().strip()
184                    self.pid = int(pid)
185                except ValueError as e:
186                    self.pid = None
187        try:
188            if not self._is_process_running():
189                self._cleanup_pid()
190        except PidMisMatchError as e:
191            logging.error('[Bot %s] %s, updating pid file',
192                          self.bot_id, str(e))
193            self.pid = e.new_pid
194            self._write_pid()
195
196
197    def is_running(self):
198        """Return if the bot is running."""
199        self._refresh_pid()
200        return bool(self.pid)
201
202
203    def ensure_running(self):
204        """Start a swarming bot."""
205        if self.is_running():
206            logging.info(
207                    '[Bot %s] Skip start, bot is already running (pid %s).',
208                    self.bot_id, self.pid)
209            return
210        logging.debug('[Bot %s] Bootstrap bot in %s', self.bot_id, self.bot_dir)
211        if os.path.exists(self.bot_dir):
212            shutil.rmtree(self.bot_dir)
213        os.makedirs(self.bot_dir)
214        dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
215        new_env = dict(os.environ)
216        new_env['SWARMING_EXTERNAL_BOT_SETUP'] = '1'
217        logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
218                      self.bot_id, self.swarming_proxy)
219        if self.specify_bot_id:
220            url = '%s/bot_code?bot_id=%s' % (self.swarming_proxy, self.bot_id)
221            new_env['SWARMING_BOT_ID'] = self.bot_id
222        else:
223            url = '%s/bot_code' % self.swarming_proxy
224
225        logging.info('Download bot code from %s', url)
226        urllib.urlretrieve(url, dest)
227        cmd = [sys.executable, self.BOT_FILENAME]
228        logging.debug('[Bot %s] Calling command: %s', self. bot_id, cmd)
229        process = subprocess.Popen(
230                cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
231                cwd=self.bot_dir, env=new_env)
232        self.pid = process.pid
233        self._write_pid()
234        logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)
235
236
237    def kill(self):
238        """Kill the bot."""
239        if not self.is_running():
240            logging.info('[Bot %s] Skip killing bot, Bot is not running',
241                          self.bot_id)
242            return
243        try:
244            logging.info('[Bot %s] killing bot (pid: %d)',
245                          self.bot_id, self.pid)
246            os.kill(self.pid, signal.SIGTERM)
247            start = time.time()
248            sleep = 1
249            while(time.time() - start < KILL_PROC_TIMEOUT_SECS):
250                if not self.is_running():
251                    return
252                sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
253                logging.debug('[Bot %s] Waiting %d secs for bot to finish'
254                              ' any running task and exist.',
255                              self.bot_id, sleep)
256                time.sleep(sleep)
257            else:
258                logging.error(
259                        '[Bot %s] Failed to kill pid %s within %d secs, '
260                        'the bot may be running a long running task, you '
261                        'can retry the script. SIGKILL the process is not '
262                        'recommended, it might lead to unexpected error.',
263                        self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
264        except Exception as e:
265            raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))
266
267
268class BotManager(object):
269    """Class that manages swarming bots."""
270
271
272    CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'
273
274
275    def __init__(self, bot_ids, working_dir, swarming_proxy,
276                 specify_bot_id=False):
277        """Initialize.
278
279        @param bot_ids: a set of integers.
280        @param working_dir: Working directory of the bots.
281                            Store temporary files.
282        @param swarming_proxy: The swarming instance to talk to.
283        """
284        self.bot_ids = bot_ids
285        self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
286        self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy,
287                                 specify_bot_id)
288                     for bid in bot_ids]
289
290    def launch(self):
291        """Launch bots."""
292        for bot in self.bots:
293          try:
294              bot.ensure_running()
295          except BotManagementError as e:
296              logging.error('[BotManager] Failed to start Bot %s: %s',
297                            bot.bot_id, str(e))
298        # If we let the process exit immediately, the last process won't
299        # be launched sometimes. So sleep for 3 secs.
300        # The right way is to query the server until all bots are seen
301        # by the server by visiting
302        # https://SWARMING_PROXY/swarming/api/v1/client/bots
303        # However, this would require oauth authentication (install
304        # oauth library and install credentials).
305        logging.info('Wait 3 seconds for process creation to complete.')
306        time.sleep(3)
307
308
309    def kill(self):
310        """Kill running bots."""
311        # Start threads to kill bots.
312        threads = []
313        for bot in self.bots:
314            t = threading.Thread(target=bot.kill)
315            threads.append(t)
316            t.setDaemon(True)
317            t.start()
318        # Wait on threads.
319        try:
320            while threading.active_count() > 1:
321                time.sleep(0.1)
322        except KeyboardInterrupt:
323            msg = 'Ctrl-c recieved! Bots status not confirmed. Exit.'
324            logging.error(msg)
325            print msg
326
327
328    def check(self):
329        """Check running bots, start it if not running."""
330        pattern =  self.CHECK_BOTS_PATTERN.format(
331                executable=sys.executable, working_dir=self.working_dir,
332                bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
333        cmd = ['pgrep', '-f', pattern]
334        logging.debug('[BotManager] Check bot counts: %s', str(cmd))
335        try:
336            output = subprocess.check_output(cmd)
337            bot_count = len(output.splitlines())
338        except subprocess.CalledProcessError as e:
339            if e.returncode == 1:
340                bot_count = 0
341            else:
342                raise
343        missing_count = len(self.bot_ids) - bot_count
344        logging.info(
345                '[BotManager] Check bot counts: %d bots running, missing: %d',
346                bot_count, missing_count)
347        if missing_count > 0:
348            logging.info('[BotManager] Checking all bots')
349            self.launch()
350
351
352def parse_range(id_range):
353    """Convert an id range to a set of bot ids.
354
355    @param id_range: A range of integer, e.g "1-200".
356
357    @returns a set of bot ids set([1,2,...200])
358    """
359    m = re.match(ID_RANGE_FMT, id_range)
360    if not m:
361        raise ValueError('Could not parse %s' % id_range)
362    min, max = int(m.group(1)), int(m.group(2))
363    return set(bid for bid in range(min, max+1))
364
365
366def _parse_args(args):
367    """Parse args.
368
369    @param args: Argument list passed from main.
370
371    @return: A tuple with the parsed args, as returned by parser.parse_args.
372    """
373    parser = argparse.ArgumentParser(
374            description='Launch swarming bots on a autotest server')
375    action_help = ('launch: launch bots. '
376                  'kill: kill bots. '
377                  'check: check if bots are running, if not, starting bots.')
378    parser.add_argument(
379            'action', choices=('launch', 'kill', 'check'), help=action_help)
380    parser.add_argument(
381            '-r', '--id_range', type=str, dest='id_range', required=True,
382            help='A range of integer, each bot created will be labeled '
383                 'with an id from this range. E.g. "1-200"')
384    parser.add_argument(
385            '-d', '--working_dir', type=str, dest='working_dir', required=True,
386            help='A working directory where bots will store files '
387                 'generated at runtime')
388    parser.add_argument(
389            '-p', '--swarming_proxy', type=str, dest='swarming_proxy',
390            default=DEFAULT_SWARMING_PROXY,
391            help='The URL of the swarming instance to talk to, '
392                 'Default to the one specified in global config')
393    parser.add_argument(
394            '-f', '--log_file', dest='log_file',
395            help='Path to the log file.')
396    parser.add_argument(
397            '-v', '--verbose', dest='verbose', action='store_true',
398            help='Verbose mode')
399
400    return parser.parse_args(args)
401
402
403def setup_logging(verbose, log_file):
404    """Setup logging.
405
406    @param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
407    @param log_file; path to log file.
408    """
409    log_formatter = logging.Formatter(LOGGING_FORMAT)
410    if not log_file:
411        handler = logging.StreamHandler()
412    else:
413        handler = logging.handlers.RotatingFileHandler(
414                filename=log_file, maxBytes=LOG_FILE_SIZE,
415                backupCount=LOG_FILE_BACKUPCOUNT)
416    handler.setFormatter(log_formatter)
417    logger = logging.getLogger()
418    log_level = logging.DEBUG if verbose else logging.INFO
419    logger.setLevel(log_level)
420    logger.addHandler(handler)
421
422
423def main(args):
424    """Main.
425
426    @args: A list of system arguments.
427    """
428    args = _parse_args(args)
429    setup_logging(args.verbose, args.log_file)
430
431    if not args.swarming_proxy:
432        logging.error(
433                'No swarming proxy instance specified. '
434                'Specify swarming_proxy in [CROS] in shadow_config, '
435                'or use --swarming_proxy')
436        return 1
437    if not args.swarming_proxy.startswith('https://'):
438        swarming_proxy = 'https://' + args.swarming_proxy
439    else:
440        swarming_proxy = args.swarming_proxy
441
442    logging.info('Connecting to %s', swarming_proxy)
443    m = BotManager(parse_range(args.id_range),
444                   args.working_dir, args.swarming_proxy)
445
446    if args.action == 'launch':
447        m.launch()
448    elif args.action == 'kill':
449        m.kill()
450    elif args.action == 'check':
451        m.check()
452
453
454if __name__ == '__main__':
455    sys.exit(main(sys.argv[1:]))
456