"""Scheduler-side interface to the drones that execute autoserv processes.

The DroneManager class below tracks pidfiles and processes on one or more
drones (or a single localhost "drone") and queues calls to run on them.
"""

import heapq
import os
import logging

import common  # presumably imported for side effects; unused by name here
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import drone_utility
from autotest_lib.scheduler import drones
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

WORKING_DIRECTORY = object() # see execute_command()


AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)

HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'
ENABLE_ARCHIVING = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)


class DroneManagerError(Exception):
    """Raised when the drone manager cannot carry out a request."""
    pass


class CustomEquals(object):
    """Mixin giving value semantics (==, !=, hash) based on _id().

    Subclasses must implement _id() to return a hashable identity.
    Instances compare equal only to instances of the same type.
    """
    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._id() == other._id()


    def __ne__(self, other):
        return not self == other


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    """A process on a drone, identified by (hostname, pid)."""
    def __init__(self, hostname, pid, ppid=None):
        self.hostname = hostname
        self.pid = pid
        self.ppid = ppid


    def _id(self):
        # ppid is deliberately not part of the identity.
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        return super(Process, self).__repr__() + '<%s>' % self


class PidfileId(CustomEquals):
    """Hashable identifier for a pidfile, keyed on its path."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    """Bookkeeping kept for each registered pidfile."""
    # number of refresh cycles since the pidfile was last used
    age = 0
    # number of processes to attribute to the pidfile's drone, if known
    num_processes = None


class PidfileContents(object):
    """Parsed contents of a valid (possibly empty) pidfile."""
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        """Return True if a process was recorded and has not exited yet.

        An exit status is only present once the process has finished, so a
        zero (successful) exit status must count as "not running" too.
        """
        return self.process is not None and self.exit_status is None


class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed."""
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __lt__(self, other):
        # heapq orders elements with '<'; defining __lt__ keeps the heap
        # ordering independent of __cmp__ support in the interpreter.
        assert isinstance(other, _DroneHeapWrapper)
        return self.drone.used_capacity() < other.drone.used_capacity()


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        mine = self.drone.used_capacity()
        theirs = other.drone.used_capacity()
        return (mine > theirs) - (mine < theirs)


class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'



    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps (hostname, pid) to the process info dict of every process seen
        # on all drones
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles, read after the process listing
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # record of drones reaching max_processes; reset whenever drone
        # configs are reread in refresh_drone_configs()
        self._notify_record = {}
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """Set up drones and the results repository.

        @param base_results_dir: absolute path of the base results directory.
        @param drone_hostnames: hostnames of the drones to add.
        @param results_repository_hostname: hostname of the results drone.

        @raises DroneManagerError: if no drone could be initialized.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' on every drone, timing each call."""
        for drone in self.get_drones():
            with metrics.SecondsTimer(
                    'chromeos/autotest/drone_manager/'
                    'reinitialize_drones_duration',
                    fields={'drone': drone.hostname}):
                drone.call('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """
        Add drone.

        Catches AutoservRunError if the drone fails initialization and does not
        add it to the list of usable drones.

        @param hostname: Hostname of the drone we are trying to add.
        """
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            try:
                drone.call('initialize', self.absolute_path(''))
            except error.AutoservRunError as e:
                logging.error('Failed to initialize drone %s with error: %s',
                              hostname, e)
                return
            self._drones[drone.hostname] = drone


    def _remove_drone(self, hostname):
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.

        Settings come from the server database when it is in use, otherwise
        from the scheduler section of the global config.
        """
        # Import server_manager_utils is delayed rather than at the beginning of
        # this module. The reason is that test_that imports drone_manager when
        # importing autoserv_utils. The import is done before test_that setup
        # django (test_that only setup django in setup_local_afe, since it's
        # not needed when test_that runs the test in a lab duts through :lab:
        # option. Therefore, if server_manager_utils is imported at the
        # beginning of this module, test_that will fail since django is not
        # setup yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = {a.attribute: a.value
                              for a in server.attributes.all()}
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)
            logging.info('Drone %s.support_ssp: %s', hostname,
                         drone.support_ssp)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        """Return an iterator over all Drone objects."""
        return self._drones.itervalues()


    def cleanup_orphaned_containers(self):
        """Queue cleanup_orphaned_containers call at each drone.
        """
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """Return the drone running the process recorded in a pidfile.

        @raises DroneManagerError: if the pidfile records no process.
        """
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        if pidfile_contents.process is None:
            raise DroneManagerError('Fail to get a drone due to empty pidfile')
        return self._get_drone_for_process(pidfile_contents.process)


    def get_drone_for_pidfile_id(self, pidfile_id):
        """Public API for luciferlib.

        @param pidfile_id: PidfileId instance.
        """
        return self._get_drone_for_pidfile_id(pidfile_id)


    def _drop_old_pidfiles(self):
        """Age registered pidfiles, forgetting those older than the limit."""
        # Read the limit once rather than on every iteration.
        max_refreshes = self._get_max_pidfile_refreshes()
        # Iterate over a copy since the dict is modified in unregister_pidfile()
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        """Clear the per-refresh process and pidfile state."""
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, eg:
            "pid\nexit_status\nnum_tests_failed\n".

        @returns: A PidfileContents (empty if raw_contents is empty), or an
            InvalidPidfile if the contents cannot be parsed.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse raw pidfiles from |drone| into |store_in_dict|.

        @param pidfiles: dict mapping pidfile path to raw contents.
        @param store_in_dict: dict to fill with PidfileId -> parsed contents.
        """
        for pidfile_path, raw_contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            store_in_dict[pidfile_id] = self._parse_pidfile(drone,
                                                            raw_contents)


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """Recount |drone|'s active processes from still-running pidfiles.

        Also reports the count as a metric.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                # The pidfile may have been unregistered after the refresh
                # that read it was queued; there is nothing to count then.
                info = self._registered_pidfile_info.get(pidfile_id)
                if info is not None and info.num_processes is not None:
                    drone.active_processes += info.num_processes

        metrics.Gauge('chromeos/autotest/drone/active_processes').set(
                drone.active_processes,
                fields={'drone_hostname': drone.hostname})


    def _check_drone_process_limit(self, drone):
        """
        Report how much of |drone|'s process limit is in use, as a metric.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            # NOTE(review): normal values are a 0..1 fraction, but this
            # fallback is 100; confirm which scale consumers expect.
            percent = 100
        metrics.Float('chromeos/autotest/drone/active_process_percentage'
                      ).set(percent, fields={'drone_hostname': drone.hostname})


    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls.
        Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        drones = list(self.get_drones())
        for drone in drones:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
            self._refresh_task_queue.execute(drones, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/sync_refresh_duration'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will always contain at
        # most 3 lines of the following: pid, exit code, number of tests
        # failed. Each pidfile is identified by a PidfileId object, which
        # contains a unique pidfile path (unique because it contains the job
        # id) making it hashable.
        # All pidfiles are stored in the drone managers _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone_managers._registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            results = results_list[0]

            for process_info in results['all_processes']:
                if process_info['comm'] == 'autoserv':
                    self._add_autoserv_process(drone, process_info)
                drone_pid = drone.hostname, int(process_info['pid'])
                self._all_processes[drone_pid] = process_info

            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/refresh_duration'):
            self.trigger_refresh()
            self.sync_refresh()


    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            m = 'chromeos/autotest/errors/results_repository_failed'
            metrics.Counter(m).increment(
                fields={'drone_hostname': self._results_drone.hostname})
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_kill_process(process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                          wrapper.drone.hostname in
                                                drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        return min(drones, key=lambda d: d.used_capacity())


    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
        """Return a drone to use.

        Various options can be passed to optimize drone selection.

        num_processes is the number of processes the drone is intended
        to run.

        prefer_ssp indicates whether drones supporting server-side
        packaging should be preferred.  The returned drone is not
        guaranteed to support it.

        This public API is exposed for luciferlib to wrap.

        Returns a drone instance (see drones.py).
        """
        return self._choose_drone_for_execution(
                num_processes=num_processes,
                username=None,  # Always allow all drones
                drone_hostnames_allowed=None,  # Always allow all drones
                require_ssp=prefer_ssp,
        )


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed,
                                    require_ssp=False):
        """Choose a drone to execute command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.
        @param require_ssp: Require server-side packaging to execute the
                            command, default to False.

        @return: A drone object to be used for execution, or None if no
                 drone is usable.
        """
        # cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        # Drones do not support server-side packaging, used as backup if no
        # drone is found to run command requires server-side packaging.
        no_ssp_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue
            if require_ssp and not drone.support_ssp:
                logging.debug('Drone %s does not support server-side '
                              'packaging.', drone.hostname)
                no_ssp_drones.append(drone)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all over loaded, pick the one with least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)
        elif not drone_to_use and require_ssp and no_ssp_drones:
            # No drone supports server-side packaging, choose the least loaded.
            drone_to_use = self._least_loaded_drone(no_ssp_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """Replace WORKING_DIRECTORY placeholders in |command|, in place."""
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on

        @returns: The PidfileId of the pidfile the new process will write.
        @raises DroneManagerError: if no drone is available for the command.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging. Skip this when
            # no drone was found so the DroneManagerError below is raised
            # instead of an AttributeError on None.
            if require_ssp and drone and not drone.support_ssp:
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                    'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """Return |path| joined onto the results base directory.

        The base is the results repository directory when
        |on_results_repository| is True, otherwise the drone results directory.
        """
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.

        This will only copy the results back for Special Agent Tasks (Cleanup,
        Verify, Repair) that reside in the hosts/ subdirectory of results if
        the copy_task_results_back flag has been set to True inside
        global_config.ini

        It will also only copy .parse.log files back to the scheduler if the
        copy_parse_log_back flag in global_config.ini has been set to True.

        Nothing is copied when archiving is disabled.
        """
        if not ENABLE_ARCHIVING:
            return
        copy_task_results_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
                type=bool)
        copy_parse_log_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
                type=bool)
        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
        parse_log = source_path.endswith(PARSE_LOG)
        if (copy_task_results_back or not special_task) and (
                copy_parse_log_back or not parse_log):
            if destination_path is None:
                destination_path = source_path
            self._copy_results_helper(process, source_path, destination_path,
                                      to_results_repository=True)


    def _copy_to_results_repository(self, process, source_path,
                                    destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository, without special task handling.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """Queue writes on |drone| for files attached to |results_dir|."""
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


_the_instance = None

def instance():
    """Return the module-wide DroneManager, creating it on first use."""
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    global _the_instance
    _the_instance = instance