1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.sync
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.contracts.*
12 import kotlin.jvm.*
13
14 /**
15 * Mutual exclusion for coroutines.
16 *
17 * Mutex has two states: _locked_ and _unlocked_.
18 * It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
19 * the lock still suspends the invoker.
20 *
21 * JVM API note:
22 * Memory semantic of the [Mutex] is similar to `synchronized` block on JVM:
23 * An unlock operation on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
24 * Unsuccessful call to [tryLock] do not have any memory effects.
25 */
26 public interface Mutex {
27 /**
28 * Returns `true` if this mutex is locked.
29 */
30 public val isLocked: Boolean
31
32 /**
33 * Tries to lock this mutex, returning `false` if this mutex is already locked.
34 *
35 * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
36 * released at the end of your critical section, and [unlock] is never invoked before a successful
37 * lock acquisition.
38 *
39 * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
40 * is already locked with the same token (same identity), this function throws [IllegalStateException].
41 */
42 public fun tryLock(owner: Any? = null): Boolean
43
44 /**
45 * Locks this mutex, suspending caller while the mutex is locked.
46 *
47 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
48 * function is suspended, this function immediately resumes with [CancellationException].
49 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
50 * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
51 * This function releases the lock if it was already acquired by this function before the [CancellationException]
52 * was thrown.
53 *
54 * Note that this function does not check for cancellation when it is not suspended.
55 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
56 *
57 * Use [tryLock] to try acquiring the lock without waiting.
58 *
59 * This function is fair; suspended callers are resumed in first-in-first-out order.
60 *
61 * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
62 * released at the end of the critical section, and [unlock] is never invoked before a successful
63 * lock acquisition.
64 *
65 * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
66 * is already locked with the same token (same identity), this function throws [IllegalStateException].
67 */
68 public suspend fun lock(owner: Any? = null)
69
70 /**
71 * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
72 * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
73 * the reference to this mutex is passed into the corresponding block.
74 */
75 @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
76 "For additional details please refer to #2794") // WARNING since 1.6.0
77 public val onLock: SelectClause2<Any?, Mutex>
78
79 /**
80 * Checks whether this mutex is locked by the specified owner.
81 *
82 * @return `true` when this mutex is locked by the specified owner;
83 * `false` if the mutex is not locked or locked by another owner.
84 */
85 public fun holdsLock(owner: Any): Boolean
86
87 /**
88 * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or
89 * was locked with a different owner token (by identity).
90 *
91 * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
92 * released at the end of the critical section, and [unlock] is never invoked before a successful
93 * lock acquisition.
94 *
95 * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
96 * was locked with the different token (by identity), this function throws [IllegalStateException].
97 */
98 public fun unlock(owner: Any? = null)
99 }
100
101 /**
102 * Creates a [Mutex] instance.
103 * The mutex created is fair: lock is granted in first come, first served order.
104 *
105 * @param locked initial state of the mutex.
106 */
107 @Suppress("FunctionName")
Mutexnull108 public fun Mutex(locked: Boolean = false): Mutex =
109 MutexImpl(locked)
110
111 /**
112 * Executes the given [action] under this mutex's lock.
113 *
114 * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
115 * is already locked with the same token (same identity), this function throws [IllegalStateException].
116 *
117 * @return the return value of the action.
118 */
119 @OptIn(ExperimentalContracts::class)
120 public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
121 contract {
122 callsInPlace(action, InvocationKind.EXACTLY_ONCE)
123 }
124
125 lock(owner)
126 try {
127 return action()
128 } finally {
129 unlock(owner)
130 }
131 }
132
133
134 internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1 else 0), Mutex {
135 /**
136 * After the lock is acquired, the corresponding owner is stored in this field.
137 * The [unlock] operation checks the owner and either re-sets it to [NO_OWNER],
138 * if there is no waiting request, or to the owner of the suspended [lock] operation
139 * to be resumed, otherwise.
140 */
141 private val owner = atomic<Any?>(if (locked) null else NO_OWNER)
142
143 private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
ownernull144 { _: SelectInstance<*>, owner: Any?, _: Any? ->
145 { unlock(owner) }
146 }
147
148 override val isLocked: Boolean get() =
149 availablePermits == 0
150
holdsLocknull151 override fun holdsLock(owner: Any): Boolean = holdsLockImpl(owner) == HOLDS_LOCK_YES
152
153 /**
154 * [HOLDS_LOCK_UNLOCKED] if the mutex is unlocked
155 * [HOLDS_LOCK_YES] if the mutex is held with the specified [owner]
156 * [HOLDS_LOCK_ANOTHER_OWNER] if the mutex is held with a different owner
157 */
158 private fun holdsLockImpl(owner: Any?): Int {
159 while (true) {
160 // Is this mutex locked?
161 if (!isLocked) return HOLDS_LOCK_UNLOCKED
162 val curOwner = this.owner.value
163 // Wait in a spin-loop until the owner is set
164 if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
165 // Check the owner
166 return if (curOwner === owner) HOLDS_LOCK_YES else HOLDS_LOCK_ANOTHER_OWNER
167 }
168 }
169
locknull170 override suspend fun lock(owner: Any?) {
171 if (tryLock(owner)) return
172 lockSuspend(owner)
173 }
174
lockSuspendnull175 private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
176 val contWithOwner = CancellableContinuationWithOwner(cont, owner)
177 acquire(contWithOwner)
178 }
179
tryLocknull180 override fun tryLock(owner: Any?): Boolean = when (tryLockImpl(owner)) {
181 TRY_LOCK_SUCCESS -> true
182 TRY_LOCK_FAILED -> false
183 TRY_LOCK_ALREADY_LOCKED_BY_OWNER -> error("This mutex is already locked by the specified owner: $owner")
184 else -> error("unexpected")
185 }
186
tryLockImplnull187 private fun tryLockImpl(owner: Any?): Int {
188 while (true) {
189 if (tryAcquire()) {
190 assert { this.owner.value === NO_OWNER }
191 this.owner.value = owner
192 return TRY_LOCK_SUCCESS
193 } else {
194 // The semaphore permit acquisition has failed.
195 // However, we need to check that this mutex is not
196 // locked by our owner.
197 if (owner == null) return TRY_LOCK_FAILED
198 when (holdsLockImpl(owner)) {
199 // This mutex is already locked by our owner.
200 HOLDS_LOCK_YES -> return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
201 // This mutex is locked by another owner, `trylock(..)` must return `false`.
202 HOLDS_LOCK_ANOTHER_OWNER -> return TRY_LOCK_FAILED
203 // This mutex is no longer locked, restart the operation.
204 HOLDS_LOCK_UNLOCKED -> continue
205 }
206 }
207 }
208 }
209
unlocknull210 override fun unlock(owner: Any?) {
211 while (true) {
212 // Is this mutex locked?
213 check(isLocked) { "This mutex is not locked" }
214 // Read the owner, waiting until it is set in a spin-loop if required.
215 val curOwner = this.owner.value
216 if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
217 // Check the owner.
218 check(curOwner === owner || owner == null) { "This mutex is locked by $curOwner, but $owner is expected" }
219 // Try to clean the owner first. We need to use CAS here to synchronize with concurrent `unlock(..)`-s.
220 if (!this.owner.compareAndSet(curOwner, NO_OWNER)) continue
221 // Release the semaphore permit at the end.
222 release()
223 return
224 }
225 }
226
227 @Suppress("UNCHECKED_CAST", "OverridingDeprecatedMember", "OVERRIDE_DEPRECATION")
228 override val onLock: SelectClause2<Any?, Mutex> get() = SelectClause2Impl(
229 clauseObject = this,
230 regFunc = MutexImpl::onLockRegFunction as RegistrationFunction,
231 processResFunc = MutexImpl::onLockProcessResult as ProcessResultFunction,
232 onCancellationConstructor = onSelectCancellationUnlockConstructor
233 )
234
onLockRegFunctionnull235 protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
236 if (owner != null && holdsLock(owner)) {
237 select.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER)
238 } else {
239 onAcquireRegFunction(SelectInstanceWithOwner(select as SelectInstanceInternal<*>, owner), owner)
240 }
241 }
242
onLockProcessResultnull243 protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
244 if (result == ON_LOCK_ALREADY_LOCKED_BY_OWNER) {
245 error("This mutex is already locked by the specified owner: $owner")
246 }
247 return this
248 }
249
250 private inner class CancellableContinuationWithOwner(
251 @JvmField
252 val cont: CancellableContinuationImpl<Unit>,
253 @JvmField
254 val owner: Any?
<lambda>null255 ) : CancellableContinuation<Unit> by cont, Waiter by cont {
256 override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
257 assert { this@MutexImpl.owner.value === NO_OWNER }
258 val token = cont.tryResume(value, idempotent) {
259 assert { this@MutexImpl.owner.value.let { it === NO_OWNER ||it === owner } }
260 this@MutexImpl.owner.value = owner
261 unlock(owner)
262 }
263 if (token != null) {
264 assert { this@MutexImpl.owner.value === NO_OWNER }
265 this@MutexImpl.owner.value = owner
266 }
267 return token
268 }
269
270 override fun resume(value: Unit, onCancellation: ((cause: Throwable) -> Unit)?) {
271 assert { this@MutexImpl.owner.value === NO_OWNER }
272 this@MutexImpl.owner.value = owner
273 cont.resume(value) { unlock(owner) }
274 }
275 }
276
277 private inner class SelectInstanceWithOwner<Q>(
278 @JvmField
279 val select: SelectInstanceInternal<Q>,
280 @JvmField
281 val owner: Any?
<lambda>null282 ) : SelectInstanceInternal<Q> by select {
283 override fun trySelect(clauseObject: Any, result: Any?): Boolean {
284 assert { this@MutexImpl.owner.value === NO_OWNER }
285 return select.trySelect(clauseObject, result).also { success ->
286 if (success) this@MutexImpl.owner.value = owner
287 }
288 }
289
290 override fun selectInRegistrationPhase(internalResult: Any?) {
291 assert { this@MutexImpl.owner.value === NO_OWNER }
292 this@MutexImpl.owner.value = owner
293 select.selectInRegistrationPhase(internalResult)
294 }
295 }
296
toStringnull297 override fun toString() = "Mutex@${hexAddress}[isLocked=$isLocked,owner=${owner.value}]"
298 }
299
300 private val NO_OWNER = Symbol("NO_OWNER")
301 private val ON_LOCK_ALREADY_LOCKED_BY_OWNER = Symbol("ALREADY_LOCKED_BY_OWNER")
302
303 private const val TRY_LOCK_SUCCESS = 0
304 private const val TRY_LOCK_FAILED = 1
305 private const val TRY_LOCK_ALREADY_LOCKED_BY_OWNER = 2
306
307 private const val HOLDS_LOCK_UNLOCKED = 0
308 private const val HOLDS_LOCK_YES = 1
309 private const val HOLDS_LOCK_ANOTHER_OWNER = 2
310