1#pylint: disable-msg=C0111 2 3import cPickle 4import logging 5import os 6import time 7 8import common 9from autotest_lib.scheduler import drone_utility, email_manager 10from autotest_lib.client.bin import local_host 11from autotest_lib.client.common_lib import error, global_config 12 13CONFIG = global_config.global_config 14AUTOTEST_INSTALL_DIR = CONFIG.get_config_value('SCHEDULER', 15 'drone_installation_directory') 16DEFAULT_CONTAINER_PATH = CONFIG.get_config_value('AUTOSERV', 'container_path') 17 18SSP_REQUIRED = CONFIG.get_config_value('SCHEDULER', 'exit_on_failed_ssp_setup', 19 default=False) 20 21class DroneUnreachable(Exception): 22 """The drone is non-sshable.""" 23 pass 24 25 26class SiteDrone(object): 27 """ 28 Attributes: 29 * allowed_users: set of usernames allowed to use this drone. if None, 30 any user can use this drone. 31 """ 32 def __init__(self, timestamp_remote_calls=True): 33 """Instantiate an abstract drone. 34 35 @param timestamp_remote_calls: If true, drone_utility is invoked with 36 the --call_time option and the current time. Currently this is only 37 used for testing. 38 """ 39 self._calls = [] 40 self.hostname = None 41 self.enabled = True 42 self.max_processes = 0 43 self.active_processes = 0 44 self.allowed_users = None 45 self._autotest_install_dir = AUTOTEST_INSTALL_DIR 46 self._host = None 47 self.timestamp_remote_calls = timestamp_remote_calls 48 self._processes_to_kill = [] 49 50 51 def shutdown(self): 52 pass 53 54 55 @property 56 def _drone_utility_path(self): 57 return os.path.join(self._autotest_install_dir, 58 'scheduler', 'drone_utility.py') 59 60 61 def used_capacity(self): 62 """Gets the capacity used by this drone 63 64 Returns a tuple of (percentage_full, -max_capacity). This is to aid 65 direct comparisons, so that a 0/10 drone is considered less heavily 66 loaded than a 0/2 drone. 67 68 This value should never be used directly. It should only be used in 69 direct comparisons using the basic comparison operators, or using the 70 cmp() function. 71 """ 72 if self.max_processes == 0: 73 return (1.0, 0) 74 return (float(self.active_processes) / self.max_processes, 75 -self.max_processes) 76 77 78 def usable_by(self, user): 79 if self.allowed_users is None: 80 return True 81 return user in self.allowed_users 82 83 84 def _execute_calls_impl(self, calls): 85 if not self._host: 86 raise ValueError('Drone cannot execute calls without a host.') 87 drone_utility_cmd = self._drone_utility_path 88 if self.timestamp_remote_calls: 89 drone_utility_cmd = '%s --call_time %s' % ( 90 drone_utility_cmd, time.time()) 91 logging.info("Running drone_utility on %s", self.hostname) 92 result = self._host.run('python %s' % drone_utility_cmd, 93 stdin=cPickle.dumps(calls), stdout_tee=None, 94 connect_timeout=300) 95 try: 96 return cPickle.loads(result.stdout) 97 except Exception: # cPickle.loads can throw all kinds of exceptions 98 logging.critical('Invalid response:\n---\n%s\n---', result.stdout) 99 raise 100 101 102 def _execute_calls(self, calls): 103 return_message = self._execute_calls_impl(calls) 104 for warning in return_message['warnings']: 105 subject = 'Warning from drone %s' % self.hostname 106 logging.warning(subject + '\n' + warning) 107 email_manager.manager.enqueue_notify_email(subject, warning) 108 return return_message['results'] 109 110 111 def get_calls(self): 112 """Returns the calls queued against this drone. 113 114 @return: A list of calls queued against the drone. 115 """ 116 return self._calls 117 118 119 def call(self, method, *args, **kwargs): 120 return self._execute_calls( 121 [drone_utility.call(method, *args, **kwargs)]) 122 123 124 def queue_call(self, method, *args, **kwargs): 125 self._calls.append(drone_utility.call(method, *args, **kwargs)) 126 127 128 def clear_call_queue(self): 129 self._calls = [] 130 131 132 def execute_queued_calls(self): 133 """Execute queued calls. 134 135 If there are any processes queued to kill, kill them then process the 136 remaining queued up calls. 137 """ 138 if self._processes_to_kill: 139 self.queue_call('kill_processes', self._processes_to_kill) 140 self.clear_processes_to_kill() 141 142 if not self._calls: 143 return 144 results = self._execute_calls(self._calls) 145 self.clear_call_queue() 146 return results 147 148 149 def set_autotest_install_dir(self, path): 150 pass 151 152 153 def queue_kill_process(self, process): 154 """Queue a process to kill/abort. 155 156 @param process: Process to kill/abort. 157 """ 158 self._processes_to_kill.append(process) 159 160 161 def clear_processes_to_kill(self): 162 """Reset the list of processes to kill for this tick.""" 163 self._processes_to_kill = [] 164 165 166class _AbstractDrone(SiteDrone): 167 pass 168 169 170class _LocalDrone(_AbstractDrone): 171 def __init__(self, timestamp_remote_calls=True): 172 super(_LocalDrone, self).__init__( 173 timestamp_remote_calls=timestamp_remote_calls) 174 self.hostname = 'localhost' 175 self._host = local_host.LocalHost() 176 177 178 def send_file_to(self, drone, source_path, destination_path, 179 can_fail=False): 180 if drone.hostname == self.hostname: 181 self.queue_call('copy_file_or_directory', source_path, 182 destination_path) 183 else: 184 self.queue_call('send_file_to', drone.hostname, source_path, 185 destination_path, can_fail) 186 187 188class _RemoteDrone(_AbstractDrone): 189 def __init__(self, hostname, timestamp_remote_calls=True): 190 super(_RemoteDrone, self).__init__( 191 timestamp_remote_calls=timestamp_remote_calls) 192 self.hostname = hostname 193 self._host = drone_utility.create_host(hostname) 194 if not self._host.is_up(): 195 logging.error('Drone %s is unpingable, kicking out', hostname) 196 raise DroneUnreachable 197 198 199 def set_autotest_install_dir(self, path): 200 self._autotest_install_dir = path 201 202 203 def shutdown(self): 204 super(_RemoteDrone, self).shutdown() 205 self._host.close() 206 207 208 def send_file_to(self, drone, source_path, destination_path, 209 can_fail=False): 210 if drone.hostname == self.hostname: 211 self.queue_call('copy_file_or_directory', source_path, 212 destination_path) 213 elif isinstance(drone, _LocalDrone): 214 drone.queue_call('get_file_from', self.hostname, source_path, 215 destination_path) 216 else: 217 self.queue_call('send_file_to', drone.hostname, source_path, 218 destination_path, can_fail) 219 220 221def get_drone(hostname): 222 """ 223 Use this factory method to get drone objects. 224 """ 225 if hostname == 'localhost': 226 return _LocalDrone() 227 try: 228 return _RemoteDrone(hostname) 229 except DroneUnreachable: 230 return None 231