#! /usr/bin/python

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Manage swarming bots.

* Launch bots, e.g. 200 bots:
    $ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'

* Kill bots 1-200:
    $ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'

* Check bots 1-200, start any that are not running:
    $ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'

* The hierarchy of the working dir looks like
  WORKING_DIR
  |-- bot_0
  |   |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
  |   |-- bot_config.log
  |   |-- swarming_bot.log
  |   |-- swarming_bot.zip
  |   |-- swarming_bot.pid
  |-- bot_1
      |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
      |-- bot_config.log
      |-- swarming_bot.log
      |-- swarming_bot.zip
      |-- swarming_bot.pid
  Note bot_config.py:get_dimensions() will rely on the bot number
  in the path to generate the bot id.

* TODO (fdeng):
  ** Restart a bot given a bot id.
"""
import argparse
import logging
import logging.handlers
import os
import re
import shutil
import signal
import socket
import subprocess
import sys
import threading
import time
import urllib

import common

from autotest_lib.client.common_lib import global_config


LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_FILE_SIZE = 1024 * 5000 # 5000 KB
LOG_FILE_BACKUPCOUNT = 5
DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
        'CROS', "swarming_proxy", default=None)
ID_RANGE_FMT = r'(\d+)-(\d+)'
KILL_PROC_TIMEOUT_SECS = 3600 * 3 # 3 hours
MAX_KILL_PROC_SLEEP_SECS = 60


class BotManagementError(Exception):
    """Raised for any bot management related error."""


class PidMisMatchError(BotManagementError):
    """Raised if pid file doesn't match what's found by pgrep."""

    def __init__(self, known_pid, new_pid):
        """Initialize.

        @param known_pid: pid in the pid file.
        @param new_pid: new pid found by pgrep.

        """
        self.known_pid = known_pid
        self.new_pid = new_pid
        msg = 'pid does not match, pid: %s, found %s' % (
                self.known_pid, self.new_pid)
        super(PidMisMatchError, self).__init__(msg)


class DuplicateBotError(BotManagementError):
    """Raised when multiple processes are detected for the same bot id."""


def get_hostname():
    """Return the short hostname, i.e. the FQDN up to its first dot."""
    return socket.getfqdn().split(u'.', 1)[0]


class SwarmingBot(object):
    """Class represent a swarming bot."""


    PID_FILE = 'swarming_bot.pid'
    BOT_DIR_FORMAT = 'bot_%s'
    BOT_FILENAME = 'swarming_bot.zip'
    # Used to search for bot process
    # The process may bootstrap itself into swarming_bot.1.zip and
    # swarming_bot.2.zip
    BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'


    def __init__(self, bot_id, parent_dir, swarming_proxy,
                 specify_bot_id=False):
        """Initialize.

        @param bot_id: An integer.
        @param parent_dir: The parent of the bot's working directory. The
                bot works in <parent_dir>/bot_<bot_id>, which is used to
                store bot code, log files, and any file generated by the
                bot at run time.
        @param swarming_proxy: URL to the swarming instance.
        @param specify_bot_id: If True, the bot id becomes
                '<short hostname>-<bot_id>' and is passed explicitly to
                the swarming server and the bot when it is launched.
        """
        self.bot_id = bot_id
        self.specify_bot_id = specify_bot_id
        if specify_bot_id:
            self.bot_id = '%s-%s' % (get_hostname(), str(self.bot_id))

        self.swarming_proxy = swarming_proxy
        self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
        self.bot_dir = os.path.join(self.parent_dir,
                                    self.BOT_DIR_FORMAT % self.bot_id)
        self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
        self.pid = None
        self._refresh_pid()
        if self.pid is None:
            logging.debug('[Bot %s] Initialize: bot is not running',
                          self.bot_id)
        else:
            logging.debug('[Bot %s] Initialize: bot is running '
                          'as process %s', self.bot_id, self.pid)


    def _write_pid(self):
        """Write self.pid to the pid file."""
        with open(self.pid_file, 'w') as f:
            f.write(str(self.pid))


    def _cleanup_pid(self):
        """Clear self.pid and remove the pid file if it exists."""
        self.pid = None
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)


    def _is_process_running(self):
        """Check if the bot process is running.

        Looks for the bot with `pgrep -f`, matching the python interpreter,
        the bot directory and BOT_CMD_PATTERN.

        @returns True if exactly one matching process exists and its pid
                equals self.pid; False if no process matches.
        @raises DuplicateBotError: if more than one process matches.
        @raises PidMisMatchError: if the matching process has a pid other
                than self.pid.
        @raises subprocess.CalledProcessError: if pgrep fails for a reason
                other than "no process matched" (exit status 1).
        """
        pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
        pattern = '%s %s' % (sys.executable, pattern)
        cmd = ['pgrep', '-f', pattern]
        logging.debug('[Bot %s] check process running: %s',
                      self.bot_id, str(cmd))
        try:
            output = subprocess.check_output(cmd)
            pids = output.splitlines()
            if len(pids) > 1:
                raise DuplicateBotError(
                        'Multiple processes (pid: %s) detected for Bot %s'
                        % (str(pids), self.bot_id))
            pid = int(pids[0])
            if pid != self.pid:
                raise PidMisMatchError(self.pid, pid)
            return True
        except subprocess.CalledProcessError as e:
            # pgrep exits with 1 when no process matched the pattern.
            if e.returncode == 1:
                return False
            else:
                raise


    def _refresh_pid(self):
        """Check process status and update self.pid accordingly.

        Reloads the pid from the pid file when the file exists (an
        unparsable file yields None; a missing file keeps the current
        self.pid), then verifies it with pgrep:
          - no matching process: self.pid is cleared and the pid file is
            removed.
          - a matching process with another pid: that pid is adopted and
            written to the pid file.

        @raises DuplicateBotError: if several processes match this bot.
        """
        # Reload pid from pid file.
        if os.path.exists(self.pid_file):
            with open(self.pid_file) as f:
                try:
                    pid = f.readline().strip()
                    self.pid = int(pid)
                except ValueError:
                    self.pid = None
        try:
            if not self._is_process_running():
                self._cleanup_pid()
        except PidMisMatchError as e:
            logging.error('[Bot %s] %s, updating pid file',
                          self.bot_id, str(e))
            self.pid = e.new_pid
            self._write_pid()


    def is_running(self):
        """Refresh the pid and return whether the bot is running."""
        self._refresh_pid()
        return bool(self.pid)


    def ensure_running(self):
        """Start the swarming bot unless it is already running.

        Wipes and recreates the bot directory, downloads the bot code from
        the swarming proxy, spawns the bot process and records its pid in
        the pid file.

        @raises BotManagementError: if preparing the bot directory,
                downloading the bot code or spawning the bot fails.
        """
        if self.is_running():
            logging.info(
                    '[Bot %s] Skip start, bot is already running (pid %s).',
                    self.bot_id, self.pid)
            return
        try:
            logging.debug('[Bot %s] Bootstrap bot in %s',
                          self.bot_id, self.bot_dir)
            if os.path.exists(self.bot_dir):
                shutil.rmtree(self.bot_dir)
            os.makedirs(self.bot_dir)
            dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
            new_env = dict(os.environ)
            new_env['SWARMING_EXTERNAL_BOT_SETUP'] = '1'
            logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
                          self.bot_id, self.swarming_proxy)
            if self.specify_bot_id:
                url = '%s/bot_code?bot_id=%s' % (self.swarming_proxy,
                                                 self.bot_id)
                new_env['SWARMING_BOT_ID'] = self.bot_id
            else:
                url = '%s/bot_code' % self.swarming_proxy

            logging.info('Download bot code from %s', url)
            urllib.urlretrieve(url, dest)
            cmd = [sys.executable, self.BOT_FILENAME]
            logging.debug('[Bot %s] Calling command: %s', self.bot_id, cmd)
            # NOTE(review): stdout/stderr go to a pipe this script never
            # reads; presumably the bot logs to files in bot_dir. Confirm
            # the bot cannot block on a full pipe.
            process = subprocess.Popen(
                    cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                    cwd=self.bot_dir, env=new_env)
            self.pid = process.pid
            self._write_pid()
        except (IOError, OSError) as e:
            # Wrap so BotManager.launch() logs this bot's failure and
            # continues with the remaining bots.
            raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))
        logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)


    def kill(self):
        """Kill the bot.

        Sends SIGTERM, then polls with exponential backoff (capped at
        MAX_KILL_PROC_SLEEP_SECS) until the bot exits or
        KILL_PROC_TIMEOUT_SECS elapse. SIGKILL is never sent; on timeout
        an error is logged and the bot is left running.

        @raises BotManagementError: on any error while killing the bot.
        """
        if not self.is_running():
            logging.info('[Bot %s] Skip killing bot, Bot is not running',
                         self.bot_id)
            return
        try:
            logging.info('[Bot %s] killing bot (pid: %d)',
                         self.bot_id, self.pid)
            os.kill(self.pid, signal.SIGTERM)
            start = time.time()
            sleep = 1
            while time.time() - start < KILL_PROC_TIMEOUT_SECS:
                if not self.is_running():
                    return
                sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
                logging.debug('[Bot %s] Waiting %d secs for bot to finish'
                              ' any running task and exist.',
                              self.bot_id, sleep)
                time.sleep(sleep)
            logging.error(
                    '[Bot %s] Failed to kill pid %s within %d secs, '
                    'the bot may be running a long running task, you '
                    'can retry the script. SIGKILL the process is not '
                    'recommended, it might lead to unexpected error.',
                    self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
        except Exception as e:
            raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))


class BotManager(object):
    """Class that manages swarming bots."""


    CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'


    def __init__(self, bot_ids, working_dir, swarming_proxy,
                 specify_bot_id=False):
        """Initialize.

        @param bot_ids: a set of integers.
        @param working_dir: Working directory of the bots.
                            Store temporary files.
        @param swarming_proxy: The swarming instance to talk to.
        @param specify_bot_id: Forwarded to each SwarmingBot; if True the
                bot ids are prefixed with the short hostname.
        """
        self.bot_ids = bot_ids
        self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
        self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy,
                                 specify_bot_id)
                     for bid in bot_ids]


    def launch(self):
        """Launch bots, logging (not raising) per-bot start failures."""
        for bot in self.bots:
            try:
                bot.ensure_running()
            except BotManagementError as e:
                logging.error('[BotManager] Failed to start Bot %s: %s',
                              bot.bot_id, str(e))
        # If we let the process exit immediately, the last process won't
        # be launched sometimes. So sleep for 3 secs.
        # The right way is to query the server until all bots are seen
        # by the server by visiting
        # https://SWARMING_PROXY/swarming/api/v1/client/bots
        # However, this would require oauth authentication (install
        # oauth library and install credentials).
        logging.info('Wait 3 seconds for process creation to complete.')
        time.sleep(3)


    def _kill_bot(self, bot):
        """Kill one bot, logging (not raising) any failure.

        @param bot: The SwarmingBot to kill.
        """
        try:
            bot.kill()
        except BotManagementError as e:
            logging.error('[BotManager] Failed to kill Bot %s: %s',
                          bot.bot_id, str(e))


    def kill(self):
        """Kill running bots in parallel, one thread per bot."""
        # Start threads to kill bots.
        threads = []
        for bot in self.bots:
            t = threading.Thread(target=self._kill_bot, args=(bot,))
            # Daemon threads do not keep the script alive after Ctrl-C.
            t.daemon = True
            threads.append(t)
            t.start()
        # Wait on threads. Poll instead of join() so Ctrl-C still raises
        # KeyboardInterrupt in the main thread.
        try:
            while any(t.is_alive() for t in threads):
                time.sleep(0.1)
        except KeyboardInterrupt:
            msg = 'Ctrl-c recieved! Bots status not confirmed. Exit.'
            logging.error(msg)
            print(msg)


    def check(self):
        """Check running bots, launch them if any are missing.

        Counts bot processes under the working dir with pgrep; if fewer
        than len(self.bot_ids) are running, calls launch(), which skips
        bots that are already running.
        """
        pattern = self.CHECK_BOTS_PATTERN.format(
                executable=sys.executable, working_dir=self.working_dir,
                bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
        cmd = ['pgrep', '-f', pattern]
        logging.debug('[BotManager] Check bot counts: %s', str(cmd))
        try:
            output = subprocess.check_output(cmd)
            bot_count = len(output.splitlines())
        except subprocess.CalledProcessError as e:
            # pgrep exits with 1 when no process matched the pattern.
            if e.returncode == 1:
                bot_count = 0
            else:
                raise
        missing_count = len(self.bot_ids) - bot_count
        logging.info(
                '[BotManager] Check bot counts: %d bots running, missing: %d',
                bot_count, missing_count)
        if missing_count > 0:
            logging.info('[BotManager] Checking all bots')
            self.launch()


def parse_range(id_range):
    """Convert an id range to a set of bot ids.

    @param id_range: A range of integers, e.g "1-200". Both ends are
            inclusive; surrounding whitespace is ignored.

    @returns a set of bot ids set([1,2,...200])
    @raises ValueError: if id_range is not exactly of the form "MIN-MAX",
            or if MIN is greater than MAX.
    """
    # Anchor the end so trailing text (e.g. "1-200,300") is rejected
    # instead of silently ignored.
    m = re.match(ID_RANGE_FMT + '$', id_range.strip())
    if not m:
        raise ValueError('Could not parse %s' % id_range)
    low, high = int(m.group(1)), int(m.group(2))
    if low > high:
        raise ValueError('Invalid range %s: %d is greater than %d'
                         % (id_range, low, high))
    return set(range(low, high + 1))


def _parse_args(args):
    """Parse args.

    @param args: Argument list passed from main.

    @return: The argparse.Namespace returned by parser.parse_args.
    """
    parser = argparse.ArgumentParser(
            description='Launch swarming bots on a autotest server')
    action_help = ('launch: launch bots. '
                   'kill: kill bots. '
                   'check: check if bots are running, if not, starting bots.')
    parser.add_argument(
            'action', choices=('launch', 'kill', 'check'), help=action_help)
    parser.add_argument(
            '-r', '--id_range', type=str, dest='id_range', required=True,
            help='A range of integer, each bot created will be labeled '
                 'with an id from this range. E.g. "1-200"')
    parser.add_argument(
            '-d', '--working_dir', type=str, dest='working_dir', required=True,
            help='A working directory where bots will store files '
                 'generated at runtime')
    parser.add_argument(
            '-p', '--swarming_proxy', type=str, dest='swarming_proxy',
            default=DEFAULT_SWARMING_PROXY,
            help='The URL of the swarming instance to talk to, '
                 'Default to the one specified in global config')
    parser.add_argument(
            '-f', '--log_file', dest='log_file',
            help='Path to the log file.')
    parser.add_argument(
            '-v', '--verbose', dest='verbose', action='store_true',
            help='Verbose mode')

    return parser.parse_args(args)


def setup_logging(verbose, log_file):
    """Setup logging on the root logger.

    @param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
    @param log_file: path to log file. If empty, log to stderr via a
            StreamHandler; otherwise use a rotating file handler.
    """
    log_formatter = logging.Formatter(LOGGING_FORMAT)
    if not log_file:
        handler = logging.StreamHandler()
    else:
        handler = logging.handlers.RotatingFileHandler(
                filename=log_file, maxBytes=LOG_FILE_SIZE,
                backupCount=LOG_FILE_BACKUPCOUNT)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    log_level = logging.DEBUG if verbose else logging.INFO
    logger.setLevel(log_level)
    logger.addHandler(handler)


def main(args):
    """Main.

    @param args: A list of system arguments.

    @returns 1 if the swarming proxy or the id range is invalid, 0 otherwise.
    """
    args = _parse_args(args)
    setup_logging(args.verbose, args.log_file)

    if not args.swarming_proxy:
        logging.error(
                'No swarming proxy instance specified. '
                'Specify swarming_proxy in [CROS] in shadow_config, '
                'or use --swarming_proxy')
        return 1
    if not args.swarming_proxy.startswith('https://'):
        swarming_proxy = 'https://' + args.swarming_proxy
    else:
        swarming_proxy = args.swarming_proxy

    try:
        bot_ids = parse_range(args.id_range)
    except ValueError as e:
        logging.error('Invalid --id_range: %s', str(e))
        return 1

    logging.info('Connecting to %s', swarming_proxy)
    # Use the normalized (https://-prefixed) URL, the same one logged above.
    m = BotManager(bot_ids, args.working_dir, swarming_proxy)

    if args.action == 'launch':
        m.launch()
    elif args.action == 'kill':
        m.kill()
    elif args.action == 'check':
        m.check()
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))