• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.guava
6 
7 import com.google.common.util.concurrent.*
8 import com.google.common.util.concurrent.internal.*
9 import kotlinx.coroutines.*
10 import java.util.concurrent.*
11 import java.util.concurrent.CancellationException
12 import kotlin.coroutines.*
13 
14 /**
15  * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
16  *
17  * The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
18  * [IllegalArgumentException], because Futures don't have a way to start lazily.
19  *
20  * When the created coroutine [isCompleted][Job.isCompleted], it will try to
21  * *synchronously* complete the returned Future with the same outcome. This will
22  * succeed, barring a race with external cancellation of returned [ListenableFuture].
23  *
24  * Cancellation is propagated bidirectionally.
25  *
26  * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
27  * added/overlaid by passing [context].
28  *
29  * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
30  * member, [Dispatchers.Default] is used.
31  *
32  * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
33  * a [Job] in [context].
34  *
35  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
36  * facilities.
37  *
38  * Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
39  * In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
40  * the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
41  * error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
42  * no cause. This is to match the specification and behavior of
43  * `java.util.concurrent.FutureTask`.
44  *
45  * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
46  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
47  * @param block the code to execute.
48  */
49 public fun <T> CoroutineScope.future(
50     context: CoroutineContext = EmptyCoroutineContext,
51     start: CoroutineStart = CoroutineStart.DEFAULT,
52     block: suspend CoroutineScope.() -> T
53 ): ListenableFuture<T> {
54     require(!start.isLazy) { "$start start is not supported" }
55     val newContext = newCoroutineContext(context)
56     val coroutine = ListenableFutureCoroutine<T>(newContext)
57     coroutine.start(start, coroutine, block)
58     return coroutine.future
59 }
60 
61 /**
62  * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
63  *
64  * Completion is non-atomic between the two promises.
65  *
66  * Cancellation is propagated bidirectionally.
67  *
68  * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
69  * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
70  * race with cancellation of the `Deferred`.
71  *
72  * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
73  * it will cancel the returned `Deferred`.
74  *
75  * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
76  * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
77  * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
78  * will complete with a different outcome than `this` `ListenableFuture`.
79  */
asDeferrednull80 public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
81     /* This method creates very specific behaviour as it entangles the `Deferred` and
82      * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
83      * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
84      * behaviour is described here.
85      *
86      * When `this` `ListenableFuture` is successfully cancelled - meaning
87      * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
88      * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
89      * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
90      * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
91      * cancelled.
92      *
93      * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
94      * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
95      * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
96      * terminal state.
97      *
98      * The returned `Deferred` may receive and suppress the `true` return value from
99      * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
100      * This is unavoidable, so make sure no idempotent cancellation work is performed by a
101      * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
102      * cancellation was from the `Deferred` representation of the task.
103      *
104      * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
105      * semantics. See `Job` for a description of coroutine cancellation semantics.
106      */
107     // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
108     // Exception by using the same instance the Future created.
109     if (this is InternalFutureFailureAccess) {
110         val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
111         if (t != null) {
112             return CompletableDeferred<T>().also {
113                 it.completeExceptionally(t)
114             }
115         }
116     }
117 
118     // Second, try the fast path for a completed Future. The Future is known to be done, so get()
119     // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
120     // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
121     // handle interruption.
122     if (isDone) {
123         return try {
124             CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
125         } catch (e: CancellationException) {
126             CompletableDeferred<T>().also { it.cancel(e) }
127         } catch (e: ExecutionException) {
128             // ExecutionException is the only kind of exception that can be thrown from a gotten
129             // Future. Anything else showing up here indicates a very fundamental bug in a
130             // Future implementation.
131             CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
132         }
133     }
134 
135     // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
136     val deferred = CompletableDeferred<T>()
137     Futures.addCallback(this, object : FutureCallback<T> {
138         override fun onSuccess(result: T) {
139             runCatching { deferred.complete(result) }
140                 .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
141         }
142 
143         override fun onFailure(t: Throwable) {
144             runCatching { deferred.completeExceptionally(t) }
145                 .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
146         }
147     }, MoreExecutors.directExecutor())
148 
149     // ... And cancel the Future when the deferred completes. Since the return type of this method
150     // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
151     // completion handler runs before the Future is completed, the Deferred must have been
152     // cancelled and should propagate its cancellation. If it runs after the Future is completed,
153     // this is a no-op.
154     deferred.invokeOnCompletion {
155         cancel(false)
156     }
157     // Return hides the CompletableDeferred. This should prevent casting.
158     return object : Deferred<T> by deferred {}
159 }
160 
161 /**
162  * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
163  *
164  * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
165  * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
166  * [Exception].
167  *
168  * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
169  * state - a serious fundamental bug.
170  */
ExecutionExceptionnull171 private fun ExecutionException.nonNullCause(): Throwable {
172     return this.cause!!
173 }
174 
175 /**
176  * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
177  *
178  * Completion is non-atomic between the two promises.
179  *
180  * When either promise successfully completes, it will attempt to synchronously complete its
181  * counterpart with the same value. This will succeed barring a race with cancellation.
182  *
183  * When either promise completes with an Exception, it will attempt to synchronously complete its
184  * counterpart with the same Exception. This will succeed barring a race with cancellation.
185  *
186  * Cancellation is propagated bidirectionally.
187  *
188  * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
189  * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
190  * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
191  * non-cancelled terminal state.
192  *
193  * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
194  * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
195  * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
196  * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
197  * successfully cancelled, for their different meanings of "successfully cancelled".
198  *
199  * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
200  * semantics. See [Job] for a description of coroutine cancellation semantics. See
201  * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
202  * corner cases of this method.
203  */
asListenableFuturenull204 public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
205     val listenableFuture = JobListenableFuture<T>(this)
206     // This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred.
207     // The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel().
208     invokeOnCompletion { throwable ->
209         if (throwable == null) {
210             listenableFuture.complete(getCompleted())
211         } else {
212             listenableFuture.completeExceptionallyOrCancel(throwable)
213         }
214     }
215     return listenableFuture
216 }
217 
218 /**
219  * Awaits completion of `this` [ListenableFuture] without blocking a thread.
220  *
221  * This suspend function is cancellable.
222  *
223  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
224  * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
225  *
226  * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
227  * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
228  * [kotlinx.coroutines.NonCancellable].
229  */
awaitnull230 public suspend fun <T> ListenableFuture<T>.await(): T {
231     try {
232         if (isDone) return Uninterruptibles.getUninterruptibly(this)
233     } catch (e: ExecutionException) {
234         // ExecutionException is the only kind of exception that can be thrown from a gotten
235         // Future, other than CancellationException. Cancellation is propagated upward so that
236         // the coroutine running this suspend function may process it.
237         // Any other Exception showing up here indicates a very fundamental bug in a
238         // Future implementation.
239         throw e.nonNullCause()
240     }
241 
242     return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
243         addListener(
244             ToContinuation(this, cont),
245             MoreExecutors.directExecutor())
246         cont.invokeOnCancellation {
247             cancel(false)
248         }
249     }
250 }
251 
252 /**
253  * Propagates the outcome of [futureToObserve] to [continuation] on completion.
254  *
255  * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
256  * and fails, the cause of the Future will be propagated without a wrapping
257  * [ExecutionException] when thrown.
258  */
259 private class ToContinuation<T>(
260     val futureToObserve: ListenableFuture<T>,
261     val continuation: CancellableContinuation<T>
262 ): Runnable {
runnull263     override fun run() {
264         if (futureToObserve.isCancelled) {
265             continuation.cancel()
266         } else {
267             try {
268                 continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
269             } catch (e: ExecutionException) {
270                 // ExecutionException is the only kind of exception that can be thrown from a gotten
271                 // Future. Anything else showing up here indicates a very fundamental bug in a
272                 // Future implementation.
273                 continuation.resumeWithException(e.nonNullCause())
274             }
275         }
276     }
277 }
278 
279 /**
280  * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
281  * completion.
282  *
283  * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
284  * By documented contract, a [Future] has been cancelled if
285  * and only if its `isCancelled()` method returns true.
286  *
287  * Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
288  * The contract of [Future] does not permit it to return an error after it is successfully cancelled.
289  * On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
290  * otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
291  * In contrast to [Future] which can't change its outcome after a successful cancellation,
292  * cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
293  * which _can_ show the error.
294  *
295  * This may be counterintuitive, but it maintains the error and cancellation contracts of both
296  * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
297  * to the same running task.
298  */
299 private class ListenableFutureCoroutine<T>(
300     context: CoroutineContext
301 ) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
302 
303     // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
304     @JvmField
305     val future = JobListenableFuture<T>(this)
306 
onCompletednull307     override fun onCompleted(value: T) {
308         future.complete(value)
309     }
310 
onCancellednull311     override fun onCancelled(cause: Throwable, handled: Boolean) {
312         // Note: if future was cancelled in a race with a cancellation of this
313         // coroutine, and the future was successfully cancelled first, the cause of coroutine
314         // cancellation is dropped in this promise. A Future can only be completed once.
315         //
316         // This is consistent with FutureTask behaviour. A race between a Future.cancel() and
317         // a FutureTask.setException() for the same Future will similarly drop the
318         // cause of a failure-after-cancellation.
319         future.completeExceptionallyOrCancel(cause)
320     }
321 }
322 
323 /**
324  * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
325  *
326  * This setup allows the returned [ListenableFuture] to maintain the following properties:
327  *
328  * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
329  *   and [isCancelled] methods
330  * - Cancellation propagation both to and from [Deferred]
331  * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
332  *   with different concrete implementations of [ListenableFuture]
333  *   - Fully correct cancellation and listener happens-after obeying [Future] and
334  *     [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
335  *     The best way to be correct, especially given the fun corner cases from
336  *     [AbstractFuture.setFuture], is to just use an [AbstractFuture].
337  *   - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
338  *     around coroutine's result as a state engine to establish happens-after-completion. This
339  *     could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
340  *     cost of the implementation's readability.
341  */
342 private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
343     /**
344      * Serves as a state machine for [Future] cancellation.
345      *
346      * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
347      * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to
348      * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned.
349      *
350      * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
351      */
352     private val auxFuture = SettableFuture.create<Any?>()
353 
354     /**
355      * `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
356      *
357      * Note: this is eventually consistent with the state of [auxFuture].
358      *
359      * Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException]
360      * apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException] allocation
361      * we use this field as an optimization.
362      */
363     private var auxFutureIsFailed: Boolean = false
364 
365     /**
366      * When the attached coroutine [isCompleted][Job.isCompleted] successfully
367      * its outcome should be passed to this method.
368      *
369      * This should succeed barring a race with external cancellation.
370      */
completenull371     fun complete(result: T): Boolean = auxFuture.set(result)
372 
373     /**
374      * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled]
375      * its outcome should be passed to this method.
376      *
377      * This method will map coroutine's exception into corresponding Future's exception.
378      *
379      * This should succeed barring a race with external cancellation.
380      */
381     // CancellationException is wrapped into `Cancelled` to preserve original cause and message.
382     // All the other exceptions are delegated to SettableFuture.setException.
383     fun completeExceptionallyOrCancel(t: Throwable): Boolean =
384         if (t is CancellationException) auxFuture.set(Cancelled(t))
385         else auxFuture.setException(t).also { if (it) auxFutureIsFailed = true }
386 
387     /**
388      * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
389      * [Job.isCancelled].
390      *
391      * When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture]
392      * contains [CancellationException].
393      *
394      * See [cancel].
395      */
isCancellednull396     override fun isCancelled(): Boolean {
397         // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
398         // In the case that the deferred has completed with cancellation, completing `this`, its
399         // reaching the "cancelled" state with a cause of CancellationException is treated as the
400         // same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and
401         // this Future hasn't itself been successfully cancelled, the Future will return
402         // isCancelled() == false. This is the only discovered way to reconcile the two different
403         // cancellation contracts.
404         return auxFuture.isCancelled || isDone && !auxFutureIsFailed && try {
405             Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
406         } catch (e: CancellationException) {
407             // `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false.
408             true
409         } catch (e: ExecutionException) {
410             // `auxFutureIsFailed` hasn't been updated yet.
411             auxFutureIsFailed = true
412             false
413         }
414     }
415 
416     /**
417      * Waits for [auxFuture] to complete by blocking, then uses its `result`
418      * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException].
419      * This establishes happens-after ordering for completion of the entangled coroutine.
420      *
421      * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
422      * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine.
423      *
424      * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
425      * contract of [Future] to be correctly followed.
426      */
getnull427     override fun get(): T {
428         return getInternal(auxFuture.get())
429     }
430 
431     /** See [get()]. */
getnull432     override fun get(timeout: Long, unit: TimeUnit): T {
433         return getInternal(auxFuture.get(timeout, unit))
434     }
435 
436     /** See [get()]. */
getInternalnull437     private fun getInternal(result: Any?): T = if (result is Cancelled) {
438         throw CancellationException().initCause(result.exception)
439     } else {
440         // We know that `auxFuture` can contain either `T` or `Cancelled`.
441         @Suppress("UNCHECKED_CAST")
442         result as T
443     }
444 
addListenernull445     override fun addListener(listener: Runnable, executor: Executor) {
446         auxFuture.addListener(listener, executor)
447     }
448 
isDonenull449     override fun isDone(): Boolean {
450         return auxFuture.isDone
451     }
452 
453     /**
454      * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
455      *
456      * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture]
457      * succeeds, [jobToCancel] will have its [Job.cancel] called.
458      *
459      * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
460      * in a particular way. [jobToCancel] may also be in its "cancelling" state while this
461      * ListenableFuture is complete and cancelled.
462      */
cancelnull463     override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
464         // TODO: call jobToCancel.cancel() _before_ running the listeners.
465         //  `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of
466         //  `jobToCancel` until after auxFuture's listeners have already run.
467         //  Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
468         return if (auxFuture.cancel(mayInterruptIfRunning)) {
469             jobToCancel.cancel()
470             true
471         } else {
472             false
473         }
474     }
475 
<lambda>null476     override fun toString(): String = buildString {
477         append(super.toString())
478         append("[status=")
479         if (isDone) {
480             try {
481                 when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) {
482                     is Cancelled -> append("CANCELLED, cause=[${result.exception}]")
483                     else -> append("SUCCESS, result=[$result]")
484                 }
485             } catch (e: CancellationException) {
486                 // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
487                 append("CANCELLED")
488             } catch (e: ExecutionException) {
489                 append("FAILURE, cause=[${e.cause}]")
490             } catch (t: Throwable) {
491                 // Violation of Future's contract, should never happen.
492                 append("UNKNOWN, cause=[${t.javaClass} thrown from get()]")
493             }
494         } else {
495             append("PENDING, delegate=[$auxFuture]")
496         }
497         append(']')
498     }
499 }
500 
501 /**
502  * A wrapper for `Coroutine`'s [CancellationException].
503  *
504  * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately,
505  * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this
506  * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture].
507  */
508 private class Cancelled(@JvmField val exception: CancellationException)
509