• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.sync
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.contracts.*
12 import kotlin.coroutines.*
13 import kotlin.js.*
14 import kotlin.math.*
15 
16 /**
17  * A counting semaphore for coroutines that logically maintains a number of available permits.
18  * Each [acquire] takes a single permit or suspends until it is available.
19  * Each [release] adds a permit, potentially releasing a suspended acquirer.
20  * Semaphore is fair and maintains a FIFO order of acquirers.
21  *
22  * Semaphores are mostly used to limit the number of coroutines that have access to particular resource.
23  * Semaphore with `permits = 1` is essentially a [Mutex].
24  **/
25 public interface Semaphore {
26     /**
27      * Returns the current number of permits available in this semaphore.
28      */
29     public val availablePermits: Int
30 
31     /**
32      * Acquires a permit from this semaphore, suspending until one is available.
33      * All suspending acquirers are processed in first-in-first-out (FIFO) order.
34      *
35      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
36      * function is suspended, this function immediately resumes with [CancellationException].
37      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
38      * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
39      * This function releases the semaphore if it was already acquired by this function before the [CancellationException]
40      * was thrown.
41      *
42      * Note, that this function does not check for cancellation when it does not suspend.
43      * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
44      * check for cancellation in tight loops if needed.
45      *
46      * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
47      */
48     public suspend fun acquire()
49 
50     /**
51      * Tries to acquire a permit from this semaphore without suspension.
52      *
53      * @return `true` if a permit was acquired, `false` otherwise.
54      */
55     public fun tryAcquire(): Boolean
56 
57     /**
58      * Releases a permit, returning it into this semaphore. Resumes the first
59      * suspending acquirer if there is one at the point of invocation.
60      * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
61      */
62     public fun release()
63 }
64 
65 /**
66  * Creates new [Semaphore] instance.
67  * @param permits the number of permits available in this semaphore.
68  * @param acquiredPermits the number of already acquired permits,
69  *        should be between `0` and `permits` (inclusively).
70  */
71 @Suppress("FunctionName")
Semaphorenull72 public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
73 
74 /**
75  * Executes the given [action], acquiring a permit from this semaphore at the beginning
76  * and releasing it after the [action] is completed.
77  *
78  * @return the return value of the [action].
79  */
80 @OptIn(ExperimentalContracts::class)
81 public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
82     contract {
83         callsInPlace(action, InvocationKind.EXACTLY_ONCE)
84     }
85 
86     acquire()
87     try {
88         return action()
89     } finally {
90         release()
91     }
92 }
93 
94 @Suppress("UNCHECKED_CAST")
95 internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
96     /*
97        The queue of waiting acquirers is essentially an infinite array based on the list of segments
98        (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
99        and dequeue operation, we increment the corresponding counter at the beginning of the operation
100        and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
101        works with an individual cell. We use the corresponding segment pointers to find the required ones.
102 
103        Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
104        can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
105        so that the state `PERMIT` represents different logical states.
106 
107          +------+ `acquire` suspends   +------+   `release` tries    +--------+                    // if `cont.tryResume(..)` succeeds, then
108          | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED)   // the corresponding `acquire` operation gets
109          +------+                      +------+   to resume `cont`   +--------+                    // a permit and the `release` one completes.
110             |                             |
111             |                             | `acquire` request is cancelled and the continuation is
112             | `release` comes             | replaced with a special `CANCEL` token to avoid memory leaks
113             | to the slot before          V
114             | `acquire` and puts    +-----------+   `release` has    +--------+
115             | a permit into the     | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
116             | slot, waiting for     +-----------+        failed      +--------+
117             | `acquire` after
118             | that.
119             |
120             |           `acquire` gets   +-------+
121             |        +-----------------> | TAKEN | (ELIMINATION HAPPENED)
122             V        |    the permit     +-------+
123         +--------+   |
124         | PERMIT | -<
125         +--------+  |
126                     |  `release` has waited a bounded time,   +--------+
127                     +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
128                            but `acquire` has not come         +--------+
129     */
130 
131     private val head: AtomicRef<SemaphoreSegment>
132     private val deqIdx = atomic(0L)
133     private val tail: AtomicRef<SemaphoreSegment>
134     private val enqIdx = atomic(0L)
135 
136     init {
<lambda>null137         require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
<lambda>null138         require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
139         val s = SemaphoreSegment(0, null, 2)
140         head = atomic(s)
141         tail = atomic(s)
142     }
143 
144     /**
145      * This counter indicates the number of available permits if it is positive,
146      * or the negated number of waiters on this semaphore otherwise.
147      * Note, that 32-bit counter is enough here since the maximal number of available
148      * permits is [permits] which is [Int], and the maximum number of waiting acquirers
149      * cannot be greater than 2^31 in any real application.
150      */
151     private val _availablePermits = atomic(permits - acquiredPermits)
152     override val availablePermits: Int get() = max(_availablePermits.value, 0)
153 
_null154     private val onCancellationRelease = { _: Throwable -> release() }
155 
tryAcquirenull156     override fun tryAcquire(): Boolean {
157         while (true) {
158             // Get the current number of available permits.
159             val p = _availablePermits.value
160             // Is the number of available permits greater
161             // than the maximal one because of an incorrect
162             // `release()` call without a preceding `acquire()`?
163             // Change it to `permits` and start from the beginning.
164             if (p > permits) {
165                 coerceAvailablePermitsAtMaximum()
166                 continue
167             }
168             // Try to decrement the number of available
169             // permits if it is greater than zero.
170             if (p <= 0) return false
171             if (_availablePermits.compareAndSet(p, p - 1)) return true
172         }
173     }
174 
acquirenull175     override suspend fun acquire() {
176         // Decrement the number of available permits.
177         val p = decPermits()
178         // Is the permit acquired?
179         if (p > 0) return // permit acquired
180         // Try to suspend otherwise.
181         // While it looks better when the following function is inlined,
182         // it is important to make `suspend` function invocations in a way
183         // so that the tail-call optimization can be applied here.
184         acquireSlowPath()
185     }
186 
acquireSlowPathnull187     private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
188         // Try to suspend.
189         if (addAcquireToQueue(cont)) return@sc
190         // The suspension has been failed
191         // due to the synchronous resumption mode.
192         // Restart the whole `acquire`.
193         acquire(cont)
194     }
195 
196     @JsName("acquireCont")
acquirenull197     protected fun acquire(waiter: CancellableContinuation<Unit>) = acquire(
198         waiter = waiter,
199         suspend = { cont -> addAcquireToQueue(cont as Waiter) },
contnull200         onAcquired = { cont -> cont.resume(Unit, onCancellationRelease) }
201     )
202 
203     @JsName("acquireInternal")
acquirenull204     private inline fun <W> acquire(waiter: W, suspend: (waiter: W) -> Boolean, onAcquired: (waiter: W) -> Unit) {
205         while (true) {
206             // Decrement the number of available permits at first.
207             val p = decPermits()
208             // Is the permit acquired?
209             if (p > 0) {
210                 onAcquired(waiter)
211                 return
212             }
213             // Permit has not been acquired, try to suspend.
214             if (suspend(waiter)) return
215         }
216     }
217 
218     // We do not fully support `onAcquire` as it is needed only for `Mutex.onLock`.
219     @Suppress("UNUSED_PARAMETER")
onAcquireRegFunctionnull220     protected fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
221         acquire(
222             waiter = select,
223             suspend = { s -> addAcquireToQueue(s as Waiter) },
snull224             onAcquired = { s -> s.selectInRegistrationPhase(Unit) }
225         )
226 
227     /**
228      * Decrements the number of available permits
229      * and ensures that it is not greater than [permits]
230      * at the point of decrement. The last may happen
231      * due to an incorrect `release()` call without
232      * a preceding `acquire()`.
233      */
decPermitsnull234     private fun decPermits(): Int {
235         while (true) {
236             // Decrement the number of available permits.
237             val p = _availablePermits.getAndDecrement()
238             // Is the number of available permits greater
239             // than the maximal one due to an incorrect
240             // `release()` call without a preceding `acquire()`?
241             if (p > permits) continue
242             // The number of permits is correct, return it.
243             return p
244         }
245     }
246 
releasenull247     override fun release() {
248         while (true) {
249             // Increment the number of available permits.
250             val p = _availablePermits.getAndIncrement()
251             // Is this `release` call correct and does not
252             // exceed the maximal number of permits?
253             if (p >= permits) {
254                 // Revert the number of available permits
255                 // back to the correct one and fail with error.
256                 coerceAvailablePermitsAtMaximum()
257                 error("The number of released permits cannot be greater than $permits")
258             }
259             // Is there a waiter that should be resumed?
260             if (p >= 0) return
261             // Try to resume the first waiter, and
262             // restart the operation if either this
263             // first waiter is cancelled or
264             // due to `SYNC` resumption mode.
265             if (tryResumeNextFromQueue()) return
266         }
267     }
268 
269     /**
270      * Changes the number of available permits to
271      * [permits] if it became greater due to an
272      * incorrect [release] call.
273      */
coerceAvailablePermitsAtMaximumnull274     private fun coerceAvailablePermitsAtMaximum() {
275         while (true) {
276             val cur = _availablePermits.value
277             if (cur <= permits) break
278             if (_availablePermits.compareAndSet(cur, permits)) break
279         }
280     }
281 
282     /**
283      * Returns `false` if the received permit cannot be used and the calling operation should restart.
284      */
addAcquireToQueuenull285     private fun addAcquireToQueue(waiter: Waiter): Boolean {
286         val curTail = this.tail.value
287         val enqIdx = enqIdx.getAndIncrement()
288         val createNewSegment = ::createSegment
289         val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
290             createNewSegment = createNewSegment).segment // cannot be closed
291         val i = (enqIdx % SEGMENT_SIZE).toInt()
292         // the regular (fast) path -- if the cell is empty, try to install continuation
293         if (segment.cas(i, null, waiter)) { // installed continuation successfully
294             waiter.invokeOnCancellation(segment, i)
295             return true
296         }
297         // On CAS failure -- the cell must be either PERMIT or BROKEN
298         // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
299         if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
300             /// This continuation is not yet published, but still can be cancelled via outer job
301             when (waiter) {
302                 is CancellableContinuation<*> -> {
303                     waiter as CancellableContinuation<Unit>
304                     waiter.resume(Unit, onCancellationRelease)
305                 }
306                 is SelectInstance<*> -> {
307                     waiter.selectInRegistrationPhase(Unit)
308                 }
309                 else -> error("unexpected: $waiter")
310             }
311             return true
312         }
313         assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
314         return false // broken cell, need to retry on a different cell
315     }
316 
317     @Suppress("UNCHECKED_CAST")
tryResumeNextFromQueuenull318     private fun tryResumeNextFromQueue(): Boolean {
319         val curHead = this.head.value
320         val deqIdx = deqIdx.getAndIncrement()
321         val id = deqIdx / SEGMENT_SIZE
322         val createNewSegment = ::createSegment
323         val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
324             createNewSegment = createNewSegment).segment // cannot be closed
325         segment.cleanPrev()
326         if (segment.id > id) return false
327         val i = (deqIdx % SEGMENT_SIZE).toInt()
328         val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
329         when {
330             cellState === null -> {
331                 // Acquire has not touched this cell yet, wait until it comes for a bounded time
332                 // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
333                 repeat(MAX_SPIN_CYCLES) {
334                     if (segment.get(i) === TAKEN) return true
335                 }
336                 // Try to break the slot in order not to wait
337                 return !segment.cas(i, PERMIT, BROKEN)
338             }
339             cellState === CANCELLED -> return false // the acquirer has already been cancelled
340             else -> return cellState.tryResumeAcquire()
341         }
342     }
343 
tryResumeAcquirenull344     private fun Any.tryResumeAcquire(): Boolean = when(this) {
345         is CancellableContinuation<*> -> {
346             this as CancellableContinuation<Unit>
347             val token = tryResume(Unit, null, onCancellationRelease)
348             if (token != null) {
349                 completeResume(token)
350                 true
351             } else false
352         }
353         is SelectInstance<*> -> {
354             trySelect(this@SemaphoreImpl, Unit)
355         }
356         else -> error("unexpected: $this")
357     }
358 }
359 
createSegmentnull360 private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
361 
362 private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment<SemaphoreSegment>(id, prev, pointers) {
363     val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
364     override val numberOfSlots: Int get() = SEGMENT_SIZE
365 
366     @Suppress("NOTHING_TO_INLINE")
367     inline fun get(index: Int): Any? = acquirers[index].value
368 
369     @Suppress("NOTHING_TO_INLINE")
370     inline fun set(index: Int, value: Any?) {
371         acquirers[index].value = value
372     }
373 
374     @Suppress("NOTHING_TO_INLINE")
375     inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
376 
377     @Suppress("NOTHING_TO_INLINE")
378     inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
379 
380     // Cleans the acquirer slot located by the specified index
381     // and removes this segment physically if all slots are cleaned.
382     override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
383         // Clean the slot
384         set(index, CANCELLED)
385         // Remove this segment if needed
386         onSlotCleaned()
387     }
388 
389     override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
390 }
391 private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
392 private val PERMIT = Symbol("PERMIT")
393 private val TAKEN = Symbol("TAKEN")
394 private val BROKEN = Symbol("BROKEN")
395 private val CANCELLED = Symbol("CANCELLED")
396 private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
397