• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.sync
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.contracts.*
12 import kotlin.jvm.*
13 
14 /**
15  * Mutual exclusion for coroutines.
16  *
17  * Mutex has two states: _locked_ and _unlocked_.
18  * It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
19  * the lock still suspends the invoker.
20  *
21  * JVM API note:
22  * Memory semantic of the [Mutex] is similar to `synchronized` block on JVM:
23  * An unlock operation on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
24  * Unsuccessful call to [tryLock] do not have any memory effects.
25  */
26 public interface Mutex {
27     /**
28      * Returns `true` if this mutex is locked.
29      */
30     public val isLocked: Boolean
31 
32     /**
33      * Tries to lock this mutex, returning `false` if this mutex is already locked.
34      *
35      * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
36      * released at the end of your critical section, and [unlock] is never invoked before a successful
37      * lock acquisition.
38      *
39      * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
40      *        is already locked with the same token (same identity), this function throws [IllegalStateException].
41      */
42     public fun tryLock(owner: Any? = null): Boolean
43 
44     /**
45      * Locks this mutex, suspending caller while the mutex is locked.
46      *
47      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
48      * function is suspended, this function immediately resumes with [CancellationException].
49      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
50      * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
51      * This function releases the lock if it was already acquired by this function before the [CancellationException]
52      * was thrown.
53      *
54      * Note that this function does not check for cancellation when it is not suspended.
55      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
56      *
57      * Use [tryLock] to try acquiring the lock without waiting.
58      *
59      * This function is fair; suspended callers are resumed in first-in-first-out order.
60      *
61      * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
62      * released at the end of the critical section, and [unlock] is never invoked before a successful
63      * lock acquisition.
64      *
65      * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
66      *        is already locked with the same token (same identity), this function throws [IllegalStateException].
67      */
68     public suspend fun lock(owner: Any? = null)
69 
70     /**
71      * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
72      * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
73      * the reference to this mutex is passed into the corresponding block.
74      */
75     @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
76         "For additional details please refer to #2794") // WARNING since 1.6.0
77     public val onLock: SelectClause2<Any?, Mutex>
78 
79     /**
80      * Checks whether this mutex is locked by the specified owner.
81      *
82      * @return `true` when this mutex is locked by the specified owner;
83      * `false` if the mutex is not locked or locked by another owner.
84      */
85     public fun holdsLock(owner: Any): Boolean
86 
87     /**
88      * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or
89      * was locked with a different owner token (by identity).
90      *
91      * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
92      * released at the end of the critical section, and [unlock] is never invoked before a successful
93      * lock acquisition.
94      *
95      * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
96      *        was locked with the different token (by identity), this function throws [IllegalStateException].
97      */
98     public fun unlock(owner: Any? = null)
99 }
100 
101 /**
102  * Creates a [Mutex] instance.
103  * The mutex created is fair: lock is granted in first come, first served order.
104  *
105  * @param locked initial state of the mutex.
106  */
107 @Suppress("FunctionName")
Mutexnull108 public fun Mutex(locked: Boolean = false): Mutex =
109     MutexImpl(locked)
110 
111 /**
112  * Executes the given [action] under this mutex's lock.
113  *
114  * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
115  *        is already locked with the same token (same identity), this function throws [IllegalStateException].
116  *
117  * @return the return value of the action.
118  */
119 @OptIn(ExperimentalContracts::class)
120 public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
121     contract {
122         callsInPlace(action, InvocationKind.EXACTLY_ONCE)
123     }
124 
125     lock(owner)
126     try {
127         return action()
128     } finally {
129         unlock(owner)
130     }
131 }
132 
133 
134 internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1 else 0), Mutex {
135     /**
136      * After the lock is acquired, the corresponding owner is stored in this field.
137      * The [unlock] operation checks the owner and either re-sets it to [NO_OWNER],
138      * if there is no waiting request, or to the owner of the suspended [lock] operation
139      * to be resumed, otherwise.
140      */
141     private val owner = atomic<Any?>(if (locked) null else NO_OWNER)
142 
143     private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
ownernull144         { _: SelectInstance<*>, owner: Any?, _: Any? ->
145             { unlock(owner) }
146         }
147 
148     override val isLocked: Boolean get() =
149         availablePermits == 0
150 
holdsLocknull151     override fun holdsLock(owner: Any): Boolean = holdsLockImpl(owner) == HOLDS_LOCK_YES
152 
153     /**
154      * [HOLDS_LOCK_UNLOCKED] if the mutex is unlocked
155      * [HOLDS_LOCK_YES] if the mutex is held with the specified [owner]
156      * [HOLDS_LOCK_ANOTHER_OWNER] if the mutex is held with a different owner
157      */
158     private fun holdsLockImpl(owner: Any?): Int {
159         while (true) {
160             // Is this mutex locked?
161             if (!isLocked) return HOLDS_LOCK_UNLOCKED
162             val curOwner = this.owner.value
163             // Wait in a spin-loop until the owner is set
164             if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
165             // Check the owner
166             return if (curOwner === owner) HOLDS_LOCK_YES else HOLDS_LOCK_ANOTHER_OWNER
167         }
168     }
169 
locknull170     override suspend fun lock(owner: Any?) {
171         if (tryLock(owner)) return
172         lockSuspend(owner)
173     }
174 
lockSuspendnull175     private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
176         val contWithOwner = CancellableContinuationWithOwner(cont, owner)
177         acquire(contWithOwner)
178     }
179 
tryLocknull180     override fun tryLock(owner: Any?): Boolean = when (tryLockImpl(owner)) {
181         TRY_LOCK_SUCCESS -> true
182         TRY_LOCK_FAILED -> false
183         TRY_LOCK_ALREADY_LOCKED_BY_OWNER -> error("This mutex is already locked by the specified owner: $owner")
184         else -> error("unexpected")
185     }
186 
tryLockImplnull187     private fun tryLockImpl(owner: Any?): Int {
188         while (true) {
189             if (tryAcquire()) {
190                 assert { this.owner.value === NO_OWNER }
191                 this.owner.value = owner
192                 return TRY_LOCK_SUCCESS
193             } else {
194                 // The semaphore permit acquisition has failed.
195                 // However, we need to check that this mutex is not
196                 // locked by our owner.
197                 if (owner == null) return TRY_LOCK_FAILED
198                 when (holdsLockImpl(owner)) {
199                     // This mutex is already locked by our owner.
200                     HOLDS_LOCK_YES -> return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
201                     // This mutex is locked by another owner, `trylock(..)` must return `false`.
202                     HOLDS_LOCK_ANOTHER_OWNER -> return TRY_LOCK_FAILED
203                     // This mutex is no longer locked, restart the operation.
204                     HOLDS_LOCK_UNLOCKED -> continue
205                 }
206             }
207         }
208     }
209 
unlocknull210     override fun unlock(owner: Any?) {
211         while (true) {
212             // Is this mutex locked?
213             check(isLocked) { "This mutex is not locked" }
214             // Read the owner, waiting until it is set in a spin-loop if required.
215             val curOwner = this.owner.value
216             if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
217             // Check the owner.
218             check(curOwner === owner || owner == null) { "This mutex is locked by $curOwner, but $owner is expected" }
219             // Try to clean the owner first. We need to use CAS here to synchronize with concurrent `unlock(..)`-s.
220             if (!this.owner.compareAndSet(curOwner, NO_OWNER)) continue
221             // Release the semaphore permit at the end.
222             release()
223             return
224         }
225     }
226 
227     @Suppress("UNCHECKED_CAST", "OverridingDeprecatedMember", "OVERRIDE_DEPRECATION")
228     override val onLock: SelectClause2<Any?, Mutex> get() = SelectClause2Impl(
229         clauseObject = this,
230         regFunc = MutexImpl::onLockRegFunction as RegistrationFunction,
231         processResFunc = MutexImpl::onLockProcessResult as ProcessResultFunction,
232         onCancellationConstructor = onSelectCancellationUnlockConstructor
233     )
234 
onLockRegFunctionnull235     protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
236         if (owner != null && holdsLock(owner)) {
237             select.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER)
238         } else {
239             onAcquireRegFunction(SelectInstanceWithOwner(select as SelectInstanceInternal<*>, owner), owner)
240         }
241     }
242 
onLockProcessResultnull243     protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
244         if (result == ON_LOCK_ALREADY_LOCKED_BY_OWNER) {
245             error("This mutex is already locked by the specified owner: $owner")
246         }
247         return this
248     }
249 
250     private inner class CancellableContinuationWithOwner(
251         @JvmField
252         val cont: CancellableContinuationImpl<Unit>,
253         @JvmField
254         val owner: Any?
<lambda>null255     ) : CancellableContinuation<Unit> by cont, Waiter by cont {
256         override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
257             assert { this@MutexImpl.owner.value === NO_OWNER }
258             val token = cont.tryResume(value, idempotent) {
259                 assert { this@MutexImpl.owner.value.let { it === NO_OWNER ||it === owner } }
260                 this@MutexImpl.owner.value = owner
261                 unlock(owner)
262             }
263             if (token != null) {
264                 assert { this@MutexImpl.owner.value === NO_OWNER }
265                 this@MutexImpl.owner.value = owner
266             }
267             return token
268         }
269 
270         override fun resume(value: Unit, onCancellation: ((cause: Throwable) -> Unit)?) {
271             assert { this@MutexImpl.owner.value === NO_OWNER }
272             this@MutexImpl.owner.value = owner
273             cont.resume(value) { unlock(owner) }
274         }
275     }
276 
277     private inner class SelectInstanceWithOwner<Q>(
278         @JvmField
279         val select: SelectInstanceInternal<Q>,
280         @JvmField
281         val owner: Any?
<lambda>null282     ) : SelectInstanceInternal<Q> by select {
283         override fun trySelect(clauseObject: Any, result: Any?): Boolean {
284             assert { this@MutexImpl.owner.value === NO_OWNER }
285             return select.trySelect(clauseObject, result).also { success ->
286                 if (success) this@MutexImpl.owner.value = owner
287             }
288         }
289 
290         override fun selectInRegistrationPhase(internalResult: Any?) {
291             assert { this@MutexImpl.owner.value === NO_OWNER }
292             this@MutexImpl.owner.value = owner
293             select.selectInRegistrationPhase(internalResult)
294         }
295     }
296 
toStringnull297     override fun toString() = "Mutex@${hexAddress}[isLocked=$isLocked,owner=${owner.value}]"
298 }
299 
300 private val NO_OWNER = Symbol("NO_OWNER")
301 private val ON_LOCK_ALREADY_LOCKED_BY_OWNER = Symbol("ALREADY_LOCKED_BY_OWNER")
302 
303 private const val TRY_LOCK_SUCCESS = 0
304 private const val TRY_LOCK_FAILED = 1
305 private const val TRY_LOCK_ALREADY_LOCKED_BY_OWNER = 2
306 
307 private const val HOLDS_LOCK_UNLOCKED = 0
308 private const val HOLDS_LOCK_YES = 1
309 private const val HOLDS_LOCK_ANOTHER_OWNER = 2
310