/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.native.concurrent.*

// Values of the `_decision` state machine (see the diagram inside CancellableContinuationImpl)
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

// Token returned by the tryResume family of functions on success and expected back by completeResume
@JvmField
@SharedImmutable
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")

/**
 * @suppress **This is unstable API and it is subject to change.**
 */
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
    final override val delegate: Continuation<T>,
    resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
    init {
        assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
    }

    public override val context: CoroutineContext = delegate.context

    /*
     * Implementation notes
     *
     * CancellableContinuationImpl is a subset of Job with the following limitations:
     * 1) It can have only a cancellation listener (no "on cancelling")
     * 2) It always invokes the cancellation listener if it's cancelled (no 'invokeImmediately')
     * 3) It can have at most one cancellation listener
     * 4) Its cancellation listeners cannot be deregistered
     * As a consequence it has a much simpler state machine, more lightweight machinery and
     * fewer dependencies.
     */

    /* decision state machine

        +-----------+   trySuspend   +-----------+
        | UNDECIDED | -------------> | SUSPENDED |
        +-----------+                +-----------+
              |
              | tryResume
              V
        +-----------+
        |  RESUMED  |
        +-----------+

        Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
    */
    private val _decision = atomic(UNDECIDED)

    /*
       === Internal states ===
       name        state class           public state   description
       ------      ------------          ------------   -----------
       ACTIVE      Active               : Active        active, no listeners
       SINGLE_A    CancelHandler        : Active        active, one cancellation listener
       CANCELLED   CancelledContinuation: Cancelled     cancelled (final state)
       COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
     */
    private val _state = atomic<Any?>(Active)

    // Handle of the registration in the parent job: null until setupCancellation registers it,
    // NonDisposableHandle once it has been released
    private val _parentHandle = atomic<DisposableHandle?>(null)
    private var parentHandle: DisposableHandle?
        get() = _parentHandle.value
        set(value) { _parentHandle.value = value }

    internal val state: Any? get() = _state.value

    public override val isActive: Boolean get() = state is NotCompleted

    public override val isCompleted: Boolean get() = state !is NotCompleted

    public override val isCancelled: Boolean get() = state is CancelledContinuation

    public override fun initCancellability() {
        setupCancellation()
    }

    private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)

    /**
     * Resets cancellability state in order for [suspendCancellableCoroutineReusable] to work.
     * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
     *
     * Returns `true` when the state was reset to the initial one, and `false` when this instance
     * cannot be reused because it was resumed with an idempotent marker (it is detached from the parent then).
     */
    @JvmName("resetStateReusable") // Prettier stack traces
    internal fun resetStateReusable(): Boolean {
        assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl
        assert { parentHandle !== NonDisposableHandle }
        val state = _state.value
        assert { state !is NotCompleted }
        if (state is CompletedContinuation && state.idempotentResume != null) {
            // Cannot reuse continuation that was resumed with idempotent marker
            detachChild()
            return false
        }
        _decision.value = UNDECIDED
        _state.value = Active
        return true
    }

    /**
     * Sets up parent cancellation and checks for postponed cancellation in the case of reusable continuations.
     * It is only invoked from an internal [getResult] function for reusable continuations
     * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
     */
    private fun setupCancellation() {
        if (checkCompleted()) return
        if (parentHandle !== null) return // fast path 2 -- was already initialized
        val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
        val handle = parent.invokeOnCompletion(
            onCancelling = true,
            handler = ChildContinuation(parent, this).asHandler
        )
        parentHandle = handle
        // now check our state _after_ registering (could have completed while we were registering)
        // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
        if (isCompleted && !isReusable()) {
            handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

    /**
     * Returns `true` when this continuation is already completed or when a postponed cancellation
     * was found on a reusable delegate (in which case [cancel] is attempted if not yet completed).
     */
    private fun checkCompleted(): Boolean {
        val completed = isCompleted
        if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
        val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
        val cause = dispatched.checkPostponedCancellation(this) ?: return completed
        if (!completed) {
            // Note: this cancel may fail if one more concurrent cancel is currently being invoked
            cancel(cause)
        }
        return true
    }

    public override val callerFrame: CoroutineStackFrame?
        get() = delegate as? CoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement? = null

    override fun takeState(): Any? = state

    // Note: takeState does not clear the state so we don't use takenState
    // and we use the actual current state in the CAS-loop
    override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
        when (state) {
            is NotCompleted -> error("Not completed")
            is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
            is CompletedContinuation -> {
                check(!state.cancelled) { "Must be called at most once" }
                val update = state.copy(cancelCause = cause)
                if (_state.compareAndSet(state, update)) {
                    state.invokeHandlers(this, cause)
                    return // done
                }
            }
            else -> {
                // completed normally without marker class, promote to CompletedContinuation in case
                // invokeOnCancellation is called later
                if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
                    return // done
                }
            }
        }
    }

    /*
     * Attempt to postpone cancellation for reusable cancellable continuation
     */
    private fun cancelLater(cause: Throwable): Boolean {
        if (!resumeMode.isReusableMode) return false
        val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
        return dispatched.postponeCancellation(cause)
    }

    /**
     * Moves a not-yet-completed continuation to the cancelled state, invokes the installed
     * cancel handler (if any) and dispatches the resumption.
     * Returns `false` when the continuation is already completed or cancelled.
     */
    public override fun cancel(cause: Throwable?): Boolean {
        _state.loop { state ->
            if (state !is NotCompleted) return false // false if already complete or cancelling
            // Active -- update to final state
            val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
            if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
            // Invoke cancel handler if it was present
            (state as? CancelHandler)?.let { callCancelHandler(it, cause) }
            // Complete state update
            detachChildIfNonReusable()
            dispatchResume(resumeMode) // no need for additional cancellation checks
            return true
        }
    }

    internal fun parentCancelled(cause: Throwable) {
        if (cancelLater(cause)) return
        cancel(cause)
        // Even if cancellation has failed, we should detach child to avoid potential leak
        detachChildIfNonReusable()
    }

    // Runs the block and reports any exception thrown by it via handleCoroutineException
    private inline fun callCancelHandlerSafely(block: () -> Unit) {
        try {
            block()
        } catch (ex: Throwable) {
            // Handler should never fail, if it does -- it is an unhandled exception
            handleCoroutineException(
                context,
                CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex)
            )
        }
    }

    private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) =
        /*
        * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
        * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
        */
        callCancelHandlerSafely { handler.invokeIt(cause) }

    fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
        callCancelHandlerSafely { handler.invoke(cause) }

    // Invokes the `onCancellation` block passed to resume; an exception thrown by it is reported
    // via handleCoroutineException
    fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
        try {
            onCancellation.invoke(cause)
        } catch (ex: Throwable) {
            // Handler should never fail, if it does -- it is an unhandled exception
            handleCoroutineException(
                context,
                CompletionHandlerException("Exception in resume onCancellation handler for $this", ex)
            )
        }
    }

    /**
     * It is used when parent is cancelled to get the cancellation cause for this continuation.
     */
    open fun getContinuationCancellationCause(parent: Job): Throwable =
        parent.getCancellationException()

    // UNDECIDED -> SUSPENDED; returns false if already RESUMED, fails if already SUSPENDED
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    // UNDECIDED -> RESUMED; returns false if already SUSPENDED, fails if already RESUMED
    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    @PublishedApi
    internal fun getResult(): Any? {
        setupCancellation()
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val state = this.state
        if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
        // if the parent job was already cancelled, then throw the corresponding cancellation exception
        // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
        // before the block returns. This getResult would return a result as opposed to cancellation
        // exception that should have happened if the continuation is dispatched for execution later.
        if (resumeMode.isCancellableMode) {
            val job = context[Job]
            if (job != null && !job.isActive) {
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                throw recoverStackTrace(cause, this)
            }
        }
        return getSuccessfulResult(state)
    }

    override fun resumeWith(result: Result<T>) =
        resumeImpl(result.toState(this), resumeMode)

    override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
        resumeImpl(value, resumeMode, onCancellation)

    public override fun invokeOnCancellation(handler: CompletionHandler) {
        val cancelHandler = makeCancelHandler(handler)
        _state.loop { state ->
            when (state) {
                is Active -> {
                    if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
                }
                is CancelHandler -> multipleHandlersError(handler, state)
                is CompletedExceptionally -> {
                    /*
                     * Continuation was already cancelled or completed exceptionally.
                     * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
                     * so we check to make sure handler was installed just once.
                     */
                    if (!state.makeHandled()) multipleHandlersError(handler, state)
                    /*
                     * Call the handler only if it was cancelled (not called when completed exceptionally).
                     * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                     * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                     */
                    if (state is CancelledContinuation) {
                        // state is already smart-cast to CompletedExceptionally in this branch
                        callCancelHandler(handler, state.cause)
                    }
                    return
                }
                is CompletedContinuation -> {
                    /*
                     * Continuation was already completed, and might already have cancel handler.
                     */
                    if (state.cancelHandler != null) multipleHandlersError(handler, state)
                    // BeforeResumeCancelHandler does not need to be called on a completed continuation
                    if (cancelHandler is BeforeResumeCancelHandler) return
                    if (state.cancelled) {
                        // Was already cancelled while being dispatched -- invoke the handler directly
                        callCancelHandler(handler, state.cancelCause)
                        return
                    }
                    val update = state.copy(cancelHandler = cancelHandler)
                    if (_state.compareAndSet(state, update)) return // quit on cas success
                }
                else -> {
                    /*
                     * Continuation was already completed normally, but might get cancelled while being dispatched.
                     * Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
                     * does not need to be called in this case.
                     */
                    if (cancelHandler is BeforeResumeCancelHandler) return
                    val update = CompletedContinuation(state, cancelHandler = cancelHandler)
                    if (_state.compareAndSet(state, update)) return // quit on cas success
                }
            }
        }
    }

    private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
        error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
    }

    // Reuses the handler when it already is a CancelHandler, otherwise wraps it into InvokeOnCancel
    private fun makeCancelHandler(handler: CompletionHandler): CancelHandler =
        if (handler is CancelHandler) handler else InvokeOnCancel(handler)

    private fun dispatchResume(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // otherwise, getResult has already commenced, i.e. completed later or in other thread
        dispatch(mode)
    }

    // Computes the value to store in `_state` when resuming from the given not-completed state
    private fun resumedState(
        state: NotCompleted,
        proposedUpdate: Any?,
        resumeMode: Int,
        onCancellation: ((cause: Throwable) -> Unit)?,
        idempotent: Any?
    ): Any? = when {
        proposedUpdate is CompletedExceptionally -> {
            assert { idempotent == null } // there are no idempotent exceptional resumes
            assert { onCancellation == null } // only successful results can be cancelled
            proposedUpdate
        }
        !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
        onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
            // mark as CompletedContinuation if special cases are present:
            // Cancellation handlers that shall be called after resume or idempotent resume
            CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
        else -> proposedUpdate // simple case -- use the value directly
    }

    private fun resumeImpl(
        proposedUpdate: Any?,
        resumeMode: Int,
        onCancellation: ((cause: Throwable) -> Unit)? = null
    ) {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
                    if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                    detachChildIfNonReusable()
                    dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
                    return // done
                }
                is CancelledContinuation -> {
                    /*
                     * If continuation was cancelled, then resume attempt must be ignored,
                     * because cancellation is asynchronous and may race with resume.
                     * Racy exceptions will be lost, too.
                     */
                    if (state.makeResumed()) { // check if trying to resume one (otherwise error)
                        // call onCancellation
                        onCancellation?.let { callOnCancellation(it, state.cause) }
                        return // done
                    }
                }
            }
            alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
        }
    }

    /**
     * Similar to [tryResume], but does not actually complete the resume (needs [completeResume] call).
     * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
     */
    private fun tryResumeImpl(
        proposedUpdate: Any?,
        idempotent: Any?,
        onCancellation: ((cause: Throwable) -> Unit)?
    ): Symbol? {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent)
                    if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                    detachChildIfNonReusable()
                    return RESUME_TOKEN
                }
                is CompletedContinuation -> {
                    return if (idempotent != null && state.idempotentResume === idempotent) {
                        assert { state.result == proposedUpdate } // "Non-idempotent resume"
                        RESUME_TOKEN // resumed with the same token -- ok
                    } else {
                        null // resumed with a different token or non-idempotent -- too late
                    }
                }
                else -> return null // cannot resume -- not active anymore
            }
        }
    }

    private fun alreadyResumedError(proposedUpdate: Any?): Nothing {
        error("Already resumed, but proposed with update $proposedUpdate")
    }

    // Unregister from parent job
    private fun detachChildIfNonReusable() {
        // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
        if (!isReusable()) detachChild()
    }

    /**
     * Detaches from the parent.
     * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
     */
    internal fun detachChild() {
        val handle = parentHandle
        handle?.dispose()
        parentHandle = NonDisposableHandle
    }

    // Note: Always returns RESUME_TOKEN | null
    override fun tryResume(value: T, idempotent: Any?): Any? =
        tryResumeImpl(value, idempotent, onCancellation = null)

    override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
        tryResumeImpl(value, idempotent, onCancellation)

    override fun tryResumeWithException(exception: Throwable): Any? =
        tryResumeImpl(CompletedExceptionally(exception), idempotent = null, onCancellation = null)

    // note: token is always RESUME_TOKEN
    override fun completeResume(token: Any) {
        assert { token === RESUME_TOKEN }
        dispatchResume(resumeMode)
    }

    // Uses MODE_UNDISPATCHED only when the delegate is a DispatchedContinuation of this very dispatcher
    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dc = delegate as? DispatchedContinuation
        resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }

    override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
        val dc = delegate as? DispatchedContinuation
        resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }

    // Unwraps the result from CompletedContinuation; any other state is the result itself
    @Suppress("UNCHECKED_CAST")
    override fun <T> getSuccessfulResult(state: Any?): T =
        when (state) {
            is CompletedContinuation -> state.result as T
            else -> state as T
        }

    // The exceptional state in CancellableContinuationImpl is stored directly and it is not recovered yet.
    // The stacktrace recovery is invoked here.
    override fun getExceptionalResult(state: Any?): Throwable? =
        super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }

    // For nicer debugging
    public override fun toString(): String =
        "${nameString()}(${delegate.toDebugString()}){$state}@$hexAddress"

    protected open fun nameString(): String =
        "CancellableContinuation"

}

// Marker for active continuation
internal interface NotCompleted

private object Active : NotCompleted {
    override fun toString(): String = "Active"
}

/**
 * Base class for all [CancellableContinuation.invokeOnCancellation] handlers to avoid an extra instance
 * on JVM, yet support JS where you cannot extend from a functional type.
 */
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted

/**
 * Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked
 * if continuation is cancelled after resumption, during dispatch, because the corresponding resources
 * were already released before calling `resume`. This cancel handler is called only before `resume`.
 * It avoids allocation of [CompletedContinuation] instance during resume on JVM.
 */
internal abstract class BeforeResumeCancelHandler : CancelHandler()

// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
    private val handler: CompletionHandler
) : CancelHandler() {
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }
    override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
}

// Completed with additional metadata
private data class CompletedContinuation(
    @JvmField val result: Any?,
    @JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
    @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
    @JvmField val idempotentResume: Any? = null,
    @JvmField val cancelCause: Throwable? = null
) {
    val cancelled: Boolean get() = cancelCause != null

    // Invokes the cancel handler first and then the onCancellation block, whichever of them are present
    fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
        cancelHandler?.let { cont.callCancelHandler(it, cause) }
        onCancellation?.let { cont.callOnCancellation(it, cause) }
    }
}