<lambda>null1 package kotlinx.coroutines
2
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 import kotlin.coroutines.intrinsics.*
6
7 /**
8 * Cancellable [continuation][Continuation] is a thread-safe continuation primitive with the support of
9 * an asynchronous cancellation.
10 *
11 * Cancellable continuation can be [resumed][Continuation.resumeWith], but unlike regular [Continuation],
12 * it also might be [cancelled][CancellableContinuation.cancel] explicitly or [implicitly][Job.cancel] via a parent [job][Job].
13 *
14 * If the continuation is cancelled successfully, it resumes with a [CancellationException] or
15 * the specified cancel cause.
16 *
17 * ### Usage
18 *
19 * An instance of `CancellableContinuation` can only be obtained by the [suspendCancellableCoroutine] function.
20 * The interface itself is public for use and private for implementation.
21 *
22 * A typical usages of this function is to suspend a coroutine while waiting for a result
23 * from a callback or an external source of values that optionally supports cancellation:
24 *
25 * ```
26 * suspend fun <T> CompletableFuture<T>.await(): T = suspendCancellableCoroutine { c ->
27 * val future = this
28 * future.whenComplete { result, throwable ->
29 * if (throwable != null) {
30 * // Resume continuation with an exception if an external source failed
31 * c.resumeWithException(throwable)
32 * } else {
33 * // Resume continuation with a value if it was computed
34 * c.resume(result)
35 * }
36 * }
37 * // Cancel the computation if the continuation itself was cancelled because a caller of 'await' is cancelled
38 * c.invokeOnCancellation { future.cancel(true) }
39 * }
40 * ```
41 *
42 * ### Thread-safety
43 *
44 * Instances of [CancellableContinuation] are thread-safe and can be safely shared across multiple threads.
45 * [CancellableContinuation] allows concurrent invocations of the [cancel] and [resume] pair, guaranteeing
46 * that only one of these operations will succeed.
47 * Concurrent invocations of [resume] methods lead to a [IllegalStateException] and are considered a programmatic error.
48 * Concurrent invocations of [cancel] methods is permitted, and at most one of them succeeds.
49 *
50 * ### Prompt cancellation guarantee
51 *
52 * A cancellable continuation provides a **prompt cancellation guarantee**.
53 *
54 * If the [Job] of the coroutine that obtained a cancellable continuation was cancelled while this continuation was suspended it will not resume
55 * successfully, even if [CancellableContinuation.resume] was already invoked but not yet executed.
56 *
57 * The cancellation of the coroutine's job is generally asynchronous with respect to the suspended coroutine.
58 * The suspended coroutine is resumed with a call to its [Continuation.resumeWith] member function or to the
59 * [resume][Continuation.resume] extension function.
60 * However, when the coroutine is resumed, it does not immediately start executing but is passed to its
61 * [CoroutineDispatcher] to schedule its execution when the dispatcher's resources become available for execution.
62 * The job's cancellation can happen before, after, and concurrently with the call to `resume`. In any
63 * case, prompt cancellation guarantees that the coroutine will not resume its code successfully.
64 *
65 * If the coroutine was resumed with an exception (for example, using the [Continuation.resumeWithException] extension
66 * function) and cancelled, then the exception thrown by the `suspendCancellableCoroutine` function is determined
67 * by what happened first: exceptional resume or cancellation.
68 *
69 * ### Resuming with a closeable resource
70 *
71 * [CancellableContinuation] provides the capability to work with values that represent a resource that should be
72 * closed. For that, it provides `resume(value: R, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)`
73 * function that guarantees that either the given `value` will be successfully returned from the corresponding
74 * `suspend` function or that `onCancellation` will be invoked with the supplied value:
75 *
76 * ```
77 * continuation.resume(resourceToResumeWith) { _, resourceToClose, _
78 * // Will be invoked if the continuation is cancelled while being dispatched
79 * resourceToClose.close()
80 * }
81 * ```
82 *
83 * #### Continuation states
84 *
85 * A cancellable continuation has three observable states:
86 *
87 * | **State** | [isActive] | [isCompleted] | [isCancelled] |
88 * | ----------------------------------- | ---------- | ------------- | ------------- |
89 * | _Active_ (initial state) | `true` | `false` | `false` |
90 * | _Resumed_ (final _completed_ state) | `false` | `true` | `false` |
91 * | _Canceled_ (final _completed_ state)| `false` | `true` | `true` |
92 *
93 * For a detailed description of each state, see the corresponding properties' documentation.
94 *
95 * A successful invocation of [cancel] transitions the continuation from an _active_ to a _cancelled_ state, while
96 * an invocation of [Continuation.resume] or [Continuation.resumeWithException] transitions it from
97 * an _active_ to _resumed_ state.
98 *
99 * Possible state transitions diagram:
100 * ```
101 * +-----------+ resume +---------+
102 * | Active | ----------> | Resumed |
103 * +-----------+ +---------+
104 * |
105 * | cancel
106 * V
107 * +-----------+
108 * | Cancelled |
109 * +-----------+
110 * ```
111 */
112 @OptIn(ExperimentalSubclassOptIn::class)
113 @SubclassOptInRequired(InternalForInheritanceCoroutinesApi::class)
114 public interface CancellableContinuation<in T> : Continuation<T> {
115 /**
116 * Returns `true` when this continuation is active -- it was created,
117 * but not yet [resumed][Continuation.resumeWith] or [cancelled][CancellableContinuation.cancel].
118 *
119 * This state implies that [isCompleted] and [isCancelled] are `false`,
120 * but this can change immediately after the invocation because of parallel calls to [cancel] and [resume].
121 */
122 public val isActive: Boolean
123
124 /**
125 * Returns `true` when this continuation was completed -- [resumed][Continuation.resumeWith] or
126 * [cancelled][CancellableContinuation.cancel].
127 *
128 * This state implies that [isActive] is `false`.
129 */
130 public val isCompleted: Boolean
131
132 /**
133 * Returns `true` if this continuation was [cancelled][CancellableContinuation.cancel].
134 *
135 * It implies that [isActive] is `false` and [isCompleted] is `true`.
136 */
137 public val isCancelled: Boolean
138
139 /**
140 * Tries to resume this continuation with the specified [value] and returns a non-null object token if successful,
141 * or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
142 * [completeResume] must be invoked with it.
143 *
144 * When [idempotent] is not `null`, this function performs an _idempotent_ operation, so that
145 * further invocations with the same non-null reference produce the same result.
146 *
147 * @suppress **This is unstable API and it is subject to change.**
148 */
149 @InternalCoroutinesApi
150 public fun tryResume(value: T, idempotent: Any? = null): Any?
151
152 /**
153 * Same as [tryResume] but with an [onCancellation] handler that is called if and only if the value is not
154 * delivered to the caller because of the dispatch in the process.
155 *
156 * The purpose of this function is to enable atomic delivery guarantees: either resumption succeeded, passing
157 * the responsibility for [value] to the continuation, or the [onCancellation] block will be invoked,
158 * allowing one to free the resources in [value].
159 *
160 * Implementation note: current implementation always returns RESUME_TOKEN or `null`
161 *
162 * @suppress **This is unstable API and it is subject to change.**
163 */
164 @InternalCoroutinesApi
165 public fun <R: T> tryResume(
166 value: R, idempotent: Any?, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
167 ): Any?
168
169 /**
170 * Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
171 * or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
172 * [completeResume] must be invoked with it.
173 *
174 * @suppress **This is unstable API and it is subject to change.**
175 */
176 @InternalCoroutinesApi
177 public fun tryResumeWithException(exception: Throwable): Any?
178
179 /**
180 * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
181 *
182 * @suppress **This is unstable API and it is subject to change.**
183 */
184 @InternalCoroutinesApi
185 public fun completeResume(token: Any)
186
187 /**
188 * Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
189 * It's illegal to call this function in any non-`kotlinx.coroutines` code and
190 * such calls lead to undefined behaviour.
191 * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
192 *
193 * @suppress **This is unstable API and it is subject to change.**
194 */
195 @InternalCoroutinesApi
196 public fun initCancellability()
197
198 /**
199 * Cancels this continuation with an optional cancellation `cause`. The result is `true` if this continuation was
200 * cancelled as a result of this invocation, and `false` otherwise.
201 * [cancel] might return `false` when the continuation was either [resumed][resume] or already [cancelled][cancel].
202 */
203 public fun cancel(cause: Throwable? = null): Boolean
204
205 /**
206 * Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
207 * When the continuation is already cancelled, the handler is immediately invoked with the cancellation exception.
208 * Otherwise, the handler will be invoked as soon as this continuation is cancelled.
209 *
210 * The installed [handler] should not throw any exceptions.
211 * If it does, they will get caught, wrapped into a `CompletionHandlerException` and
212 * processed as an uncaught exception in the context of the current coroutine
213 * (see [CoroutineExceptionHandler]).
214 *
215 * At most one [handler] can be installed on a continuation.
216 * Attempting to call `invokeOnCancellation` a second time produces an [IllegalStateException].
217 *
218 * This handler is also called when this continuation [resumes][Continuation.resume] normally (with a value) and then
219 * is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
220 * the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
221 *
222 * A typical example of `invokeOnCancellation` usage is given in
223 * the documentation for the [suspendCancellableCoroutine] function.
224 *
225 * **Note**: Implementations of [CompletionHandler] must be fast, non-blocking, and thread-safe.
226 * This [handler] can be invoked concurrently with the surrounding code.
227 * There is no guarantee on the execution context in which the [handler] will be invoked.
228 */
229 public fun invokeOnCancellation(handler: CompletionHandler)
230
231 /**
232 * Resumes this continuation with the specified [value] in the invoker thread without going through
233 * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
234 * This function is designed to only be used by [CoroutineDispatcher] implementations.
235 * **It should not be used in general code**.
236 *
237 * **Note: This function is experimental.** Its signature general code may be changed in the future.
238 */
239 @ExperimentalCoroutinesApi
240 public fun CoroutineDispatcher.resumeUndispatched(value: T)
241
242 /**
243 * Resumes this continuation with the specified [exception] in the invoker thread without going through
244 * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
245 * This function is designed to only be used by [CoroutineDispatcher] implementations.
246 * **It should not be used in general code**.
247 *
248 * **Note: This function is experimental.** Its signature general code may be changed in the future.
249 */
250 @ExperimentalCoroutinesApi
251 public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
252
253 /** @suppress */
254 @Deprecated(
255 "Use the overload that also accepts the `value` and the coroutine context in lambda",
256 level = DeprecationLevel.WARNING,
257 replaceWith = ReplaceWith("resume(value) { cause, _, _ -> onCancellation(cause) }")
258 ) // warning since 1.9.0, was experimental
259 public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
260
261 /**
262 * Resumes this continuation with the specified [value], calling the specified [onCancellation] if and only if
263 * the [value] was not successfully used to resume the continuation.
264 *
265 * The [value] can be rejected in two cases (in both of which [onCancellation] will be called):
266 * - Cancellation happened before the handler was resumed;
267 * - The continuation was resumed successfully (before cancellation), but the coroutine's job was cancelled before
268 * it had a chance to run in its dispatcher, and so the suspended function threw an exception instead of returning
269 * this value.
270 *
271 * The installed [onCancellation] handler should not throw any exceptions.
272 * If it does, they will get caught, wrapped into a `CompletionHandlerException`, and
273 * processed as an uncaught exception in the context of the current coroutine
274 * (see [CoroutineExceptionHandler]).
275 *
276 * With this version of [resume], it's possible to pass resources that can not simply be left for the garbage
277 * collector (like file handles, sockets, etc.) and need to be closed explicitly:
278 *
279 * ```
280 * continuation.resume(resourceToResumeWith) { _, resourceToClose, _ ->
281 * resourceToClose.close()
282 * }
283 * ```
284 *
285 * [onCancellation] accepts three arguments:
286 *
287 * - `cause: Throwable` is the exception with which the continuation was cancelled.
288 * - `value` is exactly the same as the [value] passed to [resume] itself.
289 * In the example above, `resourceToResumeWith` is exactly the same as `resourceToClose`; in particular,
290 * one could call `resourceToResumeWith.close()` in the lambda for the same effect.
291 * The reason to reference `resourceToClose` anyway is to avoid a memory allocation due to the lambda
292 * capturing the `resourceToResumeWith` reference.
293 * - `context` is the [context] of this continuation.
294 * Like with `value`, the reason this is available as a lambda parameter, even though it is always possible to
295 * call [context] from the lambda instead, is to allow lambdas to capture less of their environment.
296 *
297 * A more complete example and further details are given in
298 * the documentation for the [suspendCancellableCoroutine] function.
299 *
300 * **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
301 * It can be invoked concurrently with the surrounding code.
302 * There is no guarantee on the execution context of its invocation.
303 */
304 public fun <R: T> resume(
305 value: R, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
306 )
307 }
308
309 /**
310 * A version of `invokeOnCancellation` that accepts a class as a handler instead of a lambda, but identical otherwise.
311 * This allows providing a custom [toString] instance that will look better during debugging.
312 */
invokeOnCancellationnull313 internal fun <T> CancellableContinuation<T>.invokeOnCancellation(handler: CancelHandler) = when (this) {
314 is CancellableContinuationImpl -> invokeOnCancellationInternal(handler)
315 else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported")
316 }
317
318 /**
319 * Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to
320 * the [block]. This function throws a [CancellationException] if the [Job] of the coroutine is
321 * cancelled or completed while it is suspended, or if [CancellableContinuation.cancel] is invoked.
322 *
323 * A typical use of this function is to suspend a coroutine while waiting for a result
324 * from a single-shot callback API and to return the result to the caller.
325 * For multi-shot callback APIs see [callbackFlow][kotlinx.coroutines.flow.callbackFlow].
326 *
327 * ```
328 * suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
329 * val callback = object : Callback { // Implementation of some callback interface
330 * override fun onCompleted(value: T) {
331 * // Resume coroutine with a value provided by the callback
332 * continuation.resume(value)
333 * }
334 * override fun onApiError(cause: Throwable) {
335 * // Resume coroutine with an exception provided by the callback
336 * continuation.resumeWithException(cause)
337 * }
338 * }
339 * // Register callback with an API
340 * api.register(callback)
341 * // Remove callback on cancellation
342 * continuation.invokeOnCancellation { api.unregister(callback) }
343 * // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
344 * }
345 * ```
346 *
347 * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
348 * > `invokeOnCancellation` block can be called at any time due to asynchronous nature of cancellation, even
349 * > concurrently with the call of the callback.
350 *
351 * ### Prompt cancellation guarantee
352 *
353 * This function provides **prompt cancellation guarantee**.
354 * If the [Job] of the current coroutine was cancelled while this function was suspended it will not resume
355 * successfully, even if [CancellableContinuation.resume] was already invoked.
356 *
357 * The cancellation of the coroutine's job is generally asynchronous with respect to the suspended coroutine.
358 * The suspended coroutine is resumed with a call to its [Continuation.resumeWith] member function or to the
359 * [resume][Continuation.resume] extension function.
360 * However, when coroutine is resumed, it does not immediately start executing, but is passed to its
361 * [CoroutineDispatcher] to schedule its execution when dispatcher's resources become available for execution.
362 * The job's cancellation can happen before, after, and concurrently with the call to `resume`. In any
363 * case, prompt cancellation guarantees that the coroutine will not resume its code successfully.
364 *
365 * If the coroutine was resumed with an exception (for example, using [Continuation.resumeWithException] extension
366 * function) and cancelled, then the exception thrown by the `suspendCancellableCoroutine` function is determined
367 * by what happened first: exceptional resume or cancellation.
368 *
369 * ### Returning resources from a suspended coroutine
370 *
371 * As a result of the prompt cancellation guarantee, when a closeable resource
372 * (like open file or a handle to another native resource) is returned from a suspended coroutine as a value,
373 * it can be lost when the coroutine is cancelled. To ensure that the resource can be properly closed
374 * in this case, the [CancellableContinuation] interface provides two functions.
375 *
376 * - [invokeOnCancellation][CancellableContinuation.invokeOnCancellation] installs a handler that is called
377 * whenever a suspend coroutine is being cancelled. In addition to the example at the beginning, it can be
378 * used to ensure that a resource that was opened before the call to
379 * `suspendCancellableCoroutine` or in its body is closed in case of cancellation.
380 *
381 * ```
382 * suspendCancellableCoroutine { continuation ->
383 * val resource = openResource() // Opens some resource
384 * continuation.invokeOnCancellation {
385 * resource.close() // Ensures the resource is closed on cancellation
386 * }
387 * // ...
388 * }
389 * ```
390 *
391 * - [resume(value) { ... }][CancellableContinuation.resume] method on a [CancellableContinuation] takes
392 * an optional `onCancellation` block. It can be used when resuming with a resource that must be closed by
393 * the code that called the corresponding suspending function.
394 *
395 * ```
396 * suspendCancellableCoroutine { continuation ->
397 * val callback = object : Callback { // Implementation of some callback interface
398 * // A callback provides a reference to some closeable resource
399 * override fun onCompleted(resource: T) {
400 * // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
401 * // when the coroutine is cancelled before the caller gets a reference to the resource.
402 * continuation.resume(resource) { cause, resourceToClose, context ->
403 * resourceToClose.close() // Close the resource on cancellation
404 * // If we used `resource` instead of `resourceToClose`, this lambda would need to allocate a closure,
405 * // but with `resourceToClose`, the lambda does not capture any of its environment.
406 * }
407 * }
408 * // ...
409 * }
410 * ```
411 *
412 * ### Implementation details and custom continuation interceptors
413 *
414 * The prompt cancellation guarantee is the result of a coordinated implementation inside `suspendCancellableCoroutine`
415 * function and the [CoroutineDispatcher] class. The coroutine dispatcher checks for the status of the [Job] immediately
416 * before continuing its normal execution and aborts this normal execution, calling all the corresponding
417 * cancellation handlers, if the job was cancelled.
418 *
419 * If a custom implementation of [ContinuationInterceptor] is used in a coroutine's context that does not extend
420 * [CoroutineDispatcher] class, then there is no prompt cancellation guarantee. A custom continuation interceptor
421 * can resume execution of a previously suspended coroutine even if its job was already cancelled.
422 */
suspendCancellableCoroutinenull423 public suspend inline fun <T> suspendCancellableCoroutine(
424 crossinline block: (CancellableContinuation<T>) -> Unit
425 ): T =
426 suspendCoroutineUninterceptedOrReturn { uCont ->
427 val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
428 /*
429 * For non-atomic cancellation we setup parent-child relationship immediately
430 * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
431 * properly supports cancellation.
432 */
433 cancellable.initCancellability()
434 block(cancellable)
435 cancellable.getResult()
436 }
437
438 /**
439 * Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
440 * [CancellableContinuationImpl] is reused.
441 */
suspendCancellableCoroutineReusablenull442 internal suspend inline fun <T> suspendCancellableCoroutineReusable(
443 crossinline block: (CancellableContinuationImpl<T>) -> Unit
444 ): T = suspendCoroutineUninterceptedOrReturn { uCont ->
445 val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
446 try {
447 block(cancellable)
448 } catch (e: Throwable) {
449 // Here we catch any unexpected exception from user-supplied block (e.g. invariant violation)
450 // and release claimed continuation in order to leave it in a reasonable state (see #3613)
451 cancellable.releaseClaimedReusableContinuation()
452 throw e
453 }
454 cancellable.getResult()
455 }
456
getOrCreateCancellableContinuationnull457 internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
458 // If used outside our dispatcher
459 if (delegate !is DispatchedContinuation<T>) {
460 return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
461 }
462 /*
463 * Attempt to claim reusable instance.
464 *
465 * suspendCancellableCoroutineReusable { // <- claimed
466 * // Any asynchronous cancellation is "postponed" while this block
467 * // is being executed
468 * } // postponed cancellation is checked here.
469 *
470 * Claim can fail for the following reasons:
471 * 1) Someone tried to make idempotent resume.
472 * Idempotent resume is internal (used only by us) and is used only in `select`,
473 * thus leaking CC instance for indefinite time.
474 * 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
475 */
476 return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetStateReusable() }
477 ?: return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
478 }
479
480 /**
481 * Disposes the specified [handle] when this continuation is cancelled.
482 *
483 * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created):
484 * ```
485 * invokeOnCancellation { handle.dispose() }
486 * ```
487 *
488 * @suppress **This an internal API and should not be used from general code.**
489 */
490 @InternalCoroutinesApi
disposeOnCancellationnull491 public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
492 invokeOnCancellation(handler = DisposeOnCancel(handle))
493
494 private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler {
495 override fun invoke(cause: Throwable?) = handle.dispose()
496 override fun toString(): String = "DisposeOnCancel[$handle]"
497 }
498