<lambda>null1 package kotlinx.coroutines
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import kotlin.coroutines.*
6 import kotlin.coroutines.intrinsics.*
7 import kotlin.jvm.*
8
9 private const val UNDECIDED = 0
10 private const val SUSPENDED = 1
11 private const val RESUMED = 2
12
13 private const val DECISION_SHIFT = 29
14 private const val INDEX_MASK = (1 shl DECISION_SHIFT) - 1
15 private const val NO_INDEX = INDEX_MASK
16
17 private inline val Int.decision get() = this shr DECISION_SHIFT
18 private inline val Int.index get() = this and INDEX_MASK
19 @Suppress("NOTHING_TO_INLINE")
20 private inline fun decisionAndIndex(decision: Int, index: Int) = (decision shl DECISION_SHIFT) + index
21
22 @JvmField
23 internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
24
25 /**
26 * @suppress **This is unstable API and it is subject to change.**
27 */
28 @OptIn(InternalForInheritanceCoroutinesApi::class)
29 @PublishedApi
30 internal open class CancellableContinuationImpl<in T>(
31 final override val delegate: Continuation<T>,
32 resumeMode: Int
33 ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame, Waiter {
34 init {
35 assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
36 }
37
38 public override val context: CoroutineContext = delegate.context
39
40 /*
41 * Implementation notes
42 *
43 * CancellableContinuationImpl is a subset of Job with following limitations:
44 * 1) It can have only cancellation listener (no "on cancelling")
45 * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
46 * 3) It can have at most one cancellation listener
47 * 4) Its cancellation listeners cannot be deregistered
48 * As a consequence it has much simpler state machine, more lightweight machinery and
49 * less dependencies.
50 */
51
52 /** decision state machine
53
54 +-----------+ trySuspend +-----------+
55 | UNDECIDED | -------------> | SUSPENDED |
56 +-----------+ +-----------+
57 |
58 | tryResume
59 V
60 +-----------+
61 | RESUMED |
62 +-----------+
63
64 Note: both tryResume and trySuspend can be invoked at most once, first invocation wins.
65 If the cancellation handler is specified via a [Segment] instance and the index in it
66 (so [Segment.onCancellation] should be called), the [_decisionAndIndex] field may store
67 this index additionally to the "decision" value.
68 */
69 private val _decisionAndIndex = atomic(decisionAndIndex(UNDECIDED, NO_INDEX))
70
71 /*
72 === Internal states ===
73 name state class public state description
74 ------ ------------ ------------ -----------
75 ACTIVE Active : Active active, no listeners
76 SINGLE_A CancelHandler : Active active, one cancellation listener
77 CANCELLED CancelledContinuation: Cancelled cancelled (final state)
78 COMPLETED any : Completed produced some result or threw an exception (final state)
79 */
80 private val _state = atomic<Any?>(Active)
81
82 /*
83 * This field has a concurrent rendezvous in the following scenario:
84 *
85 * - installParentHandle publishes this instance on T1
86 *
87 * T1 writes:
88 * - handle = installed; right after the installation
89 * - Shortly after: if (isComplete) handle = NonDisposableHandle
90 *
91 * Any other T writes if the parent job is cancelled in detachChild:
92 * - handle = NonDisposableHandle
93 *
94 * We want to preserve a strict invariant on parentHandle transition, allowing only three of them:
95 * null -> anyHandle
96 * anyHandle -> NonDisposableHandle
97 * null -> NonDisposableHandle
98 *
99 * With a guarantee that after disposal the only state handle may end up in is NonDisposableHandle
100 */
101 private val _parentHandle = atomic<DisposableHandle?>(null)
102 private val parentHandle: DisposableHandle?
103 get() = _parentHandle.value
104
105 internal val state: Any? get() = _state.value
106
107 public override val isActive: Boolean get() = state is NotCompleted
108
109 public override val isCompleted: Boolean get() = state !is NotCompleted
110
111 public override val isCancelled: Boolean get() = state is CancelledContinuation
112
113 // We cannot invoke `state.toString()` since it may cause a circular dependency
114 private val stateDebugRepresentation get() = when(state) {
115 is NotCompleted -> "Active"
116 is CancelledContinuation -> "Cancelled"
117 else -> "Completed"
118 }
119
120 public override fun initCancellability() {
121 /*
122 * Invariant: at the moment of invocation, `this` has not yet
123 * leaked to user code and no one is able to invoke `resume` or `cancel`
124 * on it yet. Also, this function is not invoked for reusable continuations.
125 */
126 val handle = installParentHandle()
127 ?: return // fast path -- don't do anything without parent
128 // now check our state _after_ registering, could have completed while we were registering,
129 // but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
130 // so we are helping it and cleaning the node ourselves
131 if (isCompleted) {
132 // Can be invoked concurrently in 'parentCancelled', no problems here
133 handle.dispose()
134 _parentHandle.value = NonDisposableHandle
135 }
136 }
137
138 private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable()
139
140 /**
141 * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
142 * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
143 */
144 @JvmName("resetStateReusable") // Prettier stack traces
145 internal fun resetStateReusable(): Boolean {
146 assert { resumeMode == MODE_CANCELLABLE_REUSABLE }
147 assert { parentHandle !== NonDisposableHandle }
148 val state = _state.value
149 assert { state !is NotCompleted }
150 if (state is CompletedContinuation<*> && state.idempotentResume != null) {
151 // Cannot reuse continuation that was resumed with idempotent marker
152 detachChild()
153 return false
154 }
155 _decisionAndIndex.value = decisionAndIndex(UNDECIDED, NO_INDEX)
156 _state.value = Active
157 return true
158 }
159
160 public override val callerFrame: CoroutineStackFrame?
161 get() = delegate as? CoroutineStackFrame
162
163 public override fun getStackTraceElement(): StackTraceElement? = null
164
165 override fun takeState(): Any? = state
166
167 // Note: takeState does not clear the state so we don't use takenState
168 // and we use the actual current state where in CAS-loop
169 override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
170 when (state) {
171 is NotCompleted -> error("Not completed")
172 is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
173 is CompletedContinuation<*> -> {
174 check(!state.cancelled) { "Must be called at most once" }
175 val update = state.copy(cancelCause = cause)
176 if (_state.compareAndSet(state, update)) {
177 state.invokeHandlers(this, cause)
178 return // done
179 }
180 }
181 else -> {
182 // completed normally without marker class, promote to CompletedContinuation in case
183 // if invokeOnCancellation if called later
184 if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
185 return // done
186 }
187 }
188 }
189 }
190
191 /*
192 * Attempt to postpone cancellation for reusable cancellable continuation
193 */
194 private fun cancelLater(cause: Throwable): Boolean {
195 // Ensure that we are postponing cancellation to the right reusable instance
196 if (!isReusable()) return false
197 val dispatched = delegate as DispatchedContinuation<*>
198 return dispatched.postponeCancellation(cause)
199 }
200
201 public override fun cancel(cause: Throwable?): Boolean {
202 _state.loop { state ->
203 if (state !is NotCompleted) return false // false if already complete or cancelling
204 // Active -- update to final state
205 val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)
206 if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
207 // Invoke cancel handler if it was present
208 when (state) {
209 is CancelHandler -> callCancelHandler(state, cause)
210 is Segment<*> -> callSegmentOnCancellation(state, cause)
211 }
212 // Complete state update
213 detachChildIfNonReusable()
214 dispatchResume(resumeMode) // no need for additional cancellation checks
215 return true
216 }
217 }
218
219 internal fun parentCancelled(cause: Throwable) {
220 if (cancelLater(cause)) return
221 cancel(cause)
222 // Even if cancellation has failed, we should detach child to avoid potential leak
223 detachChildIfNonReusable()
224 }
225
226 private inline fun callCancelHandlerSafely(block: () -> Unit) {
227 try {
228 block()
229 } catch (ex: Throwable) {
230 // Handler should never fail, if it does -- it is an unhandled exception
231 handleCoroutineException(
232 context,
233 CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex)
234 )
235 }
236 }
237
238 fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
239 callCancelHandlerSafely { handler.invoke(cause) }
240
241 private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
242 val index = _decisionAndIndex.value.index
243 check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
244 callCancelHandlerSafely { segment.onCancellation(index, cause, context) }
245 }
246
247 fun <R> callOnCancellation(
248 onCancellation: (cause: Throwable, value: R, context: CoroutineContext) -> Unit,
249 cause: Throwable,
250 value: R
251 ) {
252 try {
253 onCancellation.invoke(cause, value, context)
254 } catch (ex: Throwable) {
255 // Handler should never fail, if it does -- it is an unhandled exception
256 handleCoroutineException(
257 context,
258 CompletionHandlerException("Exception in resume onCancellation handler for $this", ex)
259 )
260 }
261 }
262
263 /**
264 * It is used when parent is cancelled to get the cancellation cause for this continuation.
265 */
266 open fun getContinuationCancellationCause(parent: Job): Throwable =
267 parent.getCancellationException()
268
269 private fun trySuspend(): Boolean {
270 _decisionAndIndex.loop { cur ->
271 when (cur.decision) {
272 UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(SUSPENDED, cur.index))) return true
273 RESUMED -> return false
274 else -> error("Already suspended")
275 }
276 }
277 }
278
279 private fun tryResume(): Boolean {
280 _decisionAndIndex.loop { cur ->
281 when (cur.decision) {
282 UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(RESUMED, cur.index))) return true
283 SUSPENDED -> return false
284 else -> error("Already resumed")
285 }
286 }
287 }
288
289 @PublishedApi
290 internal fun getResult(): Any? {
291 val isReusable = isReusable()
292 // trySuspend may fail either if 'block' has resumed/cancelled a continuation,
293 // or we got async cancellation from parent.
294 if (trySuspend()) {
295 /*
296 * Invariant: parentHandle is `null` *only* for reusable continuations.
297 * We were neither resumed nor cancelled, time to suspend.
298 * But first we have to install parent cancellation handle (if we didn't yet),
299 * so CC could be properly resumed on parent cancellation.
300 *
301 * This read has benign data-race with write of 'NonDisposableHandle'
302 * in 'detachChildIfNotReusable'.
303 */
304 if (parentHandle == null) {
305 installParentHandle()
306 }
307 /*
308 * Release the continuation after installing the handle (if needed).
309 * If we were successful, then do nothing, it's ok to reuse the instance now.
310 * Otherwise, dispose the handle by ourselves.
311 */
312 if (isReusable) {
313 releaseClaimedReusableContinuation()
314 }
315 return COROUTINE_SUSPENDED
316 }
317 // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
318 if (isReusable) {
319 // release claimed reusable continuation for the future reuse
320 releaseClaimedReusableContinuation()
321 }
322 val state = this.state
323 if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
324 // if the parent job was already cancelled, then throw the corresponding cancellation exception
325 // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
326 // before the block returns. This getResult would return a result as opposed to cancellation
327 // exception that should have happened if the continuation is dispatched for execution later.
328 if (resumeMode.isCancellableMode) {
329 val job = context[Job]
330 if (job != null && !job.isActive) {
331 val cause = job.getCancellationException()
332 cancelCompletedResult(state, cause)
333 throw recoverStackTrace(cause, this)
334 }
335 }
336 return getSuccessfulResult(state)
337 }
338
339 private fun installParentHandle(): DisposableHandle? {
340 val parent = context[Job] ?: return null // don't do anything without a parent
341 // Install the handle
342 val handle = parent.invokeOnCompletion(handler = ChildContinuation(this))
343 _parentHandle.compareAndSet(null, handle)
344 return handle
345 }
346
347 /**
348 * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
349 * in which case it detaches from the parent and cancels this continuation.
350 */
351 internal fun releaseClaimedReusableContinuation() {
352 // Cannot be cast if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
353 val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
354 detachChild()
355 cancel(cancellationCause)
356 }
357
358 override fun resumeWith(result: Result<T>) =
359 resumeImpl(result.toState(this), resumeMode)
360
361 @Suppress("OVERRIDE_DEPRECATION")
362 override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
363 resumeImpl(value, resumeMode, onCancellation?.let { { cause, _, _ -> onCancellation(cause) } })
364
365 override fun <R : T> resume(
366 value: R,
367 onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
368 ) =
369 resumeImpl(value, resumeMode, onCancellation)
370
371 /**
372 * An optimized version for the code below that does not allocate
373 * a cancellation handler object and efficiently stores the specified
374 * [segment] and [index] in this [CancellableContinuationImpl].
375 *
376 * The only difference is that `segment.onCancellation(..)` is never
377 * called if this continuation is already completed;
378 *
379 * ```
380 * invokeOnCancellation { cause ->
381 * segment.onCancellation(index, cause)
382 * }
383 * ```
384 */
385 override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
386 _decisionAndIndex.update {
387 check(it.index == NO_INDEX) {
388 "invokeOnCancellation should be called at most once"
389 }
390 decisionAndIndex(it.decision, index)
391 }
392 invokeOnCancellationImpl(segment)
393 }
394
395 override fun invokeOnCancellation(handler: CompletionHandler) =
396 invokeOnCancellation(CancelHandler.UserSupplied(handler))
397
398 internal fun invokeOnCancellationInternal(handler: CancelHandler) = invokeOnCancellationImpl(handler)
399
400 private fun invokeOnCancellationImpl(handler: Any) {
401 assert { handler is CancelHandler || handler is Segment<*> }
402 _state.loop { state ->
403 when (state) {
404 is Active -> {
405 if (_state.compareAndSet(state, handler)) return // quit on cas success
406 }
407 is CancelHandler, is Segment<*> -> multipleHandlersError(handler, state)
408 is CompletedExceptionally -> {
409 /*
410 * Continuation was already cancelled or completed exceptionally.
411 * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
412 * so we check to make sure handler was installed just once.
413 */
414 if (!state.makeHandled()) multipleHandlersError(handler, state)
415 /*
416 * Call the handler only if it was cancelled (not called when completed exceptionally).
417 * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
418 * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
419 */
420 if (state is CancelledContinuation) {
421 val cause: Throwable? = (state as? CompletedExceptionally)?.cause
422 if (handler is CancelHandler) {
423 callCancelHandler(handler, cause)
424 } else {
425 val segment = handler as Segment<*>
426 callSegmentOnCancellation(segment, cause)
427 }
428 }
429 return
430 }
431
432 is CompletedContinuation<*> -> {
433 /*
434 * Continuation was already completed, and might already have cancel handler.
435 */
436 if (state.cancelHandler != null) multipleHandlersError(handler, state)
437 // Segment.invokeOnCancellation(..) does NOT need to be called on completed continuation.
438 if (handler is Segment<*>) return
439 handler as CancelHandler
440 if (state.cancelled) {
441 // Was already cancelled while being dispatched -- invoke the handler directly
442 callCancelHandler(handler, state.cancelCause)
443 return
444 }
445 val update = state.copy(cancelHandler = handler)
446 if (_state.compareAndSet(state, update)) return // quit on cas success
447 }
448 else -> {
449 /*
450 * Continuation was already completed normally, but might get cancelled while being dispatched.
451 * Change its state to CompletedContinuation, unless we have Segment which
452 * does not need to be called in this case.
453 */
454 if (handler is Segment<*>) return
455 handler as CancelHandler
456 val update = CompletedContinuation(state, cancelHandler = handler)
457 if (_state.compareAndSet(state, update)) return // quit on cas success
458 }
459 }
460 }
461 }
462
463 private fun multipleHandlersError(handler: Any, state: Any?) {
464 error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
465 }
466
467 private fun dispatchResume(mode: Int) {
468 if (tryResume()) return // completed before getResult invocation -- bail out
469 // otherwise, getResult has already commenced, i.e. completed later or in other thread
470 dispatch(mode)
471 }
472
473 private fun <R> resumedState(
474 state: NotCompleted,
475 proposedUpdate: R,
476 resumeMode: Int,
477 onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?,
478 idempotent: Any?
479 ): Any? = when {
480 proposedUpdate is CompletedExceptionally -> {
481 assert { idempotent == null } // there are no idempotent exceptional resumes
482 assert { onCancellation == null } // only successful results can be cancelled
483 proposedUpdate
484 }
485 !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
486 onCancellation != null || state is CancelHandler || idempotent != null ->
487 // mark as CompletedContinuation if special cases are present:
488 // Cancellation handlers that shall be called after resume or idempotent resume
489 CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
490 else -> proposedUpdate // simple case -- use the value directly
491 }
492
493 internal fun <R> resumeImpl(
494 proposedUpdate: R,
495 resumeMode: Int,
496 onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)? = null
497 ) {
498 _state.loop { state ->
499 when (state) {
500 is NotCompleted -> {
501 val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
502 if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
503 detachChildIfNonReusable()
504 dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
505 return // done
506 }
507
508 is CancelledContinuation -> {
509 /*
510 * If continuation was cancelled, then resume attempt must be ignored,
511 * because cancellation is asynchronous and may race with resume.
512 * Racy exceptions will be lost, too.
513 */
514 if (state.makeResumed()) { // check if trying to resume one (otherwise error)
515 // call onCancellation
516 onCancellation?.let { callOnCancellation(it, state.cause, proposedUpdate) }
517 return // done
518 }
519 }
520 }
521 alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
522 }
523 }
524
525 /**
526 * Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
527 * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
528 */
529 private fun <R> tryResumeImpl(
530 proposedUpdate: R,
531 idempotent: Any?,
532 onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
533 ): Symbol? {
534 _state.loop { state ->
535 when (state) {
536 is NotCompleted -> {
537 val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent)
538 if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
539 detachChildIfNonReusable()
540 return RESUME_TOKEN
541 }
542 is CompletedContinuation<*> -> {
543 return if (idempotent != null && state.idempotentResume === idempotent) {
544 assert { state.result == proposedUpdate } // "Non-idempotent resume"
545 RESUME_TOKEN // resumed with the same token -- ok
546 } else {
547 null // resumed with a different token or non-idempotent -- too late
548 }
549 }
550 else -> return null // cannot resume -- not active anymore
551 }
552 }
553 }
554
555 private fun alreadyResumedError(proposedUpdate: Any?): Nothing {
556 error("Already resumed, but proposed with update $proposedUpdate")
557 }
558
559 // Unregister from parent job
560 private fun detachChildIfNonReusable() {
561 // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
562 if (!isReusable()) detachChild()
563 }
564
565 /**
566 * Detaches from the parent.
567 */
568 internal fun detachChild() {
569 val handle = parentHandle ?: return
570 handle.dispose()
571 _parentHandle.value = NonDisposableHandle
572 }
573
574 // Note: Always returns RESUME_TOKEN | null
575 override fun tryResume(value: T, idempotent: Any?): Any? =
576 tryResumeImpl(value, idempotent, onCancellation = null)
577
578 override fun <R : T> tryResume(
579 value: R,
580 idempotent: Any?,
581 onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
582 ): Any? =
583 tryResumeImpl(value, idempotent, onCancellation)
584
585 override fun tryResumeWithException(exception: Throwable): Any? =
586 tryResumeImpl(CompletedExceptionally(exception), idempotent = null, onCancellation = null)
587
588 // note: token is always RESUME_TOKEN
589 override fun completeResume(token: Any) {
590 assert { token === RESUME_TOKEN }
591 dispatchResume(resumeMode)
592 }
593
594 override fun CoroutineDispatcher.resumeUndispatched(value: T) {
595 val dc = delegate as? DispatchedContinuation
596 resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
597 }
598
599 override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
600 val dc = delegate as? DispatchedContinuation
601 resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
602 }
603
604 @Suppress("UNCHECKED_CAST")
605 override fun <T> getSuccessfulResult(state: Any?): T =
606 when (state) {
607 is CompletedContinuation<*> -> state.result as T
608 else -> state as T
609 }
610
611 // The exceptional state in CancellableContinuationImpl is stored directly and it is not recovered yet.
612 // The stacktrace recovery is invoked here.
613 override fun getExceptionalResult(state: Any?): Throwable? =
614 super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }
615
616 // For nicer debugging
617 public override fun toString(): String =
618 "${nameString()}(${delegate.toDebugString()}){$stateDebugRepresentation}@$hexAddress"
619
620 protected open fun nameString(): String =
621 "CancellableContinuation"
622
623 }
624
625 // Marker for active continuation
626 internal interface NotCompleted
627
628 private object Active : NotCompleted {
toStringnull629 override fun toString(): String = "Active"
630 }
631
632 /**
633 * Essentially the same as just a function from `Throwable?` to `Unit`.
634 * The only thing implementors can do is call [invoke].
635 * The reason this abstraction exists is to allow providing a readable [toString] in the list of completion handlers
636 * as seen from the debugger.
637 * Use [UserSupplied] to create an instance from a lambda.
638 * We can't avoid defining a separate type, because on JS, you can't inherit from a function type.
639 */
640 internal interface CancelHandler : NotCompleted {
641 /**
642 * Signals cancellation.
643 *
644 * This function:
645 * - Does not throw any exceptions.
646 * Violating this rule in an implementation leads to [handleUncaughtCoroutineException] being called with a
647 * [CompletionHandlerException] wrapping the thrown exception.
648 * - Is fast, non-blocking, and thread-safe.
649 * - Can be invoked concurrently with the surrounding code.
650 * - Can be invoked from any context.
651 *
652 * The meaning of `cause` that is passed to the handler is:
653 * - It is `null` if the continuation was cancelled directly via [CancellableContinuation.cancel] without a `cause`.
654 * - It is an instance of [CancellationException] if the continuation was _normally_ cancelled from the outside.
655 * **It should not be treated as an error**. In particular, it should not be reported to error logs.
656 * - Otherwise, the continuation had cancelled with an _error_.
657 */
658 fun invoke(cause: Throwable?)
659
660 /**
661 * A lambda passed from outside the coroutine machinery.
662 *
663 * See the requirements for [CancelHandler.invoke] when implementing this function.
664 */
665 class UserSupplied(private val handler: (cause: Throwable?) -> Unit) : CancelHandler {
666 /** @suppress */
667 override fun invoke(cause: Throwable?) { handler(cause) }
668
669 override fun toString() = "CancelHandler.UserSupplied[${handler.classSimpleName}@$hexAddress]"
670 }
671 }
672
673 // Completed with additional metadata
674 private data class CompletedContinuation<R>(
675 @JvmField val result: R,
676 // installed via `invokeOnCancellation`
677 @JvmField val cancelHandler: CancelHandler? = null,
678 // installed via the `resume` block
679 @JvmField val onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)? = null,
680 @JvmField val idempotentResume: Any? = null,
681 @JvmField val cancelCause: Throwable? = null
682 ) {
683 val cancelled: Boolean get() = cancelCause != null
684
invokeHandlersnull685 fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
686 cancelHandler?.let { cont.callCancelHandler(it, cause) }
687 onCancellation?.let { cont.callOnCancellation(it, cause, result) }
688 }
689 }
690
691 // Same as ChildHandleNode, but for cancellable continuation
692 private class ChildContinuation(
693 @JvmField val child: CancellableContinuationImpl<*>
694 ) : JobNode() {
695 override val onCancelling get() = true
696
invokenull697 override fun invoke(cause: Throwable?) {
698 child.parentCancelled(child.getContinuationCancellationCause(job))
699 }
700 }
701