# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Library providing an API to lucifer."""

import os
import logging
import pipes
import socket

import common
from autotest_lib.client.bin import local_host
from autotest_lib.client.common_lib import global_config
from autotest_lib.server.hosts import ssh_host
from autotest_lib.frontend.afe import models

_config = global_config.global_config
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
_ENV = '/usr/bin/env'
_AUTOTEST_DIR = '/usr/local/autotest'
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')

logger = logging.getLogger(__name__)


def is_lucifer_enabled():
    """Return True if lucifer is enabled in the config.

    Currently hard-coded to return True.
    """
    return True


def is_enabled_for(level):
    """Return True if lucifer is enabled for the given level.

    The comparison against the configured 'lucifer_level' value is
    case-insensitive.

    @param level: string, e.g. 'PARSING', 'GATHERING'
    """
    if not is_lucifer_enabled():
        return False
    config_level = (_config.get_config_value(_SECTION, 'lucifer_level')
                    .upper())
    return level.upper() == config_level


def is_lucifer_owned(job):
    """Return True if job is already sent to lucifer.

    A job counts as sent when it has a 'jobhandoff' attribute.

    @param job: Job instance
    """
    return hasattr(job, 'jobhandoff')


def is_split_job(hqe_id):
    """Return True if HQE is part of a job with HQEs in a different group.

    For example, if the given HQE has execution_subdir=foo and the job
    has an HQE with execution_subdir=bar, then return True.  The only
    situation where this happens is if provisioning in a multi-DUT job
    fails; the HQEs will then each be in their own group.

    See https://bugs.chromium.org/p/chromium/issues/detail?id=811877

    @param hqe_id: HQE id
    """
    hqe = models.HostQueueEntry.objects.get(id=hqe_id)
    hqes = hqe.job.hostqueueentry_set.all()
    try:
        _get_consistent_execution_path(hqes)
    except _ExecutionPathError:
        return True
    return False


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_starting_job_handler(manager, job):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    Not implemented; always raises NotImplementedError.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @returns: Drone instance
    """
    raise NotImplementedError


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_gathering_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status
    @param pidfile_id: PidfileId instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = _pick_drone(manager, pidfile_id)
    results_dir = _results_dir(manager, job)
    # NOTE(review): pidfile_id may still be None here (its default);
    # confirm the old manager's get_pidfile_contents() accepts that.
    num_tests_failed = manager.get_num_tests_failed(pidfile_id)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--run-job-path', _get_run_job_path(),
            '--watcher-path', _get_watcher_path(),

            # Job specific
            '--job-id', str(job.id),
            '--lucifer-level', 'GATHERING',
            '--autoserv-exit', str(autoserv_exit),
            '--need-gather',
            '--num-tests-failed', str(num_tests_failed),
            '--results-dir', results_dir,
    ]
    args = _with_gcp_creds_env(args)
    output_file = os.path.join(results_dir, 'job_reporter_output.log')
    drone.spawn(_ENV, args, output_file=output_file)
    return drone


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status
    @param pidfile_id: PidfileId instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = _pick_drone(manager, pidfile_id)
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--run-job-path', _get_run_job_path(),
            '--watcher-path', _get_watcher_path(),

            # Job specific
            '--job-id', str(job.id),
            # NOTE(review): same level as the gathering handler, but
            # without --need-gather; presumably intentional -- confirm.
            '--lucifer-level', 'GATHERING',
            '--autoserv-exit', str(autoserv_exit),
            '--results-dir', results_dir,
    ]
    args = _with_gcp_creds_env(args)
    output_file = os.path.join(results_dir, 'job_reporter_output.log')
    drone.spawn(_ENV, args, output_file=output_file)
    return drone


def _pick_drone(manager, pidfile_id):
    """Return the drone for pidfile_id, or pick any drone if it is None.

    @param manager: _DroneManager instance
    @param pidfile_id: PidfileId instance or None
    @returns: Drone instance
    """
    if pidfile_id is None:
        return manager.pick_drone_to_use()
    return manager.get_drone_for_pidfile(pidfile_id)


def _with_gcp_creds_env(args):
    """Return args prefixed with the GCP credentials env assignment.

    The result is meant to be passed as arguments to env(1).  When no
    credentials are configured, args is returned unchanged.

    The path is deliberately not shell-quoted here: LocalDrone execs
    the argument vector directly, and RemoteDrone quotes every argument
    itself, so quoting at this layer would leave literal quote
    characters in the environment variable's value.

    @param args: list of command arguments
    @returns: list of command arguments
    """
    creds = _get_gcp_creds()
    if not creds:
        return args
    return ['GOOGLE_APPLICATION_CREDENTIALS=%s' % creds] + args


def _get_jobdir():
    """Return the 'jobdir' config value."""
    return _config.get_config_value(_SECTION, 'jobdir')


def _get_run_job_path():
    """Return the path to lucifer_run_job in the binaries dir."""
    return os.path.join(_get_binaries_path(), 'lucifer_run_job')


def _get_watcher_path():
    """Return the path to lucifer_watcher in the binaries dir."""
    return os.path.join(_get_binaries_path(), 'lucifer_watcher')


def _get_binaries_path():
    """Get binaries dir path from config."""
    return _config.get_config_value(_SECTION, 'binaries_path')


def _get_gcp_creds():
    """Return path to GCP service account credentials.

    This is the empty string by default, if no credentials will be used.
    """
    return _config.get_config_value(_SECTION, 'gcp_creds', default='')


class _DroneManager(object):
    """Simplified drone API."""

    def __init__(self, old_manager):
        """Initialize instance.

        @param old_manager: old style DroneManager
        """
        self._manager = old_manager

    def get_num_tests_failed(self, pidfile_id):
        """Return the number of tests failed for autoserv by pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: int (-1 if missing)
        """
        state = self._manager.get_pidfile_contents(pidfile_id)
        if state.num_tests_failed is None:
            return -1
        return state.num_tests_failed

    def get_drone_for_pidfile(self, pidfile_id):
        """Return a drone to use from a pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: Drone instance
        """
        return _wrap_drone(self._manager.get_drone_for_pidfile_id(pidfile_id))

    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
        """Return a drone to use.

        Various options can be passed to optimize drone selection.

        @param num_processes: number of processes the drone is intended
            to run
        @param prefer_ssp: indicates whether drones supporting
            server-side packaging should be preferred.  The returned
            drone is not guaranteed to support it.
        @returns: Drone instance
        """
        old_drone = self._manager.pick_drone_to_use(
                num_processes=num_processes,
                prefer_ssp=prefer_ssp,
        )
        return _wrap_drone(old_drone)

    def absolute_path(self, path):
        """Return absolute path for drone results.

        The returned path might be remote.

        @param path: path to convert
        """
        return self._manager.absolute_path(path)


def _wrap_drone(old_drone):
    """Wrap an old style drone.

    @param old_drone: old style drone; its _host attribute decides
        which Drone implementation is returned.
    @returns: LocalDrone or RemoteDrone instance
    @raises TypeError: if the host is neither a LocalHost nor an SSHHost
    """
    host = old_drone._host
    if isinstance(host, local_host.LocalHost):
        return LocalDrone()
    elif isinstance(host, ssh_host.SSHHost):
        return RemoteDrone(host)
    else:
        raise TypeError('Drone has an unknown host type')


def _results_dir(manager, job):
    """Return results dir for a job.

    Path may be on a remote host.

    @param manager: _DroneManager instance
    @param job: Job instance
    """
    return manager.absolute_path(_working_directory(job))


def _working_directory(job):
    """Return the execution path shared by all HQEs of the job.

    @param job: Job instance
    @raises _ExecutionPathError: if the job's HQEs disagree on the path
    """
    return _get_consistent_execution_path(job.hostqueueentry_set.all())


def _get_consistent_execution_path(execution_entries):
    """Return the execution path shared by all the given entries.

    @param execution_entries: non-empty iterable of objects with an
        execution_path() method
    @returns: the common execution path
    @raises _ExecutionPathError: if any entry's path differs from the
        first entry's path
    """
    # Materialize once so the input is not indexed and sliced
    # separately (which may mean separate evaluations for lazy inputs).
    entries = list(execution_entries)
    first_entry = entries[0]
    first_execution_path = first_entry.execution_path()
    for execution_entry in entries[1:]:
        execution_path = execution_entry.execution_path()
        if execution_path != first_execution_path:
            raise _ExecutionPathError(
                    '%s (%s) != %s (%s)'
                    % (execution_path,
                       execution_entry,
                       first_execution_path,
                       first_entry))
    return first_execution_path


class _ExecutionPathError(Exception):
    """Raised by _get_consistent_execution_path()."""


class Drone(object):
    """Simplified drone API."""

    def hostname(self):
        """Return the hostname of the drone."""

    def spawn(self, path, args, output_file):
        """Spawn an independent process.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process is spawned in its own session.  It should not try to
        obtain a controlling terminal.

        The new process will have stdin opened to /dev/null and stdout,
        stderr opened to output_file.

        output_file is a pathname, but how it is interpreted is
        implementation defined, e.g., it may be a remote file.
        """


class LocalDrone(Drone):
    """Local implementation of Drone."""

    def hostname(self):
        """Return the hostname of the local machine."""
        return socket.gethostname()

    def spawn(self, path, args, output_file):
        """Spawn path with args locally; see Drone.spawn()."""
        _spawn(path, [path] + args, output_file)


class RemoteDrone(Drone):
    """Remote implementation of Drone through SSH."""

    def __init__(self, host):
        """Initialize instance.

        @param host: SSHHost instance
        @raises TypeError: if host is not an SSHHost
        """
        if not isinstance(host, ssh_host.SSHHost):
            raise TypeError('RemoteDrone must be passed an SSHHost')
        self._host = host

    def hostname(self):
        """Return the hostname of the wrapped host."""
        return self._host.hostname

    def spawn(self, path, args, output_file):
        """Spawn path with args on the remote host; see Drone.spawn()."""
        cmd_parts = [path] + args
        # Every part is quoted here, so callers must pass raw
        # (unquoted) arguments.
        safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
        safe_file = pipes.quote(output_file)
        # SSH creates a session for each command, so we do not have to
        # do it.
        self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &'
                       % {'cmd': safe_cmd,
                          'file': safe_file,
                          'null': os.devnull})


def _spawn(path, argv, output_file):
    """Spawn a new process in its own session.

    path must be an absolute path.  The first item in argv should be
    path.

    In the calling process, this function returns on success.
    The forked process puts itself in its own session and execs.

    The new process will have stdin opened to /dev/null and stdout,
    stderr opened to output_file.

    If anything fails in a forked child before the exec succeeds, the
    failure is logged and the child exits; it never returns into the
    caller's code.
    """
    logger.info('Spawning %r, %r, %r', path, argv, output_file)
    assert all(isinstance(arg, basestring) for arg in argv)
    pid = os.fork()
    if pid:
        os.waitpid(pid, 0)
        return
    # Everything below runs in a forked child.  It must end in either a
    # successful exec or os._exit(); letting an exception propagate
    # would leave a duplicate of the calling process running.
    try:
        # Double fork to reparent to init since monitor_db does not reap.
        if os.fork():
            os._exit(os.EX_OK)
        os.setsid()
        null_fd = os.open(os.devnull, os.O_RDONLY)
        os.dup2(null_fd, 0)
        os.close(null_fd)
        # Explicit mode: os.open() would otherwise create the log file
        # with mode 0777 (before umask).
        out_fd = os.open(output_file,
                         os.O_WRONLY | os.O_APPEND | os.O_CREAT,
                         0o666)
        os.dup2(out_fd, 1)
        os.dup2(out_fd, 2)
        os.close(out_fd)
        os.execv(path, argv)
    except Exception:
        logger.exception('Failed to spawn %r', path)
    finally:
        # Only reached on failure: os._exit() above and a successful
        # os.execv() never return.
        os._exit(os.EX_OSERR)