• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.sync
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.internal.*
6 import kotlinx.coroutines.selects.*
7 import kotlin.contracts.*
8 import kotlin.coroutines.CoroutineContext
9 import kotlin.jvm.*
10 
/**
 * Mutual exclusion for coroutines.
 *
 * Mutex has two states: _locked_ and _unlocked_.
 * It is **non-reentrant**, that is, invoking [lock] even from the same thread/coroutine that currently holds
 * the lock still suspends the invoker.
 *
 * JVM API note:
 * The memory semantics of [Mutex] are similar to those of a `synchronized` block on the JVM:
 * an unlock operation on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
 * An unsuccessful call to [tryLock] does not have any memory effects.
 */
public interface Mutex {
    /**
     * Returns `true` if this mutex is locked.
     */
    public val isLocked: Boolean

    /**
     * Tries to lock this mutex, returning `false` if this mutex is already locked.
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of your critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        is already locked with the same token (same identity), this function throws [IllegalStateException].
     * @return `true` if the lock was acquired by this call, `false` if the mutex is already locked.
     */
    public fun tryLock(owner: Any? = null): Boolean

    /**
     * Locks this mutex, suspending the caller until the lock is acquired (in other words, while the lock is
     * held elsewhere).
     *
     * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
     * suspending function is waiting, this function immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was
     * cancelled while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for
     * low-level details.
     * This function releases the lock if it was already acquired by this function before the
     * [CancellationException] was thrown.
     *
     * Note that this function does not check for cancellation when it is not suspended.
     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
     *
     * Use [tryLock] to try acquiring the lock without waiting.
     *
     * This function is fair; suspended callers are resumed in first-in-first-out order.
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        is already locked with the same token (same identity), this function throws [IllegalStateException].
     */
    public suspend fun lock(owner: Any? = null)

    /**
     * Clause for the [select] expression of the [lock] suspending function that selects when the mutex is locked.
     * The additional parameter for the clause is the `owner` (see [lock]), and when the clause is selected
     * the reference to this mutex is passed into the corresponding block.
     */
    @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
        "For additional details please refer to #2794") // WARNING since 1.6.0
    public val onLock: SelectClause2<Any?, Mutex>

    /**
     * Checks whether this mutex is locked by the specified owner.
     *
     * @param owner the owner token to compare against the token this mutex was locked with.
     * @return `true` when this mutex is locked by the specified owner;
     * `false` if the mutex is not locked or is locked by another owner.
     */
    public fun holdsLock(owner: Any): Boolean

    /**
     * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or
     * was locked with a different owner token (by identity).
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        was locked with a different token (by identity), this function throws [IllegalStateException].
     */
    public fun unlock(owner: Any? = null)
}
97 
/**
 * Creates a [Mutex] instance.
 * The mutex created is fair: the lock is granted in first-come, first-served order.
 *
 * @param locked initial state of the mutex; `true` creates the mutex already locked. Defaults to `false`.
 * @return a new [Mutex] backed by [MutexImpl].
 */
@Suppress("FunctionName")
public fun Mutex(locked: Boolean = false): Mutex =
    MutexImpl(locked)
107 
/**
 * Executes the given [action] under this mutex's lock.
 *
 * The lock is acquired with [Mutex.lock] before [action] runs and is released with [Mutex.unlock]
 * in a `finally` block, so it is released both when [action] returns normally and when it throws.
 * The contract declares that [action] is invoked in place, exactly once.
 *
 * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
 *        is already locked with the same token (same identity), this function throws [IllegalStateException].
 * @param action the critical section to run while the lock is held.
 *
 * @return the return value of the action.
 */
@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }
    // Acquire outside of `try`: if `lock` fails, `unlock` must not be attempted.
    lock(owner)
    return try {
        action()
    } finally {
        unlock(owner)
    }
}
128 
129 
/**
 * [Mutex] implementation layered on top of [SemaphoreAndMutexImpl].
 *
 * The base class is constructed with `1` and `if (locked) 1 else 0`
 * (presumably the total number of permits and the number of permits already acquired;
 * confirm against [SemaphoreAndMutexImpl]). The mutex counts as locked when `availablePermits` is `0`.
 * On top of the permit, this class tracks the owner token passed to [lock], [tryLock], and [unlock].
 *
 * @param locked whether the mutex is created in the locked state.
 */
internal open class MutexImpl(locked: Boolean) : SemaphoreAndMutexImpl(1, if (locked) 1 else 0), Mutex {
    /**
     * After the lock is acquired, the corresponding owner is stored in this field.
     * The [unlock] operation checks the owner and resets the field to [NO_OWNER] before
     * releasing the permit; whoever acquires the permit next stores its own owner here.
     *
     * The initial value is `null` when the mutex is created locked, and [NO_OWNER] otherwise.
     * Because the permit and the owner are updated in two separate steps, readers may briefly
     * observe a locked mutex whose owner is still [NO_OWNER]; [holdsLockImpl] and [unlock]
     * spin until the owner is set.
     */
    private val owner = atomic<Any?>(if (locked) null else NO_OWNER)

    // Builds the cancellation handler for the `onLock` select clause:
    // the handler unlocks this mutex with the owner that was passed to the clause.
    private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
        { _: SelectInstance<*>, owner: Any?, _: Any? ->
            { _, _, _ -> unlock(owner) }
        }

    /** `true` when no permit is available, that is, when this mutex is locked. */
    override val isLocked: Boolean get() =
        availablePermits == 0

    /** Returns `true` only if this mutex is currently locked with the given [owner] (compared by identity). */
    override fun holdsLock(owner: Any): Boolean = holdsLockImpl(owner) == HOLDS_LOCK_YES

    /**
     * Reports how the current lock state relates to the specified [owner]:
     *
     * - [HOLDS_LOCK_UNLOCKED] if the mutex is unlocked;
     * - [HOLDS_LOCK_YES] if the mutex is held with the specified [owner];
     * - [HOLDS_LOCK_ANOTHER_OWNER] if the mutex is held with a different owner.
     *
     * Spins while the mutex is locked but the owner has not been stored yet.
     */
    private fun holdsLockImpl(owner: Any?): Int {
        while (true) {
            // Is this mutex locked?
            if (!isLocked) return HOLDS_LOCK_UNLOCKED
            val curOwner = this.owner.value
            // Wait in a spin-loop until the owner is set
            if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
            // Check the owner
            return if (curOwner === owner) HOLDS_LOCK_YES else HOLDS_LOCK_ANOTHER_OWNER
        }
    }

    /**
     * Acquires the lock, taking the non-suspending [tryLock] fast path first and
     * falling back to [lockSuspend] only when the lock is not immediately available.
     */
    override suspend fun lock(owner: Any?) {
        if (tryLock(owner)) return
        lockSuspend(owner)
    }

    /**
     * Slow path of [lock]: wraps the continuation together with [owner] and hands it to `acquire`,
     * so that the owner is recorded when the continuation is resumed.
     */
    private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
        val contWithOwner = CancellableContinuationWithOwner(cont, owner)
        acquire(contWithOwner)
    }

    /**
     * Translates the result code of [tryLockImpl] into the public [Mutex.tryLock] contract.
     *
     * @throws IllegalStateException if this mutex is already locked with the same non-null [owner].
     */
    override fun tryLock(owner: Any?): Boolean = when (tryLockImpl(owner)) {
        TRY_LOCK_SUCCESS -> true
        TRY_LOCK_FAILED -> false
        TRY_LOCK_ALREADY_LOCKED_BY_OWNER -> error("This mutex is already locked by the specified owner: $owner")
        else -> error("unexpected")
    }

    /**
     * Attempts to take the permit without suspending and, on success, stores [owner].
     *
     * @return [TRY_LOCK_SUCCESS], [TRY_LOCK_FAILED], or [TRY_LOCK_ALREADY_LOCKED_BY_OWNER].
     */
    private fun tryLockImpl(owner: Any?): Int {
        while (true) {
            if (tryAcquire()) {
                assert { this.owner.value === NO_OWNER }
                this.owner.value = owner
                return TRY_LOCK_SUCCESS
            } else {
                // The semaphore permit acquisition has failed.
                // However, we need to check that this mutex is not
                // locked by our owner.
                if (owner == null) return TRY_LOCK_FAILED
                when (holdsLockImpl(owner)) {
                    // This mutex is already locked by our owner.
                    HOLDS_LOCK_YES -> return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
                    // This mutex is locked by another owner, `trylock(..)` must return `false`.
                    HOLDS_LOCK_ANOTHER_OWNER -> return TRY_LOCK_FAILED
                    // This mutex is no longer locked, restart the operation.
                    HOLDS_LOCK_UNLOCKED -> continue
                }
            }
        }
    }

    /**
     * Releases the lock: verifies the state and the owner, clears the owner, then releases the permit.
     *
     * @throws IllegalStateException if this mutex is not locked, or if a non-null [owner] differs
     *         (by identity) from the stored owner.
     */
    override fun unlock(owner: Any?) {
        while (true) {
            // Is this mutex locked?
            check(isLocked) { "This mutex is not locked" }
            // Read the owner, waiting until it is set in a spin-loop if required.
            val curOwner = this.owner.value
            if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
            // Check the owner.
            check(curOwner === owner || owner == null) { "This mutex is locked by $curOwner, but $owner is expected" }
            // Try to clean the owner first. We need to use CAS here to synchronize with concurrent `unlock(..)`-s.
            if (!this.owner.compareAndSet(curOwner, NO_OWNER)) continue
            // Release the semaphore permit at the end.
            release()
            return
        }
    }

    /**
     * Select clause for [lock]. A new clause object is created on every access; it is wired to
     * [onLockRegFunction], [onLockProcessResult], and a cancellation handler that unlocks the mutex.
     */
    @Suppress("UNCHECKED_CAST", "OverridingDeprecatedMember", "OVERRIDE_DEPRECATION")
    override val onLock: SelectClause2<Any?, Mutex> get() = SelectClause2Impl(
        clauseObject = this,
        regFunc = MutexImpl::onLockRegFunction as RegistrationFunction,
        processResFunc = MutexImpl::onLockProcessResult as ProcessResultFunction,
        onCancellationConstructor = onSelectCancellationUnlockConstructor
    )

    /**
     * Registration function of the [onLock] clause.
     *
     * If the mutex is already held with the same non-null [owner], the clause is selected right away with
     * [ON_LOCK_ALREADY_LOCKED_BY_OWNER] so that [onLockProcessResult] can report the error.
     * Otherwise registration is delegated to `onAcquireRegFunction` with a wrapper that records the owner
     * once the clause is selected.
     */
    protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
        if (owner != null && holdsLock(owner)) {
            select.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER)
        } else {
            onAcquireRegFunction(SelectInstanceWithOwner(select as SelectInstanceInternal<*>, owner), owner)
        }
    }

    /**
     * Result-processing function of the [onLock] clause.
     *
     * @return this mutex, which is the value passed into the selected block.
     * @throws IllegalStateException if [result] is [ON_LOCK_ALREADY_LOCKED_BY_OWNER].
     */
    protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
        if (result == ON_LOCK_ALREADY_LOCKED_BY_OWNER) {
            error("This mutex is already locked by the specified owner: $owner")
        }
        return this
    }

    /**
     * Continuation wrapper used by [lockSuspend]. It delegates to [cont] but records [owner] in the
     * enclosing mutex when the continuation is resumed.
     *
     * The `onCancellation` argument supplied by the caller of [tryResume] / [resume] is not forwarded;
     * a handler that unlocks the mutex with [owner] is passed to [cont] instead.
     */
    @OptIn(InternalForInheritanceCoroutinesApi::class)
    private inner class CancellableContinuationWithOwner(
        @JvmField
        val cont: CancellableContinuationImpl<Unit>,
        @JvmField
        val owner: Any?
    ) : CancellableContinuation<Unit> by cont, Waiter by cont {
        override fun <R : Unit> tryResume(
            value: R,
            idempotent: Any?,
            onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
        ): Any? {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            val token = cont.tryResume(value, idempotent) { _, _, _ ->
                assert { this@MutexImpl.owner.value.let { it === NO_OWNER || it === owner } }
                // Store the owner before unlocking: `unlock` spins while the owner is `NO_OWNER`.
                this@MutexImpl.owner.value = owner
                unlock(owner)
            }
            if (token != null) {
                assert { this@MutexImpl.owner.value === NO_OWNER }
                this@MutexImpl.owner.value = owner
            }
            return token
        }

        override fun <R : Unit> resume(
            value: R,
            onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
        ) {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            this@MutexImpl.owner.value = owner
            cont.resume(value) { unlock(owner) }
        }
    }

    /**
     * Select-instance wrapper used by [onLockRegFunction]. It delegates to [select] but records [owner]
     * in the enclosing mutex when the clause gets selected.
     */
    private inner class SelectInstanceWithOwner<Q>(
        @JvmField
        val select: SelectInstanceInternal<Q>,
        @JvmField
        val owner: Any?
    ) : SelectInstanceInternal<Q> by select {
        override fun trySelect(clauseObject: Any, result: Any?): Boolean {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            return select.trySelect(clauseObject, result).also { success ->
                // Only a successful selection means the lock now belongs to `owner`.
                if (success) this@MutexImpl.owner.value = owner
            }
        }

        override fun selectInRegistrationPhase(internalResult: Any?) {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            this@MutexImpl.owner.value = owner
            select.selectInRegistrationPhase(internalResult)
        }
    }

    override fun toString() = "Mutex@${hexAddress}[isLocked=$isLocked,owner=${owner.value}]"
}
303 
// Sentinel stored in `MutexImpl.owner` while no owner is recorded
// (the mutex is unlocked, or the new owner has not been written yet).
private val NO_OWNER = Symbol("NO_OWNER")
// Internal `onLock` select result: the mutex is already locked by the owner passed to the clause.
private val ON_LOCK_ALREADY_LOCKED_BY_OWNER = Symbol("ALREADY_LOCKED_BY_OWNER")

// Result codes of `MutexImpl.tryLockImpl`.
private const val TRY_LOCK_SUCCESS = 0
private const val TRY_LOCK_FAILED = 1
private const val TRY_LOCK_ALREADY_LOCKED_BY_OWNER = 2

// Result codes of `MutexImpl.holdsLockImpl`.
private const val HOLDS_LOCK_UNLOCKED = 0
private const val HOLDS_LOCK_YES = 1
private const val HOLDS_LOCK_ANOTHER_OWNER = 2
314