1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.atomicfu.* 8 import kotlinx.coroutines.internal.* 9 import kotlin.coroutines.* 10 import kotlin.coroutines.intrinsics.* 11 import kotlin.jvm.* 12 import kotlin.native.concurrent.* 13 14 private const val UNDECIDED = 0 15 private const val SUSPENDED = 1 16 private const val RESUMED = 2 17 18 @JvmField 19 @SharedImmutable 20 internal val RESUME_TOKEN = Symbol("RESUME_TOKEN") 21 22 /** 23 * @suppress **This is unstable API and it is subject to change.** 24 */ 25 @PublishedApi 26 internal open class CancellableContinuationImpl<in T>( 27 final override val delegate: Continuation<T>, 28 resumeMode: Int 29 ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame { 30 init { 31 assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl 32 } 33 34 public override val context: CoroutineContext = delegate.context 35 36 /* 37 * Implementation notes 38 * 39 * CancellableContinuationImpl is a subset of Job with following limitations: 40 * 1) It can have only cancellation listener (no "on cancelling") 41 * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately') 42 * 3) It can have at most one cancellation listener 43 * 4) Its cancellation listeners cannot be deregistered 44 * As a consequence it has much simpler state machine, more lightweight machinery and 45 * less dependencies. 46 */ 47 48 /* decision state machine 49 50 +-----------+ trySuspend +-----------+ 51 | UNDECIDED | -------------> | SUSPENDED | 52 +-----------+ +-----------+ 53 | 54 | tryResume 55 V 56 +-----------+ 57 | RESUMED | 58 +-----------+ 59 60 Note: both tryResume and trySuspend can be invoked at most once, first invocation wins 61 */ 62 private val _decision = atomic(UNDECIDED) 63 64 /* 65 === Internal states === 66 name state class public state description 67 ------ ------------ ------------ ----------- 68 ACTIVE Active : Active active, no listeners 69 SINGLE_A CancelHandler : Active active, one cancellation listener 70 CANCELLED CancelledContinuation: Cancelled cancelled (final state) 71 COMPLETED any : Completed produced some result or threw an exception (final state) 72 */ 73 private val _state = atomic<Any?>(Active) 74 75 private var parentHandle: DisposableHandle? = null 76 77 internal val state: Any? get() = _state.value 78 79 public override val isActive: Boolean get() = state is NotCompleted 80 81 public override val isCompleted: Boolean get() = state !is NotCompleted 82 83 public override val isCancelled: Boolean get() = state is CancelledContinuation 84 85 // We cannot invoke `state.toString()` since it may cause a circular dependency 86 private val stateDebugRepresentation get() = when(state) { 87 is NotCompleted -> "Active" 88 is CancelledContinuation -> "Cancelled" 89 else -> "Completed" 90 } 91 92 public override fun initCancellability() { 93 /* 94 * Invariant: at the moment of invocation, `this` has not yet 95 * leaked to user code and no one is able to invoke `resume` or `cancel` 96 * on it yet. Also, this function is not invoked for reusable continuations. 97 */ 98 val handle = installParentHandle() 99 ?: return // fast path -- don't do anything without parent 100 // now check our state _after_ registering, could have completed while we were registering, 101 // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, 102 // so we are helping it and cleaning the node ourselves 103 if (isCompleted) { 104 // Can be invoked concurrently in 'parentCancelled', no problems here 105 handle.dispose() 106 parentHandle = NonDisposableHandle 107 } 108 } 109 110 private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable() 111 112 /** 113 * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work. 114 * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state. 115 */ 116 @JvmName("resetStateReusable") // Prettier stack traces 117 internal fun resetStateReusable(): Boolean { 118 assert { resumeMode == MODE_CANCELLABLE_REUSABLE } 119 assert { parentHandle !== NonDisposableHandle } 120 val state = _state.value 121 assert { state !is NotCompleted } 122 if (state is CompletedContinuation && state.idempotentResume != null) { 123 // Cannot reuse continuation that was resumed with idempotent marker 124 detachChild() 125 return false 126 } 127 _decision.value = UNDECIDED 128 _state.value = Active 129 return true 130 } 131 132 public override val callerFrame: CoroutineStackFrame? 133 get() = delegate as? CoroutineStackFrame 134 135 public override fun getStackTraceElement(): StackTraceElement? = null 136 137 override fun takeState(): Any? = state 138 139 // Note: takeState does not clear the state so we don't use takenState 140 // and we use the actual current state where in CAS-loop 141 override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state -> 142 when (state) { 143 is NotCompleted -> error("Not completed") 144 is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do 145 is CompletedContinuation -> { 146 check(!state.cancelled) { "Must be called at most once" } 147 val update = state.copy(cancelCause = cause) 148 if (_state.compareAndSet(state, update)) { 149 state.invokeHandlers(this, cause) 150 return // done 151 } 152 } 153 else -> { 154 // completed normally without marker class, promote to CompletedContinuation in case 155 // if invokeOnCancellation if called later 156 if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) { 157 return // done 158 } 159 } 160 } 161 } 162 163 /* 164 * Attempt to postpone cancellation for reusable cancellable continuation 165 */ 166 private fun cancelLater(cause: Throwable): Boolean { 167 // Ensure that we are postponing cancellation to the right reusable instance 168 if (!isReusable()) return false 169 val dispatched = delegate as DispatchedContinuation<*> 170 return dispatched.postponeCancellation(cause) 171 } 172 173 public override fun cancel(cause: Throwable?): Boolean { 174 _state.loop { state -> 175 if (state !is NotCompleted) return false // false if already complete or cancelling 176 // Active -- update to final state 177 val update = CancelledContinuation(this, cause, handled = state is CancelHandler) 178 if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure 179 // Invoke cancel handler if it was present 180 (state as? CancelHandler)?.let { callCancelHandler(it, cause) } 181 // Complete state update 182 detachChildIfNonResuable() 183 dispatchResume(resumeMode) // no need for additional cancellation checks 184 return true 185 } 186 } 187 188 internal fun parentCancelled(cause: Throwable) { 189 if (cancelLater(cause)) return 190 cancel(cause) 191 // Even if cancellation has failed, we should detach child to avoid potential leak 192 detachChildIfNonResuable() 193 } 194 195 private inline fun callCancelHandlerSafely(block: () -> Unit) { 196 try { 197 block() 198 } catch (ex: Throwable) { 199 // Handler should never fail, if it does -- it is an unhandled exception 200 handleCoroutineException( 201 context, 202 CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex) 203 ) 204 } 205 } 206 207 private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) = 208 /* 209 * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, 210 * because we play type tricks on Kotlin/JS and handler is not necessarily a function there 211 */ 212 callCancelHandlerSafely { handler.invokeIt(cause) } 213 214 fun callCancelHandler(handler: CancelHandler, cause: Throwable?) = 215 callCancelHandlerSafely { handler.invoke(cause) } 216 217 fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) { 218 try { 219 onCancellation.invoke(cause) 220 } catch (ex: Throwable) { 221 // Handler should never fail, if it does -- it is an unhandled exception 222 handleCoroutineException( 223 context, 224 CompletionHandlerException("Exception in resume onCancellation handler for $this", ex) 225 ) 226 } 227 } 228 229 /** 230 * It is used when parent is cancelled to get the cancellation cause for this continuation. 231 */ 232 open fun getContinuationCancellationCause(parent: Job): Throwable = 233 parent.getCancellationException() 234 235 private fun trySuspend(): Boolean { 236 _decision.loop { decision -> 237 when (decision) { 238 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true 239 RESUMED -> return false 240 else -> error("Already suspended") 241 } 242 } 243 } 244 245 private fun tryResume(): Boolean { 246 _decision.loop { decision -> 247 when (decision) { 248 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true 249 SUSPENDED -> return false 250 else -> error("Already resumed") 251 } 252 } 253 } 254 255 @PublishedApi 256 internal fun getResult(): Any? { 257 val isReusable = isReusable() 258 // trySuspend may fail either if 'block' has resumed/cancelled a continuation 259 // or we got async cancellation from parent. 260 if (trySuspend()) { 261 /* 262 * Invariant: parentHandle is `null` *only* for reusable continuations. 263 * We were neither resumed nor cancelled, time to suspend. 264 * But first we have to install parent cancellation handle (if we didn't yet), 265 * so CC could be properly resumed on parent cancellation. 266 * 267 * This read has benign data-race with write of 'NonDisposableHandle' 268 * in 'detachChildIfNotReusable'. 269 */ 270 if (parentHandle == null) { 271 installParentHandle() 272 } 273 /* 274 * Release the continuation after installing the handle (if needed). 275 * If we were successful, then do nothing, it's ok to reuse the instance now. 276 * Otherwise, dispose the handle by ourselves. 277 */ 278 if (isReusable) { 279 releaseClaimedReusableContinuation() 280 } 281 return COROUTINE_SUSPENDED 282 } 283 // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state 284 if (isReusable) { 285 // release claimed reusable continuation for the future reuse 286 releaseClaimedReusableContinuation() 287 } 288 val state = this.state 289 if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) 290 // if the parent job was already cancelled, then throw the corresponding cancellation exception 291 // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...) 292 // before the block returns. This getResult would return a result as opposed to cancellation 293 // exception that should have happened if the continuation is dispatched for execution later. 294 if (resumeMode.isCancellableMode) { 295 val job = context[Job] 296 if (job != null && !job.isActive) { 297 val cause = job.getCancellationException() 298 cancelCompletedResult(state, cause) 299 throw recoverStackTrace(cause, this) 300 } 301 } 302 return getSuccessfulResult(state) 303 } 304 305 private fun installParentHandle(): DisposableHandle? { 306 val parent = context[Job] ?: return null // don't do anything without a parent 307 // Install the handle 308 val handle = parent.invokeOnCompletion( 309 onCancelling = true, 310 handler = ChildContinuation(this).asHandler 311 ) 312 parentHandle = handle 313 return handle 314 } 315 316 /** 317 * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, 318 * in which case it detaches from the parent and cancels this continuation. 319 */ 320 private fun releaseClaimedReusableContinuation() { 321 // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it 322 val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return 323 detachChild() 324 cancel(cancellationCause) 325 } 326 327 override fun resumeWith(result: Result<T>) = 328 resumeImpl(result.toState(this), resumeMode) 329 330 override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) = 331 resumeImpl(value, resumeMode, onCancellation) 332 333 public override fun invokeOnCancellation(handler: CompletionHandler) { 334 val cancelHandler = makeCancelHandler(handler) 335 _state.loop { state -> 336 when (state) { 337 is Active -> { 338 if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success 339 } 340 is CancelHandler -> multipleHandlersError(handler, state) 341 is CompletedExceptionally -> { 342 /* 343 * Continuation was already cancelled or completed exceptionally. 344 * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed, 345 * so we check to make sure handler was installed just once. 346 */ 347 if (!state.makeHandled()) multipleHandlersError(handler, state) 348 /* 349 * Call the handler only if it was cancelled (not called when completed exceptionally). 350 * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, 351 * because we play type tricks on Kotlin/JS and handler is not necessarily a function there 352 */ 353 if (state is CancelledContinuation) { 354 callCancelHandler(handler, (state as? CompletedExceptionally)?.cause) 355 } 356 return 357 } 358 is CompletedContinuation -> { 359 /* 360 * Continuation was already completed, and might already have cancel handler. 361 */ 362 if (state.cancelHandler != null) multipleHandlersError(handler, state) 363 // BeforeResumeCancelHandler does not need to be called on a completed continuation 364 if (cancelHandler is BeforeResumeCancelHandler) return 365 if (state.cancelled) { 366 // Was already cancelled while being dispatched -- invoke the handler directly 367 callCancelHandler(handler, state.cancelCause) 368 return 369 } 370 val update = state.copy(cancelHandler = cancelHandler) 371 if (_state.compareAndSet(state, update)) return // quit on cas success 372 } 373 else -> { 374 /* 375 * Continuation was already completed normally, but might get cancelled while being dispatched. 376 * Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which 377 * does not need to be called in this case. 378 */ 379 if (cancelHandler is BeforeResumeCancelHandler) return 380 val update = CompletedContinuation(state, cancelHandler = cancelHandler) 381 if (_state.compareAndSet(state, update)) return // quit on cas success 382 } 383 } 384 } 385 } 386 387 private fun multipleHandlersError(handler: CompletionHandler, state: Any?) { 388 error("It's prohibited to register multiple handlers, tried to register $handler, already has $state") 389 } 390 391 private fun makeCancelHandler(handler: CompletionHandler): CancelHandler = 392 if (handler is CancelHandler) handler else InvokeOnCancel(handler) 393 394 private fun dispatchResume(mode: Int) { 395 if (tryResume()) return // completed before getResult invocation -- bail out 396 // otherwise, getResult has already commenced, i.e. completed later or in other thread 397 dispatch(mode) 398 } 399 400 private fun resumedState( 401 state: NotCompleted, 402 proposedUpdate: Any?, 403 resumeMode: Int, 404 onCancellation: ((cause: Throwable) -> Unit)?, 405 idempotent: Any? 406 ): Any? = when { 407 proposedUpdate is CompletedExceptionally -> { 408 assert { idempotent == null } // there are no idempotent exceptional resumes 409 assert { onCancellation == null } // only successful results can be cancelled 410 proposedUpdate 411 } 412 !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine 413 onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null -> 414 // mark as CompletedContinuation if special cases are present: 415 // Cancellation handlers that shall be called after resume or idempotent resume 416 CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent) 417 else -> proposedUpdate // simple case -- use the value directly 418 } 419 420 private fun resumeImpl( 421 proposedUpdate: Any?, 422 resumeMode: Int, 423 onCancellation: ((cause: Throwable) -> Unit)? = null 424 ) { 425 _state.loop { state -> 426 when (state) { 427 is NotCompleted -> { 428 val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null) 429 if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure 430 detachChildIfNonResuable() 431 dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process 432 return // done 433 } 434 is CancelledContinuation -> { 435 /* 436 * If continuation was cancelled, then resume attempt must be ignored, 437 * because cancellation is asynchronous and may race with resume. 438 * Racy exceptions will be lost, too. 439 */ 440 if (state.makeResumed()) { // check if trying to resume one (otherwise error) 441 // call onCancellation 442 onCancellation?.let { callOnCancellation(it, state.cause) } 443 return // done 444 } 445 } 446 } 447 alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt) 448 } 449 } 450 451 /** 452 * Similar to [tryResume], but does not actually completes resume (needs [completeResume] call). 453 * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled. 454 */ 455 private fun tryResumeImpl( 456 proposedUpdate: Any?, 457 idempotent: Any?, 458 onCancellation: ((cause: Throwable) -> Unit)? 459 ): Symbol? { 460 _state.loop { state -> 461 when (state) { 462 is NotCompleted -> { 463 val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent) 464 if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure 465 detachChildIfNonResuable() 466 return RESUME_TOKEN 467 } 468 is CompletedContinuation -> { 469 return if (idempotent != null && state.idempotentResume === idempotent) { 470 assert { state.result == proposedUpdate } // "Non-idempotent resume" 471 RESUME_TOKEN // resumed with the same token -- ok 472 } else { 473 null // resumed with a different token or non-idempotent -- too late 474 } 475 } 476 else -> return null // cannot resume -- not active anymore 477 } 478 } 479 } 480 481 private fun alreadyResumedError(proposedUpdate: Any?): Nothing { 482 error("Already resumed, but proposed with update $proposedUpdate") 483 } 484 485 // Unregister from parent job 486 private fun detachChildIfNonResuable() { 487 // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end 488 if (!isReusable()) detachChild() 489 } 490 491 /** 492 * Detaches from the parent. 493 */ 494 internal fun detachChild() { 495 val handle = parentHandle ?: return 496 handle.dispose() 497 parentHandle = NonDisposableHandle 498 } 499 500 // Note: Always returns RESUME_TOKEN | null 501 override fun tryResume(value: T, idempotent: Any?): Any? = 502 tryResumeImpl(value, idempotent, onCancellation = null) 503 504 override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? = 505 tryResumeImpl(value, idempotent, onCancellation) 506 507 override fun tryResumeWithException(exception: Throwable): Any? = 508 tryResumeImpl(CompletedExceptionally(exception), idempotent = null, onCancellation = null) 509 510 // note: token is always RESUME_TOKEN 511 override fun completeResume(token: Any) { 512 assert { token === RESUME_TOKEN } 513 dispatchResume(resumeMode) 514 } 515 516 override fun CoroutineDispatcher.resumeUndispatched(value: T) { 517 val dc = delegate as? DispatchedContinuation 518 resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) 519 } 520 521 override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) { 522 val dc = delegate as? DispatchedContinuation 523 resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) 524 } 525 526 @Suppress("UNCHECKED_CAST") 527 override fun <T> getSuccessfulResult(state: Any?): T = 528 when (state) { 529 is CompletedContinuation -> state.result as T 530 else -> state as T 531 } 532 533 // The exceptional state in CancellableContinuationImpl is stored directly and it is not recovered yet. 534 // The stacktrace recovery is invoked here. 535 override fun getExceptionalResult(state: Any?): Throwable? = 536 super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) } 537 538 // For nicer debugging 539 public override fun toString(): String = 540 "${nameString()}(${delegate.toDebugString()}){$stateDebugRepresentation}@$hexAddress" 541 542 protected open fun nameString(): String = 543 "CancellableContinuation" 544 545 } 546 547 // Marker for active continuation 548 internal interface NotCompleted 549 550 private object Active : NotCompleted { toStringnull551 override fun toString(): String = "Active" 552 } 553 554 /** 555 * Base class for all [CancellableContinuation.invokeOnCancellation] handlers to avoid an extra instance 556 * on JVM, yet support JS where you cannot extend from a functional type. 557 */ 558 internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted 559 560 /** 561 * Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked 562 * if continuation is cancelled after resumption, during dispatch, because the corresponding resources 563 * were already released before calling `resume`. This cancel handler is called only before `resume`. 564 * It avoids allocation of [CompletedContinuation] instance during resume on JVM. 565 */ 566 internal abstract class BeforeResumeCancelHandler : CancelHandler() 567 568 // Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly 569 private class InvokeOnCancel( // Clashes with InvokeOnCancellation 570 private val handler: CompletionHandler 571 ) : CancelHandler() { 572 override fun invoke(cause: Throwable?) { 573 handler.invoke(cause) 574 } 575 override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]" 576 } 577 578 // Completed with additional metadata 579 private data class CompletedContinuation( 580 @JvmField val result: Any?, 581 @JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation 582 @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block 583 @JvmField val idempotentResume: Any? = null, 584 @JvmField val cancelCause: Throwable? = null 585 ) { 586 val cancelled: Boolean get() = cancelCause != null 587 invokeHandlersnull588 fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) { 589 cancelHandler?.let { cont.callCancelHandler(it, cause) } 590 onCancellation?.let { cont.callOnCancellation(it, cause) } 591 } 592 } 593