/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.coroutines.intrinsics.*
10
11 // --------------- cancellable continuations ---------------
12
/**
 * A continuation that can be cancelled. It is _completed_ once it has been either resumed or cancelled.
 * When the [cancel] function is explicitly invoked, this continuation immediately resumes with a
 * [CancellationException] or with the specified cancellation cause.
 *
 * Instances of `CancellableContinuation` are created by the [suspendCancellableCoroutine] function.
 *
 * A cancellable continuation is in one of three states (a subset of the [Job] states):
 *
 * | **State**                             | [isActive] | [isCompleted] | [isCancelled] |
 * | ------------------------------------- | ---------- | ------------- | ------------- |
 * | _Active_ (initial state)              | `true`     | `false`       | `false`       |
 * | _Resumed_ (final _completed_ state)   | `false`    | `true`        | `false`       |
 * | _Cancelled_ (final _completed_ state) | `false`    | `true`        | `true`        |
 *
 * Invoking [cancel] moves this continuation from the _active_ to the _cancelled_ state, while invoking
 * [Continuation.resume] or [Continuation.resumeWithException] moves it from the _active_ to the _resumed_ state.
 *
 * A [cancelled][isCancelled] continuation is always [completed][isCompleted] as well.
 *
 * Invoking [Continuation.resume] or [Continuation.resumeWithException] in the _resumed_ state produces an
 * [IllegalStateException], whereas the same invocation in the _cancelled_ state is ignored.
 *
 * ```
 *    +-----------+   resume    +---------+
 *    |  Active   | ----------> | Resumed |
 *    +-----------+             +---------+
 *          |
 *          | cancel
 *          V
 *    +-----------+
 *    | Cancelled |
 *    +-----------+
 * ```
 */
public interface CancellableContinuation<in T> : Continuation<T> {
    /**
     * `true` while this continuation is active, that is, it has been neither completed nor cancelled yet.
     */
    public val isActive: Boolean

    /**
     * `true` once this continuation has completed for any reason.
     * A cancelled continuation counts as completed, too.
     */
    public val isCompleted: Boolean

    /**
     * `true` if this continuation was [cancelled][cancel].
     *
     * In that case [isActive] is `false` and [isCompleted] is `true`.
     */
    public val isCancelled: Boolean

    /**
     * Attempts to resume this continuation with the given [value]. Returns a non-null token object on success and
     * `null` when the continuation was already resumed or cancelled. A non-null result obliges the caller to
     * pass it to [completeResume].
     *
     * When [idempotent] is not `null`, the operation is _idempotent_: further invocations with the same
     * non-null reference produce the same result.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun tryResume(value: T, idempotent: Any? = null): Any?

    /**
     * Same as [tryResume], but with an [onCancellation] handler that is called if and only if the value is not
     * delivered to the caller because of the dispatch in the process, so that an atomic delivery
     * guarantee can be provided by having a cancellation fallback.
     */
    @InternalCoroutinesApi
    public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?

    /**
     * Attempts to resume this continuation with the given [exception]. Returns a non-null token object on success
     * and `null` when the continuation was already resumed or cancelled. A non-null result obliges the caller to
     * pass it to [completeResume].
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun tryResumeWithException(exception: Throwable): Any?

    /**
     * Finishes a resumption started by [tryResume] or [tryResumeWithException], given the non-null [token]
     * that one of them returned.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun completeResume(token: Any)

    /**
     * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before
     * kotlinx.coroutines 1.1.0.
     * This function does nothing and is left only for binary compatibility with old compiled code.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun initCancellability()

    /**
     * Cancels this continuation with an optional cancellation `cause`. Returns `true` if this continuation was
     * cancelled as a result of this very invocation, and `false` otherwise.
     */
    public fun cancel(cause: Throwable? = null): Boolean

    /**
     * Registers a [handler] that is **synchronously** invoked on [cancellation][cancel] (regular or exceptional)
     * of this continuation.
     * If the continuation is already cancelled, the handler is invoked immediately
     * with the cancellation exception. Otherwise, the handler is invoked as soon as this
     * continuation gets cancelled.
     *
     * The installed [handler] should not throw any exceptions.
     * If it does, they are caught, wrapped into a [CompletionHandlerException] and
     * processed as an uncaught exception in the context of the current coroutine
     * (see [CoroutineExceptionHandler]).
     *
     * At most one [handler] can be installed on a continuation. An attempt to call `invokeOnCancellation`
     * a second time produces an [IllegalStateException].
     *
     * This handler is also called when this continuation [resumes][Continuation.resume] normally (with a value)
     * and then is cancelled while waiting to be dispatched. More generally speaking, this handler is called
     * whenever the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
     *
     * A typical example of `invokeOnCancellation` usage is given in
     * the documentation for the [suspendCancellableCoroutine] function.
     *
     * **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
     * This `handler` can be invoked concurrently with the surrounding code.
     * There is no guarantee on the execution context in which the `handler` will be invoked.
     */
    public fun invokeOnCancellation(handler: CompletionHandler)

    /**
     * Resumes this continuation with the given [value] in the invoker thread, bypassing
     * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
     * This function is designed to be used only by [CoroutineDispatcher] implementations.
     * **It should not be used in general code**.
     *
     * **Note: This function is experimental.** Its signature may be changed in the future.
     */
    @ExperimentalCoroutinesApi
    public fun CoroutineDispatcher.resumeUndispatched(value: T)

    /**
     * Resumes this continuation with the given [exception] in the invoker thread, bypassing
     * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
     * This function is designed to be used only by [CoroutineDispatcher] implementations.
     * **It should not be used in general code**.
     *
     * **Note: This function is experimental.** Its signature may be changed in the future.
     */
    @ExperimentalCoroutinesApi
    public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)

    /**
     * Resumes this continuation with the given `value` and calls the given `onCancellation`
     * handler in either of two cases: the resume came too late (the continuation was already cancelled), or
     * the resume succeeded (before cancellation) but the coroutine's job was cancelled before it had a
     * chance to run in its dispatcher, so that the suspended function threw an exception
     * instead of returning this value.
     *
     * The installed [onCancellation] handler should not throw any exceptions.
     * If it does, they are caught, wrapped into a [CompletionHandlerException] and
     * processed as an uncaught exception in the context of the current coroutine
     * (see [CoroutineExceptionHandler]).
     *
     * This function shall be used when resuming with a resource that must be closed by
     * the code that called the corresponding suspending function, for example:
     *
     * ```
     * continuation.resume(resource) {
     *     resource.close()
     * }
     * ```
     *
     * A more complete example and further details are given in
     * the documentation for the [suspendCancellableCoroutine] function.
     *
     * **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
     * It can be invoked concurrently with the surrounding code.
     * There is no guarantee on the execution context of its invocation.
     */
    @ExperimentalCoroutinesApi // since 1.2.0
    public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
}
201
202 /**
203 * Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to
204 * the [block]. This function throws a [CancellationException] if the [Job] of the coroutine is
205 * cancelled or completed while it is suspended.
206 *
207 * A typical use of this function is to suspend a coroutine while waiting for a result
208 * from a single-shot callback API and to return the result to the caller.
209 * For multi-shot callback APIs see [callbackFlow][kotlinx.coroutines.flow.callbackFlow].
210 *
211 * ```
212 * suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
213 * val callback = object : Callback { // Implementation of some callback interface
214 * override fun onCompleted(value: T) {
215 * // Resume coroutine with a value provided by the callback
216 * continuation.resume(value)
217 * }
218 * override fun onApiError(cause: Throwable) {
219 * // Resume coroutine with an exception provided by the callback
220 * continuation.resumeWithException(cause)
221 * }
222 * }
223 * // Register callback with an API
224 * api.register(callback)
225 * // Remove callback on cancellation
226 * continuation.invokeOnCancellation { api.unregister(callback) }
227 * // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
228 * }
229 * ```
230 *
231 * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
232 * > `invokeOnCancellation` block can be called at any time due to asynchronous nature of cancellation, even
233 * > concurrently with the call of the callback.
234 *
235 * ### Prompt cancellation guarantee
236 *
237 * This function provides **prompt cancellation guarantee**.
238 * If the [Job] of the current coroutine was cancelled while this function was suspended it will not resume
239 * successfully.
240 *
 * The cancellation of the coroutine's job is generally asynchronous with respect to the suspended coroutine.
 * The suspended coroutine is resumed with a call to its [Continuation.resumeWith] member function or to the
 * [resume][Continuation.resume] extension function.
 * However, when a coroutine is resumed, it does not immediately start executing, but is passed to its
 * [CoroutineDispatcher] to schedule its execution when the dispatcher's resources become available for execution.
 * The job's cancellation can happen before, after, or concurrently with the call to `resume`. In any
 * case, prompt cancellation guarantees that the coroutine will not resume its code successfully.
248 *
 * If the coroutine was resumed with an exception (for example, using the [Continuation.resumeWithException]
 * extension function) and cancelled, then the resulting exception of the `suspendCancellableCoroutine` function
 * is determined by whichever action (exceptional resume or cancellation) happened first.
252 *
253 * ### Returning resources from a suspended coroutine
254 *
255 * As a result of a prompt cancellation guarantee, when a closeable resource
256 * (like open file or a handle to another native resource) is returned from a suspended coroutine as a value
257 * it can be lost when the coroutine is cancelled. In order to ensure that the resource can be properly closed
258 * in this case, the [CancellableContinuation] interface provides two functions.
259 *
 * * [invokeOnCancellation][CancellableContinuation.invokeOnCancellation] installs a handler that is called
 *   whenever a suspended coroutine is being cancelled. In addition to the example at the beginning, it can be
 *   used to ensure that a resource that was opened before the call to
 *   `suspendCancellableCoroutine` or in its body is closed in case of cancellation.
264 *
265 * ```
266 * suspendCancellableCoroutine { continuation ->
267 * val resource = openResource() // Opens some resource
268 * continuation.invokeOnCancellation {
269 * resource.close() // Ensures the resource is closed on cancellation
270 * }
271 * // ...
272 * }
273 * ```
274 *
275 * * [resume(value) { ... }][CancellableContinuation.resume] method on a [CancellableContinuation] takes
276 * an optional `onCancellation` block. It can be used when resuming with a resource that must be closed by
277 * the code that called the corresponding suspending function.
278 *
 * ```
 * suspendCancellableCoroutine { continuation ->
 *     val callback = object : Callback { // Implementation of some callback interface
 *         // A callback provides a reference to some closeable resource
 *         override fun onCompleted(resource: T) {
 *             // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
 *             // when the coroutine is cancelled before the caller gets a reference to the resource.
 *             continuation.resume(resource) {
 *                 resource.close() // Close the resource on cancellation
 *             }
 *         }
 *     }
 *     // ...
 * }
 * ```
293 *
294 * ### Implementation details and custom continuation interceptors
295 *
296 * The prompt cancellation guarantee is the result of a coordinated implementation inside `suspendCancellableCoroutine`
297 * function and the [CoroutineDispatcher] class. The coroutine dispatcher checks for the status of the [Job] immediately
298 * before continuing its normal execution and aborts this normal execution, calling all the corresponding
299 * cancellation handlers, if the job was cancelled.
300 *
301 * If a custom implementation of [ContinuationInterceptor] is used in a coroutine's context that does not extend
302 * [CoroutineDispatcher] class, then there is no prompt cancellation guarantee. A custom continuation interceptor
303 * can resume execution of a previously suspended coroutine even if its job was already cancelled.
304 */
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
    // Wrap the intercepted continuation of the caller into a fresh cancellable continuation.
    with(CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)) {
        /*
         * Cancellation here is non-atomic, so the parent-child relationship is set up right away,
         * before `block` runs. This covers a `block` that blocks the current thread
         * (e.g. Rx2 with trampoline scheduler) yet properly supports cancellation.
         */
        initCancellability()
        block(this)
        // Either the value the continuation was already resumed with, or the marker that suspends the caller.
        getResult()
    }
}
319
/**
 * Suspends the coroutine in the same way as [suspendCancellableCoroutine], except that the
 * [CancellableContinuationImpl] handed to [block] is obtained from [getOrCreateCancellableContinuation]
 * and therefore may be a reused instance rather than a newly allocated one.
 */
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
    getOrCreateCancellableContinuation(uCont.intercepted()).let { reusable ->
        block(reusable)
        reusable.getResult()
    }
}
331
/**
 * Returns a [CancellableContinuationImpl] for [delegate] in [MODE_CANCELLABLE_REUSABLE]: the reusable instance
 * claimed from [delegate] when that is possible, and a newly created one otherwise.
 */
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
    // Reuse is attempted only for our own DispatchedContinuation; anything else is used outside of our dispatcher.
    if (delegate is DispatchedContinuation<T>) {
        /*
         * Attempt to claim reusable instance.
         *
         * suspendCancellableCoroutineReusable { // <- claimed
         *     // Any asynchronous cancellation is "postponed" while this block
         *     // is being executed
         * } // postponed cancellation is checked here.
         *
         * The claim can fail for the following reasons:
         * 1) Someone tried to make an idempotent resume.
         *    Idempotent resume is internal (used only by us) and is used only in `select`,
         *    thus leaking the CC instance for an indefinite time.
         * 2) The continuation was cancelled. Then any further reuse must be prevented, so we bail out.
         */
        val claimed = delegate.claimReusableCancellableContinuation()
        if (claimed != null && claimed.resetStateReusable()) return claimed
    }
    // Nothing to reuse: fall back to a fresh instance.
    return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
}
354
/**
 * Arranges for the given [node] to be removed when this continuation is cancelled. The node is assumed to have
 * been removed already on a successful resume, so no removal is attempted if the continuation is cancelled
 * during dispatch.
 */
internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode) {
    val removeHandler = RemoveOnCancel(node).asHandler
    invokeOnCancellation(removeHandler)
}
361
/**
 * Arranges for the given [handle] to be disposed when this continuation is cancelled.
 *
 * It is a shortcut for the code below, with a slightly more efficient implementation (one fewer object created):
 * ```
 * invokeOnCancellation { handle.dispose() }
 * ```
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle) {
    val disposeHandler = DisposeOnCancel(handle).asHandler
    invokeOnCancellation(disposeHandler)
}
375
376 // --------------- implementation details ---------------
377
/**
 * Cancellation handler backing [removeOnCancellation]: when invoked, it calls `remove()` on the wrapped [node].
 */
private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : BeforeResumeCancelHandler() {
    // Neither the cancellation cause nor the result of `remove()` is used.
    override fun invoke(cause: Throwable?) {
        node.remove()
    }

    override fun toString(): String {
        return "RemoveOnCancel[$node]"
    }
}
382
/**
 * Cancellation handler backing [disposeOnCancellation]: when invoked, it disposes the wrapped [handle].
 */
private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
    // The cancellation cause is not used.
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String {
        return "DisposeOnCancel[$handle]"
    }
}
387