1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:Suppress("DEPRECATION_ERROR")
5
6 package kotlinx.coroutines
7
8 import kotlinx.atomicfu.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 import kotlin.js.*
15 import kotlin.jvm.*
16 import kotlin.native.concurrent.*
17
18 /**
19 * A concrete implementation of [Job]. It is optionally a child to a parent job.
20 *
21 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
23 *
24 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
25 * @suppress **This is unstable API and it is subject to change.**
26 */
27 @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
28 public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
/** The context key of this element: always [Job]. */
final override val key: CoroutineContext.Key<*>
    get() = Job
30
31 /*
32 === Internal states ===
33
    name       state class              public state  description
    ------     ------------             ------------  -----------
    EMPTY_N    EmptyNew               : New           no listeners
    EMPTY_A    EmptyActive            : Active        no listeners
    SINGLE     JobNode                : Active        a single listener
    SINGLE+    JobNode                : Active        a single listener + NodeList added as its next
    LIST_N     InactiveNodeList       : New           a list of listeners (promoted once, does not go back to EmptyNew)
    LIST_A     NodeList               : Active        a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
    COMPLETING Finishing              : Completing    has a list of listeners (promoted once from LIST_*)
    CANCELLING Finishing              : Cancelling    -- " --
    FINAL_C    Cancelled              : Cancelled     Cancelled (final state)
    FINAL_R    <any>                  : Completed     produced some result
46
47 === Transitions ===
48
    New states        Active states        Inactive states

    +---------+       +---------+                          }
    | EMPTY_N | ----> | EMPTY_A | ----+                    } Empty states
    +---------+       +---------+     |                    }
         |  |           |     ^       |    +----------+
         |  |           |     |       +--> |  FINAL_* |
         |  |           V     |       |    +----------+
         |  |         +---------+     |                    }
         |  |         | SINGLE  | ----+                    } JobNode states
         |  |         +---------+     |                    }
         |  |              |          |                    }
         |  |              V          |                    }
         |  |         +---------+     |                    }
         |  +-------> | SINGLE+ | ----+                    }
         |            +---------+     |                    }
         |                 |          |
         V                 V          |
    +---------+       +---------+     |                    }
    | LIST_N  | ----> | LIST_A  | ----+                    } [Inactive]NodeList states
    +---------+       +---------+     |                    }
       |   |             |   |        |
       |   |    +--------+   |        |
       |   |    |            V        |
       |   |    |    +------------+   |   +------------+   }
       |   +-------> | COMPLETING | --+-- | CANCELLING |   } Finishing states
       |        |    +------------+       +------------+   }
       |        |         |                    ^
       |        |         |                    |
       +--------+---------+--------------------+
79
80
81 This state machine and its transition matrix are optimized for the common case when a job is created in active
82 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
83 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
84 state without going to COMPLETING state)
85
86 Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
87
88 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
89
    | The longest possible chain of events is shown; shorter versions cut through intermediate states,
91 | while still performing all the notifications in this order.
92
93 + Job object is created
94 ## NEW: state == EMPTY_NEW | is InactiveNodeList
95 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
96 ~ waits for start
97 >> start / join / await invoked
98 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
99 + onStartInternal / onStart (lazy coroutine is started)
100 ~ active coroutine is working (or scheduled to execution)
101 >> childCancelled / cancelImpl invoked
102 ## CANCELLING: state is Finishing, state.rootCause != null
103 ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
104 ------ new children get immediately cancelled, but are still admitted to the list
105 + onCancelling
106 + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
107 + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
108 ~ waits for completion of coroutine body
109 >> makeCompleting / makeCompletingOnce invoked
110 ## COMPLETING: state is Finishing, state.isCompleting == true
111 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
112 ~ waits for children
113 >> last child completes
114 - computes the final exception
115 ## SEALED: state is Finishing, state.isSealed == true
116 ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
117 + cancelParent (final exception is communicated to the parent, parent incorporates it)
118 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
119 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
120 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
121 + parentHandle.dispose
122 + notifyCompletion (invoke all completion listeners)
123 + onCompletionInternal / onCompleted / onCancelled
124
125 ---------------------------------------------------------------------------------
126 */
127
128 // Note: use shared objects while we have no listeners
// Current internal state of the job (see the state table above).
// Initialized to the shared EMPTY_ACTIVE object when `active` is true and to EMPTY_NEW otherwise.
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
130
// Atomic cell backing [parentHandle]; stays `null` until a handle is assigned.
private val _parentHandle = atomic<ChildHandle?>(null)

/** Handle of this job's registration in its parent; reads and writes go through the atomic cell. */
internal var parentHandle: ChildHandle?
    get() {
        return _parentHandle.value
    }
    set(newHandle) {
        _parentHandle.value = newHandle
    }
135
136 // ------------ initialization ------------
137
138 /**
139 * Initializes parent job.
140 * It shall be invoked at most once after construction after all other initialization.
141 */
internal fun initParentJobInternal(parent: Job?) {
    assert { parentHandle == null }
    if (parent != null) {
        // the parent has to be started before a child can be attached to it
        parent.start()
        @Suppress("DEPRECATION")
        val childHandle = parent.attachChild(this)
        parentHandle = childHandle
        // re-check our own state only _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            childHandle.dispose()
            // drop the reference to the real handle, to aid GC
            parentHandle = NonDisposableHandle
        }
    } else {
        // no parent at all -- install the no-op handle
        parentHandle = NonDisposableHandle
    }
}
158
159 // ------------ state query ------------
160 /**
161 * Returns current state of this job.
162 * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
163 * and should be [unboxed][unboxState] before returning to user code.
164 */
internal val state: Any?
    get() {
        // helper loop: complete any in-progress atomic operation, then re-read
        _state.loop { current ->
            if (current is OpDescriptor) {
                current.perform(this)
            } else {
                return current
            }
        }
    }
171
172 /**
173 * @suppress **This is unstable API and it is subject to change.**
174 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    // never terminates on its own: callers leave via a non-local return from [block]
    while (true) {
        val snapshot = state
        block(snapshot)
    }
}
180
/** `true` while the current state is an [Incomplete] state that reports itself as active. */
public override val isActive: Boolean
    get() = state.let { it is Incomplete && it.isActive }
185
/** `true` once the current state is no longer an [Incomplete] state. */
public final override val isCompleted: Boolean
    get() = when (state) {
        is Incomplete -> false
        else -> true
    }
187
/**
 * `true` when the job has completed exceptionally, or when it is in a finishing state
 * that is already marked as cancelling.
 */
public final override val isCancelled: Boolean
    get() = when (val current = state) {
        is CompletedExceptionally -> true
        is Finishing -> current.isCancelling
        else -> false
    }
192
193 // ------------ state update ------------
194
195 // Finalizes Finishing -> Completed (terminal state) transition.
196 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
197 // Returns final state that was created and updated to
// Steps: seal the collected exceptions under the state lock, build the final state object,
// report the final exception, run the pre-completion hooks, CAS the final state in, then run post-completion actions.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
    /*
     * Note: proposed state can be Incomplete, e.g.
     * async {
     *     something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
     * }
     */
    assert { this.state === state } // consistency check -- it cannot change
    assert { !state.isSealed } // consistency check -- cannot be sealed yet
    assert { state.isCompleting } // consistency check -- must be marked as completing
    // exception carried by the proposed update, if the update is an exceptional completion
    val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
    // Create the final exception and seal the state so that no more exceptions can be added
    var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
    val finalException = synchronized(state) {
        wasCancelling = state.isCancelling
        val exceptions = state.sealLocked(proposedException)
        val finalCause = getFinalRootCause(state, exceptions)
        // attach the remaining exceptions to the chosen root cause as suppressed ones
        if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
        finalCause
    }
    // Create the final state object
    val finalState = when {
        // was not cancelled (no exception) -> use proposed update value
        finalException == null -> proposedUpdate
        // small optimization when we can use the proposedUpdate object as is on cancellation
        finalException === proposedException -> proposedUpdate
        // cancelled job final state
        else -> CompletedExceptionally(finalException)
    }
    // Now handle the final exception
    if (finalException != null) {
        // offer the exception to the parent first; only if it is not handled there, to handleJobException
        val handled = cancelParent(finalException) || handleJobException(finalException)
        if (handled) (finalState as CompletedExceptionally).makeHandled()
    }
    // Process state updates for the final state before the state of the Job is actually set to the final state
    // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
    if (!wasCancelling) onCancelling(finalException)
    onCompletionInternal(finalState)
    // Then CAS to completed state -> it must succeed
    val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
    assert { casSuccess }
    // And process all post-completion actions
    completeStateFinalization(state, finalState)
    return finalState
}
243
/**
 * Picks the exception that becomes the final cause of this job out of the sealed [exceptions] list.
 *
 * Returns `null` when there are no exceptions and [state] is not cancelling.
 */
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    if (exceptions.isEmpty()) {
        // nothing was recorded: materialize a cancellation exception only if the job was cancelling
        return if (state.isCancelling) defaultCancellationException() else null
    }
    /*
     * Selection rules, in priority order:
     * 1) The first non-CancellationException, if any, is the root cause.
     * 2) If the original cause was a TimeoutCancellationException (TCE), prefer a *different* TCE instance:
     *    * TCE is a CE, so it is not reported by children
     *    * the first instance (cancellation cause) is created by the timeout coroutine and has no meaningful stacktrace
     *    * a potential second instance is thrown by the withTimeout lexical block itself and has a recovered stacktrace
     * 3) Otherwise the very first CE wins.
     */
    exceptions.firstOrNull { it !is CancellationException }?.let { return it }
    val head = exceptions[0]
    if (head !is TimeoutCancellationException) return head
    return exceptions.firstOrNull { it !== head && it is TimeoutCancellationException } ?: head
}
268
/**
 * Attaches every distinct, non-cancellation exception from [exceptions] (other than the root cause itself)
 * to [rootCause] as a suppressed exception.
 */
private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
    if (exceptions.size <= 1) return // a single exception has nothing to suppress
    val alreadyAdded = identitySet<Throwable>(exceptions.size)
    /*
     * The root cause may itself be a recovered exception. To avoid cycles, self-suppression is checked
     * against the unwrapped root cause too, while suppressed exceptions are still added to the
     * (possibly recovered) root cause, because that is the final exception.
     */
    val unwrappedRoot = unwrap(rootCause)
    for (candidate in exceptions) {
        val original = unwrap(candidate)
        if (original === rootCause || original === unwrappedRoot) continue
        if (original is CancellationException) continue
        if (!alreadyAdded.add(original)) continue
        rootCause.addSuppressedThrowable(original)
    }
}
286
287 // fast-path method to finalize normally completed coroutines without children
288 // returns true if complete, and afterCompletion(update) shall be called
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
    assert { state is Empty || state is JobNode<*> } // only simple state without lists where children can concurrently add
    assert { update !is CompletedExceptionally } // only for normal completion
    val installed = _state.compareAndSet(state, update.boxIncomplete())
    if (installed) {
        onCancelling(null) // simple state is not a failure
        onCompletionInternal(update)
        completeStateFinalization(state, update)
    }
    return installed
}
298
// Performs the post-completion actions once the job is in its final state:
// unregisters from the parent job and then invokes the completion handlers.
private fun completeStateFinalization(state: Incomplete, update: Any?) {
    /*
     * The job is in THE FINAL state now and the resulting state has to be handled properly.
     * The order of the invocations below is important.
     *
     * 1) Unregister from the parent job.
     */
    parentHandle?.let { handle ->
        handle.dispose() // volatile read parentHandle _after_ state was updated
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    val cause = (update as? CompletedExceptionally)?.cause
    /*
     * 2) Invoke completion handlers: .join(), callbacks etc.
     *    They must be invoked only AFTER exception handling and everything else, see #208
     */
    if (state !is JobNode<*>) {
        // list-based states: notify every handler in the list, if there is a list
        state.list?.notifyCompletion(cause)
        return
    }
    // SINGLE/SINGLE+ state -- one completion handler (common case)
    try {
        state.invoke(cause)
    } catch (ex: Throwable) {
        handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
    }
}
326
/**
 * Runs the cancellation notifications for this job: the [onCancelling] hook, then all cancelling
 * handlers registered in [list], and finally a tentative cancellation of the parent.
 */
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // our own side first (this is what cancels our own children)
    onCancelling(cause)
    notifyHandlers<JobCancellingNode<*>>(list = list, cause = cause)
    // the parent comes last; the result is ignored -- it does not matter if there is no parent
    cancelParent(cause = cause)
}
334
335 /**
336 * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
337 * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
338 *
339 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
340 * may leak to the [CoroutineExceptionHandler].
341 */
private fun cancelParent(cause: Throwable): Boolean {
    // A scoped coroutine does not propagate: the exception will be rethrown instead
    if (isScopedCoroutine) return true

    /*
     * CancellationException is considered "normal": the parent usually is not cancelled when a child
     * produces it. That lets a parent cancel its children (normally) without being cancelled itself,
     * unless a child crashes and produces some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle === null || handle === NonDisposableHandle) {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    } else {
        // Notify the parent, but a cancellation always counts as handled
        handle.childCancelled(cause) || isCancellation
    }
}
360
/** Invokes every [JobNode] handler of this list with the given completion [cause]. */
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode<*>>(this, cause)
}
363
/**
 * Invokes all handlers of type [T] from [list] with [cause].
 *
 * A failing handler does not stop the iteration: the first failure is wrapped into a
 * [CompletionHandlerException], later failures are added to it as suppressed, and the
 * aggregate is passed to [handleOnCompletionException] after all handlers were visited.
 */
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
    var firstFailure: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val collected = firstFailure
            if (collected != null) {
                collected.addSuppressedThrowable(ex)
            } else {
                firstFailure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            }
        }
    }
    firstFailure?.let { handleOnCompletionException(it) }
}
377
/**
 * Starts this job if it has not been started yet, retrying while [startInternal] asks for a retry.
 * Returns `true` if this call started the job and `false` if the job was not in a new state.
 */
public final override fun start(): Boolean {
    loopOnState { current ->
        val outcome = startInternal(current)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome: loop again on the fresh state
    }
}
386
387 // returns: RETRY/FALSE/TRUE:
388 // FALSE when not new,
389 // TRUE when started
390 // RETRY when need to retry
private fun startInternal(state: Any?): Int {
    // Work out which active state should replace the given new state (or bail out with FALSE)
    val activated: Any = when (state) {
        is Empty -> { // EMPTY_X state -- no completion handlers
            if (state.isActive) return FALSE // already active
            EMPTY_ACTIVE
        }
        is InactiveNodeList -> state.list // LIST state -- inactive with a list of completion handlers
        else -> return FALSE // not a new state
    }
    if (!_state.compareAndSet(state, activated)) return RETRY
    onStartInternal()
    return TRUE
}
407
408 /**
409 * Override to provide the actual [start] action.
410 * This function is invoked exactly once when non-active coroutine is [started][start].
411 */
internal open fun onStartInternal() {
    // does nothing by default
}
413
/**
 * Returns the [CancellationException] describing why this job is cancelling or has completed.
 *
 * Fails with [error] while the job is still new or active, that is, in an incomplete state
 * without a recorded root cause.
 */
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    if (current is Finishing) {
        val rootCause = current.rootCause ?: error("Job is still new or active: $this")
        return rootCause.toCancellationException("$classSimpleName is cancelling")
    }
    if (current is Incomplete) error("Job is still new or active: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException()
    return JobCancellationException("$classSimpleName has completed normally", null, this)
}
422
/**
 * Returns this throwable itself when it already is a [CancellationException]; otherwise wraps it
 * as the cause of a new default cancellation exception with the given [message].
 */
protected fun Throwable.toCancellationException(message: String? = null): CancellationException {
    if (this is CancellationException) return this
    return defaultCancellationException(message, this)
}
425
426 /**
427 * Returns the cause that signals the completion of this job -- it returns the original
428 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has neither
 * [completed][isCompleted] nor started being cancelled yet.
431 */
protected val completionCause: Throwable?
    get() {
        val current = state
        if (current is Finishing) {
            return current.rootCause ?: error("Job is still new or active: $this")
        }
        if (current is Incomplete) error("Job is still new or active: $this")
        // completed: exceptional completion carries a cause, normal completion has none
        return if (current is CompletedExceptionally) current.cause else null
    }
440
441 /**
442 * Returns `true` when [completionCause] exception was handled by parent coroutine.
443 */
protected val completionCauseHandled: Boolean
    get() {
        val current = state
        return current is CompletedExceptionally && current.handled
    }
446
/**
 * Registers a plain completion [handler]: it is not a cancelling handler and it is invoked
 * immediately when the job is already complete.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}
450
/**
 * Registers [handler] on this job in a lock-free retry loop and returns a handle for it.
 *
 * Depending on the current state the handler node becomes the state itself (SINGLE), is appended to
 * the handler list, or -- when the job is already cancelling (for `onCancelling` handlers) or
 * complete -- is invoked right away if [invokeImmediately] is set.
 */
public final override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    // node is created lazily and reused across retries of the loop
    var nodeCache: JobNode<*>? = null
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) {
                    // try move to SINGLE state
                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                    if (_state.compareAndSet(state, node)) return node
                } else
                    promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
            }
            is Incomplete -> {
                val list = state.list
                if (list == null) { // SINGLE/SINGLE+
                    promoteSingleToNodeList(state as JobNode<*>)
                } else {
                    var rootCause: Throwable? = null
                    var handle: DisposableHandle = NonDisposableHandle
                    if (onCancelling && state is Finishing) {
                        synchronized(state) {
                            // check if we are installing cancellation handler on job that is being cancelled
                            rootCause = state.rootCause // != null if cancelling job
                            // We add node to the list in two cases --- either the job is not being cancelled
                            // or we are adding a child to a coroutine that is not completing yet
                            if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                // Note: add node to the list while holding lock on state (make sure it cannot change)
                                val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                                if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                // just return node if we don't have to invoke handler (not cancelling yet)
                                if (rootCause == null) return node
                                // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                handle = node
                            }
                        }
                    }
                    if (rootCause != null) {
                        // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                        if (invokeImmediately) handler.invokeIt(rootCause)
                        return handle
                    } else {
                        // not cancelling (or not a cancelling handler): append to the list, retry if the state changed
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (addLastAtomic(state, list, node)) return node
                    }
                }
            }
            else -> { // is complete
                // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }
        }
    }
}
510
/**
 * Produces the list node for [handler]: the handler itself when it already is a node of the
 * matching kind, otherwise a fresh [InvokeOnCancelling] / [InvokeOnCompletion] wrapper.
 */
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
    if (onCancelling) {
        val cancellingNode = handler as? JobCancellingNode<*>
        if (cancellingNode != null) {
            assert { cancellingNode.job === this }
            return cancellingNode
        }
        return InvokeOnCancelling(this, handler)
    }
    val completionNode = handler as? JobNode<*>
    if (completionNode != null) {
        assert { completionNode.job === this && completionNode !is JobCancellingNode<*> }
        return completionNode
    }
    return InvokeOnCompletion(this, handler)
}
519
/** Appends [node] to [list] on the condition that the job state is still the [expect] object. */
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>): Boolean {
    return list.addLastIf(node) { this.state === expect }
}
522
/**
 * Tries to replace the given empty [state] with a list state of the same activity:
 * a bare [NodeList] for an active state, an [InactiveNodeList] wrapper otherwise.
 * The CAS result is ignored.
 */
private fun promoteEmptyToNodeList(state: Empty) {
    val freshList = NodeList()
    _state.compareAndSet(state, if (state.isActive) freshList else InactiveNodeList(freshList))
}
529
/** Tries to promote a single-handler [state] to a list state (via the SINGLE+ state). */
private fun promoteSingleToNodeList(state: JobNode<*>) {
    // step into SINGLE+: attach a fresh NodeList after the node, unless something already follows it
    state.addOneIfEmpty(NodeList())
    // nextNode is either our NodeList or whatever a competing thread installed (the node could also
    // have been removed from the state meanwhile); just attempt the CAS -- the lock-free loop
    // of the caller continues either way
    _state.compareAndSet(state, state.nextNode)
}
538
/**
 * Suspends until this job is complete. When no waiting is needed it does not suspend and only
 * checks the completion of the caller's context.
 */
public final override suspend fun join() {
    if (joinInternal()) return joinSuspend() // slow-path wait
    // fast-path no wait: do not suspend
    coroutineContext.checkCompletion()
}
546
/**
 * Makes sure the job is started and tells whether the caller has to wait:
 * `false` when the job is already complete, `true` otherwise.
 */
private fun joinInternal(): Boolean {
    loopOnState { current ->
        if (current !is Incomplete) return false // not active anymore (complete) -- no need to wait
        val started = startInternal(current)
        if (started >= 0) return true // wait unless need to retry
    }
}
553
/** Slow path of [join]: suspends and registers a [ResumeOnCompletion] handler on this job. */
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { continuation ->
    val resumeHandler = ResumeOnCompletion(this, continuation).asHandler
    val registration = invokeOnCompletion(handler = resumeHandler)
    // the registration only has to be disposed when the waiter is cancelled;
    // on completion we will be resumed regularly without handlers
    continuation.disposeOnCancellation(registration)
}
558
/** The select clause for [join]: this job itself implements it. */
public final override val onJoin: SelectClause0
    get() {
        return this
    }
561
562 // registerSelectJoin
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
    loopOnState { current ->
        when {
            // the select instance is already selected -- nothing to register
            select.isSelected -> return
            // fast-path: the job is already complete -- select the result right away
            current !is Incomplete -> {
                if (select.trySelect()) {
                    block.startCoroutineUnintercepted(select.completion)
                }
                return
            }
            // slow-path: register a waiter for completion (reached when startInternal yields 0)
            startInternal(current) == 0 -> {
                val waiter = SelectJoinOnCompletion(this, select, block).asHandler
                select.disposeOnSelect(invokeOnCompletion(handler = waiter))
                return
            }
        }
    }
}
581
582 /**
583 * @suppress **This is unstable API and it is subject to change.**
584 */
internal fun removeNode(node: JobNode<*>) {
    // how the node is removed depends on the state of the job
    loopOnState { current ->
        if (current !is JobNode<*>) {
            // either a list-based incomplete state (remove from the list, if there is one)
            // or a complete state that has no completion handlers anymore
            if (current is Incomplete && current.list != null) node.remove()
            return
        }
        // SINGLE/SINGLE+ state -- one completion handler
        if (current !== node) return // a different job node --> we were already removed
        // try to remove it by reverting back to the empty state; loop again if the CAS fails
        if (_state.compareAndSet(current, EMPTY_ACTIVE)) return
    }
}
603
604 /**
605 * Returns `true` for job that do not have "body block" to complete and should immediately go into
606 * completing state and start waiting for children.
607 *
608 * @suppress **This is unstable API and it is subject to change.**
609 */
internal open val onCancelComplete: Boolean
    get() {
        return false
    }
611
612 // external cancel with cause, never invoked implicitly from internal machinery
public override fun cancel(cause: CancellationException?) {
    // a missing cause is replaced with the default cancellation exception of this job
    val effectiveCause = cause ?: defaultCancellationException()
    cancelInternal(effectiveCause)
}
616
/** Message used by [defaultCancellationException] when no explicit message is supplied. */
protected open fun cancellationExceptionMessage(): String {
    return "Job was cancelled"
}
618
619 // HIDDEN in Job interface. Invoked only by legacy compiled code.
620 // external cancel with (optional) cause, never invoked implicitly from internal machinery
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
public override fun cancel(cause: Throwable?): Boolean {
    // any throwable is converted to a cancellation exception; no cause means the default one
    val cancellation = if (cause != null) cause.toCancellationException() else defaultCancellationException()
    cancelInternal(cancellation)
    return true
}
626
627 // It is overridden in channel-linked implementation
public open fun cancelInternal(cause: Throwable) {
    // the default implementation delegates to cancelImpl and ignores its result
    cancelImpl(cause = cause)
}
631
632 // Parent is cancelling child
public final override fun parentCancelled(parentJob: ParentJob) {
    // the parent job itself is passed as the cause; cancelImpl accepts a Throwable or a ParentJob
    cancelImpl(cause = parentJob)
}
636
637 /**
638 * Child was cancelled with a cause.
639 * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
640 * It is overridden in supervisor implementations to completely ignore any child cancellation.
641 * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
642 *
643 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
644 * may leak to the [CoroutineExceptionHandler].
645 */
public open fun childCancelled(cause: Throwable): Boolean =
    // a CancellationException is always reported as handled and never cancels this job;
    // anything else cancels this job and is handled only if this job handles exceptions
    cause is CancellationException || (cancelImpl(cause) && handlesException)
650
651 /**
652 * Makes this [Job] cancelled with a specified [cause].
653 * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
654 */
public fun cancelCoroutine(cause: Throwable?): Boolean {
    return cancelImpl(cause)
}
656
657 // cause is Throwable or ParentJob when cancelChild was invoked
// returns true if exception was handled, false otherwise
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing; if cancelMakeCompleting returns a state, it means that it has
        // made the job completing and has recorded the exception
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) finalState = makeCancelling(cause)
    // interpret the outcome marker; anything that is not a marker is the final state of the job
    if (finalState === COMPLETING_ALREADY || finalState === COMPLETING_WAITING_CHILDREN) return true
    if (finalState === TOO_LATE_TO_CANCEL) return false
    afterCompletion(finalState)
    return true
}
680
681 // cause is Throwable or ParentJob when cancelChild was invoked
682 // It contains a loop and never returns COMPLETING_RETRY, can return
683 // COMPLETING_ALREADY -- if already completed/completing
684 // COMPLETING_WAITING_CHILDREN -- if started waiting for children
685 // final state -- when completed, for call to afterCompletion
private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { current ->
        // already completed/completing: do not even create an exception to propose as update
        if (current !is Incomplete) return COMPLETING_ALREADY
        if (current is Finishing && current.isCompleting) return COMPLETING_ALREADY
        val proposed = CompletedExceptionally(createCauseException(cause))
        val outcome = tryMakeCompleting(current, proposed)
        // everything except a retry request is returned to the caller
        if (outcome !== COMPLETING_RETRY) return outcome
    }
}
697
/**
 * Creates the cancellation exception of this job; a `null` [message] is replaced with
 * [cancellationExceptionMessage].
 */
@Suppress("NOTHING_TO_INLINE") // Save a stack frame
internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
    JobCancellationException(
        if (message != null) message else cancellationExceptionMessage(),
        cause,
        this
    )
701
/**
 * Returns the exception that children of this job are cancelled with: the root cause itself when
 * it is a [CancellationException], otherwise a new [JobCancellationException] wrapping it.
 */
override fun getChildJobCancellationCause(): CancellationException {
    // determine root cancellation cause of this job (why is it cancelling its children?)
    val current = this.state
    val rootCause: Throwable? = when (current) {
        is Finishing -> current.rootCause
        is CompletedExceptionally -> current.cause
        is Incomplete -> error("Cannot be cancelling child in this state: $current")
        else -> null // normal completion: the exception is created below
    }
    if (rootCause is CancellationException) return rootCause
    return JobCancellationException("Parent job is ${stateString(current)}", rootCause, this)
}
713
714 // cause is Throwable or ParentJob when cancelChild was invoked
private fun createCauseException(cause: Any?): Throwable {
    // no cause at all -> default cancellation exception of this job
    if (cause == null) return defaultCancellationException()
    // an explicit throwable is used as is
    if (cause is Throwable) return cause
    // otherwise it must be the parent job that is cancelling this child
    return (cause as ParentJob).getChildJobCancellationCause()
}
719
720 // transitions to Cancelling state
721 // cause is Throwable or ParentJob when cancelChild was invoked
722 // It contains a loop and never returns COMPLETING_RETRY, can return
723 // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
724 // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
725 // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
726 // final state -- when completed, for call to afterCompletion
private fun makeCancelling(cause: Any?): Any? {
    var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
    loopOnState { state ->
        when (state) {
            is Finishing -> { // already finishing -- collect exceptions
                // notifyRootCause is non-null only when this call moved the state into cancelling
                val notifyRootCause = synchronized(state) {
                    if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
                    // add exception, do nothing if parent is cancelling child that is already being cancelled
                    val wasCancelling = state.isCancelling // will notify if was not cancelling
                    // Materialize missing exception if it is the first exception (otherwise -- don't)
                    if (cause != null || !wasCancelling) {
                        val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                        state.addExceptionLocked(causeException)
                    }
                    // take cause for notification if was not in cancelling state before
                    state.rootCause.takeIf { !wasCancelling }
                }
                // notification is performed outside of the synchronized section
                notifyRootCause?.let { notifyCancelling(state.list, it) }
                return COMPLETING_ALREADY
            }
            is Incomplete -> {
                // Not yet finishing -- try to make it cancelling
                val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                if (state.isActive) {
                    // active state becomes cancelling; loop again if the attempt fails
                    if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
                } else {
                    // non active state starts completing
                    val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
                    when {
                        finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
                        finalState === COMPLETING_RETRY -> return@loopOnState
                        else -> return finalState
                    }
                }
            }
            else -> return TOO_LATE_TO_CANCEL // already complete
        }
    }
}
767
// Returns the listener list of the given incomplete state, promoting the state to a NodeList
// where needed so that it can be converted to Cancelling. Returns null when the caller must retry.
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? {
    val existing = state.list
    if (existing != null) return existing
    // an empty state has nothing to capture -- a fresh list will get integrated into Cancelling state
    if (state is Empty) return NodeList()
    if (state is JobNode<*>) {
        // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
        // correctly capture a reference to it
        promoteSingleToNodeList(state)
        return null // retry
    }
    error("State should have list: $state")
}
781
// Attempts to install a new Cancelling state, provided the job is still in the expected `state`.
// Returns true when the state was installed (listeners are notified), false when the caller must retry.
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
    assert { state !is Finishing } // only for non-finishing states
    assert { state.isActive } // only for active states
    // get state's list or else promote to list to correctly operate on child lists
    val nodes = getOrPromoteCancellingList(state) ?: return false
    // the cancelling state is created with rootCause already set
    val cancellingState = Finishing(list = nodes, isCompleting = false, rootCause = rootCause)
    val installed = _state.compareAndSet(state, cancellingState)
    // listeners are notified only by the invocation that won the CAS
    if (installed) notifyCancelling(nodes, rootCause)
    return installed
}
795
/**
 * Completes this job with [proposedUpdate]. Used by [CompletableDeferred.complete] (and exceptionally)
 * and by [JobImpl.cancel].
 *
 * @return `false` on repeated invocation (when this job is already complete or completing),
 * `true` otherwise.
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    loopOnState { state ->
        val outcome = tryMakeCompleting(state, proposedUpdate)
        if (outcome === COMPLETING_ALREADY) return false
        if (outcome === COMPLETING_WAITING_CHILDREN) return true
        if (outcome !== COMPLETING_RETRY) {
            // a final state was reached -- run the completion hook before reporting success
            afterCompletion(outcome)
            return true
        }
        // COMPLETING_RETRY -- proceed to the next iteration of the state loop
    }
}
815
/**
 * Completes this job with [proposedUpdate]. Used by [AbstractCoroutine.resume].
 *
 * Returns:
 * * [COMPLETING_WAITING_CHILDREN] if started waiting for children.
 * * Final state otherwise (caller should do [afterCompletion])
 *
 * @throws IllegalStateException on repeated invocation (when this job is already complete or completing).
 */
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
    loopOnState { state ->
        val outcome = tryMakeCompleting(state, proposedUpdate)
        if (outcome === COMPLETING_ALREADY) {
            val message = "Job $this is already complete or completing, " +
                "but is being completed with $proposedUpdate"
            throw IllegalStateException(message, proposedUpdate.exceptionOrNull)
        }
        // anything but a retry request is the result: COMPLETING_WAITING_CHILDREN or final state
        if (outcome !== COMPLETING_RETRY) return outcome
    }
}
837
// Single attempt to move `state` towards completion with `proposedUpdate`.
// Returns one of COMPLETING symbols or final state:
//   COMPLETING_ALREADY -- when already complete or completing
//   COMPLETING_RETRY -- when need to retry due to interference
//   COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
//   final state -- when completed, for call to afterCompletion
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete) return COMPLETING_ALREADY
    /*
     * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
     * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    val isSimpleState = (state is Empty || state is JobNode<*>) && state !is ChildHandleNode
    if (isSimpleState && proposedUpdate !is CompletedExceptionally) {
        // on success the proposed update itself is the final state, otherwise the caller retries
        return if (tryFinalizeSimpleState(state, proposedUpdate)) proposedUpdate else COMPLETING_RETRY
    }
    // The separate slow-path function to simplify profiling
    return tryMakeCompletingSlowPath(state, proposedUpdate)
}
862
// Slow path of tryMakeCompleting: goes through the Finishing state.
// Returns one of COMPLETING symbols or final state:
//   COMPLETING_ALREADY -- when already complete or completing
//   COMPLETING_RETRY -- when need to retry due to interference
//   COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
//   final state -- when completed, for call to afterCompletion
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    // get state's list or else promote to list to correctly operate on child lists
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    // promote to Finishing state if we are not in it yet
    // This promotion has to be atomic w.r.t. the state change, so that a coroutine that is not active yet
    // atomically transitions to the finishing & completing state
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    // must synchronize updates to finishing state
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        // check if this state is already completing
        if (finishing.isCompleting) return COMPLETING_ALREADY
        // mark as completing
        finishing.isCompleting = true
        // if we need to promote to finishing then atomically do it here.
        // We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
        // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
        assert { !finishing.isSealed } // cannot be sealed
        // add new proposed exception to the finishing state
        val wasCancelling = finishing.isCancelling
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to wait for them
    notifyRootCause?.let { notifyCancelling(list, it) }
    // now wait for children
    val child = firstChild(state)
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left to wait for (all were already cancelled?)
    return finalizeFinishingState(finishing, proposedUpdate)
}
905
// The cause of this state when it is a CompletedExceptionally, null for any other value.
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
908
// Finds the first child handle of the given state: the state itself when it is a single
// ChildHandleNode, otherwise the first child found in the state's list (null when there is none).
private fun firstChild(state: Incomplete): ChildHandleNode? {
    if (state is ChildHandleNode) return state
    return state.list?.nextChild()
}
911
// Starts waiting for the first child (beginning with `child`) that is not complete yet.
// Returns true when waiting has started, false when there are no more incomplete children to wait for.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val handle = candidate.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = ChildCompletion(this, state, candidate, proposedUpdate).asHandler
        )
        // child is not complete and we've started waiting for it
        if (handle !== NonDisposableHandle) return true
        // this child is already complete -- move on to the next one, if any
        candidate = candidate.nextChild() ?: return false
    }
}
923
// Invoked when `lastChild` has completed: either starts waiting for the next child
// or finalizes the finishing state and runs afterCompletion.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    // figure out if there is a next child and whether we have to wait for it
    val upcoming = lastChild.nextChild()
    val stillWaiting = upcoming != null && tryWaitForChild(state, upcoming, proposedUpdate)
    if (stillWaiting) return
    // no more children to wait -- try update state
    afterCompletion(finalizeFinishingState(state, proposedUpdate))
}
935
// Finds the next non-removed ChildHandleNode that follows this node in the list.
// Returns null when the traversal reaches a NodeList node, i.e. there are no more children.
private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
    var cur = this
    while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
    while (true) {
        cur = cur.nextNode
        if (cur.isRemoved) continue // skip nodes that were removed
        if (cur is ChildHandleNode) return cur
        if (cur is NodeList) return null // checked all -- no more children
        // any other kind of node is not a child -- keep scanning
    }
}
946
// Lazily yields the child jobs found in the state that is read when the sequence is iterated.
public final override val children: Sequence<Job> get() = sequence {
    val snapshot = this@JobSupport.state
    if (snapshot is ChildHandleNode) {
        // a single child is stored directly as the state
        yield(snapshot.childJob)
    } else if (snapshot is Incomplete) {
        val nodes = snapshot.list
        if (nodes != null) {
            nodes.forEach<ChildHandleNode> { node -> yield(node.childJob) }
        }
    }
}
955
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: ChildJob): ChildHandle {
    /*
     * Note: This function attaches a special ChildHandleNode node object. This node object
     * is handled in a special way on completion of the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already cancelling. For cancelling state child is attached under state lock.
     * It's required to properly wait for all children before completion and provide linearizable hierarchy view:
     * If child is attached when the job is already being cancelled, such child will receive immediate notification on
     * cancellation, but parent *will* wait for that child before completion and will handle its exception.
     */
    val childNode = ChildHandleNode(this, child)
    val handle = invokeOnCompletion(onCancelling = true, handler = childNode.asHandler)
    return handle as ChildHandle
}
969
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion].
 *
 * The default implementation rethrows [exception].
 *
 * @param exception the exception to process.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
979
/**
 * This function is invoked once as soon as this job is being cancelled for any reason or completes,
 * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
 *
 * The meaning of [cause] parameter:
 * * Cause is `null` when the job has completed normally.
 * * Cause is an instance of [CancellationException] when the job was cancelled _normally_.
 *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
 * * Otherwise, the job had been cancelled or failed with exception.
 *
 * The specified [cause] is not the final cancellation cause of this job.
 * A job may produce other exceptions while it is failing and the final cause might be different.
 *
 * The default implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCancelling(cause: Throwable?) {}
996
/**
 * Returns `true` for scoped coroutines; the default value is `false`.
 * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
 * Scoped coroutines always handle any exception that happened within -- they just rethrow it to the enclosing scope.
 * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
 */
protected open val isScopedCoroutine: Boolean get() = false
1004
/**
 * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
 * A valid implementation of this getter should recursively check parent as well before returning `false`.
 * The default value is `true`.
 *
 * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val handlesException: Boolean get() = true
1013
/**
 * Handles the final job [exception] that was not handled by the parent coroutine.
 * Returns `true` if it handles exception (so handling at later stages is not needed);
 * the default implementation returns `false`.
 * It is designed to be overridden by launch-like coroutines
 * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
 * that can represent exceptions.
 *
 * This method is invoked **exactly once** when the final exception of the job is determined
 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
 *
 * @param exception the final exception of the job.
 */
protected open fun handleJobException(exception: Throwable): Boolean = false
1025
/**
 * Override for completion actions that need to update some external object depending on job's state,
 * right before all the waiters for coroutine's completion are notified.
 *
 * The default implementation does nothing.
 *
 * @param state the final state.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCompletionInternal(state: Any?) {}
1035
/**
 * Override for the very last action on job's completion to resume the rest of the code in
 * scoped coroutines. It is called when this job is externally completed in an unknown
 * context and thus should resume with a default mode.
 *
 * The default implementation does nothing.
 *
 * @param state the final state of the job.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun afterCompletion(state: Any?) {}
1044
// for nicer debugging: the debug string followed by '@' and the hex address of this object
public override fun toString(): String {
    val description = toDebugString()
    return description + "@" + hexAddress
}
1048
// Renders this job as its name followed by the textual state in curly braces.
@InternalCoroutinesApi
public fun toDebugString(): String {
    val name = nameString()
    val stateLabel = stateString(state)
    return name + "{" + stateLabel + "}"
}
1051
/**
 * Name of this job for debug output; by default it is the simple name of the class.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String {
    return classSimpleName
}
1056
// Maps an internal state object to the name of the corresponding public state.
private fun stateString(state: Any?): String {
    if (state is Finishing) {
        // cancelling takes precedence over completing
        if (state.isCancelling) return "Cancelling"
        if (state.isCompleting) return "Completing"
        return "Active"
    }
    if (state is Incomplete) {
        return if (state.isActive) "Active" else "New"
    }
    // everything else is a final state
    return if (state is CompletedExceptionally) "Cancelled" else "Completed"
}
1067
// Completing & Cancelling states.
// All updates are guarded by synchronized(this), reads are volatile.
// It keeps the listeners list, the completing flag, the root cause and any additional exceptions
// collected until the state is sealed.
@Suppress("UNCHECKED_CAST")
private class Finishing(
    override val list: NodeList,
    isCompleting: Boolean,
    rootCause: Throwable?
) : SynchronizedObject(), Incomplete {
    private val _isCompleting = atomic(isCompleting)
    var isCompleting: Boolean
        get() = _isCompleting.value
        set(value) { _isCompleting.value = value }

    private val _rootCause = atomic(rootCause)
    var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
        get() = _rootCause.value
        set(value) { _rootCause.value = value }

    private val _exceptionsHolder = atomic<Any?>(null)
    private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
        get() = _exceptionsHolder.value
        set(value) { _exceptionsHolder.value = value }

    // Note: cannot be modified when sealed
    val isSealed: Boolean get() = exceptionsHolder === SEALED
    // The state is cancelling as soon as a root cause is present
    val isCancelling: Boolean get() = rootCause != null
    override val isActive: Boolean get() = rootCause == null // !isCancelling

    // Seals current state and returns list of exceptions:
    // the root cause (if any) goes first, then the collected exceptions, then the proposed one
    // (unless it is null or equal to the root cause).
    // guarded by `synchronized(this)`
    fun sealLocked(proposedException: Throwable?): List<Throwable> {
        val list = when(val eh = exceptionsHolder) { // volatile read
            null -> allocateList()
            is Throwable -> allocateList().also { it.add(eh) }
            is ArrayList<*> -> eh as ArrayList<Throwable>
            else -> error("State is $eh") // already sealed -- cannot happen
        }
        val rootCause = this.rootCause // volatile read
        rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
        if (proposedException != null && proposedException != rootCause) list.add(proposedException)
        exceptionsHolder = SEALED
        return list
    }

    // Records the exception: the first one becomes the root cause, further distinct ones
    // are accumulated in the exceptions holder (a single Throwable is upgraded to a list on demand).
    // guarded by `synchronized(this)`
    fun addExceptionLocked(exception: Throwable) {
        val rootCause = this.rootCause // volatile read
        if (rootCause == null) {
            this.rootCause = exception
            return
        }
        if (exception === rootCause) return // nothing to do
        when (val eh = exceptionsHolder) { // volatile read
            null -> exceptionsHolder = exception
            is Throwable -> {
                if (exception === eh) return // nothing to do
                exceptionsHolder = allocateList().apply {
                    add(eh)
                    add(exception)

                }
            }
            is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
            else -> error("State is $eh") // already sealed -- cannot happen
        }
    }

    // Creates a list for collected exceptions with a small initial capacity
    private fun allocateList() = ArrayList<Throwable>(4)

    override fun toString(): String =
        "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
}
1140
// True only for a Finishing state that is cancelling; false for any other incomplete state.
private val Incomplete.isCancelling: Boolean
    get() = (this as? Finishing)?.isCancelling == true
1143
// Completion listener installed on a child by the parent that is waiting for this child to complete;
// when invoked it lets the parent continue its own completion.
private class ChildCompletion(
    private val parent: JobSupport,
    private val state: Finishing,
    private val child: ChildHandleNode,
    private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {
    override fun invoke(cause: Throwable?) =
        parent.continueCompleting(state, child, proposedUpdate)

    override fun toString(): String {
        return "ChildCompletion[" + child + ", " + proposedUpdate + "]"
    }
}
1157
// Cancellable continuation used by await: its cancellation cause is taken from the awaited job.
private class AwaitContinuation<T>(
    delegate: Continuation<T>,
    private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
    /*
     * When the job we are waiting for had already completed exceptionally or
     * is failing, we shall use its root/completion cause for await's result.
     * In all other cases the cancellation exception of the parent is used.
     */
    override fun getContinuationCancellationCause(parent: Job): Throwable =
        when (val awaited = job.state) {
            is Finishing -> awaited.rootCause ?: parent.getCancellationException()
            is CompletedExceptionally -> awaited.cause
            else -> parent.getCancellationException()
        }

    protected override fun nameString(): String {
        return "AwaitContinuation"
    }
}
1176
1177 /*
1178 * =================================================================================================
1179 * This is ready-to-use implementation for Deferred interface.
1180 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1181 * completed state as `Any?`
1182 * =================================================================================================
1183 */
1184
/**
 * `true` when the current state of this job is an instance of [CompletedExceptionally].
 */
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1186
/**
 * Returns the exception this job has completed with, or `null` when its final state
 * is not a [CompletedExceptionally].
 *
 * @throws IllegalStateException if this job has not completed yet.
 */
public fun getCompletionExceptionOrNull(): Throwable? {
    val finalState = state
    if (finalState is Incomplete) throw IllegalStateException("This job has not completed yet")
    return finalState.exceptionOrNull
}
1192
/**
 * Returns the unboxed final state of this job, or throws the cause when the job
 * has completed exceptionally.
 *
 * @throws IllegalStateException if this job has not completed yet.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finalState = state
    if (finalState is Incomplete) error("This job has not completed yet")
    return when (finalState) {
        is CompletedExceptionally -> throw finalState.cause
        else -> finalState.unboxState()
    }
}
1202
/**
 * Awaits completion of this job and returns its unboxed final state,
 * or throws the cause when the job has completed exceptionally.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    do { // lock-free loop on state
        val current = this.state
        if (current !is Incomplete) {
            // already complete -- just return result
            // (exceptional completion goes through the slow path to recover stacktrace)
            if (current is CompletedExceptionally) recoverAndThrow(current.cause)
            return current.unboxState()
        }
    } while (startInternal(current) < 0) // loop again only when start needs to be retried
    return awaitSuspend() // slow-path
}
1222
// Slow path of await: registers a completion listener that resumes the awaiting continuation.
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    /*
     * Custom code here, so that parent coroutine that is using await
     * on its child deferred (async) coroutine would throw the exception that this child had
     * thrown and not a JobCancellationException.
     */
    val awaiter = AwaitContinuation(uCont.intercepted(), this)
    val completionNode = ResumeAwaitOnCompletion(this, awaiter)
    val registration = invokeOnCompletion(completionNode.asHandler)
    // the listener is disposed if the awaiting continuation gets cancelled
    awaiter.disposeOnCancellation(registration)
    awaiter.getResult()
}
1233
/**
 * Registers the await clause of this job in the given [select] instance: selects immediately
 * when the job is already complete, otherwise installs a completion listener.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
// registerSelectAwaitInternal
@Suppress("UNCHECKED_CAST")
internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
    // fast-path -- check state and select/return if needed
    loopOnState { state ->
        if (select.isSelected) return
        if (state is Incomplete) {
            // any start result other than 0 makes the loop re-read the state
            if (startInternal(state) != 0) return@loopOnState
            // slow-path -- register waiter for completion
            val waiter = SelectAwaitOnCompletion(this, select, block)
            select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
            return
        }
        // already complete -- select result
        if (!select.trySelect()) return
        if (state is CompletedExceptionally) {
            select.resumeSelectWithException(state.cause)
        } else {
            block.startCoroutineUnintercepted(state.unboxState() as T, select.completion)
        }
        return
    }
}
1262
/**
 * Resumes the given [select] with the current state of this job: with the cause when it is
 * a [CompletedExceptionally], otherwise by starting [block] with the unboxed state.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
    // Note: await is non-atomic (can be cancelled while dispatched)
    when (val finalState = this.state) {
        is CompletedExceptionally -> select.resumeSelectWithException(finalState.cause)
        else -> block.startCoroutineCancellable(finalState.unboxState() as T, select.completion)
    }
}
1275 }
1276
/*
 * Class to represent an Incomplete object as the final state of the Job.
 * Values are wrapped by boxIncomplete and unwrapped by unboxState.
 */
private class IncompleteStateBox(@JvmField val state: Incomplete)
// Wraps this value into an IncompleteStateBox when it is an Incomplete; any other value is returned as is.
internal fun Any?.boxIncomplete(): Any? = when (this) {
    is Incomplete -> IncompleteStateBox(this)
    else -> this
}
// Unwraps the state stored in an IncompleteStateBox; any other value is returned as is.
internal fun Any?.unboxState(): Any? =
    if (this is IncompleteStateBox) this.state else this
1283
1284 // --------------- helper classes & constants for job implementation
1285
// Result symbol: the job is already complete or completing.
@SharedImmutable
private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
// Result symbol: the job was made completing and is waiting for its children.
@JvmField
@SharedImmutable
internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
// Result symbol: the operation needs to be retried due to interference.
@SharedImmutable
private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
// Result symbol: it is too late to cancel, the exception was not added.
@SharedImmutable
private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")

// NOTE(review): presumably the tri-state result codes of startInternal -- confirm against its definition.
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1

// Marker stored in the exceptions holder of a Finishing state once it is sealed.
@SharedImmutable
private val SEALED = Symbol("SEALED")
// Shared Empty state instance with isActive = false.
@SharedImmutable
private val EMPTY_NEW = Empty(false)
// Shared Empty state instance with isActive = true.
@SharedImmutable
private val EMPTY_ACTIVE = Empty(true)
1306
// Incomplete state that has no list of listeners; it is either active or new (not active).
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{" + label + "}"
    }
}
1311
// Job implementation that is created in active state and can be completed explicitly
// via complete / completeExceptionally.
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    // Note: the parent must be initialized before handlesException is computed below
    init { initParentJobInternal(parent) }

    override val onCancelComplete: Boolean
        get() = true

    /*
     * Check whether parent is able to handle exceptions as well.
     * With this check, an exception in that pattern will be handled once:
     * ```
     * launch {
     *     val child = Job(coroutineContext[Job])
     *     launch(child) { throw ... }
     * }
     * ```
     */
    override val handlesException: Boolean = handlesException()

    override fun complete(): Boolean = makeCompleting(Unit)

    override fun completeExceptionally(exception: Throwable): Boolean {
        val failure = CompletedExceptionally(exception)
        return makeCompleting(failure)
    }

    // Walks up the chain of parent handles and returns true as soon as an ancestor that
    // handles exceptions is found; false when the chain ends without one.
    @JsName("handlesExceptionF")
    private fun handlesException(): Boolean {
        var ancestor: JobSupport? = (parentHandle as? ChildHandleNode)?.job
        while (ancestor != null) {
            if (ancestor.handlesException) return true
            ancestor = (ancestor.parentHandle as? ChildHandleNode)?.job
        }
        return false
    }
}
1339
1340 // -------- invokeOnCompletion nodes
1341
// Common interface of the state objects that represent a job which has not completed yet.
internal interface Incomplete {
    // whether this incomplete state is an active one
    val isActive: Boolean
    val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
1346
// Base class of the listener nodes; a node by itself is an active incomplete state without a list.
internal abstract class JobNode<out J : Job>(
    @JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList?
        get() = null

    // Disposing the handle removes this node from its job
    override fun dispose() {
        val owner = job as JobSupport
        owner.removeNode(this)
    }
}
1354
// List of listener nodes that also serves as an active incomplete state.
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    // Renders the list as `List{<state>}[node1, node2, ...]`
    fun getString(state: String): String {
        val text = StringBuilder()
        text.append("List{")
        text.append(state)
        text.append("}[")
        var needsSeparator = false
        forEach<JobNode<*>> { node ->
            if (needsSeparator) text.append(", ")
            needsSeparator = true
            text.append(node)
        }
        text.append("]")
        return text.toString()
    }

    override fun toString(): String {
        return if (DEBUG) getString("Active") else super.toString()
    }
}
1374
// Incomplete state that holds a list of listeners while the job is not active.
internal class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String {
        if (DEBUG) return list.getString("New")
        return super.toString()
    }
}
1381
// Listener node that delegates to the given completion handler.
private class InvokeOnCompletion(
    job: Job,
    private val handler: CompletionHandler
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }

    override fun toString() = "InvokeOnCompletion[" + classSimpleName + "@" + hexAddress + "]"
}
1389
// Listener node that resumes the given continuation with Unit, whatever the cause is.
private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString() = "ResumeOnCompletion[" + continuation + "]"
}
1397
// Listener node that resumes an awaiting continuation with the outcome of the job.
private class ResumeAwaitOnCompletion<T>(
    job: JobSupport,
    private val continuation: CancellableContinuationImpl<T>
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        val finalState = job.state
        assert { finalState !is Incomplete }
        if (finalState is CompletedExceptionally) {
            // Resume with the corresponding exception to preserve it
            continuation.resumeWithException(finalState.cause)
            return
        }
        // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
        @Suppress("UNCHECKED_CAST")
        val value = finalState.unboxState() as T
        continuation.resume(value)
    }

    override fun toString() = "ResumeAwaitOnCompletion[" + continuation + "]"
}
1416
// Listener node that disposes the given handle when invoked.
internal class DisposeOnCompletion(
    job: Job,
    private val handle: DisposableHandle
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String {
        return "DisposeOnCompletion[" + handle + "]"
    }
}
1424
// Listener node for the join clause of select: when the select instance gets selected,
// it starts the block in a cancellable way.
private class SelectJoinOnCompletion<R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect()) return
        block.startCoroutineCancellable(select.completion)
    }

    override fun toString(): String {
        return "SelectJoinOnCompletion[" + select + "]"
    }
}
1436
// Listener node for the await clause of select: when the select instance gets selected,
// it hands over to the job's selectAwaitCompletion.
private class SelectAwaitOnCompletion<T, R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect()) return
        job.selectAwaitCompletion(select, block)
    }

    override fun toString(): String {
        return "SelectAwaitOnCompletion[" + select + "]"
    }
}
1448
1449 // -------- invokeOnCancellation nodes
1450
/**
 * Marker for a node that shall be invoked in _cancelling_ state.
 * **Note: may be invoked multiple times.**
 */
internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
1456
// Cancelling listener node that delegates to the given handler, at most once.
private class InvokeOnCancelling(
    job: Job,
    private val handler: CompletionHandler
) : JobCancellingNode<Job>(job) {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu

    override fun invoke(cause: Throwable?) {
        // only the invocation that flips the flag from 0 to 1 calls the handler
        if (!_invoked.compareAndSet(0, 1)) return
        handler(cause)
    }

    override fun toString() = "InvokeOnCancelling[" + classSimpleName + "@" + hexAddress + "]"
}
1468
// Node that links a parent job with its child: it notifies the child that the parent is cancelled
// and forwards the child's cancellation to the parent.
internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?) {
        childJob.parentCancelled(job)
    }

    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }

    override fun toString(): String {
        return "ChildHandle[" + childJob + "]"
    }
}
1477
// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode<Job>(parent) {
    override fun invoke(cause: Throwable?) {
        // the continuation itself decides which exception represents cancellation by this job
        val cancellationCause = child.getContinuationCancellationCause(job)
        child.parentCancelled(cancellationCause)
    }

    override fun toString(): String {
        return "ChildContinuation[" + child + "]"
    }
}
1489
1490