• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 @file:Suppress("DEPRECATION_ERROR")
5 
6 package kotlinx.coroutines
7 
8 import kotlinx.atomicfu.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 import kotlin.js.*
15 import kotlin.jvm.*
16 import kotlin.native.concurrent.*
17 
18 /**
19  * A concrete implementation of [Job]. It is optionally a child to a parent job.
20  *
21  * This is an open class designed for extension by more specific classes that might augment the
22  * state and may store additional state information for completed jobs, like their result values.
23  *
24  * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
25  * @suppress **This is unstable API and it is subject to change.**
26  */
27 @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
28 public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
29     final override val key: CoroutineContext.Key<*> get() = Job
30 
31     /*
32        === Internal states ===
33 
34        name       state class              public state  description
35        ------     ------------             ------------  -----------
36        EMPTY_N    EmptyNew               : New           no listeners
37        EMPTY_A    EmptyActive            : Active        no listeners
38        SINGLE     JobNode                : Active        a single listener
39        SINGLE+    JobNode                : Active        a single listener + NodeList added as its next
40        LIST_N     InactiveNodeList       : New           a list of listeners (promoted once, does not go back to EmptyNew)
41        LIST_A     NodeList               : Active        a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
42        COMPLETING Finishing              : Completing    has a list of listeners (promoted once from LIST_*)
43        CANCELLING Finishing              : Cancelling    -- " --
44        FINAL_C    Cancelled              : Cancelled     Cancelled (final state)
45        FINAL_R    <any>                  : Completed     produced some result
46 
47        === Transitions ===
48 
49            New states      Active states       Inactive states
50 
51           +---------+       +---------+                          }
52           | EMPTY_N | ----> | EMPTY_A | ----+                    } Empty states
53           +---------+       +---------+     |                    }
54                |  |           |     ^       |    +----------+
55                |  |           |     |       +--> |  FINAL_* |
56                |  |           V     |       |    +----------+
57                |  |         +---------+     |                    }
58                |  |         | SINGLE  | ----+                    } JobNode states
59                |  |         +---------+     |                    }
60                |  |              |          |                    }
61                |  |              V          |                    }
62                |  |         +---------+     |                    }
63                |  +-------> | SINGLE+ | ----+                    }
64                |            +---------+     |                    }
65                |                 |          |
66                V                 V          |
67           +---------+       +---------+     |                    }
68           | LIST_N  | ----> | LIST_A  | ----+                    } [Inactive]NodeList states
69           +---------+       +---------+     |                    }
70              |   |             |   |        |
71              |   |    +--------+   |        |
72              |   |    |            V        |
73              |   |    |    +------------+   |   +------------+   }
74              |   +-------> | COMPLETING | --+-- | CANCELLING |   } Finishing states
75              |        |    +------------+       +------------+   }
76              |        |         |                    ^
77              |        |         |                    |
78              +--------+---------+--------------------+
79 
80 
81        This state machine and its transition matrix are optimized for the common case when a job is created in active
82        state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
83        successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
84        state without going to COMPLETING state)
85 
86        Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
87 
88        ---------- TIMELINE of state changes and notification in Job lifecycle ----------
89 
90        | The longest possible chain of events is shown, shorter versions cut-through intermediate states,
91        |  while still performing all the notifications in this order.
92 
93          + Job object is created
94        ## NEW: state == EMPTY_NEW | is InactiveNodeList
95          + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
96          ~ waits for start
97          >> start / join / await invoked
98        ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
99          + onStart (lazy coroutine is started)
100          ~ active coroutine is working (or scheduled to execution)
101          >> childCancelled / cancelImpl invoked
102        ## CANCELLING: state is Finishing, state.rootCause != null
103         ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
104         ------ new children get immediately cancelled, but are still admitted to the list
105          + onCancelling
106          + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
107          + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
108          ~ waits for completion of coroutine body
109          >> makeCompleting / makeCompletingOnce invoked
110        ## COMPLETING: state is Finishing, state.isCompleting == true
111         ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
112          ~ waits for children
113          >> last child completes
114          - computes the final exception
115        ## SEALED: state is Finishing, state.isSealed == true
116         ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
117          + cancelParent (final exception is communicated to the parent, parent incorporates it)
118          + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
119        ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
120         ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
121          + parentHandle.dispose
122          + notifyCompletion (invoke all completion listeners)
123          + onCompletionInternal / onCompleted / onCancelled
124 
125        ---------------------------------------------------------------------------------
126      */
127 
    // Note: use shared objects while we have no listeners
    // Holds the current internal state of the job. It starts as EMPTY_ACTIVE when the job is constructed
    // with `active == true` and as EMPTY_NEW otherwise.
    private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)

    // Handle returned by the parent when this job was attached to it; `null` until initParentJob assigns it.
    private val _parentHandle = atomic<ChildHandle?>(null)
    // Plain property view over _parentHandle: reads and writes go straight to the atomic's value.
    internal var parentHandle: ChildHandle?
        get() = _parentHandle.value
        set(value) { _parentHandle.value = value }
135 
136     // ------------ initialization ------------
137 
138     /**
139      * Initializes parent job.
140      * It shall be invoked at most once after construction after all other initialization.
141      */
142     protected fun initParentJob(parent: Job?) {
143         assert { parentHandle == null }
144         if (parent == null) {
145             parentHandle = NonDisposableHandle
146             return
147         }
148         parent.start() // make sure the parent is started
149         @Suppress("DEPRECATION")
150         val handle = parent.attachChild(this)
151         parentHandle = handle
152         // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
153         if (isCompleted) {
154             handle.dispose()
155             parentHandle = NonDisposableHandle // release it just in case, to aid GC
156         }
157     }
158 
159     // ------------ state query ------------
160     /**
161      * Returns current state of this job.
162      * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
163      * and should be [unboxed][unboxState] before returning to user code.
164      */
165     internal val state: Any? get() {
166         _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
167             if (state !is OpDescriptor) return state
168             state.perform(this)
169         }
170     }
171 
172     /**
173      * @suppress **This is unstable API and it is subject to change.**
174      */
175     private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
176         while (true) {
177             block(state)
178         }
179     }
180 
181     public override val isActive: Boolean get() {
182         val state = this.state
183         return state is Incomplete && state.isActive
184     }
185 
186     public final override val isCompleted: Boolean get() = state !is Incomplete
187 
188     public final override val isCancelled: Boolean get() {
189         val state = this.state
190         return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
191     }
192 
193     // ------------ state update ------------
194 
    // Finalizes Finishing -> Completed (terminal state) transition.
    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
    //
    // state          -- the Finishing state the job is currently in (must be completing and not yet sealed)
    // proposedUpdate -- the final state proposed by the caller; used as is unless a different final exception wins
    //
    // Returns the final state that was created and that the job was updated to.
    private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
        /*
         * Note: proposed state can be Incomplete, e.g.
         * async {
         *     something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
         * }
         */
        assert { this.state === state } // consistency check -- it cannot change
        assert { !state.isSealed } // consistency check -- cannot be sealed yet
        assert { state.isCompleting } // consistency check -- must be marked as completing
        // Exception carried by the proposed update, or null when the proposed update is not exceptional
        val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
        // Create the final exception and seal the state so that no more exceptions can be added
        var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
        val finalException = synchronized(state) {
            // remember whether the job was already cancelling before sealing
            wasCancelling = state.isCancelling
            val exceptions = state.sealLocked(proposedException)
            val finalCause = getFinalRootCause(state, exceptions)
            // attach the remaining exceptions to the chosen root cause as suppressed ones
            if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
            finalCause
        }
        // Create the final state object
        val finalState = when {
            // was not cancelled (no exception) -> use proposed update value
            finalException == null -> proposedUpdate
            // small optimization: on cancellation the proposedUpdate object can be used as is
            finalException === proposedException -> proposedUpdate
            // cancelled job final state
            else -> CompletedExceptionally(finalException)
        }
        // Now handle the final exception
        if (finalException != null) {
            // handleJobException is consulted only when cancelParent returns false
            val handled = cancelParent(finalException) || handleJobException(finalException)
            if (handled) (finalState as CompletedExceptionally).makeHandled()
        }
        // Process state updates for the final state before the state of the Job is actually set to the final state
        // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
        if (!wasCancelling) onCancelling(finalException)
        onCompletionInternal(finalState)
        // Then CAS to completed state -> it must succeed
        val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
        assert { casSuccess }
        // And process all post-completion actions
        completeStateFinalization(state, finalState)
        return finalState
    }
243 
244     private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
245         // A case of no exceptions
246         if (exceptions.isEmpty()) {
247             // materialize cancellation exception if it was not materialized yet
248             if (state.isCancelling) return defaultCancellationException()
249             return null
250         }
251         /*
252          * 1) If we have non-CE, use it as root cause
253          * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
254          *    * It is a CE, so it's not reported by children
255          *    * The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
256          *    * The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
257          * 3) Just return the very first CE
258          */
259         val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException }
260         if (firstNonCancellation != null) return firstNonCancellation
261         val first = exceptions[0]
262         if (first is TimeoutCancellationException) {
263             val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException }
264             if (detailedTimeoutException != null) return detailedTimeoutException
265         }
266         return first
267     }
268 
269     private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
270         if (exceptions.size <= 1) return // nothing more to do here
271         val seenExceptions = identitySet<Throwable>(exceptions.size)
272         /*
273          * Note that root cause may be a recovered exception as well.
274          * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause,
275          * but add suppressed exceptions to the recovered root cause (as it is our final exception)
276          */
277         val unwrappedCause = unwrap(rootCause)
278         for (exception in exceptions) {
279             val unwrapped = unwrap(exception)
280             if (unwrapped !== rootCause && unwrapped !== unwrappedCause &&
281                 unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
282                 rootCause.addSuppressedThrowable(unwrapped)
283             }
284         }
285     }
286 
287     // fast-path method to finalize normally completed coroutines without children
288     // returns true if complete, and afterCompletion(update) shall be called
289     private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
290         assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
291         assert { update !is CompletedExceptionally } // only for normal completion
292         if (!_state.compareAndSet(state, update.boxIncomplete())) return false
293         onCancelling(null) // simple state is not a failure
294         onCompletionInternal(update)
295         completeStateFinalization(state, update)
296         return true
297     }
298 
    // Runs the post-completion actions once the job has been moved to its final state.
    // state  -- the last incomplete state the job was in (it holds the completion handlers)
    // update -- the final state; its cause (if it is CompletedExceptionally) is passed to the handlers
    private fun completeStateFinalization(state: Incomplete, update: Any?) {
        /*
         * Now the job in THE FINAL state. We need to properly handle the resulting state.
         * Order of various invocations here is important.
         *
         * 1) Unregister from parent job.
         */
        parentHandle?.let {
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        // null when the job did not complete exceptionally
        val cause = (update as? CompletedExceptionally)?.cause
        /*
         * 2) Invoke completion handlers: .join(), callbacks etc.
         *    It's important to invoke them only AFTER exception handling and everything else, see #208
         */
        if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
            try {
                state.invoke(cause)
            } catch (ex: Throwable) {
                // a failing handler is reported via handleOnCompletionException instead of being rethrown
                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
            }
        } else {
            // list state -- notify every handler in the list (if the state has a list at all)
            state.list?.notifyCompletion(cause)
        }
    }
326 
    // Performs the cancellation notifications for `cause`, in a fixed order:
    // onCancelling hook, then all JobCancellingNode handlers of `list`, then the parent.
    private fun notifyCancelling(list: NodeList, cause: Throwable) {
        // first cancel our own children
        onCancelling(cause)
        notifyHandlers<JobCancellingNode>(list, cause)
        // then cancel parent
        cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
    }
334 
335     /**
336      * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
337      * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
338      *
339      * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
340      * may leak to the [CoroutineExceptionHandler].
341      */
342     private fun cancelParent(cause: Throwable): Boolean {
343         // Is scoped coroutine -- don't propagate, will be rethrown
344         if (isScopedCoroutine) return true
345 
346         /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
347          * This allow parent to cancel its children (normally) without being cancelled itself, unless
348          * child crashes and produce some other exception during its completion.
349          */
350         val isCancellation = cause is CancellationException
351         val parent = parentHandle
352         // No parent -- ignore CE, report other exceptions.
353         if (parent === null || parent === NonDisposableHandle) {
354             return isCancellation
355         }
356 
357         // Notify parent but don't forget to check cancellation
358         return parent.childCancelled(cause) || isCancellation
359     }
360 
361     private fun NodeList.notifyCompletion(cause: Throwable?) =
362         notifyHandlers<JobNode>(this, cause)
363 
364     private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
365         var exception: Throwable? = null
366         list.forEach<T> { node ->
367             try {
368                 node.invoke(cause)
369             } catch (ex: Throwable) {
370                 exception?.apply { addSuppressedThrowable(ex) } ?: run {
371                     exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
372                 }
373             }
374         }
375         exception?.let { handleOnCompletionException(it) }
376     }
377 
378     public final override fun start(): Boolean {
379         loopOnState { state ->
380             when (startInternal(state)) {
381                 FALSE -> return false
382                 TRUE -> return true
383             }
384         }
385     }
386 
387     // returns: RETRY/FALSE/TRUE:
388     //   FALSE when not new,
389     //   TRUE  when started
390     //   RETRY when need to retry
391     private fun startInternal(state: Any?): Int {
392         when (state) {
393             is Empty -> { // EMPTY_X state -- no completion handlers
394                 if (state.isActive) return FALSE // already active
395                 if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
396                 onStart()
397                 return TRUE
398             }
399             is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
400                 if (!_state.compareAndSet(state, state.list)) return RETRY
401                 onStart()
402                 return TRUE
403             }
404             else -> return FALSE // not a new state
405         }
406     }
407 
    /**
     * Override to provide the actual [start] action.
     * This function is invoked exactly once when non-active coroutine is [started][start], right after
     * the state of the job has been switched from a new state to the corresponding active state.
     * The default implementation does nothing.
     */
    protected open fun onStart() {}
413 
414     public final override fun getCancellationException(): CancellationException =
415         when (val state = this.state) {
416             is Finishing -> state.rootCause?.toCancellationException("$classSimpleName is cancelling")
417                 ?: error("Job is still new or active: $this")
418             is Incomplete -> error("Job is still new or active: $this")
419             is CompletedExceptionally -> state.cause.toCancellationException()
420             else -> JobCancellationException("$classSimpleName has completed normally", null, this)
421         }
422 
423     protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
424         this as? CancellationException ?: defaultCancellationException(message, this)
425 
426     /**
427      * Returns the cause that signals the completion of this job -- it returns the original
428      * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
429      * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
430      * is being cancelled yet.
431      */
432     protected val completionCause: Throwable?
433         get() = when (val state = state) {
434             is Finishing -> state.rootCause
435                 ?: error("Job is still new or active: $this")
436             is Incomplete -> error("Job is still new or active: $this")
437             is CompletedExceptionally -> state.cause
438             else -> null
439         }
440 
441     /**
442      * Returns `true` when [completionCause] exception was handled by parent coroutine.
443      */
444     protected val completionCauseHandled: Boolean
445         get() = state.let { it is CompletedExceptionally && it.handled }
446 
447     @Suppress("OverridingDeprecatedMember")
448     public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
449         invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
450 
    /**
     * Registers [handler] on this job and returns a handle for the registration.
     *
     * @param onCancelling when `true` the handler is registered as a cancelling handler; if the job is already
     *   being cancelled, the handler is not (necessarily) added to the list and the cancellation root cause
     *   is delivered to it right away instead (subject to [invokeImmediately]).
     * @param invokeImmediately when `true` the handler is invoked from inside this function if the job
     *   is already cancelling (for a cancelling handler) or already complete.
     * @param handler the handler to register.
     * @return the node that was installed for the handler, or [NonDisposableHandle] when nothing was installed.
     */
    public final override fun invokeOnCompletion(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        handler: CompletionHandler
    ): DisposableHandle {
        // Create node upfront -- for common cases it just initializes JobNode.job field,
        // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
        val node: JobNode = makeNode(handler, onCancelling)
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try move to SINGLE state
                        if (_state.compareAndSet(state, node)) return node
                    } else
                        promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
                    // in every other case the loop re-reads the state and tries again
                }
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                        // there is no list yet -- promote to a list and retry on the next iteration
                        promoteSingleToNodeList(state as JobNode)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                        if (onCancelling && state is Finishing) {
                            synchronized(state) {
                                // check if we are installing cancellation handler on job that is being cancelled
                                rootCause = state.rootCause // != null if cancelling job
                                // We add node to the list in two cases --- either the job is not being cancelled
                                // or we are adding a child to a coroutine that is not completing yet
                                if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                    // Note: add node the list while holding lock on state (make sure it cannot change)
                                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                    // just return node if we don't have to invoke handler (not cancelling yet)
                                    if (rootCause == null) return node
                                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                    handle = node
                                }
                            }
                        }
                        if (rootCause != null) {
                            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                            if (invokeImmediately) handler.invokeIt(rootCause)
                            return handle
                        } else {
                            // not cancelling (or not a cancelling handler) -- just add the node; retry if the state changed
                            if (addLastAtomic(state, list, node)) return node
                        }
                    }
                }
                else -> { // is complete
                    // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                    // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                    if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                    return NonDisposableHandle
                }
            }
        }
    }
509 
510     private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
511         val node = if (onCancelling) {
512             (handler as? JobCancellingNode)
513                 ?: InvokeOnCancelling(handler)
514         } else {
515             (handler as? JobNode)
516                 ?.also { assert { it !is JobCancellingNode } }
517                 ?: InvokeOnCompletion(handler)
518         }
519         node.job = this
520         return node
521     }
522 
523     private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
524         list.addLastIf(node) { this.state === expect }
525 
526     private fun promoteEmptyToNodeList(state: Empty) {
527         // try to promote it to LIST state with the corresponding state
528         val list = NodeList()
529         val update = if (state.isActive) list else InactiveNodeList(list)
530         _state.compareAndSet(state, update)
531     }
532 
    // Makes one attempt to convert a single-listener state into a list state.
    // The result of the final CAS is ignored -- the caller re-reads the state in its loop.
    private fun promoteSingleToNodeList(state: JobNode) {
        // try to promote it to list (SINGLE+ state)
        state.addOneIfEmpty(NodeList())
        // it must be in SINGLE+ state or state has changed (node could have been removed from state)
        val list = state.nextNode // either our NodeList or somebody else won the race, updated state
        // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
        _state.compareAndSet(state, list)
    }
541 
542     public final override suspend fun join() {
543         if (!joinInternal()) { // fast-path no wait
544             coroutineContext.ensureActive()
545             return // do not suspend
546         }
547         return joinSuspend() // slow-path wait
548     }
549 
    // Decides whether join() has to wait: returns false when the job is already complete and
    // true when it is still incomplete (after making an attempt to start it).
    private fun joinInternal(): Boolean {
        loopOnState { state ->
            if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
            // NOTE(review): relies on RETRY being the only negative result of startInternal -- confirm against the constants
            if (startInternal(state) >= 0) return true // wait unless need to retry
        }
    }
556 
    // Slow path of join(): suspends the caller and registers a ResumeOnCompletion handler for the
    // continuation on this job; the handle of that registration is passed to cont.disposeOnCancellation.
    private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
        // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
        cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
    }
561 
562     public final override val onJoin: SelectClause0
563         get() = this
564 
    // registerSelectJoin
    // Registers the join clause of this job in the given select instance:
    //  - does nothing if the select instance is already selected;
    //  - if the job is already complete, tries to select this clause and starts `block` right away;
    //  - otherwise makes sure the job is started and registers a completion handler that is
    //    disposed via select.disposeOnSelect.
    public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
        // fast-path -- check state and select/return if needed
        loopOnState { state ->
            if (select.isSelected) return
            if (state !is Incomplete) {
                // already complete -- select result
                if (select.trySelect()) {
                    block.startCoroutineUnintercepted(select.completion)
                }
                return
            }
            // NOTE(review): the literal 0 presumably stands for the FALSE result of startInternal
            // ("not a new state", i.e. already started) -- confirm against the constant's value
            if (startInternal(state) == 0) {
                // slow-path -- register waiter for completion
                select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
                return
            }
        }
    }
584 
585     /**
586      * @suppress **This is unstable API and it is subject to change.**
587      */
588     internal fun removeNode(node: JobNode) {
589         // remove logic depends on the state of the job
590         loopOnState { state ->
591             when (state) {
592                 is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
593                     if (state !== node) return // a different job node --> we were already removed
594                     // try remove and revert back to empty state
595                     if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
596                 }
597                 is Incomplete -> { // may have a list of completion handlers
598                     // remove node from the list if there is a list
599                     if (state.list != null) node.remove()
600                     return
601                 }
602                 else -> return // it is complete and does not have any completion handlers
603             }
604         }
605     }
606 
607     /**
608      * Returns `true` for job that do not have "body block" to complete and should immediately go into
609      * completing state and start waiting for children.
610      *
611      * @suppress **This is unstable API and it is subject to change.**
612      */
613     internal open val onCancelComplete: Boolean get() = false
614 
615     // external cancel with cause, never invoked implicitly from internal machinery
616     public override fun cancel(cause: CancellationException?) {
617         cancelInternal(cause ?: defaultCancellationException())
618     }
619 
620     protected open fun cancellationExceptionMessage(): String = "Job was cancelled"
621 
622     // HIDDEN in Job interface. Invoked only by legacy compiled code.
623     // external cancel with (optional) cause, never invoked implicitly from internal machinery
624     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
625     public override fun cancel(cause: Throwable?): Boolean {
626         cancelInternal(cause?.toCancellationException() ?: defaultCancellationException())
627         return true
628     }
629 
630     // It is overridden in channel-linked implementation
631     public open fun cancelInternal(cause: Throwable) {
632         cancelImpl(cause)
633     }
634 
635     // Parent is cancelling child
636     public final override fun parentCancelled(parentJob: ParentJob) {
637         cancelImpl(parentJob)
638     }
639 
640     /**
641      * Child was cancelled with a cause.
642      * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
643      * It is overridden in supervisor implementations to completely ignore any child cancellation.
644      * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
645      *
646      * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
647      * may leak to the [CoroutineExceptionHandler].
648      */
649     public open fun childCancelled(cause: Throwable): Boolean {
650         if (cause is CancellationException) return true
651         return cancelImpl(cause) && handlesException
652     }
653 
654     /**
655      * Makes this [Job] cancelled with a specified [cause].
656      * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
657      */
658     public fun cancelCoroutine(cause: Throwable?): Boolean = cancelImpl(cause)
659 
660     // cause is Throwable or ParentJob when cancelChild was invoked
661     // returns true is exception was handled, false otherwise
662     internal fun cancelImpl(cause: Any?): Boolean {
663         var finalState: Any? = COMPLETING_ALREADY
664         if (onCancelComplete) {
665             // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
666             // completing and had recorded exception
667             finalState = cancelMakeCompleting(cause)
668             if (finalState === COMPLETING_WAITING_CHILDREN) return true
669         }
670         if (finalState === COMPLETING_ALREADY) {
671             finalState = makeCancelling(cause)
672         }
673         return when {
674             finalState === COMPLETING_ALREADY -> true
675             finalState === COMPLETING_WAITING_CHILDREN -> true
676             finalState === TOO_LATE_TO_CANCEL -> false
677             else -> {
678                 afterCompletion(finalState)
679                 true
680             }
681         }
682     }
683 
684     // cause is Throwable or ParentJob when cancelChild was invoked
685     // It contains a loop and never returns COMPLETING_RETRY, can return
686     // COMPLETING_ALREADY -- if already completed/completing
687     // COMPLETING_WAITING_CHILDREN -- if started waiting for children
688     // final state -- when completed, for call to afterCompletion
689     private fun cancelMakeCompleting(cause: Any?): Any? {
690         loopOnState { state ->
691             if (state !is Incomplete || state is Finishing && state.isCompleting) {
692                 // already completed/completing, do not even create exception to propose update
693                 return COMPLETING_ALREADY
694             }
695             val proposedUpdate = CompletedExceptionally(createCauseException(cause))
696             val finalState = tryMakeCompleting(state, proposedUpdate)
697             if (finalState !== COMPLETING_RETRY) return finalState
698         }
699     }
700 
701     @Suppress("NOTHING_TO_INLINE") // Save a stack frame
702     internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
703         JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)
704 
705     override fun getChildJobCancellationCause(): CancellationException {
706         // determine root cancellation cause of this job (why is it cancelling its children?)
707         val state = this.state
708         val rootCause = when (state) {
709             is Finishing -> state.rootCause
710             is CompletedExceptionally -> state.cause
711             is Incomplete -> error("Cannot be cancelling child in this state: $state")
712             else -> null // create exception with the below code on normal completion
713         }
714         return (rootCause as? CancellationException) ?: JobCancellationException("Parent job is ${stateString(state)}", rootCause, this)
715     }
716 
717     // cause is Throwable or ParentJob when cancelChild was invoked
718     private fun createCauseException(cause: Any?): Throwable = when (cause) {
719         is Throwable? -> cause ?: defaultCancellationException()
720         else -> (cause as ParentJob).getChildJobCancellationCause()
721     }
722 
723     // transitions to Cancelling state
724     // cause is Throwable or ParentJob when cancelChild was invoked
725     // It contains a loop and never returns COMPLETING_RETRY, can return
726     // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
727     // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
728     // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
729     // final state -- when completed, for call to afterCompletion
730     private fun makeCancelling(cause: Any?): Any? {
731         var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
732         loopOnState { state ->
733             when (state) {
734                 is Finishing -> { // already finishing -- collect exceptions
735                     val notifyRootCause = synchronized(state) {
736                         if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
737                         // add exception, do nothing is parent is cancelling child that is already being cancelled
738                         val wasCancelling = state.isCancelling // will notify if was not cancelling
739                         // Materialize missing exception if it is the first exception (otherwise -- don't)
740                         if (cause != null || !wasCancelling) {
741                             val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
742                             state.addExceptionLocked(causeException)
743                         }
744                         // take cause for notification if was not in cancelling state before
745                         state.rootCause.takeIf { !wasCancelling }
746                     }
747                     notifyRootCause?.let { notifyCancelling(state.list, it) }
748                     return COMPLETING_ALREADY
749                 }
750                 is Incomplete -> {
751                     // Not yet finishing -- try to make it cancelling
752                     val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
753                     if (state.isActive) {
754                         // active state becomes cancelling
755                         if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
756                     } else {
757                         // non active state starts completing
758                         val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
759                         when {
760                             finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
761                             finalState === COMPLETING_RETRY -> return@loopOnState
762                             else -> return finalState
763                         }
764                     }
765                 }
766                 else -> return TOO_LATE_TO_CANCEL // already complete
767             }
768         }
769     }
770 
771     // Performs promotion of incomplete coroutine state to NodeList for the purpose of
772     // converting coroutine state to Cancelling, returns null when need to retry
773     private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
774         when (state) {
775             is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
776             is JobNode -> {
777                 // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
778                 // correctly capture a reference to it
779                 promoteSingleToNodeList(state)
780                 null // retry
781             }
782             else -> error("State should have list: $state")
783         }
784 
785     // try make new Cancelling state on the condition that we're still in the expected state
786     private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
787         assert { state !is Finishing } // only for non-finishing states
788         assert { state.isActive } // only for active states
789         // get state's list or else promote to list to correctly operate on child lists
790         val list = getOrPromoteCancellingList(state) ?: return false
791         // Create cancelling state (with rootCause!)
792         val cancelling = Finishing(list, false, rootCause)
793         if (!_state.compareAndSet(state, cancelling)) return false
794         // Notify listeners
795         notifyCancelling(list, rootCause)
796         return true
797     }
798 
799     /**
800      * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally)
801      * and by [JobImpl.cancel]. It returns `false` on repeated invocation
802      * (when this job is already completing).
803      */
804     internal fun makeCompleting(proposedUpdate: Any?): Boolean {
805         loopOnState { state ->
806             val finalState = tryMakeCompleting(state, proposedUpdate)
807             when {
808                 finalState === COMPLETING_ALREADY -> return false
809                 finalState === COMPLETING_WAITING_CHILDREN -> return true
810                 finalState === COMPLETING_RETRY -> return@loopOnState
811                 else -> {
812                     afterCompletion(finalState)
813                     return true
814                 }
815             }
816         }
817     }
818 
819     /**
820      * Completes this job. Used by [AbstractCoroutine.resume].
821      * It throws [IllegalStateException] on repeated invocation (when this job is already completing).
822      * Returns:
823      * * [COMPLETING_WAITING_CHILDREN] if started waiting for children.
824      * * Final state otherwise (caller should do [afterCompletion])
825      */
826     internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
827         loopOnState { state ->
828             val finalState = tryMakeCompleting(state, proposedUpdate)
829             when {
830                 finalState === COMPLETING_ALREADY ->
831                     throw IllegalStateException(
832                         "Job $this is already complete or completing, " +
833                             "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
834                     )
835                 finalState === COMPLETING_RETRY -> return@loopOnState
836                 else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
837             }
838         }
839     }
840 
841     // Returns one of COMPLETING symbols or final state:
842     // COMPLETING_ALREADY -- when already complete or completing
843     // COMPLETING_RETRY -- when need to retry due to interference
844     // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
845     // final state -- when completed, for call to afterCompletion
846     private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
847         if (state !is Incomplete)
848             return COMPLETING_ALREADY
849         /*
850          * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
851          * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
852          * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
853          * which may miss unhandled exception.
854          */
855         if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
856             if (tryFinalizeSimpleState(state, proposedUpdate)) {
857                 // Completed successfully on fast path -- return updated state
858                 return proposedUpdate
859             }
860             return COMPLETING_RETRY
861         }
862         // The separate slow-path function to simplify profiling
863         return tryMakeCompletingSlowPath(state, proposedUpdate)
864     }
865 
866     // Returns one of COMPLETING symbols or final state:
867     // COMPLETING_ALREADY -- when already complete or completing
868     // COMPLETING_RETRY -- when need to retry due to interference
869     // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
870     // final state -- when completed, for call to afterCompletion
871     private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
872         // get state's list or else promote to list to correctly operate on child lists
873         val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
874         // promote to Finishing state if we are not in it yet
875         // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
876         // atomically transition to finishing & completing state
877         val finishing = state as? Finishing ?: Finishing(list, false, null)
878         // must synchronize updates to finishing state
879         var notifyRootCause: Throwable? = null
880         synchronized(finishing) {
881             // check if this state is already completing
882             if (finishing.isCompleting) return COMPLETING_ALREADY
883             // mark as completing
884             finishing.isCompleting = true
885             // if we need to promote to finishing then atomically do it here.
886             // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
887             // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
888             if (finishing !== state) {
889                 if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
890             }
891             // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
892             assert { !finishing.isSealed } // cannot be sealed
893             // add new proposed exception to the finishing state
894             val wasCancelling = finishing.isCancelling
895             (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
896             // If it just becomes cancelling --> must process cancelling notifications
897             notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
898         }
899         // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
900         notifyRootCause?.let { notifyCancelling(list, it) }
901         // now wait for children
902         val child = firstChild(state)
903         if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
904             return COMPLETING_WAITING_CHILDREN
905         // otherwise -- we have not children left (all were already cancelled?)
906         return finalizeFinishingState(finishing, proposedUpdate)
907     }
908 
909     private val Any?.exceptionOrNull: Throwable?
910         get() = (this as? CompletedExceptionally)?.cause
911 
912     private fun firstChild(state: Incomplete) =
913         state as? ChildHandleNode ?: state.list?.nextChild()
914 
915     // return false when there is no more incomplete children to wait
916     // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
917     private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
918         val handle = child.childJob.invokeOnCompletion(
919             invokeImmediately = false,
920             handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
921         )
922         if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
923         val nextChild = child.nextChild() ?: return false
924         return tryWaitForChild(state, nextChild, proposedUpdate)
925     }
926 
927     // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
928     private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
929         assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
930         // figure out if we need to wait for next child
931         val waitChild = lastChild.nextChild()
932         // try wait for next child
933         if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
934         // no more children to wait -- try update state
935         val finalState = finalizeFinishingState(state, proposedUpdate)
936         afterCompletion(finalState)
937     }
938 
939     private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
940         var cur = this
941         while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
942         while (true) {
943             cur = cur.nextNode
944             if (cur.isRemoved) continue
945             if (cur is ChildHandleNode) return cur
946             if (cur is NodeList) return null // checked all -- no more children
947         }
948     }
949 
950     public final override val children: Sequence<Job> get() = sequence {
951         when (val state = this@JobSupport.state) {
952             is ChildHandleNode -> yield(state.childJob)
953             is Incomplete -> state.list?.let { list ->
954                 list.forEach<ChildHandleNode> { yield(it.childJob) }
955             }
956         }
957     }
958 
959     @Suppress("OverridingDeprecatedMember")
960     public final override fun attachChild(child: ChildJob): ChildHandle {
961         /*
962          * Note: This function attaches a special ChildHandleNode node object. This node object
963          * is handled in a special way on completion on the coroutine (we wait for all of them) and
964          * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
965          * if the job is already cancelling. For cancelling state child is attached under state lock.
966          * It's required to properly wait all children before completion and provide linearizable hierarchy view:
967          * If child is attached when the job is already being cancelled, such child will receive immediate notification on
968          * cancellation, but parent *will* wait for that child before completion and will handle its exception.
969          */
970         return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
971     }
972 
973     /**
974      * Override to process any exceptions that were encountered while invoking completion handlers
975      * installed via [invokeOnCompletion].
976      *
977      * @suppress **This is unstable API and it is subject to change.**
978      */
979     internal open fun handleOnCompletionException(exception: Throwable) {
980         throw exception
981     }
982 
983     /**
984      * This function is invoked once as soon as this job is being cancelled for any reason or completes,
985      * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
986      *
987      * The meaning of [cause] parameter:
988      * * Cause is `null` when the job has completed normally.
989      * * Cause is an instance of [CancellationException] when the job was cancelled _normally_.
990      *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
991      * * Otherwise, the job had been cancelled or failed with exception.
992      *
993      * The specified [cause] is not the final cancellation cause of this job.
994      * A job may produce other exceptions while it is failing and the final cause might be different.
995      *
996      * @suppress **This is unstable API and it is subject to change.*
997      */
998     protected open fun onCancelling(cause: Throwable?) {}
999 
1000     /**
1001      * Returns `true` for scoped coroutines.
1002      * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
1003      * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
1004      * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
1005      */
1006     protected open val isScopedCoroutine: Boolean get() = false
1007 
1008     /**
1009      * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
1010      * A valid implementation of this getter should recursively check parent as well before returning `false`.
1011      *
1012      * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
1013      * @suppress **This is unstable API and it is subject to change.*
1014      */
1015     internal open val handlesException: Boolean get() = true
1016 
1017     /**
1018      * Handles the final job [exception] that was not handled by the parent coroutine.
1019      * Returns `true` if it handles exception (so handling at later stages is not needed).
1020      * It is designed to be overridden by launch-like coroutines
1021      * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
1022      * that can represent exceptions.
1023      *
1024      * This method is invoked **exactly once** when the final exception of the job is determined
1025      * and before it becomes complete. At the moment of invocation the job and all its children are complete.
1026      */
1027     protected open fun handleJobException(exception: Throwable): Boolean = false
1028 
1029     /**
1030      * Override for completion actions that need to update some external object depending on job's state,
1031      * right before all the waiters for coroutine's completion are notified.
1032      *
1033      * @param state the final state.
1034      *
1035      * @suppress **This is unstable API and it is subject to change.**
1036      */
1037     protected open fun onCompletionInternal(state: Any?) {}
1038 
1039     /**
1040      * Override for the very last action on job's completion to resume the rest of the code in
1041      * scoped coroutines. It is called when this job is externally completed in an unknown
1042      * context and thus should resume with a default mode.
1043      *
1044      * @suppress **This is unstable API and it is subject to change.**
1045      */
1046     protected open fun afterCompletion(state: Any?) {}
1047 
1048     // for nicer debugging
1049     public override fun toString(): String =
1050         "${toDebugString()}@$hexAddress"
1051 
1052     @InternalCoroutinesApi
1053     public fun toDebugString(): String = "${nameString()}{${stateString(state)}}"
1054 
1055     /**
1056      * @suppress **This is unstable API and it is subject to change.**
1057      */
1058     internal open fun nameString(): String = classSimpleName
1059 
1060     private fun stateString(state: Any?): String = when (state) {
1061         is Finishing -> when {
1062             state.isCancelling -> "Cancelling"
1063             state.isCompleting -> "Completing"
1064             else -> "Active"
1065         }
1066         is Incomplete -> if (state.isActive) "Active" else "New"
1067         is CompletedExceptionally -> "Cancelled"
1068         else -> "Completed"
1069     }
1070 
1071     // Completing & Cancelling states,
1072     // All updates are guarded by synchronized(this), reads are volatile
1073     @Suppress("UNCHECKED_CAST")
1074     private class Finishing(
1075         override val list: NodeList,
1076         isCompleting: Boolean,
1077         rootCause: Throwable?
1078     ) : SynchronizedObject(), Incomplete {
1079         private val _isCompleting = atomic(isCompleting)
1080         var isCompleting: Boolean
1081             get() = _isCompleting.value
1082             set(value) { _isCompleting.value = value }
1083 
1084         private val _rootCause = atomic(rootCause)
1085         var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
1086             get() = _rootCause.value
1087             set(value) { _rootCause.value = value }
1088 
1089         private val _exceptionsHolder = atomic<Any?>(null)
1090         private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
1091             get() = _exceptionsHolder.value
1092             set(value) { _exceptionsHolder.value = value }
1093 
1094         // Note: cannot be modified when sealed
1095         val isSealed: Boolean get() = exceptionsHolder === SEALED
1096         val isCancelling: Boolean get() = rootCause != null
1097         override val isActive: Boolean get() = rootCause == null // !isCancelling
1098 
1099         // Seals current state and returns list of exceptions
1100         // guarded by `synchronized(this)`
1101         fun sealLocked(proposedException: Throwable?): List<Throwable> {
1102             val list = when(val eh = exceptionsHolder) { // volatile read
1103                 null -> allocateList()
1104                 is Throwable -> allocateList().also { it.add(eh) }
1105                 is ArrayList<*> -> eh as ArrayList<Throwable>
1106                 else -> error("State is $eh") // already sealed -- cannot happen
1107             }
1108             val rootCause = this.rootCause // volatile read
1109             rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
1110             if (proposedException != null && proposedException != rootCause) list.add(proposedException)
1111             exceptionsHolder = SEALED
1112             return list
1113         }
1114 
1115         // guarded by `synchronized(this)`
1116         fun addExceptionLocked(exception: Throwable) {
1117             val rootCause = this.rootCause // volatile read
1118             if (rootCause == null) {
1119                 this.rootCause = exception
1120                 return
1121             }
1122             if (exception === rootCause) return // nothing to do
1123             when (val eh = exceptionsHolder) { // volatile read
1124                 null -> exceptionsHolder = exception
1125                 is Throwable -> {
1126                     if (exception === eh) return // nothing to do
1127                     exceptionsHolder = allocateList().apply {
1128                         add(eh)
1129                         add(exception)
1130 
1131                     }
1132                 }
1133                 is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
1134                 else -> error("State is $eh") // already sealed -- cannot happen
1135             }
1136         }
1137 
1138         private fun allocateList() = ArrayList<Throwable>(4)
1139 
1140         override fun toString(): String =
1141             "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
1142     }
1143 
1144     private val Incomplete.isCancelling: Boolean
1145         get() = this is Finishing && isCancelling
1146 
1147     // Used by parent that is waiting for child completion
1148     private class ChildCompletion(
1149         private val parent: JobSupport,
1150         private val state: Finishing,
1151         private val child: ChildHandleNode,
1152         private val proposedUpdate: Any?
1153     ) : JobNode() {
1154         override fun invoke(cause: Throwable?) {
1155             parent.continueCompleting(state, child, proposedUpdate)
1156         }
1157     }
1158 
1159     private class AwaitContinuation<T>(
1160         delegate: Continuation<T>,
1161         private val job: JobSupport
1162     ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
1163         override fun getContinuationCancellationCause(parent: Job): Throwable {
1164             val state = job.state
1165             /*
1166              * When the job we are waiting for had already completely completed exceptionally or
1167              * is failing, we shall use its root/completion cause for await's result.
1168              */
1169             if (state is Finishing) state.rootCause?.let { return it }
1170             if (state is CompletedExceptionally) return state.cause
1171             return parent.getCancellationException()
1172         }
1173 
1174         protected override fun nameString(): String =
1175             "AwaitContinuation"
1176     }
1177 
1178     /*
1179      * =================================================================================================
     * This is a ready-to-use implementation of the Deferred interface.
1181      * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1182      * completed state as `Any?`
1183      * =================================================================================================
1184      */
1185 
1186     public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1187 
1188     public fun getCompletionExceptionOrNull(): Throwable? {
1189         val state = this.state
1190         check(state !is Incomplete) { "This job has not completed yet" }
1191         return state.exceptionOrNull
1192     }
1193 
1194     /**
1195      * @suppress **This is unstable API and it is subject to change.**
1196      */
1197     internal fun getCompletedInternal(): Any? {
1198         val state = this.state
1199         check(state !is Incomplete) { "This job has not completed yet" }
1200         if (state is CompletedExceptionally) throw state.cause
1201         return state.unboxState()
1202     }
1203 
1204     /**
1205      * @suppress **This is unstable API and it is subject to change.**
1206      */
1207     internal suspend fun awaitInternal(): Any? {
1208         // fast-path -- check state (avoid extra object creation)
1209         while (true) { // lock-free loop on state
1210             val state = this.state
1211             if (state !is Incomplete) {
1212                 // already complete -- just return result
1213                 if (state is CompletedExceptionally) { // Slow path to recover stacktrace
1214                     recoverAndThrow(state.cause)
1215                 }
1216                 return state.unboxState()
1217 
1218             }
1219             if (startInternal(state) >= 0) break // break unless needs to retry
1220         }
1221         return awaitSuspend() // slow-path
1222     }
1223 
1224     private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
1225         /*
1226          * Custom code here, so that parent coroutine that is using await
1227          * on its child deferred (async) coroutine would throw the exception that this child had
1228          * thrown and not a JobCancellationException.
1229          */
1230         val cont = AwaitContinuation(uCont.intercepted(), this)
1231         // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
1232         cont.initCancellability()
1233         cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
1234         cont.getResult()
1235     }
1236 
1237     /**
1238      * @suppress **This is unstable API and it is subject to change.**
1239      */
1240     // registerSelectAwaitInternal
1241     @Suppress("UNCHECKED_CAST")
1242     internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
1243         // fast-path -- check state and select/return if needed
1244         loopOnState { state ->
1245             if (select.isSelected) return
1246             if (state !is Incomplete) {
1247                 // already complete -- select result
1248                 if (select.trySelect()) {
1249                     if (state is CompletedExceptionally) {
1250                         select.resumeSelectWithException(state.cause)
1251                     }
1252                     else {
1253                         block.startCoroutineUnintercepted(state.unboxState() as T, select.completion)
1254                     }
1255                 }
1256                 return
1257             }
1258             if (startInternal(state) == 0) {
1259                 // slow-path -- register waiter for completion
1260                 select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
1261                 return
1262             }
1263         }
1264     }
1265 
1266     /**
1267      * @suppress **This is unstable API and it is subject to change.**
1268      */
1269     @Suppress("UNCHECKED_CAST")
1270     internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
1271         val state = this.state
1272         // Note: await is non-atomic (can be cancelled while dispatched)
1273         if (state is CompletedExceptionally)
1274             select.resumeSelectWithException(state.cause)
1275         else
1276             block.startCoroutineCancellable(state.unboxState() as T, select.completion)
1277     }
1278 }
1279 
/**
 * Wrapper that allows an [Incomplete] object to be stored as the final state of a job.
 */
private class IncompleteStateBox(
    @JvmField val state: Incomplete
)
/** Wraps the receiver into an [IncompleteStateBox] when it is [Incomplete]; returns it unchanged otherwise. */
internal fun Any?.boxIncomplete(): Any? = when (this) {
    is Incomplete -> IncompleteStateBox(this)
    else -> this
}
/** Returns the wrapped state when the receiver is an [IncompleteStateBox]; returns the receiver itself otherwise. */
internal fun Any?.unboxState(): Any? =
    if (this is IncompleteStateBox) this.state else this
1286 
1287 // --------------- helper classes & constants for job implementation
1288 
// Marker symbols for the job state machine. Their use sites are outside this section;
// NOTE(review): the names suggest they are outcomes of completing/cancelling attempts -- confirm at the callers.
@SharedImmutable
private val COMPLETING_ALREADY: Symbol = Symbol("COMPLETING_ALREADY")

@JvmField
@SharedImmutable
internal val COMPLETING_WAITING_CHILDREN: Symbol = Symbol("COMPLETING_WAITING_CHILDREN")

@SharedImmutable
private val COMPLETING_RETRY: Symbol = Symbol("COMPLETING_RETRY")

@SharedImmutable
private val TOO_LATE_TO_CANCEL: Symbol = Symbol("TOO_LATE_TO_CANCEL")
1298 
// Tri-state integer codes: negative means "retry", zero means "false", positive means "true".
// NOTE(review): presumably the result codes of startInternal -- confirm against its definition.
private const val RETRY: Int = -1
private const val FALSE: Int = 0
private const val TRUE: Int = 1
1302 
// Marker symbol; its use site is outside this section.
@SharedImmutable
private val SEALED: Symbol = Symbol("SEALED")

// Shared instances of the listener-less state: one inactive ("New") and one active.
@SharedImmutable
private val EMPTY_NEW: Empty = Empty(isActive = false)

@SharedImmutable
private val EMPTY_ACTIVE: Empty = Empty(isActive = true)
1309 
/**
 * [Incomplete] state that never has a [list]; [isActive] distinguishes the
 * "Active" variant from the "New" one.
 */
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{$label}"
    }
}
1314 
/**
 * A [JobSupport] created in the active state that implements [CompletableJob].
 * The given [parent] is passed to [initParentJob] during construction.
 */
@PublishedApi // for a custom job in the test module
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    init {
        initParentJob(parent)
    }

    override val onCancelComplete: Boolean
        get() = true

    /*
     * Check whether parent is able to handle exceptions as well.
     * With this check, an exception in that pattern will be handled once:
     * ```
     * launch {
     *     val child = Job(coroutineContext[Job])
     *     launch(child) { throw ... }
     * }
     * ```
     */
    override val handlesException: Boolean = handlesException()

    /** Completes this job by passing `Unit` to [makeCompleting]. */
    override fun complete(): Boolean = makeCompleting(Unit)

    /** Completes this job by passing a [CompletedExceptionally] wrapping [exception] to [makeCompleting]. */
    override fun completeExceptionally(exception: Throwable): Boolean =
        makeCompleting(CompletedExceptionally(exception))

    /**
     * Walks up the chain of parent jobs reachable through [ChildHandleNode] parent handles
     * and reports whether any of them has `handlesException` set.
     */
    @JsName("handlesExceptionF")
    private fun handlesException(): Boolean {
        var ancestor: JobSupport? = (parentHandle as? ChildHandleNode)?.job
        while (ancestor != null) {
            if (ancestor.handlesException) return true
            ancestor = (ancestor.parentHandle as? ChildHandleNode)?.job
        }
        return false
    }
}
1343 
1344 // -------- invokeOnCompletion nodes
1345 
/**
 * Common shape of the state objects that represent a job which has not completed yet.
 */
internal interface Incomplete {
    /** The list of nodes of this state; is `null` only for Empty and JobNode incomplete state objects. */
    val list: NodeList?

    /** Whether this state counts as active. */
    val isActive: Boolean
}
1350 
/**
 * Base class of completion-handler nodes attached to a [JobSupport].
 * A node is itself an active [Incomplete] state without a list, and disposing it
 * removes it from its [job].
 */
internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
    /**
     * Initialized by [JobSupport.makeNode].
     */
    lateinit var job: JobSupport

    override val isActive: Boolean
        get() = true

    override val list: NodeList?
        get() = null

    override fun dispose() {
        job.removeNode(this)
    }

    override fun toString(): String = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}
1361 
/**
 * Head of a lock-free linked list of [JobNode]s that also serves as an active [Incomplete] state.
 */
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    /**
     * Renders this list as `List{<state>}[node1, node2, ...]`, with [state] used as the label.
     */
    fun getString(state: String): String = buildString {
        append("List{").append(state).append("}[")
        var printed = 0
        this@NodeList.forEach<JobNode> { node ->
            // Separator goes before every element except the first one.
            if (printed++ > 0) append(", ")
            append(node)
        }
        append("]")
    }

    override fun toString(): String = when {
        DEBUG -> getString("Active")
        else -> super.toString()
    }
}
1381 
/**
 * Inactive [Incomplete] state that holds a [NodeList]; rendered with the "New" label in debug mode.
 */
internal class InactiveNodeList(override val list: NodeList) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String = when {
        DEBUG -> list.getString("New")
        else -> super.toString()
    }
}
1388 
/** Node that forwards the invocation, together with its cause, to the wrapped [handler]. */
private class InvokeOnCompletion(private val handler: CompletionHandler) : JobNode() {
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }
}
1394 
/** Node that resumes [continuation] with `Unit` when invoked; the cause is ignored. */
private class ResumeOnCompletion(private val continuation: Continuation<Unit>) : JobNode() {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }
}
1400 
/**
 * Node that resumes an awaiting [continuation] with the outcome read from the job's state:
 * the failure cause for [CompletedExceptionally], the unboxed state otherwise.
 */
private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        val finalState = job.state
        assert { finalState !is Incomplete }
        when (finalState) {
            // Resume with the corresponding exception to preserve it
            is CompletedExceptionally -> continuation.resumeWithException(finalState.cause)
            else -> {
                // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
                @Suppress("UNCHECKED_CAST")
                continuation.resume(finalState.unboxState() as T)
            }
        }
    }
}
1417 
/** Node that disposes the wrapped [handle] when invoked; the cause is ignored. */
internal class DisposeOnCompletion(private val handle: DisposableHandle) : JobNode() {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }
}
1423 
/**
 * Node that, when invoked, tries to select [select] and on success starts [block]
 * in a cancellable way with the select's completion.
 */
private class SelectJoinOnCompletion<R>(
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect()) return
        block.startCoroutineCancellable(select.completion)
    }
}
1433 
/**
 * Node that, when invoked, tries to select [select] and on success hands [select] and [block]
 * over to [JobSupport.selectAwaitCompletion] of the owning job.
 */
private class SelectAwaitOnCompletion<T, R>(
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect()) return
        job.selectAwaitCompletion(select, block)
    }
}
1443 
1444 // -------- invokeOnCancellation nodes
1445 
/**
 * Marker for a node that shall be invoked in the _cancelling_ state.
 * **Note: may be invoked multiple times.**
 */
internal abstract class JobCancellingNode : JobNode()
1451 
/**
 * Cancelling node that forwards to [handler], guarding it so the handler runs at most once
 * even if this node is invoked several times.
 */
private class InvokeOnCancelling(private val handler: CompletionHandler) : JobCancellingNode() {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu

    override fun invoke(cause: Throwable?) {
        // Only the caller that flips the flag from 0 to 1 gets to run the handler.
        if (!_invoked.compareAndSet(0, 1)) return
        handler.invoke(cause)
    }
}
1461 
/**
 * Cancelling node that links [childJob] to the job this node is attached to:
 * invoking it calls `parentCancelled` on the child, while [childCancelled]
 * forwards the cause to the owning job.
 */
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job
        get() = job

    override fun invoke(cause: Throwable?) {
        childJob.parentCancelled(job)
    }

    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }
}
1469 
/**
 * Same as [ChildHandleNode], but for a cancellable continuation: when invoked it asks [child]
 * for the cancellation cause derived from the owning job and passes it to `parentCancelled`.
 */
internal class ChildContinuation(
    @JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode() {
    override fun invoke(cause: Throwable?) {
        val cancellationCause = child.getContinuationCancellationCause(job)
        child.parentCancelled(cancellationCause)
    }
}
1478 
1479