#!/usr/bin/python

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation is responsible for the initiation of a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""

#pylint: disable-msg=missing-docstring

import argparse
import collections
import datetime
import getpass
import itertools
import logging
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import sys
import time
import traceback

import common

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery. Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# Script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')


class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)


class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: until we have a better solution to
            # periodically clean up tmp files, we have to use rm -rf to delete
            # the whole folder. shutil.rmtree has a performance issue when a
            # folder contains a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it NO-OPs.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)
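

    # Example (illustrative): the scheduler side builds batched calls with the
    # module-level call() helper and hands them to execute_calls(), which
    # dispatches each _MethodCall onto this object by method name. The results
    # path below is hypothetical:
    #
    #   utility = DroneUtility()
    #   batch = [call('initialize', '/usr/local/autotest/results'),
    #            call('refresh', [])]
    #   output = utility.execute_calls(batch)  # dict(results=..., warnings=...)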


    def _warn(self, warning):
        self.warnings.append(warning)


    def refresh(self, pidfile_paths):
        """Refreshes our view of the processes referred to by pidfile_paths.

        See drone_utility.ProcessRefresher.__call__ for details.
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        use_pool = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_utility_refresh_use_pool', bool, False)
        result, warnings = ProcessRefresher(check_mark, use_pool)(pidfile_paths)
        self.warnings += warnings
        return result


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        The autoserv process has a handler for SIGTERM, in which it can do
        some cleanup work. However, aborting a process with SIGTERM followed
        by SIGKILL has overhead, detailed in the following CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests running with server-side packaging, SIGTERM needs to
            # be sent for the autoserv process to destroy the container used
            # by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort to cleanup '
                              'container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors, return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default to kill the process with SIGKILL directly.
        return (signal.SIGKILL,)


    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        try:
            logging.info('List of processes to be killed: %s', process_list)
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
        except error.AutoservRunError as e:
            self._warn('Error occurred when killing processes. Error: %s' % e)
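

    # Sketch of the grouping done in kill_processes() above: processes are
    # bucketed by the signal queue they need, so a hypothetical mix of SSP and
    # non-SSP autoserv processes
    #     [p1 (--require-ssp), p2, p3 (--require-ssp)]
    # is killed as
    #     {(SIGTERM, SIGKILL): [p1, p3], (SIGKILL,): [p2]}
    # with one utils.nuke_pids() call per bucket; pids are negated so signals
    # are delivered to the entire process group.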


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)
            return
        if os.path.isdir(path):
            return
        assert os.path.isfile(path)
        if '/hosts/' in path:
            return
        raise IOError('Path %s exists as a file, not a directory' % path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after file permissions were
                         corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove the following retry when the
            # test container can be an unprivileged container.
            # If the write failed with 'Permission denied', one possible cause
            # is that the file was created in a container and thus owned by
            # root. If so, fix the file permissions and try again.
            if e.errno == 13 and not is_retry:
                logging.error('Error writing to file %s: %s. Will be retried.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))
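

    # Illustrative examples of the trailing-slash convention honored by
    # copy_file_or_directory() below (matching abstract_ssh.get_file and
    # send_file); the paths are hypothetical:
    #
    #   self.copy_file_or_directory('/results/123-x/', '/dest')
    #   # copies the *contents* of /results/123-x into the existing /dest
    #
    #   self.copy_file_or_directory('/results/123-x', '/dest')
    #   # copies the directory itself, creating /dest as a copy of it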


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file). That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                        os.path.join(source_path, filename),
                        os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            try:
                shutil.copytree(source_path, destination_path, symlinks=True)
            except shutil.Error:
                # Ignore errors about missing files while copying a directory.
                # The cause of this behavior is that gs_offloader zips up
                # folders with too many files. There is a race condition in
                # which a repair job tries to copy provision job results to
                # the test job result folder while gs_offloader is uploading
                # the provision job result and zipping up folders that contain
                # too many files.
                pass
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            try:
                shutil.copy(source_path, destination_path)
            except IOError:
                # Ignore copy errors for the same reason as above.
                pass


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers."""
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when the test
        # container can be an unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be directed to /dev/null; otherwise, the
        # exit of the drone_utility process will kill the lxc_cleanup script.
        subprocess.Popen(
                command, shell=False, stdin=None,
                stdout=open('/dev/null', 'w'), stderr=open('/dev/null', 'a'),
                preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))
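

    # Note on the async pattern used by get_file_from()/send_file_to(): they
    # only fork the transfer via run_async_command(). Completion is enforced
    # by execute_calls(), which throttles via _wait_for_some_async_commands()
    # and finally drains everything with wait_for_all_async_commands(). A
    # hypothetical batched caller:
    #
    #   utility.execute_calls(
    #           [call('get_file_from', 'host1', '/remote/results/',
    #                 '/usr/local/autotest/results/123-x/')])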


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        logging.debug('_sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail: %s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)


_MAX_REFRESH_POOL_SIZE = 50
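

# Example (illustrative) of driving ProcessRefresher directly; the pidfile
# path is hypothetical, and real paths come from the scheduler's
# drone_manager:
#
#   refresher = ProcessRefresher(check_mark=False, use_pool=True)
#   result, warnings = refresher(['/results/123-x/.autoserv_execute'])
#   result['pidfiles']            # {path: contents} for pidfiles that exist
#   result['autoserv_processes']  # ps info dicts for autoserv processes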


class ProcessRefresher(object):
    """Object to refresh process information from given pidfiles.

    Usage: ProcessRefresher(True)(pidfile_list)
    """

    def __init__(self, check_mark, use_pool=False):
        """
        @param check_mark: If True, only consider processes that were
                explicitly marked by a former drone_utility call as autotest
                related processes.
        @param use_pool: If True, use a multiprocessing.Pool to parallelize
                costly operations.
        """
        self._check_mark = check_mark
        self._use_pool = use_pool
        self._pool = None


    def __call__(self, pidfile_paths):
        """
        @param pidfile_paths: A list of paths to check for pidfiles.

        @returns (result, warnings)
            where result is a dict with the following keys:
            - pidfiles: dict mapping pidfile paths to file contents, for
              pidfiles that exist.
            - all_processes: list of dicts corresponding to all running
              processes. Each dict contains pid, pgid, ppid, comm, and args
              (see "man ps" for details).
            - autoserv_processes: likewise, restricted to autoserv processes.
            - parse_processes: likewise, restricted to parse processes.
            - pidfiles_second_read: same info as pidfiles, but gathered after
              the processes are scanned.
            and warnings is a list of warnings generated during process
            refresh.
        """

        if self._use_pool:
            pool_size = max(
                    min(len(pidfile_paths), _MAX_REFRESH_POOL_SIZE),
                    1)
            self._pool = multiprocessing.Pool(pool_size)
        else:
            pool_size = 0
        logging.info('Refreshing %d pidfiles with %d helper processes',
                     len(pidfile_paths), pool_size)

        warnings = []
        # It is necessary to explicitly force this to be a list because the
        # results are pickled by DroneUtility.
        proc_infos = list(_get_process_info())

        autoserv_processes, extra_warnings = self._filter_proc_infos(
                proc_infos, 'autoserv')
        warnings += extra_warnings
        parse_processes, extra_warnings = self._filter_proc_infos(proc_infos,
                                                                  'parse')
        warnings += extra_warnings
        site_parse_processes, extra_warnings = self._filter_proc_infos(
                proc_infos, 'site_parse')
        warnings += extra_warnings

        result = {
                'pidfiles': self._read_pidfiles(pidfile_paths),
                'all_processes': proc_infos,
                'autoserv_processes': autoserv_processes,
                'parse_processes': (parse_processes + site_parse_processes),
                'pidfiles_second_read': self._read_pidfiles(pidfile_paths),
        }
        return result, warnings


    def _read_pidfiles(self, pidfile_paths):
        """Uses a process pool to read the requested pidfile_paths."""
        if self._use_pool:
            contents = self._pool.map(_read_pidfile, pidfile_paths)
            contents = [c for c in contents if c is not None]
            return {k: v for k, v in contents}
        else:
            pidfiles = {}
            for path in pidfile_paths:
                content = _read_pidfile(path)
                if content is None:
                    continue
                pidfiles[content.path] = content.content
            return pidfiles


    def _filter_proc_infos(self, proc_infos, command_name):
        """Filters process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param proc_infos: ps output as returned by _get_process_info.
        @param command_name: The name of the command, e.g. 'autoserv'.

        @return: (proc_infos, warnings) where proc_infos is a list of ProcInfo
                 as returned by _get_process_info and warnings is a list of
                 warnings generated while filtering.
        """
        proc_infos = [info for info in proc_infos
                      if info['comm'] == command_name]
        if not self._check_mark:
            return proc_infos, []

        if self._use_pool:
            dark_marks = self._pool.map(
                    _process_has_dark_mark,
                    [info['pid'] for info in proc_infos])
        else:
            dark_marks = [_process_has_dark_mark(info['pid'])
                          for info in proc_infos]

        marked_proc_infos = []
        warnings = []
        for marked, info in itertools.izip(dark_marks, proc_infos):
            if marked:
                marked_proc_infos.append(info)
            else:
                warnings.append(
                        '%(comm)s process pid %(pid)s has no dark mark; '
                        'ignoring.' % info)
        return marked_proc_infos, warnings
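

# Example (illustrative) of a per-drone ssh username override consumed by
# create_host() below; 'drone1.example.com' is a hypothetical hostname:
#
#   [SCHEDULER]
#   drone1.example.com_username: chromeos-test
#
# Without such an entry, the current user (getpass.getuser()) is used.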


def create_host(hostname):
    # TODO(crbug.com/739466) Delay the import to avoid a ~0.7 second penalty
    # for drone_utility calls that don't actually interact with DUTs.
    from autotest_lib.server import hosts
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


def return_data(data):
    print pickle.dumps(data)


def _process_has_dark_mark(pid):
    """Checks if a process was launched earlier by drone_utility.

    @param pid: The pid of the process to check.
    """
    try:
        with open('/proc/%s/environ' % pid, 'rb') as env_file:
            env_data = env_file.read()
    except EnvironmentError:
        return False
    return DARK_MARK_ENVIRONMENT_VAR in env_data


_PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
def _get_process_info():
    """Parses ps output for all process information.

    @returns A generator of dicts. Each dict has the following keys:
        - comm: command name,
        - pgid: process group id,
        - ppid: parent process id,
        - pid: process id,
        - args: args the command was invoked with.
    """
    @retry.retry(subprocess.CalledProcessError,
                 timeout_min=0.5, delay_sec=0.25)
    def run_ps():
        return subprocess.check_output(
                ['/bin/ps', '--no-header', 'x', '-o', ','.join(_PS_ARGS)])

    ps_output = run_ps()
    # Split each line into the columns output by ps.
    split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
    return (dict(itertools.izip(_PS_ARGS, line_components))
            for line_components in split_lines)


_PidfileContent = collections.namedtuple('_PidfileContent', ['path', 'content'])
def _read_pidfile(pidfile_path):
    """Reads the content of the given pidfile if it exists.

    @param pidfile_path: Path of the file to read.
    @returns: A _PidfileContent tuple on success, None otherwise.
    """
    if not os.path.exists(pidfile_path):
        return None
    try:
        with open(pidfile_path, 'r') as file_object:
            return _PidfileContent(pidfile_path, file_object.read())
    except IOError:
        return None


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    calls = parse_input()
    args = _parse_args(sys.argv[1:])

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()
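

# Example invocation (illustrative, values hypothetical): the scheduler pipes
# a pickled list of _MethodCall objects into this script and reads a pickled
# dict of results back from stdout, roughly:
#
#   proc = subprocess.Popen(
#           ['./drone_utility.py', '--call_time', '1500000000.0'],
#           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
#   out, _ = proc.communicate(pickle.dumps([call('initialize', results_dir)]))
#   response = pickle.loads(out)  # dict(results=..., warnings=...)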