• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 
12 private val UNDEFINED = Symbol("UNDEFINED")
13 @JvmField
14 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
15 
16 @PublishedApi
17 internal class DispatchedContinuation<in T>(
18     @JvmField internal val dispatcher: CoroutineDispatcher,
19     // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
20     @JvmField val continuation: Continuation<T>
21 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
22     @JvmField
23     @Suppress("PropertyName")
24     internal var _state: Any? = UNDEFINED
25     override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
26     override fun getStackTraceElement(): StackTraceElement? = null
27     @JvmField // pre-cached value to avoid ctx.fold on every resumption
28     internal val countOrElement = threadContextElements(context)
29 
30     /**
31      * Possible states of reusability:
32      *
33      * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
34      *     was used and then invalidated (e.g. because of the cancellation).
35      * 2) [CancellableContinuation]. Continuation to be/that is being reused.
36      * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
37      *    ```
38      *    // state == null | CC
39      *    suspendCancellableCoroutineReusable { cont ->
40      *        // state == REUSABLE_CLAIMED
41      *        block(cont)
42      *    }
43      *    // state == CC
44      *    ```
45      * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
46      *    [CancellableContinuationImpl.getResult] will check for cancellation later.
47      *
48      * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
49      * In the `getResult`, we have the following code:
50      * ```
51      * if (trySuspend()) {
52      *     // <- at this moment current continuation can be redispatched and claimed again.
53      *     attachChildToParent()
54      *     releaseClaimedContinuation()
55      * }
56      * ```
57      */
58     private val _reusableCancellableContinuation = atomic<Any?>(null)
59 
60     private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
61         get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
62 
63     internal fun isReusable(): Boolean {
64         /*
65         Invariant: caller.resumeMode.isReusableMode
66          * Reusability control:
67          * `null` -> no reusability at all, `false`
68          * anything else -> reusable.
69          */
70         return _reusableCancellableContinuation.value != null
71     }
72 
73     /**
74      * Awaits until previous call to `suspendCancellableCoroutineReusable` will
75      * stop mutating cached instance
76      */
77     internal fun awaitReusability() {
78         _reusableCancellableContinuation.loop {
79             if (it !== REUSABLE_CLAIMED) return
80         }
81     }
82 
83     internal fun release() {
84         /*
85          * Called from `releaseInterceptedContinuation`, can be concurrent with
86          * the code in `getResult` right after `trySuspend` returned `true`, so we have
87          * to wait for a release here.
88          */
89         awaitReusability()
90         reusableCancellableContinuation?.detachChild()
91     }
92 
93     /**
94      * Claims the continuation for [suspendCancellableCoroutineReusable] block,
95      * so all cancellations will be postponed.
96      */
97     @Suppress("UNCHECKED_CAST")
98     internal fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
99         /*
100          * Transitions:
101          * 1) `null` -> claimed, caller will instantiate CC instance
102          * 2) `CC` -> claimed, caller will reuse CC instance
103          */
104         _reusableCancellableContinuation.loop { state ->
105             when {
106                 state === null -> {
107                     /*
108                      * null -> CC was not yet published -> we do not compete with cancel
109                      * -> can use plain store instead of CAS
110                      */
111                     _reusableCancellableContinuation.value = REUSABLE_CLAIMED
112                     return null
113                 }
114                 // potentially competing with cancel
115                 state is CancellableContinuationImpl<*> -> {
116                     if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
117                         return state as CancellableContinuationImpl<T>
118                     }
119                 }
120                 state === REUSABLE_CLAIMED -> {
121                     // Do nothing, wait until reusable instance will be returned from
122                     // getResult() of a previous `suspendCancellableCoroutineReusable`
123                 }
124                 state is Throwable -> {
125                     // Also do nothing, Throwable can only indicate that the CC
126                     // is in REUSABLE_CLAIMED state, but with postponed cancellation
127                 }
128                 else -> error("Inconsistent state $state")
129             }
130         }
131     }
132 
133     /**
134      * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
135      * and returns cancellation cause if so, `null` otherwise.
136      * If continuation was cancelled, it becomes non-reusable.
137      *
138      * ```
139      * suspendCancellableCoroutineReusable { // <- claimed
140      * // Any asynchronous cancellation is "postponed" while this block
141      * // is being executed
142      * } // postponed cancellation is checked here in `getResult`
143      * ```
144      *
145      * See [CancellableContinuationImpl.getResult].
146      */
147     internal fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
148         _reusableCancellableContinuation.loop { state ->
149             // not when(state) to avoid Intrinsics.equals call
150             when {
151                 state === REUSABLE_CLAIMED -> {
152                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
153                 }
154                 state is Throwable -> {
155                     require(_reusableCancellableContinuation.compareAndSet(state, null))
156                     return state
157                 }
158                 else -> error("Inconsistent state $state")
159             }
160         }
161     }
162 
163     /**
164      * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
165      * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
166      */
167     internal fun postponeCancellation(cause: Throwable): Boolean {
168         _reusableCancellableContinuation.loop { state ->
169             when (state) {
170                 REUSABLE_CLAIMED -> {
171                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
172                         return true
173                 }
174                 is Throwable -> return true
175                 else -> {
176                     // Invalidate
177                     if (_reusableCancellableContinuation.compareAndSet(state, null))
178                         return false
179                 }
180             }
181         }
182     }
183 
184     override fun takeState(): Any? {
185         val state = _state
186         assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
187         _state = UNDEFINED
188         return state
189     }
190 
191     override val delegate: Continuation<T>
192         get() = this
193 
194     override fun resumeWith(result: Result<T>) {
195         val context = continuation.context
196         val state = result.toState()
197         if (dispatcher.isDispatchNeeded(context)) {
198             _state = state
199             resumeMode = MODE_ATOMIC
200             dispatcher.dispatch(context, this)
201         } else {
202             executeUnconfined(state, MODE_ATOMIC) {
203                 withCoroutineContext(this.context, countOrElement) {
204                     continuation.resumeWith(result)
205                 }
206             }
207         }
208     }
209 
210     // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
211     // It is used only in Continuation<T>.resumeCancellableWith
212     @Suppress("NOTHING_TO_INLINE")
213     internal inline fun resumeCancellableWith(
214         result: Result<T>,
215         noinline onCancellation: ((cause: Throwable) -> Unit)?
216     ) {
217         val state = result.toState(onCancellation)
218         if (dispatcher.isDispatchNeeded(context)) {
219             _state = state
220             resumeMode = MODE_CANCELLABLE
221             dispatcher.dispatch(context, this)
222         } else {
223             executeUnconfined(state, MODE_CANCELLABLE) {
224                 if (!resumeCancelled(state)) {
225                     resumeUndispatchedWith(result)
226                 }
227             }
228         }
229     }
230 
231     // takeState had already cleared the state so we cancel takenState here
232     override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
233         // It is Ok to call onCancellation here without try/catch around it, since this function only faces
234         // a "bound" cancellation handler that performs the safe call to the user-specified code.
235         if (takenState is CompletedWithCancellation) {
236             takenState.onCancellation(cause)
237         }
238     }
239 
240     // inline here is to save us an entry on the stack for the sake of better stacktraces
241     @Suppress("NOTHING_TO_INLINE")
242     internal inline fun resumeCancelled(state: Any?): Boolean {
243         val job = context[Job]
244         if (job != null && !job.isActive) {
245             val cause = job.getCancellationException()
246             cancelCompletedResult(state, cause)
247             resumeWithException(cause)
248             return true
249         }
250         return false
251     }
252 
253     @Suppress("NOTHING_TO_INLINE")
254     internal inline fun resumeUndispatchedWith(result: Result<T>) {
255         withContinuationContext(continuation, countOrElement) {
256             continuation.resumeWith(result)
257         }
258     }
259 
260     // used by "yield" implementation
261     internal fun dispatchYield(context: CoroutineContext, value: T) {
262         _state = value
263         resumeMode = MODE_CANCELLABLE
264         dispatcher.dispatchYield(context, this)
265     }
266 
267     override fun toString(): String =
268         "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
269 }
270 
271 /**
272  * It is not inline to save bytecode (it is pretty big and used in many places)
273  * and we leave it public so that its name is not mangled in use stack traces if it shows there.
274  * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
275  * @suppress **This an internal API and should not be used from general code.**
276  */
277 @InternalCoroutinesApi
resumeCancellableWithnull278 public fun <T> Continuation<T>.resumeCancellableWith(
279     result: Result<T>,
280     onCancellation: ((cause: Throwable) -> Unit)? = null
281 ): Unit = when (this) {
282     is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
283     else -> resumeWith(result)
284 }
285 
yieldUndispatchednull286 internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
287     executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
288         run()
289     }
290 
291 /**
292  * Executes given [block] as part of current event loop, updating current continuation
293  * mode and state if continuation is not resumed immediately.
294  * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
295  * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
296  */
executeUnconfinednull297 private inline fun DispatchedContinuation<*>.executeUnconfined(
298     contState: Any?, mode: Int, doYield: Boolean = false,
299     block: () -> Unit
300 ): Boolean {
301     assert { mode != MODE_UNINITIALIZED } // invalid execution mode
302     val eventLoop = ThreadLocalEventLoop.eventLoop
303     // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
304     if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
305     return if (eventLoop.isUnconfinedLoopActive) {
306         // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
307         _state = contState
308         resumeMode = mode
309         eventLoop.dispatchUnconfined(this)
310         true // queued into the active loop
311     } else {
312         // Was not active -- run event loop until all unconfined tasks are executed
313         runUnconfinedEventLoop(eventLoop, block = block)
314         false
315     }
316 }
317