1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.sync
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.contracts.*
12 import kotlin.coroutines.*
13 import kotlin.js.*
14 import kotlin.math.*
15
/**
 * A counting semaphore for coroutines that logically maintains a number of available permits.
 * Each [acquire] takes a single permit or suspends until it is available.
 * Each [release] adds a permit, potentially releasing a suspended acquirer.
 * Semaphore is fair and maintains a FIFO order of acquirers.
 *
 * Semaphores are mostly used to limit the number of coroutines that have access to a particular resource.
 * Semaphore with `permits = 1` is essentially a [Mutex].
 */
public interface Semaphore {
    /**
     * Returns the current number of permits available in this semaphore.
     */
    public val availablePermits: Int

    /**
     * Acquires a permit from this semaphore, suspending until one is available.
     * All suspending acquirers are processed in first-in-first-out (FIFO) order.
     *
     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed
     * while this function is suspended, this function immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
     * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for
     * low-level details.
     * This function releases the semaphore if it was already acquired by this function before
     * the [CancellationException] was thrown.
     *
     * Note that this function does not check for cancellation when it does not suspend.
     * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
     * check for cancellation in tight loops if needed.
     *
     * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
     */
    public suspend fun acquire()

    /**
     * Tries to acquire a permit from this semaphore without suspension.
     *
     * @return `true` if a permit was acquired, `false` otherwise.
     */
    public fun tryAcquire(): Boolean

    /**
     * Releases a permit, returning it into this semaphore. Resumes the first
     * suspending acquirer if there is one at the point of invocation.
     *
     * @throws IllegalStateException if the number of [release] invocations is greater than
     * the number of preceding [acquire] invocations.
     */
    public fun release()
}
64
/**
 * Builds a new [Semaphore].
 *
 * @param permits the total number of permits managed by the semaphore; must be positive.
 * @param acquiredPermits how many of those permits count as already taken at creation time;
 * must lie between `0` and `permits` (inclusively). Defaults to `0`.
 * @return the newly created semaphore.
 * @throws IllegalArgumentException if either argument is outside its allowed range.
 */
@Suppress("FunctionName")
public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore {
    return SemaphoreImpl(permits, acquiredPermits)
}
73
/**
 * Runs [action] while holding one permit of this semaphore: the permit is acquired
 * (suspending if necessary) before [action] starts and is given back once [action]
 * finishes, whether it returns normally or throws.
 *
 * If [acquire] itself fails, [action] is not invoked and no permit is released.
 *
 * @param action the code to run under the permit; it is invoked exactly once in place.
 * @return the return value of the [action].
 */
@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }

    // Acquisition stays outside of `try` so that a failed `acquire` never triggers `release`.
    acquire()
    return try {
        action()
    } finally {
        release()
    }
}
93
/**
 * [Semaphore] implementation built on an atomic permit counter and a segment-based queue
 * of waiting acquirers (see `SemaphoreSegment`).
 *
 * @param permits the maximal number of permits; must be positive.
 * @param acquiredPermits the number of permits considered already acquired at construction time;
 * must be in `0..permits`.
 * @throws IllegalArgumentException if either constructor argument violates the constraints above.
 */
@Suppress("UNCHECKED_CAST")
internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
    /*
       The queue of waiting acquirers is essentially an infinite array based on the list of segments
       (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
       and dequeue operation, we increment the corresponding counter at the beginning of the operation
       and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
       works with an individual cell. We use the corresponding segment pointers to find the required ones.

       Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
       can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
       so that the state `PERMIT` represents different logical states.

         +------+ `acquire` suspends   +------+   `release` tries    +--------+                    // if `cont.tryResume(..)` succeeds, then
         | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED)   // the corresponding `acquire` operation gets
         +------+                      +------+   to resume `cont`   +--------+                    // a permit and the `release` one completes.
            |                             |
            |                             | `acquire` request is cancelled and the continuation is
            | `release` comes             | replaced with a special `CANCELLED` token to avoid memory leaks
            | to the slot before          V
            | `acquire` and puts    +-----------+   `release` has    +--------+
            | a permit into the     | CANCELLED | -----------------> | PERMIT | (RELEASE FAILED)
            | slot, waiting for     +-----------+        failed      +--------+
            | `acquire` after
            | that.
            |
            |           `acquire` gets   +-------+
            |        +-----------------> | TAKEN | (ELIMINATION HAPPENED)
            V        |    the permit     +-------+
        +--------+   |
        | PERMIT | -<
        +--------+  |
                    |  `release` has waited a bounded time,   +--------+
                    +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
                           but `acquire` has not come         +--------+
     */

    // `head` and `deqIdx` are advanced by `tryResumeNextFromQueue` (the `release` side), while
    // `tail` and `enqIdx` are advanced by `addAcquireToQueue` (the `acquire` side).
    private val head: AtomicRef<SemaphoreSegment>
    private val deqIdx = atomic(0L)
    private val tail: AtomicRef<SemaphoreSegment>
    private val enqIdx = atomic(0L)

    init {
        require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
        require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
        // Both `head` and `tail` start at the same segment with id 0. It is created with `pointers = 2`,
        // presumably one per reference to it -- confirm against `Segment`.
        val s = SemaphoreSegment(0, null, 2)
        head = atomic(s)
        tail = atomic(s)
    }

    /**
     * This counter indicates the number of available permits if it is positive,
     * or the negated number of waiters on this semaphore otherwise.
     * Note that a 32-bit counter is enough here since the maximal number of available
     * permits is [permits] which is [Int], and the maximum number of waiting acquirers
     * cannot be greater than 2^31 in any real application.
     */
    private val _availablePermits = atomic(permits - acquiredPermits)
    // A negative counter value (there are waiters) is reported as zero available permits.
    override val availablePermits: Int get() = max(_availablePermits.value, 0)

    // Handler passed along with every successful resumption of a continuation; it gives the permit back
    // via `release()`. Presumably invoked when the resumed continuation turns out to be cancelled --
    // confirm against `CancellableContinuation.resume`.
    private val onCancellationRelease = { _: Throwable -> release() }

    /**
     * Takes a permit without suspending by a compare-and-set on the permit counter.
     * The waiter queue is never touched by this function.
     *
     * @return `true` if the counter was positive and has been decremented, `false` if no permit is available.
     */
    override fun tryAcquire(): Boolean {
        while (true) {
            // Get the current number of available permits.
            val p = _availablePermits.value
            // Is the number of available permits greater
            // than the maximal one because of an incorrect
            // `release()` call without a preceding `acquire()`?
            // Change it to `permits` and start from the beginning.
            if (p > permits) {
                coerceAvailablePermitsAtMaximum()
                continue
            }
            // Try to decrement the number of available
            // permits if it is greater than zero.
            if (p <= 0) return false
            if (_availablePermits.compareAndSet(p, p - 1)) return true
        }
    }

    /**
     * Fast path: decrements the permit counter and returns immediately if a permit was available;
     * otherwise delegates to [acquireSlowPath] to suspend.
     */
    override suspend fun acquire() {
        // Decrement the number of available permits.
        val p = decPermits()
        // Is the permit acquired?
        if (p > 0) return // permit acquired
        // Try to suspend otherwise.
        // While it looks better when the following function is inlined,
        // it is important to make `suspend` function invocations in a way
        // so that the tail-call optimization can be applied here.
        acquireSlowPath()
    }

    /**
     * Slow path of [acquire]: the permit counter has already been decremented by the caller,
     * so the continuation is first put into the waiter queue. If that fails, the whole
     * acquire procedure (including the decrement) is restarted for the same continuation.
     */
    private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
        // Try to suspend.
        if (addAcquireToQueue(cont)) return@sc
        // The suspension has failed
        // due to the synchronous resumption mode.
        // Restart the whole `acquire`.
        acquire(cont)
    }

    /**
     * Acquire procedure for a continuation-based [waiter]: it is either resumed with [Unit]
     * as soon as a permit is obtained, or stored in the waiter queue.
     */
    @JsName("acquireCont")
    protected fun acquire(waiter: CancellableContinuation<Unit>) = acquire(
        waiter = waiter,
        suspend = { cont -> addAcquireToQueue(cont as Waiter) },
        onAcquired = { cont -> cont.resume(Unit, onCancellationRelease) }
    )

    /**
     * Generic acquire loop shared by the continuation-based and the select-based entry points.
     *
     * On each iteration the permit counter is decremented. If a permit was available, [onAcquired]
     * is called and the loop ends. Otherwise [suspend] is called; when it returns `true` the loop
     * ends, and when it returns `false` the iteration is repeated.
     */
    @JsName("acquireInternal")
    private inline fun <W> acquire(waiter: W, suspend: (waiter: W) -> Boolean, onAcquired: (waiter: W) -> Unit) {
        while (true) {
            // Decrement the number of available permits at first.
            val p = decPermits()
            // Is the permit acquired?
            if (p > 0) {
                onAcquired(waiter)
                return
            }
            // Permit has not been acquired, try to suspend.
            if (suspend(waiter)) return
        }
    }

    // We do not fully support `onAcquire` as it is needed only for `Mutex.onLock`.
    // The second parameter is ignored; the select instance itself is used as the waiter.
    @Suppress("UNUSED_PARAMETER")
    protected fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
        acquire(
            waiter = select,
            suspend = { s -> addAcquireToQueue(s as Waiter) },
            onAcquired = { s -> s.selectInRegistrationPhase(Unit) }
        )

    /**
     * Decrements the number of available permits
     * and ensures that it is not greater than [permits]
     * at the point of decrement. The latter may happen
     * due to an incorrect `release()` call without
     * a preceding `acquire()`.
     *
     * @return the counter value observed right before the successful decrement.
     */
    private fun decPermits(): Int {
        while (true) {
            // Decrement the number of available permits.
            val p = _availablePermits.getAndDecrement()
            // Is the number of available permits greater
            // than the maximal one due to an incorrect
            // `release()` call without a preceding `acquire()`?
            if (p > permits) continue
            // The number of permits is correct, return it.
            return p
        }
    }

    /**
     * Returns a permit: increments the permit counter and, if the counter was negative
     * (there are waiters), tries to pass the permit to the next waiter in the queue.
     * The whole operation is repeated while the hand-over attempt fails.
     *
     * @throws IllegalStateException if the counter was already at [permits] or above,
     * i.e. there was no preceding acquisition to match this call.
     */
    override fun release() {
        while (true) {
            // Increment the number of available permits.
            val p = _availablePermits.getAndIncrement()
            // Is this `release` call correct and does not
            // exceed the maximal number of permits?
            if (p >= permits) {
                // Revert the number of available permits
                // back to the correct one and fail with error.
                coerceAvailablePermitsAtMaximum()
                error("The number of released permits cannot be greater than $permits")
            }
            // Is there a waiter that should be resumed?
            if (p >= 0) return
            // Try to resume the first waiter, and
            // restart the operation if either this
            // first waiter is cancelled or
            // due to `SYNC` resumption mode.
            if (tryResumeNextFromQueue()) return
        }
    }

    /**
     * Changes the number of available permits to
     * [permits] if it became greater due to an
     * incorrect [release] call.
     */
    private fun coerceAvailablePermitsAtMaximum() {
        while (true) {
            val cur = _availablePermits.value
            if (cur <= permits) break
            if (_availablePermits.compareAndSet(cur, permits)) break
        }
    }

    /**
     * Reserves the next cell of the queue for [waiter] and processes it according to the cell state:
     * an empty cell receives the waiter, and a cell that already holds `PERMIT` is marked `TAKEN`
     * with the waiter completed right away.
     *
     * Returns `false` if the received permit cannot be used and the calling operation should restart.
     */
    private fun addAcquireToQueue(waiter: Waiter): Boolean {
        val curTail = this.tail.value
        val enqIdx = enqIdx.getAndIncrement()
        val createNewSegment = ::createSegment
        val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
            createNewSegment = createNewSegment).segment // cannot be closed
        // Index of the reserved cell inside the segment.
        val i = (enqIdx % SEGMENT_SIZE).toInt()
        // the regular (fast) path -- if the cell is empty, try to install continuation
        if (segment.cas(i, null, waiter)) { // installed continuation successfully
            waiter.invokeOnCancellation(segment, i)
            return true
        }
        // On CAS failure -- the cell must be either PERMIT or BROKEN
        // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
        if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
            // This continuation is not yet published, but still can be cancelled via outer job
            when (waiter) {
                is CancellableContinuation<*> -> {
                    waiter as CancellableContinuation<Unit>
                    waiter.resume(Unit, onCancellationRelease)
                }
                is SelectInstance<*> -> {
                    waiter.selectInRegistrationPhase(Unit)
                }
                else -> error("unexpected: $waiter")
            }
            return true
        }
        assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
        return false // broken cell, need to retry on a different cell
    }

    /**
     * Reserves the next cell on the dequeue side, puts `PERMIT` into it and acts on the previous cell state.
     *
     * @return `true` if the permit was passed on (a stored waiter was resumed, or the cell became `TAKEN`),
     * `false` if the caller has to retry.
     */
    @Suppress("UNCHECKED_CAST")
    private fun tryResumeNextFromQueue(): Boolean {
        val curHead = this.head.value
        val deqIdx = deqIdx.getAndIncrement()
        val id = deqIdx / SEGMENT_SIZE
        val createNewSegment = ::createSegment
        val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
            createNewSegment = createNewSegment).segment // cannot be closed
        segment.cleanPrev()
        // A segment with a greater id was found instead of the requested one; presumably the requested
        // segment has been removed because all its cells were cancelled -- confirm against `Segment`.
        if (segment.id > id) return false
        val i = (deqIdx % SEGMENT_SIZE).toInt()
        val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
        when {
            cellState === null -> {
                // Acquire has not touched this cell yet, wait until it comes for a bounded time
                // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
                repeat(MAX_SPIN_CYCLES) {
                    if (segment.get(i) === TAKEN) return true
                }
                // Try to break the slot in order not to wait
                // A failed CAS means the cell is no longer PERMIT, i.e. `acquire` has taken it in the meantime.
                return !segment.cas(i, PERMIT, BROKEN)
            }
            cellState === CANCELLED -> return false // the acquirer has already been cancelled
            else -> return cellState.tryResumeAcquire()
        }
    }

    /**
     * Attempts to complete the waiter found in a cell with an acquired permit.
     * The receiver must be a [CancellableContinuation] or a [SelectInstance]; anything else is an error.
     *
     * @return `true` if the waiter accepted the resumption, `false` otherwise.
     */
    private fun Any.tryResumeAcquire(): Boolean = when(this) {
        is CancellableContinuation<*> -> {
            this as CancellableContinuation<Unit>
            val token = tryResume(Unit, null, onCancellationRelease)
            if (token != null) {
                completeResume(token)
                true
            } else false
        }
        is SelectInstance<*> -> {
            trySelect(this@SemaphoreImpl, Unit)
        }
        else -> error("unexpected: $this")
    }
}
359
/**
 * Segment factory handed to `findSegmentAndMoveForward` when the waiter queue has to grow.
 * Segments produced here start with a pointer count of zero.
 */
private fun createSegment(id: Long, prev: SemaphoreSegment?): SemaphoreSegment {
    return SemaphoreSegment(id = id, prev = prev, pointers = 0)
}
361
/**
 * A chunk of the semaphore's waiter queue made of [SEGMENT_SIZE] cells.
 * Each cell stores either `null`, a waiter, or one of the marker symbols
 * (`PERMIT`, `TAKEN`, `BROKEN`, `CANCELLED`).
 */
private class SemaphoreSegment(
    id: Long,
    prev: SemaphoreSegment?,
    pointers: Int
) : Segment<SemaphoreSegment>(id, prev, pointers) {
    val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)

    override val numberOfSlots: Int
        get() {
            return SEGMENT_SIZE
        }

    /** Reads the current state of the cell at [index]. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun get(index: Int): Any? {
        return acquirers[index].value
    }

    /** Unconditionally overwrites the cell at [index] with [value]. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun set(index: Int, value: Any?) {
        acquirers[index].value = value
    }

    /** Replaces the cell at [index] with [value] only if it currently holds [expected]. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun cas(index: Int, expected: Any?, value: Any?): Boolean {
        return acquirers[index].compareAndSet(expected, value)
    }

    /** Stores [value] into the cell at [index] and hands back what was there before. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun getAndSet(index: Int, value: Any?): Any? {
        return acquirers[index].getAndSet(value)
    }

    /**
     * Cancellation hook for the waiter stored at [index]: the cell is overwritten with
     * `CANCELLED`, and then the segment is notified that one more slot has been cleaned.
     */
    override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
        // Drop the reference to the waiter first ...
        acquirers[index].value = CANCELLED
        // ... and only then report the cleaned slot to the segment.
        onSlotCleaned()
    }

    override fun toString(): String {
        return "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
    }
}
// Upper bound on the number of re-reads `release` performs on a cell while waiting for the
// matching `acquire` to mark it `TAKEN`; after that it tries to mark the cell `BROKEN`.
// Presumably read from the system property of that name with a default of 100 -- confirm against `systemProp`.
private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
// Cell marker stored by `release` via `getAndSet`: a permit has been put into the cell.
private val PERMIT = Symbol("PERMIT")
// Cell marker stored by `acquire` when it takes a `PERMIT` that arrived in the cell before it.
private val TAKEN = Symbol("TAKEN")
// Cell marker stored by `release` when `acquire` has not come to the cell within the spin bound.
private val BROKEN = Symbol("BROKEN")
// Cell marker stored in place of a waiter whose acquisition was cancelled.
private val CANCELLED = Symbol("CANCELLED")
// Number of cells in each `SemaphoreSegment`.
// Presumably read from the system property of that name with a default of 16 -- confirm against `systemProp`.
private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
397