1# -*- coding: utf-8 -*- 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Basic locking functionality.""" 7 8from __future__ import print_function 9 10import contextlib 11import os 12import errno 13import fcntl 14import stat 15import tempfile 16 17from autotest_lib.utils.frozen_chromite.lib import cros_build_lib 18from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging 19from autotest_lib.utils.frozen_chromite.lib import osutils 20from autotest_lib.utils.frozen_chromite.lib import retry_util 21from autotest_lib.utils.frozen_chromite.lib import timeout_util 22 23 24LOCKF = 'lockf' 25FLOCK = 'flock' 26 27 28class LockNotAcquiredError(Exception): 29 """Signals that the lock was not acquired.""" 30 31 32class LockingError(Exception): 33 """Signals miscellaneous problems in the locking process.""" 34 35 36@contextlib.contextmanager 37def _optional_timer_context(timeout): 38 """Use the timeout_util.Timeout contextmanager if timeout is set.""" 39 if timeout: 40 with timeout_util.Timeout(timeout): 41 yield 42 else: 43 yield 44 45 46class _Lock(cros_build_lib.MasterPidContextManager): 47 """Base lockf based locking. Derivatives need to override _GetFd""" 48 49 def __init__(self, description=None, verbose=True, locktype=LOCKF, 50 blocking=True, blocking_timeout=None): 51 """Initialize this instance. 52 53 Two types of locks are available: LOCKF and FLOCK. 54 55 Use LOCKF (POSIX locks) if: 56 - you need to lock a file between processes created by the 57 parallel/multiprocess libraries 58 59 Use FLOCK (BSD locks) if these scenarios apply: 60 - you need to lock a file between shell scripts running the flock program 61 - you need the lock to be bound to the fd and thus inheritable across 62 execs 63 64 Note: These two locks are completely independent; using one on a path will 65 not block using the other on the same path. 66 67 Args: 68 path: On disk pathway to lock. Can be a directory or a file. 69 description: A description for this lock- what is it protecting? 70 verbose: Verbose logging? 71 locktype: Type of lock to use (lockf or flock). 72 blocking: If True, use a blocking lock. 73 blocking_timeout: If not None, time is seconds to wait on blocking calls. 74 """ 75 cros_build_lib.MasterPidContextManager.__init__(self) 76 self._verbose = verbose 77 self.description = description 78 self._fd = None 79 self.locking_mechanism = fcntl.flock if locktype == FLOCK else fcntl.lockf 80 # Store (to log) the locktype string. 81 self.locktype = locktype 82 self.blocking = blocking 83 self.blocking_timeout = blocking_timeout 84 85 @property 86 def fd(self): 87 if self._fd is None: 88 self._fd = self._GetFd() 89 # Ensure that all derivatives of this lock can't bleed the fd 90 # across execs. 91 fcntl.fcntl(self._fd, fcntl.F_SETFD, 92 fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) 93 return self._fd 94 95 def _GetFd(self): 96 raise NotImplementedError(self, '_GetFd') 97 98 def _enforce_lock(self, flags, message): 99 # Try nonblocking first, if it fails, display the context/message, 100 # and then wait on the lock. 101 try: 102 self.locking_mechanism(self.fd, flags|fcntl.LOCK_NB) 103 return 104 except EnvironmentError as e: 105 if e.errno == errno.EDEADLK: 106 self.unlock() 107 elif e.errno != errno.EAGAIN: 108 raise 109 if self.description: 110 message = '%s: blocking (LOCK_NB) (%s) while %s' % (self.description, 111 self.locktype, 112 message) 113 if not self.blocking: 114 self.close() 115 raise LockNotAcquiredError(message) 116 if self._verbose: 117 logging.info(message) 118 119 try: 120 with _optional_timer_context(self.blocking_timeout): 121 self.locking_mechanism(self.fd, flags) 122 except timeout_util.TimeoutError: 123 description = self.description or 'locking._enforce_lock' 124 logging.error( 125 'Timed out after waiting %d seconds for blocking lock (%s): %s', 126 self.blocking_timeout, self.locktype, description) 127 raise 128 except EnvironmentError as e: 129 if e.errno != errno.EDEADLK: 130 message = ('%s: blocking wait failed errno %s' 131 % (self.description, e)) 132 raise 133 self.unlock() 134 self.locking_mechanism(self.fd, flags) 135 logging.debug('%s: lock has been acquired (%s), continuing.', 136 self.description, self.locktype) 137 138 def lock(self, shared=False): 139 """Take a lock of type |shared|. 140 141 Any existing lock will be updated if need be. 142 143 Args: 144 shared: If True make the lock shared. 145 146 Returns: 147 self, allowing it to be used as a `with` target. 148 149 Raises: 150 IOError if the operation fails in some way. 151 LockNotAcquiredError if the lock couldn't be acquired (non-blocking 152 mode only). 153 """ 154 self._enforce_lock( 155 fcntl.LOCK_SH if shared else fcntl.LOCK_EX, 156 'taking a %s lock' % ('shared' if shared else 'exclusive')) 157 return self 158 159 def read_lock(self, message='taking read lock'): 160 """Take a read lock (shared), downgrading from write if required. 161 162 Args: 163 message: A description of what/why this lock is being taken. 164 165 Returns: 166 self, allowing it to be used as a `with` target. 167 168 Raises: 169 IOError if the operation fails in some way. 170 """ 171 self._enforce_lock(fcntl.LOCK_SH, message) 172 return self 173 174 def write_lock(self, message='taking write lock'): 175 """Take a write lock (exclusive), upgrading from read if required. 176 177 Note that if the lock state is being upgraded from read to write, 178 a deadlock potential exists- as such we *will* release the lock 179 to work around it. Any consuming code should not assume that 180 transitioning from shared to exclusive means no one else has 181 gotten at the critical resource in between for this reason. 182 183 Args: 184 message: A description of what/why this lock is being taken. 185 186 Returns: 187 self, allowing it to be used as a `with` target. 188 189 Raises: 190 IOError if the operation fails in some way. 191 """ 192 self._enforce_lock(fcntl.LOCK_EX, message) 193 return self 194 195 def unlock(self): 196 """Release any locks held. Noop if no locks are held. 197 198 Raises: 199 IOError if the operation fails in some way. 200 """ 201 if self._fd is not None: 202 logging.debug('%s: lock is being released (%s).', 203 self.description, self.locktype) 204 self.locking_mechanism(self._fd, fcntl.LOCK_UN) 205 206 def __del__(self): 207 # TODO(ferringb): Convert this to snakeoil.weakref.WeakRefFinalizer 208 # if/when that rebasing occurs. 209 self.close() 210 211 def close(self): 212 """Release the underlying lock and close the fd.""" 213 if self._fd is not None: 214 self.unlock() 215 os.close(self._fd) 216 self._fd = None 217 218 def _enter(self): 219 # Force the fd to be opened via touching the property. 220 # We do this to ensure that even if entering a context w/out a lock 221 # held, we can do locking in that critical section if the code requests it. 222 # pylint: disable=pointless-statement 223 self.fd 224 return self 225 226 def _exit(self, exc_type, exc, exc_tb): 227 try: 228 self.unlock() 229 finally: 230 self.close() 231 232 def IsLocked(self): 233 """Return True if the lock is grabbed.""" 234 return bool(self._fd) 235 236 237class FileLock(_Lock): 238 """Use a specified file as a locking mechanism.""" 239 240 def __init__(self, path, description=None, verbose=True, 241 locktype=LOCKF, world_writable=False, blocking=True, 242 blocking_timeout=None): 243 """Initializer for FileLock. 244 245 Args: 246 path: On disk pathway to lock. Can be a directory or a file. 247 description: A description for this lock- what is it protecting? 248 verbose: Verbose logging? 249 locktype: Type of lock to use (lockf or flock). 250 world_writable: If true, the lock file will be created as root and be made 251 writable to all users. 252 blocking: If True, use a blocking lock. 253 blocking_timeout: If not None, time is seconds to wait on blocking calls. 254 """ 255 if description is None: 256 description = 'lock %s' % (path,) 257 _Lock.__init__(self, description=description, verbose=verbose, 258 locktype=locktype, blocking=blocking, 259 blocking_timeout=blocking_timeout) 260 self.path = os.path.abspath(path) 261 self.world_writable = world_writable 262 263 def _GetFd(self): 264 if self.world_writable: 265 create = True 266 try: 267 create = stat.S_IMODE(os.stat(self.path).st_mode) != 0o666 268 except OSError as e: 269 if e.errno != errno.ENOENT: 270 raise 271 if create: 272 osutils.SafeMakedirs(os.path.dirname(self.path), sudo=True) 273 cros_build_lib.sudo_run(['touch', self.path], print_cmd=False) 274 cros_build_lib.sudo_run(['chmod', '666', self.path], print_cmd=False) 275 276 # If we're on py3.4 and this attribute is exposed, use it to close 277 # the threading race between open and fcntl setting; this is 278 # extremely paranoid code, but might as well. 279 cloexec = getattr(os, 'O_CLOEXEC', 0) 280 # There exist race conditions where the lock may be created by 281 # root, thus denying subsequent accesses from others. To prevent 282 # this, we create the lock with mode 0o666. 283 try: 284 value = os.umask(000) 285 fd = os.open(self.path, os.W_OK|os.O_CREAT|cloexec, 0o666) 286 finally: 287 os.umask(value) 288 return fd 289 290 291class ProcessLock(_Lock): 292 """Process level locking visible to parent/child only. 293 294 This lock is basically a more robust version of what 295 multiprocessing.Lock does. That implementation uses semaphores 296 internally which require cleanup/deallocation code to run to release 297 the lock; a SIGKILL hitting the process holding the lock violates those 298 assumptions leading to a stuck lock. 299 300 Thus this implementation is based around locking of a deleted tempfile; 301 lockf locks are guranteed to be released once the process/fd is closed. 302 """ 303 304 def _GetFd(self): 305 with tempfile.TemporaryFile() as f: 306 # We don't want to hold onto the object indefinitely; we just want 307 # the fd to a temporary inode, preferably one that isn't vfs accessible. 308 # Since TemporaryFile closes the fd once the object is GC'd, we just 309 # dupe the fd so we retain a copy, while the original TemporaryFile 310 # goes away. 311 return os.dup(f.fileno()) 312 313 314class PortableLinkLock(object): 315 """A more primitive lock that relies on the atomicity of creating hardlinks. 316 317 Use this lock if you need to be compatible with shadow utils like groupadd 318 or useradd. 319 """ 320 321 def __init__(self, path, max_retry=0, sleep=1): 322 """Construct an instance. 323 324 Args: 325 path: path to file to lock on. Multiple processes attempting to lock the 326 same path will compete for a system wide lock. 327 max_retry: maximum number of times to attempt to acquire the lock. 328 sleep: See retry_util.GenericRetry's sleep parameter. 329 """ 330 self._path = path 331 self._target_path = None 332 # These two poorly named variables are just passed straight through to 333 # retry_util.RetryException. 334 self._max_retry = max_retry 335 self._sleep = sleep 336 337 def __enter__(self): 338 fd, self._target_path = tempfile.mkstemp( 339 prefix=self._path + '.chromite.portablelock.') 340 os.close(fd) 341 try: 342 retry_util.RetryException(OSError, self._max_retry, 343 os.link, self._target_path, self._path, 344 sleep=self._sleep) 345 except OSError: 346 raise LockNotAcquiredError('Timeout while trying to lock %s' % self._path) 347 finally: 348 osutils.SafeUnlink(self._target_path) 349 350 return self 351 352 def __exit__(self, exc_type, exc_val, exc_tb): 353 try: 354 if self._target_path: 355 osutils.SafeUnlink(self._target_path) 356 finally: 357 osutils.SafeUnlink(self._path) 358 359 360class PipeLock(object): 361 """A simple one-way lock based on pipe(). 362 363 This is used when code is calling os.fork() directly and needs to synchronize 364 behavior between the two. The same process should not try to use Wait/Post 365 as it will just see its own results. If you need bidirection locks, you'll 366 need to create two yourself. 367 368 Be sure to delete the lock when you're done to prevent fd leakage. 369 """ 370 371 def __init__(self): 372 # TODO(vapier): Simplify this when we're Python 3 only. 373 # pylint: disable=using-constant-test 374 pipe2 = getattr(os, 'pipe2', None) 375 if pipe2: 376 cloexec = getattr(os, 'O_CLOEXEC', 0) 377 # Pylint-1.7 is unable to handle this conditional logic. 378 # pylint: disable=not-callable 379 pipes = pipe2(cloexec) 380 else: 381 pipes = os.pipe() 382 self.read_fd, self.write_fd = pipes 383 384 def Wait(self, size=1): 385 """Read |size| bytes from the pipe. 386 387 Args: 388 size: How many bytes to read. It must match the length of |data| passed 389 by the other end during its call to Post. 390 391 Returns: 392 The data read back. 393 """ 394 return os.read(self.read_fd, size) 395 396 def Post(self, data=b'!'): 397 """Write |data| to the pipe. 398 399 Args: 400 data: The data to send to the other side calling Wait. It must be of the 401 exact length that is passed to Wait. 402 """ 403 os.write(self.write_fd, data) 404 405 def __del__(self): 406 os.close(self.read_fd) 407 os.close(self.write_fd) 408