1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.guava
6
7 import com.google.common.util.concurrent.*
8 import com.google.common.util.concurrent.internal.*
9 import kotlinx.coroutines.*
10 import java.util.concurrent.*
11 import java.util.concurrent.CancellationException
12 import kotlin.coroutines.*
13
14 /**
15 * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
16 *
17 * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws
18 * [IllegalArgumentException], because Futures don't have a way to start lazily.
19 *
20 * The created coroutine is cancelled when the resulting future completes successfully, fails, or
21 * is cancelled.
22 *
23 * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
24 * added/overlaid by passing [context].
25 *
26 * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
27 * member, [Dispatchers.Default] is used.
28 *
29 * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
30 * a [Job] in [context].
31 *
32 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
33 * facilities.
34 *
35 * Note that the error and cancellation semantics of [future] are _subtly different_ than
36 * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details.
37 *
38 * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
39 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
40 * @param block the code to execute.
41 */
42 public fun <T> CoroutineScope.future(
43 context: CoroutineContext = EmptyCoroutineContext,
44 start: CoroutineStart = CoroutineStart.DEFAULT,
45 block: suspend CoroutineScope.() -> T
46 ): ListenableFuture<T> {
47 require(!start.isLazy) { "$start start is not supported" }
48 val newContext = newCoroutineContext(context)
49 val future = SettableFuture.create<T>()
50 val coroutine = ListenableFutureCoroutine(newContext, future)
51 future.addListener(
52 coroutine,
53 MoreExecutors.directExecutor())
54 coroutine.start(start, coroutine, block)
55 // Return hides the SettableFuture. This should prevent casting.
56 return object: ListenableFuture<T> by future {}
57 }
58
59 /**
60 * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
61 *
62 * Completion is non-atomic between the two promises.
63 *
64 * Cancellation is propagated bidirectionally.
65 *
66 * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
67 * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
68 * race with cancellation of the `Deferred`.
69 *
70 * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
71 * it will cancel the returned `Deferred`.
72 *
73 * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the
74 * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
75 * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
76 * will complete with a different outcome than `this` `ListenableFuture`.
77 */
asDeferrednull78 public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
79 /* This method creates very specific behaviour as it entangles the `Deferred` and
80 * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
81 * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
82 * behaviour is described here.
83 *
84 * When `this` `ListenableFuture` is successfully cancelled - meaning
85 * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
86 * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
87 * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
88 * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
89 * cancelled.
90 *
91 * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
92 * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
93 * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
94 * terminal state.
95 *
96 * The returned `Deferred` may receive and suppress the `true` return value from
97 * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
98 * This is unavoidable, so make sure no idempotent cancellation work is performed by a
99 * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
100 * cancellation was from the `Deferred` representation of the task.
101 *
102 * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
103 * semantics. See `Job` for a description of coroutine cancellation semantics.
104 */
105 // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
106 // Exception by using the same instance the Future created.
107 if (this is InternalFutureFailureAccess) {
108 val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
109 if (t != null) {
110 return CompletableDeferred<T>().also {
111 it.completeExceptionally(t)
112 }
113 }
114 }
115
116 // Second, try the fast path for a completed Future. The Future is known to be done, so get()
117 // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
118 // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
119 // handle interruption.
120 if (isDone) {
121 return try {
122 CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
123 } catch (e: CancellationException) {
124 CompletableDeferred<T>().also { it.cancel(e) }
125 } catch (e: ExecutionException) {
126 // ExecutionException is the only kind of exception that can be thrown from a gotten
127 // Future. Anything else showing up here indicates a very fundamental bug in a
128 // Future implementation.
129 CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
130 }
131 }
132
133 // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
134 val deferred = CompletableDeferred<T>()
135 Futures.addCallback(this, object : FutureCallback<T> {
136 override fun onSuccess(result: T?) {
137 // Here we work with flexible types, so we unchecked cast to trick the type system
138 @Suppress("UNCHECKED_CAST")
139 deferred.complete(result as T)
140 }
141
142 override fun onFailure(t: Throwable) {
143 deferred.completeExceptionally(t)
144 }
145 }, MoreExecutors.directExecutor())
146
147 // ... And cancel the Future when the deferred completes. Since the return type of this method
148 // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
149 // completion handler runs before the Future is completed, the Deferred must have been
150 // cancelled and should propagate its cancellation. If it runs after the Future is completed,
151 // this is a no-op.
152 deferred.invokeOnCompletion {
153 cancel(false)
154 }
155 return deferred
156 }
157
158 /**
159 * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
160 *
161 * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
162 * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
163 * [Exception].
164 *
165 * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
166 * state - a serious fundamental bug.
167 */
ExecutionExceptionnull168 private fun ExecutionException.nonNullCause(): Throwable {
169 return this.cause!!
170 }
171
172 /**
173 * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
174 *
175 * Completion is non-atomic between the two promises.
176 *
177 * When either promise successfully completes, it will attempt to synchronously complete its
178 * counterpart with the same value. This will succeed barring a race with cancellation.
179 *
180 * When either promise completes with an Exception, it will attempt to synchronously complete its
181 * counterpart with the same Exception. This will succeed barring a race with cancellation.
182 *
183 * Cancellation is propagated bidirectionally.
184 *
185 * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
186 * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
187 * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
188 * non-cancelled terminal state.
189 *
190 * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
191 * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
192 * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
193 * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
194 * successfully cancelled, for their different meanings of "successfully cancelled".
195 *
196 * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
197 * semantics. See [Job] for a description of coroutine cancellation semantics. See
198 * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
199 * corner cases of this method.
200 */
asListenableFuturenull201 public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
202 val outerFuture = OuterFuture<T>(this)
203 outerFuture.afterInit()
204 return outerFuture
205 }
206
207 /**
208 * Awaits completion of `this` [ListenableFuture] without blocking a thread.
209 *
210 * This suspend function is cancellable.
211 *
212 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
213 * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
214 *
215 * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
216 * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
217 * [kotlinx.coroutines.NonCancellable].
218 *
219 */
awaitnull220 public suspend fun <T> ListenableFuture<T>.await(): T {
221 try {
222 if (isDone) return Uninterruptibles.getUninterruptibly(this)
223 } catch (e: ExecutionException) {
224 // ExecutionException is the only kind of exception that can be thrown from a gotten
225 // Future, other than CancellationException. Cancellation is propagated upward so that
226 // the coroutine running this suspend function may process it.
227 // Any other Exception showing up here indicates a very fundamental bug in a
228 // Future implementation.
229 throw e.nonNullCause()
230 }
231
232 return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
233 addListener(
234 ToContinuation(this, cont),
235 MoreExecutors.directExecutor())
236 cont.invokeOnCancellation {
237 cancel(false)
238 }
239 }
240 }
241
242 /**
243 * Propagates the outcome of [futureToObserve] to [continuation] on completion.
244 *
245 * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
246 * and fails, the cause of the Future will be propagated without a wrapping
247 * [ExecutionException] when thrown.
248 */
249 private class ToContinuation<T>(
250 val futureToObserve: ListenableFuture<T>,
251 val continuation: CancellableContinuation<T>
252 ): Runnable {
runnull253 override fun run() {
254 if (futureToObserve.isCancelled) {
255 continuation.cancel()
256 } else {
257 try {
258 continuation.resumeWith(
259 Result.success(Uninterruptibles.getUninterruptibly(futureToObserve)))
260 } catch (e: ExecutionException) {
261 // ExecutionException is the only kind of exception that can be thrown from a gotten
262 // Future. Anything else showing up here indicates a very fundamental bug in a
263 // Future implementation.
264 continuation.resumeWithException(e.nonNullCause())
265 }
266 }
267 }
268 }
269
270 /**
271 * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
272 * completion.
273 *
274 * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback.
275 * See [run] for details. Both types are implemented by this object to save an allocation.
276 */
277 private class ListenableFutureCoroutine<T>(
278 context: CoroutineContext,
279 private val future: SettableFuture<T>
280 ) : AbstractCoroutine<T>(context), Runnable {
281
282 /**
283 * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if
284 * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if
285 * and only if its `isCancelled()` method returns true.
286 *
287 * Any error that occurs after successfully cancelling a [ListenableFuture]
288 * created by submitting the returned object as a [Runnable] to an `Executor` will be passed
289 * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit
290 * it to return an error after it is successfully cancelled.
291 *
292 * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully
293 * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to
294 * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the
295 * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that
296 * the [Deferred] pointing to the task will be used to observe any error outcome occurring after
297 * cancellation.
298 *
299 * This may be counterintuitive, but it maintains the error and cancellation contracts of both
300 * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
301 * to the same running task.
302 */
runnull303 override fun run() {
304 if (future.isCancelled) {
305 cancel()
306 }
307 }
308
onCompletednull309 override fun onCompleted(value: T) {
310 future.set(value)
311 }
312
313 // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation?
onCancellednull314 override fun onCancelled(cause: Throwable, handled: Boolean) {
315 if (!future.setException(cause) && !handled) {
316 // prevents loss of exception that was not handled by parent & could not be set to SettableFuture
317 handleCoroutineException(context, cause)
318 }
319 }
320 }
321
322 /**
323 * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with
324 * it.
325 *
326 * This setup allows the returned [ListenableFuture] to maintain the following properties:
327 *
328 * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
329 * and [isCancelled] methods
330 * - Cancellation propagation both to and from [Deferred]
331 * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
332 * with different concrete implementations of [ListenableFuture]
333 * - Fully correct cancellation and listener happens-after obeying [Future] and
334 * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
335 * The best way to be correct, especially given the fun corner cases from
336 * [AsyncFuture.setAsync], is to just use an [AsyncFuture].
337 * - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture]
338 * around its input [deferred] as a state engine to establish happens-after-completion. This
339 * could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the
340 * cost of the implementation's readability.
341 */
342 private class OuterFuture<T>(private val deferred: Deferred<T>): ListenableFuture<T> {
343 val innerFuture = DeferredListenableFuture(deferred)
344
345 // Adding the listener after initialization resolves partial construction hairpin problem.
346 //
347 // This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may
348 // have completed earlier if it got cancelled! See DeferredListenableFuture.
afterInitnull349 fun afterInit() {
350 deferred.invokeOnCompletion {
351 innerFuture.complete()
352 }
353 }
354
355 /**
356 * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
357 * [Job.isCancelled].
358 *
359 * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate
360 * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class.
361 *
362 * See [DeferredListenableFuture.cancel].
363 */
isCancellednull364 override fun isCancelled(): Boolean {
365 // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
366 // In the case that the deferred has completed with cancellation, completing `this`, its
367 // reaching the "cancelled" state with a cause of CancellationException is treated as the
368 // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and
369 // this Future hasn't itself been successfully cancelled, the Future will return
370 // isCancelled() == false. This is the only discovered way to reconcile the two different
371 // cancellation contracts.
372 return isDone
373 && (innerFuture.isCancelled
374 || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException)
375 }
376
377 /**
378 * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that
379 * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes
380 * happens-after ordering for completion of the [Deferred] input to [OuterFuture].
381 *
382 * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after
383 * contract of [Future] to be correctly followed. If this method were to directly use
384 * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this
385 * [ListenableFuture] is created from might be in an incomplete state when used by `get()`.
386 */
getnull387 override fun get(): T {
388 return getInternal(innerFuture.get())
389 }
390
391 /** See [get()]. */
getnull392 override fun get(timeout: Long, unit: TimeUnit): T {
393 return getInternal(innerFuture.get(timeout, unit))
394 }
395
396 /** See [get()]. */
getInternalnull397 private fun getInternal(deferred: Deferred<T>): T {
398 if (deferred.isCancelled) {
399 val exception = deferred.getCompletionExceptionOrNull()
400 if (exception is kotlinx.coroutines.CancellationException) {
401 throw exception
402 } else {
403 throw ExecutionException(exception)
404 }
405 } else {
406 return deferred.getCompleted()
407 }
408 }
409
addListenernull410 override fun addListener(listener: Runnable, executor: Executor) {
411 innerFuture.addListener(listener, executor)
412 }
413
isDonenull414 override fun isDone(): Boolean {
415 return innerFuture.isDone
416 }
417
cancelnull418 override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
419 return innerFuture.cancel(mayInterruptIfRunning)
420 }
421 }
422
423 /**
424 * Holds a delegate deferred, and serves as a state machine for [Future] cancellation.
425 *
426 * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
427 * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to
428 * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when
429 * returned.
430 */
431 private class DeferredListenableFuture<T>(
432 private val deferred: Deferred<T>
433 ) : AbstractFuture<Deferred<T>>() {
434
completenull435 fun complete() {
436 set(deferred)
437 }
438
439 /**
440 * Tries to cancel the task. This is fundamentally racy.
441 *
442 * For any given call to `cancel()`, if [deferred] is already completed, the call will complete
443 * this Future with it, and fail to cancel. Otherwise, the
444 * call to `cancel()` will try to cancel this Future: if and only if cancellation of this
445 * succeeds, [deferred] will have its [Deferred.cancel] called.
446 *
447 * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves
448 * in a particular way. [deferred] may also be in its "cancelling" state while this
449 * ListenableFuture is complete and cancelled.
450 *
451 * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure
452 * that certain combinations of cancelled/cancelling states can't be observed.
453 */
cancelnull454 override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
455 return if (super.cancel(mayInterruptIfRunning)) {
456 deferred.cancel()
457 true
458 } else {
459 false
460 }
461 }
462 }
463