import heapq
import os
import logging

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import drone_utility
from autotest_lib.scheduler import drones
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Sentinel placed in a command argv; execute_command() replaces it with the
# absolute working directory.
WORKING_DIRECTORY = object() # see execute_command()


AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)

HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'
ENABLE_ARCHIVING = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)


class DroneManagerError(Exception):
    """Raised when the DroneManager cannot satisfy a request."""


class CustomEquals(object):
    """Mixin deriving equality and hashing from the value returned by _id().

    Subclasses implement _id(); instances with equal ids compare equal and
    hash alike, so they can be used as set members and dict keys.
    """
    def _id(self):
        """Return the hashable identity of this object (subclass hook)."""
        raise NotImplementedError


    def __eq__(self, other):
        # Objects that are not instances of our type defer to Python's
        # default comparison instead of raising.
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._id() == other._id()


    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so spell it out.
        return not self == other


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    """A process identified by (hostname, pid), with an optional parent pid.

    The ppid does not take part in equality or hashing.
    """
    def __init__(self, hostname, pid, ppid=None):
        self.hostname = hostname
        self.pid = pid
        self.ppid = ppid


    def _id(self):
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        return super(Process, self).__repr__() + '<%s>' % self


class PidfileId(CustomEquals):
    """Hashable identifier of a pidfile, keyed by its path."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    """Bookkeeping the DroneManager keeps for each registered pidfile."""
    # Number of refresh cycles since the pidfile contents were last requested.
    age = 0
    # Number of processes this execution accounts for (None if undeclared).
    num_processes = None


class PidfileContents(object):
    """Parsed contents of a valid (possibly empty or partial) pidfile."""
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        """Return True if a process was recorded and has not yet exited.

        An exit status of 0 means the process finished successfully, so the
        check must be against None rather than truthiness.
        """
        return self.process is not None and self.exit_status is None


class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed."""
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        # Human-readable description of why the pidfile is invalid.
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __lt__(self, other):
        # heapq orders entries with '<'; defining it explicitly keeps the heap
        # working on interpreters that ignore __cmp__.
        assert isinstance(other, _DroneHeapWrapper)
        return self.drone.used_capacity() < other.drone.used_capacity()


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'

    _ACTIVE_PROCESS_GAUGE = metrics.Gauge(
            'chromeos/autotest/drone/active_processes')


    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps (hostname, pid) to process info for all processes on all drones
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same shape as _pidfiles, but read after the process list was taken
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # Record of notifications about drones reaching max_processes; reset
        # by refresh_drone_configs().
        self._notify_record = {}
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """Set up drones and the results repository drone.

        @param base_results_dir: Absolute path of the base results directory.
        @param drone_hostnames: Hostnames of the drones to add.
        @param results_repository_hostname: Hostname of the results drone.

        @raises DroneManagerError: if no drone could be initialized.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' on every drone, timing each call."""
        for drone in self.get_drones():
            with metrics.SecondsTimer(
                    'chromeos/autotest/drone_manager/'
                    'reinitialize_drones_duration',
                    fields={'drone': drone.hostname}):
                drone.call('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """
        Add drone.

        Catches AutoservRunError if the drone fails initialization and does not
        add it to the list of usable drones.

        @param hostname: Hostname of the drone we are trying to add.
        """
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            try:
                drone.call('initialize', self.absolute_path(''))
            except error.AutoservRunError as e:
                logging.error('Failed to initialize drone %s with error: %s',
                              hostname, e)
                return
            self._drones[drone.hostname] = drone


    def _remove_drone(self, hostname):
        """Forget the drone with the given hostname, if known."""
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.

        Settings come from the server database when it is in use, otherwise
        from per-hostname keys in the scheduler config section.
        """
        # Import server_manager_utils is delayed rather than at the beginning of
        # this module. The reason is that test_that imports drone_manager when
        # importing autoserv_utils. The import is done before test_that setup
        # django (test_that only setup django in setup_local_afe, since it's
        # not needed when test_that runs the test in a lab duts through :lab:
        # option. Therefore, if server_manager_utils is imported at the
        # beginning of this module, test_that will fail since django is not
        # setup yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = dict([(a.attribute, a.value)
                                   for a in server.attributes.all()])
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)
            logging.info('Drone %s.support_ssp: %s', hostname,
                         drone.support_ssp)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        """Return an iterator over all known Drone objects."""
        return self._drones.itervalues()


    def cleanup_orphaned_containers(self):
        """Queue cleanup_orphaned_containers call at each drone.
        """
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        """Return the drone the given Process runs on (KeyError if unknown)."""
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """Return the drone running the process recorded in the pidfile."""
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """Age registered pidfiles and forget those not read for too long.

        A pidfile whose age exceeds _get_max_pidfile_refreshes() is treated as
        leaked and unregistered; every other pidfile has its age incremented.
        """
        # The limit does not change during the loop; read the config once.
        max_refreshes = self._get_max_pidfile_refreshes()
        # Iterate over a snapshot since the dict is modified in
        # unregister_pidfile().
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        """Clear per-refresh state before a new refresh cycle."""
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, eg:
                "pid\nexit_status\nnum_tests_failed\n".

        @returns: A PidfileContents (empty if raw_contents is empty; with only
                the process set if fewer than 3 lines were present) or an
                InvalidPidfile if the contents could not be parsed.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse each raw pidfile from a drone into store_in_dict.

        @param drone: Drone the pidfiles were read on.
        @param pidfiles: Dict mapping pidfile path to raw contents.
        @param store_in_dict: Dict receiving PidfileId -> parsed contents.
        """
        for pidfile_path, raw_contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            store_in_dict[pidfile_id] = self._parse_pidfile(drone, raw_contents)


    def _add_process(self, drone, process_info):
        """Record a process (from a ps info dict) in the process set."""
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        """Record an autoserv process if it is a process-group leader."""
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        # Restore the heap invariant after drone capacities changed in place.
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """Recount drone.active_processes from the freshly read pidfiles.

        Sums the declared num_processes of every pidfile on this drone whose
        process has no exit status yet, and publishes the total as a gauge.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if not (is_running and on_this_drone):
                continue
            # The pidfile may have been unregistered after trigger_refresh()
            # queued it; nothing tracks it any more, so it contributes nothing.
            info = self._registered_pidfile_info.get(pidfile_id)
            if info is not None and info.num_processes is not None:
                drone.active_processes += info.num_processes
        self._ACTIVE_PROCESS_GAUGE.set(
                drone.active_processes,
                fields={'drone_hostname': drone.hostname})


    def _check_drone_process_limit(self, drone):
        """
        Publish the fraction of |drone|'s process limit currently in use.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            # NOTE(review): the normal value is a 0..1 fraction, while this
            # fallback reports 100 -- confirm the intended scale.
            percent = 100
        metrics.Float('chromeos/autotest/drone/active_process_percentage'
                      ).set(percent, fields={'drone_hostname': drone.hostname})


    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls.
            Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        drones = list(self.get_drones())
        for drone in drones:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
            self._refresh_task_queue.execute(drones, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/sync_refresh_duration'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will always contain <= 3
        # lines of the following: pid, exit code, number of tests. Each pidfile
        # is identified by a PidfileId object, which contains a unique pidfile
        # path (unique because it contains the job id) making it hashable.
        # All pidfiles are stored in the drone managers _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone_managers._registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            results = results_list[0]

            for process_info in results['all_processes']:
                if process_info['comm'] == 'autoserv':
                    self._add_autoserv_process(drone, process_info)
                drone_pid = drone.hostname, int(process_info['pid'])
                self._all_processes[drone_pid] = process_info

            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/refresh_duration'):
            self.trigger_refresh()
            self.sync_refresh()


    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            # Best effort: count the failure and drop the pending calls.
            m = 'chromeos/autotest/errors/results_repository_failed'
            metrics.Counter(m).increment(
                    fields={'drone_hostname': self._results_drone.hostname})
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        # A parent pid of 1 means the original parent is gone.
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_kill_process(process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        """Return the sum of active_processes over all drones."""
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                  wrapper.drone.hostname in
                                  drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        """Return the drone with the lowest used_capacity() (first on ties)."""
        return min(drones, key=lambda drone: drone.used_capacity())


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed,
                                    require_ssp=False):
        """Choose a drone to execute command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.
        @param require_ssp: Require server-side packaging to execute the,
                            command, default to False.

        @return: A drone object to be used for execution, or None if no drone
                 is usable.
        """
        # cycle through drones is order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        # Drones do not support server-side packaging, used as backup if no
        # drone is found to run command requires server-side packaging.
        no_ssp_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue
            if require_ssp and not drone.support_ssp:
                logging.debug('Drone %s does not support server-side '
                              'packaging.', drone.hostname)
                no_ssp_drones.append(drone)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all over loaded, pick the one with least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)
        elif not drone_to_use and require_ssp and no_ssp_drones:
            # No drone supports server-side packaging, choose the least loaded.
            drone_to_use = self._least_loaded_drone(no_ssp_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """Replace WORKING_DIRECTORY sentinels in command (in place)."""
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on

        @returns: The PidfileId registered for the new process.
        @raises DroneManagerError: if no drone is available.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging. Guard against
            # no drone being found so the DroneManagerError below is raised
            # instead of an AttributeError.
            if (drone is not None and require_ssp
                    and not drone.support_ssp):
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """Return the PidfileId for pidfile_name under execution_tag."""
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        """Stop monitoring the given pidfile (no-op if not registered)."""
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        """Set how many processes a registered pidfile accounts for."""
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.

        Reading a pidfile resets its age so it is not dropped as leaked.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                    'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """Convert a results-relative path to an absolute one.

        @param path: Path relative to the results directory.
        @param on_results_repository: If True, resolve against the results
                repository's base dir; otherwise against the drone results
                dir under drones.AUTOTEST_INSTALL_DIR.
        """
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        """Copy from the process's drone, either to the results repository
        (best effort, can_fail=True) or to another path on the same drone."""
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.

        This will only copy the results back for Special Agent Tasks (Cleanup,
        Verify, Repair) that reside in the hosts/ subdirectory of results if
        the copy_task_results_back flag has been set to True inside
        global_config.ini

        It will also only copy .parse.log files back to the scheduler if the
        copy_parse_log_back flag in global_config.ini has been set to True.

        Nothing is copied when archiving is disabled.
        """
        if not ENABLE_ARCHIVING:
            return
        copy_task_results_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
                type=bool)
        copy_parse_log_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
                type=bool)
        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
        parse_log = source_path.endswith(PARSE_LOG)
        if (copy_task_results_back or not special_task) and (
                copy_parse_log_back or not parse_log):
            self._copy_to_results_repository(process, source_path,
                                             destination_path)


    def _copy_to_results_repository(self, process, source_path,
                                    destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository, without special task handling.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """Queue writes of (and forget) the files attached to results_dir."""
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


_the_instance = None

def instance():
    """Return the module-wide DroneManager, creating it on first use."""
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """Replace the module-wide DroneManager returned by instance()."""
    global _the_instance
    _the_instance = instance