"""Scheduler-side interface for launching and tracking processes on drones.

A single DroneManager (obtained through instance()) tracks registered
pidfiles and processes on every configured drone, and picks drones for new
executions based on their used capacity.
"""

import heapq
import os
import logging

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import drone_utility
from autotest_lib.scheduler import drones
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

WORKING_DIRECTORY = object() # see execute_command()


AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)

HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'
ENABLE_ARCHIVING = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)


class DroneManagerError(Exception):
    """Raised when the drone manager cannot satisfy a request."""
    pass


class CustomEquals(object):
    """Mixin providing equality and hashing based on _id().

    Subclasses implement _id() to return a hashable key. Two instances of
    the same type are equal when their keys are equal.
    """

    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._id() == other._id()


    def __ne__(self, other):
        return not self == other


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    """A process on a drone, identified by (hostname, pid)."""

    def __init__(self, hostname, pid, ppid=None):
        self.hostname = hostname
        self.pid = pid
        # Parent pid; informational only, not part of the identity (_id()).
        self.ppid = ppid


    def _id(self):
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        return super(Process, self).__repr__() + '<%s>' % self


class PidfileId(CustomEquals):
    """Hashable identifier for a pidfile, keyed on its path."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    """Bookkeeping the DroneManager keeps for each registered pidfile."""
    # Number of _drop_old_pidfiles() passes since the pidfile was last
    # registered or read; once it exceeds
    # DroneManager._get_max_pidfile_refreshes() the pidfile is forgotten.
    age = 0
    # Processes to count against the drone's load while the pidfile's
    # process is running; None means nothing is counted.
    num_processes = None


class PidfileContents(object):
    """Parsed contents of a valid (possibly empty) pidfile."""
    # Process that wrote the pidfile, or None if no pid has been read yet.
    process = None
    # Exit status of the process; None until it has been written.
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        """Return True if the process is known and has not exited yet.

        An exit status of 0 means the process finished successfully, so the
        exit status is compared against None rather than tested for
        truthiness.
        """
        return self.process is not None and self.exit_status is None


class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed."""
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        # Human-readable description of why the pidfile is invalid.
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __lt__(self, other):
        # heapq orders items with `<`; defining __lt__ keeps the ordering
        # working on interpreters that no longer consult __cmp__.
        assert isinstance(other, _DroneHeapWrapper)
        return self.drone.used_capacity() < other.drone.used_capacity()


    def __cmp__(self, other):
        # Python 2 three-way comparison, kept for any explicit cmp() users.
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'



    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # holds the list of all processes running on all drones
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # Notification record about drones reaching max_processes; also
        # cleared by refresh_drone_configs(). Initialized here so the
        # attribute exists before the first config refresh.
        self._notify_record = {}
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """Set up drones and the results repository drone.

        @param base_results_dir: Absolute path of the base results directory.
        @param drone_hostnames: Hostnames of the drones to add.
        @param results_repository_hostname: Hostname of the results drone.
        @raises DroneManagerError: If no drone could be initialized.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Re-run 'initialize' on every drone, timing each call."""
        for drone in self.get_drones():
            with metrics.SecondsTimer(
                    'chromeos/autotest/drone_manager/'
                    'reinitialize_drones_duration',
                    fields={'drone': drone.hostname}):
                drone.call('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """
        Add drone.

        Catches AutoservRunError if the drone fails initialization and does not
        add it to the list of usable drones.

        @param hostname: Hostname of the drone we are trying to add.
        """
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            try:
                drone.call('initialize', self.absolute_path(''))
            except error.AutoservRunError as e:
                logging.error('Failed to initialize drone %s with error: %s',
                              hostname, e)
                return
            self._drones[drone.hostname] = drone


    def _remove_drone(self, hostname):
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        # Importing server_manager_utils is delayed rather than done at the
        # beginning of this module. test_that imports drone_manager when
        # importing autoserv_utils, and that import happens before test_that
        # sets up django (test_that only sets up django in setup_local_afe,
        # since it's not needed when test_that runs the test on lab DUTs
        # through the :lab: option). Therefore, if server_manager_utils were
        # imported at the beginning of this module, test_that would fail
        # since django is not set up yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.items():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = {a.attribute: a.value
                              for a in server.attributes.all()}
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        """Return an iterator over the configured Drone objects."""
        return iter(self._drones.values())


    def cleanup_orphaned_containers(self):
        """Queue cleanup_orphaned_containers call at each drone.
        """
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        if pidfile_contents.process is None:
            raise DroneManagerError('Fail to get a drone due to empty pidfile')
        return self._get_drone_for_process(pidfile_contents.process)


    def get_drone_for_pidfile_id(self, pidfile_id):
        """Public API for luciferlib.

        @param pidfile_id: PidfileId instance.
        """
        return self._get_drone_for_pidfile_id(pidfile_id)


    def _drop_old_pidfiles(self):
        """Age every registered pidfile and forget those that are too old."""
        max_refreshes = self._get_max_pidfile_refreshes()
        # Iterate over a snapshot since unregister_pidfile() modifies the dict.
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, eg:
            "pid\nexit_status\nnum_tests_failed\n".
        @returns: A PidfileContents, or an InvalidPidfile if the contents
            have more than 3 lines or a non-integer field.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse raw pidfiles from |drone| into |store_in_dict|.

        @param pidfiles: Dict mapping pidfile path to raw contents.
        @param store_in_dict: Dict updated with PidfileId -> parsed contents.
        """
        for pidfile_path, raw_contents in pidfiles.items():
            store_in_dict[PidfileId(pidfile_path)] = self._parse_pidfile(
                    drone, raw_contents)


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        heapq.heapify(self._drone_queue)


    def reorder_drone_queue(self):
        """Reorder drone queue according to modified process counts.

        This public API is exposed for luciferlib to wrap.
        """
        self._reorder_drone_queue()


    def _compute_active_processes(self, drone):
        """Recount |drone|.active_processes from running pidfiles and report it.

        Only pidfiles with no exit status whose process is on |drone| and
        whose registered num_processes is set are counted.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.items():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if not (is_running and on_this_drone):
                continue
            # The pidfile may have been unregistered (unregister_pidfile())
            # after trigger_refresh() queued the refresh that produced these
            # contents; skip it instead of raising KeyError.
            info = self._registered_pidfile_info.get(pidfile_id)
            if info is not None and info.num_processes is not None:
                drone.active_processes += info.num_processes

        metrics.Gauge('chromeos/autotest/drone/active_processes').set(
                drone.active_processes,
                fields={'drone_hostname': drone.hostname})


    def _check_drone_process_limit(self, drone):
        """
        Report the share of |drone|'s process limit that is in use as a metric.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            # NOTE(review): the normal path yields a fraction (1.0 == at the
            # limit) while this fallback reports 100; confirm intended.
            percent = 100
        metrics.Float('chromeos/autotest/drone/active_process_percentage'
                      ).set(percent, fields={'drone_hostname': drone.hostname})


    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls.
            Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        drone_list = list(self.get_drones())
        for drone in drone_list:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
            self._refresh_task_queue.execute(drone_list, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #  drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/sync_refresh_duration'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and contain at most 3 lines:
        # pid, exit code, number of tests failed. Each pidfile is identified
        # by a PidfileId object, which contains a unique pidfile path (unique
        # because it contains the job id) making it hashable.
        # All pidfiles are stored in the drone manager's _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone_managers._registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.items():
            results = results_list[0]

            for process_info in results['all_processes']:
                if process_info['comm'] == 'autoserv':
                    self._add_autoserv_process(drone, process_info)
                drone_pid = drone.hostname, int(process_info['pid'])
                self._all_processes[drone_pid] = process_info

            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/refresh_duration'):
            self.trigger_refresh()
            self.sync_refresh()


    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            m = 'chromeos/autotest/errors/results_repository_failed'
            metrics.Counter(m).increment(
                fields={'drone_hostname': self._results_drone.hostname})
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        return {process for process in self._process_set
                if process.ppid == 1}


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_kill_process(process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                          wrapper.drone.hostname in
                                                drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, candidates):
        return min(candidates, key=lambda d: d.used_capacity())


    def pick_drone_to_use(self, num_processes=1):
        """Return a drone to use.

        Various options can be passed to optimize drone selection.

        num_processes is the number of processes the drone is intended
        to run.

        This public API is exposed for luciferlib to wrap.

        Returns a drone instance (see drones.py).
        """
        return self._choose_drone_for_execution(
                num_processes=num_processes,
                username=None,  # Always allow all drones
                drone_hostnames_allowed=None,  # Always allow all drones
        )


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed):
        """Choose a drone to execute command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.

        @return: A drone object to be used for execution, or None if no
            enqueued drone is usable.
        """
        # cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all over loaded, pick the one with least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        # Replaces WORKING_DIRECTORY placeholders in place (mutates command).
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it (the caller's list is modified in
                place).
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on
        @return: The PidfileId registered for the new process.
        @raises DroneManagerError: If no drone is available.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed)

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        # Account for the new load immediately so later choices in this
        # cycle see it, then restore the heap invariant.
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self.declare_process_count(pidfile_id, num_processes)
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        """Stop looking for |pidfile_id| on refresh; no-op if unknown."""
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        """Set how many processes |pidfile_id| counts against its drone.

        @raises KeyError: If |pidfile_id| is not registered.
        """
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                    'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """Return |path| joined onto the results repository or drone base dir."""
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.

        This will only copy the results back for Special Agent Tasks (Cleanup,
        Verify, Repair) that reside in the hosts/ subdirectory of results if
        the copy_task_results_back flag has been set to True inside
        global_config.ini

        It will also only copy .parse.log files back to the scheduler if the
        copy_parse_log_back flag in global_config.ini has been set to True.

        Nothing is copied when enable_archiving is off.
        """
        if not ENABLE_ARCHIVING:
            return
        copy_task_results_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
                type=bool)
        copy_parse_log_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
                type=bool)
        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
        parse_log = source_path.endswith(PARSE_LOG)
        if (copy_task_results_back or not special_task) and (
                copy_parse_log_back or not parse_log):
            if destination_path is None:
                destination_path = source_path
            self._copy_results_helper(process, source_path, destination_path,
                                      to_results_repository=True)


    def _copy_to_results_repository(self, process, source_path,
                                    destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository, without special task handling.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """Queue writes on |drone| for files attached to |results_dir|.

        The attachments are removed from _attached_files once queued.
        """
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.items():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


_the_instance = None

def instance():
    """Return the module-wide DroneManager, creating it on first use."""
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    global _the_instance
    _the_instance = instance