• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12 
13 @SharedImmutable
14 private val UNDEFINED = Symbol("UNDEFINED")
15 @SharedImmutable
16 @JvmField
17 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
18 
19 internal class DispatchedContinuation<in T>(
20     @JvmField val dispatcher: CoroutineDispatcher,
21     @JvmField val continuation: Continuation<T>
22 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
23     @JvmField
24     @Suppress("PropertyName")
25     internal var _state: Any? = UNDEFINED
26     override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
27     override fun getStackTraceElement(): StackTraceElement? = null
28     @JvmField // pre-cached value to avoid ctx.fold on every resumption
29     internal val countOrElement = threadContextElements(context)
30 
31     /**
32      * Possible states of reusability:
33      *
34      * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
35      *     was used and then invalidated (e.g. because of the cancellation).
36      * 2) [CancellableContinuation]. Continuation to be/that is being reused.
37      * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
38      *    ```
39      *    // state == null | CC
40      *    suspendCancellableCoroutineReusable { cont ->
41      *        // state == REUSABLE_CLAIMED
42      *        block(cont)
43      *    }
44      *    // state == CC
45      *    ```
46      * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
47      *    [CancellableContinuationImpl.getResult] will check for cancellation later.
48      *
49      * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
50      * In the `getResult`, we have the following code:
51      * ```
52      * if (trySuspend()) {
53      *     // <- at this moment current continuation can be redispatched and claimed again.
54      *     attachChildToParent()
55      *     releaseClaimedContinuation()
56      * }
57      * ```
58      */
59     private val _reusableCancellableContinuation = atomic<Any?>(null)
60 
61     private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
62         get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
63 
64     fun isReusable(): Boolean {
65         /*
66         Invariant: caller.resumeMode.isReusableMode
67          * Reusability control:
68          * `null` -> no reusability at all, `false`
69          * anything else -> reusable.
70          */
71         return _reusableCancellableContinuation.value != null
72     }
73 
74     /**
75      * Awaits until previous call to `suspendCancellableCoroutineReusable` will
76      * stop mutating cached instance
77      */
78     fun awaitReusability() {
79         _reusableCancellableContinuation.loop {
80             if (it !== REUSABLE_CLAIMED) return
81         }
82     }
83 
84     fun release() {
85         /*
86          * Called from `releaseInterceptedContinuation`, can be concurrent with
87          * the code in `getResult` right after `trySuspend` returned `true`, so we have
88          * to wait for a release here.
89          */
90         awaitReusability()
91         reusableCancellableContinuation?.detachChild()
92     }
93 
94     /**
95      * Claims the continuation for [suspendCancellableCoroutineReusable] block,
96      * so all cancellations will be postponed.
97      */
98     @Suppress("UNCHECKED_CAST")
99     fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
100         /*
101          * Transitions:
102          * 1) `null` -> claimed, caller will instantiate CC instance
103          * 2) `CC` -> claimed, caller will reuse CC instance
104          */
105         _reusableCancellableContinuation.loop { state ->
106             when {
107                 state === null -> {
108                     /*
109                      * null -> CC was not yet published -> we do not compete with cancel
110                      * -> can use plain store instead of CAS
111                      */
112                     _reusableCancellableContinuation.value = REUSABLE_CLAIMED
113                     return null
114                 }
115                 // potentially competing with cancel
116                 state is CancellableContinuationImpl<*> -> {
117                     if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
118                         return state as CancellableContinuationImpl<T>
119                     }
120                 }
121                 state === REUSABLE_CLAIMED -> {
122                     // Do nothing, wait until reusable instance will be returned from
123                     // getResult() of a previous `suspendCancellableCoroutineReusable`
124                 }
125                 state is Throwable -> {
126                     // Also do nothing, Throwable can only indicate that the CC
127                     // is in REUSABLE_CLAIMED state, but with postponed cancellation
128                 }
129                 else -> error("Inconsistent state $state")
130             }
131         }
132     }
133 
134     /**
135      * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
136      * and returns cancellation cause if so, `null` otherwise.
137      * If continuation was cancelled, it becomes non-reusable.
138      *
139      * ```
140      * suspendCancellableCoroutineReusable { // <- claimed
141      * // Any asynchronous cancellation is "postponed" while this block
142      * // is being executed
143      * } // postponed cancellation is checked here in `getResult`
144      * ```
145      *
146      * See [CancellableContinuationImpl.getResult].
147      */
148     fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
149         _reusableCancellableContinuation.loop { state ->
150             // not when(state) to avoid Intrinsics.equals call
151             when {
152                 state === REUSABLE_CLAIMED -> {
153                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
154                 }
155                 state is Throwable -> {
156                     require(_reusableCancellableContinuation.compareAndSet(state, null))
157                     return state
158                 }
159                 else -> error("Inconsistent state $state")
160             }
161         }
162     }
163 
164     /**
165      * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
166      * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
167      */
168     fun postponeCancellation(cause: Throwable): Boolean {
169         _reusableCancellableContinuation.loop { state ->
170             when (state) {
171                 REUSABLE_CLAIMED -> {
172                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
173                         return true
174                 }
175                 is Throwable -> return true
176                 else -> {
177                     // Invalidate
178                     if (_reusableCancellableContinuation.compareAndSet(state, null))
179                         return false
180                 }
181             }
182         }
183     }
184 
185     override fun takeState(): Any? {
186         val state = _state
187         assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
188         _state = UNDEFINED
189         return state
190     }
191 
192     override val delegate: Continuation<T>
193         get() = this
194 
195     override fun resumeWith(result: Result<T>) {
196         val context = continuation.context
197         val state = result.toState()
198         if (dispatcher.isDispatchNeeded(context)) {
199             _state = state
200             resumeMode = MODE_ATOMIC
201             dispatcher.dispatch(context, this)
202         } else {
203             executeUnconfined(state, MODE_ATOMIC) {
204                 withCoroutineContext(this.context, countOrElement) {
205                     continuation.resumeWith(result)
206                 }
207             }
208         }
209     }
210 
211     // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
212     // It is used only in Continuation<T>.resumeCancellableWith
213     @Suppress("NOTHING_TO_INLINE")
214     inline fun resumeCancellableWith(
215         result: Result<T>,
216         noinline onCancellation: ((cause: Throwable) -> Unit)?
217     ) {
218         val state = result.toState(onCancellation)
219         if (dispatcher.isDispatchNeeded(context)) {
220             _state = state
221             resumeMode = MODE_CANCELLABLE
222             dispatcher.dispatch(context, this)
223         } else {
224             executeUnconfined(state, MODE_CANCELLABLE) {
225                 if (!resumeCancelled(state)) {
226                     resumeUndispatchedWith(result)
227                 }
228             }
229         }
230     }
231 
232     // takeState had already cleared the state so we cancel takenState here
233     override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
234         // It is Ok to call onCancellation here without try/catch around it, since this function only faces
235         // a "bound" cancellation handler that performs the safe call to the user-specified code.
236         if (takenState is CompletedWithCancellation) {
237             takenState.onCancellation(cause)
238         }
239     }
240 
241     @Suppress("NOTHING_TO_INLINE")
242     inline fun resumeCancelled(state: Any?): Boolean {
243         val job = context[Job]
244         if (job != null && !job.isActive) {
245             val cause = job.getCancellationException()
246             cancelCompletedResult(state, cause)
247             resumeWithException(cause)
248             return true
249         }
250         return false
251     }
252 
253     @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
254     inline fun resumeUndispatchedWith(result: Result<T>) {
255         withContinuationContext(continuation, countOrElement) {
256             continuation.resumeWith(result)
257         }
258     }
259 
260     // used by "yield" implementation
261     internal fun dispatchYield(context: CoroutineContext, value: T) {
262         _state = value
263         resumeMode = MODE_CANCELLABLE
264         dispatcher.dispatchYield(context, this)
265     }
266 
267     override fun toString(): String =
268         "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
269 }
270 
271 /**
272  * It is not inline to save bytecode (it is pretty big and used in many places)
273  * and we leave it public so that its name is not mangled in use stack traces if it shows there.
274  * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
275  * @suppress **This an internal API and should not be used from general code.**
276  */
277 @InternalCoroutinesApi
resumeCancellableWithnull278 public fun <T> Continuation<T>.resumeCancellableWith(
279     result: Result<T>,
280     onCancellation: ((cause: Throwable) -> Unit)? = null
281 ): Unit = when (this) {
282     is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
283     else -> resumeWith(result)
284 }
285 
yieldUndispatchednull286 internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
287     executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
288         run()
289     }
290 
291 /**
292  * Executes given [block] as part of current event loop, updating current continuation
293  * mode and state if continuation is not resumed immediately.
294  * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
295  * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
296  */
executeUnconfinednull297 private inline fun DispatchedContinuation<*>.executeUnconfined(
298     contState: Any?, mode: Int, doYield: Boolean = false,
299     block: () -> Unit
300 ): Boolean {
301     assert { mode != MODE_UNINITIALIZED } // invalid execution mode
302     val eventLoop = ThreadLocalEventLoop.eventLoop
303     // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
304     if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
305     return if (eventLoop.isUnconfinedLoopActive) {
306         // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
307         _state = contState
308         resumeMode = mode
309         eventLoop.dispatchUnconfined(this)
310         true // queued into the active loop
311     } else {
312         // Was not active -- run event loop until all unconfined tasks are executed
313         runUnconfinedEventLoop(eventLoop, block = block)
314         false
315     }
316 }
317