• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 import kotlin.jvm.*
7 
// Sentinel stored in DispatchedContinuation._state while no resumption state is pending:
// it is that field's initial value, and takeState() writes it back after reading the real state.
8 private val UNDEFINED = Symbol("UNDEFINED")
// Marker placed in DispatchedContinuation's reusable-continuation slot while the cached
// cancellable continuation is claimed (see the state list documented inside that class).
9 @JvmField
10 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
11 
/**
 * Continuation wrapper that routes resumption of [continuation] through [dispatcher].
 *
 * [resumeWith] and [resumeCancellableWith] either stash the resumption state in [_state] and hand this
 * object to the dispatcher via `safeDispatch`, or -- when `safeIsDispatchNeeded` returns `false` --
 * resume through `executeUnconfined`. The class also owns the atomic slot that caches a
 * [CancellableContinuationImpl] for `suspendCancellableCoroutineReusable`; the slot's states are
 * listed in the comment on `_reusableCancellableContinuation` below.
 */
12 internal class DispatchedContinuation<in T>(
13     @JvmField internal val dispatcher: CoroutineDispatcher,
14     @JvmField val continuation: Continuation<T>
15 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
       // Resumption state waiting to be picked up: written by resumeWith/resumeCancellableWith/dispatchYield
       // (and by executeUnconfined) before this task is queued; read and reset to UNDEFINED by takeState().
16     @JvmField
17     @Suppress("PropertyName")
18     internal var _state: Any? = UNDEFINED
       // The wrapped continuation serves as the caller frame when it implements CoroutineStackFrame.
19     override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
       // This wrapper reports no stack trace element of its own.
20     override fun getStackTraceElement(): StackTraceElement? = null
21     @JvmField // pre-cached value to avoid ctx.fold on every resumption
22     internal val countOrElement = threadContextElements(context)
23 
24     /**
25      * Possible states of reusability:
26      *
27      * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
28      *     was used and then invalidated (e.g. because of the cancellation).
29      * 2) [CancellableContinuation]. Continuation to be/that is being reused.
30      * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
31      *    ```
32      *    // state == null | CC
33      *    suspendCancellableCoroutineReusable { cont ->
34      *        // state == REUSABLE_CLAIMED
35      *        block(cont)
36      *    }
37      *    // state == CC
38      *    ```
39      * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
40      *    [CancellableContinuationImpl.getResult] will check for cancellation later.
41      *
42      * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
43      * In the `getResult`, we have the following code:
44      * ```
45      * if (trySuspend()) {
46      *     // <- at this moment current continuation can be redispatched and claimed again.
47      *     attachChildToParent()
48      *     releaseClaimedContinuation()
49      * }
50      * ```
51      */
52     private val _reusableCancellableContinuation = atomic<Any?>(null)
53 
       // View of the slot as a cached instance: `null` whenever the slot holds anything other than
       // a CancellableContinuationImpl (i.e. null, REUSABLE_CLAIMED or a Throwable).
54     private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
55         get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
56 
       /**
        * Reports whether the reusable slot currently holds any non-`null` state.
        */
57     internal fun isReusable(): Boolean {
58         /*
59         Invariant: caller.resumeMode.isReusableMode
60          * Reusability control:
61          * `null` -> no reusability at all, `false`
62          * anything else -> reusable.
63          */
64         return _reusableCancellableContinuation.value != null
65     }
66 
67     /**
68      * Awaits until previous call to `suspendCancellableCoroutineReusable` will
69      * stop mutating cached instance
70      */
71     internal fun awaitReusability() {
72         _reusableCancellableContinuation.loop {
               // `loop` keeps re-reading the slot; leave as soon as it is no longer REUSABLE_CLAIMED
73             if (it !== REUSABLE_CLAIMED) return
74         }
75     }
76 
       /**
        * Waits until the slot is no longer claimed, then calls `detachChild()` on the cached
        * continuation if the slot holds one.
        */
77     internal fun release() {
78         /*
79          * Called from `releaseInterceptedContinuation`, can be concurrent with
80          * the code in `getResult` right after `trySuspend` returned `true`, so we have
81          * to wait for a release here.
82          */
83         awaitReusability()
84         reusableCancellableContinuation?.detachChild()
85     }
86 
87     /**
88      * Claims the continuation for [suspendCancellableCoroutineReusable] block,
89      * so all cancellations will be postponed.
        *
        * @return the previously cached instance to reuse, or `null` when the slot was empty
        *   (the slot is marked [REUSABLE_CLAIMED] in both cases).
90      */
91     @Suppress("UNCHECKED_CAST")
92     internal fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
93         /*
94          * Transitions:
95          * 1) `null` -> claimed, caller will instantiate CC instance
96          * 2) `CC` -> claimed, caller will reuse CC instance
97          */
98         _reusableCancellableContinuation.loop { state ->
99             when {
100                 state === null -> {
101                     /*
102                      * null -> CC was not yet published -> we do not compete with cancel
103                      * -> can use plain store instead of CAS
104                      */
105                     _reusableCancellableContinuation.value = REUSABLE_CLAIMED
106                     return null
107                 }
108                 // potentially competing with cancel
109                 state is CancellableContinuationImpl<*> -> {
                        // a lost CAS falls through and the loop re-reads the slot
110                     if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
111                         return state as CancellableContinuationImpl<T>
112                     }
113                 }
114                 state === REUSABLE_CLAIMED -> {
115                     // Do nothing, wait until reusable instance will be returned from
116                     // getResult() of a previous `suspendCancellableCoroutineReusable`
117                 }
118                 state is Throwable -> {
119                     // Also do nothing, Throwable can only indicate that the CC
120                     // is in REUSABLE_CLAIMED state, but with postponed cancellation
121                 }
122                 else -> error("Inconsistent state $state")
123             }
124         }
125     }
126 
127     /**
128      * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
129      * and returns cancellation cause if so, `null` otherwise.
130      * If continuation was cancelled, it becomes non-reusable.
131      *
132      * ```
133      * suspendCancellableCoroutineReusable { // <- claimed
134      * // Any asynchronous cancellation is "postponed" while this block
135      * // is being executed
136      * } // postponed cancellation is checked here in `getResult`
137      * ```
138      *
139      * See [CancellableContinuationImpl.getResult].
         *
         * @param continuation instance published into the slot when no cancellation was postponed.
         * @return the postponed cancellation cause (the slot is then reset to `null`), or `null`
         *   after [continuation] has been stored in the slot.
140      */
141     internal fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
142         _reusableCancellableContinuation.loop { state ->
143             // not when(state) to avoid Intrinsics.equals call
144             when {
145                 state === REUSABLE_CLAIMED -> {
                        // a lost CAS means the slot changed concurrently; the loop re-reads it
146                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
147                 }
148                 state is Throwable -> {
                        // NOTE(review): a failed CAS makes `require` throw IllegalArgumentException; this
                        // presumably relies on nothing else replacing a Throwable state here -- confirm.
149                     require(_reusableCancellableContinuation.compareAndSet(state, null))
150                     return state
151                 }
152                 else -> error("Inconsistent state $state")
153             }
154         }
155     }
156 
157     /**
158      * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
159      * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
160      */
161     internal fun postponeCancellation(cause: Throwable): Boolean {
162         _reusableCancellableContinuation.loop { state ->
163             when (state) {
164                 REUSABLE_CLAIMED -> {
                        // claimed right now: park the cause in the slot; a lost CAS re-runs the loop
165                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
166                         return true
167                 }
                    // an earlier cancellation cause is already parked in the slot
168                 is Throwable -> return true
169                 else -> {
170                     // Invalidate
                        // (slot is reset to null; `false` tells the caller nothing was postponed)
171                     if (_reusableCancellableContinuation.compareAndSet(state, null))
172                         return false
173                 }
174             }
175         }
176     }
177 
        /**
         * Returns the state currently stashed in [_state] and resets the field to `UNDEFINED`.
         */
178     override fun takeState(): Any? {
179         val state = _state
180         assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
181         _state = UNDEFINED
182         return state
183     }
184 
        // This object is its own delegate continuation.
185     override val delegate: Continuation<T>
186         get() = this
187 
        /**
         * Resumes in `MODE_ATOMIC`. [result] is converted with `toState()`; if the dispatcher reports that
         * dispatch is needed, the state is stashed and this object is dispatched, otherwise the wrapped
         * [continuation] is resumed through `executeUnconfined` inside `withCoroutineContext`.
         */
188     override fun resumeWith(result: Result<T>) {
189         val state = result.toState()
190         if (dispatcher.safeIsDispatchNeeded(context)) {
191             _state = state
192             resumeMode = MODE_ATOMIC
193             dispatcher.safeDispatch(context, this)
194         } else {
195             executeUnconfined(state, MODE_ATOMIC) {
196                 withCoroutineContext(context, countOrElement) {
197                     continuation.resumeWith(result)
198                 }
199             }
200         }
201     }
202 
203     // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
204     // It is used only in Continuation<T>.resumeCancellableWith
205     @Suppress("NOTHING_TO_INLINE")
206     internal inline fun resumeCancellableWith(result: Result<T>) {
207         val state = result.toState()
208         if (dispatcher.safeIsDispatchNeeded(context)) {
209             _state = state
210             resumeMode = MODE_CANCELLABLE
211             dispatcher.safeDispatch(context, this)
212         } else {
213             executeUnconfined(state, MODE_CANCELLABLE) {
                    // resume the wrapped continuation only when resumeCancelled did not already
                    // resume it with the job's cancellation exception
214                 if (!resumeCancelled(state)) {
215                     resumeUndispatchedWith(result)
216                 }
217             }
218         }
219     }
220 
        /**
         * If the context contains a [Job] that is no longer active, passes [state] and the job's
         * cancellation exception to `cancelCompletedResult`, resumes with that exception and returns
         * `true`; returns `false` (doing nothing) otherwise.
         */
221     // inline here is to save us an entry on the stack for the sake of better stacktraces
222     @Suppress("NOTHING_TO_INLINE")
223     internal inline fun resumeCancelled(state: Any?): Boolean {
224         val job = context[Job]
225         if (job != null && !job.isActive) {
226             val cause = job.getCancellationException()
227             cancelCompletedResult(state, cause)
228             resumeWithException(cause)
229             return true
230         }
231         return false
232     }
233 
        // Resumes the wrapped continuation directly (no dispatch), inside withContinuationContext
        // using the pre-computed countOrElement.
234     @Suppress("NOTHING_TO_INLINE")
235     internal inline fun resumeUndispatchedWith(result: Result<T>) {
236         withContinuationContext(continuation, countOrElement) {
237             continuation.resumeWith(result)
238         }
239     }
240 
241     // used by "yield" implementation
        // Stashes [value] as the pending state in MODE_CANCELLABLE and hands this task to
        // dispatcher.dispatchYield.
242     internal fun dispatchYield(context: CoroutineContext, value: T) {
243         _state = value
244         resumeMode = MODE_CANCELLABLE
245         dispatcher.dispatchYield(context, this)
246     }
247 
248     override fun toString(): String =
249         "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
250 }
251 
/**
 * Forwards [runnable] to this dispatcher's `dispatch` for the given [context].
 *
 * Any [Throwable] escaping `dispatch` is rethrown wrapped in a [DispatchException] constructed
 * from the original failure, this dispatcher and [context].
 */
internal fun CoroutineDispatcher.safeDispatch(context: CoroutineContext, runnable: Runnable) {
    val target = this
    try {
        target.dispatch(context, runnable)
    } catch (failure: Throwable) {
        throw DispatchException(failure, target, context)
    }
}
259 
/**
 * Asks this dispatcher's `isDispatchNeeded` about [context] and returns its answer.
 *
 * Any [Throwable] escaping `isDispatchNeeded` is rethrown wrapped in a [DispatchException]
 * constructed from the original failure, this dispatcher and [context].
 */
internal fun CoroutineDispatcher.safeIsDispatchNeeded(context: CoroutineContext): Boolean =
    try {
        isDispatchNeeded(context)
    } catch (failure: Throwable) {
        throw DispatchException(failure, this, context)
    }
267 
/**
 * Resumes the receiver with [result]: a [DispatchedContinuation] is resumed through its own
 * member `resumeCancellableWith`, any other continuation through plain `resumeWith`.
 *
 * Deliberately not inline (the body is fairly big and used in many places, so this saves bytecode)
 * and deliberately public, so that its name is not mangled when it shows up in user stack traces.
 * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
) {
    if (this is DispatchedContinuation) {
        // smart-cast receiver: this resolves to the member function, not to this extension
        resumeCancellableWith(result)
    } else {
        resumeWith(result)
    }
}
281 
/**
 * Runs this continuation through `executeUnconfined` in `MODE_CANCELLABLE` with `doYield = true`,
 * using `Unit` as the state, and returns that call's result.
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    return executeUnconfined(contState = Unit, mode = MODE_CANCELLABLE, doYield = true, block = { run() })
}
286 
/**
 * Runs [block] as part of the current thread's event loop, or queues this continuation into that
 * loop (recording [contState] and [mode] on it) when the continuation cannot be resumed right away.
 *
 * [doYield] marks a yielding continuation, which enables a fast path when the unconfined queue is empty.
 *
 * @return `true` if the continuation was queued (trampolined) into the active unconfined loop,
 *   `false` otherwise.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED } // invalid execution mode
    val loop = ThreadLocalEventLoop.eventLoop
    // Fast path: a yield with nothing else queued has nothing to give way to
    if (doYield && loop.isUnconfinedQueueEmpty) return false
    if (!loop.isUnconfinedLoopActive) {
        // No unconfined loop is running yet -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(loop, block = block)
        return false
    }
    // An unconfined loop is already active -- enqueue instead of recursing, to avoid stack overflow
    _state = contState
    resumeMode = mode
    loop.dispatchUnconfined(this)
    return true // queued into the active loop
}
313