• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.coroutines.intrinsics.*
11 import kotlin.jvm.*
12 import kotlin.native.concurrent.*
13 
14 private const val UNDECIDED = 0
15 private const val SUSPENDED = 1
16 private const val RESUMED = 2
17 
18 @JvmField
19 @SharedImmutable
20 internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
21 
22 /**
23  * @suppress **This is unstable API and it is subject to change.**
24  */
25 @PublishedApi
26 internal open class CancellableContinuationImpl<in T>(
27     final override val delegate: Continuation<T>,
28     resumeMode: Int
29 ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
30     init {
31         assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
32     }
33 
34     public override val context: CoroutineContext = delegate.context
35 
36     /*
37      * Implementation notes
38      *
39      * CancellableContinuationImpl is a subset of Job with following limitations:
40      * 1) It can have only cancellation listener (no "on cancelling")
41      * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
42      * 3) It can have at most one cancellation listener
43      * 4) Its cancellation listeners cannot be deregistered
44      * As a consequence it has much simpler state machine, more lightweight machinery and
45      * less dependencies.
46      */
47 
48     /* decision state machine
49 
50         +-----------+   trySuspend   +-----------+
51         | UNDECIDED | -------------> | SUSPENDED |
52         +-----------+                +-----------+
53               |
54               | tryResume
55               V
56         +-----------+
57         |  RESUMED  |
58         +-----------+
59 
60         Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
61      */
62     private val _decision = atomic(UNDECIDED)
63 
64     /*
65        === Internal states ===
66        name        state class          public state    description
67        ------      ------------         ------------    -----------
68        ACTIVE      Active               : Active        active, no listeners
69        SINGLE_A    CancelHandler        : Active        active, one cancellation listener
70        CANCELLED   CancelledContinuation: Cancelled     cancelled (final state)
71        COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
72      */
73     private val _state = atomic<Any?>(Active)
74 
75     private val _parentHandle = atomic<DisposableHandle?>(null)
76     private var parentHandle: DisposableHandle?
77         get() = _parentHandle.value
78         set(value) { _parentHandle.value = value }
79 
80     internal val state: Any? get() = _state.value
81 
82     public override val isActive: Boolean get() = state is NotCompleted
83 
84     public override val isCompleted: Boolean get() = state !is NotCompleted
85 
86     public override val isCancelled: Boolean get() = state is CancelledContinuation
87 
88     public override fun initCancellability() {
89         setupCancellation()
90     }
91 
92     private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
93 
94     /**
95      * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
96      * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
97      */
98     @JvmName("resetStateReusable") // Prettier stack traces
99     internal fun resetStateReusable(): Boolean {
100         assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl
101         assert { parentHandle !== NonDisposableHandle }
102         val state = _state.value
103         assert { state !is NotCompleted }
104         if (state is CompletedContinuation && state.idempotentResume != null) {
105             // Cannot reuse continuation that was resumed with idempotent marker
106             detachChild()
107             return false
108         }
109         _decision.value = UNDECIDED
110         _state.value = Active
111         return true
112     }
113 
114     /**
115      * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
116      * It is only invoked from an internal [getResult] function for reusable continuations
117      * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
118      */
119     private fun setupCancellation() {
120         if (checkCompleted()) return
121         if (parentHandle !== null) return // fast path 2 -- was already initialized
122         val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
123         val handle = parent.invokeOnCompletion(
124             onCancelling = true,
125             handler = ChildContinuation(parent, this).asHandler
126         )
127         parentHandle = handle
128         // now check our state _after_ registering (could have completed while we were registering)
129         // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
130         if (isCompleted && !isReusable()) {
131             handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
132             parentHandle = NonDisposableHandle // release it just in case, to aid GC
133         }
134     }
135 
136     private fun checkCompleted(): Boolean {
137         val completed = isCompleted
138         if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
139         val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
140         val cause = dispatched.checkPostponedCancellation(this) ?: return completed
141         if (!completed) {
142             // Note: this cancel may fail if one more concurrent cancel is currently being invoked
143             cancel(cause)
144         }
145         return true
146     }
147 
148     public override val callerFrame: CoroutineStackFrame?
149         get() = delegate as? CoroutineStackFrame
150 
151     public override fun getStackTraceElement(): StackTraceElement? = null
152 
153     override fun takeState(): Any? = state
154 
155     // Note: takeState does not clear the state so we don't use takenState
156     // and we use the actual current state where in CAS-loop
157     override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
158         when (state) {
159             is NotCompleted -> error("Not completed")
160             is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
161             is CompletedContinuation -> {
162                 check(!state.cancelled) { "Must be called at most once" }
163                 val update = state.copy(cancelCause = cause)
164                 if (_state.compareAndSet(state, update)) {
165                     state.invokeHandlers(this, cause)
166                     return // done
167                 }
168             }
169             else -> {
170                 // completed normally without marker class, promote to CompletedContinuation in case
171                 // if invokeOnCancellation if called later
172                 if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
173                     return // done
174                 }
175             }
176         }
177     }
178 
179     /*
180      * Attempt to postpone cancellation for reusable cancellable continuation
181      */
182     private fun cancelLater(cause: Throwable): Boolean {
183         if (!resumeMode.isReusableMode) return false
184         val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
185         return dispatched.postponeCancellation(cause)
186     }
187 
188     public override fun cancel(cause: Throwable?): Boolean {
189         _state.loop { state ->
190             if (state !is NotCompleted) return false // false if already complete or cancelling
191             // Active -- update to final state
192             val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
193             if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
194             // Invoke cancel handler if it was present
195             (state as? CancelHandler)?.let { callCancelHandler(it, cause) }
196             // Complete state update
197             detachChildIfNonResuable()
198             dispatchResume(resumeMode) // no need for additional cancellation checks
199             return true
200         }
201     }
202 
203     internal fun parentCancelled(cause: Throwable) {
204         if (cancelLater(cause)) return
205         cancel(cause)
206         // Even if cancellation has failed, we should detach child to avoid potential leak
207         detachChildIfNonResuable()
208     }
209 
210     private inline fun callCancelHandlerSafely(block: () -> Unit) {
211         try {
212            block()
213         } catch (ex: Throwable) {
214             // Handler should never fail, if it does -- it is an unhandled exception
215             handleCoroutineException(
216                 context,
217                 CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex)
218             )
219         }
220     }
221 
222     private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) =
223         /*
224         * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
225         * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
226         */
227         callCancelHandlerSafely { handler.invokeIt(cause) }
228 
229     fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
230         callCancelHandlerSafely { handler.invoke(cause) }
231 
232     fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
233         try {
234             onCancellation.invoke(cause)
235         } catch (ex: Throwable) {
236             // Handler should never fail, if it does -- it is an unhandled exception
237             handleCoroutineException(
238                 context,
239                 CompletionHandlerException("Exception in resume onCancellation handler for $this", ex)
240             )
241         }
242     }
243 
244     /**
245      * It is used when parent is cancelled to get the cancellation cause for this continuation.
246      */
247     open fun getContinuationCancellationCause(parent: Job): Throwable =
248         parent.getCancellationException()
249 
250     private fun trySuspend(): Boolean {
251         _decision.loop { decision ->
252             when (decision) {
253                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
254                 RESUMED -> return false
255                 else -> error("Already suspended")
256             }
257         }
258     }
259 
260     private fun tryResume(): Boolean {
261         _decision.loop { decision ->
262             when (decision) {
263                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
264                 SUSPENDED -> return false
265                 else -> error("Already resumed")
266             }
267         }
268     }
269 
270     @PublishedApi
271     internal fun getResult(): Any? {
272         setupCancellation()
273         if (trySuspend()) return COROUTINE_SUSPENDED
274         // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
275         val state = this.state
276         if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
277         // if the parent job was already cancelled, then throw the corresponding cancellation exception
278         // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
279         // before the block returns. This getResult would return a result as opposed to cancellation
280         // exception that should have happened if the continuation is dispatched for execution later.
281         if (resumeMode.isCancellableMode) {
282             val job = context[Job]
283             if (job != null && !job.isActive) {
284                 val cause = job.getCancellationException()
285                 cancelCompletedResult(state, cause)
286                 throw recoverStackTrace(cause, this)
287             }
288         }
289         return getSuccessfulResult(state)
290     }
291 
292     override fun resumeWith(result: Result<T>) =
293         resumeImpl(result.toState(this), resumeMode)
294 
295     override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
296         resumeImpl(value, resumeMode, onCancellation)
297 
298     public override fun invokeOnCancellation(handler: CompletionHandler) {
299         val cancelHandler = makeCancelHandler(handler)
300         _state.loop { state ->
301             when (state) {
302                 is Active -> {
303                     if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
304                 }
305                 is CancelHandler -> multipleHandlersError(handler, state)
306                 is CompletedExceptionally -> {
307                     /*
308                      * Continuation was already cancelled or completed exceptionally.
309                      * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
310                      * so we check to make sure handler was installed just once.
311                      */
312                     if (!state.makeHandled()) multipleHandlersError(handler, state)
313                     /*
314                      * Call the handler only if it was cancelled (not called when completed exceptionally).
315                      * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
316                      * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
317                      */
318                     if (state is CancelledContinuation) {
319                         callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
320                     }
321                     return
322                 }
323                 is CompletedContinuation -> {
324                     /*
325                      * Continuation was already completed, and might already have cancel handler.
326                      */
327                     if (state.cancelHandler != null) multipleHandlersError(handler, state)
328                     // BeforeResumeCancelHandler does not need to be called on a completed continuation
329                     if (cancelHandler is BeforeResumeCancelHandler) return
330                     if (state.cancelled) {
331                         // Was already cancelled while being dispatched -- invoke the handler directly
332                         callCancelHandler(handler, state.cancelCause)
333                         return
334                     }
335                     val update = state.copy(cancelHandler = cancelHandler)
336                     if (_state.compareAndSet(state, update)) return // quit on cas success
337                 }
338                 else -> {
339                     /*
340                      * Continuation was already completed normally, but might get cancelled while being dispatched.
341                      * Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
342                      * does not need to be called in this case.
343                      */
344                     if (cancelHandler is BeforeResumeCancelHandler) return
345                     val update = CompletedContinuation(state, cancelHandler = cancelHandler)
346                     if (_state.compareAndSet(state, update)) return // quit on cas success
347                 }
348             }
349         }
350     }
351 
352     private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
353         error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
354     }
355 
356     private fun makeCancelHandler(handler: CompletionHandler): CancelHandler =
357         if (handler is CancelHandler) handler else InvokeOnCancel(handler)
358 
359     private fun dispatchResume(mode: Int) {
360         if (tryResume()) return // completed before getResult invocation -- bail out
361         // otherwise, getResult has already commenced, i.e. completed later or in other thread
362         dispatch(mode)
363     }
364 
365     private fun resumedState(
366         state: NotCompleted,
367         proposedUpdate: Any?,
368         resumeMode: Int,
369         onCancellation: ((cause: Throwable) -> Unit)?,
370         idempotent: Any?
371     ): Any? = when {
372         proposedUpdate is CompletedExceptionally -> {
373             assert { idempotent == null } // there are no idempotent exceptional resumes
374             assert { onCancellation == null } // only successful results can be cancelled
375             proposedUpdate
376         }
377         !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
378         onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
379             // mark as CompletedContinuation if special cases are present:
380             // Cancellation handlers that shall be called after resume or idempotent resume
381             CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
382         else -> proposedUpdate // simple case -- use the value directly
383     }
384 
385     private fun resumeImpl(
386         proposedUpdate: Any?,
387         resumeMode: Int,
388         onCancellation: ((cause: Throwable) -> Unit)? = null
389     ) {
390         _state.loop { state ->
391             when (state) {
392                 is NotCompleted -> {
393                     val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
394                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
395                     detachChildIfNonResuable()
396                     dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
397                     return // done
398                 }
399                 is CancelledContinuation -> {
400                     /*
401                      * If continuation was cancelled, then resume attempt must be ignored,
402                      * because cancellation is asynchronous and may race with resume.
403                      * Racy exceptions will be lost, too.
404                      */
405                     if (state.makeResumed()) { // check if trying to resume one (otherwise error)
406                         // call onCancellation
407                         onCancellation?.let { callOnCancellation(it, state.cause) }
408                         return // done
409                     }
410                 }
411             }
412             alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
413         }
414     }
415 
416     /**
417      * Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
418      * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
419      */
420     private fun tryResumeImpl(
421         proposedUpdate: Any?,
422         idempotent: Any?,
423         onCancellation: ((cause: Throwable) -> Unit)?
424     ): Symbol? {
425         _state.loop { state ->
426             when (state) {
427                 is NotCompleted -> {
428                     val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent)
429                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
430                     detachChildIfNonResuable()
431                     return RESUME_TOKEN
432                 }
433                 is CompletedContinuation -> {
434                     return if (idempotent != null && state.idempotentResume === idempotent) {
435                         assert { state.result == proposedUpdate } // "Non-idempotent resume"
436                         RESUME_TOKEN // resumed with the same token -- ok
437                     } else {
438                         null // resumed with a different token or non-idempotent -- too late
439                     }
440                 }
441                 else -> return null // cannot resume -- not active anymore
442             }
443         }
444     }
445 
446     private fun alreadyResumedError(proposedUpdate: Any?): Nothing {
447         error("Already resumed, but proposed with update $proposedUpdate")
448     }
449 
450     // Unregister from parent job
451     private fun detachChildIfNonResuable() {
452         // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
453         if (!isReusable()) detachChild()
454     }
455 
456     /**
457      * Detaches from the parent.
458      * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
459      */
460     internal fun detachChild() {
461         val handle = parentHandle
462         handle?.dispose()
463         parentHandle = NonDisposableHandle
464     }
465 
466     // Note: Always returns RESUME_TOKEN | null
467     override fun tryResume(value: T, idempotent: Any?): Any? =
468         tryResumeImpl(value, idempotent, onCancellation = null)
469 
470     override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
471         tryResumeImpl(value, idempotent, onCancellation)
472 
473     override fun tryResumeWithException(exception: Throwable): Any? =
474         tryResumeImpl(CompletedExceptionally(exception), idempotent = null, onCancellation = null)
475 
476     // note: token is always RESUME_TOKEN
477     override fun completeResume(token: Any) {
478         assert { token === RESUME_TOKEN }
479         dispatchResume(resumeMode)
480     }
481 
482     override fun CoroutineDispatcher.resumeUndispatched(value: T) {
483         val dc = delegate as? DispatchedContinuation
484         resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
485     }
486 
487     override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
488         val dc = delegate as? DispatchedContinuation
489         resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
490     }
491 
492     @Suppress("UNCHECKED_CAST")
493     override fun <T> getSuccessfulResult(state: Any?): T =
494         when (state) {
495             is CompletedContinuation -> state.result as T
496             else -> state as T
497         }
498 
499     // The exceptional state in CancellableContinuationImpl is stored directly and it is not recovered yet.
500     // The stacktrace recovery is invoked here.
501     override fun getExceptionalResult(state: Any?): Throwable? =
502         super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }
503 
504     // For nicer debugging
505     public override fun toString(): String =
506         "${nameString()}(${delegate.toDebugString()}){$state}@$hexAddress"
507 
508     protected open fun nameString(): String =
509         "CancellableContinuation"
510 
511 }
512 
// Marker interface for states of a continuation that has not completed yet
internal interface NotCompleted

// Singleton NotCompleted state
private object Active : NotCompleted {
    override fun toString(): String {
        return "Active"
    }
}
519 
/**
 * Base class for all [CancellableContinuation.invokeOnCancellation] handlers to avoid an extra instance
 * on JVM, yet support JS where you cannot extend from a functional type.
 */
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted

/**
 * Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked
 * if continuation is cancelled after resumption, during dispatch, because the corresponding resources
 * were already released before calling `resume`. This cancel handler is called only before `resume`.
 * It avoids allocation of [CompletedContinuation] instance during resume on JVM.
 */
internal abstract class BeforeResumeCancelHandler : CancelHandler()

// Wrapper for lambdas; for the sake of performance CancelHandler can also be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
    private val handler: CompletionHandler
) : CancelHandler() {
    // Forwards the cause to the wrapped handler.
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }

    override fun toString(): String {
        return "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
    }
}
543 
// Completed state carrying additional metadata alongside the result
private data class CompletedContinuation(
    @JvmField val result: Any?,
    @JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
    @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
    @JvmField val idempotentResume: Any? = null,
    @JvmField val cancelCause: Throwable? = null
) {
    // Cancelled once a cancel cause has been recorded.
    val cancelled: Boolean
        get() {
            return cancelCause != null
        }

    // Invokes whichever handlers are present: the cancel handler first, then the onCancellation block.
    fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
        val installedHandler = cancelHandler
        if (installedHandler != null) cont.callCancelHandler(installedHandler, cause)
        val resumeBlock = onCancellation
        if (resumeBlock != null) cont.callOnCancellation(resumeBlock, cause)
    }
}
559