1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.guava
6
7 import com.google.common.util.concurrent.*
8 import com.google.common.util.concurrent.internal.*
9 import kotlinx.coroutines.*
10 import java.util.concurrent.*
11 import java.util.concurrent.CancellationException
12 import kotlin.coroutines.*
13
14 /**
15 * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
16 *
17 * The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
18 * [IllegalArgumentException], because Futures don't have a way to start lazily.
19 *
20 * When the created coroutine [isCompleted][Job.isCompleted], it will try to
21 * *synchronously* complete the returned Future with the same outcome. This will
22 * succeed, barring a race with external cancellation of returned [ListenableFuture].
23 *
24 * Cancellation is propagated bidirectionally.
25 *
26 * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
27 * added/overlaid by passing [context].
28 *
29 * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
30 * member, [Dispatchers.Default] is used.
31 *
32 * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
33 * a [Job] in [context].
34 *
35 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
36 * facilities.
37 *
38 * Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
39 * In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
40 * the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
41 * error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
42 * no cause. This is to match the specification and behavior of
43 * `java.util.concurrent.FutureTask`.
44 *
45 * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
46 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
47 * @param block the code to execute.
48 */
49 public fun <T> CoroutineScope.future(
50 context: CoroutineContext = EmptyCoroutineContext,
51 start: CoroutineStart = CoroutineStart.DEFAULT,
52 block: suspend CoroutineScope.() -> T
53 ): ListenableFuture<T> {
54 require(!start.isLazy) { "$start start is not supported" }
55 val newContext = newCoroutineContext(context)
56 val coroutine = ListenableFutureCoroutine<T>(newContext)
57 coroutine.start(start, coroutine, block)
58 return coroutine.future
59 }
60
61 /**
62 * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
63 *
64 * Completion is non-atomic between the two promises.
65 *
66 * Cancellation is propagated bidirectionally.
67 *
68 * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
69 * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
70 * race with cancellation of the `Deferred`.
71 *
72 * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
73 * it will cancel the returned `Deferred`.
74 *
75 * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
76 * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
77 * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
78 * will complete with a different outcome than `this` `ListenableFuture`.
79 */
asDeferrednull80 public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
81 /* This method creates very specific behaviour as it entangles the `Deferred` and
82 * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
83 * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
84 * behaviour is described here.
85 *
86 * When `this` `ListenableFuture` is successfully cancelled - meaning
87 * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
88 * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
89 * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
90 * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
91 * cancelled.
92 *
93 * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
94 * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
95 * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
96 * terminal state.
97 *
98 * The returned `Deferred` may receive and suppress the `true` return value from
99 * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
100 * This is unavoidable, so make sure no idempotent cancellation work is performed by a
101 * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
102 * cancellation was from the `Deferred` representation of the task.
103 *
104 * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
105 * semantics. See `Job` for a description of coroutine cancellation semantics.
106 */
107 // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
108 // Exception by using the same instance the Future created.
109 if (this is InternalFutureFailureAccess) {
110 val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
111 if (t != null) {
112 return CompletableDeferred<T>().also {
113 it.completeExceptionally(t)
114 }
115 }
116 }
117
118 // Second, try the fast path for a completed Future. The Future is known to be done, so get()
119 // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
120 // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
121 // handle interruption.
122 if (isDone) {
123 return try {
124 CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
125 } catch (e: CancellationException) {
126 CompletableDeferred<T>().also { it.cancel(e) }
127 } catch (e: ExecutionException) {
128 // ExecutionException is the only kind of exception that can be thrown from a gotten
129 // Future. Anything else showing up here indicates a very fundamental bug in a
130 // Future implementation.
131 CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
132 }
133 }
134
135 // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
136 val deferred = CompletableDeferred<T>()
137 Futures.addCallback(this, object : FutureCallback<T> {
138 override fun onSuccess(result: T) {
139 runCatching { deferred.complete(result) }
140 .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
141 }
142
143 override fun onFailure(t: Throwable) {
144 runCatching { deferred.completeExceptionally(t) }
145 .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
146 }
147 }, MoreExecutors.directExecutor())
148
149 // ... And cancel the Future when the deferred completes. Since the return type of this method
150 // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
151 // completion handler runs before the Future is completed, the Deferred must have been
152 // cancelled and should propagate its cancellation. If it runs after the Future is completed,
153 // this is a no-op.
154 deferred.invokeOnCompletion {
155 cancel(false)
156 }
157 // Return hides the CompletableDeferred. This should prevent casting.
158 return object : Deferred<T> by deferred {}
159 }
160
161 /**
162 * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
163 *
164 * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
165 * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
166 * [Exception].
167 *
168 * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
169 * state - a serious fundamental bug.
170 */
ExecutionExceptionnull171 private fun ExecutionException.nonNullCause(): Throwable {
172 return this.cause!!
173 }
174
175 /**
176 * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
177 *
178 * Completion is non-atomic between the two promises.
179 *
180 * When either promise successfully completes, it will attempt to synchronously complete its
181 * counterpart with the same value. This will succeed barring a race with cancellation.
182 *
183 * When either promise completes with an Exception, it will attempt to synchronously complete its
184 * counterpart with the same Exception. This will succeed barring a race with cancellation.
185 *
186 * Cancellation is propagated bidirectionally.
187 *
188 * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
189 * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
190 * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
191 * non-cancelled terminal state.
192 *
193 * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
194 * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
195 * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
196 * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
197 * successfully cancelled, for their different meanings of "successfully cancelled".
198 *
199 * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
200 * semantics. See [Job] for a description of coroutine cancellation semantics. See
201 * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
202 * corner cases of this method.
203 */
asListenableFuturenull204 public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
205 val listenableFuture = JobListenableFuture<T>(this)
206 // This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred.
207 // The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel().
208 invokeOnCompletion { throwable ->
209 if (throwable == null) {
210 listenableFuture.complete(getCompleted())
211 } else {
212 listenableFuture.completeExceptionallyOrCancel(throwable)
213 }
214 }
215 return listenableFuture
216 }
217
218 /**
219 * Awaits completion of `this` [ListenableFuture] without blocking a thread.
220 *
221 * This suspend function is cancellable.
222 *
223 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
224 * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
225 *
226 * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
227 * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
228 * [kotlinx.coroutines.NonCancellable].
229 */
awaitnull230 public suspend fun <T> ListenableFuture<T>.await(): T {
231 try {
232 if (isDone) return Uninterruptibles.getUninterruptibly(this)
233 } catch (e: ExecutionException) {
234 // ExecutionException is the only kind of exception that can be thrown from a gotten
235 // Future, other than CancellationException. Cancellation is propagated upward so that
236 // the coroutine running this suspend function may process it.
237 // Any other Exception showing up here indicates a very fundamental bug in a
238 // Future implementation.
239 throw e.nonNullCause()
240 }
241
242 return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
243 addListener(
244 ToContinuation(this, cont),
245 MoreExecutors.directExecutor())
246 cont.invokeOnCancellation {
247 cancel(false)
248 }
249 }
250 }
251
252 /**
253 * Propagates the outcome of [futureToObserve] to [continuation] on completion.
254 *
255 * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
256 * and fails, the cause of the Future will be propagated without a wrapping
257 * [ExecutionException] when thrown.
258 */
259 private class ToContinuation<T>(
260 val futureToObserve: ListenableFuture<T>,
261 val continuation: CancellableContinuation<T>
262 ): Runnable {
runnull263 override fun run() {
264 if (futureToObserve.isCancelled) {
265 continuation.cancel()
266 } else {
267 try {
268 continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
269 } catch (e: ExecutionException) {
270 // ExecutionException is the only kind of exception that can be thrown from a gotten
271 // Future. Anything else showing up here indicates a very fundamental bug in a
272 // Future implementation.
273 continuation.resumeWithException(e.nonNullCause())
274 }
275 }
276 }
277 }
278
279 /**
280 * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
281 * completion.
282 *
283 * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
284 * By documented contract, a [Future] has been cancelled if
285 * and only if its `isCancelled()` method returns true.
286 *
287 * Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
288 * The contract of [Future] does not permit it to return an error after it is successfully cancelled.
289 * On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
290 * otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
291 * In contrast to [Future] which can't change its outcome after a successful cancellation,
292 * cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
293 * which _can_ show the error.
294 *
295 * This may be counterintuitive, but it maintains the error and cancellation contracts of both
296 * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
297 * to the same running task.
298 */
299 private class ListenableFutureCoroutine<T>(
300 context: CoroutineContext
301 ) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
302
303 // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
304 @JvmField
305 val future = JobListenableFuture<T>(this)
306
onCompletednull307 override fun onCompleted(value: T) {
308 future.complete(value)
309 }
310
onCancellednull311 override fun onCancelled(cause: Throwable, handled: Boolean) {
312 // Note: if future was cancelled in a race with a cancellation of this
313 // coroutine, and the future was successfully cancelled first, the cause of coroutine
314 // cancellation is dropped in this promise. A Future can only be completed once.
315 //
316 // This is consistent with FutureTask behaviour. A race between a Future.cancel() and
317 // a FutureTask.setException() for the same Future will similarly drop the
318 // cause of a failure-after-cancellation.
319 future.completeExceptionallyOrCancel(cause)
320 }
321 }
322
323 /**
324 * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
325 *
326 * This setup allows the returned [ListenableFuture] to maintain the following properties:
327 *
328 * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
329 * and [isCancelled] methods
330 * - Cancellation propagation both to and from [Deferred]
331 * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
332 * with different concrete implementations of [ListenableFuture]
333 * - Fully correct cancellation and listener happens-after obeying [Future] and
334 * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
335 * The best way to be correct, especially given the fun corner cases from
336 * [AbstractFuture.setFuture], is to just use an [AbstractFuture].
337 * - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
338 * around coroutine's result as a state engine to establish happens-after-completion. This
339 * could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
340 * cost of the implementation's readability.
341 */
342 private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
343 /**
344 * Serves as a state machine for [Future] cancellation.
345 *
346 * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
347 * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to
348 * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned.
349 *
350 * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
351 */
352 private val auxFuture = SettableFuture.create<Any?>()
353
354 /**
355 * `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
356 *
357 * Note: this is eventually consistent with the state of [auxFuture].
358 *
359 * Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException]
360 * apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException] allocation
361 * we use this field as an optimization.
362 */
363 private var auxFutureIsFailed: Boolean = false
364
365 /**
366 * When the attached coroutine [isCompleted][Job.isCompleted] successfully
367 * its outcome should be passed to this method.
368 *
369 * This should succeed barring a race with external cancellation.
370 */
completenull371 fun complete(result: T): Boolean = auxFuture.set(result)
372
373 /**
374 * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled]
375 * its outcome should be passed to this method.
376 *
377 * This method will map coroutine's exception into corresponding Future's exception.
378 *
379 * This should succeed barring a race with external cancellation.
380 */
381 // CancellationException is wrapped into `Cancelled` to preserve original cause and message.
382 // All the other exceptions are delegated to SettableFuture.setException.
383 fun completeExceptionallyOrCancel(t: Throwable): Boolean =
384 if (t is CancellationException) auxFuture.set(Cancelled(t))
385 else auxFuture.setException(t).also { if (it) auxFutureIsFailed = true }
386
387 /**
388 * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
389 * [Job.isCancelled].
390 *
391 * When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture]
392 * contains [CancellationException].
393 *
394 * See [cancel].
395 */
isCancellednull396 override fun isCancelled(): Boolean {
397 // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
398 // In the case that the deferred has completed with cancellation, completing `this`, its
399 // reaching the "cancelled" state with a cause of CancellationException is treated as the
400 // same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and
401 // this Future hasn't itself been successfully cancelled, the Future will return
402 // isCancelled() == false. This is the only discovered way to reconcile the two different
403 // cancellation contracts.
404 return auxFuture.isCancelled || isDone && !auxFutureIsFailed && try {
405 Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
406 } catch (e: CancellationException) {
407 // `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false.
408 true
409 } catch (e: ExecutionException) {
410 // `auxFutureIsFailed` hasn't been updated yet.
411 auxFutureIsFailed = true
412 false
413 }
414 }
415
416 /**
417 * Waits for [auxFuture] to complete by blocking, then uses its `result`
418 * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException].
419 * This establishes happens-after ordering for completion of the entangled coroutine.
420 *
421 * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
422 * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine.
423 *
424 * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
425 * contract of [Future] to be correctly followed.
426 */
getnull427 override fun get(): T {
428 return getInternal(auxFuture.get())
429 }
430
431 /** See [get()]. */
getnull432 override fun get(timeout: Long, unit: TimeUnit): T {
433 return getInternal(auxFuture.get(timeout, unit))
434 }
435
436 /** See [get()]. */
getInternalnull437 private fun getInternal(result: Any?): T = if (result is Cancelled) {
438 throw CancellationException().initCause(result.exception)
439 } else {
440 // We know that `auxFuture` can contain either `T` or `Cancelled`.
441 @Suppress("UNCHECKED_CAST")
442 result as T
443 }
444
addListenernull445 override fun addListener(listener: Runnable, executor: Executor) {
446 auxFuture.addListener(listener, executor)
447 }
448
isDonenull449 override fun isDone(): Boolean {
450 return auxFuture.isDone
451 }
452
453 /**
454 * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
455 *
456 * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture]
457 * succeeds, [jobToCancel] will have its [Job.cancel] called.
458 *
459 * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
460 * in a particular way. [jobToCancel] may also be in its "cancelling" state while this
461 * ListenableFuture is complete and cancelled.
462 */
cancelnull463 override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
464 // TODO: call jobToCancel.cancel() _before_ running the listeners.
465 // `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of
466 // `jobToCancel` until after auxFuture's listeners have already run.
467 // Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
468 return if (auxFuture.cancel(mayInterruptIfRunning)) {
469 jobToCancel.cancel()
470 true
471 } else {
472 false
473 }
474 }
475
<lambda>null476 override fun toString(): String = buildString {
477 append(super.toString())
478 append("[status=")
479 if (isDone) {
480 try {
481 when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) {
482 is Cancelled -> append("CANCELLED, cause=[${result.exception}]")
483 else -> append("SUCCESS, result=[$result]")
484 }
485 } catch (e: CancellationException) {
486 // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
487 append("CANCELLED")
488 } catch (e: ExecutionException) {
489 append("FAILURE, cause=[${e.cause}]")
490 } catch (t: Throwable) {
491 // Violation of Future's contract, should never happen.
492 append("UNKNOWN, cause=[${t.javaClass} thrown from get()]")
493 }
494 } else {
495 append("PENDING, delegate=[$auxFuture]")
496 }
497 append(']')
498 }
499 }
500
501 /**
502 * A wrapper for `Coroutine`'s [CancellationException].
503 *
504 * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately,
505 * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this
506 * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture].
507 */
508 private class Cancelled(@JvmField val exception: CancellationException)
509