# Copyright 2017 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Job leasing utilities See infra/lucifer for the implementation of job leasing. https://chromium.googlesource.com/chromiumos/infra/lucifer Jobs are leased to processes to own and run. A process owning a job obtain a job lease. Ongoing ownership of the lease is established using an exclusive fcntl lock on the lease file. If a lease file is older than a few seconds and is not locked, then its owning process should be considered crashed. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import contextlib import errno import fcntl import logging import os import socket import time from scandir import scandir logger = logging.getLogger(__name__) @contextlib.contextmanager def obtain_lease(path): """Return a context manager owning a lease file. The process that obtains the lease will maintain an exclusive, unlimited fcntl lock on the lock file. """ with open(path, 'w') as f: fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) try: yield path finally: os.unlink(path) def leases_iter(jobdir): """Yield Lease instances from jobdir. @param jobdir: job lease file directory @returns: iterator of Leases """ for entry in scandir(jobdir): if _is_lease_entry(entry): yield Lease(entry) class Lease(object): "Represents a job lease." # Seconds after a lease file's mtime where its owning process is not # considered dead. _FRESH_LIMIT = 5 def __init__(self, entry): """Initialize instance. @param entry: scandir.DirEntry instance """ self._entry = entry @property def id(self): """Return id of leased job.""" return int(self._entry.name) def expired(self): """Return True if the lease is expired. A lease is considered expired if there is no fcntl lock on it and the grace period for the owning process to obtain the lock has passed. The lease is not considered expired if the owning process removed the lock file normally, as an expired lease indicates that some error has occurred and clean up operations are needed. """ try: stat_result = self._entry.stat() except OSError as e: # pragma: no cover if e.errno == errno.ENOENT: return False raise mtime = stat_result.st_mtime_ns / (10 ** 9) if time.time() - mtime < self._FRESH_LIMIT: return False return not _fcntl_locked(self._entry.path) def cleanup(self): """Remove the lease file. This does not need to be called normally, as the owning process should clean up its files. """ try: os.unlink(self._entry.path) except OSError as e: logger.warning('Error removing %s: %s', self._entry.path, e) try: os.unlink(self._sock_path) except OSError as e: # This is fine; it means that job_reporter crashed, but # lucifer_run_job was able to run its cleanup. logger.debug('Error removing %s: %s', self._sock_path, e) def abort(self): """Abort the job. This sends a datagram to the abort socket associated with the lease. If the socket is closed, either the connect() call or the send() call will raise socket.error with ECONNREFUSED. """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) logger.debug('Connecting to abort socket %s', self._sock_path) sock.connect(self._sock_path) logger.debug('Sending abort to %s', self._sock_path) # The value sent does not matter. sent = sock.send('abort') # TODO(ayatane): I don't know if it is possible for sent to be 0 assert sent > 0 def maybe_abort(self): """Abort the job, ignoring errors.""" try: self.abort() except socket.error as e: logger.debug('Error aborting socket: %s', e) @property def _sock_path(self): """Return the path of the abort socket corresponding to the lease.""" return self._entry.path + ".sock" def _is_lease_entry(entry): """Return True if the DirEntry is for a lease.""" return entry.name.isdigit() def _fcntl_locked(path): """Return True if a file is fcntl locked. @param path: path to file """ fd = os.open(path, os.O_WRONLY) try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: return True else: return False finally: os.close(fd)