• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.coroutines.intrinsics.*
11 import kotlin.jvm.*
12 import kotlin.native.concurrent.*
13 
// Values of the `_decision` field of CancellableContinuationImpl.
private const val UNDECIDED = 0 // initial value: neither trySuspend nor tryResume has succeeded yet
private const val SUSPENDED = 1 // set by a successful trySuspend
private const val RESUMED = 2 // set by a successful tryResume
17 
/**
 * Token returned by a successful `tryResume*` call on [CancellableContinuationImpl];
 * `completeResume` asserts that it receives exactly this instance.
 */
@SharedImmutable
@JvmField
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
21 
22 /**
23  * @suppress **This is unstable API and it is subject to change.**
24  */
25 @PublishedApi
26 internal open class CancellableContinuationImpl<in T>(
27     final override val delegate: Continuation<T>,
28     resumeMode: Int
29 ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
30     init {
31         assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
32     }
33 
34     public override val context: CoroutineContext = delegate.context
35 
    /*
     * Implementation notes
     *
     * CancellableContinuationImpl is a subset of Job with the following limitations:
     * 1) It can only have a cancellation listener (no "on cancelling")
     * 2) It always invokes the cancellation listener if it's cancelled (no 'invokeImmediately')
     * 3) It can have at most one cancellation listener
     * 4) Its cancellation listeners cannot be deregistered
     * As a consequence it has a much simpler state machine, more lightweight machinery and
     * fewer dependencies.
     */
47 
48     /* decision state machine
49 
50         +-----------+   trySuspend   +-----------+
51         | UNDECIDED | -------------> | SUSPENDED |
52         +-----------+                +-----------+
53               |
54               | tryResume
55               V
56         +-----------+
57         |  RESUMED  |
58         +-----------+
59 
60         Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
61      */
62     private val _decision = atomic(UNDECIDED)
63 
64     /*
65        === Internal states ===
66        name        state class          public state    description
67        ------      ------------         ------------    -----------
68        ACTIVE      Active               : Active        active, no listeners
69        SINGLE_A    CancelHandler        : Active        active, one cancellation listener
70        CANCELLED   CancelledContinuation: Cancelled     cancelled (final state)
71        COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
72      */
73     private val _state = atomic<Any?>(Active)
74 
75     private var parentHandle: DisposableHandle? = null
76 
77     internal val state: Any? get() = _state.value
78 
79     public override val isActive: Boolean get() = state is NotCompleted
80 
81     public override val isCompleted: Boolean get() = state !is NotCompleted
82 
83     public override val isCancelled: Boolean get() = state is CancelledContinuation
84 
85     // We cannot invoke `state.toString()` since it may cause a circular dependency
86     private val stateDebugRepresentation get() = when(state) {
87         is NotCompleted -> "Active"
88         is CancelledContinuation -> "Cancelled"
89         else -> "Completed"
90     }
91 
92     public override fun initCancellability() {
93         /*
94         * Invariant: at the moment of invocation, `this` has not yet
95         * leaked to user code and no one is able to invoke `resume` or `cancel`
96         * on it yet. Also, this function is not invoked for reusable continuations.
97         */
98         val handle = installParentHandle()
99             ?: return // fast path -- don't do anything without parent
100         // now check our state _after_ registering, could have completed while we were registering,
101         // but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
102         // so we are helping it and cleaning the node ourselves
103         if (isCompleted) {
104             // Can be invoked concurrently in 'parentCancelled', no problems here
105             handle.dispose()
106             parentHandle = NonDisposableHandle
107         }
108     }
109 
110     private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable()
111 
112     /**
113      * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
114      * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
115      */
116     @JvmName("resetStateReusable") // Prettier stack traces
117     internal fun resetStateReusable(): Boolean {
118         assert { resumeMode == MODE_CANCELLABLE_REUSABLE }
119         assert { parentHandle !== NonDisposableHandle }
120         val state = _state.value
121         assert { state !is NotCompleted }
122         if (state is CompletedContinuation && state.idempotentResume != null) {
123             // Cannot reuse continuation that was resumed with idempotent marker
124             detachChild()
125             return false
126         }
127         _decision.value = UNDECIDED
128         _state.value = Active
129         return true
130     }
131 
132     public override val callerFrame: CoroutineStackFrame?
133         get() = delegate as? CoroutineStackFrame
134 
135     public override fun getStackTraceElement(): StackTraceElement? = null
136 
137     override fun takeState(): Any? = state
138 
139     // Note: takeState does not clear the state so we don't use takenState
140     // and we use the actual current state where in CAS-loop
141     override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
142         when (state) {
143             is NotCompleted -> error("Not completed")
144             is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
145             is CompletedContinuation -> {
146                 check(!state.cancelled) { "Must be called at most once" }
147                 val update = state.copy(cancelCause = cause)
148                 if (_state.compareAndSet(state, update)) {
149                     state.invokeHandlers(this, cause)
150                     return // done
151                 }
152             }
153             else -> {
154                 // completed normally without marker class, promote to CompletedContinuation in case
155                 // if invokeOnCancellation if called later
156                 if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
157                     return // done
158                 }
159             }
160         }
161     }
162 
163     /*
164      * Attempt to postpone cancellation for reusable cancellable continuation
165      */
166     private fun cancelLater(cause: Throwable): Boolean {
167         // Ensure that we are postponing cancellation to the right reusable instance
168         if (!isReusable()) return false
169         val dispatched = delegate as DispatchedContinuation<*>
170         return dispatched.postponeCancellation(cause)
171     }
172 
173     public override fun cancel(cause: Throwable?): Boolean {
174         _state.loop { state ->
175             if (state !is NotCompleted) return false // false if already complete or cancelling
176             // Active -- update to final state
177             val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
178             if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
179             // Invoke cancel handler if it was present
180             (state as? CancelHandler)?.let { callCancelHandler(it, cause) }
181             // Complete state update
182             detachChildIfNonResuable()
183             dispatchResume(resumeMode) // no need for additional cancellation checks
184             return true
185         }
186     }
187 
188     internal fun parentCancelled(cause: Throwable) {
189         if (cancelLater(cause)) return
190         cancel(cause)
191         // Even if cancellation has failed, we should detach child to avoid potential leak
192         detachChildIfNonResuable()
193     }
194 
195     private inline fun callCancelHandlerSafely(block: () -> Unit) {
196         try {
197             block()
198         } catch (ex: Throwable) {
199             // Handler should never fail, if it does -- it is an unhandled exception
200             handleCoroutineException(
201                 context,
202                 CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex)
203             )
204         }
205     }
206 
207     private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) =
208         /*
209         * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
210         * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
211         */
212         callCancelHandlerSafely { handler.invokeIt(cause) }
213 
214     fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
215         callCancelHandlerSafely { handler.invoke(cause) }
216 
217     fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
218         try {
219             onCancellation.invoke(cause)
220         } catch (ex: Throwable) {
221             // Handler should never fail, if it does -- it is an unhandled exception
222             handleCoroutineException(
223                 context,
224                 CompletionHandlerException("Exception in resume onCancellation handler for $this", ex)
225             )
226         }
227     }
228 
229     /**
230      * It is used when parent is cancelled to get the cancellation cause for this continuation.
231      */
232     open fun getContinuationCancellationCause(parent: Job): Throwable =
233         parent.getCancellationException()
234 
235     private fun trySuspend(): Boolean {
236         _decision.loop { decision ->
237             when (decision) {
238                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
239                 RESUMED -> return false
240                 else -> error("Already suspended")
241             }
242         }
243     }
244 
245     private fun tryResume(): Boolean {
246         _decision.loop { decision ->
247             when (decision) {
248                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
249                 SUSPENDED -> return false
250                 else -> error("Already resumed")
251             }
252         }
253     }
254 
255     @PublishedApi
256     internal fun getResult(): Any? {
257         val isReusable = isReusable()
258         // trySuspend may fail either if 'block' has resumed/cancelled a continuation
259         // or we got async cancellation from parent.
260         if (trySuspend()) {
261             /*
262              * Invariant: parentHandle is `null` *only* for reusable continuations.
263              * We were neither resumed nor cancelled, time to suspend.
264              * But first we have to install parent cancellation handle (if we didn't yet),
265              * so CC could be properly resumed on parent cancellation.
266              *
267              * This read has benign data-race with write of 'NonDisposableHandle'
268              * in 'detachChildIfNotReusable'.
269              */
270             if (parentHandle == null) {
271                 installParentHandle()
272             }
273             /*
274              * Release the continuation after installing the handle (if needed).
275              * If we were successful, then do nothing, it's ok to reuse the instance now.
276              * Otherwise, dispose the handle by ourselves.
277             */
278             if (isReusable) {
279                 releaseClaimedReusableContinuation()
280             }
281             return COROUTINE_SUSPENDED
282         }
283         // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
284         if (isReusable) {
285             // release claimed reusable continuation for the future reuse
286             releaseClaimedReusableContinuation()
287         }
288         val state = this.state
289         if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
290         // if the parent job was already cancelled, then throw the corresponding cancellation exception
291         // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
292         // before the block returns. This getResult would return a result as opposed to cancellation
293         // exception that should have happened if the continuation is dispatched for execution later.
294         if (resumeMode.isCancellableMode) {
295             val job = context[Job]
296             if (job != null && !job.isActive) {
297                 val cause = job.getCancellationException()
298                 cancelCompletedResult(state, cause)
299                 throw recoverStackTrace(cause, this)
300             }
301         }
302         return getSuccessfulResult(state)
303     }
304 
305     private fun installParentHandle(): DisposableHandle? {
306         val parent = context[Job] ?: return null // don't do anything without a parent
307         // Install the handle
308         val handle = parent.invokeOnCompletion(
309             onCancelling = true,
310             handler = ChildContinuation(this).asHandler
311         )
312         parentHandle = handle
313         return handle
314     }
315 
316     /**
317      * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
318      * in which case it detaches from the parent and cancels this continuation.
319      */
320     private fun releaseClaimedReusableContinuation() {
321         // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
322         val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
323         detachChild()
324         cancel(cancellationCause)
325     }
326 
327     override fun resumeWith(result: Result<T>) =
328         resumeImpl(result.toState(this), resumeMode)
329 
330     override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
331         resumeImpl(value, resumeMode, onCancellation)
332 
333     public override fun invokeOnCancellation(handler: CompletionHandler) {
334         val cancelHandler = makeCancelHandler(handler)
335         _state.loop { state ->
336             when (state) {
337                 is Active -> {
338                     if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
339                 }
340                 is CancelHandler -> multipleHandlersError(handler, state)
341                 is CompletedExceptionally -> {
342                     /*
343                      * Continuation was already cancelled or completed exceptionally.
344                      * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
345                      * so we check to make sure handler was installed just once.
346                      */
347                     if (!state.makeHandled()) multipleHandlersError(handler, state)
348                     /*
349                      * Call the handler only if it was cancelled (not called when completed exceptionally).
350                      * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
351                      * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
352                      */
353                     if (state is CancelledContinuation) {
354                         callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
355                     }
356                     return
357                 }
358                 is CompletedContinuation -> {
359                     /*
360                      * Continuation was already completed, and might already have cancel handler.
361                      */
362                     if (state.cancelHandler != null) multipleHandlersError(handler, state)
363                     // BeforeResumeCancelHandler does not need to be called on a completed continuation
364                     if (cancelHandler is BeforeResumeCancelHandler) return
365                     if (state.cancelled) {
366                         // Was already cancelled while being dispatched -- invoke the handler directly
367                         callCancelHandler(handler, state.cancelCause)
368                         return
369                     }
370                     val update = state.copy(cancelHandler = cancelHandler)
371                     if (_state.compareAndSet(state, update)) return // quit on cas success
372                 }
373                 else -> {
374                     /*
375                      * Continuation was already completed normally, but might get cancelled while being dispatched.
376                      * Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
377                      * does not need to be called in this case.
378                      */
379                     if (cancelHandler is BeforeResumeCancelHandler) return
380                     val update = CompletedContinuation(state, cancelHandler = cancelHandler)
381                     if (_state.compareAndSet(state, update)) return // quit on cas success
382                 }
383             }
384         }
385     }
386 
387     private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
388         error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
389     }
390 
391     private fun makeCancelHandler(handler: CompletionHandler): CancelHandler =
392         if (handler is CancelHandler) handler else InvokeOnCancel(handler)
393 
394     private fun dispatchResume(mode: Int) {
395         if (tryResume()) return // completed before getResult invocation -- bail out
396         // otherwise, getResult has already commenced, i.e. completed later or in other thread
397         dispatch(mode)
398     }
399 
400     private fun resumedState(
401         state: NotCompleted,
402         proposedUpdate: Any?,
403         resumeMode: Int,
404         onCancellation: ((cause: Throwable) -> Unit)?,
405         idempotent: Any?
406     ): Any? = when {
407         proposedUpdate is CompletedExceptionally -> {
408             assert { idempotent == null } // there are no idempotent exceptional resumes
409             assert { onCancellation == null } // only successful results can be cancelled
410             proposedUpdate
411         }
412         !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
413         onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
414             // mark as CompletedContinuation if special cases are present:
415             // Cancellation handlers that shall be called after resume or idempotent resume
416             CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
417         else -> proposedUpdate // simple case -- use the value directly
418     }
419 
420     private fun resumeImpl(
421         proposedUpdate: Any?,
422         resumeMode: Int,
423         onCancellation: ((cause: Throwable) -> Unit)? = null
424     ) {
425         _state.loop { state ->
426             when (state) {
427                 is NotCompleted -> {
428                     val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
429                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
430                     detachChildIfNonResuable()
431                     dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
432                     return // done
433                 }
434                 is CancelledContinuation -> {
435                     /*
436                      * If continuation was cancelled, then resume attempt must be ignored,
437                      * because cancellation is asynchronous and may race with resume.
438                      * Racy exceptions will be lost, too.
439                      */
440                     if (state.makeResumed()) { // check if trying to resume one (otherwise error)
441                         // call onCancellation
442                         onCancellation?.let { callOnCancellation(it, state.cause) }
443                         return // done
444                     }
445                 }
446             }
447             alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
448         }
449     }
450 
451     /**
452      * Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
453      * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
454      */
455     private fun tryResumeImpl(
456         proposedUpdate: Any?,
457         idempotent: Any?,
458         onCancellation: ((cause: Throwable) -> Unit)?
459     ): Symbol? {
460         _state.loop { state ->
461             when (state) {
462                 is NotCompleted -> {
463                     val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent)
464                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
465                     detachChildIfNonResuable()
466                     return RESUME_TOKEN
467                 }
468                 is CompletedContinuation -> {
469                     return if (idempotent != null && state.idempotentResume === idempotent) {
470                         assert { state.result == proposedUpdate } // "Non-idempotent resume"
471                         RESUME_TOKEN // resumed with the same token -- ok
472                     } else {
473                         null // resumed with a different token or non-idempotent -- too late
474                     }
475                 }
476                 else -> return null // cannot resume -- not active anymore
477             }
478         }
479     }
480 
481     private fun alreadyResumedError(proposedUpdate: Any?): Nothing {
482         error("Already resumed, but proposed with update $proposedUpdate")
483     }
484 
485     // Unregister from parent job
486     private fun detachChildIfNonResuable() {
487         // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
488         if (!isReusable()) detachChild()
489     }
490 
491     /**
492      * Detaches from the parent.
493      */
494     internal fun detachChild() {
495         val handle = parentHandle ?: return
496         handle.dispose()
497         parentHandle = NonDisposableHandle
498     }
499 
500     // Note: Always returns RESUME_TOKEN | null
501     override fun tryResume(value: T, idempotent: Any?): Any? =
502         tryResumeImpl(value, idempotent, onCancellation = null)
503 
504     override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
505         tryResumeImpl(value, idempotent, onCancellation)
506 
507     override fun tryResumeWithException(exception: Throwable): Any? =
508         tryResumeImpl(CompletedExceptionally(exception), idempotent = null, onCancellation = null)
509 
510     // note: token is always RESUME_TOKEN
511     override fun completeResume(token: Any) {
512         assert { token === RESUME_TOKEN }
513         dispatchResume(resumeMode)
514     }
515 
516     override fun CoroutineDispatcher.resumeUndispatched(value: T) {
517         val dc = delegate as? DispatchedContinuation
518         resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
519     }
520 
521     override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
522         val dc = delegate as? DispatchedContinuation
523         resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
524     }
525 
526     @Suppress("UNCHECKED_CAST")
527     override fun <T> getSuccessfulResult(state: Any?): T =
528         when (state) {
529             is CompletedContinuation -> state.result as T
530             else -> state as T
531         }
532 
533     // The exceptional state in CancellableContinuationImpl is stored directly and it is not recovered yet.
534     // The stacktrace recovery is invoked here.
535     override fun getExceptionalResult(state: Any?): Throwable? =
536         super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }
537 
538     // For nicer debugging
539     public override fun toString(): String =
540         "${nameString()}(${delegate.toDebugString()}){$stateDebugRepresentation}@$hexAddress"
541 
542     protected open fun nameString(): String =
543         "CancellableContinuation"
544 
545 }
546 
/**
 * Marker for the states of a continuation that is still active, i.e. not completed yet.
 */
internal interface NotCompleted
549 
// Initial state: active, no cancellation handler installed
private object Active : NotCompleted {
    override fun toString(): String {
        return "Active"
    }
}
553 
/**
 * Common base class of all [CancellableContinuation.invokeOnCancellation] handlers. It exists to avoid
 * an extra instance on JVM while still supporting JS, where a functional type cannot be extended.
 */
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
559 
/**
 * Base class for those [CancellableContinuation.invokeOnCancellation] handlers that do not need to be
 * invoked when the continuation is cancelled after resumption, during dispatch, because the corresponding
 * resources were already released before calling `resume`. Such a handler is called only before `resume`.
 * It avoids the allocation of a [CompletedContinuation] instance during resume on JVM.
 */
internal abstract class BeforeResumeCancelHandler : CancelHandler()
567 
// Wrapper for lambdas; for the sake of performance CancelHandler can also be subclassed directly.
// (The name InvokeOnCancellation would clash with an existing one.)
private class InvokeOnCancel(
    private val handler: CompletionHandler
) : CancelHandler() {
    // Delegates to the wrapped handler
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }

    override fun toString(): String {
        return "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
    }
}
577 
/**
 * Completed state that carries additional metadata next to the [result].
 */
private data class CompletedContinuation(
    @JvmField val result: Any?,
    // installed via invokeOnCancellation
    @JvmField val cancelHandler: CancelHandler? = null,
    // installed via the resume block
    @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null,
    @JvmField val idempotentResume: Any? = null,
    @JvmField val cancelCause: Throwable? = null
) {
    // True once a cancellation cause has been recorded
    val cancelled: Boolean
        get() = cancelCause != null

    // Invokes whichever handlers are present: first `cancelHandler`, then `onCancellation`
    fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
        cancelHandler?.let { installed -> cont.callCancelHandler(installed, cause) }
        onCancellation?.let { block -> cont.callOnCancellation(block, cause) }
    }
}
593