<lambda>null1 @file:Suppress("DEPRECATION_ERROR")
2
3 package kotlinx.coroutines
4
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.internal.*
7 import kotlinx.coroutines.selects.*
8 import kotlin.coroutines.*
9 import kotlin.coroutines.intrinsics.*
10 import kotlin.js.*
11 import kotlin.jvm.*
12
13 /**
14 * A concrete implementation of [Job]. It is optionally a child to a parent job.
15 *
16 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
18 *
19 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
20 * @suppress **This is unstable API and it is subject to change.**
21 */
22 @OptIn(InternalForInheritanceCoroutinesApi::class)
23 @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
24 public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
/** The coroutine-context key of this element; it is always [Job]. */
final override val key: CoroutineContext.Key<*>
    get() {
        return Job
    }
26
27 /*
28 === Internal states ===
29
    name       state class              public state  description
    ------     ------------             ------------  -----------
    EMPTY_N    EmptyNew               : New           no listeners
    EMPTY_A    EmptyActive            : Active        no listeners
    SINGLE     JobNode                : Active        a single listener
    SINGLE+    JobNode                : Active        a single listener + NodeList added as its next
    LIST_N     InactiveNodeList       : New           a list of listeners (promoted once, does not go back to EmptyNew)
    LIST_A     NodeList               : Active        a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
    COMPLETING Finishing              : Completing    has a list of listeners (promoted once from LIST_*)
    CANCELLING Finishing              : Cancelling    -- " --
    FINAL_C    Cancelled              : Cancelled     Cancelled (final state)
    FINAL_R    <any>                  : Completed     produced some result
42
43 === Transitions ===
44
        New states      Active states       Inactive states

       +---------+       +---------+                          }
       | EMPTY_N | ----> | EMPTY_A | ----+                    } Empty states
       +---------+       +---------+     |                    }
            |  |           |     ^       |    +----------+
            |  |           |     |       +--> |  FINAL_* |
            |  |           V     |       |    +----------+
            |  |         +---------+     |                    }
            |  |         | SINGLE  | ----+                    } JobNode states
            |  |         +---------+     |                    }
            |  |              |          |                    }
            |  |              V          |                    }
            |  |         +---------+     |                    }
            |  +-------> | SINGLE+ | ----+                    }
            |            +---------+     |                    }
            |                 |          |
            V                 V          |
       +---------+       +---------+     |                    }
       | LIST_N  | ----> | LIST_A  | ----+                    } [Inactive]NodeList states
       +---------+       +---------+     |                    }
          |   |             |   |        |
          |   |    +--------+   |        |
          |   |    |            V        |
          |   |    |    +------------+   |   +------------+   }
          |   +-------> | COMPLETING | --+-- | CANCELLING |   } Finishing states
          |        |    +------------+       +------------+   }
          |        |         |                    ^
          |        |         |                    |
          +--------+---------+--------------------+
75
76
77 This state machine and its transition matrix are optimized for the common case when a job is created in active
78 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
79 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
80 state without going to COMPLETING state)
81
82 Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
83
84 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
85
    | The longest possible chain of events is shown; shorter versions cut through intermediate states,
87 | while still performing all the notifications in this order.
88
89 + Job object is created
90 ## NEW: state == EMPTY_NEW | is InactiveNodeList
91 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
92 ~ waits for start
93 >> start / join / await invoked
94 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
95 + onStart (lazy coroutine is started)
96 ~ active coroutine is working (or scheduled to execution)
97 >> childCancelled / cancelImpl invoked
98 ## CANCELLING: state is Finishing, state.rootCause != null
99 ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
100 ------ new children get immediately cancelled, but are still admitted to the list
101 + onCancelling
102 + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
103 + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
104 ~ waits for completion of coroutine body
105 >> makeCompleting / makeCompletingOnce invoked
106 ## COMPLETING: state is Finishing, state.isCompleting == true
107 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
108 ~ waits for children
109 >> last child completes
110 - computes the final exception
111 ## SEALED: state is Finishing, state.isSealed == true
112 ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
113 + cancelParent (final exception is communicated to the parent, parent incorporates it)
114 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
115 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
116 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
117 + parentHandle.dispose
118 + notifyCompletion (invoke all completion listeners)
119 + onCompletionInternal / onCompleted / onCancelled
120
121 ---------------------------------------------------------------------------------
122 */
123
// Current state of the job's state machine (see the "Internal states" table above).
// Note: use shared objects while we have no listeners -- the initial value is the shared
// EMPTY_ACTIVE object when the job is created active, and the shared EMPTY_NEW object otherwise.
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
126
// Atomic holder for the handle that links this job to its parent. It starts out as `null`;
// initParentJob stores either the handle returned by the parent's attachChild or NonDisposableHandle,
// and completeStateFinalization replaces it with NonDisposableHandle after disposing it.
private val _parentHandle = atomic<ChildHandle?>(null)
internal var parentHandle: ChildHandle?
    get() = _parentHandle.value
    set(value) { _parentHandle.value = value }
131
/** The parent exposed by the current [parentHandle], or `null` when no handle has been set yet. */
override val parent: Job?
    get() {
        val handle = parentHandle ?: return null
        return handle.parent
    }
134
135 // ------------ initialization ------------
136
/**
 * Initializes parent job.
 * It shall be invoked at most once after construction after all other initialization.
 *
 * With a `null` [parent] the handle simply becomes [NonDisposableHandle]. Otherwise the parent is
 * started, this job is attached to it as a child, and the resulting handle is stored.
 */
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    if (parent != null) {
        // make sure the parent is started before attaching to it
        parent.start()
        val childHandle = parent.attachChild(this)
        parentHandle = childHandle
        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            childHandle.dispose()
            // release it just in case, to aid GC
            parentHandle = NonDisposableHandle
        }
    } else {
        parentHandle = NonDisposableHandle
    }
}
156
157 // ------------ state query ------------
/**
 * Returns current state of this job (a plain read of the underlying atomic).
 * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
 * and should be [unboxed][unboxState] before returning to user code.
 */
internal val state: Any?
    get() {
        return _state.value
    }
164
/**
 * Repeatedly re-reads [state] and passes it to [block], forever. The only way out is a non-local
 * `return` (or an exception) from inside [block], which is why the return type is [Nothing].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
173
// `true` only while the current state is an Incomplete state that reports itself as active.
public override val isActive: Boolean
    get() = (state as? Incomplete)?.isActive == true
178
// The job is completed as soon as its state is no longer an Incomplete one.
public final override val isCompleted: Boolean
    get() {
        return state !is Incomplete
    }
180
// Cancelled means: already completed exceptionally, or still finishing with the cancelling flag set.
public final override val isCancelled: Boolean
    get() = when (val current = this.state) {
        is CompletedExceptionally -> true
        is Finishing -> current.isCancelling
        else -> false
    }
185
186 // ------------ state update ------------
187
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
// Returns final state that was created and updated to.
//
// Steps, in order: seal the Finishing state and compute the final exception (under the state's lock),
// build the final state object, report the exception to the parent / job exception handler,
// run the onCancelling/onCompletionInternal hooks, CAS the final state in, and finally run
// the post-completion actions (completeStateFinalization).
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
    /*
     * Note: proposed state can be Incomplete, e.g.
     * async {
     *     something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
     * }
     */
    assert { this.state === state } // consistency check -- it cannot change
    assert { !state.isSealed } // consistency check -- cannot be sealed yet
    assert { state.isCompleting } // consistency check -- must be marked as completing
    val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
    // Create the final exception and seal the state so that no more exceptions can be added
    val wasCancelling: Boolean // assigned exactly once, inside the synchronized block below
    val finalException = synchronized(state) {
        wasCancelling = state.isCancelling
        val exceptions = state.sealLocked(proposedException)
        val finalCause = getFinalRootCause(state, exceptions)
        if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
        finalCause
    }
    // Create the final state object
    val finalState = when {
        // was not cancelled (no exception) -> use proposed update value
        finalException == null -> proposedUpdate
        // small optimization: the proposedUpdate object can be used as is on cancellation
        finalException === proposedException -> proposedUpdate
        // cancelled job final state
        else -> CompletedExceptionally(finalException)
    }
    // Now handle the final exception
    if (finalException != null) {
        // the job's own handler is consulted only if the parent did not take responsibility
        val handled = cancelParent(finalException) || handleJobException(finalException)
        if (handled) (finalState as CompletedExceptionally).makeHandled()
    }
    // Process state updates for the final state before the state of the Job is actually set to the final state
    // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
    if (!wasCancelling) onCancelling(finalException)
    onCompletionInternal(finalState)
    // Then CAS to completed state -> it must succeed
    val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
    assert { casSuccess }
    // And process all post-completion actions
    completeStateFinalization(state, finalState)
    return finalState
}
236
// Picks the exception that becomes the final cause of the job out of the collected [exceptions].
// Returns null only when there are no exceptions and the state is not cancelling.
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    if (exceptions.isEmpty()) {
        // No exceptions: materialize a cancellation exception if the job is cancelling, else there is no cause
        return if (state.isCancelling) defaultCancellationException() else null
    }
    /*
     * 1) If we have non-CE, use it as root cause
     * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
     *    - It is a CE, so it's not reported by children
     *    - The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
     *    - The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
     * 3) Just return the very first CE
     */
    exceptions.firstOrNull { it !is CancellationException }?.let { return it }
    val head = exceptions[0]
    if (head !is TimeoutCancellationException) return head
    return exceptions.firstOrNull { it !== head && it is TimeoutCancellationException } ?: head
}
261
// Attaches every distinct, non-cancellation exception from [exceptions] to [rootCause] as suppressed.
private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
    // A single exception (or none) leaves nothing to suppress
    if (exceptions.size <= 1) return
    val alreadyAdded = identitySet<Throwable>(exceptions.size)
    /*
     * Note that root cause may be a recovered exception as well.
     * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause,
     * but add suppressed exceptions to the recovered root cause (as it is our final exception)
     */
    val unwrappedRoot = unwrap(rootCause)
    for (candidate in exceptions) {
        val actual = unwrap(candidate)
        // never suppress the root cause into itself
        if (actual === rootCause || actual === unwrappedRoot) continue
        // cancellation exceptions are not recorded as suppressed
        if (actual is CancellationException) continue
        // identity set guarantees each instance is attached at most once
        if (alreadyAdded.add(actual)) rootCause.addSuppressed(actual)
    }
}
279
// Fast path that finalizes normally completed coroutines without children.
// Returns true if the state was switched to `update` (all completion actions have then been run),
// false if the CAS lost a race and nothing was done.
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
    // only simple state without lists where children can concurrently add
    assert { state is Empty || state is JobNode }
    // only for normal completion
    assert { update !is CompletedExceptionally }
    val installed = _state.compareAndSet(state, update.boxIncomplete())
    if (installed) {
        onCancelling(null) // simple state is not a failure
        onCompletionInternal(update)
        completeStateFinalization(state, update)
    }
    return installed
}
291
// Runs the post-completion actions for a job that has just moved from `state` to the final `update`.
private fun completeStateFinalization(state: Incomplete, update: Any?) {
    /*
     * Now the job in THE FINAL state. We need to properly handle the resulting state.
     * Order of various invocations here is important.
     *
     * 1) Unregister from parent job.
     */
    val handle = parentHandle // volatile read parentHandle _after_ state was updated
    if (handle != null) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    val cause = (update as? CompletedExceptionally)?.cause
    /*
     * 2) Invoke completion handlers: .join(), callbacks etc.
     *    It's important to invoke them only AFTER exception handling and everything else, see #208
     */
    if (state !is JobNode) {
        // list-based state (if any list exists at all): notify every registered handler
        state.list?.notifyCompletion(cause)
        return
    }
    // SINGLE/SINGLE+ state -- one completion handler (common case)
    try {
        state.invoke(cause)
    } catch (ex: Throwable) {
        handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
    }
}
319
// Performs the cancellation notifications for this job, in this exact order:
// the onCancelling hook, closing the list for new cancelling handlers, invoking the registered
// cancelling handlers, and finally propagating the cause to the parent.
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancelling(cause)
    // no cancelling handlers can be added to the list past this point
    list.close(LIST_CANCELLATION_PERMISSION)
    // only nodes that asked for cancellation notifications are invoked here
    notifyHandlers(list, cause) { it.onCancelling }
    // then cancel parent
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
328
/**
 * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
 * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
 *
 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
 * may leak to the [CoroutineExceptionHandler].
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle == null || handle === NonDisposableHandle) {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    } else {
        // Notify parent but don't forget to check cancellation
        handle.childCancelled(cause) || isCancellation
    }
}
354
// Closes this list for new completion handlers and then invokes every JobNode in it with [cause].
private fun NodeList.notifyCompletion(cause: Throwable?) {
    close(LIST_ON_COMPLETION_PERMISSION)
    // the predicate accepts every node: all handlers are interested in completion
    notifyHandlers(this, cause) { true }
}
359
// Invokes every JobNode of [list] accepted by [predicate] with [cause].
// A throwing handler does not stop the iteration: the first failure is wrapped into a
// CompletionHandlerException, later failures are added to it as suppressed, and the aggregate
// is passed to handleOnCompletionException once all handlers have run.
private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) {
    var failure: Throwable? = null
    list.forEach { node ->
        if (node is JobNode && predicate(node)) {
            try {
                node.invoke(cause)
            } catch (ex: Throwable) {
                val aggregated = failure
                if (aggregated == null) {
                    failure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
                } else {
                    aggregated.addSuppressed(ex)
                }
            }
        }
    }
    failure?.let { handleOnCompletionException(it) }
}
375
// Keeps calling startInternal on the latest state until it gives a definite answer:
// `true` when this call started the job, `false` when the job was not in a new state.
public final override fun start(): Boolean {
    loopOnState { current ->
        val outcome = startInternal(current)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome: the state changed concurrently, so loop and look at the fresh state
    }
}
384
// Makes a single attempt to move the given new state to its active counterpart.
// returns: RETRY/FALSE/TRUE:
//   FALSE when not new,
//   TRUE when started (onStart has been invoked),
//   RETRY when the state update lost a race and the caller needs to retry
private fun startInternal(state: Any?): Int {
    val activated = when (state) {
        is Empty -> { // EMPTY_X state -- no completion handlers
            if (state.isActive) return FALSE // already active
            _state.compareAndSet(state, EMPTY_ACTIVE)
        }
        // LIST state -- inactive with a list of completion handlers
        is InactiveNodeList -> _state.compareAndSet(state, state.list)
        else -> return FALSE // not a new state
    }
    if (!activated) return RETRY
    onStart()
    return TRUE
}
405
/**
 * Override to provide the actual [start] action.
 * This function is invoked exactly once when non-active coroutine is [started][start].
 * The default implementation does nothing.
 */
protected open fun onStart() {}
411
// Produces the CancellationException that describes why this job is cancelling or how it completed.
// Fails with IllegalStateException (via `error`) when the job is incomplete and has no root cause yet.
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    if (current is Finishing) {
        val rootCause = current.rootCause ?: error("Job is still new or active: $this")
        return rootCause.toCancellationException("$classSimpleName is cancelling")
    }
    if (current is Incomplete) error("Job is still new or active: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException()
    // any other final state: the job completed without an exception
    return JobCancellationException("$classSimpleName has completed normally", null, this)
}
420
// Returns the receiver itself if it already is a CancellationException; otherwise wraps it as the
// cause of a new default cancellation exception carrying the optional [message].
protected fun Throwable.toCancellationException(message: String? = null): CancellationException {
    if (this is CancellationException) return this
    return defaultCancellationException(message, this)
}
423
/**
 * Returns the cause that signals the completion of this job -- it returns the original
 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has not [completed][isCompleted] nor
 * is being cancelled yet.
 */
protected val completionCause: Throwable?
    get() {
        val current = state
        return when {
            current is Finishing -> current.rootCause ?: error("Job is still new or active: $this")
            current is Incomplete -> error("Job is still new or active: $this")
            current is CompletedExceptionally -> current.cause
            else -> null
        }
    }
438
/**
 * Returns `true` when [completionCause] exception was handled by parent coroutine,
 * i.e. the current state is a [CompletedExceptionally] whose `handled` flag is set.
 */
protected val completionCauseHandled: Boolean
    get() {
        val current = state
        return current is CompletedExceptionally && current.handled
    }
444
// Registers a completion-only handler; it is invoked immediately if the job is already complete.
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    val node = InvokeOnCompletion(handler)
    return invokeOnCompletionInternal(invokeImmediately = true, node = node)
}
450
// Wraps [handler] into the node kind selected by [onCancelling] and registers it.
public final override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler): DisposableHandle {
    val node: JobNode = when {
        onCancelling -> InvokeOnCancelling(handler)
        else -> InvokeOnCompletion(handler)
    }
    return invokeOnCompletionInternal(invokeImmediately = invokeImmediately, node = node)
}
460
/**
 * Registers [node] as a handler of this job.
 *
 * Returns [node] itself when it was stored in the job state. Otherwise returns [NonDisposableHandle]:
 * - for a cancelling handler when the root cause is already known -- the handler is invoked with that
 *   cause if [invokeImmediately] is set;
 * - when the job is already complete -- the handler is invoked with the cause of the
 *   [CompletedExceptionally] state (or `null`) if [invokeImmediately] is set.
 */
internal fun invokeOnCompletionInternal(
    invokeImmediately: Boolean,
    node: JobNode
): DisposableHandle {
    node.job = this
    // Create node upfront -- for common cases it just initializes JobNode.job field,
    // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
    val added = tryPutNodeIntoList(node) { state, list ->
        if (node.onCancelling) {
            /**
             * We are querying whether the job was already cancelled when we entered this block.
             * We can't naively attempt to add the node to the list, because a lot of time could pass between
             * notifying the cancellation handlers (and thus closing the list, forcing us to retry)
             * and reaching a final state.
             *
             * Alternatively, we could also try to add the node to the list first and then read the latest state
             * to check for an exception, but that logic would need to manually handle the final state, which is
             * less straightforward.
             */
            val rootCause = (state as? Finishing)?.rootCause
            if (rootCause == null) {
                /**
                 * There is no known root cause yet, so we can add the node to the list of state handlers.
                 *
                 * If this call fails, because of the bitmask, this means one of the two happened:
                 * - [notifyCancelling] was already called.
                 *   This means that the job is already being cancelled: otherwise, with what exception would we
                 *   notify the handler?
                 *   So, we can retry the operation: either the state is already final, or the `rootCause` check
                 *   above will give a different result.
                 * - [notifyCompletion] was already called.
                 *   This means that the job is already complete.
                 *   We can retry the operation and will observe the final state.
                 */
                list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION)
            } else {
                /**
                 * The root cause is known, so we can invoke the handler immediately and avoid adding it.
                 */
                if (invokeImmediately) node.invoke(rootCause)
                // non-local return: leaves invokeOnCompletionInternal itself, not just the lambda
                return NonDisposableHandle
            }
        } else {
            /**
             * The non-[onCancelling]-handlers are interested in completions only, so it's safe to add them at
             * any time before [notifyCompletion] is called (which closes the list).
             *
             * If the list *is* closed, on a retry, we'll observe the final state, as [notifyCompletion] is only
             * called after the state transition.
             */
            list.addLast(node, LIST_ON_COMPLETION_PERMISSION)
        }
    }
    when {
        added -> return node
        // not added: the job is already complete, so report its completion cause right away
        invokeImmediately -> node.invoke((state as? CompletedExceptionally)?.cause)
    }
    return NonDisposableHandle
}
520
/**
 * Puts [node] into the current state's list of completion handlers.
 *
 * Returns `false` if the state is already complete and doesn't accept new handlers.
 * Returns `true` if the handler was successfully added to the list.
 *
 * [tryAdd] is invoked when the state is [Incomplete] and the list is not `null`, to decide on the specific
 * behavior in this case. It must return
 * - `true` if the element was successfully added to the list
 * - `false` if the operation needs to be retried
 */
private inline fun tryPutNodeIntoList(
    node: JobNode,
    tryAdd: (Incomplete, NodeList) -> Boolean
): Boolean {
    loopOnState { current ->
        if (current is Empty) { // EMPTY_X state -- no completion handlers
            if (!current.isActive) {
                // that way we can add listener for non-active coroutine
                promoteEmptyToNodeList(current)
            } else if (_state.compareAndSet(current, node)) {
                // moved to the SINGLE state: the node itself is now the state
                return true
            }
        } else if (current is Incomplete) {
            val list = current.list
            if (list == null) {
                // no list yet: promote the single node to a list, then retry
                promoteSingleToNodeList(current as JobNode)
            } else if (tryAdd(current, list)) {
                return true
            }
        } else {
            // final state: no handlers are accepted anymore
            return false
        }
    }
}
553
// Makes one attempt to replace an empty state with a fresh list state of the same activity:
// a plain NodeList for an active state, an InactiveNodeList wrapper otherwise.
// The outcome of the CAS is deliberately ignored; callers re-read the state and retry.
private fun promoteEmptyToNodeList(state: Empty) {
    val nodes = NodeList()
    val promoted = if (!state.isActive) InactiveNodeList(nodes) else nodes
    _state.compareAndSet(state, promoted)
}
560
// Makes one attempt to replace a single-node state (SINGLE) with a list state.
// The result of the final CAS is ignored; callers re-read the state and retry.
private fun promoteSingleToNodeList(state: JobNode) {
    // try to promote it to list (SINGLE+ state)
    state.addOneIfEmpty(NodeList())
    // it must be in SINGLE+ state or state has changed (node could have been removed from state)
    val list = state.nextNode // either our NodeList or somebody else won the race, updated state
    // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
    _state.compareAndSet(state, list)
}
569
// Suspends until this job completes. When no waiting is needed it does not suspend at all,
// but still checks that the calling coroutine is active.
public final override suspend fun join() {
    if (joinInternal()) {
        // slow-path wait
        return joinSuspend()
    }
    // fast-path no wait -- do not suspend
    coroutineContext.ensureActive()
}
577
// Returns false when the job is already complete (no need to wait), true when the caller has to wait.
// The job is started along the way if it was still in a new state.
private fun joinInternal(): Boolean {
    loopOnState { current ->
        when {
            // not active anymore (complete) -- no need to wait
            current !is Incomplete -> return false
            // wait unless need to retry
            startInternal(current) >= 0 -> return true
        }
    }
}
584
// Slow path of join: suspends the caller and registers a completion handler that resumes it.
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { continuation ->
    // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
    val registration = invokeOnCompletion(handler = ResumeOnCompletion(continuation))
    continuation.disposeOnCancellation(registration)
}
589
// Select clause for join: builds a new clause object on every access, bound to this job and
// registered through registerSelectForOnJoin.
@Suppress("UNCHECKED_CAST")
public final override val onJoin: SelectClause0
    get() {
        val registration = JobSupport::registerSelectForOnJoin as RegistrationFunction
        return SelectClause0Impl(clauseObject = this@JobSupport, regFunc = registration)
    }
596
// Registration function of the onJoin clause: selects right away if the job is already complete,
// otherwise installs a completion handler whose registration is disposed together with the select.
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForOnJoin(select: SelectInstance<*>, ignoredParam: Any?) {
    if (joinInternal()) {
        val registration = invokeOnCompletion(handler = SelectOnJoinCompletionHandler(select))
        select.disposeOnCompletion(registration)
    } else {
        select.selectInRegistrationPhase(Unit)
    }
}
606
// Completion handler installed by registerSelectForOnJoin: on invocation it tries to select
// the clause owned by the enclosing job in [select], with Unit as the result.
private inner class SelectOnJoinCompletionHandler(
    private val select: SelectInstance<*>
) : JobNode() {
    // this handler is interested in completion only, not in cancellation
    override val onCancelling: Boolean
        get() {
            return false
        }

    // the completion cause is irrelevant for join, so it is ignored
    override fun invoke(cause: Throwable?) {
        val clauseOwner = this@JobSupport
        select.trySelect(clauseOwner, Unit)
    }
}
615
/**
 * Removes [node] from this job; how exactly depends on the current state of the job.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode) {
    loopOnState { current ->
        if (current is JobNode) { // SINGLE/SINGLE+ state -- one completion handler
            // a different job node --> we were already removed
            if (current !== node) return
            // try remove and revert back to empty state; on a lost race, loop and re-inspect the state
            if (_state.compareAndSet(current, EMPTY_ACTIVE)) return
        } else {
            // may have a list of completion handlers: remove node from the list if there is a list;
            // a complete job does not have any completion handlers, so there is nothing to do for it
            if (current is Incomplete && current.list != null) node.remove()
            return
        }
    }
}
637
/**
 * Returns `true` for job that do not have "body block" to complete and should immediately go into
 * completing state and start waiting for children. The default is `false`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onCancelComplete: Boolean
    get() {
        return false
    }
645
// external cancel with cause, never invoked implicitly from internal machinery
// A missing cause is replaced by the default cancellation exception of this job.
public override fun cancel(cause: CancellationException?) {
    val effectiveCause = cause ?: defaultCancellationException()
    cancelInternal(effectiveCause)
}
650
// Message for cancellation exceptions created by defaultCancellationException when none is given.
protected open fun cancellationExceptionMessage(): String {
    return "Job was cancelled"
}
652
// HIDDEN in Job interface. Invoked only by legacy compiled code.
// external cancel with (optional) cause, never invoked implicitly from internal machinery
// Always returns true; a non-cancellation cause is wrapped into a cancellation exception first.
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
public override fun cancel(cause: Throwable?): Boolean {
    val exception = cause?.toCancellationException() ?: defaultCancellationException()
    cancelInternal(exception)
    return true
}
660
// Entry point used by both `cancel` overloads; the default implementation delegates to cancelImpl
// and discards its Boolean result.
// It is overridden in channel-linked implementation
public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}
665
// Parent is cancelling child
// The parent job itself is passed to cancelImpl as the cause; createCauseException later turns it
// into an exception by asking the parent for its child-cancellation cause.
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}
670
/**
 * Child was cancelled with a cause.
 * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
 * It is overridden in supervisor implementations to completely ignore any child cancellation.
 * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
 *
 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
 * may leak to the [CoroutineExceptionHandler].
 */
public open fun childCancelled(cause: Throwable): Boolean =
    // a CancellationException never cancels this job; anything else does
    cause is CancellationException || (cancelImpl(cause) && handlesException)
684
/**
 * Makes this [Job] cancelled with a specified [cause].
 * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
 * Returns whatever [cancelImpl] reports for that cause.
 */
public fun cancelCoroutine(cause: Throwable?): Boolean {
    return cancelImpl(cause)
}
690
// cause is Throwable or ParentJob when cancelChild was invoked
// returns true if exception was handled, false otherwise (false only when it was too late to cancel)
internal fun cancelImpl(cause: Any?): Boolean {
    var outcome: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing; if cancelMakeCompleting returns a state, it means it has made it
        // completing and has recorded the exception
        outcome = cancelMakeCompleting(cause)
        if (outcome === COMPLETING_WAITING_CHILDREN) return true
    }
    // either no attempt to complete was made above, or the job was already completed/completing
    if (outcome === COMPLETING_ALREADY) outcome = makeCancelling(cause)
    if (outcome === COMPLETING_ALREADY || outcome === COMPLETING_WAITING_CHILDREN) return true
    if (outcome === TOO_LATE_TO_CANCEL) return false
    // anything else is the final state of the job
    afterCompletion(outcome)
    return true
}
714
// cause is Throwable or ParentJob when cancelChild was invoked
// It contains a loop and never returns COMPLETING_RETRY, can return
// COMPLETING_ALREADY -- if already completed/completing
// COMPLETING_WAITING_CHILDREN -- if started waiting for children
// final state -- when completed, for call to afterCompletion
private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { current ->
        // already completed/completing, do not even create exception to propose update
        if (current !is Incomplete) return COMPLETING_ALREADY
        if (current is Finishing && current.isCompleting) return COMPLETING_ALREADY
        val result = tryMakeCompleting(current, CompletedExceptionally(createCauseException(cause)))
        if (result !== COMPLETING_RETRY) return result
    }
}
731
// Creates a JobCancellationException that references this job; when [message] is null the text
// comes from cancellationExceptionMessage().
@Suppress("NOTHING_TO_INLINE") // Save a stack frame
internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
    JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)
735
// Produces the exception this job uses to cancel its children.
// Fails with IllegalStateException (via `error`) when the job is incomplete but not finishing.
override fun getChildJobCancellationCause(): CancellationException {
    // determine root cancellation cause of this job (why is it cancelling its children?)
    val current = this.state
    val rootCause: Throwable? = when (current) {
        is Finishing -> current.rootCause
        is CompletedExceptionally -> current.cause
        is Incomplete -> error("Cannot be cancelling child in this state: $current")
        else -> null // create exception with the below code on normal completion
    }
    // a cancellation root cause is passed on as is; anything else is wrapped
    if (rootCause is CancellationException) return rootCause
    return JobCancellationException("Parent job is ${stateString(current)}", rootCause, this)
}
747
// cause is Throwable or ParentJob when cancelChild was invoked
// null -> default cancellation exception; Throwable -> itself; otherwise the parent's cancellation cause
private fun createCauseException(cause: Any?): Throwable {
    if (cause == null) return defaultCancellationException()
    if (cause is Throwable) return cause
    return (cause as ParentJob).getChildJobCancellationCause()
}
753
// transitions to Cancelling state
// cause is Throwable or ParentJob when cancelChild was invoked
// It contains a loop and never returns COMPLETING_RETRY, can return
// COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
// COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
// TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
// final state -- when completed, for call to afterCompletion
private fun makeCancelling(cause: Any?): Any? {
    var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
    loopOnState { state ->
        when (state) {
            is Finishing -> { // already finishing -- collect exceptions
                val notifyRootCause = synchronized(state) {
                    if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
                    // add exception, do nothing if parent is cancelling child that is already being cancelled
                    val wasCancelling = state.isCancelling // will notify if was not cancelling
                    // Materialize missing exception if it is the first exception (otherwise -- don't)
                    if (cause != null || !wasCancelling) {
                        val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                        state.addExceptionLocked(causeException)
                    }
                    // take cause for notification if was not in cancelling state before
                    state.rootCause.takeIf { !wasCancelling }
                }
                // notification happens outside of the lock, and only for the first transition to cancelling
                notifyRootCause?.let { notifyCancelling(state.list, it) }
                return COMPLETING_ALREADY
            }
            is Incomplete -> {
                // Not yet finishing -- try to make it cancelling
                val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                if (state.isActive) {
                    // active state becomes cancelling; on failure the loop re-reads the state and retries
                    if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
                } else {
                    // non active state starts completing
                    val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
                    when {
                        finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
                        finalState === COMPLETING_RETRY -> return@loopOnState
                        else -> return finalState
                    }
                }
            }
            else -> return TOO_LATE_TO_CANCEL // already complete
        }
    }
}
801
/**
 * Provides the [NodeList] that is needed to convert the incomplete [state] to the Cancelling state.
 *
 * Returns the list the state already has, a freshly allocated list for an [Empty] state, or `null`
 * when a single [JobNode] had to be promoted first and the caller needs to retry.
 */
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? {
    state.list?.let { existing -> return existing }
    return when (state) {
        // a new empty list can be allocated: it gets integrated into the Cancelling state
        is Empty -> NodeList()
        is JobNode -> {
            // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise
            // a reference to it cannot be captured correctly
            promoteSingleToNodeList(state)
            null // retry
        }
        else -> error("State should have list: $state")
    }
}
815
/**
 * Tries to install a new Cancelling state with the given [rootCause], on the condition that the job
 * is still in the expected [state].
 *
 * Returns `true` when the state was replaced and listeners were notified; `false` when the caller
 * has to retry (the list had to be promoted or the state has changed concurrently).
 */
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
    assert { state !is Finishing } // only for non-finishing states
    assert { state.isActive } // only for active states
    // take the state's list or promote to a list so that child lists are operated on correctly
    val handlers = getOrPromoteCancellingList(state) ?: return false
    // the cancelling state is created with rootCause from the very beginning
    val installed = _state.compareAndSet(state, Finishing(handlers, false, rootCause))
    if (installed) notifyCancelling(handlers, rootCause)
    return installed
}
829
/**
 * Completes this job with [proposedUpdate]. Used by [CompletableDeferred.complete] (and exceptionally)
 * and by [JobImpl.cancel].
 *
 * Returns `false` on repeated invocation (when this job is already completing) and `true` otherwise.
 * When a final state is reached right away, [afterCompletion] is invoked with it before returning.
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    loopOnState { current ->
        val outcome = tryMakeCompleting(current, proposedUpdate)
        if (outcome === COMPLETING_RETRY) return@loopOnState
        if (outcome === COMPLETING_ALREADY) return false
        // anything that is not the "waiting for children" marker is the final state
        if (outcome !== COMPLETING_WAITING_CHILDREN) afterCompletion(outcome)
        return true
    }
}
849
/**
 * Completes this job with [proposedUpdate]. Used by [AbstractCoroutine.resume].
 *
 * Unlike [makeCompleting], a repeated invocation (when this job is already complete or completing)
 * fails with [IllegalStateException].
 *
 * Returns:
 * - [COMPLETING_WAITING_CHILDREN] if started waiting for children.
 * - Final state otherwise (caller should do [afterCompletion])
 */
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
    loopOnState { current ->
        val outcome = tryMakeCompleting(current, proposedUpdate)
        if (outcome === COMPLETING_ALREADY) {
            throw IllegalStateException(
                "Job $this is already complete or completing, " +
                    "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
            )
        }
        // COMPLETING_WAITING_CHILDREN or the final state; COMPLETING_RETRY goes for another iteration
        if (outcome !== COMPLETING_RETRY) return outcome
    }
}
871
/**
 * Makes a single attempt to complete the job in the given [state] with [proposedUpdate].
 *
 * Returns one of COMPLETING symbols or final state:
 * - [COMPLETING_ALREADY] -- when already complete or completing
 * - [COMPLETING_RETRY] -- when need to retry due to interference
 * - [COMPLETING_WAITING_CHILDREN] -- when made completing and is waiting for children
 * - final state -- when completed, for call to [afterCompletion]
 */
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete) return COMPLETING_ALREADY
    /*
     * FAST PATH: a simple state (no list), no children to wait for, and a non-exceptional update
     * can complete immediately.
     * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    val isSimpleState = (state is Empty || state is JobNode) && state !is ChildHandleNode
    if (isSimpleState && proposedUpdate !is CompletedExceptionally) {
        // on success the proposed update itself is the final state
        return if (tryFinalizeSimpleState(state, proposedUpdate)) proposedUpdate else COMPLETING_RETRY
    }
    // The slow path lives in a separate function to simplify profiling
    return tryMakeCompletingSlowPath(state, proposedUpdate)
}
896
/**
 * Slow path of [tryMakeCompleting]: goes through the [Finishing] state, notifies about cancellation
 * when needed, and waits for children before finalizing.
 *
 * Returns one of COMPLETING symbols or final state:
 * - [COMPLETING_ALREADY] -- when already complete or completing
 * - [COMPLETING_RETRY] -- when need to retry due to interference
 * - [COMPLETING_WAITING_CHILDREN] -- when made completing and is waiting for children
 * - final state -- when completed, for call to [afterCompletion]
 */
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    // get state's list or else promote to list to correctly operate on child lists
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    // promote to Finishing state if we are not in it yet
    // This promotion has to be atomic w.r.t. state change, so that a coroutine that is not active yet
    // atomically transitions to finishing & completing state
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    // must synchronize updates to finishing state
    val notifyRootCause: Throwable?
    synchronized(finishing) {
        // check if this state is already completing
        if (finishing.isCompleting) return COMPLETING_ALREADY
        // mark as completing
        finishing.isCompleting = true
        // if we need to promote to finishing, then atomically do it here.
        // We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
        // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
        assert { !finishing.isSealed } // cannot be sealed
        // add new proposed exception to the finishing state
        val wasCancelling = finishing.isCancelling
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
    notifyRootCause?.let { notifyCancelling(list, it) }
    // now wait for children
    // we can't close the list yet: while there are active children, adding new ones is still allowed.
    val child = list.nextChild()
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // turns out, there are no children to await, so we close the list.
    list.close(LIST_CHILD_PERMISSION)
    // some children could have sneaked into the list, so we try waiting for them again.
    // it would be more correct to re-open the list (otherwise, we get non-linearizable behavior),
    // but it's too difficult with the current lock-free list implementation.
    val anotherChild = list.nextChild()
    if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left (all were already cancelled?)
    return finalizeFinishingState(finishing, proposedUpdate)
}
948
/**
 * The exception carried by this state when it is a [CompletedExceptionally], or `null` for any other value.
 */
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
951
/**
 * Starts waiting for the first child, beginning with [child], that has not completed yet.
 *
 * Returns `true` when a [ChildCompletion] handler was installed on some child, and `false` when there
 * are no more incomplete children to wait for.
 *
 * ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
 */
private fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val handle = candidate.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = ChildCompletion(this, state, candidate, proposedUpdate)
        )
        // a real handle means the child is not complete and we've started waiting for it
        if (handle !== NonDisposableHandle) return true
        // this child is already complete -- move on to the next one, if any
        candidate = candidate.nextChild() ?: return false
    }
}
963
/**
 * Continues completion of this job after [lastChild] has completed: either starts waiting for the next
 * incomplete child or, when none is left, finalizes the [state] and invokes [afterCompletion].
 *
 * It is invoked from [ChildCompletion.invoke].
 *
 * ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
 *
 * @param state the finishing state this job is expected to be in.
 * @param lastChild the child whose completion triggered this call; the search for more children starts after it.
 * @param proposedUpdate the update that was proposed when completion started.
 */
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    // figure out if we need to wait for the next child
    val waitChild = lastChild.nextChild()
    // try to wait for the next child
    if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
    // no more children to await, so *maybe* we can complete the job; for that, we stop accepting new children.
    // potentially, the list can be closed for children more than once: if we detect that there are no more
    // children, attempt to close the list, and then new children sneak in, this whole logic will be
    // repeated, including closing the list.
    state.list.close(LIST_CHILD_PERMISSION)
    // did any new children sneak in?
    val waitChildAgain = lastChild.nextChild()
    if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) {
        // yes, so now we have to wait for them!
        // ideally, we should re-open the list,
        // but it's too difficult with the current lock-free list implementation,
        // so we'll live with non-linearizable behavior for now.
        return
    }
    // no more children, now we are sure; try to update the state
    val finalState = finalizeFinishingState(state, proposedUpdate)
    afterCompletion(finalState)
}
989
/**
 * Finds the next non-removed [ChildHandleNode] that follows this node in the list.
 *
 * Returns `null` when the list head ([NodeList]) is reached, i.e. there are no more children.
 */
private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
    var cursor = this
    // roll back to the previous non-removed node (or the list head)
    while (cursor.isRemoved) cursor = cursor.prevNode
    while (true) {
        cursor = cursor.nextNode
        when {
            cursor.isRemoved -> Unit // skip removed nodes
            cursor is ChildHandleNode -> return cursor
            cursor is NodeList -> return null // checked all -- no more children
        }
    }
}
1000
/**
 * The jobs of the child handles that are found in the state observed when the sequence is iterated.
 *
 * Yields the single child when the state itself is a [ChildHandleNode], every [ChildHandleNode] of the
 * list when the incomplete state has one, and nothing otherwise.
 */
public final override val children: Sequence<Job> get() = sequence {
    val snapshot = this@JobSupport.state
    if (snapshot is ChildHandleNode) {
        yield(snapshot.childJob)
    } else if (snapshot is Incomplete) {
        val nodes = snapshot.list ?: return@sequence
        nodes.forEach { if (it is ChildHandleNode) yield(it.childJob) }
    }
}
1009
/**
 * Attaches [child] to this job by trying to put a [ChildHandleNode] for it into the list of this job.
 *
 * @return the created node when it was added to the list; [NonDisposableHandle] when it could not be added,
 * in which case the node has already been invoked with the exceptional completion cause (or `null`).
 */
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: ChildJob): ChildHandle {
    /*
     * Note: This function attaches a special ChildHandleNode node object. This node object
     * is handled in a special way on completion on the coroutine (we wait for all of them) and also
     * can't be added simply with `invokeOnCompletionInternal` -- we add this node to the list even
     * if the job is already cancelling.
     * It's required to properly await all children before completion and provide a linearizable hierarchy view:
     * If the child is attached when the job is already being cancelled, such a child will receive
     * an immediate notification on cancellation,
     * but the parent *will* wait for that child before completion and will handle its exception.
     */
    val node = ChildHandleNode(child).also { it.job = this }
    val added = tryPutNodeIntoList(node) { _, list ->
        // First, try to add a child along the cancellation handlers
        val addedBeforeCancellation = list.addLast(
            node,
            LIST_ON_COMPLETION_PERMISSION or LIST_CHILD_PERMISSION or LIST_CANCELLATION_PERMISSION
        )
        if (addedBeforeCancellation) {
            // The child managed to be added before the parent started to cancel or complete. Success.
            true
        } else {
            /* Either cancellation or completion already happened, the child was not added.
             * Now we need to try adding it just for completion. */
            val addedBeforeCompletion = list.addLast(
                node,
                LIST_CHILD_PERMISSION or LIST_ON_COMPLETION_PERMISSION
            )
            /*
             * Whether or not we managed to add the child before the parent completed, we need to investigate:
             * why didn't we manage to add it before cancellation?
             * If it's because cancellation happened in the meantime, we need to notify the child about it.
             * We check the latest state because the original state with which we started may not have had
             * the information about the cancellation yet.
             */
            val rootCause = when (val latestState = this.state) {
                is Finishing -> {
                    // The state is still incomplete, so we need to notify the child about the completion cause.
                    latestState.rootCause
                }
                else -> {
                    /** Since the list is already closed for [onCancelling], the job is either Finishing or
                     * already completed. We need to notify the child about the completion cause. */
                    assert { latestState !is Incomplete }
                    (latestState as? CompletedExceptionally)?.cause
                }
            }
            /**
             * We must cancel the child if the parent was cancelled already, even if we successfully attached,
             * as this child didn't make it before [notifyCancelling] and won't be notified that it should be
             * cancelled.
             *
             * And if the parent wasn't cancelled and the previous [LockFreeLinkedListNode.addLast] failed because
             * the job is in its final state already, we won't be able to attach anyway, so we must just invoke
             * the handler and return.
             */
            node.invoke(rootCause)
            if (addedBeforeCompletion) {
                /** The root cause can't be null: since the earlier addition to the list failed, this means that
                 * the job was already cancelled or completed. */
                assert { rootCause != null }
                true
            } else {
                /** No sense in retrying: we know it won't succeed, and we already invoked the handler. */
                // non-local return from attachChild itself
                return NonDisposableHandle
            }
        }
    }
    if (added) return node
    /** We can only end up here if [tryPutNodeIntoList] detected a final state. */
    node.invoke((state as? CompletedExceptionally)?.cause)
    return NonDisposableHandle
}
1084
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion].
 *
 * The default implementation rethrows the [exception].
 *
 * @param exception the exception to process.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
1094
/**
 * This function is invoked once as soon as this job is being cancelled for any reason or completes,
 * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
 *
 * The meaning of [cause] parameter:
 * - Cause is `null` when the job has completed normally.
 * - Cause is an instance of [CancellationException] when the job was cancelled _normally_.
 *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
 * - Otherwise, the job had been cancelled or failed with exception.
 *
 * The specified [cause] is not the final cancellation cause of this job.
 * A job may produce other exceptions while it is failing and the final cause might be different.
 *
 * The default implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCancelling(cause: Throwable?) {}
1111
/**
 * Returns `true` for scoped coroutines.
 * A scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
 * Scoped coroutines always handle any exception that happened within -- they just rethrow it to the enclosing scope.
 * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
 *
 * The default value is `false`.
 */
protected open val isScopedCoroutine: Boolean get() = false
1119
/**
 * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
 * A valid implementation of this getter should recursively check parent as well before returning `false`.
 *
 * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
 *
 * The default value is `true`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val handlesException: Boolean get() = true
1128
/**
 * Handles the final job [exception] that was not handled by the parent coroutine.
 * Returns `true` if it handles exception (so handling at later stages is not needed).
 * It is designed to be overridden by launch-like coroutines
 * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
 * that can represent exceptions.
 *
 * This method is invoked **exactly once** when the final exception of the job is determined
 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
 *
 * The default implementation handles nothing and returns `false`.
 *
 * @param exception the final exception of the job.
 * @return `true` if the exception was handled, `false` otherwise.
 */
protected open fun handleJobException(exception: Throwable): Boolean = false
1140
/**
 * Override for completion actions that need to update some external object depending on job's state,
 * right before all the waiters for coroutine's completion are notified.
 *
 * The default implementation does nothing.
 *
 * @param state the final state.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCompletionInternal(state: Any?) {}
1150
/**
 * Override for the very last action on job's completion to resume the rest of the code in
 * scoped coroutines. It is called when this job is externally completed in an unknown
 * context and thus should resume with a default mode.
 *
 * The default implementation does nothing.
 *
 * @param state the final state of the job.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun afterCompletion(state: Any?) {}
1159
/**
 * Returns [toDebugString] followed by `@` and the hex address of this object -- for nicer debugging.
 */
public override fun toString(): String =
    "${toDebugString()}@$hexAddress"
1163
/**
 * Returns a debug representation of this job in the form `name{state}`, where the name comes from
 * [nameString] and the state is the textual name of the current state.
 */
@InternalCoroutinesApi
public fun toDebugString(): String = "${nameString()}{${stateString(state)}}"
1166
/**
 * Returns the name of this job that is used in [toDebugString]; the simple class name by default.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String = classSimpleName
1171
/**
 * Maps an internal [state] object to the name of the corresponding public state:
 * "New", "Active", "Completing", "Cancelling", "Cancelled" or "Completed".
 */
private fun stateString(state: Any?): String {
    // Finishing is checked first since it is an Incomplete state with its own sub-states
    if (state is Finishing) {
        return when {
            state.isCancelling -> "Cancelling"
            state.isCompleting -> "Completing"
            else -> "Active"
        }
    }
    if (state is Incomplete) return if (state.isActive) "Active" else "New"
    return if (state is CompletedExceptionally) "Cancelled" else "Completed"
}
1182
/**
 * The state object for Completing & Cancelling states.
 * All updates are guarded by synchronized(this), reads are volatile.
 *
 * @param list the list of nodes of the job.
 * @param isCompleting the initial value of the [isCompleting] flag.
 * @param rootCause the initial root cause; a non-null value means that the state is cancelling.
 */
@Suppress("UNCHECKED_CAST")
private class Finishing(
    override val list: NodeList,
    isCompleting: Boolean,
    rootCause: Throwable?
) : SynchronizedObject(), Incomplete {
    private val _isCompleting = atomic(isCompleting)
    // Whether completion of the job has already been started
    var isCompleting: Boolean
        get() = _isCompleting.value
        set(value) { _isCompleting.value = value }

    private val _rootCause = atomic(rootCause)
    // The first exception that was added to this state, or null when there is none yet
    var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
        get() = _rootCause.value
        set(value) { _rootCause.value = value }

    private val _exceptionsHolder = atomic<Any?>(null)
    // Exceptions that were added after the root cause
    private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
        get() = _exceptionsHolder.value
        set(value) { _exceptionsHolder.value = value }

    // Note: cannot be modified when sealed
    val isSealed: Boolean get() = exceptionsHolder === SEALED
    val isCancelling: Boolean get() = rootCause != null
    override val isActive: Boolean get() = rootCause == null // !isCancelling

    // Seals current state and returns list of exceptions:
    // the root cause (if any) comes first, then the collected exceptions,
    // then the proposed exception when it is not null and not equal to the root cause.
    // guarded by `synchronized(this)`
    fun sealLocked(proposedException: Throwable?): List<Throwable> {
        val list = when(val eh = exceptionsHolder) { // volatile read
            null -> allocateList()
            is Throwable -> allocateList().also { it.add(eh) }
            is ArrayList<*> -> eh as ArrayList<Throwable>
            else -> error("State is $eh") // already sealed -- cannot happen
        }
        val rootCause = this.rootCause // volatile read
        rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
        if (proposedException != null && proposedException != rootCause) list.add(proposedException)
        exceptionsHolder = SEALED
        return list
    }

    // Records the exception: the first one becomes the root cause, the following ones are collected
    // in the exceptions holder; an exception that is already recorded by reference is ignored.
    // guarded by `synchronized(this)`
    fun addExceptionLocked(exception: Throwable) {
        val rootCause = this.rootCause // volatile read
        if (rootCause == null) {
            this.rootCause = exception
            return
        }
        if (exception === rootCause) return // nothing to do
        when (val eh = exceptionsHolder) { // volatile read
            null -> exceptionsHolder = exception
            is Throwable -> {
                if (exception === eh) return // nothing to do
                // the second additional exception -- switch from a single exception to a list
                exceptionsHolder = allocateList().apply {
                    add(eh)
                    add(exception)

                }
            }
            is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
            else -> error("State is $eh") // already sealed -- cannot happen
        }
    }

    // Creates an empty list for exceptions with an initial capacity of 4
    private fun allocateList() = ArrayList<Throwable>(4)

    override fun toString(): String =
        "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
}
1255
/**
 * `true` only when this incomplete state is a [Finishing] state that is cancelling.
 */
private val Incomplete.isCancelling: Boolean
    get() = this is Finishing && isCancelling
1258
/**
 * The node that is used by a parent that is waiting for child completion.
 * When invoked, it makes the [parent] continue its completion via [JobSupport.continueCompleting].
 *
 * @param parent the job that waits for the child.
 * @param state the finishing state of the parent.
 * @param child the child handle node that is being waited for.
 * @param proposedUpdate the update that was proposed for the parent when its completion started.
 */
private class ChildCompletion(
    private val parent: JobSupport,
    private val state: Finishing,
    private val child: ChildHandleNode,
    private val proposedUpdate: Any?
) : JobNode() {
    override val onCancelling get() = false
    // The cause is not used here: the parent takes everything it needs from the captured values
    override fun invoke(cause: Throwable?) {
        parent.continueCompleting(state, child, proposedUpdate)
    }
}
1271
/**
 * The continuation that is used to await completion of [job].
 * It customizes the cancellation cause so that the root/completion cause of the awaited job is preferred.
 */
private class AwaitContinuation<T>(
    delegate: Continuation<T>,
    private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
    override fun getContinuationCancellationCause(parent: Job): Throwable {
        val observed = job.state
        /*
         * When the job we are waiting for had already completely completed exceptionally or
         * is failing, we shall use its root/completion cause for await's result.
         */
        val failure: Throwable? = when (observed) {
            is Finishing -> observed.rootCause
            is CompletedExceptionally -> observed.cause
            else -> null
        }
        // no failure of the awaited job is known -- use the cancellation exception of the parent
        return failure ?: parent.getCancellationException()
    }

    override fun nameString(): String = "AwaitContinuation"
}
1290
1291 /*
1292 * =================================================================================================
1293 * This is ready-to-use implementation for Deferred interface.
1294 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1295 * completed state as `Any?`
1296 * =================================================================================================
1297 */
1298
/**
 * `true` when the current state of this job is a [CompletedExceptionally].
 */
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1300
/**
 * Returns the exception this job has completed with, or `null` when its final state is not exceptional.
 *
 * @throws IllegalStateException if this job has not completed yet.
 */
public fun getCompletionExceptionOrNull(): Throwable? {
    val finished = state
    check(finished !is Incomplete) { "This job has not completed yet" }
    return (finished as? CompletedExceptionally)?.cause
}
1306
/**
 * Returns the unboxed final state of this job, or throws the exception it has completed with.
 *
 * @throws IllegalStateException if this job has not completed yet.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finished = state
    check(finished !is Incomplete) { "This job has not completed yet" }
    return when (finished) {
        is CompletedExceptionally -> throw finished.cause
        else -> finished.unboxState()
    }
}
1316
/**
 * Awaits completion of this job and returns its unboxed final state, or throws the exception
 * the job has completed with.
 *
 * When the job is already complete, the result is produced right away; otherwise the job is started
 * via [startInternal] (retrying while needed) and the call continues in [awaitSuspend].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    while (true) { // lock-free loop on state
        val state = this.state
        if (state !is Incomplete) {
            // already complete -- just return result
            if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                recoverAndThrow(state.cause)
            }
            return state.unboxState()

        }
        if (startInternal(state) >= 0) break // break unless needs to retry
    }
    return awaitSuspend() // slow-path
}
1336
/**
 * Slow path of [awaitInternal]: registers a [ResumeAwaitOnCompletion] handler for an [AwaitContinuation]
 * and returns the result of that continuation.
 */
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    /*
     * Custom code here, so that parent coroutine that is using await
     * on its child deferred (async) coroutine would throw the exception that this child had
     * thrown and not a JobCancellationException.
     */
    val cont = AwaitContinuation(uCont.intercepted(), this)
    // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
    cont.initCancellability()
    // the completion handler is disposed of when the continuation gets cancelled
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont)))
    cont.getResult()
}
1349
/**
 * The select clause for awaiting this job. A new clause object is created on every access; it registers
 * via [onAwaitInternalRegFunc] and processes the result via [onAwaitInternalProcessResFunc].
 */
@Suppress("UNCHECKED_CAST")
protected val onAwaitInternal: SelectClause1<*> get() = SelectClause1Impl<Any?>(
    clauseObject = this@JobSupport,
    regFunc = JobSupport::onAwaitInternalRegFunc as RegistrationFunction,
    processResFunc = JobSupport::onAwaitInternalProcessResFunc as ProcessResultFunction
)
1356
/**
 * Registration function of the [onAwaitInternal] clause.
 *
 * When the job is already complete, [select] is selected right in the registration phase with the
 * exceptional state itself or with the unboxed final state. Otherwise the job is started (retrying
 * while needed) and a completion handler that performs the selection later is installed.
 */
@Suppress("UNUSED_PARAMETER")
private fun onAwaitInternalRegFunc(select: SelectInstance<*>, ignoredParam: Any?) {
    var started = false
    while (!started) {
        val snapshot = state
        if (snapshot !is Incomplete) {
            select.selectInRegistrationPhase(snapshot as? CompletedExceptionally ?: snapshot.unboxState())
            return
        }
        started = startInternal(snapshot) >= 0 // a negative result means that a retry is needed
    }
    select.disposeOnCompletion(invokeOnCompletion(handler = SelectOnAwaitCompletionHandler(select)))
}
1371
/**
 * Result processing function of the [onAwaitInternal] clause: throws the cause of an exceptional
 * [result] and passes any other result through unchanged.
 */
@Suppress("UNUSED_PARAMETER")
private fun onAwaitInternalProcessResFunc(ignoredParam: Any?, result: Any?): Any? = when (result) {
    is CompletedExceptionally -> throw result.cause
    else -> result
}
1377
/**
 * The completion handler that tries to select the `onAwait` clause of [select] with the outcome of this job:
 * the exceptional state itself or the unboxed final state.
 */
private inner class SelectOnAwaitCompletionHandler(
    private val select: SelectInstance<*>
) : JobNode() {
    override val onCancelling get() = false

    override fun invoke(cause: Throwable?) {
        val finalState = this@JobSupport.state
        val outcome = finalState as? CompletedExceptionally ?: finalState.unboxState()
        select.trySelect(this@JobSupport, outcome)
    }
}
1388 }
1389
/*
 * Class to represent an [Incomplete] object as the final state of the Job.
 * The wrapped object is available as [state]; see `boxIncomplete` and `unboxState` for the conversions.
 */
private class IncompleteStateBox(@JvmField val state: Incomplete)
/**
 * Wraps this value into an [IncompleteStateBox] when it is an [Incomplete]; returns it unchanged otherwise.
 */
internal fun Any?.boxIncomplete(): Any? = when (this) {
    is Incomplete -> IncompleteStateBox(this)
    else -> this
}
/**
 * Returns the state wrapped by an [IncompleteStateBox], or this value itself when it is not a box.
 */
internal fun Any?.unboxState(): Any? = if (this is IncompleteStateBox) this.state else this
1396
1397 // --------------- helper classes & constants for job implementation
1398
// Returned by completion attempts when the job is already complete or completing
private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
// Returned when the job was made completing and is waiting for its children
@JvmField
internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
// Returned when a completion attempt needs to be retried due to interference
private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
// Returned by makeCancelling when it is too late to cancel and no exception was added
private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")

// Int result codes; NOTE(review): presumably the results of startInternal, whose callers here
// retry while the result is negative -- confirm against its definition
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1

// Marker that is stored in Finishing.exceptionsHolder once the state is sealed
private val SEALED = Symbol("SEALED")
// Shared listener-less states: Empty(false) reports itself as "New", Empty(true) as "Active"
private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)
1412
// bit mask of permissions that are passed to NodeList.addLast (combined with `or`) and to NodeList.close
// (e.g. the list is closed with LIST_CHILD_PERMISSION once there are no more children to wait for)
private const val LIST_ON_COMPLETION_PERMISSION = 1
private const val LIST_CHILD_PERMISSION = 2
private const val LIST_CANCELLATION_PERMISSION = 4
1417
/**
 * An incomplete state without a list of nodes.
 *
 * @param isActive `true` for the "Active" flavor of the state and `false` for the "New" one.
 */
private class Empty(override val isActive: Boolean) : Incomplete {
    // this state never has a list
    override val list: NodeList? get() = null
    override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}
1422
/**
 * A [CompletableJob] on top of [JobSupport]: it is created in the active state, is attached to the
 * optional [parent], and is completed explicitly via [complete] or [completeExceptionally].
 */
@OptIn(InternalForInheritanceCoroutinesApi::class)
@PublishedApi // for a custom job in the test module
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    // NOTE: the parent has to be initialized before `handlesException` below is computed
    init {
        initParentJob(parent)
    }

    override val onCancelComplete: Boolean get() = true

    /*
     * Check whether parent is able to handle exceptions as well.
     * With this check, an exception in that pattern will be handled once:
     * ```
     * launch {
     *     val child = Job(coroutineContext[Job])
     *     launch(child) { throw ... }
     * }
     * ```
     */
    override val handlesException: Boolean = handlesException()

    override fun complete(): Boolean = makeCompleting(Unit)

    override fun completeExceptionally(exception: Throwable): Boolean =
        makeCompleting(CompletedExceptionally(exception))

    // Walks up the chain of parents and reports whether any of them handles exceptions
    @JsName("handlesExceptionF")
    private fun handlesException(): Boolean {
        var ancestor: JobSupport? = (parentHandle as? ChildHandleNode)?.job
        while (ancestor != null) {
            if (ancestor.handlesException) return true
            ancestor = (ancestor.parentHandle as? ChildHandleNode)?.job
        }
        return false
    }
}
1452
1453 // -------- invokeOnCompletion nodes
1454
/**
 * A state object of a job that has not completed yet.
 */
internal interface Incomplete {
    // whether the state counts as active; e.g. `stateString` maps `false` to "New" for plain incomplete states
    val isActive: Boolean
    val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
1459
/**
 * The base class for handlers that are attached to a job. It is a list node, the [DisposableHandle]
 * for removing itself from its [job], and an [Incomplete] state object (active and without a list).
 */
internal abstract class JobNode : LockFreeLinkedListNode(), DisposableHandle, Incomplete {
    /**
     * Initialized by [JobSupport.invokeOnCompletionInternal].
     */
    lateinit var job: JobSupport

    /**
     * If `false`, [invoke] will be called once the job is cancelled or is complete.
     * If `true`, [invoke] is invoked as soon as the job becomes _cancelling_ instead, and if that doesn't happen,
     * it will be called once the job is cancelled or is complete.
     */
    abstract val onCancelling: Boolean
    override val isActive: Boolean get() = true
    override val list: NodeList? get() = null

    // Disposing of the handle removes this node from its job
    override fun dispose() = job.removeNode(this)
    override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
    /**
     * Signals completion.
     *
     * This function:
     * - Does not throw any exceptions.
     *   For [Job] instances that are coroutines, exceptions thrown by this function will be caught, wrapped into
     *   [CompletionHandlerException], and passed to [handleCoroutineException], but for those that are not coroutines,
     *   they will just be rethrown, potentially crashing unrelated code.
     * - Is fast, non-blocking, and thread-safe.
     * - Can be invoked concurrently with the surrounding code.
     * - Can be invoked from any context.
     *
     * The meaning of `cause` that is passed to the handler is:
     * - It is `null` if the job has completed normally.
     * - It is an instance of [CancellationException] if the job was cancelled _normally_.
     *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
     * - Otherwise, the job had _failed_.
     *
     * [CompletionHandler] is the user-visible interface for supplying custom implementations of [invoke]
     * (see [InvokeOnCompletion] and [InvokeOnCancelling]).
     */
    abstract fun invoke(cause: Throwable?)
}
1500
/**
 * List head holding the nodes attached to a job; as a state object it reports itself as active
 * and returns itself as [list].
 */
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    /**
     * Renders this list as `List{<state>}[node1, node2, ...]`.
     *
     * Only elements that are [JobNode] instances are printed; anything else in the list is skipped.
     *
     * @param state the label placed between the braces.
     */
    fun getString(state: String): String {
        val out = StringBuilder()
        out.append("List{")
        out.append(state)
        out.append("}[")
        var separator = ""
        forEach { node ->
            if (node !is JobNode) return@forEach
            out.append(separator)
            out.append(node)
            separator = ", "
        }
        out.append("]")
        return out.toString()
    }

    override fun toString(): String {
        // The detailed rendering is only produced in debug mode.
        if (!DEBUG) return super.toString()
        return getString("Active")
    }
}
1522
/**
 * State object that wraps a [NodeList] and reports the job as not active.
 */
private class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String {
        // The detailed rendering is only produced in debug mode.
        if (!DEBUG) return super.toString()
        return list.getString("New")
    }
}
1529
/**
 * Node that forwards the completion cause to a user-supplied [CompletionHandler].
 */
private class InvokeOnCompletion(
    private val handler: CompletionHandler
) : JobNode() {
    override val onCancelling: Boolean
        get() = false

    /** Passes [cause] straight through to the wrapped handler. */
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }
}
1536
/**
 * Node that resumes a `Continuation<Unit>` when it is invoked.
 */
private class ResumeOnCompletion(
    private val continuation: Continuation<Unit>
) : JobNode() {
    override val onCancelling: Boolean
        get() = false

    /** Resumes the continuation with `Unit`; [cause] is not inspected. */
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }
}
1543
/**
 * Node that resumes an awaiting continuation with the final state of the job:
 * either the exception of a [CompletedExceptionally] state or the unboxed result value.
 */
private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override val onCancelling: Boolean
        get() = false

    override fun invoke(cause: Throwable?) {
        val finalState = job.state
        assert { finalState !is Incomplete }
        if (finalState is CompletedExceptionally) {
            // Resume with the corresponding exception to preserve it
            continuation.resumeWithException(finalState.cause)
            return
        }
        // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
        @Suppress("UNCHECKED_CAST")
        continuation.resume(finalState.unboxState() as T)
    }
}
1561
1562 // -------- invokeOnCancellation nodes
1563
/**
 * Node that forwards the cause to a user-supplied [CompletionHandler] at most once,
 * registered with `onCancelling = true`.
 */
private class InvokeOnCancelling(
    private val handler: CompletionHandler
) : JobNode() {
    // The delegate handler shall be invoked at most once, so this flag records that it has been claimed.
    private val _invoked = atomic(false)

    override val onCancelling: Boolean
        get() = true

    override fun invoke(cause: Throwable?) {
        // Only the call that flips the flag from false to true gets to run the handler.
        if (!_invoked.compareAndSet(expect = false, update = true)) return
        handler(cause)
    }
}
1574
/**
 * Node that links a parent job (the node's [job]) with [childJob] and serves as the [ChildHandle]
 * for that link.
 */
private class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobNode(), ChildHandle {
    override val parent: Job
        get() = job

    override val onCancelling: Boolean
        get() = true

    /** Tells [childJob] that its parent ([job]) was cancelled; [cause] itself is not forwarded. */
    override fun invoke(cause: Throwable?) {
        childJob.parentCancelled(job)
    }

    /** Delegates the child's cancellation [cause] to the parent [job] and returns its answer. */
    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }
}
1583