1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:Suppress("DEPRECATION_ERROR")
5
6 package kotlinx.coroutines
7
8 import kotlinx.atomicfu.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 import kotlin.js.*
15 import kotlin.jvm.*
16 import kotlin.native.concurrent.*
17
18 /**
19 * A concrete implementation of [Job]. It is optionally a child to a parent job.
20 *
21 * This is an open class designed for extension by more specific classes that might augment the
22 * state and mare store addition state information for completed jobs, like their result values.
23 *
24 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
25 * @suppress **This is unstable API and it is subject to change.**
26 */
27 @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
28 public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
29 final override val key: CoroutineContext.Key<*> get() = Job
30
31 /*
32 === Internal states ===
33
34 name state class public state description
35 ------ ------------ ------------ -----------
36 EMPTY_N EmptyNew : New no listeners
37 EMPTY_A EmptyActive : Active no listeners
38 SINGLE JobNode : Active a single listener
39 SINGLE+ JobNode : Active a single listener + NodeList added as its next
40 LIST_N InactiveNodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
41 LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
42 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
43 CANCELLING Finishing : Cancelling -- " --
44 FINAL_C Cancelled : Cancelled Cancelled (final state)
45 FINAL_R <any> : Completed produced some result
46
47 === Transitions ===
48
49 New states Active states Inactive states
50
51 +---------+ +---------+ }
52 | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
53 +---------+ +---------+ | }
54 | | | ^ | +----------+
55 | | | | +--> | FINAL_* |
56 | | V | | +----------+
57 | | +---------+ | }
58 | | | SINGLE | ----+ } JobNode states
59 | | +---------+ | }
60 | | | | }
61 | | V | }
62 | | +---------+ | }
63 | +-------> | SINGLE+ | ----+ }
64 | +---------+ | }
65 | | |
66 V V |
67 +---------+ +---------+ | }
68 | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
69 +---------+ +---------+ | }
70 | | | | |
71 | | +--------+ | |
72 | | | V |
73 | | | +------------+ | +------------+ }
74 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
75 | | +------------+ +------------+ }
76 | | | ^
77 | | | |
78 +--------+---------+--------------------+
79
80
81 This state machine and its transition matrix are optimized for the common case when a job is created in active
82 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
83 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
84 state without going to COMPLETING state)
85
86 Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
87
88 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
89
90 | The longest possible chain of events in shown, shorter versions cut-through intermediate states,
91 | while still performing all the notifications in this order.
92
93 + Job object is created
94 ## NEW: state == EMPTY_NEW | is InactiveNodeList
95 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
96 ~ waits for start
97 >> start / join / await invoked
98 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
99 + onStart (lazy coroutine is started)
100 ~ active coroutine is working (or scheduled to execution)
101 >> childCancelled / cancelImpl invoked
102 ## CANCELLING: state is Finishing, state.rootCause != null
103 ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
104 ------ new children get immediately cancelled, but are still admitted to the list
105 + onCancelling
106 + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
107 + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
108 ~ waits for completion of coroutine body
109 >> makeCompleting / makeCompletingOnce invoked
110 ## COMPLETING: state is Finishing, state.isCompleting == true
111 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
112 ~ waits for children
113 >> last child completes
114 - computes the final exception
115 ## SEALED: state is Finishing, state.isSealed == true
116 ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
117 + cancelParent (final exception is communicated to the parent, parent incorporates it)
118 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
119 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
120 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
121 + parentHandle.dispose
122 + notifyCompletion (invoke all completion listeners)
123 + onCompletionInternal / onCompleted / onCancelled
124
125 ---------------------------------------------------------------------------------
126 */
127
128 // Note: use shared objects while we have no listeners
129 private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
130
131 private val _parentHandle = atomic<ChildHandle?>(null)
132 internal var parentHandle: ChildHandle?
133 get() = _parentHandle.value
134 set(value) { _parentHandle.value = value }
135
136 // ------------ initialization ------------
137
138 /**
139 * Initializes parent job.
140 * It shall be invoked at most once after construction after all other initialization.
141 */
142 protected fun initParentJob(parent: Job?) {
143 assert { parentHandle == null }
144 if (parent == null) {
145 parentHandle = NonDisposableHandle
146 return
147 }
148 parent.start() // make sure the parent is started
149 @Suppress("DEPRECATION")
150 val handle = parent.attachChild(this)
151 parentHandle = handle
152 // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
153 if (isCompleted) {
154 handle.dispose()
155 parentHandle = NonDisposableHandle // release it just in case, to aid GC
156 }
157 }
158
159 // ------------ state query ------------
160 /**
161 * Returns current state of this job.
162 * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
163 * and should be [unboxed][unboxState] before returning to user code.
164 */
165 internal val state: Any? get() {
166 _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
167 if (state !is OpDescriptor) return state
168 state.perform(this)
169 }
170 }
171
172 /**
173 * @suppress **This is unstable API and it is subject to change.**
174 */
175 private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
176 while (true) {
177 block(state)
178 }
179 }
180
181 public override val isActive: Boolean get() {
182 val state = this.state
183 return state is Incomplete && state.isActive
184 }
185
186 public final override val isCompleted: Boolean get() = state !is Incomplete
187
188 public final override val isCancelled: Boolean get() {
189 val state = this.state
190 return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
191 }
192
193 // ------------ state update ------------
194
195 // Finalizes Finishing -> Completed (terminal state) transition.
196 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
197 // Returns final state that was created and updated to
198 private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
199 /*
200 * Note: proposed state can be Incomplete, e.g.
201 * async {
202 * something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
203 * }
204 */
205 assert { this.state === state } // consistency check -- it cannot change
206 assert { !state.isSealed } // consistency check -- cannot be sealed yet
207 assert { state.isCompleting } // consistency check -- must be marked as completing
208 val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
209 // Create the final exception and seal the state so that no more exceptions can be added
210 var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
211 val finalException = synchronized(state) {
212 wasCancelling = state.isCancelling
213 val exceptions = state.sealLocked(proposedException)
214 val finalCause = getFinalRootCause(state, exceptions)
215 if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
216 finalCause
217 }
218 // Create the final state object
219 val finalState = when {
220 // was not cancelled (no exception) -> use proposed update value
221 finalException == null -> proposedUpdate
222 // small optimization when we can used proposeUpdate object as is on cancellation
223 finalException === proposedException -> proposedUpdate
224 // cancelled job final state
225 else -> CompletedExceptionally(finalException)
226 }
227 // Now handle the final exception
228 if (finalException != null) {
229 val handled = cancelParent(finalException) || handleJobException(finalException)
230 if (handled) (finalState as CompletedExceptionally).makeHandled()
231 }
232 // Process state updates for the final state before the state of the Job is actually set to the final state
233 // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
234 if (!wasCancelling) onCancelling(finalException)
235 onCompletionInternal(finalState)
236 // Then CAS to completed state -> it must succeed
237 val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
238 assert { casSuccess }
239 // And process all post-completion actions
240 completeStateFinalization(state, finalState)
241 return finalState
242 }
243
244 private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
245 // A case of no exceptions
246 if (exceptions.isEmpty()) {
247 // materialize cancellation exception if it was not materialized yet
248 if (state.isCancelling) return defaultCancellationException()
249 return null
250 }
251 /*
252 * 1) If we have non-CE, use it as root cause
253 * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
254 * * It is a CE, so it's not reported by children
255 * * The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
256 * * The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
257 * 3) Just return the very first CE
258 */
259 val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException }
260 if (firstNonCancellation != null) return firstNonCancellation
261 val first = exceptions[0]
262 if (first is TimeoutCancellationException) {
263 val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException }
264 if (detailedTimeoutException != null) return detailedTimeoutException
265 }
266 return first
267 }
268
269 private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
270 if (exceptions.size <= 1) return // nothing more to do here
271 val seenExceptions = identitySet<Throwable>(exceptions.size)
272 /*
273 * Note that root cause may be a recovered exception as well.
274 * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause,
275 * but add suppressed exceptions to the recovered root cause (as it is our final exception)
276 */
277 val unwrappedCause = unwrap(rootCause)
278 for (exception in exceptions) {
279 val unwrapped = unwrap(exception)
280 if (unwrapped !== rootCause && unwrapped !== unwrappedCause &&
281 unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
282 rootCause.addSuppressedThrowable(unwrapped)
283 }
284 }
285 }
286
287 // fast-path method to finalize normally completed coroutines without children
288 // returns true if complete, and afterCompletion(update) shall be called
289 private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
290 assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
291 assert { update !is CompletedExceptionally } // only for normal completion
292 if (!_state.compareAndSet(state, update.boxIncomplete())) return false
293 onCancelling(null) // simple state is not a failure
294 onCompletionInternal(update)
295 completeStateFinalization(state, update)
296 return true
297 }
298
299 // suppressed == true when any exceptions were suppressed while building the final completion cause
300 private fun completeStateFinalization(state: Incomplete, update: Any?) {
301 /*
302 * Now the job in THE FINAL state. We need to properly handle the resulting state.
303 * Order of various invocations here is important.
304 *
305 * 1) Unregister from parent job.
306 */
307 parentHandle?.let {
308 it.dispose() // volatile read parentHandle _after_ state was updated
309 parentHandle = NonDisposableHandle // release it just in case, to aid GC
310 }
311 val cause = (update as? CompletedExceptionally)?.cause
312 /*
313 * 2) Invoke completion handlers: .join(), callbacks etc.
314 * It's important to invoke them only AFTER exception handling and everything else, see #208
315 */
316 if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
317 try {
318 state.invoke(cause)
319 } catch (ex: Throwable) {
320 handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
321 }
322 } else {
323 state.list?.notifyCompletion(cause)
324 }
325 }
326
327 private fun notifyCancelling(list: NodeList, cause: Throwable) {
328 // first cancel our own children
329 onCancelling(cause)
330 notifyHandlers<JobCancellingNode>(list, cause)
331 // then cancel parent
332 cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
333 }
334
335 /**
336 * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
337 * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
338 *
339 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
340 * may leak to the [CoroutineExceptionHandler].
341 */
342 private fun cancelParent(cause: Throwable): Boolean {
343 // Is scoped coroutine -- don't propagate, will be rethrown
344 if (isScopedCoroutine) return true
345
346 /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
347 * This allow parent to cancel its children (normally) without being cancelled itself, unless
348 * child crashes and produce some other exception during its completion.
349 */
350 val isCancellation = cause is CancellationException
351 val parent = parentHandle
352 // No parent -- ignore CE, report other exceptions.
353 if (parent === null || parent === NonDisposableHandle) {
354 return isCancellation
355 }
356
357 // Notify parent but don't forget to check cancellation
358 return parent.childCancelled(cause) || isCancellation
359 }
360
361 private fun NodeList.notifyCompletion(cause: Throwable?) =
362 notifyHandlers<JobNode>(this, cause)
363
364 private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
365 var exception: Throwable? = null
366 list.forEach<T> { node ->
367 try {
368 node.invoke(cause)
369 } catch (ex: Throwable) {
370 exception?.apply { addSuppressedThrowable(ex) } ?: run {
371 exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
372 }
373 }
374 }
375 exception?.let { handleOnCompletionException(it) }
376 }
377
378 public final override fun start(): Boolean {
379 loopOnState { state ->
380 when (startInternal(state)) {
381 FALSE -> return false
382 TRUE -> return true
383 }
384 }
385 }
386
387 // returns: RETRY/FALSE/TRUE:
388 // FALSE when not new,
389 // TRUE when started
390 // RETRY when need to retry
391 private fun startInternal(state: Any?): Int {
392 when (state) {
393 is Empty -> { // EMPTY_X state -- no completion handlers
394 if (state.isActive) return FALSE // already active
395 if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
396 onStart()
397 return TRUE
398 }
399 is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
400 if (!_state.compareAndSet(state, state.list)) return RETRY
401 onStart()
402 return TRUE
403 }
404 else -> return FALSE // not a new state
405 }
406 }
407
408 /**
409 * Override to provide the actual [start] action.
410 * This function is invoked exactly once when non-active coroutine is [started][start].
411 */
412 protected open fun onStart() {}
413
414 public final override fun getCancellationException(): CancellationException =
415 when (val state = this.state) {
416 is Finishing -> state.rootCause?.toCancellationException("$classSimpleName is cancelling")
417 ?: error("Job is still new or active: $this")
418 is Incomplete -> error("Job is still new or active: $this")
419 is CompletedExceptionally -> state.cause.toCancellationException()
420 else -> JobCancellationException("$classSimpleName has completed normally", null, this)
421 }
422
423 protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
424 this as? CancellationException ?: defaultCancellationException(message, this)
425
426 /**
427 * Returns the cause that signals the completion of this job -- it returns the original
428 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
429 * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
430 * is being cancelled yet.
431 */
432 protected val completionCause: Throwable?
433 get() = when (val state = state) {
434 is Finishing -> state.rootCause
435 ?: error("Job is still new or active: $this")
436 is Incomplete -> error("Job is still new or active: $this")
437 is CompletedExceptionally -> state.cause
438 else -> null
439 }
440
441 /**
442 * Returns `true` when [completionCause] exception was handled by parent coroutine.
443 */
444 protected val completionCauseHandled: Boolean
445 get() = state.let { it is CompletedExceptionally && it.handled }
446
447 @Suppress("OverridingDeprecatedMember")
448 public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
449 invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
450
451 public final override fun invokeOnCompletion(
452 onCancelling: Boolean,
453 invokeImmediately: Boolean,
454 handler: CompletionHandler
455 ): DisposableHandle {
456 // Create node upfront -- for common cases it just initializes JobNode.job field,
457 // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
458 val node: JobNode = makeNode(handler, onCancelling)
459 loopOnState { state ->
460 when (state) {
461 is Empty -> { // EMPTY_X state -- no completion handlers
462 if (state.isActive) {
463 // try move to SINGLE state
464 if (_state.compareAndSet(state, node)) return node
465 } else
466 promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
467 }
468 is Incomplete -> {
469 val list = state.list
470 if (list == null) { // SINGLE/SINGLE+
471 promoteSingleToNodeList(state as JobNode)
472 } else {
473 var rootCause: Throwable? = null
474 var handle: DisposableHandle = NonDisposableHandle
475 if (onCancelling && state is Finishing) {
476 synchronized(state) {
477 // check if we are installing cancellation handler on job that is being cancelled
478 rootCause = state.rootCause // != null if cancelling job
479 // We add node to the list in two cases --- either the job is not being cancelled
480 // or we are adding a child to a coroutine that is not completing yet
481 if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
482 // Note: add node the list while holding lock on state (make sure it cannot change)
483 if (!addLastAtomic(state, list, node)) return@loopOnState // retry
484 // just return node if we don't have to invoke handler (not cancelling yet)
485 if (rootCause == null) return node
486 // otherwise handler is invoked immediately out of the synchronized section & handle returned
487 handle = node
488 }
489 }
490 }
491 if (rootCause != null) {
492 // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
493 if (invokeImmediately) handler.invokeIt(rootCause)
494 return handle
495 } else {
496 if (addLastAtomic(state, list, node)) return node
497 }
498 }
499 }
500 else -> { // is complete
501 // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
502 // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
503 if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
504 return NonDisposableHandle
505 }
506 }
507 }
508 }
509
510 private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
511 val node = if (onCancelling) {
512 (handler as? JobCancellingNode)
513 ?: InvokeOnCancelling(handler)
514 } else {
515 (handler as? JobNode)
516 ?.also { assert { it !is JobCancellingNode } }
517 ?: InvokeOnCompletion(handler)
518 }
519 node.job = this
520 return node
521 }
522
523 private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
524 list.addLastIf(node) { this.state === expect }
525
526 private fun promoteEmptyToNodeList(state: Empty) {
527 // try to promote it to LIST state with the corresponding state
528 val list = NodeList()
529 val update = if (state.isActive) list else InactiveNodeList(list)
530 _state.compareAndSet(state, update)
531 }
532
533 private fun promoteSingleToNodeList(state: JobNode) {
534 // try to promote it to list (SINGLE+ state)
535 state.addOneIfEmpty(NodeList())
536 // it must be in SINGLE+ state or state has changed (node could have need removed from state)
537 val list = state.nextNode // either our NodeList or somebody else won the race, updated state
538 // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
539 _state.compareAndSet(state, list)
540 }
541
542 public final override suspend fun join() {
543 if (!joinInternal()) { // fast-path no wait
544 coroutineContext.ensureActive()
545 return // do not suspend
546 }
547 return joinSuspend() // slow-path wait
548 }
549
550 private fun joinInternal(): Boolean {
551 loopOnState { state ->
552 if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
553 if (startInternal(state) >= 0) return true // wait unless need to retry
554 }
555 }
556
557 private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
558 // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
559 cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
560 }
561
562 public final override val onJoin: SelectClause0
563 get() = this
564
565 // registerSelectJoin
566 public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
567 // fast-path -- check state and select/return if needed
568 loopOnState { state ->
569 if (select.isSelected) return
570 if (state !is Incomplete) {
571 // already complete -- select result
572 if (select.trySelect()) {
573 block.startCoroutineUnintercepted(select.completion)
574 }
575 return
576 }
577 if (startInternal(state) == 0) {
578 // slow-path -- register waiter for completion
579 select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
580 return
581 }
582 }
583 }
584
585 /**
586 * @suppress **This is unstable API and it is subject to change.**
587 */
588 internal fun removeNode(node: JobNode) {
589 // remove logic depends on the state of the job
590 loopOnState { state ->
591 when (state) {
592 is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
593 if (state !== node) return // a different job node --> we were already removed
594 // try remove and revert back to empty state
595 if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
596 }
597 is Incomplete -> { // may have a list of completion handlers
598 // remove node from the list if there is a list
599 if (state.list != null) node.remove()
600 return
601 }
602 else -> return // it is complete and does not have any completion handlers
603 }
604 }
605 }
606
607 /**
608 * Returns `true` for job that do not have "body block" to complete and should immediately go into
609 * completing state and start waiting for children.
610 *
611 * @suppress **This is unstable API and it is subject to change.**
612 */
613 internal open val onCancelComplete: Boolean get() = false
614
615 // external cancel with cause, never invoked implicitly from internal machinery
616 public override fun cancel(cause: CancellationException?) {
617 cancelInternal(cause ?: defaultCancellationException())
618 }
619
620 protected open fun cancellationExceptionMessage(): String = "Job was cancelled"
621
622 // HIDDEN in Job interface. Invoked only by legacy compiled code.
623 // external cancel with (optional) cause, never invoked implicitly from internal machinery
624 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
625 public override fun cancel(cause: Throwable?): Boolean {
626 cancelInternal(cause?.toCancellationException() ?: defaultCancellationException())
627 return true
628 }
629
630 // It is overridden in channel-linked implementation
631 public open fun cancelInternal(cause: Throwable) {
632 cancelImpl(cause)
633 }
634
635 // Parent is cancelling child
636 public final override fun parentCancelled(parentJob: ParentJob) {
637 cancelImpl(parentJob)
638 }
639
640 /**
641 * Child was cancelled with a cause.
642 * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
643 * It is overridden in supervisor implementations to completely ignore any child cancellation.
644 * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
645 *
646 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
647 * may leak to the [CoroutineExceptionHandler].
648 */
649 public open fun childCancelled(cause: Throwable): Boolean {
650 if (cause is CancellationException) return true
651 return cancelImpl(cause) && handlesException
652 }
653
654 /**
655 * Makes this [Job] cancelled with a specified [cause].
656 * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
657 */
658 public fun cancelCoroutine(cause: Throwable?): Boolean = cancelImpl(cause)
659
660 // cause is Throwable or ParentJob when cancelChild was invoked
661 // returns true is exception was handled, false otherwise
662 internal fun cancelImpl(cause: Any?): Boolean {
663 var finalState: Any? = COMPLETING_ALREADY
664 if (onCancelComplete) {
665 // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
666 // completing and had recorded exception
667 finalState = cancelMakeCompleting(cause)
668 if (finalState === COMPLETING_WAITING_CHILDREN) return true
669 }
670 if (finalState === COMPLETING_ALREADY) {
671 finalState = makeCancelling(cause)
672 }
673 return when {
674 finalState === COMPLETING_ALREADY -> true
675 finalState === COMPLETING_WAITING_CHILDREN -> true
676 finalState === TOO_LATE_TO_CANCEL -> false
677 else -> {
678 afterCompletion(finalState)
679 true
680 }
681 }
682 }
683
684 // cause is Throwable or ParentJob when cancelChild was invoked
685 // It contains a loop and never returns COMPLETING_RETRY, can return
686 // COMPLETING_ALREADY -- if already completed/completing
687 // COMPLETING_WAITING_CHILDREN -- if started waiting for children
688 // final state -- when completed, for call to afterCompletion
689 private fun cancelMakeCompleting(cause: Any?): Any? {
690 loopOnState { state ->
691 if (state !is Incomplete || state is Finishing && state.isCompleting) {
692 // already completed/completing, do not even create exception to propose update
693 return COMPLETING_ALREADY
694 }
695 val proposedUpdate = CompletedExceptionally(createCauseException(cause))
696 val finalState = tryMakeCompleting(state, proposedUpdate)
697 if (finalState !== COMPLETING_RETRY) return finalState
698 }
699 }
700
701 @Suppress("NOTHING_TO_INLINE") // Save a stack frame
702 internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
703 JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)
704
705 override fun getChildJobCancellationCause(): CancellationException {
706 // determine root cancellation cause of this job (why is it cancelling its children?)
707 val state = this.state
708 val rootCause = when (state) {
709 is Finishing -> state.rootCause
710 is CompletedExceptionally -> state.cause
711 is Incomplete -> error("Cannot be cancelling child in this state: $state")
712 else -> null // create exception with the below code on normal completion
713 }
714 return (rootCause as? CancellationException) ?: JobCancellationException("Parent job is ${stateString(state)}", rootCause, this)
715 }
716
717 // cause is Throwable or ParentJob when cancelChild was invoked
718 private fun createCauseException(cause: Any?): Throwable = when (cause) {
719 is Throwable? -> cause ?: defaultCancellationException()
720 else -> (cause as ParentJob).getChildJobCancellationCause()
721 }
722
723 // transitions to Cancelling state
724 // cause is Throwable or ParentJob when cancelChild was invoked
725 // It contains a loop and never returns COMPLETING_RETRY, can return
726 // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
727 // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
728 // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
729 // final state -- when completed, for call to afterCompletion
730 private fun makeCancelling(cause: Any?): Any? {
731 var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
732 loopOnState { state ->
733 when (state) {
734 is Finishing -> { // already finishing -- collect exceptions
735 val notifyRootCause = synchronized(state) {
736 if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
737 // add exception, do nothing is parent is cancelling child that is already being cancelled
738 val wasCancelling = state.isCancelling // will notify if was not cancelling
739 // Materialize missing exception if it is the first exception (otherwise -- don't)
740 if (cause != null || !wasCancelling) {
741 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
742 state.addExceptionLocked(causeException)
743 }
744 // take cause for notification if was not in cancelling state before
745 state.rootCause.takeIf { !wasCancelling }
746 }
747 notifyRootCause?.let { notifyCancelling(state.list, it) }
748 return COMPLETING_ALREADY
749 }
750 is Incomplete -> {
751 // Not yet finishing -- try to make it cancelling
752 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
753 if (state.isActive) {
754 // active state becomes cancelling
755 if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
756 } else {
757 // non active state starts completing
758 val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
759 when {
760 finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
761 finalState === COMPLETING_RETRY -> return@loopOnState
762 else -> return finalState
763 }
764 }
765 }
766 else -> return TOO_LATE_TO_CANCEL // already complete
767 }
768 }
769 }
770
771 // Performs promotion of incomplete coroutine state to NodeList for the purpose of
772 // converting coroutine state to Cancelling, returns null when need to retry
773 private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
774 when (state) {
775 is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
776 is JobNode -> {
777 // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
778 // correctly capture a reference to it
779 promoteSingleToNodeList(state)
780 null // retry
781 }
782 else -> error("State should have list: $state")
783 }
784
785 // try make new Cancelling state on the condition that we're still in the expected state
786 private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
787 assert { state !is Finishing } // only for non-finishing states
788 assert { state.isActive } // only for active states
789 // get state's list or else promote to list to correctly operate on child lists
790 val list = getOrPromoteCancellingList(state) ?: return false
791 // Create cancelling state (with rootCause!)
792 val cancelling = Finishing(list, false, rootCause)
793 if (!_state.compareAndSet(state, cancelling)) return false
794 // Notify listeners
795 notifyCancelling(list, rootCause)
796 return true
797 }
798
799 /**
800 * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally)
801 * and by [JobImpl.cancel]. It returns `false` on repeated invocation
802 * (when this job is already completing).
803 */
804 internal fun makeCompleting(proposedUpdate: Any?): Boolean {
805 loopOnState { state ->
806 val finalState = tryMakeCompleting(state, proposedUpdate)
807 when {
808 finalState === COMPLETING_ALREADY -> return false
809 finalState === COMPLETING_WAITING_CHILDREN -> return true
810 finalState === COMPLETING_RETRY -> return@loopOnState
811 else -> {
812 afterCompletion(finalState)
813 return true
814 }
815 }
816 }
817 }
818
819 /**
820 * Completes this job. Used by [AbstractCoroutine.resume].
821 * It throws [IllegalStateException] on repeated invocation (when this job is already completing).
822 * Returns:
823 * * [COMPLETING_WAITING_CHILDREN] if started waiting for children.
824 * * Final state otherwise (caller should do [afterCompletion])
825 */
826 internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
827 loopOnState { state ->
828 val finalState = tryMakeCompleting(state, proposedUpdate)
829 when {
830 finalState === COMPLETING_ALREADY ->
831 throw IllegalStateException(
832 "Job $this is already complete or completing, " +
833 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
834 )
835 finalState === COMPLETING_RETRY -> return@loopOnState
836 else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
837 }
838 }
839 }
840
841 // Returns one of COMPLETING symbols or final state:
842 // COMPLETING_ALREADY -- when already complete or completing
843 // COMPLETING_RETRY -- when need to retry due to interference
844 // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
845 // final state -- when completed, for call to afterCompletion
846 private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
847 if (state !is Incomplete)
848 return COMPLETING_ALREADY
849 /*
850 * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
851 * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
852 * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
853 * which may miss unhandled exception.
854 */
855 if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
856 if (tryFinalizeSimpleState(state, proposedUpdate)) {
857 // Completed successfully on fast path -- return updated state
858 return proposedUpdate
859 }
860 return COMPLETING_RETRY
861 }
862 // The separate slow-path function to simplify profiling
863 return tryMakeCompletingSlowPath(state, proposedUpdate)
864 }
865
866 // Returns one of COMPLETING symbols or final state:
867 // COMPLETING_ALREADY -- when already complete or completing
868 // COMPLETING_RETRY -- when need to retry due to interference
869 // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
870 // final state -- when completed, for call to afterCompletion
871 private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
872 // get state's list or else promote to list to correctly operate on child lists
873 val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
874 // promote to Finishing state if we are not in it yet
875 // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
876 // atomically transition to finishing & completing state
877 val finishing = state as? Finishing ?: Finishing(list, false, null)
878 // must synchronize updates to finishing state
879 var notifyRootCause: Throwable? = null
880 synchronized(finishing) {
881 // check if this state is already completing
882 if (finishing.isCompleting) return COMPLETING_ALREADY
883 // mark as completing
884 finishing.isCompleting = true
885 // if we need to promote to finishing then atomically do it here.
886 // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
887 // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
888 if (finishing !== state) {
889 if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
890 }
891 // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
892 assert { !finishing.isSealed } // cannot be sealed
893 // add new proposed exception to the finishing state
894 val wasCancelling = finishing.isCancelling
895 (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
896 // If it just becomes cancelling --> must process cancelling notifications
897 notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
898 }
899 // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
900 notifyRootCause?.let { notifyCancelling(list, it) }
901 // now wait for children
902 val child = firstChild(state)
903 if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
904 return COMPLETING_WAITING_CHILDREN
905 // otherwise -- we have not children left (all were already cancelled?)
906 return finalizeFinishingState(finishing, proposedUpdate)
907 }
908
909 private val Any?.exceptionOrNull: Throwable?
910 get() = (this as? CompletedExceptionally)?.cause
911
912 private fun firstChild(state: Incomplete) =
913 state as? ChildHandleNode ?: state.list?.nextChild()
914
915 // return false when there is no more incomplete children to wait
916 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
917 private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
918 val handle = child.childJob.invokeOnCompletion(
919 invokeImmediately = false,
920 handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
921 )
922 if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
923 val nextChild = child.nextChild() ?: return false
924 return tryWaitForChild(state, nextChild, proposedUpdate)
925 }
926
927 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
928 private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
929 assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
930 // figure out if we need to wait for next child
931 val waitChild = lastChild.nextChild()
932 // try wait for next child
933 if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
934 // no more children to wait -- try update state
935 val finalState = finalizeFinishingState(state, proposedUpdate)
936 afterCompletion(finalState)
937 }
938
939 private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
940 var cur = this
941 while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
942 while (true) {
943 cur = cur.nextNode
944 if (cur.isRemoved) continue
945 if (cur is ChildHandleNode) return cur
946 if (cur is NodeList) return null // checked all -- no more children
947 }
948 }
949
950 public final override val children: Sequence<Job> get() = sequence {
951 when (val state = this@JobSupport.state) {
952 is ChildHandleNode -> yield(state.childJob)
953 is Incomplete -> state.list?.let { list ->
954 list.forEach<ChildHandleNode> { yield(it.childJob) }
955 }
956 }
957 }
958
959 @Suppress("OverridingDeprecatedMember")
960 public final override fun attachChild(child: ChildJob): ChildHandle {
961 /*
962 * Note: This function attaches a special ChildHandleNode node object. This node object
963 * is handled in a special way on completion on the coroutine (we wait for all of them) and
964 * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
965 * if the job is already cancelling. For cancelling state child is attached under state lock.
966 * It's required to properly wait all children before completion and provide linearizable hierarchy view:
967 * If child is attached when the job is already being cancelled, such child will receive immediate notification on
968 * cancellation, but parent *will* wait for that child before completion and will handle its exception.
969 */
970 return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
971 }
972
973 /**
974 * Override to process any exceptions that were encountered while invoking completion handlers
975 * installed via [invokeOnCompletion].
976 *
977 * @suppress **This is unstable API and it is subject to change.**
978 */
979 internal open fun handleOnCompletionException(exception: Throwable) {
980 throw exception
981 }
982
983 /**
984 * This function is invoked once as soon as this job is being cancelled for any reason or completes,
985 * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
986 *
987 * The meaning of [cause] parameter:
988 * * Cause is `null` when the job has completed normally.
989 * * Cause is an instance of [CancellationException] when the job was cancelled _normally_.
990 * **It should not be treated as an error**. In particular, it should not be reported to error logs.
991 * * Otherwise, the job had been cancelled or failed with exception.
992 *
993 * The specified [cause] is not the final cancellation cause of this job.
994 * A job may produce other exceptions while it is failing and the final cause might be different.
995 *
996 * @suppress **This is unstable API and it is subject to change.*
997 */
998 protected open fun onCancelling(cause: Throwable?) {}
999
1000 /**
1001 * Returns `true` for scoped coroutines.
1002 * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
1003 * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
1004 * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
1005 */
1006 protected open val isScopedCoroutine: Boolean get() = false
1007
1008 /**
1009 * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
1010 * A valid implementation of this getter should recursively check parent as well before returning `false`.
1011 *
1012 * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
1013 * @suppress **This is unstable API and it is subject to change.*
1014 */
1015 internal open val handlesException: Boolean get() = true
1016
1017 /**
1018 * Handles the final job [exception] that was not handled by the parent coroutine.
1019 * Returns `true` if it handles exception (so handling at later stages is not needed).
1020 * It is designed to be overridden by launch-like coroutines
1021 * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
1022 * that can represent exceptions.
1023 *
1024 * This method is invoked **exactly once** when the final exception of the job is determined
1025 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
1026 */
1027 protected open fun handleJobException(exception: Throwable): Boolean = false
1028
1029 /**
1030 * Override for completion actions that need to update some external object depending on job's state,
1031 * right before all the waiters for coroutine's completion are notified.
1032 *
1033 * @param state the final state.
1034 *
1035 * @suppress **This is unstable API and it is subject to change.**
1036 */
1037 protected open fun onCompletionInternal(state: Any?) {}
1038
1039 /**
1040 * Override for the very last action on job's completion to resume the rest of the code in
1041 * scoped coroutines. It is called when this job is externally completed in an unknown
1042 * context and thus should resume with a default mode.
1043 *
1044 * @suppress **This is unstable API and it is subject to change.**
1045 */
1046 protected open fun afterCompletion(state: Any?) {}
1047
1048 // for nicer debugging
1049 public override fun toString(): String =
1050 "${toDebugString()}@$hexAddress"
1051
1052 @InternalCoroutinesApi
1053 public fun toDebugString(): String = "${nameString()}{${stateString(state)}}"
1054
1055 /**
1056 * @suppress **This is unstable API and it is subject to change.**
1057 */
1058 internal open fun nameString(): String = classSimpleName
1059
1060 private fun stateString(state: Any?): String = when (state) {
1061 is Finishing -> when {
1062 state.isCancelling -> "Cancelling"
1063 state.isCompleting -> "Completing"
1064 else -> "Active"
1065 }
1066 is Incomplete -> if (state.isActive) "Active" else "New"
1067 is CompletedExceptionally -> "Cancelled"
1068 else -> "Completed"
1069 }
1070
1071 // Completing & Cancelling states,
1072 // All updates are guarded by synchronized(this), reads are volatile
1073 @Suppress("UNCHECKED_CAST")
1074 private class Finishing(
1075 override val list: NodeList,
1076 isCompleting: Boolean,
1077 rootCause: Throwable?
1078 ) : SynchronizedObject(), Incomplete {
1079 private val _isCompleting = atomic(isCompleting)
1080 var isCompleting: Boolean
1081 get() = _isCompleting.value
1082 set(value) { _isCompleting.value = value }
1083
1084 private val _rootCause = atomic(rootCause)
1085 var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
1086 get() = _rootCause.value
1087 set(value) { _rootCause.value = value }
1088
1089 private val _exceptionsHolder = atomic<Any?>(null)
1090 private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
1091 get() = _exceptionsHolder.value
1092 set(value) { _exceptionsHolder.value = value }
1093
1094 // Note: cannot be modified when sealed
1095 val isSealed: Boolean get() = exceptionsHolder === SEALED
1096 val isCancelling: Boolean get() = rootCause != null
1097 override val isActive: Boolean get() = rootCause == null // !isCancelling
1098
1099 // Seals current state and returns list of exceptions
1100 // guarded by `synchronized(this)`
1101 fun sealLocked(proposedException: Throwable?): List<Throwable> {
1102 val list = when(val eh = exceptionsHolder) { // volatile read
1103 null -> allocateList()
1104 is Throwable -> allocateList().also { it.add(eh) }
1105 is ArrayList<*> -> eh as ArrayList<Throwable>
1106 else -> error("State is $eh") // already sealed -- cannot happen
1107 }
1108 val rootCause = this.rootCause // volatile read
1109 rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
1110 if (proposedException != null && proposedException != rootCause) list.add(proposedException)
1111 exceptionsHolder = SEALED
1112 return list
1113 }
1114
1115 // guarded by `synchronized(this)`
1116 fun addExceptionLocked(exception: Throwable) {
1117 val rootCause = this.rootCause // volatile read
1118 if (rootCause == null) {
1119 this.rootCause = exception
1120 return
1121 }
1122 if (exception === rootCause) return // nothing to do
1123 when (val eh = exceptionsHolder) { // volatile read
1124 null -> exceptionsHolder = exception
1125 is Throwable -> {
1126 if (exception === eh) return // nothing to do
1127 exceptionsHolder = allocateList().apply {
1128 add(eh)
1129 add(exception)
1130
1131 }
1132 }
1133 is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
1134 else -> error("State is $eh") // already sealed -- cannot happen
1135 }
1136 }
1137
1138 private fun allocateList() = ArrayList<Throwable>(4)
1139
1140 override fun toString(): String =
1141 "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
1142 }
1143
1144 private val Incomplete.isCancelling: Boolean
1145 get() = this is Finishing && isCancelling
1146
1147 // Used by parent that is waiting for child completion
1148 private class ChildCompletion(
1149 private val parent: JobSupport,
1150 private val state: Finishing,
1151 private val child: ChildHandleNode,
1152 private val proposedUpdate: Any?
1153 ) : JobNode() {
1154 override fun invoke(cause: Throwable?) {
1155 parent.continueCompleting(state, child, proposedUpdate)
1156 }
1157 }
1158
1159 private class AwaitContinuation<T>(
1160 delegate: Continuation<T>,
1161 private val job: JobSupport
1162 ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
1163 override fun getContinuationCancellationCause(parent: Job): Throwable {
1164 val state = job.state
1165 /*
1166 * When the job we are waiting for had already completely completed exceptionally or
1167 * is failing, we shall use its root/completion cause for await's result.
1168 */
1169 if (state is Finishing) state.rootCause?.let { return it }
1170 if (state is CompletedExceptionally) return state.cause
1171 return parent.getCancellationException()
1172 }
1173
1174 protected override fun nameString(): String =
1175 "AwaitContinuation"
1176 }
1177
1178 /*
1179 * =================================================================================================
1180 * This is ready-to-use implementation for Deferred interface.
1181 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1182 * completed state as `Any?`
1183 * =================================================================================================
1184 */
1185
1186 public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1187
1188 public fun getCompletionExceptionOrNull(): Throwable? {
1189 val state = this.state
1190 check(state !is Incomplete) { "This job has not completed yet" }
1191 return state.exceptionOrNull
1192 }
1193
1194 /**
1195 * @suppress **This is unstable API and it is subject to change.**
1196 */
1197 internal fun getCompletedInternal(): Any? {
1198 val state = this.state
1199 check(state !is Incomplete) { "This job has not completed yet" }
1200 if (state is CompletedExceptionally) throw state.cause
1201 return state.unboxState()
1202 }
1203
1204 /**
1205 * @suppress **This is unstable API and it is subject to change.**
1206 */
1207 internal suspend fun awaitInternal(): Any? {
1208 // fast-path -- check state (avoid extra object creation)
1209 while (true) { // lock-free loop on state
1210 val state = this.state
1211 if (state !is Incomplete) {
1212 // already complete -- just return result
1213 if (state is CompletedExceptionally) { // Slow path to recover stacktrace
1214 recoverAndThrow(state.cause)
1215 }
1216 return state.unboxState()
1217
1218 }
1219 if (startInternal(state) >= 0) break // break unless needs to retry
1220 }
1221 return awaitSuspend() // slow-path
1222 }
1223
1224 private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
1225 /*
1226 * Custom code here, so that parent coroutine that is using await
1227 * on its child deferred (async) coroutine would throw the exception that this child had
1228 * thrown and not a JobCancellationException.
1229 */
1230 val cont = AwaitContinuation(uCont.intercepted(), this)
1231 // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
1232 cont.initCancellability()
1233 cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
1234 cont.getResult()
1235 }
1236
1237 /**
1238 * @suppress **This is unstable API and it is subject to change.**
1239 */
1240 // registerSelectAwaitInternal
1241 @Suppress("UNCHECKED_CAST")
1242 internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
1243 // fast-path -- check state and select/return if needed
1244 loopOnState { state ->
1245 if (select.isSelected) return
1246 if (state !is Incomplete) {
1247 // already complete -- select result
1248 if (select.trySelect()) {
1249 if (state is CompletedExceptionally) {
1250 select.resumeSelectWithException(state.cause)
1251 }
1252 else {
1253 block.startCoroutineUnintercepted(state.unboxState() as T, select.completion)
1254 }
1255 }
1256 return
1257 }
1258 if (startInternal(state) == 0) {
1259 // slow-path -- register waiter for completion
1260 select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
1261 return
1262 }
1263 }
1264 }
1265
1266 /**
1267 * @suppress **This is unstable API and it is subject to change.**
1268 */
1269 @Suppress("UNCHECKED_CAST")
1270 internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
1271 val state = this.state
1272 // Note: await is non-atomic (can be cancelled while dispatched)
1273 if (state is CompletedExceptionally)
1274 select.resumeSelectWithException(state.cause)
1275 else
1276 block.startCoroutineCancellable(state.unboxState() as T, select.completion)
1277 }
1278 }
1279
1280 /*
1281 * Class to represent object as the final state of the Job
1282 */
1283 private class IncompleteStateBox(@JvmField val state: Incomplete)
boxIncompletenull1284 internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
1285 internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
1286
1287 // --------------- helper classes & constants for job implementation
1288
1289 @SharedImmutable
1290 private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
1291 @JvmField
1292 @SharedImmutable
1293 internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
1294 @SharedImmutable
1295 private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
1296 @SharedImmutable
1297 private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")
1298
1299 private const val RETRY = -1
1300 private const val FALSE = 0
1301 private const val TRUE = 1
1302
1303 @SharedImmutable
1304 private val SEALED = Symbol("SEALED")
1305 @SharedImmutable
1306 private val EMPTY_NEW = Empty(false)
1307 @SharedImmutable
1308 private val EMPTY_ACTIVE = Empty(true)
1309
1310 private class Empty(override val isActive: Boolean) : Incomplete {
1311 override val list: NodeList? get() = null
1312 override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
1313 }
1314
1315 @PublishedApi // for a custom job in the test module
1316 internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
1317 init { initParentJob(parent) }
1318 override val onCancelComplete get() = true
1319 /*
1320 * Check whether parent is able to handle exceptions as well.
1321 * With this check, an exception in that pattern will be handled once:
1322 * ```
1323 * launch {
1324 * val child = Job(coroutineContext[Job])
1325 * launch(child) { throw ... }
1326 * }
1327 * ```
1328 */
1329 override val handlesException: Boolean = handlesException()
completenull1330 override fun complete() = makeCompleting(Unit)
1331 override fun completeExceptionally(exception: Throwable): Boolean =
1332 makeCompleting(CompletedExceptionally(exception))
1333
1334 @JsName("handlesExceptionF")
1335 private fun handlesException(): Boolean {
1336 var parentJob = (parentHandle as? ChildHandleNode)?.job ?: return false
1337 while (true) {
1338 if (parentJob.handlesException) return true
1339 parentJob = (parentJob.parentHandle as? ChildHandleNode)?.job ?: return false
1340 }
1341 }
1342 }
1343
1344 // -------- invokeOnCompletion nodes
1345
1346 internal interface Incomplete {
1347 val isActive: Boolean
1348 val list: NodeList? // is null only for Empty and JobNode incomplete state objects
1349 }
1350
1351 internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
1352 /**
1353 * Initialized by [JobSupport.makeNode].
1354 */
1355 lateinit var job: JobSupport
1356 override val isActive: Boolean get() = true
1357 override val list: NodeList? get() = null
disposenull1358 override fun dispose() = job.removeNode(this)
1359 override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
1360 }
1361
1362 internal class NodeList : LockFreeLinkedListHead(), Incomplete {
1363 override val isActive: Boolean get() = true
1364 override val list: NodeList get() = this
1365
1366 fun getString(state: String) = buildString {
1367 append("List{")
1368 append(state)
1369 append("}[")
1370 var first = true
1371 this@NodeList.forEach<JobNode> { node ->
1372 if (first) first = false else append(", ")
1373 append(node)
1374 }
1375 append("]")
1376 }
1377
1378 override fun toString(): String =
1379 if (DEBUG) getString("Active") else super.toString()
1380 }
1381
1382 internal class InactiveNodeList(
1383 override val list: NodeList
1384 ) : Incomplete {
1385 override val isActive: Boolean get() = false
toStringnull1386 override fun toString(): String = if (DEBUG) list.getString("New") else super.toString()
1387 }
1388
1389 private class InvokeOnCompletion(
1390 private val handler: CompletionHandler
1391 ) : JobNode() {
1392 override fun invoke(cause: Throwable?) = handler.invoke(cause)
1393 }
1394
1395 private class ResumeOnCompletion(
1396 private val continuation: Continuation<Unit>
1397 ) : JobNode() {
invokenull1398 override fun invoke(cause: Throwable?) = continuation.resume(Unit)
1399 }
1400
1401 private class ResumeAwaitOnCompletion<T>(
1402 private val continuation: CancellableContinuationImpl<T>
1403 ) : JobNode() {
1404 override fun invoke(cause: Throwable?) {
1405 val state = job.state
1406 assert { state !is Incomplete }
1407 if (state is CompletedExceptionally) {
1408 // Resume with with the corresponding exception to preserve it
1409 continuation.resumeWithException(state.cause)
1410 } else {
1411 // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
1412 @Suppress("UNCHECKED_CAST")
1413 continuation.resume(state.unboxState() as T)
1414 }
1415 }
1416 }
1417
1418 internal class DisposeOnCompletion(
1419 private val handle: DisposableHandle
1420 ) : JobNode() {
invokenull1421 override fun invoke(cause: Throwable?) = handle.dispose()
1422 }
1423
1424 private class SelectJoinOnCompletion<R>(
1425 private val select: SelectInstance<R>,
1426 private val block: suspend () -> R
1427 ) : JobNode() {
1428 override fun invoke(cause: Throwable?) {
1429 if (select.trySelect())
1430 block.startCoroutineCancellable(select.completion)
1431 }
1432 }
1433
1434 private class SelectAwaitOnCompletion<T, R>(
1435 private val select: SelectInstance<R>,
1436 private val block: suspend (T) -> R
1437 ) : JobNode() {
invokenull1438 override fun invoke(cause: Throwable?) {
1439 if (select.trySelect())
1440 job.selectAwaitCompletion(select, block)
1441 }
1442 }
1443
1444 // -------- invokeOnCancellation nodes
1445
1446 /**
1447 * Marker for node that shall be invoked on in _cancelling_ state.
1448 * **Note: may be invoked multiple times.**
1449 */
1450 internal abstract class JobCancellingNode : JobNode()
1451
1452 private class InvokeOnCancelling(
1453 private val handler: CompletionHandler
1454 ) : JobCancellingNode() {
1455 // delegate handler shall be invoked at most once, so here is an additional flag
1456 private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
invokenull1457 override fun invoke(cause: Throwable?) {
1458 if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
1459 }
1460 }
1461
1462 internal class ChildHandleNode(
1463 @JvmField val childJob: ChildJob
1464 ) : JobCancellingNode(), ChildHandle {
1465 override val parent: Job get() = job
invokenull1466 override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
1467 override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
1468 }
1469
1470 // Same as ChildHandleNode, but for cancellable continuation
1471 internal class ChildContinuation(
1472 @JvmField val child: CancellableContinuationImpl<*>
1473 ) : JobCancellingNode() {
1474 override fun invoke(cause: Throwable?) {
1475 child.parentCancelled(child.getContinuationCancellationCause(job))
1476 }
1477 }
1478
1479