• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Job leasing utilities
6
7See infra/lucifer for the implementation of job leasing.
8
9https://chromium.googlesource.com/chromiumos/infra/lucifer
10
Jobs are leased to processes to own and run.  A process that owns a job
obtains a job lease.  Ongoing ownership of the lease is established using
13an exclusive fcntl lock on the lease file.
14
15If a lease file is older than a few seconds and is not locked, then its
16owning process should be considered crashed.
17"""
18
19from __future__ import absolute_import
20from __future__ import division
21from __future__ import print_function
22
23import contextlib
24import errno
25import fcntl
26import logging
27import os
28import socket
29import time
30
31from scandir import scandir
32
# Module-wide logger, named after this module.
logger = logging.getLogger(name=__name__)
34
35
@contextlib.contextmanager
def obtain_lease(path):
    """Hold the lease file at ``path`` for the duration of a ``with`` block.

    The file is created (or truncated) and an exclusive, non-blocking
    fcntl lock is taken on it for the whole block; the context value is
    ``path`` itself.  On exit the file is unlinked and then closed.

    @param path: path of the lease file to create and lock
    @raises IOError/OSError: if the lock cannot be taken without blocking
    """
    exclusive_now = fcntl.LOCK_EX | fcntl.LOCK_NB
    with open(path, 'w') as lease_file:
        fcntl.lockf(lease_file.fileno(), exclusive_now)
        try:
            yield path
        finally:
            # Unlink while the descriptor (and hence the lock) is still
            # open, so a normally released lease leaves no unlocked file
            # behind.
            os.unlink(path)
49
50
def leases_iter(jobdir):
    """Generate a Lease for every lease file in a directory.

    Entries whose names are not lease file names are skipped.

    @param jobdir: directory holding the job lease files
    @returns: iterator of Lease instances
    """
    for dir_entry in scandir(jobdir):
        if not _is_lease_entry(dir_entry):
            continue
        yield Lease(dir_entry)
60
61
class Lease(object):
    """Represents a job lease.

    Wraps the directory entry of a lease file.  The file name is the
    leased job's id, and the job's abort socket lives at the same path
    with a ``.sock`` suffix.
    """

    # Seconds after a lease file's mtime where its owning process is not
    # considered dead.
    _FRESH_LIMIT = 5

    def __init__(self, entry):
        """Initialize instance.

        @param entry: scandir.DirEntry instance for the lease file
        """
        self._entry = entry

    @property
    def id(self):
        """Return id of leased job, parsed from the lease file name.

        @raises ValueError: if the file name is not an integer
        """
        return int(self._entry.name)

    def expired(self):
        """Return True if the lease is expired.

        A lease is considered expired if there is no fcntl lock on it
        and the grace period for the owning process to obtain the lock
        has passed.  The lease is not considered expired if the owning
        process removed the lock file normally, as an expired lease
        indicates that some error has occurred and clean up operations
        are needed.
        """
        try:
            stat_result = self._entry.stat()
        except OSError as e:  # pragma: no cover
            if e.errno == errno.ENOENT:
                return False
            raise
        mtime = stat_result.st_mtime_ns / (10 ** 9)
        if time.time() - mtime < self._FRESH_LIMIT:
            # Too new to judge: the owner may not have locked it yet.
            return False
        try:
            return not _fcntl_locked(self._entry.path)
        except OSError as e:
            # The owner may unlink the file normally between the stat
            # above and this lock probe; that is not an expiry either.
            if e.errno == errno.ENOENT:
                return False
            raise

    def cleanup(self):
        """Remove the lease file.

        This does not need to be called normally, as the owning process
        should clean up its files.  Also removes the abort socket file;
        failures to remove either file are logged, not raised.
        """
        try:
            os.unlink(self._entry.path)
        except OSError as e:
            logger.warning('Error removing %s: %s', self._entry.path, e)
        try:
            os.unlink(self._sock_path)
        except OSError as e:
            # This is fine; it means that job_reporter crashed, but
            # lucifer_run_job was able to run its cleanup.
            logger.debug('Error removing %s: %s', self._sock_path, e)

    def abort(self):
        """Abort the job.

        This sends a datagram to the abort socket associated with the
        lease.  The client socket is always closed before returning.

        If the socket is closed, either the connect() call or the send()
        call will raise socket.error with ECONNREFUSED.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        # closing() rather than ``with sock`` keeps Python 2 support,
        # where socket objects are not context managers.
        with contextlib.closing(sock):
            logger.debug('Connecting to abort socket %s', self._sock_path)
            sock.connect(self._sock_path)
            logger.debug('Sending abort to %s', self._sock_path)
            # The value sent does not matter.  It must be bytes for
            # Python 3; b'abort' is the same object type as 'abort' on
            # Python 2.
            sent = sock.send(b'abort')
            # TODO(ayatane): I don't know if it is possible for sent to be 0
            assert sent > 0

    def maybe_abort(self):
        """Abort the job, ignoring errors."""
        try:
            self.abort()
        except socket.error as e:
            logger.debug('Error aborting socket: %s', e)

    @property
    def _sock_path(self):
        """Return the path of the abort socket corresponding to the lease."""
        return self._entry.path + ".sock"
148
149
150def _is_lease_entry(entry):
151    """Return True if the DirEntry is for a lease."""
152    return entry.name.isdigit()
153
154
155def _fcntl_locked(path):
156    """Return True if a file is fcntl locked.
157
158    @param path: path to file
159    """
160    fd = os.open(path, os.O_WRONLY)
161    try:
162        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
163    except IOError:
164        return True
165    else:
166        return False
167    finally:
168        os.close(fd)
169