• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.coroutines.intrinsics.*
10 
11 // --------------- cancellable continuations ---------------
12 
13 /**
14  * Cancellable continuation. It is _completed_ when resumed or cancelled.
15  * When the [cancel] function is explicitly invoked, this continuation immediately resumes with a [CancellationException] or
16  * the specified cancel cause.
17  *
18  * An instance of `CancellableContinuation` is created by the [suspendCancellableCoroutine] function.
19  *
20  * Cancellable continuation has three states (as subset of [Job] states):
21  *
22  * | **State**                           | [isActive] | [isCompleted] | [isCancelled] |
23  * | ----------------------------------- | ---------- | ------------- | ------------- |
24  * | _Active_ (initial state)            | `true`     | `false`       | `false`       |
25  * | _Resumed_ (final _completed_ state) | `false`    | `true`        | `false`       |
26  * | _Canceled_ (final _completed_ state)| `false`    | `true`        | `true`        |
27  *
28  * Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
29  * invocation of [Continuation.resume] or [Continuation.resumeWithException] transitions it from _active_ to _resumed_ state.
30  *
31  * A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted].
32  *
33  * Invocation of [Continuation.resume] or [Continuation.resumeWithException] in _resumed_ state produces an [IllegalStateException],
34  * but is ignored in _cancelled_ state.
35  *
36  * ```
37  *    +-----------+   resume    +---------+
38  *    |  Active   | ----------> | Resumed |
39  *    +-----------+             +---------+
40  *          |
41  *          | cancel
42  *          V
43  *    +-----------+
44  *    | Cancelled |
45  *    +-----------+
46  * ```
47  */
48 public interface CancellableContinuation<in T> : Continuation<T> {
49     /**
50      * Returns `true` when this continuation is active -- it has not completed or cancelled yet.
51      */
52     public val isActive: Boolean
53 
54     /**
55      * Returns `true` when this continuation has completed for any reason. A cancelled continuation
56      * is also considered complete.
57      */
58     public val isCompleted: Boolean
59 
60     /**
61      * Returns `true` if this continuation was [cancelled][cancel].
62      *
63      * It implies that [isActive] is `false` and [isCompleted] is `true`.
64      */
65     public val isCancelled: Boolean
66 
67     /**
68      * Tries to resume this continuation with the specified [value] and returns a non-null object token if successful,
69      * or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
70      * [completeResume] must be invoked with it.
71      *
72      * When [idempotent] is not `null`, this function performs an _idempotent_ operation, so that
73      * further invocations with the same non-null reference produce the same result.
74      *
75      * @suppress **This is unstable API and it is subject to change.**
76      */
77     @InternalCoroutinesApi
78     public fun tryResume(value: T, idempotent: Any? = null): Any?
79 
80     /**
81      * Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
82      * delivered to the caller because of the dispatch in the process, so that atomicity delivery
83      * guaranteed can be provided by having a cancellation fallback.
84      *
85      * Implementation note: current implementation always returns RESUME_TOKEN or `null`
86      *
87      * @suppress  **This is unstable API and it is subject to change.**
88      */
89     @InternalCoroutinesApi
90     public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
91 
92     /**
93      * Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
94      * or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
95      * [completeResume] must be invoked with it.
96      *
97      * @suppress **This is unstable API and it is subject to change.**
98      */
99     @InternalCoroutinesApi
100     public fun tryResumeWithException(exception: Throwable): Any?
101 
102     /**
103      * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
104      *
105      * @suppress **This is unstable API and it is subject to change.**
106      */
107     @InternalCoroutinesApi
108     public fun completeResume(token: Any)
109 
110     /**
111      * Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
112      * It's illegal to call this function in any non-`kotlinx.coroutines` code and
113      * such calls lead to undefined behaviour.
114      * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
115      *
116      * @suppress **This is unstable API and it is subject to change.**
117      */
118     @InternalCoroutinesApi
119     public fun initCancellability()
120 
121     /**
122      * Cancels this continuation with an optional cancellation `cause`. The result is `true` if this continuation was
123      * cancelled as a result of this invocation, and `false` otherwise.
124      */
125     public fun cancel(cause: Throwable? = null): Boolean
126 
127     /**
128      * Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
129      * When the continuation is already cancelled, the handler is immediately invoked
130      * with the cancellation exception. Otherwise, the handler will be invoked as soon as this
131      * continuation is cancelled.
132      *
133      * The installed [handler] should not throw any exceptions.
134      * If it does, they will get caught, wrapped into a [CompletionHandlerException] and
135      * processed as an uncaught exception in the context of the current coroutine
136      * (see [CoroutineExceptionHandler]).
137      *
138      * At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
139      * time produces [IllegalStateException].
140      *
141      * This handler is also called when this continuation [resumes][Continuation.resume] normally (with a value) and then
142      * is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
143      * the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
144      *
145      * A typical example for `invokeOnCancellation` usage is given in
146      * the documentation for the [suspendCancellableCoroutine] function.
147      *
148      * **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
149      * This `handler` can be invoked concurrently with the surrounding code.
150      * There is no guarantee on the execution context in which the `handler` will be invoked.
151      */
152     public fun invokeOnCancellation(handler: CompletionHandler)
153 
154     /**
155      * Resumes this continuation with the specified [value] in the invoker thread without going through
156      * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
157      * This function is designed to only be used by [CoroutineDispatcher] implementations.
158      * **It should not be used in general code**.
159      *
160      * **Note: This function is experimental.** Its signature general code may be changed in the future.
161      */
162     @ExperimentalCoroutinesApi
163     public fun CoroutineDispatcher.resumeUndispatched(value: T)
164 
165     /**
166      * Resumes this continuation with the specified [exception] in the invoker thread without going through
167      * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
168      * This function is designed to only be used by [CoroutineDispatcher] implementations.
169      * **It should not be used in general code**.
170      *
171      * **Note: This function is experimental.** Its signature general code may be changed in the future.
172      */
173     @ExperimentalCoroutinesApi
174     public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
175 
176     /**
177      * Resumes this continuation with the specified `value` and calls the specified `onCancellation`
178      * handler when either resumed too late (when continuation was already cancelled) or, although resumed
179      * successfully (before cancellation), the coroutine's job was cancelled before it had a
180      * chance to run in its dispatcher, so that the suspended function threw an exception
181      * instead of returning this value.
182      *
183      * The installed [onCancellation] handler should not throw any exceptions.
184      * If it does, they will get caught, wrapped into a [CompletionHandlerException] and
185      * processed as an uncaught exception in the context of the current coroutine
186      * (see [CoroutineExceptionHandler]).
187      *
188      * This function shall be used when resuming with a resource that must be closed by
189      * code that called the corresponding suspending function, for example:
190      *
191      * ```
192      * continuation.resume(resource) {
193      *     resource.close()
194      * }
195      * ```
196      *
197      * A more complete example and further details are given in
198      * the documentation for the [suspendCancellableCoroutine] function.
199      *
200      * **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
201      * It can be invoked concurrently with the surrounding code.
202      * There is no guarantee on the execution context of its invocation.
203      */
204     @ExperimentalCoroutinesApi // since 1.2.0
205     public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
206 }
207 
208 /**
209  * Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to
210  * the [block]. This function throws a [CancellationException] if the [Job] of the coroutine is
211  * cancelled or completed while it is suspended.
212  *
213  * A typical use of this function is to suspend a coroutine while waiting for a result
214  * from a single-shot callback API and to return the result to the caller.
215  * For multi-shot callback APIs see [callbackFlow][kotlinx.coroutines.flow.callbackFlow].
216  *
217  * ```
218  * suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
219  *     val callback = object : Callback { // Implementation of some callback interface
220  *         override fun onCompleted(value: T) {
221  *             // Resume coroutine with a value provided by the callback
222  *             continuation.resume(value)
223  *         }
224  *         override fun onApiError(cause: Throwable) {
225  *             // Resume coroutine with an exception provided by the callback
226  *             continuation.resumeWithException(cause)
227  *         }
228  *     }
229  *     // Register callback with an API
230  *     api.register(callback)
231  *     // Remove callback on cancellation
232  *     continuation.invokeOnCancellation { api.unregister(callback) }
233  *     // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
234  * }
235  * ```
236  *
237  * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
238  * > `invokeOnCancellation` block can be called at any time due to asynchronous nature of cancellation, even
239  * > concurrently with the call of the callback.
240  *
241  * ### Prompt cancellation guarantee
242  *
243  * This function provides **prompt cancellation guarantee**.
244  * If the [Job] of the current coroutine was cancelled while this function was suspended it will not resume
245  * successfully.
246  *
247  * The cancellation of the coroutine's job is generally asynchronous with respect to the suspended coroutine.
248  * The suspended coroutine is resumed with the call it to its [Continuation.resumeWith] member function or to
249  * [resume][Continuation.resume] extension function.
250  * However, when coroutine is resumed, it does not immediately start executing, but is passed to its
251  * [CoroutineDispatcher] to schedule its execution when dispatcher's resources become available for execution.
252  * The job's cancellation can happen both before, after, and concurrently with the call to `resume`. In any
253  * case, prompt cancellation guarantees that the the coroutine will not resume its code successfully.
254  *
255  * If the coroutine was resumed with an exception (for example, using [Continuation.resumeWithException] extension
256  * function) and cancelled, then the resulting exception of the `suspendCancellableCoroutine` function is determined
257  * by whichever action (exceptional resume or cancellation) that happened first.
258  *
259  * ### Returning resources from a suspended coroutine
260  *
261  * As a result of a prompt cancellation guarantee, when a closeable resource
262  * (like open file or a handle to another native resource) is returned from a suspended coroutine as a value
263  * it can be lost when the coroutine is cancelled. In order to ensure that the resource can be properly closed
264  * in this case, the [CancellableContinuation] interface provides two functions.
265  *
266  * * [invokeOnCancellation][CancellableContinuation.invokeOnCancellation] installs a handler that is called
267  *   whenever a suspend coroutine is being cancelled. In addition to the example at the beginning, it can be
268  *   used to ensure that a resource that was opened before the call to
269  *   `suspendCancellableCoroutine` or in its body is closed in case of cancellation.
270  *
271  * ```
272  * suspendCancellableCoroutine { continuation ->
273  *    val resource = openResource() // Opens some resource
274  *    continuation.invokeOnCancellation {
275  *        resource.close() // Ensures the resource is closed on cancellation
276  *    }
277  *    // ...
278  * }
279  * ```
280  *
281  * * [resume(value) { ... }][CancellableContinuation.resume] method on a [CancellableContinuation] takes
282  *   an optional `onCancellation` block. It can be used when resuming with a resource that must be closed by
283  *   the code that called the corresponding suspending function.
284  *
285  * ```
286  * suspendCancellableCoroutine { continuation ->
287  *     val callback = object : Callback { // Implementation of some callback interface
288  *         // A callback provides a reference to some closeable resource
289  *         override fun onCompleted(resource: T) {
290  *             // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
291  *             // when the coroutine is cancelled before the caller gets a reference to the resource.
292  *             continuation.resume(resource) {
293  *                 resource.close() // Close the resource on cancellation
294  *             }
295  *         }
296  *     // ...
297  * }
298  * ```
299  *
300  * ### Implementation details and custom continuation interceptors
301  *
302  * The prompt cancellation guarantee is the result of a coordinated implementation inside `suspendCancellableCoroutine`
303  * function and the [CoroutineDispatcher] class. The coroutine dispatcher checks for the status of the [Job] immediately
304  * before continuing its normal execution and aborts this normal execution, calling all the corresponding
305  * cancellation handlers, if the job was cancelled.
306  *
307  * If a custom implementation of [ContinuationInterceptor] is used in a coroutine's context that does not extend
308  * [CoroutineDispatcher] class, then there is no prompt cancellation guarantee. A custom continuation interceptor
309  * can resume execution of a previously suspended coroutine even if its job was already cancelled.
310  */
suspendCancellableCoroutinenull311 public suspend inline fun <T> suspendCancellableCoroutine(
312     crossinline block: (CancellableContinuation<T>) -> Unit
313 ): T =
314     suspendCoroutineUninterceptedOrReturn { uCont ->
315         val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
316         /*
317          * For non-atomic cancellation we setup parent-child relationship immediately
318          * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
319          * properly supports cancellation.
320          */
321         cancellable.initCancellability()
322         block(cancellable)
323         cancellable.getResult()
324     }
325 
326 /**
327  * Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
328  * [CancellableContinuationImpl] is reused.
329  */
suspendCancellableCoroutineReusablenull330 internal suspend inline fun <T> suspendCancellableCoroutineReusable(
331     crossinline block: (CancellableContinuation<T>) -> Unit
332 ): T = suspendCoroutineUninterceptedOrReturn { uCont ->
333     val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
334     block(cancellable)
335     cancellable.getResult()
336 }
337 
getOrCreateCancellableContinuationnull338 internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
339     // If used outside of our dispatcher
340     if (delegate !is DispatchedContinuation<T>) {
341         return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
342     }
343     /*
344      * Attempt to claim reusable instance.
345      *
346      * suspendCancellableCoroutineReusable { // <- claimed
347      *     // Any asynchronous cancellation is "postponed" while this block
348      *     // is being executed
349      * } // postponed cancellation is checked here.
350      *
351      * Claim can fail for the following reasons:
352      * 1) Someone tried to make idempotent resume.
353      *    Idempotent resume is internal (used only by us) and is used only in `select`,
354      *    thus leaking CC instance for indefinite time.
355      * 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
356      */
357     return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetStateReusable() }
358         ?: return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
359 }
360 
361 /**
362  * Removes the specified [node] on cancellation. This function assumes that this node is already
363  * removed on successful resume and does not try to remove it if the continuation is cancelled during dispatch.
364  */
removeOnCancellationnull365 internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode) =
366     invokeOnCancellation(handler = RemoveOnCancel(node).asHandler)
367 
368 /**
369  * Disposes the specified [handle] when this continuation is cancelled.
370  *
371  * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created):
372  * ```
373  * invokeOnCancellation { handle.dispose() }
374  * ```
375  *
376  * @suppress **This an internal API and should not be used from general code.**
377  */
378 @InternalCoroutinesApi
379 public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
380     invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler)
381 
382 // --------------- implementation details ---------------
383 
384 private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : BeforeResumeCancelHandler() {
385     override fun invoke(cause: Throwable?) { node.remove() }
386     override fun toString() = "RemoveOnCancel[$node]"
387 }
388 
389 private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
invokenull390     override fun invoke(cause: Throwable?) = handle.dispose()
391     override fun toString(): String = "DisposeOnCancel[$handle]"
392 }
393