• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Basic locking functionality."""
7
8from __future__ import print_function
9
10import contextlib
11import os
12import errno
13import fcntl
14import stat
15import tempfile
16
17from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
18from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
19from autotest_lib.utils.frozen_chromite.lib import osutils
20from autotest_lib.utils.frozen_chromite.lib import retry_util
21from autotest_lib.utils.frozen_chromite.lib import timeout_util
22
23
24LOCKF = 'lockf'
25FLOCK = 'flock'
26
27
28class LockNotAcquiredError(Exception):
29  """Signals that the lock was not acquired."""
30
31
32class LockingError(Exception):
33  """Signals miscellaneous problems in the locking process."""
34
35
36@contextlib.contextmanager
37def _optional_timer_context(timeout):
38  """Use the timeout_util.Timeout contextmanager if timeout is set."""
39  if timeout:
40    with timeout_util.Timeout(timeout):
41      yield
42  else:
43    yield
44
45
46class _Lock(cros_build_lib.MasterPidContextManager):
47  """Base lockf based locking.  Derivatives need to override _GetFd"""
48
49  def __init__(self, description=None, verbose=True, locktype=LOCKF,
50               blocking=True, blocking_timeout=None):
51    """Initialize this instance.
52
53    Two types of locks are available: LOCKF and FLOCK.
54
55    Use LOCKF (POSIX locks) if:
56      - you need to lock a file between processes created by the
57        parallel/multiprocess libraries
58
59    Use FLOCK (BSD locks) if these scenarios apply:
60      - you need to lock a file between shell scripts running the flock program
61      - you need the lock to be bound to the fd and thus inheritable across
62        execs
63
64    Note: These two locks are completely independent; using one on a path will
65          not block using the other on the same path.
66
67    Args:
68      path: On disk pathway to lock.  Can be a directory or a file.
69      description: A description for this lock- what is it protecting?
70      verbose: Verbose logging?
71      locktype: Type of lock to use (lockf or flock).
72      blocking: If True, use a blocking lock.
73      blocking_timeout: If not None, time is seconds to wait on blocking calls.
74    """
75    cros_build_lib.MasterPidContextManager.__init__(self)
76    self._verbose = verbose
77    self.description = description
78    self._fd = None
79    self.locking_mechanism = fcntl.flock if locktype == FLOCK else fcntl.lockf
80    # Store (to log) the locktype string.
81    self.locktype = locktype
82    self.blocking = blocking
83    self.blocking_timeout = blocking_timeout
84
85  @property
86  def fd(self):
87    if self._fd is None:
88      self._fd = self._GetFd()
89      # Ensure that all derivatives of this lock can't bleed the fd
90      # across execs.
91      fcntl.fcntl(self._fd, fcntl.F_SETFD,
92                  fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
93    return self._fd
94
95  def _GetFd(self):
96    raise NotImplementedError(self, '_GetFd')
97
98  def _enforce_lock(self, flags, message):
99    # Try nonblocking first, if it fails, display the context/message,
100    # and then wait on the lock.
101    try:
102      self.locking_mechanism(self.fd, flags|fcntl.LOCK_NB)
103      return
104    except EnvironmentError as e:
105      if e.errno == errno.EDEADLK:
106        self.unlock()
107      elif e.errno != errno.EAGAIN:
108        raise
109    if self.description:
110      message = '%s: blocking (LOCK_NB) (%s) while %s' % (self.description,
111                                                          self.locktype,
112                                                          message)
113    if not self.blocking:
114      self.close()
115      raise LockNotAcquiredError(message)
116    if self._verbose:
117      logging.info(message)
118
119    try:
120      with _optional_timer_context(self.blocking_timeout):
121        self.locking_mechanism(self.fd, flags)
122    except timeout_util.TimeoutError:
123      description = self.description or 'locking._enforce_lock'
124      logging.error(
125          'Timed out after waiting %d seconds for blocking lock (%s): %s',
126          self.blocking_timeout, self.locktype, description)
127      raise
128    except EnvironmentError as e:
129      if e.errno != errno.EDEADLK:
130        message = ('%s: blocking wait failed errno %s'
131                   % (self.description, e))
132        raise
133      self.unlock()
134      self.locking_mechanism(self.fd, flags)
135    logging.debug('%s: lock has been acquired (%s), continuing.',
136                  self.description, self.locktype)
137
138  def lock(self, shared=False):
139    """Take a lock of type |shared|.
140
141    Any existing lock will be updated if need be.
142
143    Args:
144      shared: If True make the lock shared.
145
146    Returns:
147      self, allowing it to be used as a `with` target.
148
149    Raises:
150      IOError if the operation fails in some way.
151      LockNotAcquiredError if the lock couldn't be acquired (non-blocking
152        mode only).
153    """
154    self._enforce_lock(
155        fcntl.LOCK_SH if shared else fcntl.LOCK_EX,
156        'taking a %s lock' % ('shared' if shared else 'exclusive'))
157    return self
158
159  def read_lock(self, message='taking read lock'):
160    """Take a read lock (shared), downgrading from write if required.
161
162    Args:
163      message: A description of what/why this lock is being taken.
164
165    Returns:
166      self, allowing it to be used as a `with` target.
167
168    Raises:
169      IOError if the operation fails in some way.
170    """
171    self._enforce_lock(fcntl.LOCK_SH, message)
172    return self
173
174  def write_lock(self, message='taking write lock'):
175    """Take a write lock (exclusive), upgrading from read if required.
176
177    Note that if the lock state is being upgraded from read to write,
178    a deadlock potential exists- as such we *will* release the lock
179    to work around it.  Any consuming code should not assume that
180    transitioning from shared to exclusive means no one else has
181    gotten at the critical resource in between for this reason.
182
183    Args:
184      message: A description of what/why this lock is being taken.
185
186    Returns:
187      self, allowing it to be used as a `with` target.
188
189    Raises:
190      IOError if the operation fails in some way.
191    """
192    self._enforce_lock(fcntl.LOCK_EX, message)
193    return self
194
195  def unlock(self):
196    """Release any locks held.  Noop if no locks are held.
197
198    Raises:
199      IOError if the operation fails in some way.
200    """
201    if self._fd is not None:
202      logging.debug('%s: lock is being released (%s).',
203                    self.description, self.locktype)
204      self.locking_mechanism(self._fd, fcntl.LOCK_UN)
205
206  def __del__(self):
207    # TODO(ferringb): Convert this to snakeoil.weakref.WeakRefFinalizer
208    # if/when that rebasing occurs.
209    self.close()
210
211  def close(self):
212    """Release the underlying lock and close the fd."""
213    if self._fd is not None:
214      self.unlock()
215      os.close(self._fd)
216      self._fd = None
217
218  def _enter(self):
219    # Force the fd to be opened via touching the property.
220    # We do this to ensure that even if entering a context w/out a lock
221    # held, we can do locking in that critical section if the code requests it.
222    # pylint: disable=pointless-statement
223    self.fd
224    return self
225
226  def _exit(self, exc_type, exc, exc_tb):
227    try:
228      self.unlock()
229    finally:
230      self.close()
231
232  def IsLocked(self):
233    """Return True if the lock is grabbed."""
234    return bool(self._fd)
235
236
237class FileLock(_Lock):
238  """Use a specified file as a locking mechanism."""
239
240  def __init__(self, path, description=None, verbose=True,
241               locktype=LOCKF, world_writable=False, blocking=True,
242               blocking_timeout=None):
243    """Initializer for FileLock.
244
245    Args:
246      path: On disk pathway to lock.  Can be a directory or a file.
247      description: A description for this lock- what is it protecting?
248      verbose: Verbose logging?
249      locktype: Type of lock to use (lockf or flock).
250      world_writable: If true, the lock file will be created as root and be made
251        writable to all users.
252      blocking: If True, use a blocking lock.
253      blocking_timeout: If not None, time is seconds to wait on blocking calls.
254    """
255    if description is None:
256      description = 'lock %s' % (path,)
257    _Lock.__init__(self, description=description, verbose=verbose,
258                   locktype=locktype, blocking=blocking,
259                   blocking_timeout=blocking_timeout)
260    self.path = os.path.abspath(path)
261    self.world_writable = world_writable
262
263  def _GetFd(self):
264    if self.world_writable:
265      create = True
266      try:
267        create = stat.S_IMODE(os.stat(self.path).st_mode) != 0o666
268      except OSError as e:
269        if e.errno != errno.ENOENT:
270          raise
271      if create:
272        osutils.SafeMakedirs(os.path.dirname(self.path), sudo=True)
273        cros_build_lib.sudo_run(['touch', self.path], print_cmd=False)
274        cros_build_lib.sudo_run(['chmod', '666', self.path], print_cmd=False)
275
276    # If we're on py3.4 and this attribute is exposed, use it to close
277    # the threading race between open and fcntl setting; this is
278    # extremely paranoid code, but might as well.
279    cloexec = getattr(os, 'O_CLOEXEC', 0)
280    # There exist race conditions where the lock may be created by
281    # root, thus denying subsequent accesses from others. To prevent
282    # this, we create the lock with mode 0o666.
283    try:
284      value = os.umask(000)
285      fd = os.open(self.path, os.W_OK|os.O_CREAT|cloexec, 0o666)
286    finally:
287      os.umask(value)
288    return fd
289
290
291class ProcessLock(_Lock):
292  """Process level locking visible to parent/child only.
293
294  This lock is basically a more robust version of what
295  multiprocessing.Lock does.  That implementation uses semaphores
296  internally which require cleanup/deallocation code to run to release
297  the lock; a SIGKILL hitting the process holding the lock violates those
298  assumptions leading to a stuck lock.
299
300  Thus this implementation is based around locking of a deleted tempfile;
301  lockf locks are guranteed to be released once the process/fd is closed.
302  """
303
304  def _GetFd(self):
305    with tempfile.TemporaryFile() as f:
306      # We don't want to hold onto the object indefinitely; we just want
307      # the fd to a temporary inode, preferably one that isn't vfs accessible.
308      # Since TemporaryFile closes the fd once the object is GC'd, we just
309      # dupe the fd so we retain a copy, while the original TemporaryFile
310      # goes away.
311      return os.dup(f.fileno())
312
313
314class PortableLinkLock(object):
315  """A more primitive lock that relies on the atomicity of creating hardlinks.
316
317  Use this lock if you need to be compatible with shadow utils like groupadd
318  or useradd.
319  """
320
321  def __init__(self, path, max_retry=0, sleep=1):
322    """Construct an instance.
323
324    Args:
325      path: path to file to lock on.  Multiple processes attempting to lock the
326        same path will compete for a system wide lock.
327      max_retry: maximum number of times to attempt to acquire the lock.
328      sleep: See retry_util.GenericRetry's sleep parameter.
329    """
330    self._path = path
331    self._target_path = None
332    # These two poorly named variables are just passed straight through to
333    # retry_util.RetryException.
334    self._max_retry = max_retry
335    self._sleep = sleep
336
337  def __enter__(self):
338    fd, self._target_path = tempfile.mkstemp(
339        prefix=self._path + '.chromite.portablelock.')
340    os.close(fd)
341    try:
342      retry_util.RetryException(OSError, self._max_retry,
343                                os.link, self._target_path, self._path,
344                                sleep=self._sleep)
345    except OSError:
346      raise LockNotAcquiredError('Timeout while trying to lock %s' % self._path)
347    finally:
348      osutils.SafeUnlink(self._target_path)
349
350    return self
351
352  def __exit__(self, exc_type, exc_val, exc_tb):
353    try:
354      if self._target_path:
355        osutils.SafeUnlink(self._target_path)
356    finally:
357      osutils.SafeUnlink(self._path)
358
359
360class PipeLock(object):
361  """A simple one-way lock based on pipe().
362
363  This is used when code is calling os.fork() directly and needs to synchronize
364  behavior between the two.  The same process should not try to use Wait/Post
365  as it will just see its own results.  If you need bidirection locks, you'll
366  need to create two yourself.
367
368  Be sure to delete the lock when you're done to prevent fd leakage.
369  """
370
371  def __init__(self):
372    # TODO(vapier): Simplify this when we're Python 3 only.
373    # pylint: disable=using-constant-test
374    pipe2 = getattr(os, 'pipe2', None)
375    if pipe2:
376      cloexec = getattr(os, 'O_CLOEXEC', 0)
377      # Pylint-1.7 is unable to handle this conditional logic.
378      # pylint: disable=not-callable
379      pipes = pipe2(cloexec)
380    else:
381      pipes = os.pipe()
382    self.read_fd, self.write_fd = pipes
383
384  def Wait(self, size=1):
385    """Read |size| bytes from the pipe.
386
387    Args:
388      size: How many bytes to read.  It must match the length of |data| passed
389        by the other end during its call to Post.
390
391    Returns:
392      The data read back.
393    """
394    return os.read(self.read_fd, size)
395
396  def Post(self, data=b'!'):
397    """Write |data| to the pipe.
398
399    Args:
400      data: The data to send to the other side calling Wait.  It must be of the
401        exact length that is passed to Wait.
402    """
403    os.write(self.write_fd, data)
404
405  def __del__(self):
406    os.close(self.read_fd)
407    os.close(self.write_fd)
408