• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.guava
6 
7 import com.google.common.util.concurrent.*
8 import com.google.common.util.concurrent.internal.*
9 import kotlinx.coroutines.*
10 import java.util.concurrent.*
11 import java.util.concurrent.CancellationException
12 import kotlin.coroutines.*
13 
14 /**
15  * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
16  *
17  * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws
18  * [IllegalArgumentException], because Futures don't have a way to start lazily.
19  *
20  * The created coroutine is cancelled when the resulting future completes successfully, fails, or
21  * is cancelled.
22  *
23  * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
24  * added/overlaid by passing [context].
25  *
26  * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
27  * member, [Dispatchers.Default] is used.
28  *
29  * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
30  * a [Job] in [context].
31  *
32  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
33  * facilities.
34  *
35  * Note that the error and cancellation semantics of [future] are _subtly different_ than
36  * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details.
37  *
38  * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
39  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
40  * @param block the code to execute.
41  */
42 public fun <T> CoroutineScope.future(
43     context: CoroutineContext = EmptyCoroutineContext,
44     start: CoroutineStart = CoroutineStart.DEFAULT,
45     block: suspend CoroutineScope.() -> T
46 ): ListenableFuture<T> {
47     require(!start.isLazy) { "$start start is not supported" }
48     val newContext = newCoroutineContext(context)
49     val future = SettableFuture.create<T>()
50     val coroutine = ListenableFutureCoroutine(newContext, future)
51     future.addListener(
52       coroutine,
53       MoreExecutors.directExecutor())
54     coroutine.start(start, coroutine, block)
55     // Return hides the SettableFuture. This should prevent casting.
56     return object: ListenableFuture<T> by future {}
57 }
58 
59 /**
60  * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
61  *
62  * Completion is non-atomic between the two promises.
63  *
64  * Cancellation is propagated bidirectionally.
65  *
66  * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
67  * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
68  * race with cancellation of the `Deferred`.
69  *
70  * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
71  * it will cancel the returned `Deferred`.
72  *
73  * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the
74  * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
75  * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
76  * will complete with a different outcome than `this` `ListenableFuture`.
77  */
asDeferrednull78 public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
79     /* This method creates very specific behaviour as it entangles the `Deferred` and
80      * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
81      * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
82      * behaviour is described here.
83      *
84      * When `this` `ListenableFuture` is successfully cancelled - meaning
85      * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
86      * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
87      * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
88      * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
89      * cancelled.
90      *
91      * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
92      * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
93      * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
94      * terminal state.
95      *
96      * The returned `Deferred` may receive and suppress the `true` return value from
97      * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
98      * This is unavoidable, so make sure no idempotent cancellation work is performed by a
99      * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
100      * cancellation was from the `Deferred` representation of the task.
101      *
102      * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
103      * semantics. See `Job` for a description of coroutine cancellation semantics.
104      */
105     // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
106     // Exception by using the same instance the Future created.
107     if (this is InternalFutureFailureAccess) {
108         val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
109         if (t != null) {
110             return CompletableDeferred<T>().also {
111                 it.completeExceptionally(t)
112             }
113         }
114     }
115 
116     // Second, try the fast path for a completed Future. The Future is known to be done, so get()
117     // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
118     // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
119     // handle interruption.
120     if (isDone) {
121         return try {
122             CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
123         } catch (e: CancellationException) {
124             CompletableDeferred<T>().also { it.cancel(e) }
125         } catch (e: ExecutionException) {
126             // ExecutionException is the only kind of exception that can be thrown from a gotten
127             // Future. Anything else showing up here indicates a very fundamental bug in a
128             // Future implementation.
129             CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
130         }
131     }
132 
133     // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
134     val deferred = CompletableDeferred<T>()
135     Futures.addCallback(this, object : FutureCallback<T> {
136         override fun onSuccess(result: T?) {
137             // Here we work with flexible types, so we unchecked cast to trick the type system
138             @Suppress("UNCHECKED_CAST")
139             deferred.complete(result as T)
140         }
141 
142         override fun onFailure(t: Throwable) {
143             deferred.completeExceptionally(t)
144         }
145     }, MoreExecutors.directExecutor())
146 
147     // ... And cancel the Future when the deferred completes. Since the return type of this method
148     // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
149     // completion handler runs before the Future is completed, the Deferred must have been
150     // cancelled and should propagate its cancellation. If it runs after the Future is completed,
151     // this is a no-op.
152     deferred.invokeOnCompletion {
153         cancel(false)
154     }
155     return deferred
156 }
157 
158 /**
159  * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
160  *
161  * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
162  * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
163  * [Exception].
164  *
165  * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
166  * state - a serious fundamental bug.
167  */
ExecutionExceptionnull168 private fun ExecutionException.nonNullCause(): Throwable {
169   return this.cause!!
170 }
171 
172 /**
173  * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
174  *
175  * Completion is non-atomic between the two promises.
176  *
177  * When either promise successfully completes, it will attempt to synchronously complete its
178  * counterpart with the same value. This will succeed barring a race with cancellation.
179  *
180  * When either promise completes with an Exception, it will attempt to synchronously complete its
181  * counterpart with the same Exception. This will succeed barring a race with cancellation.
182  *
183  * Cancellation is propagated bidirectionally.
184  *
185  * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
186  * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
187  * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
188  * non-cancelled terminal state.
189  *
190  * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
191  * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
192  * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
193  * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
194  * successfully cancelled, for their different meanings of "successfully cancelled".
195  *
196  * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
197  * semantics. See [Job] for a description of coroutine cancellation semantics. See
198  * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
199  * corner cases of this method.
200  */
asListenableFuturenull201 public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
202     val outerFuture = OuterFuture<T>(this)
203     outerFuture.afterInit()
204     return outerFuture
205 }
206 
207 /**
208  * Awaits completion of `this` [ListenableFuture] without blocking a thread.
209  *
210  * This suspend function is cancellable.
211  *
212  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
213  * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
214  *
215  * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
216  * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
217  * [kotlinx.coroutines.NonCancellable].
218  *
219  */
awaitnull220 public suspend fun <T> ListenableFuture<T>.await(): T {
221     try {
222         if (isDone) return Uninterruptibles.getUninterruptibly(this)
223     } catch (e: ExecutionException) {
224         // ExecutionException is the only kind of exception that can be thrown from a gotten
225         // Future, other than CancellationException. Cancellation is propagated upward so that
226         // the coroutine running this suspend function may process it.
227         // Any other Exception showing up here indicates a very fundamental bug in a
228         // Future implementation.
229         throw e.nonNullCause()
230     }
231 
232     return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
233         addListener(
234           ToContinuation(this, cont),
235           MoreExecutors.directExecutor())
236         cont.invokeOnCancellation {
237             cancel(false)
238         }
239     }
240 }
241 
242 /**
243  * Propagates the outcome of [futureToObserve] to [continuation] on completion.
244  *
245  * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
246  * and fails, the cause of the Future will be propagated without a wrapping
247  * [ExecutionException] when thrown.
248  */
249 private class ToContinuation<T>(
250     val futureToObserve: ListenableFuture<T>,
251     val continuation: CancellableContinuation<T>
252 ): Runnable {
runnull253     override fun run() {
254         if (futureToObserve.isCancelled) {
255             continuation.cancel()
256         } else {
257             try {
258                 continuation.resumeWith(
259                   Result.success(Uninterruptibles.getUninterruptibly(futureToObserve)))
260             } catch (e: ExecutionException) {
261                 // ExecutionException is the only kind of exception that can be thrown from a gotten
262                 // Future. Anything else showing up here indicates a very fundamental bug in a
263                 // Future implementation.
264                 continuation.resumeWithException(e.nonNullCause())
265             }
266         }
267     }
268 }
269 
270 /**
271  * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
272  * completion.
273  *
274  * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback.
275  * See [run] for details. Both types are implemented by this object to save an allocation.
276  */
277 private class ListenableFutureCoroutine<T>(
278     context: CoroutineContext,
279     private val future: SettableFuture<T>
280 ) : AbstractCoroutine<T>(context), Runnable  {
281 
282     /**
283      * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if
284      * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if
285      * and only if its `isCancelled()` method returns true.
286      *
287      * Any error that occurs after successfully cancelling a [ListenableFuture]
288      * created by submitting the returned object as a [Runnable] to an `Executor` will be passed
289      * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit
290      * it to return an error after it is successfully cancelled.
291      *
292      * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully
293      * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to
294      * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the
295      * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that
296      * the [Deferred] pointing to the task will be used to observe any error outcome occurring after
297      * cancellation.
298      *
299      * This may be counterintuitive, but it maintains the error and cancellation contracts of both
300      * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
301      * to the same running task.
302      */
runnull303     override fun run() {
304         if (future.isCancelled) {
305             cancel()
306         }
307     }
308 
onCompletednull309     override fun onCompleted(value: T) {
310         future.set(value)
311     }
312 
313     // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation?
onCancellednull314     override fun onCancelled(cause: Throwable, handled: Boolean) {
315         if (!future.setException(cause) && !handled) {
316             // prevents loss of exception that was not handled by parent & could not be set to SettableFuture
317             handleCoroutineException(context, cause)
318         }
319     }
320 }
321 
322 /**
323  * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with
324  * it.
325  *
326  * This setup allows the returned [ListenableFuture] to maintain the following properties:
327  *
328  * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
329  *   and [isCancelled] methods
330  * - Cancellation propagation both to and from [Deferred]
331  * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
332  *   with different concrete implementations of [ListenableFuture]
333  *   - Fully correct cancellation and listener happens-after obeying [Future] and
334  *     [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
335  *     The best way to be correct, especially given the fun corner cases from
336  *     [AsyncFuture.setAsync], is to just use an [AsyncFuture].
337  *   - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture]
338  *     around its input [deferred] as a state engine to establish happens-after-completion. This
339  *     could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the
340  *     cost of the implementation's readability.
341  */
342 private class OuterFuture<T>(private val deferred: Deferred<T>): ListenableFuture<T> {
343     val innerFuture = DeferredListenableFuture(deferred)
344 
345     // Adding the listener after initialization resolves partial construction hairpin problem.
346     //
347     // This invokeOnCompletion completes the innerFuture as `deferred`  does. The innerFuture may
348     // have completed earlier if it got cancelled! See DeferredListenableFuture.
afterInitnull349     fun afterInit() {
350         deferred.invokeOnCompletion {
351             innerFuture.complete()
352         }
353     }
354 
355     /**
356      * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
357      * [Job.isCancelled].
358      *
359      * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate
360      * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class.
361      *
362      * See [DeferredListenableFuture.cancel].
363      */
isCancellednull364     override fun isCancelled(): Boolean {
365         // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
366         // In the case that the deferred has completed with cancellation, completing `this`, its
367         // reaching the "cancelled" state with a cause of CancellationException is treated as the
368         // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and
369         // this Future hasn't itself been successfully cancelled, the Future will return
370         // isCancelled() == false. This is the only discovered way to reconcile the two different
371         // cancellation contracts.
372         return isDone
373           && (innerFuture.isCancelled
374           || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException)
375     }
376 
377     /**
378      * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that
379      * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes
380      * happens-after ordering for completion of the [Deferred] input to [OuterFuture].
381      *
382      * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after
383      * contract of [Future] to be correctly followed. If this method were to directly use
384      * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this
385      * [ListenableFuture] is created from might be in an incomplete state when used by `get()`.
386      */
getnull387     override fun get(): T {
388         return getInternal(innerFuture.get())
389     }
390 
391     /** See [get()]. */
getnull392     override fun get(timeout: Long, unit: TimeUnit): T {
393         return getInternal(innerFuture.get(timeout, unit))
394     }
395 
396     /** See [get()]. */
getInternalnull397     private fun getInternal(deferred: Deferred<T>): T {
398         if (deferred.isCancelled) {
399             val exception = deferred.getCompletionExceptionOrNull()
400             if (exception is kotlinx.coroutines.CancellationException) {
401                 throw exception
402             } else {
403                 throw ExecutionException(exception)
404             }
405         } else {
406             return deferred.getCompleted()
407         }
408     }
409 
addListenernull410     override fun addListener(listener: Runnable, executor: Executor) {
411         innerFuture.addListener(listener, executor)
412     }
413 
isDonenull414     override fun isDone(): Boolean {
415         return innerFuture.isDone
416     }
417 
cancelnull418     override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
419         return innerFuture.cancel(mayInterruptIfRunning)
420     }
421 }
422 
423 /**
424  * Holds a delegate deferred, and serves as a state machine for [Future] cancellation.
425  *
426  * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
427  * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to
428  * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when
429  * returned.
430  */
431 private class DeferredListenableFuture<T>(
432     private val deferred: Deferred<T>
433 ) : AbstractFuture<Deferred<T>>() {
434 
completenull435     fun complete() {
436         set(deferred)
437     }
438 
439     /**
440      * Tries to cancel the task. This is fundamentally racy.
441      *
442      * For any given call to `cancel()`, if [deferred] is already completed, the call will complete
443      * this Future with it, and fail to cancel. Otherwise, the
444      * call to `cancel()` will try to cancel this Future: if and only if cancellation of this
445      * succeeds, [deferred] will have its [Deferred.cancel] called.
446      *
447      * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves
448      * in a particular way. [deferred] may also be in its "cancelling" state while this
449      * ListenableFuture is complete and cancelled.
450      *
451      * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure
452      * that certain combinations of cancelled/cancelling states can't be observed.
453      */
cancelnull454     override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
455         return if (super.cancel(mayInterruptIfRunning)) {
456             deferred.cancel()
457             true
458         } else {
459             false
460         }
461     }
462 }
463