# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Job leasing utilities

Jobs are leased to processes to own and run.  A process owning a job
obtains a job lease.  Ongoing ownership of the lease is established using
an exclusive fcntl lock on the lease file.

If a lease file is older than a few seconds and is not locked, then its
owning process should be considered crashed.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import contextlib
import errno
import fcntl
import logging
import os
import socket
import time

try:
    # Python 3.5+ provides scandir in the standard library.
    from os import scandir
except ImportError:
    # Older interpreters rely on the scandir backport package.
    from scandir import scandir

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def obtain_lease(path):
    """Return a context manager owning a lease file.

    The process that obtains the lease will maintain an exclusive,
    unlimited fcntl lock on the lock file.  The lease file is created
    (or truncated) on entry and removed on exit.

    @param path: path of the lease file to create and lock
    @returns: context manager yielding path
    @raises IOError/OSError: if the non-blocking lock cannot be acquired
    """
    with open(path, 'w') as f:
        # LOCK_NB: fail immediately instead of waiting for another owner.
        fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        try:
            yield path
        finally:
            # Unlink while the lock is still held (the file is closed,
            # and the lock dropped, only when the with block exits).
            os.unlink(path)


def leases_iter(jobdir):
    """Yield Lease instances from jobdir.

    Only directory entries whose name is all digits are treated as
    leases; everything else (e.g. abort sockets) is skipped.

    @param jobdir: job lease file directory
    @returns: iterator of Leases
    """
    for entry in scandir(jobdir):
        if _is_lease_entry(entry):
            yield Lease(entry)


class Lease(object):
    """Represents a job lease."""

    # Seconds after a lease file's mtime where its owning process is not
    # considered dead.
    _FRESH_LIMIT = 5

    def __init__(self, entry):
        """Initialize instance.

        @param entry: scandir.DirEntry instance
        """
        self._entry = entry

    @property
    def id(self):
        """Return id of leased job."""
        return int(self._entry.name)

    def expired(self):
        """Return True if the lease is expired.

        A lease is considered expired if there is no fcntl lock on it
        and the grace period for the owning process to obtain the lock
        has passed.  The lease is not considered expired if the owning
        process removed the lock file normally, as an expired lease
        indicates that some error has occurred and clean up operations
        are needed.
        """
        try:
            stat_result = self._entry.stat()
        except OSError as e:  # pragma: no cover
            if e.errno == errno.ENOENT:
                # The file is gone, i.e. the owner cleaned up normally.
                return False
            raise
        # Convert nanoseconds to (fractional) seconds.
        mtime = stat_result.st_mtime_ns / (10 ** 9)
        if time.time() - mtime < self._FRESH_LIMIT:
            # Still within the grace period for obtaining the lock.
            return False
        return not _fcntl_locked(self._entry.path)

    def cleanup(self):
        """Remove the lease file.

        This does not need to be called normally, as the owning process
        should clean up its files.  The associated abort socket file is
        removed as well.  Removal errors are logged, not raised.
        """
        try:
            os.unlink(self._entry.path)
        except OSError as e:
            logger.warning('Error removing %s: %s', self._entry.path, e)
        try:
            os.unlink(self._sock_path)
        except OSError as e:
            # This is fine; it means that job_reporter crashed, but
            # lucifer was able to run its cleanup.
            logger.debug('Error removing %s: %s', self._sock_path, e)

    def abort(self):
        """Abort the job.

        This sends a datagram to the abort socket associated with the
        lease.

        If the socket is closed, either the connect() call or the send()
        call will raise socket.error with ECONNREFUSED.

        @raises socket.error: if connecting or sending fails
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        # closing() releases the descriptor on both the success and the
        # error path; it also works where sockets are not context managers.
        with contextlib.closing(sock):
            sock.setblocking(0)
            logger.debug('Connecting to abort socket %s', self._sock_path)
            sock.connect(self._sock_path)
            logger.debug('Sending abort to %s', self._sock_path)
            # The value sent does not matter, but it must be bytes:
            # a text string is rejected by send() on Python 3.
            sent = sock.send(b'abort')
        # TODO(ayatane): I don't know if it is possible for sent to be 0
        assert sent > 0

    def maybe_abort(self):
        """Abort the job, ignoring errors."""
        try:
            self.abort()
        except socket.error as e:
            logger.debug('Error aborting socket: %s', e)

    @property
    def _sock_path(self):
        """Return the path of the abort socket corresponding to the lease."""
        return self._entry.path + ".sock"


def _is_lease_entry(entry):
    """Return True if the DirEntry is for a lease."""
    return entry.name.isdigit()


def _fcntl_locked(path):
    """Return True if a file is fcntl locked.

    The check works by trying to take the exclusive lock without
    blocking; failing to take it means it is held.  A file that cannot
    be opened for writing is reported as not locked.

    NOTE(review): fcntl locks are presumably per-process, so probing
    from the process that itself holds the lease would not detect that
    lock -- confirm callers only probe leases owned by other processes.

    @param path: path to file
    """
    try:
        fd = os.open(path, os.O_WRONLY)
    except (IOError, OSError):
        return False
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        return True
    else:
        return False
    finally:
        # Closing the descriptor also drops the probe lock, if taken.
        os.close(fd)