1# Copyright 2017 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Job leasing utilities 6 7See infra/lucifer for the implementation of job leasing. 8 9https://chromium.googlesource.com/chromiumos/infra/lucifer 10 11Jobs are leased to processes to own and run. A process owning a job 12obtain a job lease. Ongoing ownership of the lease is established using 13an exclusive fcntl lock on the lease file. 14 15If a lease file is older than a few seconds and is not locked, then its 16owning process should be considered crashed. 17""" 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import print_function 22 23import contextlib 24import errno 25import fcntl 26import logging 27import os 28import socket 29import time 30 31from scandir import scandir 32 33logger = logging.getLogger(__name__) 34 35 36@contextlib.contextmanager 37def obtain_lease(path): 38 """Return a context manager owning a lease file. 39 40 The process that obtains the lease will maintain an exclusive, 41 unlimited fcntl lock on the lock file. 42 """ 43 with open(path, 'w') as f: 44 fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) 45 try: 46 yield path 47 finally: 48 os.unlink(path) 49 50 51def leases_iter(jobdir): 52 """Yield Lease instances from jobdir. 53 54 @param jobdir: job lease file directory 55 @returns: iterator of Leases 56 """ 57 for entry in scandir(jobdir): 58 if _is_lease_entry(entry): 59 yield Lease(entry) 60 61 62class Lease(object): 63 "Represents a job lease." 64 65 # Seconds after a lease file's mtime where its owning process is not 66 # considered dead. 67 _FRESH_LIMIT = 5 68 69 def __init__(self, entry): 70 """Initialize instance. 71 72 @param entry: scandir.DirEntry instance 73 """ 74 self._entry = entry 75 76 @property 77 def id(self): 78 """Return id of leased job.""" 79 return int(self._entry.name) 80 81 def expired(self): 82 """Return True if the lease is expired. 83 84 A lease is considered expired if there is no fcntl lock on it 85 and the grace period for the owning process to obtain the lock 86 has passed. The lease is not considered expired if the owning 87 process removed the lock file normally, as an expired lease 88 indicates that some error has occurred and clean up operations 89 are needed. 90 """ 91 try: 92 stat_result = self._entry.stat() 93 except OSError as e: # pragma: no cover 94 if e.errno == errno.ENOENT: 95 return False 96 raise 97 mtime = stat_result.st_mtime_ns / (10 ** 9) 98 if time.time() - mtime < self._FRESH_LIMIT: 99 return False 100 return not _fcntl_locked(self._entry.path) 101 102 def cleanup(self): 103 """Remove the lease file. 104 105 This does not need to be called normally, as the owning process 106 should clean up its files. 107 """ 108 try: 109 os.unlink(self._entry.path) 110 except OSError as e: 111 logger.warning('Error removing %s: %s', self._entry.path, e) 112 try: 113 os.unlink(self._sock_path) 114 except OSError as e: 115 # This is fine; it means that job_reporter crashed, but 116 # lucifer_run_job was able to run its cleanup. 117 logger.debug('Error removing %s: %s', self._sock_path, e) 118 119 def abort(self): 120 """Abort the job. 121 122 This sends a datagram to the abort socket associated with the 123 lease. 124 125 If the socket is closed, either the connect() call or the send() 126 call will raise socket.error with ECONNREFUSED. 127 """ 128 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 129 logger.debug('Connecting to abort socket %s', self._sock_path) 130 sock.connect(self._sock_path) 131 logger.debug('Sending abort to %s', self._sock_path) 132 # The value sent does not matter. 133 sent = sock.send('abort') 134 # TODO(ayatane): I don't know if it is possible for sent to be 0 135 assert sent > 0 136 137 def maybe_abort(self): 138 """Abort the job, ignoring errors.""" 139 try: 140 self.abort() 141 except socket.error as e: 142 logger.debug('Error aborting socket: %s', e) 143 144 @property 145 def _sock_path(self): 146 """Return the path of the abort socket corresponding to the lease.""" 147 return self._entry.path + ".sock" 148 149 150def _is_lease_entry(entry): 151 """Return True if the DirEntry is for a lease.""" 152 return entry.name.isdigit() 153 154 155def _fcntl_locked(path): 156 """Return True if a file is fcntl locked. 157 158 @param path: path to file 159 """ 160 fd = os.open(path, os.O_WRONLY) 161 try: 162 fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 163 except IOError: 164 return True 165 else: 166 return False 167 finally: 168 os.close(fd) 169