<lambda>null1 package kotlinx.coroutines.sync
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.internal.*
6 import kotlinx.coroutines.selects.*
7 import kotlin.contracts.*
8 import kotlin.coroutines.CoroutineContext
9 import kotlin.jvm.*
10
/**
 * Mutual exclusion for coroutines.
 *
 * Mutex has two states: _locked_ and _unlocked_.
 * It is **non-reentrant**, that is, invoking [lock] even from the same thread/coroutine that currently holds
 * the lock still suspends the invoker.
 *
 * JVM API note:
 * The memory semantics of the [Mutex] are similar to those of a `synchronized` block on JVM:
 * An unlock operation on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
 * Unsuccessful calls to [tryLock] do not have any memory effects.
 */
public interface Mutex {
    /**
     * Returns `true` if this mutex is locked.
     */
    public val isLocked: Boolean

    /**
     * Tries to lock this mutex, returning `false` if this mutex is already locked.
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of your critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     * is already locked with the same token (same identity), this function throws [IllegalStateException].
     * @return `true` if the lock was acquired by this call, `false` if the mutex is already locked.
     * @throws IllegalStateException if a non-null `owner` is given and this mutex is already locked with it.
     */
    public fun tryLock(owner: Any? = null): Boolean

    /**
     * Locks this mutex, suspending the caller until the lock is acquired (in other words, while the lock is
     * held elsewhere).
     *
     * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
     * suspending function is waiting, this function immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was
     * cancelled while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for
     * low-level details.
     * This function releases the lock if it was already acquired by this function before the
     * [CancellationException] was thrown.
     *
     * Note that this function does not check for cancellation when it is not suspended.
     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
     *
     * Use [tryLock] to try acquiring the lock without waiting.
     *
     * This function is fair; suspended callers are resumed in first-in-first-out order.
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     * is already locked with the same token (same identity), this function throws [IllegalStateException].
     * @throws IllegalStateException if a non-null `owner` is given and this mutex is already locked with it.
     */
    public suspend fun lock(owner: Any? = null)

    /**
     * Clause for the [select] expression of the [lock] suspending function that selects when the mutex is locked.
     * The additional parameter for the clause is the `owner` (see [lock]), and when the clause is selected
     * the reference to this mutex is passed into the corresponding block.
     */
    @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
        "For additional details please refer to #2794") // WARNING since 1.6.0
    public val onLock: SelectClause2<Any?, Mutex>

    /**
     * Checks whether this mutex is locked by the specified owner.
     *
     * @param owner the owner token to compare (by identity) with the token this mutex was locked with.
     * @return `true` when this mutex is locked by the specified owner;
     * `false` if the mutex is not locked or locked by another owner.
     */
    public fun holdsLock(owner: Any): Boolean

    /**
     * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or
     * was locked with a different owner token (by identity).
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     * was locked with a different token (by identity), this function throws [IllegalStateException].
     * @throws IllegalStateException if this mutex is not locked, or if a non-null `owner` is given that differs
     * from the token this mutex was locked with.
     */
    public fun unlock(owner: Any? = null)
}
97
/**
 * Builds a new [Mutex].
 * The returned mutex is fair: waiters are granted the lock in first-come, first-served order.
 *
 * @param locked whether the mutex starts out locked; defaults to `false` (unlocked).
 */
@Suppress("FunctionName")
public fun Mutex(locked: Boolean = false): Mutex {
    return MutexImpl(locked)
}
107
/**
 * Runs [action] while holding this mutex: the lock is taken before [action] starts and is released
 * as soon as it completes, regardless of whether it returns normally or throws.
 *
 * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
 * is already locked with the same token (same identity), this function throws [IllegalStateException].
 *
 * @return the value produced by [action].
 */
@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }
    lock(owner)
    try {
        return action()
    } finally {
        // Always hand the lock back with the same token it was taken with.
        unlock(owner)
    }
}
128
129
/**
 * [Mutex] implementation layered on [SemaphoreAndMutexImpl].
 *
 * The base class is constructed with `1` and with `1`/`0` depending on `locked` — presumably a single
 * permit that is already taken when the mutex is created locked (confirm against [SemaphoreAndMutexImpl]).
 * On top of the permit bookkeeping this class tracks the owner token of the current lock holder.
 */
internal open class MutexImpl(locked: Boolean) : SemaphoreAndMutexImpl(1, if (locked) 1 else 0), Mutex {
    /**
     * After the lock is acquired, the corresponding owner is stored in this field.
     * The [unlock] operation checks the owner and either re-sets it to [NO_OWNER],
     * if there is no waiting request, or to the owner of the suspended [lock] operation
     * to be resumed, otherwise.
     *
     * A mutex created in the locked state starts with a `null` owner; an unlocked one starts with [NO_OWNER].
     */
    private val owner = atomic<Any?>(if (locked) null else NO_OWNER)

    // Factory for the cancellation handler used by the `onLock` clause: the produced handler
    // unlocks this mutex with the `owner` value it was given.
    private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
        { _: SelectInstance<*>, owner: Any?, _: Any? ->
            { _, _, _ -> unlock(owner) }
        }

    // The mutex is considered locked exactly when no permit is available.
    override val isLocked: Boolean get() =
        availablePermits == 0

    override fun holdsLock(owner: Any): Boolean = holdsLockImpl(owner) == HOLDS_LOCK_YES

    /**
     * Reports how the current lock state relates to [owner]:
     * - [HOLDS_LOCK_UNLOCKED] if the mutex is unlocked;
     * - [HOLDS_LOCK_YES] if the mutex is held with the specified [owner];
     * - [HOLDS_LOCK_ANOTHER_OWNER] if the mutex is held with a different owner.
     *
     * The owner comparison is by identity. This function spins while the mutex is locked
     * but the owner field still contains [NO_OWNER].
     */
    private fun holdsLockImpl(owner: Any?): Int {
        while (true) {
            // Is this mutex locked?
            if (!isLocked) return HOLDS_LOCK_UNLOCKED
            val curOwner = this.owner.value
            // Wait in a spin-loop until the owner is set
            if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
            // Check the owner
            return if (curOwner === owner) HOLDS_LOCK_YES else HOLDS_LOCK_ANOTHER_OWNER
        }
    }

    override suspend fun lock(owner: Any?) {
        // Fast path: take the lock without suspension when possible.
        if (tryLock(owner)) return
        // Slow path: suspend until the lock is granted.
        lockSuspend(owner)
    }

    // Suspends the caller and passes its continuation, wrapped together with the owner token,
    // to `acquire` (not defined in this class — presumably inherited from SemaphoreAndMutexImpl).
    private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
        val contWithOwner = CancellableContinuationWithOwner(cont, owner)
        acquire(contWithOwner)
    }

    // Translates the `tryLockImpl` result code into the public contract:
    // `true`/`false`, or an IllegalStateException (via `error`) for a repeated lock by the same owner.
    override fun tryLock(owner: Any?): Boolean = when (tryLockImpl(owner)) {
        TRY_LOCK_SUCCESS -> true
        TRY_LOCK_FAILED -> false
        TRY_LOCK_ALREADY_LOCKED_BY_OWNER -> error("This mutex is already locked by the specified owner: $owner")
        else -> error("unexpected")
    }

    /**
     * Attempts to take the permit and record [owner] without suspending.
     *
     * Returns [TRY_LOCK_SUCCESS], [TRY_LOCK_FAILED], or [TRY_LOCK_ALREADY_LOCKED_BY_OWNER].
     * The attempt is restarted if the mutex turns out to be unlocked after the permit acquisition failed.
     */
    private fun tryLockImpl(owner: Any?): Int {
        while (true) {
            if (tryAcquire()) {
                assert { this.owner.value === NO_OWNER }
                // The permit is ours; publish the owner token.
                this.owner.value = owner
                return TRY_LOCK_SUCCESS
            } else {
                // The semaphore permit acquisition has failed.
                // However, we need to check that this mutex is not
                // locked by our owner.
                if (owner == null) return TRY_LOCK_FAILED
                when (holdsLockImpl(owner)) {
                    // This mutex is already locked by our owner.
                    HOLDS_LOCK_YES -> return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
                    // This mutex is locked by another owner, `tryLock(..)` must return `false`.
                    HOLDS_LOCK_ANOTHER_OWNER -> return TRY_LOCK_FAILED
                    // This mutex is no longer locked, restart the operation.
                    HOLDS_LOCK_UNLOCKED -> continue
                }
            }
        }
    }

    override fun unlock(owner: Any?) {
        while (true) {
            // Is this mutex locked?
            check(isLocked) { "This mutex is not locked" }
            // Read the owner, waiting until it is set in a spin-loop if required.
            val curOwner = this.owner.value
            if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
            // Check the owner. A `null` argument skips the check and unlocks regardless of the stored token.
            check(curOwner === owner || owner == null) { "This mutex is locked by $curOwner, but $owner is expected" }
            // Try to clean the owner first. We need to use CAS here to synchronize with concurrent `unlock(..)`-s.
            if (!this.owner.compareAndSet(curOwner, NO_OWNER)) continue
            // Release the semaphore permit at the end.
            release()
            return
        }
    }

    // A new clause object is created on every access. It wires the registration and result-processing
    // functions below together with the cancellation handler factory declared above.
    @Suppress("UNCHECKED_CAST", "OverridingDeprecatedMember", "OVERRIDE_DEPRECATION")
    override val onLock: SelectClause2<Any?, Mutex> get() = SelectClause2Impl(
        clauseObject = this,
        regFunc = MutexImpl::onLockRegFunction as RegistrationFunction,
        processResFunc = MutexImpl::onLockProcessResult as ProcessResultFunction,
        onCancellationConstructor = onSelectCancellationUnlockConstructor
    )

    // Registration function of the `onLock` clause.
    protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
        if (owner != null && holdsLock(owner)) {
            // Already locked by this owner: select immediately with a marker that
            // `onLockProcessResult` turns into an exception.
            select.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER)
        } else {
            // Register through `onAcquireRegFunction`, wrapping the select instance
            // so that the owner is recorded once the clause is selected.
            onAcquireRegFunction(SelectInstanceWithOwner(select as SelectInstanceInternal<*>, owner), owner)
        }
    }

    // Result-processing function of the `onLock` clause: fails if registration found the mutex
    // already locked by `owner`, otherwise yields this mutex as the clause result.
    protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
        if (result == ON_LOCK_ALREADY_LOCKED_BY_OWNER) {
            error("This mutex is already locked by the specified owner: $owner")
        }
        return this
    }

    /**
     * Wraps the continuation of a suspended [lock] call together with its [owner] token.
     *
     * Everything is delegated to [cont] except [tryResume] and [resume], which additionally
     * store [owner] into the mutex's owner field when the waiter is resumed.
     */
    @OptIn(InternalForInheritanceCoroutinesApi::class)
    private inner class CancellableContinuationWithOwner(
        @JvmField
        val cont: CancellableContinuationImpl<Unit>,
        @JvmField
        val owner: Any?
    ) : CancellableContinuation<Unit> by cont, Waiter by cont {
        // Note: the `onCancellation` argument is not forwarded; a handler that unlocks the mutex is used instead.
        override fun <R : Unit> tryResume(
            value: R,
            idempotent: Any?,
            onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
        ): Any? {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            val token = cont.tryResume(value, idempotent) { _, _, _ ->
                assert { this@MutexImpl.owner.value.let { it === NO_OWNER || it === owner } }
                // Record the owner first so that the `unlock(owner)` below passes its owner check.
                this@MutexImpl.owner.value = owner
                unlock(owner)
            }
            if (token != null) {
                // The resumption attempt succeeded: this waiter now holds the lock.
                assert { this@MutexImpl.owner.value === NO_OWNER }
                this@MutexImpl.owner.value = owner
            }
            return token
        }

        // Note: the `onCancellation` argument is not forwarded; a handler that unlocks the mutex is used instead.
        override fun <R : Unit> resume(
            value: R,
            onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
        ) {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            // Record the owner before resuming the waiter.
            this@MutexImpl.owner.value = owner
            cont.resume(value) { unlock(owner) }
        }
    }

    /**
     * Wraps the select instance of an `onLock` clause together with its [owner] token.
     *
     * Everything is delegated to [select] except [trySelect] and [selectInRegistrationPhase],
     * which additionally store [owner] into the mutex's owner field.
     */
    private inner class SelectInstanceWithOwner<Q>(
        @JvmField
        val select: SelectInstanceInternal<Q>,
        @JvmField
        val owner: Any?
    ) : SelectInstanceInternal<Q> by select {
        override fun trySelect(clauseObject: Any, result: Any?): Boolean {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            return select.trySelect(clauseObject, result).also { success ->
                // Only a successful selection takes ownership of the lock.
                if (success) this@MutexImpl.owner.value = owner
            }
        }

        override fun selectInRegistrationPhase(internalResult: Any?) {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            // Record the owner before delegating the selection.
            this@MutexImpl.owner.value = owner
            select.selectInRegistrationPhase(internalResult)
        }
    }

    override fun toString() = "Mutex@${hexAddress}[isLocked=$isLocked,owner=${owner.value}]"
}
303
// Marker stored in `MutexImpl.owner` while no owner is recorded.
private val NO_OWNER = Symbol("NO_OWNER")
// Marker passed from `MutexImpl.onLockRegFunction` to `MutexImpl.onLockProcessResult`
// when the mutex is already locked by the owner given to the `onLock` clause.
private val ON_LOCK_ALREADY_LOCKED_BY_OWNER = Symbol("ALREADY_LOCKED_BY_OWNER")

// Result codes of `MutexImpl.tryLockImpl`.
private const val TRY_LOCK_SUCCESS = 0
private const val TRY_LOCK_FAILED = 1
private const val TRY_LOCK_ALREADY_LOCKED_BY_OWNER = 2

// Result codes of `MutexImpl.holdsLockImpl`.
private const val HOLDS_LOCK_UNLOCKED = 0
private const val HOLDS_LOCK_YES = 1
private const val HOLDS_LOCK_ANOTHER_OWNER = 2
314