/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*

// Values of the `_decision` state machine (see the diagram inside CancellableContinuationImpl).
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

/**
 * @suppress **This is unstable API and it is subject to change.**
 */
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
    final override val delegate: Continuation<T>,
    resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
    public override val context: CoroutineContext = delegate.context

    /*
     * Implementation notes
     *
     * CancellableContinuationImpl is a subset of Job with following limitations:
     * 1) It can have only cancellation listeners
     * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
     * 3) It can have at most one cancellation listener
     * 4) Its cancellation listeners cannot be deregistered
     * As a consequence it has much simpler state machine, more lightweight machinery and
     * less dependencies.
     */

    /* decision state machine

        +-----------+   trySuspend   +-----------+
        | UNDECIDED | -------------> | SUSPENDED |
        +-----------+                +-----------+
              |
              | tryResume
              V
        +-----------+
        |  RESUMED  |
        +-----------+

        Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
     */
    private val _decision = atomic(UNDECIDED)

    /*
       === Internal states ===
       name        state class          public state    description
       ------      ------------         ------------    -----------
       ACTIVE      Active               : Active        active, no listeners
       SINGLE_A    CancelHandler        : Active        active, one cancellation listener
       CANCELLED   CancelledContinuation: Cancelled     cancelled (final state)
       COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
     */
    private val _state = atomic<Any?>(Active)

    // Handle of the completion handler registered on the parent job (if any);
    // replaced with NonDisposableHandle once it has been disposed.
    @Volatile
    private var parentHandle: DisposableHandle? = null

    /** Current value of the internal state (see the "Internal states" table above). */
    internal val state: Any? get() = _state.value

    public override val isActive: Boolean get() = state is NotCompleted

    public override val isCompleted: Boolean get() = state !is NotCompleted

    public override val isCancelled: Boolean get() = state is CancelledContinuation

    public override fun initCancellability() {
        // This method does nothing. Leftover for binary compatibility with old compiled code
    }

    /**
     * Registers a [ChildContinuation] handler on the parent [Job] found in the delegate's context
     * (with `onCancelling = true`) and remembers the resulting handle in [parentHandle].
     *
     * Does nothing when this continuation is already completed or when there is no parent job.
     */
    // It is only invoked from an internal getResult function, so we can be sure it is not invoked twice
    private fun installParentCancellationHandler() {
        if (isCompleted) return // fast path 1 -- don't need to do anything if already completed
        val parent = delegate.context[Job] ?: return // fast path 2 -- don't do anything without parent
        parent.start() // make sure the parent is started
        val handle = parent.invokeOnCompletion(
            onCancelling = true,
            handler = ChildContinuation(parent, this).asHandler
        )
        parentHandle = handle
        // now check our state _after_ registering (could have completed while we were registering)
        if (isCompleted) {
            handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

    public override val callerFrame: CoroutineStackFrame?
        get() = delegate as? CoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement? = null

    override fun takeState(): Any? = state

    /**
     * If [state] is a [CompletedWithCancellation], invokes its `onCancellation` callback with [cause].
     * Any exception thrown by the callback is routed through [invokeHandlerSafely].
     * Other kinds of state are ignored.
     */
    override fun cancelResult(state: Any?, cause: Throwable) {
        if (state is CompletedWithCancellation) {
            invokeHandlerSafely {
                state.onCancellation(cause)
            }
        }
    }

    /**
     * Attempts to move this continuation from a [NotCompleted] state to [CancelledContinuation].
     *
     * On success the previously installed [CancelHandler] (if any) is invoked with [cause],
     * the parent handle is disposed and resumption is dispatched with `MODE_ATOMIC_DEFAULT`.
     *
     * @return `true` if this call performed the cancellation, `false` if the state was no longer
     * [NotCompleted].
     */
    public override fun cancel(cause: Throwable?): Boolean {
        _state.loop { state ->
            if (state !is NotCompleted) return false // false if already complete or cancelling
            // Active -- update to final state
            val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
            if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
            // Invoke cancel handler if it was present
            if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
            // Complete state update
            disposeParentHandle()
            dispatchResume(mode = MODE_ATOMIC_DEFAULT)
            return true
        }
    }

    /**
     * Runs [block] and reports any [Throwable] it throws via [handleCoroutineException],
     * wrapped into a [CompletionHandlerException], instead of propagating it to the caller.
     */
    private inline fun invokeHandlerSafely(block: () -> Unit) {
        try {
            block()
        } catch (ex: Throwable) {
            // Handler should never fail, if it does -- it is an unhandled exception
            handleCoroutineException(
                context,
                CompletionHandlerException("Exception in cancellation handler for $this", ex)
            )
        }
    }

    /**
     * It is used when parent is cancelled to get the cancellation cause for this continuation.
     */
    open fun getContinuationCancellationCause(parent: Job): Throwable =
        parent.getCancellationException()

    /**
     * Tries to move the decision from `UNDECIDED` to `SUSPENDED`.
     *
     * @return `true` if the transition was made, `false` if the decision is already `RESUMED`.
     * @throws IllegalStateException if the decision is already `SUSPENDED`.
     */
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    /**
     * Tries to move the decision from `UNDECIDED` to `RESUMED`.
     *
     * @return `true` if the transition was made, `false` if the decision is already `SUSPENDED`.
     * @throws IllegalStateException if the decision is already `RESUMED`.
     */
    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    /**
     * Installs the parent cancellation handler and then either returns [COROUTINE_SUSPENDED]
     * (when [trySuspend] succeeds) or produces the outcome that is already stored in [state]:
     * the successful result is returned, a [CompletedExceptionally] state is rethrown.
     */
    @PublishedApi
    internal fun getResult(): Any? {
        installParentCancellationHandler()
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, tryResume has already won the race (decision is RESUMED) and the result is in the state
        val state = this.state
        if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
        // if the parent job was already cancelled, then throw the corresponding cancellation exception
        // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
        // before the block returns. This getResult would return a result as opposed to cancellation
        // exception that should have happened if the continuation is dispatched for execution later.
        if (resumeMode == MODE_CANCELLABLE) {
            val job = context[Job]
            if (job != null && !job.isActive) {
                val cause = job.getCancellationException()
                cancelResult(state, cause)
                throw recoverStackTrace(cause, this)
            }
        }
        return getSuccessfulResult(state)
    }

    override fun resumeWith(result: Result<T>) {
        resumeImpl(result.toState(), resumeMode)
    }

    /**
     * Resumes with [value], storing it together with [onCancellation] in a [CompletedWithCancellation] state.
     * If this continuation turns out to be cancelled already, [onCancellation] is invoked right away
     * with the cause of the [CancelledContinuation] state.
     */
    override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
        val cancelled = resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)
        if (cancelled != null) {
            // too late to resume (was cancelled) -- call handler
            invokeHandlerSafely {
                onCancellation(cancelled.cause)
            }
        }
    }

    internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
        resumeImpl(CompletedExceptionally(exception), mode)

    /**
     * Installs [handler] as the single cancellation handler of this continuation.
     *
     * - In the [Active] state the handler is stored in the state via CAS.
     * - If the continuation is already cancelled, the handler is invoked immediately.
     * - If the continuation is already completed otherwise, the call is a no-op.
     *
     * @throws IllegalStateException if a handler has already been registered.
     */
    public override fun invokeOnCancellation(handler: CompletionHandler) {
        var handleCache: CancelHandler? = null // created lazily, reused across CAS retries
        _state.loop { state ->
            when (state) {
                is Active -> {
                    val node = handleCache ?: makeHandler(handler).also { handleCache = it }
                    if (_state.compareAndSet(state, node)) return // quit on cas success
                }
                is CancelHandler -> multipleHandlersError(handler, state)
                is CancelledContinuation -> {
                    /*
                     * Continuation was already cancelled, invoke directly.
                     * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
                     * so we check to make sure that handler was installed just once.
                     */
                    if (!state.makeHandled()) multipleHandlersError(handler, state)
                    /*
                     * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                     * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                     */
                    invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) }
                    return
                }
                else -> {
                    /*
                     * Continuation was already completed, do nothing.
                     * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
                     * but we have no way to check that it was installed just once in this case.
                     */
                    return
                }
            }
        }
    }

    // Always throws; declared as Nothing so that the compiler knows control never continues past the call.
    private fun multipleHandlersError(handler: CompletionHandler, state: Any?): Nothing {
        error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
    }

    // Reuses the handler as is when it already is a CancelHandler, wraps it into InvokeOnCancel otherwise.
    private fun makeHandler(handler: CompletionHandler): CancelHandler =
        if (handler is CancelHandler) handler else InvokeOnCancel(handler)

    private fun dispatchResume(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // otherwise, getResult has already commenced, i.e. completed later or in other thread
        dispatch(mode)
    }

    // returns null when the resume was successfully dispatched, CancelledContinuation if too late (was already cancelled)
    private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
                    disposeParentHandle()
                    dispatchResume(resumeMode)
                    return null
                }
                is CancelledContinuation -> {
                    /*
                     * If continuation was cancelled, then resume attempt must be ignored,
                     * because cancellation is asynchronous and may race with resume.
                     * Racy exceptions will be lost, too.
                     */
                    if (state.makeResumed()) return state // tried to resume just once, but was cancelled
                }
            }
            alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
        }
    }

    // Always throws; declared as Nothing so that the compiler knows control never continues past the call.
    private fun alreadyResumedError(proposedUpdate: Any?): Nothing {
        error("Already resumed, but proposed with update $proposedUpdate")
    }

    // Unregister from parent job
    private fun disposeParentHandle() {
        parentHandle?.let { // volatile read parentHandle (once)
            it.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

    /**
     * Tries to store [value] as the result without dispatching the resumption.
     *
     * When [idempotent] is not `null` the value is wrapped into a [CompletedIdempotentResult], so that
     * a repeated call with the same [idempotent] marker returns the same token again.
     *
     * @return the previous [NotCompleted] state as a token on success, or `null` if this continuation
     * can no longer be resumed.
     */
    override fun tryResume(value: T, idempotent: Any?): Any? {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    val update: Any? = if (idempotent == null) value else
                        CompletedIdempotentResult(idempotent, value, state)
                    if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                    disposeParentHandle()
                    return state
                }
                is CompletedIdempotentResult -> {
                    return if (state.idempotentResume === idempotent) {
                        assert { state.result === value } // "Non-idempotent resume"
                        state.token
                    } else {
                        null
                    }
                }
                else -> return null // cannot resume -- not active anymore
            }
        }
    }

    /**
     * Tries to store [exception] as the (exceptional) result without dispatching the resumption.
     *
     * @return the previous [NotCompleted] state as a token on success, or `null` if this continuation
     * can no longer be resumed.
     */
    override fun tryResumeWithException(exception: Throwable): Any? {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    val update = CompletedExceptionally(exception)
                    if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                    disposeParentHandle()
                    return state
                }
                else -> return null // cannot resume -- not active anymore
            }
        }
    }

    override fun completeResume(token: Any) {
        // note: We don't actually use token anymore, because handler needs to be invoked on cancellation only
        dispatchResume(resumeMode)
    }

    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dc = delegate as? DispatchedContinuation
        // undispatched mode is used only when the delegate is dispatched by this very dispatcher
        resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }

    override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
        val dc = delegate as? DispatchedContinuation
        // undispatched mode is used only when the delegate is dispatched by this very dispatcher
        resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }

    // Unwraps the value from the wrapper states; any other state is the result itself.
    @Suppress("UNCHECKED_CAST")
    override fun <T> getSuccessfulResult(state: Any?): T =
        when (state) {
            is CompletedIdempotentResult -> state.result as T
            is CompletedWithCancellation -> state.result as T
            else -> state as T
        }

    // For nicer debugging
    public override fun toString(): String =
        "${nameString()}(${delegate.toDebugString()}){$state}@$hexAddress"

    protected open fun nameString(): String =
        "CancellableContinuation"

}

// Marker for active continuation
internal interface NotCompleted

// Initial state: active, no cancellation handler installed
private object Active : NotCompleted {
    override fun toString(): String = "Active"
}

internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted

// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
    private val handler: CompletionHandler
) : CancelHandler() {
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }
    override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
}

// Completed state produced by tryResume(value, idempotent) with a non-null idempotent marker;
// `token` is the NotCompleted state that was replaced.
private class CompletedIdempotentResult(
    @JvmField val idempotentResume: Any?,
    @JvmField val result: Any?,
    @JvmField val token: NotCompleted
) {
    override fun toString(): String = "CompletedIdempotentResult[$result]"
}

// Completed state produced by resume(value, onCancellation); keeps the callback next to the result.
private class CompletedWithCancellation(
    @JvmField val result: Any?,
    @JvmField val onCancellation: (cause: Throwable) -> Unit
) {
    override fun toString(): String = "CompletedWithCancellation[$result]"
}