• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.guava
2 
3 import com.google.common.util.concurrent.*
4 import com.google.common.util.concurrent.internal.*
5 import kotlinx.coroutines.*
6 import java.util.concurrent.*
7 import java.util.concurrent.CancellationException
8 import kotlin.coroutines.*
9 
10 /**
11  * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
12  *
13  * The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
14  * [IllegalArgumentException], because Futures don't have a way to start lazily.
15  *
16  * When the created coroutine [isCompleted][Job.isCompleted], it will try to
17  * *synchronously* complete the returned Future with the same outcome. This will
18  * succeed, barring a race with external cancellation of returned [ListenableFuture].
19  *
20  * Cancellation is propagated bidirectionally.
21  *
22  * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
23  * added/overlaid by passing [context].
24  *
25  * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
26  * member, [Dispatchers.Default] is used.
27  *
28  * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
29  * a [Job] in [context].
30  *
31  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
32  * facilities.
33  *
34  * Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
35  * In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
36  * the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
37  * error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
38  * no cause. This is to match the specification and behavior of
39  * `java.util.concurrent.FutureTask`.
40  *
41  * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
42  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
43  * @param block the code to execute.
44  */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
    // Futures have no way to start lazily, so lazy start modes are rejected up front.
    require(!start.isLazy) { "$start start is not supported" }
    // Create the coroutine in the scope's context overlaid with [context], start it with itself
    // as the receiver of [block], and hand back the Future that tracks its outcome.
    return ListenableFutureCoroutine<T>(newCoroutineContext(context))
        .also { coroutine -> coroutine.start(start, coroutine, block) }
        .future
}
56 
57 /**
58  * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
59  *
60  * Completion is non-atomic between the two promises.
61  *
62  * Cancellation is propagated bidirectionally.
63  *
64  * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
65  * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
66  * race with cancellation of the `Deferred`.
67  *
68  * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
69  * it will cancel the returned `Deferred`.
70  *
71  * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
72  * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
73  * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
74  * will complete with a different outcome than `this` `ListenableFuture`.
75  */
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
    /*
     * This method entangles the returned `Deferred` with `this` `ListenableFuture`. The behaviour
     * below is the best discovered compromise between the states and interface contracts of a
     * `Future` and the states of a `Deferred`.
     *
     * Future -> Deferred: when `this` `ListenableFuture` is successfully cancelled - meaning
     * `ListenableFuture.cancel()` returned `true` - the returned `Deferred` is synchronously
     * cancelled. That can only race with cancellation of the `Deferred` itself, so the `Deferred`
     * is always put into its "cancelling" state and (barring uncooperative cancellation)
     * _eventually_ reaches its "cancelled" state when either promise is successfully cancelled.
     *
     * Deferred -> Future: when the returned `Deferred` is cancelled, `ListenableFuture.cancel()`
     * is synchronously called on `this` `ListenableFuture`. That is only an attempt: cancellation
     * may not succeed and the `ListenableFuture` may still complete in a non-cancelled terminal
     * state.
     *
     * The returned `Deferred` may receive and suppress the `true` return value of
     * `ListenableFuture.cancel()` when the task is cancelled through the `Deferred`. This is
     * unavoidable, so a reference-holder of the `ListenableFuture` must not rely on performing
     * idempotent cancellation work itself: that work won't get done if the cancellation came from
     * the `Deferred` representation of the task.
     *
     * All of this is inherently a race. See `Future.cancel()` for `Future` cancellation semantics
     * and `Job` for coroutine cancellation semantics.
     */

    // Fast path 1: Guava futures can expose their failure directly. Reusing the Throwable
    // instance the Future already holds avoids allocating another exception.
    if (this is InternalFutureFailureAccess) {
        val knownFailure: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
        if (knownFailure != null) {
            val failed = CompletableDeferred<T>()
            failed.completeExceptionally(knownFailure)
            return failed
        }
    }

    // Fast path 2: the Future is already done, so get() cannot block and therefore cannot be
    // interrupted. getUninterruptibly() is used instead of getDone() because, in this
    // known-non-interruptible case, it saves the volatile read getDone() uses to handle
    // interruption.
    if (isDone) {
        return try {
            CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
        } catch (cancellation: CancellationException) {
            val cancelled = CompletableDeferred<T>()
            cancelled.cancel(cancellation)
            cancelled
        } catch (failure: ExecutionException) {
            // ExecutionException is the only kind of exception that can be thrown from a gotten
            // Future. Anything else showing up here indicates a very fundamental bug in a
            // Future implementation.
            val failed = CompletableDeferred<T>()
            failed.completeExceptionally(failure.nonNullCause())
            failed
        }
    }

    // Slow path: the Future is still pending, so forward its eventual outcome to a fresh
    // Deferred through a callback that runs directly on the completing thread.
    val completable = CompletableDeferred<T>()
    val forwardOutcome = object : FutureCallback<T> {
        override fun onSuccess(result: T) {
            try {
                completable.complete(result)
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
        }

        override fun onFailure(t: Throwable) {
            try {
                completable.completeExceptionally(t)
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
        }
    }
    Futures.addCallback(this, forwardOutcome, MoreExecutors.directExecutor())

    // Cancel the Future once the Deferred completes. Callers only ever see a plain Deferred, so
    // the only way they can complete it is by cancelling it. If this handler runs before the
    // Future is completed, the Deferred must have been cancelled and that cancellation is
    // propagated here. If it runs after the Future is completed, this is a no-op.
    completable.invokeOnCompletion {
        cancel(false)
    }

    // Wrap the CompletableDeferred in an anonymous delegate to hide it. This should prevent
    // casting the result back to a completable type.
    @OptIn(InternalForInheritanceCoroutinesApi::class)
    return object : Deferred<T> by completable {}
}
157 
158 /**
159  * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
160  *
161  * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
162  * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
163  * [Exception].
164  *
165  * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
166  * state - a serious fundamental bug.
167  */
// The `!!` is deliberate: a null cause means the Future broke its contract, and failing fast with
// a NullPointerException is the intended reaction.
private fun ExecutionException.nonNullCause(): Throwable = cause!!
171 
172 /**
173  * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
174  *
175  * Completion is non-atomic between the two promises.
176  *
177  * When either promise successfully completes, it will attempt to synchronously complete its
178  * counterpart with the same value. This will succeed barring a race with cancellation.
179  *
180  * When either promise completes with an Exception, it will attempt to synchronously complete its
181  * counterpart with the same Exception. This will succeed barring a race with cancellation.
182  *
183  * Cancellation is propagated bidirectionally.
184  *
185  * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
186  * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
187  * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
188  * non-cancelled terminal state.
189  *
190  * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
191  * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
192  * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
193  * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
194  * successfully cancelled, for their different meanings of "successfully cancelled".
195  *
196  * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
197  * semantics. See [Job] for a description of coroutine cancellation semantics. See
198  * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
199  * corner cases of this method.
200  */
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> =
    JobListenableFuture<T>(this).also { mirror ->
        // Once `this` Deferred completes, mirror its outcome into the JobListenableFuture.
        // The JobListenableFuture may already be complete by then if it got cancelled first;
        // see JobListenableFuture.cancel().
        invokeOnCompletion { cause ->
            if (cause != null) {
                mirror.completeExceptionallyOrCancel(cause)
            } else {
                mirror.complete(getCompleted())
            }
        }
    }
214 
215 /**
216  * Awaits completion of `this` [ListenableFuture] without blocking a thread.
217  *
218  * This suspend function is cancellable.
219  *
220  * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
221  * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
222  *
223  * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
224  * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
225  * [kotlinx.coroutines.NonCancellable].
226  */
public suspend fun <T> ListenableFuture<T>.await(): T {
    // Fast path: an already completed Future is unwrapped in place, without suspending.
    if (isDone) {
        try {
            return Uninterruptibles.getUninterruptibly(this)
        } catch (failure: ExecutionException) {
            // ExecutionException is the only kind of exception that can be thrown from a gotten
            // Future, other than CancellationException. Cancellation is deliberately not caught:
            // it propagates upward so that the coroutine running this function may process it.
            // Any other exception showing up here indicates a very fundamental bug in a
            // Future implementation.
            throw failure.nonNullCause()
        }
    }

    // Slow path: suspend until the Future's listener fires on the completing thread.
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val resumeOnCompletion = ToContinuation(this, continuation)
        addListener(resumeOnCompletion, MoreExecutors.directExecutor())
        // Cancelling the waiting coroutine also cancels the Future (without interruption).
        continuation.invokeOnCancellation {
            cancel(false)
        }
    }
}
248 
249 /**
250  * Propagates the outcome of [futureToObserve] to [continuation] on completion.
251  *
252  * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
253  * and fails, the cause of the Future will be propagated without a wrapping
254  * [ExecutionException] when thrown.
255  */
private class ToContinuation<T>(
    val futureToObserve: ListenableFuture<T>,
    val continuation: CancellableContinuation<T>
) : Runnable {
    override fun run() {
        // A cancelled Future translates into cancellation of the waiting continuation.
        if (futureToObserve.isCancelled) {
            continuation.cancel()
            return
        }
        try {
            continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
        } catch (failure: ExecutionException) {
            // ExecutionException is the only kind of exception that can be thrown from a gotten
            // Future. Anything else showing up here indicates a very fundamental bug in a
            // Future implementation. The wrapper is stripped so the continuation sees the cause.
            continuation.resumeWithException(failure.nonNullCause())
        }
    }
}
275 
276 /**
277  * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
278  * completion.
279  *
280  * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
281  * By documented contract, a [Future] has been cancelled if
282  * and only if its `isCancelled()` method returns true.
283  *
284  * Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
285  * The contract of [Future] does not permit it to return an error after it is successfully cancelled.
286  * On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
287  * otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
288  * In contrast to [Future] which can't change its outcome after a successful cancellation,
289  * cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
290  * which _can_ show the error.
291  *
292  * This may be counterintuitive, but it maintains the error and cancellation contracts of both
293  * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
294  * to the same running task.
295  */
private class ListenableFutureCoroutine<T>(context: CoroutineContext) :
    AbstractCoroutine<T>(context, initParentJob = true, active = true) {

    // The Future view of this coroutine. JobListenableFuture propagates external cancellation
    // of the Future back to `this` coroutine; see JobListenableFuture.
    @JvmField
    val future: JobListenableFuture<T> = JobListenableFuture(this)

    // Normal completion: publish the coroutine's result through the Future.
    override fun onCompleted(value: T) {
        future.complete(value)
    }

    // Failure or cancellation: publish the cause through the Future.
    //
    // If the Future was cancelled in a race with cancellation of this coroutine, and the Future
    // was successfully cancelled first, the cause of the coroutine's cancellation is dropped in
    // this promise, because a Future can only be completed once.
    //
    // This is consistent with FutureTask behaviour: a race between Future.cancel() and
    // FutureTask.setException() for the same Future similarly drops the cause of a
    // failure-after-cancellation.
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        future.completeExceptionallyOrCancel(cause)
    }
}
319 
320 /**
321  * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
322  *
323  * This setup allows the returned [ListenableFuture] to maintain the following properties:
324  *
325  * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
326  *   and [isCancelled] methods
327  * - Cancellation propagation both to and from [Deferred]
328  * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
329  *   with different concrete implementations of [ListenableFuture]
330  *   - Fully correct cancellation and listener happens-after obeying [Future] and
331  *     [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
332  *     The best way to be correct, especially given the fun corner cases from
333  *     [AbstractFuture.setFuture], is to just use an [AbstractFuture].
334  *   - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
335  *     around coroutine's result as a state engine to establish happens-after-completion. This
336  *     could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
337  *     cost of the implementation's readability.
338  */
private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableFuture<T> {
    /**
     * The state machine behind this [Future]'s completion and cancellation.
     *
     * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
     * cancellation semantics. Delegating to it lets [JobListenableFuture] `auxFuture.get()` the
     * result in such a way that the `Deferred` is always complete when returned.
     *
     * To preserve the coroutine's [CancellationException], this future holds either a `T` or a
     * [Cancelled] wrapper.
     */
    private val auxFuture = SettableFuture.create<Any?>()

    /**
     * `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
     *
     * Note: this flag is only eventually consistent with the state of [auxFuture].
     *
     * There is no API to find out whether a [ListenableFuture] throws [ExecutionException] other
     * than calling [ListenableFuture.get] on it, so this field exists purely as an optimization
     * that avoids allocating an unnecessary [ExecutionException].
     */
    private var auxFutureIsFailed: Boolean = false

    /**
     * Receives the outcome of the attached coroutine when it [completes][Job.isCompleted]
     * successfully.
     *
     * This should succeed barring a race with external cancellation.
     */
    fun complete(result: T): Boolean {
        return auxFuture.set(result)
    }

    /**
     * Receives the outcome of the attached coroutine when it [completes][Job.isCompleted]
     * [exceptionally][Job.isCancelled], mapping the coroutine's exception onto the matching
     * Future outcome.
     *
     * This should succeed barring a race with external cancellation.
     */
    fun completeExceptionallyOrCancel(t: Throwable): Boolean {
        // A CancellationException is stored as a `Cancelled` value so that its original cause and
        // message survive; SettableFuture itself cannot carry a cancellation reason.
        if (t is CancellationException) return auxFuture.set(Cancelled(t))
        // Every other exception goes through SettableFuture.setException. The failure flag is
        // only raised when this call is the one that actually completed the future.
        val accepted = auxFuture.setException(t)
        if (accepted) auxFutureIsFailed = true
        return accepted
    }

    /**
     * Reports cancellation _in the sense of [Future]_, which is _not_ equivalent to
     * [Job.isCancelled].
     *
     * When done, this Future counts as cancelled if [auxFuture] is cancelled, or if [auxFuture]
     * contains a [CancellationException].
     *
     * See [cancel].
     */
    override fun isCancelled(): Boolean {
        // The checks below ensure that isCancelled() *never* returns true while isDone() returns
        // false. If the deferred completed with cancellation and thereby completed `this`, its
        // "cancelled" state with a CancellationException cause is treated the same as auxFuture
        // being cancelled. If the Job is merely "cancelling" and this Future has not itself been
        // successfully cancelled, isCancelled() is false. This is the only discovered way to
        // reconcile the two different cancellation contracts.
        if (auxFuture.isCancelled) return true
        if (!isDone) return false
        // Known failure: skip the get() call and the ExecutionException it would allocate.
        if (auxFutureIsFailed) return false
        return try {
            Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
        } catch (e: CancellationException) {
            // `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false.
            true
        } catch (e: ExecutionException) {
            // `auxFutureIsFailed` hasn't been updated yet; record the failure for next time.
            auxFutureIsFailed = true
            false
        }
    }

    /**
     * Blocks until [auxFuture] completes, then turns its `result` into the `T` value `this`
     * [ListenableFuture] points to, or throws a [CancellationException].
     * This establishes happens-after ordering for completion of the entangled coroutine.
     *
     * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
     * Otherwise it returns a [Cancelled] that encapsulates the outcome of the entangled coroutine.
     *
     * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
     * contract of [Future] to be correctly followed.
     */
    override fun get(): T = getInternal(auxFuture.get())

    /** Timed variant of [get()]; see there for details. */
    override fun get(timeout: Long, unit: TimeUnit): T = getInternal(auxFuture.get(timeout, unit))

    /** Unwraps a value obtained from [auxFuture]; see [get()]. */
    @Suppress("UNCHECKED_CAST")
    private fun getInternal(result: Any?): T {
        if (result is Cancelled) {
            throw CancellationException().initCause(result.exception)
        }
        // `auxFuture` can contain either `T` or `Cancelled`, so anything else is a `T`.
        return result as T
    }

    override fun addListener(listener: Runnable, executor: Executor) {
        auxFuture.addListener(listener, executor)
    }

    override fun isDone(): Boolean = auxFuture.isDone

    /**
     * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
     *
     * The call first tries to cancel [auxFuture]; if and only if that succeeds, [Job.cancel] is
     * called on [jobToCancel].
     *
     * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race
     * resolves in a particular way. [jobToCancel] may also be in its "cancelling" state while
     * this ListenableFuture is complete and cancelled.
     */
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        // TODO: call jobToCancel.cancel() _before_ running the listeners.
        //  `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of
        //  `jobToCancel` until after auxFuture's listeners have already run.
        //  Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
        val cancelledHere = auxFuture.cancel(mayInterruptIfRunning)
        if (cancelledHere) {
            jobToCancel.cancel()
        }
        return cancelledHere
    }

    override fun toString(): String = "${super.toString()}[status=${describeStatus()}]"

    /** Renders the current state of [auxFuture] as the status text used by [toString]. */
    private fun describeStatus(): String {
        if (!isDone) return "PENDING, delegate=[$auxFuture]"
        return try {
            val outcome = Uninterruptibles.getUninterruptibly(auxFuture)
            if (outcome is Cancelled) {
                "CANCELLED, cause=[${outcome.exception}]"
            } else {
                "SUCCESS, result=[$outcome]"
            }
        } catch (e: CancellationException) {
            // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
            "CANCELLED"
        } catch (e: ExecutionException) {
            "FAILURE, cause=[${e.cause}]"
        } catch (t: Throwable) {
            // Violation of Future's contract, should never happen.
            "UNKNOWN, cause=[${t.javaClass} thrown from get()]"
        }
    }
}
497 
/**
 * Holder for the [CancellationException] a coroutine was cancelled with.
 *
 * When a coroutine is _cancelled normally_, the reason for the cancellation should remain visible
 * to the user. [SettableFuture] cannot store a cancellation reason, so the exception is wrapped in
 * this class and stored as an ordinary value via [SettableFuture.set]. See the implementation of
 * [JobListenableFuture].
 */
private class Cancelled(
    @JvmField val exception: CancellationException
)
506