• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.coroutines.intrinsics.*
11 import kotlin.jvm.*
12 
13 private const val UNDECIDED = 0
14 private const val SUSPENDED = 1
15 private const val RESUMED = 2
16 
17 /**
18  * @suppress **This is unstable API and it is subject to change.**
19  */
20 @PublishedApi
21 internal open class CancellableContinuationImpl<in T>(
22     final override val delegate: Continuation<T>,
23     resumeMode: Int
24 ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
25     public override val context: CoroutineContext = delegate.context
26 
27     /*
28      * Implementation notes
29      *
30      * AbstractContinuation is a subset of Job with following limitations:
31      * 1) It can have only cancellation listeners
32      * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
33      * 3) It can have at most one cancellation listener
34      * 4) Its cancellation listeners cannot be deregistered
35      * As a consequence it has much simpler state machine, more lightweight machinery and
36      * less dependencies.
37      */
38 
39     /* decision state machine
40 
41         +-----------+   trySuspend   +-----------+
42         | UNDECIDED | -------------> | SUSPENDED |
43         +-----------+                +-----------+
44               |
45               | tryResume
46               V
47         +-----------+
48         |  RESUMED  |
49         +-----------+
50 
51         Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
52      */
53     private val _decision = atomic(UNDECIDED)
54 
55     /*
56        === Internal states ===
57        name        state class          public state    description
58        ------      ------------         ------------    -----------
59        ACTIVE      Active               : Active        active, no listeners
60        SINGLE_A    CancelHandler        : Active        active, one cancellation listener
61        CANCELLED   CancelledContinuation: Cancelled     cancelled (final state)
62        COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
63      */
64     private val _state = atomic<Any?>(Active)
65 
66     @Volatile
67     private var parentHandle: DisposableHandle? = null
68 
69     internal val state: Any? get() = _state.value
70 
71     public override val isActive: Boolean get() = state is NotCompleted
72 
73     public override val isCompleted: Boolean get() = state !is NotCompleted
74 
75     public override val isCancelled: Boolean get() = state is CancelledContinuation
76 
77     public override fun initCancellability() {
78         // This method does nothing. Leftover for binary compatibility with old compiled code
79     }
80 
81     // It is only invoked from an internal getResult function, so we can be sure it is not invoked twice
82     private fun installParentCancellationHandler() {
83         if (isCompleted) return // fast path 1 -- don't need to do anything if already completed
84         val parent = delegate.context[Job] ?: return // fast path 2 -- don't do anything without parent
85         parent.start() // make sure the parent is started
86         val handle = parent.invokeOnCompletion(
87             onCancelling = true,
88             handler = ChildContinuation(parent, this).asHandler
89         )
90         parentHandle = handle
91         // now check our state _after_ registering (could have completed while we were registering)
92         if (isCompleted) {
93             handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
94             parentHandle = NonDisposableHandle // release it just in case, to aid GC
95         }
96     }
97 
98     public override val callerFrame: CoroutineStackFrame?
99         get() = delegate as? CoroutineStackFrame
100 
101     public override fun getStackTraceElement(): StackTraceElement? = null
102 
103     override fun takeState(): Any? = state
104 
105     override fun cancelResult(state: Any?, cause: Throwable) {
106         if (state is CompletedWithCancellation) {
107             invokeHandlerSafely {
108                 state.onCancellation(cause)
109             }
110         }
111     }
112 
113     public override fun cancel(cause: Throwable?): Boolean {
114         _state.loop { state ->
115             if (state !is NotCompleted) return false // false if already complete or cancelling
116             // Active -- update to final state
117             val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
118             if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
119             // Invoke cancel handler if it was present
120             if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
121             // Complete state update
122             disposeParentHandle()
123             dispatchResume(mode = MODE_ATOMIC_DEFAULT)
124             return true
125         }
126     }
127 
128     private inline fun invokeHandlerSafely(block: () -> Unit) {
129         try {
130             block()
131         } catch (ex: Throwable) {
132             // Handler should never fail, if it does -- it is an unhandled exception
133             handleCoroutineException(
134                 context,
135                 CompletionHandlerException("Exception in cancellation handler for $this", ex)
136             )
137         }
138     }
139 
140     /**
141      * It is used when parent is cancelled to get the cancellation cause for this continuation.
142      */
143     open fun getContinuationCancellationCause(parent: Job): Throwable =
144         parent.getCancellationException()
145 
146     private fun trySuspend(): Boolean {
147         _decision.loop { decision ->
148             when (decision) {
149                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
150                 RESUMED -> return false
151                 else -> error("Already suspended")
152             }
153         }
154     }
155 
156     private fun tryResume(): Boolean {
157         _decision.loop { decision ->
158             when (decision) {
159                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
160                 SUSPENDED -> return false
161                 else -> error("Already resumed")
162             }
163         }
164     }
165 
166     @PublishedApi
167     internal fun getResult(): Any? {
168         installParentCancellationHandler()
169         if (trySuspend()) return COROUTINE_SUSPENDED
170         // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
171         val state = this.state
172         if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
173         // if the parent job was already cancelled, then throw the corresponding cancellation exception
174         // otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
175         // before the block returns. This getResult would return a result as opposed to cancellation
176         // exception that should have happened if the continuation is dispatched for execution later.
177         if (resumeMode == MODE_CANCELLABLE) {
178             val job = context[Job]
179             if (job != null && !job.isActive) {
180                 val cause = job.getCancellationException()
181                 cancelResult(state, cause)
182                 throw recoverStackTrace(cause, this)
183             }
184         }
185         return getSuccessfulResult(state)
186     }
187 
188     override fun resumeWith(result: Result<T>) {
189         resumeImpl(result.toState(), resumeMode)
190     }
191 
192     override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
193         val cancelled = resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)
194         if (cancelled != null) {
195             // too late to resume (was cancelled) -- call handler
196             invokeHandlerSafely {
197                 onCancellation(cancelled.cause)
198             }
199         }
200     }
201 
202     internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
203         resumeImpl(CompletedExceptionally(exception), mode)
204 
205     public override fun invokeOnCancellation(handler: CompletionHandler) {
206         var handleCache: CancelHandler? = null
207         _state.loop { state ->
208             when (state) {
209                 is Active -> {
210                     val node = handleCache ?: makeHandler(handler).also { handleCache = it }
211                     if (_state.compareAndSet(state, node)) return // quit on cas success
212                 }
213                 is CancelHandler -> multipleHandlersError(handler, state)
214                 is CancelledContinuation -> {
215                     /*
216                      * Continuation was already cancelled, invoke directly.
217                      * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
218                      * so we check to make sure that handler was installed just once.
219                      */
220                     if (!state.makeHandled()) multipleHandlersError(handler, state)
221                     /*
222                      * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
223                      * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
224                      */
225                     invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) }
226                     return
227                 }
228                 else -> {
229                     /*
230                      * Continuation was already completed, do nothing.
231                      * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
232                      * but we have no way to check that it was installed just once in this case.
233                      */
234                     return
235                 }
236             }
237         }
238     }
239 
240     private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
241         error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
242     }
243 
244     private fun makeHandler(handler: CompletionHandler): CancelHandler =
245         if (handler is CancelHandler) handler else InvokeOnCancel(handler)
246 
247     private fun dispatchResume(mode: Int) {
248         if (tryResume()) return // completed before getResult invocation -- bail out
249         // otherwise, getResult has already commenced, i.e. completed later or in other thread
250         dispatch(mode)
251     }
252 
253     // returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
254     private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
255         _state.loop { state ->
256             when (state) {
257                 is NotCompleted -> {
258                     if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
259                     disposeParentHandle()
260                     dispatchResume(resumeMode)
261                     return null
262                 }
263                 is CancelledContinuation -> {
264                     /*
265                      * If continuation was cancelled, then resume attempt must be ignored,
266                      * because cancellation is asynchronous and may race with resume.
267                      * Racy exceptions will be lost, too.
268                      */
269                     if (state.makeResumed()) return state // tried to resume just once, but was cancelled
270                 }
271             }
272             alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
273         }
274     }
275 
276     private fun alreadyResumedError(proposedUpdate: Any?) {
277         error("Already resumed, but proposed with update $proposedUpdate")
278     }
279 
280     // Unregister from parent job
281     private fun disposeParentHandle() {
282         parentHandle?.let { // volatile read parentHandle (once)
283             it.dispose()
284             parentHandle = NonDisposableHandle // release it just in case, to aid GC
285         }
286     }
287 
288     override fun tryResume(value: T, idempotent: Any?): Any? {
289         _state.loop { state ->
290             when (state) {
291                 is NotCompleted -> {
292                     val update: Any? = if (idempotent == null) value else
293                         CompletedIdempotentResult(idempotent, value, state)
294                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
295                     disposeParentHandle()
296                     return state
297                 }
298                 is CompletedIdempotentResult -> {
299                     return if (state.idempotentResume === idempotent) {
300                         assert { state.result === value } // "Non-idempotent resume"
301                         state.token
302                     } else {
303                         null
304                     }
305                 }
306                 else -> return null // cannot resume -- not active anymore
307             }
308         }
309     }
310 
311     override fun tryResumeWithException(exception: Throwable): Any? {
312         _state.loop { state ->
313             when (state) {
314                 is NotCompleted -> {
315                     val update = CompletedExceptionally(exception)
316                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
317                     disposeParentHandle()
318                     return state
319                 }
320                 else -> return null // cannot resume -- not active anymore
321             }
322         }
323     }
324 
325     override fun completeResume(token: Any) {
326         // note: We don't actually use token anymore, because handler needs to be invoked on cancellation only
327         dispatchResume(resumeMode)
328     }
329 
330     override fun CoroutineDispatcher.resumeUndispatched(value: T) {
331         val dc = delegate as? DispatchedContinuation
332         resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
333     }
334 
335     override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
336         val dc = delegate as? DispatchedContinuation
337         resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
338     }
339 
340     @Suppress("UNCHECKED_CAST")
341     override fun <T> getSuccessfulResult(state: Any?): T =
342         when (state) {
343             is CompletedIdempotentResult -> state.result as T
344             is CompletedWithCancellation -> state.result as T
345             else -> state as T
346         }
347 
348     // For nicer debugging
349     public override fun toString(): String =
350         "${nameString()}(${delegate.toDebugString()}){$state}@$hexAddress"
351 
352     protected open fun nameString(): String =
353         "CancellableContinuation"
354 
355 }
356 
357 // Marker for active continuation
358 internal interface NotCompleted
359 
360 private object Active : NotCompleted {
toStringnull361     override fun toString(): String = "Active"
362 }
363 
364 internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
365 
366 // Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
367 private class InvokeOnCancel( // Clashes with InvokeOnCancellation
368     private val handler: CompletionHandler
369 ) : CancelHandler() {
370     override fun invoke(cause: Throwable?) {
371         handler.invoke(cause)
372     }
373     override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
374 }
375 
376 private class CompletedIdempotentResult(
377     @JvmField val idempotentResume: Any?,
378     @JvmField val result: Any?,
379     @JvmField val token: NotCompleted
380 ) {
toStringnull381     override fun toString(): String = "CompletedIdempotentResult[$result]"
382 }
383 
384 private class CompletedWithCancellation(
385     @JvmField val result: Any?,
386     @JvmField val onCancellation: (cause: Throwable) -> Unit
387 ) {
388     override fun toString(): String = "CompletedWithCancellation[$result]"
389 }
390 
391