# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Library providing an API to lucifer."""

import os
import logging
import pipes
import socket
import subprocess

import common
from autotest_lib.client.bin import local_host
from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler.drone_manager import PidfileId
from autotest_lib.server.hosts import ssh_host
from autotest_lib.frontend.afe import models

_config = global_config.global_config
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
_ENV = '/usr/bin/env'
_AUTOTEST_DIR = '/usr/local/autotest'
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')

logger = logging.getLogger(__name__)


def is_lucifer_enabled():
    """Return True if lucifer is enabled in the config."""
    return True


def is_enabled_for(level):
    """Return True if lucifer is enabled for the given level.

    @param level: string, e.g. 'PARSING', 'GATHERING'
    """
    if not is_lucifer_enabled():
        return False
    config_level = (_config.get_config_value(_SECTION, 'lucifer_level')
                    .upper())
    return level.upper() == config_level


def is_lucifer_owned(job):
    """Return True if job is already sent to lucifer.

    @param job: frontend.afe.models.Job instance
    """
    assert isinstance(job, models.Job)
    # A JobHandoff row is created when the job is handed to lucifer;
    # Django exposes it as the one-to-one 'jobhandoff' attribute.
    return hasattr(job, 'jobhandoff')


def is_lucifer_owned_by_id(job_id):
    """Return True if job is already sent to lucifer."""
    return models.JobHandoff.objects.filter(job_id=job_id).exists()


def is_split_job(hqe_id):
    """Return True if HQE is part of a job with HQEs in a different group.

    For example, if the given HQE has execution_subdir=foo and the job
    has an HQE with execution_subdir=bar, then return True.  The only
    situation where this happens is if provisioning in a multi-DUT job
    fails; the HQEs will each be in their own group.

    See https://bugs.chromium.org/p/chromium/issues/detail?id=811877

    @param hqe_id: HQE id
    """
    hqe = models.HostQueueEntry.objects.get(id=hqe_id)
    hqes = hqe.job.hostqueueentry_set.all()
    try:
        _get_consistent_execution_path(hqes)
    except ExecutionPathError:
        return True
    return False


def _spawn_job_reporter(manager, drone, args, results_dir):
    """Spawn job_reporter on a drone and register its pidfile.

    Shared tail of spawn_starting_job_handler() and
    spawn_parsing_job_handler().

    @param manager: _DroneManager instance
    @param drone: Drone instance to spawn on
    @param args: list of job_reporter argv entries (without _ENV)
    @param results_dir: results directory (path may be on the drone)
    @returns: the drone, for caller convenience
    """
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        # The command is run through /usr/bin/env, so a KEY=VALUE
        # prefix sets the variable in job_reporter's environment.
        args = [
                'GOOGLE_APPLICATION_CREDENTIALS=%s'
                % pipes.quote(gcp_creds),
        ] + args
    drone.spawn(_ENV, args,
                output_file=_prepare_output_file(drone, results_dir))
    drone.add_active_processes(1)
    manager.reorder_drone_queue()
    manager.register_pidfile_processes(
            os.path.join(results_dir, '.autoserv_execute'), 1)
    return drone


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_starting_job_handler(manager, job):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = manager.pick_drone_to_use()
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--lucifer-path', _get_lucifer_path(),

            # Job specific
            '--lucifer-level', 'STARTING',
            '--job-id', str(job.id),
            '--results-dir', results_dir,

            # STARTING specific
            '--execution-tag', _working_directory(job),
    ]
    return _spawn_job_reporter(manager, drone, args, results_dir)


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status (currently unused; kept
            for interface compatibility with callers)
    @param pidfile_id: PidfileId instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    if pidfile_id is None:
        drone = manager.pick_drone_to_use()
    else:
        # Reuse the drone that ran autoserv so the results are local.
        drone = manager.get_drone_for_pidfile(pidfile_id)
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--lucifer-path', _get_lucifer_path(),

            # Job specific
            '--job-id', str(job.id),
            '--lucifer-level', 'STARTING',
            '--parsing-only',
            '--results-dir', results_dir,
    ]
    return _spawn_job_reporter(manager, drone, args, results_dir)


_LUCIFER_DIR = 'lucifer'


def _prepare_output_file(drone, results_dir):
    """Create the lucifer log dir on the drone; return the log path."""
    logdir = os.path.join(results_dir, _LUCIFER_DIR)
    drone.run('mkdir', ['-p', logdir])
    return os.path.join(logdir, 'job_reporter_output.log')


def _get_jobdir():
    """Return the lucifer job directory from config."""
    return _config.get_config_value(_SECTION, 'jobdir')


def _get_lucifer_path():
    """Return the path to the lucifer binary."""
    return os.path.join(_get_binaries_path(), 'lucifer')


def _get_binaries_path():
    """Get binaries dir path from config."""
    return _config.get_config_value(_SECTION, 'binaries_path')


def _get_gcp_creds():
    """Return path to GCP service account credentials.

    This is the empty string by default, if no credentials will be used.
    """
    return _config.get_config_value(_SECTION, 'gcp_creds', default='')


class _DroneManager(object):
    """Simplified drone API."""

    def __init__(self, old_manager):
        """Initialize instance.

        @param old_manager: old style DroneManager
        """
        self._manager = old_manager

    def get_num_tests_failed(self, pidfile_id):
        """Return the number of tests failed for autoserv by pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: int (-1 if missing)
        """
        state = self._manager.get_pidfile_contents(pidfile_id)
        if state.num_tests_failed is None:
            return -1
        return state.num_tests_failed

    def get_drone_for_pidfile(self, pidfile_id):
        """Return a drone to use from a pidfile.

        @param pidfile_id: PidfileId instance.
        """
        return _wrap_drone(self._manager.get_drone_for_pidfile_id(pidfile_id))

    def pick_drone_to_use(self, num_processes=1):
        """Return a drone to use.

        Various options can be passed to optimize drone selection.

        @param num_processes: number of processes the drone is intended
                to run
        """
        old_drone = self._manager.pick_drone_to_use(
                num_processes=num_processes,
        )
        return _wrap_drone(old_drone)

    def absolute_path(self, path):
        """Return absolute path for drone results.

        The returned path might be remote.
        """
        return self._manager.absolute_path(path)

    def register_pidfile_processes(self, path, count):
        """Register a pidfile with the given number of processes.

        This should be done to allow the drone manager to check the
        number of processes still alive.  This may be used to select
        drones based on the number of active processes as a proxy for
        load.

        The exact semantics depends on the drone manager implementation;
        implementation specific comments follow:

        Pidfiles are kept in memory to track process count.  Pidfiles
        are rediscovered when the scheduler restarts.  Thus, errors in
        pidfile tracking can be fixed by restarting the scheduler.
        """
        pidfile_id = PidfileId(path)
        self._manager.register_pidfile(pidfile_id)
        self._manager._registered_pidfile_info[pidfile_id].num_processes = count

    def reorder_drone_queue(self):
        """Reorder drone queue according to modified process counts.

        Call this after Drone.add_active_processes().
        """
        self._manager.reorder_drone_queue()


def _wrap_drone(old_drone):
    """Wrap an old style drone."""
    host = old_drone._host
    if isinstance(host, local_host.LocalHost):
        return LocalDrone()
    elif isinstance(host, ssh_host.SSHHost):
        return RemoteDrone(old_drone)
    else:
        raise TypeError('Drone has an unknown host type')


def _results_dir(manager, job):
    """Return results dir for a job.

    Path may be on a remote host.
    """
    return manager.absolute_path(_working_directory(job))


def _working_directory(job):
    """Return the execution path shared by all of the job's HQEs."""
    return _get_consistent_execution_path(job.hostqueueentry_set.all())


def _get_consistent_execution_path(execution_entries):
    """Return the common execution path of the given entries.

    @raises ExecutionPathError: if the entries do not all share the
            same execution path.
    """
    first_execution_path = execution_entries[0].execution_path()
    for execution_entry in execution_entries[1:]:
        if execution_entry.execution_path() != first_execution_path:
            raise ExecutionPathError(
                    '%s (%s) != %s (%s)'
                    % (execution_entry.execution_path(),
                       execution_entry,
                       first_execution_path,
                       execution_entries[0]))
    return first_execution_path


class ExecutionPathError(Exception):
    """Raised by _get_consistent_execution_path()."""


class Drone(object):
    """Simplified drone API."""

    def hostname(self):
        """Return the hostname of the drone."""

    def run(self, path, args):
        """Run a command synchronously.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process may or may not have its own session.  The process
        should be short-lived.  It should not try to obtain a
        controlling terminal.

        The new process will have stdin, stdout, and stderr opened to
        /dev/null.

        This method intentionally has a very restrictive API.  It should
        be used to perform setup local to the drone, when the drone may
        be a remote machine.
        """

    def spawn(self, path, args, output_file):
        """Spawn an independent process.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process is spawned in its own session.  It should not try to
        obtain a controlling terminal.

        The new process will have stdin opened to /dev/null and stdout,
        stderr opened to output_file.

        output_file is a pathname, but how it is interpreted is
        implementation defined, e.g., it may be a remote file.
        """

    def add_active_processes(self, count):
        """Track additional number of active processes.

        This may be used to select drones based on the number of active
        processes as a proxy for load.

        _DroneManager.register_pidfile_processes() and
        _DroneManager.reorder_drone_queue() should also be called.

        The exact semantics depends on the drone manager implementation;
        implementation specific comments follow:

        Process count is used as a proxy for workload, and one process
        equals the workload of one autoserv or one job.  This count is
        recalculated during each scheduler tick, using pidfiles tracked
        by the drone manager (so the count added by this function only
        applies for one tick).
        """


class LocalDrone(Drone):
    """Local implementation of Drone."""

    def hostname(self):
        return socket.gethostname()

    def run(self, path, args):
        with open(os.devnull, 'r+b') as null:
            subprocess.call([path] + args, stdin=null,
                            stdout=null, stderr=null)

    def spawn(self, path, args, output_file):
        _spawn(path, [path] + args, output_file)


class RemoteDrone(Drone):
    """Remote implementation of Drone through SSH."""

    def __init__(self, drone):
        host = drone._host
        if not isinstance(host, ssh_host.SSHHost):
            raise TypeError('RemoteDrone must be passed a drone with SSHHost')
        self._drone = drone
        self._host = drone._host

    def hostname(self):
        return self._host.hostname

    def run(self, path, args):
        cmd_parts = [path] + args
        safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
        self._host.run('%(cmd)s <%(null)s >%(null)s 2>&1'
                       % {'cmd': safe_cmd, 'null': os.devnull})

    def spawn(self, path, args, output_file):
        cmd_parts = [path] + args
        safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
        safe_file = pipes.quote(output_file)
        # SSH creates a session for each command, so we do not have to
        # do it.
        self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &'
                       % {'cmd': safe_cmd,
                          'file': safe_file,
                          'null': os.devnull})

    def add_active_processes(self, count):
        self._drone.active_processes += count


def _spawn(path, argv, output_file):
    """Spawn a new process in its own session.

    path must be an absolute path.  The first item in argv should be
    path.

    In the calling process, this function returns on success.
    The forked process puts itself in its own session and execs.

    The new process will have stdin opened to /dev/null and stdout,
    stderr opened to output_file.
    """
    logger.info('Spawning %r, %r, %r', path, argv, output_file)
    assert all(isinstance(arg, basestring) for arg in argv)
    pid = os.fork()
    if pid:
        # Parent: reap the intermediate child immediately.
        os.waitpid(pid, 0)
        return
    # Double fork to reparent to init since monitor_db does not reap.
    if os.fork():
        os._exit(os.EX_OK)
    os.setsid()
    null_fd = os.open(os.devnull, os.O_RDONLY)
    os.dup2(null_fd, 0)
    os.close(null_fd)
    out_fd = os.open(output_file, os.O_WRONLY | os.O_APPEND | os.O_CREAT)
    os.dup2(out_fd, 1)
    os.dup2(out_fd, 2)
    os.close(out_fd)
    os.execv(path, argv)