1 /*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:Suppress("DEPRECATION_ERROR")
5
6 package kotlinx.coroutines
7
8 import kotlinx.atomicfu.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.coroutines.*
12 import kotlin.coroutines.intrinsics.*
13 import kotlin.js.*
14 import kotlin.jvm.*
15
16 /**
17 * A concrete implementation of [Job]. It is optionally a child to a parent job.
18 *
19 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
21 *
22 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
23 * @suppress **This is unstable API and it is subject to change.**
24 */
25 @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
26 public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
/** Context key under which this element is stored: the [Job] key. */
final override val key: CoroutineContext.Key<*>
    get() = Job
28
29 /*
30 === Internal states ===
31
32 name state class public state description
33 ------ ------------ ------------ -----------
34 EMPTY_N EmptyNew : New no listeners
35 EMPTY_A EmptyActive : Active no listeners
36 SINGLE JobNode : Active a single listener
37 SINGLE+ JobNode : Active a single listener + NodeList added as its next
 LIST_N InactiveNodeList : New a list of listeners (promoted once, does not go back to EmptyNew)
 LIST_A NodeList : Active a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
40 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
41 CANCELLING Finishing : Cancelling -- " --
42 FINAL_C Cancelled : Cancelled Cancelled (final state)
43 FINAL_R <any> : Completed produced some result
44
45 === Transitions ===
46
47 New states Active states Inactive states
48
49 +---------+ +---------+ }
50 | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
51 +---------+ +---------+ | }
52 | | | ^ | +----------+
53 | | | | +--> | FINAL_* |
54 | | V | | +----------+
55 | | +---------+ | }
56 | | | SINGLE | ----+ } JobNode states
57 | | +---------+ | }
58 | | | | }
59 | | V | }
60 | | +---------+ | }
61 | +-------> | SINGLE+ | ----+ }
62 | +---------+ | }
63 | | |
64 V V |
65 +---------+ +---------+ | }
66 | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
67 +---------+ +---------+ | }
68 | | | | |
69 | | +--------+ | |
70 | | | V |
71 | | | +------------+ | +------------+ }
72 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
73 | | +------------+ +------------+ }
74 | | | ^
75 | | | |
76 +--------+---------+--------------------+
77
78
79 This state machine and its transition matrix are optimized for the common case when a job is created in active
80 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
81 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
82 state without going to COMPLETING state)
83
84 Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
85
86 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
87
 | The longest possible chain of events is shown, shorter versions cut-through intermediate states,
89 | while still performing all the notifications in this order.
90
91 + Job object is created
92 ## NEW: state == EMPTY_NEW | is InactiveNodeList
93 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
94 ~ waits for start
95 >> start / join / await invoked
96 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
97 + onStart (lazy coroutine is started)
98 ~ active coroutine is working (or scheduled to execution)
99 >> childCancelled / cancelImpl invoked
100 ## CANCELLING: state is Finishing, state.rootCause != null
101 ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
102 ------ new children get immediately cancelled, but are still admitted to the list
103 + onCancelling
104 + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
105 + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
106 ~ waits for completion of coroutine body
107 >> makeCompleting / makeCompletingOnce invoked
108 ## COMPLETING: state is Finishing, state.isCompleting == true
109 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
110 ~ waits for children
111 >> last child completes
112 - computes the final exception
113 ## SEALED: state is Finishing, state.isSealed == true
114 ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
115 + cancelParent (final exception is communicated to the parent, parent incorporates it)
116 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
117 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
118 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
119 + parentHandle.dispose
120 + notifyCompletion (invoke all completion listeners)
121 + onCompletionInternal / onCompleted / onCancelled
122
123 ---------------------------------------------------------------------------------
124 */
125
// Note: use shared objects while we have no listeners
// Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)

// Handle of this job's registration in its parent. Initially null; assigned by initParentJob.
private val _parentHandle = atomic<ChildHandle?>(null)
internal var parentHandle: ChildHandle?
    get() {
        return _parentHandle.value
    }
    set(value) {
        _parentHandle.value = value
    }

// The parent job as reported by the current parent handle, or null when no handle is installed.
override val parent: Job?
    get() {
        return parentHandle?.parent
    }
137
138 // ------------ initialization ------------
139
/**
 * Attaches this job to [parent], or records that it has no parent.
 * It shall be invoked at most once after construction, after all other initialization.
 *
 * @param parent the parent job, or `null` when this job is a root.
 */
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    if (parent != null) {
        // make sure the parent is started before a child is attached to it
        parent.start()
        @Suppress("DEPRECATION")
        val childHandle = parent.attachChild(this)
        parentHandle = childHandle
        // check our state only _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            childHandle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    } else {
        parentHandle = NonDisposableHandle
    }
}
160
// ------------ state query ------------
/**
 * Returns the current state of this job.
 * If the final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
 * and should be [unboxed][unboxState] before returning to user code.
 */
internal val state: Any?
    get() {
        // helper loop on state: complete any in-progress atomic operation, then re-read
        while (true) {
            val current = _state.value
            if (current is OpDescriptor) current.perform(this) else return current
        }
    }
173
/**
 * Repeatedly feeds the current [state] to [block]; the only way out is a non-local return from [block].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
182
// Active means: the state is still Incomplete and that state reports itself as active.
public override val isActive: Boolean
    get() = state.let { it is Incomplete && it.isActive }
187
// Completed means: the state is no longer an Incomplete one.
public final override val isCompleted: Boolean
    get() = state !is Incomplete
189
// Cancelled means: completed exceptionally, or still finishing with the cancelling flag set.
public final override val isCancelled: Boolean
    get() = when (val current = state) {
        is CompletedExceptionally -> true
        is Finishing -> current.isCancelling
        else -> false
    }
194
195 // ------------ state update ------------
196
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
// Returns the final state object that was created and installed.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
    /*
     * Note: proposed state can be Incomplete, e.g.
     * async {
     *     something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
     * }
     */
    assert { this.state === state } // consistency check -- it cannot change
    assert { !state.isSealed } // consistency check -- cannot be sealed yet
    assert { state.isCompleting } // consistency check -- must be marked as completing
    val proposedCause = (proposedUpdate as? CompletedExceptionally)?.cause
    // Seal the state under its lock so that no more exceptions can be added, and pick the final exception
    val cancellingBeforeSeal: Boolean
    val sealedCause = synchronized(state) {
        cancellingBeforeSeal = state.isCancelling
        val collected = state.sealLocked(proposedCause)
        getFinalRootCause(state, collected)?.also { addSuppressedExceptions(it, collected) }
    }
    // Build the final state object:
    //  - no exception -> the proposed update is the result
    //  - the exception is exactly the proposed one -> reuse the proposed update object as is
    //  - otherwise -> a fresh exceptional completion
    val result: Any? =
        if (sealedCause == null || sealedCause === proposedCause) proposedUpdate
        else CompletedExceptionally(sealedCause)
    // Now handle the final exception: first offer it to the parent, then to handleJobException
    if (sealedCause != null) {
        val parentHandled = cancelParent(sealedCause)
        if (parentHandled || handleJobException(sealedCause)) {
            (result as CompletedExceptionally).makeHandled()
        }
    }
    // Process state updates for the final state before the state of the Job is actually set to the final state
    // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
    if (!cancellingBeforeSeal) onCancelling(sealedCause)
    onCompletionInternal(result)
    // Then CAS to completed state -> it must succeed
    val installed = _state.compareAndSet(state, result.boxIncomplete())
    assert { installed }
    // And process all post-completion actions
    completeStateFinalization(state, result)
    return result
}
245
// Selects the exception that becomes the final cause of the job, or null when there is none.
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    // A case of no exceptions
    if (exceptions.isEmpty()) {
        // materialize cancellation exception if it was not materialized yet
        return if (state.isCancelling) defaultCancellationException() else null
    }
    /*
     * 1) If we have non-CE, use it as root cause
     * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
     *    * It is a CE, so it's not reported by children
     *    * The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
     *    * The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
     * 3) Just return the very first CE
     */
    exceptions.firstOrNull { it !is CancellationException }?.let { return it }
    val head = exceptions[0]
    if (head is TimeoutCancellationException) {
        exceptions.firstOrNull { it !== head && it is TimeoutCancellationException }?.let { return it }
    }
    return head
}
270
// Attaches every distinct, non-cancellation exception other than the root cause itself
// to rootCause as a suppressed exception.
private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
    if (exceptions.size <= 1) return // nothing more to do here
    val alreadyAdded = identitySet<Throwable>(exceptions.size)
    /*
     * Note that root cause may be a recovered exception as well.
     * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause,
     * but add suppressed exceptions to the recovered root cause (as it is our final exception)
     */
    val rootUnwrapped = unwrap(rootCause)
    for (candidate in exceptions) {
        val plain = unwrap(candidate)
        if (plain === rootCause || plain === rootUnwrapped) continue
        if (plain is CancellationException) continue
        if (alreadyAdded.add(plain)) rootCause.addSuppressedThrowable(plain)
    }
}
288
// Fast-path to finalize normally completed coroutines without children.
// Returns true if the state was installed (then afterCompletion(update) shall be called by the caller),
// false if the CAS lost a race and nothing was done.
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
    assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
    assert { update !is CompletedExceptionally } // only for normal completion
    val installed = _state.compareAndSet(state, update.boxIncomplete())
    if (installed) {
        onCancelling(null) // simple state is not a failure
        onCompletionInternal(update)
        completeStateFinalization(state, update)
    }
    return installed
}
300
// Post-completion actions: `state` is the last incomplete state, `update` is the final state that replaced it.
private fun completeStateFinalization(state: Incomplete, update: Any?) {
    /*
     * Now the job in THE FINAL state. We need to properly handle the resulting state.
     * Order of various invocations here is important.
     *
     * 1) Unregister from parent job.
     */
    parentHandle?.let { handle ->
        handle.dispose() // volatile read parentHandle _after_ state was updated
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    val cause = (update as? CompletedExceptionally)?.cause
    /*
     * 2) Invoke completion handlers: .join(), callbacks etc.
     *    It's important to invoke them only AFTER exception handling and everything else, see #208
     */
    if (state !is JobNode) {
        // list-based state (if any list is present): notify every registered handler
        state.list?.notifyCompletion(cause)
        return
    }
    // SINGLE/SINGLE+ state -- one completion handler (common case)
    try {
        state.invoke(cause)
    } catch (ex: Throwable) {
        handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
    }
}
328
// Runs the cancellation notifications in order: onCancelling hook, cancelling handlers of the list, parent.
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // our own side first: the onCancelling hook, then every JobCancellingNode in the list
    onCancelling(cause)
    notifyHandlers<JobCancellingNode>(list, cause)
    // the parent last; this is a tentative cancellation -- does not matter if there is no parent
    cancelParent(cause)
}
336
/**
 * Invoked when the job is cancelled to possibly propagate cancellation to the parent.
 * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
 *
 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
 * may leak to the [CoroutineExceptionHandler].
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allows parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produces some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle !== null && handle !== NonDisposableHandle) {
        // Notify parent but don't forget to check cancellation
        handle.childCancelled(cause) || isCancellation
    } else {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    }
}
362
// Invokes every JobNode handler of this list with the given completion cause.
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode>(this, cause)
}
365
// Invokes all handlers of type T in the list. A throwing handler does not stop the iteration:
// the first failure is wrapped into CompletionHandlerException, later ones are attached to it as suppressed,
// and the aggregate is reported once at the end.
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
    var firstFailure: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val recorded = firstFailure
            if (recorded != null) {
                recorded.addSuppressedThrowable(ex)
            } else {
                firstFailure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            }
        }
    }
    firstFailure?.let { handleOnCompletionException(it) }
}
379
// Retries startInternal against the freshest state until it gives a definite TRUE/FALSE answer.
public final override fun start(): Boolean {
    loopOnState { state ->
        val outcome = startInternal(state)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome: state changed concurrently, go around again
    }
}
388
// Single attempt to move the given state from "new" to "active".
// returns: RETRY/FALSE/TRUE:
//   FALSE when not new,
//   TRUE when started
//   RETRY when need to retry
private fun startInternal(state: Any?): Int {
    // pick the active counterpart of the given new state, bailing out if it is not a new state
    val activated: Any = when (state) {
        is Empty -> { // EMPTY_X state -- no completion handlers
            if (state.isActive) return FALSE // already active
            EMPTY_ACTIVE
        }
        is InactiveNodeList -> state.list // LIST state -- inactive with a list of completion handlers
        else -> return FALSE // not a new state
    }
    if (!_state.compareAndSet(state, activated)) return RETRY
    onStart()
    return TRUE
}
409
/**
 * Hook for subclasses to provide the actual [start] action.
 * This function is invoked exactly once when non-active coroutine is [started][start].
 */
protected open fun onStart() {
    // nothing to do by default
}
415
// Produces the cancellation exception for the current state; fails with IllegalStateException (via error)
// while the job is still new or active.
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    if (current is Finishing) {
        val rootCause = current.rootCause ?: error("Job is still new or active: $this")
        return rootCause.toCancellationException("$classSimpleName is cancelling")
    }
    if (current is Incomplete) error("Job is still new or active: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException()
    return JobCancellationException("$classSimpleName has completed normally", null, this)
}
424
// Returns the receiver itself when it already is a CancellationException,
// otherwise wraps it as the cause of a default cancellation exception with the given message.
protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
    if (this is CancellationException) this else defaultCancellationException(message, this)
427
/**
 * Returns the cause that signals the completion of this job -- it returns the original
 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has not [completed][isCompleted] nor
 * is being cancelled yet.
 */
protected val completionCause: Throwable?
    get() {
        val current = state
        return when {
            current is Finishing -> current.rootCause ?: error("Job is still new or active: $this")
            current is Incomplete -> error("Job is still new or active: $this")
            current is CompletedExceptionally -> current.cause
            else -> null
        }
    }
442
/**
 * Returns `true` when [completionCause] exception was handled by parent coroutine.
 */
protected val completionCauseHandled: Boolean
    get() {
        val current = state
        return current is CompletedExceptionally && current.handled
    }
448
// Convenience overload: a completion-only handler that is invoked immediately if the job is already complete.
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}
452
// Registers a handler node according to the current state, retrying lock-free until it is either installed,
// or invoked immediately (job already cancelling/complete), or rejected with NonDisposableHandle.
public final override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    // Create node upfront -- for common cases it just initializes JobNode.job field,
    // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
    val node: JobNode = makeNode(handler, onCancelling)
    loopOnState { state ->
        // EMPTY_X state -- no completion handlers
        if (state is Empty) {
            if (!state.isActive) {
                promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
            } else if (_state.compareAndSet(state, node)) {
                return node // moved to SINGLE state
            }
            return@loopOnState // re-read the state
        }
        // is complete
        if (state !is Incomplete) {
            // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
            // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
            if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
            return NonDisposableHandle
        }
        val list = state.list
        if (list == null) { // SINGLE/SINGLE+
            promoteSingleToNodeList(state as JobNode)
            return@loopOnState // re-read the state
        }
        var rootCause: Throwable? = null
        var handle: DisposableHandle = NonDisposableHandle
        if (onCancelling && state is Finishing) {
            synchronized(state) {
                // check if we are installing cancellation handler on job that is being cancelled
                rootCause = state.rootCause // != null if cancelling job
                // We add node to the list in two cases --- either the job is not being cancelled
                // or we are adding a child to a coroutine that is not completing yet
                if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                    // Note: add node the list while holding lock on state (make sure it cannot change)
                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                    // just return node if we don't have to invoke handler (not cancelling yet)
                    if (rootCause == null) return node
                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
                    handle = node
                }
            }
        }
        if (rootCause != null) {
            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
            if (invokeImmediately) handler.invokeIt(rootCause)
            return handle
        }
        if (addLastAtomic(state, list, node)) return node
    }
}
511
// Turns the handler into a JobNode bound to this job: a handler that already is a node of the right kind
// is reused, anything else is wrapped into InvokeOnCancelling / InvokeOnCompletion.
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
    val node: JobNode = when {
        onCancelling -> handler as? JobCancellingNode ?: InvokeOnCancelling(handler)
        else -> {
            val existing = handler as? JobNode
            if (existing != null) {
                assert { existing !is JobCancellingNode }
                existing
            } else {
                InvokeOnCompletion(handler)
            }
        }
    }
    node.job = this
    return node
}
524
// Appends the node to the list on the condition that the job state is still the expected one.
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode): Boolean {
    return list.addLastIf(node) { this.state === expect }
}
527
// Tries to promote an EMPTY_X state to the LIST state of the same activity (active -> NodeList,
// new -> InactiveNodeList). The CAS result is ignored: the caller re-reads the state in its loop.
private fun promoteEmptyToNodeList(state: Empty) {
    val list = NodeList()
    _state.compareAndSet(state, if (state.isActive) list else InactiveNodeList(list))
}
534
// Tries to promote a SINGLE state to a list state, going through SINGLE+.
private fun promoteSingleToNodeList(state: JobNode) {
    // try to promote it to list (SINGLE+ state)
    state.addOneIfEmpty(NodeList())
    // it must be in SINGLE+ state or state has changed (node could have been removed from state);
    // nextNode is either our NodeList or the one of somebody else who won the race.
    // Just attempt converting it to list if state is still the same, then the caller continues its lock-free loop
    _state.compareAndSet(state, state.nextNode)
}
543
// Suspends until the job is complete; when no waiting is needed, only checks that the caller is still active.
public final override suspend fun join() {
    if (joinInternal()) return joinSuspend() // slow-path wait
    // fast-path no wait: do not suspend
    coroutineContext.ensureActive()
}
551
// Starts the job if needed and tells whether the caller has to wait: false when already complete, true otherwise.
private fun joinInternal(): Boolean {
    loopOnState { state ->
        when {
            state !is Incomplete -> return false // not active anymore (complete) -- no need to wait
            startInternal(state) >= 0 -> return true // wait unless need to retry
        }
    }
}
558
// Slow path of join: suspends and gets resumed by a completion handler installed on this job.
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
    val completionHandle = invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler)
    cont.disposeOnCancellation(completionHandle)
}
563
// Select clause for join: registration is delegated to registerSelectForOnJoin.
@Suppress("UNCHECKED_CAST")
public final override val onJoin: SelectClause0
    get() {
        val register = JobSupport::registerSelectForOnJoin as RegistrationFunction
        return SelectClause0Impl(clauseObject = this@JobSupport, regFunc = register)
    }
570
// Registration function of the onJoin clause: selects right away when there is nothing to wait for,
// otherwise installs a completion handler whose handle is disposed together with the select.
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForOnJoin(select: SelectInstance<*>, ignoredParam: Any?) {
    if (joinInternal()) {
        val handle = invokeOnCompletion(SelectOnJoinCompletionHandler(select).asHandler)
        select.disposeOnCompletion(handle)
    } else {
        select.selectInRegistrationPhase(Unit)
    }
}
580
// Completion handler node that tries to select the onJoin clause of the enclosing job on the given select instance.
private inner class SelectOnJoinCompletionHandler(private val select: SelectInstance<*>) : JobNode() {
    override fun invoke(cause: Throwable?) {
        // the completion cause is irrelevant here
        select.trySelect(this@JobSupport, Unit)
    }
}
588
/**
 * Unregisters the given handler node from this job; how that is done depends on the current state.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode) {
    loopOnState { state ->
        if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler
            if (state !== node) return // a different job node --> we were already removed
            // try remove and revert back to empty state; on CAS failure loop again
            if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
        } else {
            // an incomplete state may have a list of completion handlers -- remove node from it if there is one;
            // a complete state does not have any completion handlers
            if (state is Incomplete && state.list != null) node.remove()
            return
        }
    }
}
610
/**
 * Returns `true` for job that do not have "body block" to complete and should immediately go into
 * completing state and start waiting for children.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onCancelComplete: Boolean
    get() = false
618
// external cancel with cause, never invoked implicitly from internal machinery;
// a missing cause is replaced with the default cancellation exception
public override fun cancel(cause: CancellationException?) {
    val effectiveCause = cause ?: defaultCancellationException()
    cancelInternal(effectiveCause)
}
623
// Message used by defaultCancellationException when no explicit message is given; subclasses may override it.
protected open fun cancellationExceptionMessage(): String {
    return "Job was cancelled"
}
625
// HIDDEN in Job interface. Invoked only by legacy compiled code.
// external cancel with (optional) cause, never invoked implicitly from internal machinery.
// Always reports true.
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
public override fun cancel(cause: Throwable?): Boolean {
    val exception = cause?.toCancellationException() ?: defaultCancellationException()
    cancelInternal(exception)
    return true
}
633
// Cancellation entry point shared by both cancel overloads; the result of cancelImpl is discarded.
// It is overridden in channel-linked implementation
public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}
638
// Parent is cancelling child: the parent job itself is passed on as the "cause",
// see createCauseException for how it is turned into an exception
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}
643
/**
 * Child was cancelled with a cause.
 * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
 * It is overridden in supervisor implementations to completely ignore any child cancellation.
 * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
 *
 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
 * may leak to the [CoroutineExceptionHandler].
 */
public open fun childCancelled(cause: Throwable): Boolean =
    cause is CancellationException || (cancelImpl(cause) && handlesException)
657
/**
 * Makes this [Job] cancelled with a specified [cause].
 * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
 *
 * @return the result of [cancelImpl] for the given [cause].
 */
public fun cancelCoroutine(cause: Throwable?): Boolean {
    return cancelImpl(cause)
}
663
// cause is Throwable or ParentJob when cancelChild was invoked
// returns true if exception was handled, false otherwise
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing; if cancelMakeCompleting returns a state it means it had made it
        // completing and had recorded the exception
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) finalState = makeCancelling(cause)
    if (finalState === TOO_LATE_TO_CANCEL) return false
    // any value other than the two "still in progress" markers is a real final state
    val stillInProgress = finalState === COMPLETING_ALREADY || finalState === COMPLETING_WAITING_CHILDREN
    if (!stillInProgress) afterCompletion(finalState)
    return true
}
687
// cause is Throwable or ParentJob when cancelChild was invoked
// It contains a loop and never returns COMPLETING_RETRY, can return
// COMPLETING_ALREADY -- if already completed/completing
// COMPLETING_WAITING_CHILDREN -- if started waiting for children
// final state -- when completed, for call to afterCompletion
private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { state ->
        // already completed/completing, do not even create exception to propose update
        if (state !is Incomplete || state is Finishing && state.isCompleting) return COMPLETING_ALREADY
        val outcome = tryMakeCompleting(state, CompletedExceptionally(createCauseException(cause)))
        if (outcome !== COMPLETING_RETRY) return outcome
    }
}
704
// Builds the JobCancellationException of this job; a null message falls back to cancellationExceptionMessage().
@Suppress("NOTHING_TO_INLINE") // Save a stack frame
internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null): JobCancellationException {
    val text = message ?: cancellationExceptionMessage()
    return JobCancellationException(text, cause, this)
}
708
// Determines the root cancellation cause of this job (why is it cancelling its children?)
override fun getChildJobCancellationCause(): CancellationException {
    val state = this.state
    val rootCause: Throwable? = when (state) {
        is Finishing -> state.rootCause
        is CompletedExceptionally -> state.cause
        is Incomplete -> error("Cannot be cancelling child in this state: $state")
        else -> null // normal completion: the exception is created below
    }
    // a cancellation exception is passed on as is, anything else (or nothing) gets wrapped
    if (rootCause is CancellationException) return rootCause
    return JobCancellationException("Parent job is ${stateString(state)}", rootCause, this)
}
720
// cause is Throwable or ParentJob when cancelChild was invoked:
//   null      -> default cancellation exception of this job
//   Throwable -> the throwable itself
//   otherwise -> the cancellation cause reported by the parent job
private fun createCauseException(cause: Any?): Throwable {
    if (cause == null) return defaultCancellationException()
    if (cause is Throwable) return cause
    return (cause as ParentJob).getChildJobCancellationCause()
}
726
// transitions to Cancelling state
// cause is Throwable or ParentJob when cancelChild was invoked
// It contains a loop and never returns COMPLETING_RETRY, can return
// COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
// COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
// TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
// final state -- when completed, for call to afterCompletion
private fun makeCancelling(cause: Any?): Any? {
    var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
    loopOnState { state ->
        if (state !is Incomplete) return TOO_LATE_TO_CANCEL // already complete
        if (state is Finishing) { // already finishing -- collect exceptions
            val notifyRootCause = synchronized(state) {
                if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
                // add exception, do nothing if parent is cancelling child that is already being cancelled
                val wasCancelling = state.isCancelling // will notify if was not cancelling
                // Materialize missing exception if it is the first exception (otherwise -- don't)
                if (cause != null || !wasCancelling) {
                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                    state.addExceptionLocked(causeException)
                }
                // take cause for notification if was not in cancelling state before
                if (wasCancelling) null else state.rootCause
            }
            if (notifyRootCause != null) notifyCancelling(state.list, notifyRootCause)
            return COMPLETING_ALREADY
        }
        // Not yet finishing -- try to make it cancelling
        val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
        if (!state.isActive) {
            // non active state starts completing
            val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
            if (finalState === COMPLETING_ALREADY) error("Cannot happen in $state")
            if (finalState !== COMPLETING_RETRY) return finalState
            // COMPLETING_RETRY: fall through to the next iteration
        } else if (tryMakeCancelling(state, causeException)) {
            // active state became cancelling
            return COMPLETING_ALREADY
        }
    }
}
774
// Returns the listener list of the given incomplete state, promoting the state to a NodeList
// when it does not have one yet (needed to convert the coroutine state to Cancelling).
// Returns null when the caller has to re-read the state and retry.
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? {
    val existing = state.list
    if (existing != null) return existing
    return when (state) {
        // a freshly allocated empty list will get integrated into the Cancelling state
        is Empty -> NodeList()
        is JobNode -> {
            // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise
            // a reference to it cannot be captured correctly
            promoteSingleToNodeList(state)
            null // retry
        }
        else -> error("State should have list: $state")
    }
}
788
// Tries to install a new Cancelling state, provided that the job is still in the expected `state`.
// Returns true when the state was replaced (listeners are notified in this case), false when
// the caller has to retry.
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
    assert { state !is Finishing } // only for non-finishing states
    assert { state.isActive } // only for active states
    // take the state's list or promote the state to a list, so that child lists are handled correctly
    val list = getOrPromoteCancellingList(state)
    if (list == null) return false
    // the cancelling state is created with rootCause already set
    val cancelling = Finishing(list, false, rootCause)
    val installed = _state.compareAndSet(state, cancelling)
    // listeners are notified only by the thread that won the CAS
    if (installed) notifyCancelling(list, rootCause)
    return installed
}
802
/**
 * Completes this job with [proposedUpdate]. Used by [CompletableDeferred.complete] (and exceptionally)
 * and by [JobImpl.cancel].
 *
 * @return `false` on repeated invocation (when this job is already completing), `true` otherwise.
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    loopOnState { state ->
        val outcome = tryMakeCompleting(state, proposedUpdate)
        if (outcome === COMPLETING_ALREADY) return false
        if (outcome === COMPLETING_WAITING_CHILDREN) return true
        if (outcome !== COMPLETING_RETRY) {
            // a final state was reached right away
            afterCompletion(outcome)
            return true
        }
        // COMPLETING_RETRY -- go for another iteration with a fresh state
    }
}
822
/**
 * Completes this job with [proposedUpdate]. Used by [AbstractCoroutine.resume].
 *
 * @return [COMPLETING_WAITING_CHILDREN] if started waiting for children, or the final state otherwise
 * (the caller should do [afterCompletion] in the latter case).
 * @throws IllegalStateException on repeated invocation (when this job is already completing).
 */
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
    loopOnState { state ->
        val outcome = tryMakeCompleting(state, proposedUpdate)
        if (outcome === COMPLETING_ALREADY) {
            throw IllegalStateException(
                "Job $this is already complete or completing, " +
                    "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
            )
        }
        // either COMPLETING_WAITING_CHILDREN or the final state
        if (outcome !== COMPLETING_RETRY) return outcome
        // COMPLETING_RETRY -- go for another iteration with a fresh state
    }
}
844
// Makes one attempt to move `state` to completion with `proposedUpdate`.
// Returns one of COMPLETING symbols or final state:
//   COMPLETING_ALREADY -- when already complete or completing
//   COMPLETING_RETRY -- when need to retry due to interference
//   COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
//   final state -- when completed, for call to afterCompletion
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete) return COMPLETING_ALREADY
    /*
     * FAST PATH is taken when there are no children to wait for, the state is simple (no list)
     * and the update is not exceptional, so the job can complete immediately.
     * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    val isSimpleState = (state is Empty || state is JobNode) && state !is ChildHandleNode
    val canUseFastPath = isSimpleState && proposedUpdate !is CompletedExceptionally
    // The slow path lives in a separate function to simplify profiling
    if (!canUseFastPath) return tryMakeCompletingSlowPath(state, proposedUpdate)
    // On success the updated state is returned, otherwise the caller has to retry
    return if (tryFinalizeSimpleState(state, proposedUpdate)) proposedUpdate else COMPLETING_RETRY
}
869
// Slow path of tryMakeCompleting: goes through the Finishing state, cancels and waits for children.
// Returns one of COMPLETING symbols or final state:
// COMPLETING_ALREADY -- when already complete or completing
// COMPLETING_RETRY -- when need to retry due to interference
// COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
// final state -- when completed, for call to afterCompletion
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    // get state's list or else promote to list to correctly operate on child lists
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    // promote to Finishing state if we are not in it yet
    // This promotion has to be atomic w.r.t. state change, so that a coroutine that is not active yet
    // atomically transitions to finishing & completing state
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    // must synchronize updates to finishing state
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        // check if this state is already completing
        if (finishing.isCompleting) return COMPLETING_ALREADY
        // mark as completing
        finishing.isCompleting = true
        // if we need to promote to finishing then atomically do it here.
        // We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
        // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
        assert { !finishing.isSealed } // cannot be sealed
        // add new proposed exception to the finishing state
        val wasCancelling = finishing.isCancelling
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
    notifyRootCause?.let { notifyCancelling(list, it) }
    // now wait for children
    val child = firstChild(state)
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left (all were already cancelled?)
    return finalizeFinishingState(finishing, proposedUpdate)
}
912
// The cause of a CompletedExceptionally state, or null for any other value (including null itself)
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
915
// Finds the first child handle of the given state: either the state itself when it is a
// ChildHandleNode, or the first child found in its list (null when there is no list or no child)
private fun firstChild(state: Incomplete): ChildHandleNode? =
    if (state is ChildHandleNode) state else state.list?.nextChild()
918
// Installs a ChildCompletion handler on the first child (starting from `child`) that is not complete yet.
// Returns true when such a child was found and we have started waiting for it,
// false when there are no more incomplete children to wait for.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    var current = child
    while (true) {
        val handle = current.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = ChildCompletion(this, state, current, proposedUpdate).asHandler
        )
        // child is not complete and we've started waiting for it
        if (handle !== NonDisposableHandle) return true
        // this child has nothing to wait for -- move on to the next one, if any
        current = current.nextChild() ?: return false
    }
}
930
// Invoked when `lastChild` has completed: either starts waiting for the next child
// or finalizes the finishing state when there are no more children to wait for.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    // look for the next child that may have to be waited for
    val waitChild = lastChild.nextChild()
    val stillWaiting = waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)
    if (stillWaiting) return // waiting for next child
    // no more children to wait -- try update state
    afterCompletion(finalizeFinishingState(state, proposedUpdate))
}
942
// Finds the next non-removed ChildHandleNode that follows this node in its list.
// Returns null when the list head (NodeList) is reached, i.e. there are no more children.
private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
    var cur = this
    while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
    while (true) {
        cur = cur.nextNode
        if (cur.isRemoved) continue // skip nodes that were removed
        if (cur is ChildHandleNode) return cur
        if (cur is NodeList) return null // checked all -- no more children
    }
}
953
/**
 * A sequence of child jobs computed from the state that is observed when the sequence is iterated:
 * the single child when the state itself is a [ChildHandleNode], or every [ChildHandleNode]
 * found in the list of an incomplete state. Yields nothing in all other cases.
 */
public final override val children: Sequence<Job> get() = sequence {
    val state = this@JobSupport.state
    if (state is ChildHandleNode) {
        yield(state.childJob)
    } else if (state is Incomplete) {
        val list = state.list
        if (list != null) {
            list.forEach<ChildHandleNode> { yield(it.childJob) }
        }
    }
}
962
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: ChildJob): ChildHandle {
    /*
     * Note: The child is attached via a special ChildHandleNode node object. This node object
     * is handled in a special way on completion of the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already cancelling. For cancelling state child is attached under state lock.
     * It's required to properly wait all children before completion and provide linearizable hierarchy view:
     * If child is attached when the job is already being cancelled, such child will receive immediate notification on
     * cancellation, but parent *will* wait for that child before completion and will handle its exception.
     */
    val node = ChildHandleNode(child)
    val handle = invokeOnCompletion(onCancelling = true, handler = node.asHandler)
    return handle as ChildHandle
}
976
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion].
 *
 * The default implementation rethrows the given [exception].
 *
 * @param exception the exception to process.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
986
/**
 * This function is invoked once as soon as this job is being cancelled for any reason or completes,
 * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
 *
 * The meaning of [cause] parameter:
 * * Cause is `null` when the job has completed normally.
 * * Cause is an instance of [CancellationException] when the job was cancelled _normally_.
 *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
 * * Otherwise, the job had been cancelled or failed with exception.
 *
 * The specified [cause] is not the final cancellation cause of this job.
 * A job may produce other exceptions while it is failing and the final cause might be different.
 *
 * The default implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCancelling(cause: Throwable?) {}
1003
/**
 * Returns `true` for scoped coroutines.
 * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
 * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
 * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
 *
 * The default implementation returns `false`.
 */
protected open val isScopedCoroutine: Boolean get() = false
1011
/**
 * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
 * A valid implementation of this getter should recursively check parent as well before returning `false`.
 *
 * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
 *
 * The default implementation returns `true`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val handlesException: Boolean get() = true
1020
/**
 * Handles the final job [exception] that was not handled by the parent coroutine.
 * Returns `true` if it handles exception (so handling at later stages is not needed).
 * It is designed to be overridden by launch-like coroutines
 * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
 * that can represent exceptions.
 *
 * This method is invoked **exactly once** when the final exception of the job is determined
 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
 *
 * The default implementation does not handle anything and returns `false`.
 *
 * @param exception the final exception of the job.
 * @return `true` if the exception was handled by this method.
 */
protected open fun handleJobException(exception: Throwable): Boolean = false
1032
/**
 * Override for completion actions that need to update some external object depending on job's state,
 * right before all the waiters for coroutine's completion are notified.
 *
 * The default implementation does nothing.
 *
 * @param state the final state.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCompletionInternal(state: Any?) {}
1042
/**
 * Override for the very last action on job's completion to resume the rest of the code in
 * scoped coroutines. It is called when this job is externally completed in an unknown
 * context and thus should resume with a default mode.
 *
 * The default implementation does nothing.
 *
 * @param state the final state of the job.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun afterCompletion(state: Any?) {}
1051
// For nicer debugging: the debug string of this job followed by '@' and its hex address
public override fun toString(): String = toDebugString() + "@" + hexAddress
1055
/**
 * Returns a string of the form `name{state}`, where the name comes from [nameString]
 * and the state is the textual representation of the current job state.
 */
@InternalCoroutinesApi
public fun toDebugString(): String = nameString() + "{" + stateString(state) + "}"
1058
/**
 * Returns the name of this job that is used as a prefix in [toDebugString].
 * The default implementation returns `classSimpleName`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String = classSimpleName
1063
// Maps an internal state object to the name of the corresponding public state
private fun stateString(state: Any?): String {
    if (state is Finishing) {
        // cancelling takes precedence over completing
        if (state.isCancelling) return "Cancelling"
        if (state.isCompleting) return "Completing"
        return "Active"
    }
    if (state is Incomplete) return if (state.isActive) "Active" else "New"
    return if (state is CompletedExceptionally) "Cancelled" else "Completed"
}
1074
// Completing & Cancelling states.
// Holds the listener list, the `isCompleting` flag, the root cause of cancellation and
// the additional exceptions collected while the job is finishing.
// All updates are guarded by synchronized(this), reads are volatile
@Suppress("UNCHECKED_CAST")
private class Finishing(
    override val list: NodeList,
    isCompleting: Boolean,
    rootCause: Throwable?
) : SynchronizedObject(), Incomplete {
    // Backing atomic for isCompleting
    private val _isCompleting = atomic(isCompleting)
    var isCompleting: Boolean
        get() = _isCompleting.value
        set(value) { _isCompleting.value = value }

    // Backing atomic for rootCause; a non-null value means that the job is cancelling
    private val _rootCause = atomic(rootCause)
    var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
        get() = _rootCause.value
        set(value) { _rootCause.value = value }

    // Backing atomic for exceptionsHolder
    private val _exceptionsHolder = atomic<Any?>(null)
    private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
        get() = _exceptionsHolder.value
        set(value) { _exceptionsHolder.value = value }

    // Note: cannot be modified when sealed
    val isSealed: Boolean get() = exceptionsHolder === SEALED
    val isCancelling: Boolean get() = rootCause != null
    override val isActive: Boolean get() = rootCause == null // !isCancelling

    // Seals current state and returns list of exceptions:
    // rootCause (if any) goes first, then the collected exceptions, then proposedException
    // (unless it is null or equal to rootCause).
    // guarded by `synchronized(this)`
    fun sealLocked(proposedException: Throwable?): List<Throwable> {
        val list = when(val eh = exceptionsHolder) { // volatile read
            null -> allocateList()
            is Throwable -> allocateList().also { it.add(eh) }
            is ArrayList<*> -> eh as ArrayList<Throwable>
            else -> error("State is $eh") // already sealed -- cannot happen
        }
        val rootCause = this.rootCause // volatile read
        rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
        if (proposedException != null && proposedException != rootCause) list.add(proposedException)
        exceptionsHolder = SEALED
        return list
    }

    // Records one more exception: the first one becomes rootCause, subsequent distinct ones
    // are stored in exceptionsHolder (a single Throwable first, then promoted to an ArrayList).
    // guarded by `synchronized(this)`
    fun addExceptionLocked(exception: Throwable) {
        val rootCause = this.rootCause // volatile read
        if (rootCause == null) {
            this.rootCause = exception
            return
        }
        if (exception === rootCause) return // nothing to do
        when (val eh = exceptionsHolder) { // volatile read
            null -> exceptionsHolder = exception
            is Throwable -> {
                if (exception === eh) return // nothing to do
                exceptionsHolder = allocateList().apply {
                    add(eh)
                    add(exception)

                }
            }
            is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
            else -> error("State is $eh") // already sealed -- cannot happen
        }
    }

    // New exception list with initial capacity of 4
    private fun allocateList() = ArrayList<Throwable>(4)

    override fun toString(): String =
        "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
}
1147
// True only for a Finishing state that is cancelling; false for every other incomplete state
private val Incomplete.isCancelling: Boolean
    get() = (this as? Finishing)?.isCancelling ?: false
1150
// Completion handler that a parent installs on the child it is currently waiting for.
// When invoked, it lets the parent continue its own completion.
private class ChildCompletion(
    private val parent: JobSupport,
    private val state: Finishing,
    private val child: ChildHandleNode,
    private val proposedUpdate: Any?
) : JobNode() {
    override fun invoke(cause: Throwable?) = parent.continueCompleting(state, child, proposedUpdate)
}
1162
// Cancellable continuation that is used to await the completion of `job`
private class AwaitContinuation<T>(
    delegate: Continuation<T>,
    private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
    override fun getContinuationCancellationCause(parent: Job): Throwable {
        val state = job.state
        /*
         * If the awaited job is failing or has already completed exceptionally,
         * its root/completion cause is used for await's result.
         */
        val rootCause = (state as? Finishing)?.rootCause
        if (rootCause != null) return rootCause
        return if (state is CompletedExceptionally) state.cause else parent.getCancellationException()
    }

    protected override fun nameString(): String = "AwaitContinuation"
}
1181
1182 /*
1183 * =================================================================================================
 * This is a ready-to-use implementation of the Deferred interface.
 * However, it is not type-safe. Conceptually, it just exposes the value of the underlying
 * completed state as `Any?`.
1187 * =================================================================================================
1188 */
1189
/**
 * Returns `true` when the current state of this job is [CompletedExceptionally].
 */
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1191
/**
 * Returns the cause of exceptional completion of this job, or `null` when it has completed
 * with a state that is not [CompletedExceptionally].
 *
 * @throws IllegalStateException if this job has not completed yet.
 */
public fun getCompletionExceptionOrNull(): Throwable? {
    val finalState = this.state
    check(finalState !is Incomplete) { "This job has not completed yet" }
    return (finalState as? CompletedExceptionally)?.cause
}
1197
/**
 * Returns the unboxed final state of this job, rethrowing the cause when the job
 * has completed exceptionally.
 *
 * @throws IllegalStateException if this job has not completed yet.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finalState = this.state
    check(finalState !is Incomplete) { "This job has not completed yet" }
    return when (finalState) {
        is CompletedExceptionally -> throw finalState.cause
        else -> finalState.unboxState()
    }
}
1207
/**
 * Awaits the completion of this job and returns its unboxed final state,
 * throwing the cause when the job has completed exceptionally.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    while (true) { // lock-free loop on state
        val state = this.state
        if (state is Incomplete) {
            if (startInternal(state) >= 0) break // break unless needs to retry
            continue
        }
        // already complete -- just return result
        if (state is CompletedExceptionally) { // Slow path to recover stacktrace
            recoverAndThrow(state.cause)
        }
        return state.unboxState()
    }
    return awaitSuspend() // slow-path
}
1227
// Slow path of awaitInternal: suspends until this job completes
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    /*
     * A custom continuation is used here, so that a parent coroutine that awaits
     * its child deferred (async) coroutine throws the exception that this child had
     * thrown and not a JobCancellationException.
     */
    val awaiter = AwaitContinuation(uCont.intercepted(), this)
    // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
    awaiter.initCancellability()
    val completionHandle = invokeOnCompletion(ResumeAwaitOnCompletion(awaiter).asHandler)
    awaiter.disposeOnCancellation(completionHandle)
    awaiter.getResult()
}
1240
/**
 * Select clause for awaiting this job. Every access creates a new [SelectClause1Impl] that uses
 * this job as the clause object, [onAwaitInternalRegFunc] as the registration function and
 * [onAwaitInternalProcessResFunc] as the function that processes the result.
 */
@Suppress("UNCHECKED_CAST")
protected val onAwaitInternal: SelectClause1<*> get() = SelectClause1Impl<Any?>(
    clauseObject = this@JobSupport,
    regFunc = JobSupport::onAwaitInternalRegFunc as RegistrationFunction,
    processResFunc = JobSupport::onAwaitInternalProcessResFunc as ProcessResultFunction
)
1247
// Registration function of the onAwaitInternal clause: selects immediately when this job is
// already complete, otherwise installs a completion handler that selects later.
@Suppress("UNUSED_PARAMETER")
private fun onAwaitInternalRegFunc(select: SelectInstance<*>, ignoredParam: Any?) {
    while (true) {
        val state = this.state
        if (state is Incomplete) {
            if (startInternal(state) >= 0) break // break unless needs to retry
            continue
        }
        // already complete -- an exceptional state is passed as is, anything else is unboxed
        val result = if (state is CompletedExceptionally) state else state.unboxState()
        select.selectInRegistrationPhase(result)
        return
    }
    val completionHandle = invokeOnCompletion(SelectOnAwaitCompletionHandler(select).asHandler)
    select.disposeOnCompletion(completionHandle)
}
1262
// Result processing function of the onAwaitInternal clause:
// rethrows the cause of an exceptional result and returns any other result as is
@Suppress("UNUSED_PARAMETER")
private fun onAwaitInternalProcessResFunc(ignoredParam: Any?, result: Any?): Any? =
    if (result is CompletedExceptionally) throw result.cause else result
1268
// Completion handler that tries to select the onAwaitInternal clause with the state of the enclosing job
private inner class SelectOnAwaitCompletionHandler(
    private val select: SelectInstance<*>
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        // an exceptional state is passed as is, anything else is unboxed
        val result = when (val state = this@JobSupport.state) {
            is CompletedExceptionally -> state
            else -> state.unboxState()
        }
        select.trySelect(this@JobSupport, result)
    }
}
1278 }
1279
/*
 * Wrapper that holds an [Incomplete] object, so that such an object can be represented
 * as the final state of the Job.
 */
private class IncompleteStateBox(@JvmField val state: Incomplete)
// Wraps an Incomplete receiver into IncompleteStateBox; any other value is returned as is
internal fun Any?.boxIncomplete(): Any? = when (this) {
    is Incomplete -> IncompleteStateBox(this)
    else -> this
}
// Unwraps the state held by IncompleteStateBox; any other value is returned as is
internal fun Any?.unboxState(): Any? = if (this is IncompleteStateBox) this.state else this
1286
1287 // --------------- helper classes & constants for job implementation
1288
// Symbolic results of completion/cancellation attempts (see tryMakeCompleting and makeCancelling):
// the job is already complete or completing
private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
// the job was made completing and is waiting for its children
@JvmField
internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
// the attempt has to be retried due to interference
private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
// too late to cancel -- the exception was not added
private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")

// NOTE(review): presumably the result codes of startInternal (callers break their loop on `>= 0`,
// i.e. unless RETRY) -- confirm against its definition
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1

// Marker stored in Finishing's exceptions holder once the state is sealed
private val SEALED = Symbol("SEALED")
// Shared instances of Empty for its inactive and active variants
private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)
1302
// Incomplete state without a list of listeners; `isActive` tells the active variant from the new one
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{$label}"
    }
}
1307
@PublishedApi // for a custom job in the test module
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    init {
        initParentJob(parent)
    }

    override val onCancelComplete: Boolean
        get() = true

    /*
     * Checks whether some parent is able to handle exceptions as well.
     * With this check, an exception in that pattern will be handled once:
     * ```
     * launch {
     *     val child = Job(coroutineContext[Job])
     *     launch(child) { throw ... }
     * }
     * ```
     * Note: computed once during construction, after the parent has been initialized above.
     */
    override val handlesException: Boolean = handlesException()

    override fun complete(): Boolean = makeCompleting(Unit)

    override fun completeExceptionally(exception: Throwable): Boolean {
        val failure = CompletedExceptionally(exception)
        return makeCompleting(failure)
    }

    // Walks up the chain of parents and reports whether any of them handles exceptions
    @JsName("handlesExceptionF")
    private fun handlesException(): Boolean {
        var ancestor: JobSupport? = (parentHandle as? ChildHandleNode)?.job
        while (ancestor != null) {
            if (ancestor.handlesException) return true
            ancestor = (ancestor.parentHandle as? ChildHandleNode)?.job
        }
        return false
    }
}
1336
1337 // -------- invokeOnCompletion nodes
1338
// State object of a job that has not completed yet
internal interface Incomplete {
    // true when the state corresponds to an active job (as opposed to a new one)
    val isActive: Boolean
    // the list of listeners of this state
    val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
1343
// Base class for completion handler nodes; disposing a node removes it from its job.
// As an incomplete state object it is always active and has no list.
internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
    /**
     * Initialized by [JobSupport.makeNode].
     */
    lateinit var job: JobSupport

    override val isActive: Boolean
        get() = true

    override val list: NodeList?
        get() = null

    override fun dispose() {
        job.removeNode(this)
    }

    override fun toString(): String =
        classSimpleName + "@" + hexAddress + "[job@" + job.hexAddress + "]"
}
1354
// Head of the list of job nodes; as an incomplete state object it is always active
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    // Renders the list as `List{<state>}[node1, node2, ...]`, including only the nodes of JobNode type
    fun getString(state: String): String = buildString {
        append("List{").append(state).append("}[")
        var needsSeparator = false
        this@NodeList.forEach<JobNode> { node ->
            if (needsSeparator) append(", ") else needsSeparator = true
            append(node)
        }
        append("]")
    }

    override fun toString(): String {
        if (!DEBUG) return super.toString()
        return getString("Active")
    }
}
1374
// Inactive incomplete state that holds a list of listeners
internal class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String {
        if (!DEBUG) return super.toString()
        return list.getString("New")
    }
}
1381
// Node that delegates to the given completion handler
private class InvokeOnCompletion(
    private val handler: CompletionHandler
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }
}
1387
// Node that resumes the given continuation with Unit, regardless of the cause
private class ResumeOnCompletion(
    private val continuation: Continuation<Unit>
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }
}
1393
// Node that resumes the given continuation with the outcome of the job this node belongs to
private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        val state = job.state
        assert { state !is Incomplete }
        if (state is CompletedExceptionally) {
            // Resume with the corresponding exception to preserve it
            continuation.resumeWithException(state.cause)
            return
        }
        // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
        @Suppress("UNCHECKED_CAST")
        val value = state.unboxState() as T
        continuation.resume(value)
    }
}
1410
// Node that disposes the given handle when invoked
internal class DisposeOnCompletion(
    private val handle: DisposableHandle
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }
}
1416
1417 // -------- invokeOnCancellation nodes
1418
/**
 * Marker for a node that shall be invoked in the _cancelling_ state.
 * **Note: may be invoked multiple times.**
 */
internal abstract class JobCancellingNode : JobNode()
1424
// Cancelling node that delegates to the given handler.
// The node itself may be invoked multiple times, while the delegate handler shall be invoked
// at most once.
private class InvokeOnCancelling(
    private val handler: CompletionHandler
) : JobCancellingNode() {
    // flipped from false to true by the single invocation that gets to call the handler
    private val _invoked = atomic(false)

    override fun invoke(cause: Throwable?) {
        // only the caller that wins the CAS delegates to the handler
        if (_invoked.compareAndSet(expect = false, update = true)) handler.invoke(cause)
    }
}
1434
// Node that links a parent job with its child: it relays cancellation in both directions
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job
        get() = job

    // parent -> child: tells the child that its parent was cancelled
    override fun invoke(cause: Throwable?) {
        childJob.parentCancelled(job)
    }

    // child -> parent: delegates to the parent job
    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }
}
1442
// Counterpart of ChildHandleNode for a cancellable continuation
@PublishedApi
internal class ChildContinuation(
    // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
    @JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode() {
    override fun invoke(cause: Throwable?) {
        // the continuation decides which cause it is cancelled with
        val cancellationCause = child.getContinuationCancellationCause(job)
        child.parentCancelled(cancellationCause)
    }
}
1453
1454