<lambda>null1 package kotlinx.coroutines.guava
2
3 import com.google.common.util.concurrent.*
4 import com.google.common.util.concurrent.internal.*
5 import kotlinx.coroutines.*
6 import java.util.concurrent.*
7 import java.util.concurrent.CancellationException
8 import kotlin.coroutines.*
9
10 /**
11 * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
12 *
13 * The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
14 * [IllegalArgumentException], because Futures don't have a way to start lazily.
15 *
16 * When the created coroutine [isCompleted][Job.isCompleted], it will try to
17 * *synchronously* complete the returned Future with the same outcome. This will
18 * succeed, barring a race with external cancellation of returned [ListenableFuture].
19 *
20 * Cancellation is propagated bidirectionally.
21 *
22 * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
23 * added/overlaid by passing [context].
24 *
25 * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
26 * member, [Dispatchers.Default] is used.
27 *
28 * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
29 * a [Job] in [context].
30 *
31 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
32 * facilities.
33 *
34 * Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
35 * In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
36 * the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
37 * error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
38 * no cause. This is to match the specification and behavior of
39 * `java.util.concurrent.FutureTask`.
40 *
41 * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
42 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
43 * @param block the code to execute.
44 */
45 public fun <T> CoroutineScope.future(
46 context: CoroutineContext = EmptyCoroutineContext,
47 start: CoroutineStart = CoroutineStart.DEFAULT,
48 block: suspend CoroutineScope.() -> T
49 ): ListenableFuture<T> {
50 require(!start.isLazy) { "$start start is not supported" }
51 val newContext = newCoroutineContext(context)
52 val coroutine = ListenableFutureCoroutine<T>(newContext)
53 coroutine.start(start, coroutine, block)
54 return coroutine.future
55 }
56
57 /**
58 * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
59 *
60 * Completion is non-atomic between the two promises.
61 *
62 * Cancellation is propagated bidirectionally.
63 *
64 * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
65 * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
66 * race with cancellation of the `Deferred`.
67 *
68 * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
69 * it will cancel the returned `Deferred`.
70 *
71 * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
72 * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
73 * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
74 * will complete with a different outcome than `this` `ListenableFuture`.
75 */
asDeferrednull76 public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
77 /* This method creates very specific behaviour as it entangles the `Deferred` and
78 * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
79 * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
80 * behaviour is described here.
81 *
82 * When `this` `ListenableFuture` is successfully cancelled - meaning
83 * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
84 * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
85 * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
86 * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
87 * cancelled.
88 *
89 * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
90 * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
91 * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
92 * terminal state.
93 *
94 * The returned `Deferred` may receive and suppress the `true` return value from
95 * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
96 * This is unavoidable, so make sure no idempotent cancellation work is performed by a
97 * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
98 * cancellation was from the `Deferred` representation of the task.
99 *
100 * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
101 * semantics. See `Job` for a description of coroutine cancellation semantics.
102 */
103 // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
104 // Exception by using the same instance the Future created.
105 if (this is InternalFutureFailureAccess) {
106 val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
107 if (t != null) {
108 return CompletableDeferred<T>().also {
109 it.completeExceptionally(t)
110 }
111 }
112 }
113
114 // Second, try the fast path for a completed Future. The Future is known to be done, so get()
115 // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
116 // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
117 // handle interruption.
118 if (isDone) {
119 return try {
120 CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
121 } catch (e: CancellationException) {
122 CompletableDeferred<T>().also { it.cancel(e) }
123 } catch (e: ExecutionException) {
124 // ExecutionException is the only kind of exception that can be thrown from a gotten
125 // Future. Anything else showing up here indicates a very fundamental bug in a
126 // Future implementation.
127 CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
128 }
129 }
130
131 // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
132 val deferred = CompletableDeferred<T>()
133 Futures.addCallback(this, object : FutureCallback<T> {
134 override fun onSuccess(result: T) {
135 runCatching { deferred.complete(result) }
136 .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
137 }
138
139 override fun onFailure(t: Throwable) {
140 runCatching { deferred.completeExceptionally(t) }
141 .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
142 }
143 }, MoreExecutors.directExecutor())
144
145 // ... And cancel the Future when the deferred completes. Since the return type of this method
146 // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
147 // completion handler runs before the Future is completed, the Deferred must have been
148 // cancelled and should propagate its cancellation. If it runs after the Future is completed,
149 // this is a no-op.
150 deferred.invokeOnCompletion {
151 cancel(false)
152 }
153 // Return hides the CompletableDeferred. This should prevent casting.
154 @OptIn(InternalForInheritanceCoroutinesApi::class)
155 return object : Deferred<T> by deferred {}
156 }
157
158 /**
159 * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
160 *
161 * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
162 * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
163 * [Exception].
164 *
165 * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
166 * state - a serious fundamental bug.
167 */
ExecutionExceptionnull168 private fun ExecutionException.nonNullCause(): Throwable {
169 return this.cause!!
170 }
171
172 /**
173 * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
174 *
175 * Completion is non-atomic between the two promises.
176 *
177 * When either promise successfully completes, it will attempt to synchronously complete its
178 * counterpart with the same value. This will succeed barring a race with cancellation.
179 *
180 * When either promise completes with an Exception, it will attempt to synchronously complete its
181 * counterpart with the same Exception. This will succeed barring a race with cancellation.
182 *
183 * Cancellation is propagated bidirectionally.
184 *
185 * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
186 * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
187 * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
188 * non-cancelled terminal state.
189 *
190 * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
191 * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
192 * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
193 * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
194 * successfully cancelled, for their different meanings of "successfully cancelled".
195 *
196 * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
197 * semantics. See [Job] for a description of coroutine cancellation semantics. See
198 * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
199 * corner cases of this method.
200 */
asListenableFuturenull201 public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
202 val listenableFuture = JobListenableFuture<T>(this)
203 // This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred.
204 // The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel().
205 invokeOnCompletion { throwable ->
206 if (throwable == null) {
207 listenableFuture.complete(getCompleted())
208 } else {
209 listenableFuture.completeExceptionallyOrCancel(throwable)
210 }
211 }
212 return listenableFuture
213 }
214
215 /**
216 * Awaits completion of `this` [ListenableFuture] without blocking a thread.
217 *
218 * This suspend function is cancellable.
219 *
220 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
221 * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
222 *
223 * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
224 * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
225 * [kotlinx.coroutines.NonCancellable].
226 */
awaitnull227 public suspend fun <T> ListenableFuture<T>.await(): T {
228 try {
229 if (isDone) return Uninterruptibles.getUninterruptibly(this)
230 } catch (e: ExecutionException) {
231 // ExecutionException is the only kind of exception that can be thrown from a gotten
232 // Future, other than CancellationException. Cancellation is propagated upward so that
233 // the coroutine running this suspend function may process it.
234 // Any other Exception showing up here indicates a very fundamental bug in a
235 // Future implementation.
236 throw e.nonNullCause()
237 }
238
239 return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
240 addListener(
241 ToContinuation(this, cont),
242 MoreExecutors.directExecutor())
243 cont.invokeOnCancellation {
244 cancel(false)
245 }
246 }
247 }
248
249 /**
250 * Propagates the outcome of [futureToObserve] to [continuation] on completion.
251 *
252 * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
253 * and fails, the cause of the Future will be propagated without a wrapping
254 * [ExecutionException] when thrown.
255 */
256 private class ToContinuation<T>(
257 val futureToObserve: ListenableFuture<T>,
258 val continuation: CancellableContinuation<T>
259 ): Runnable {
runnull260 override fun run() {
261 if (futureToObserve.isCancelled) {
262 continuation.cancel()
263 } else {
264 try {
265 continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
266 } catch (e: ExecutionException) {
267 // ExecutionException is the only kind of exception that can be thrown from a gotten
268 // Future. Anything else showing up here indicates a very fundamental bug in a
269 // Future implementation.
270 continuation.resumeWithException(e.nonNullCause())
271 }
272 }
273 }
274 }
275
276 /**
277 * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
278 * completion.
279 *
280 * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
281 * By documented contract, a [Future] has been cancelled if
282 * and only if its `isCancelled()` method returns true.
283 *
284 * Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
285 * The contract of [Future] does not permit it to return an error after it is successfully cancelled.
286 * On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
287 * otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
288 * In contrast to [Future] which can't change its outcome after a successful cancellation,
289 * cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
290 * which _can_ show the error.
291 *
292 * This may be counterintuitive, but it maintains the error and cancellation contracts of both
293 * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
294 * to the same running task.
295 */
296 private class ListenableFutureCoroutine<T>(
297 context: CoroutineContext
298 ) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
299
300 // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
301 @JvmField
302 val future = JobListenableFuture<T>(this)
303
onCompletednull304 override fun onCompleted(value: T) {
305 future.complete(value)
306 }
307
onCancellednull308 override fun onCancelled(cause: Throwable, handled: Boolean) {
309 // Note: if future was cancelled in a race with a cancellation of this
310 // coroutine, and the future was successfully cancelled first, the cause of coroutine
311 // cancellation is dropped in this promise. A Future can only be completed once.
312 //
313 // This is consistent with FutureTask behaviour. A race between a Future.cancel() and
314 // a FutureTask.setException() for the same Future will similarly drop the
315 // cause of a failure-after-cancellation.
316 future.completeExceptionallyOrCancel(cause)
317 }
318 }
319
320 /**
321 * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
322 *
323 * This setup allows the returned [ListenableFuture] to maintain the following properties:
324 *
325 * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
326 * and [isCancelled] methods
327 * - Cancellation propagation both to and from [Deferred]
328 * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
329 * with different concrete implementations of [ListenableFuture]
330 * - Fully correct cancellation and listener happens-after obeying [Future] and
331 * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
332 * The best way to be correct, especially given the fun corner cases from
333 * [AbstractFuture.setFuture], is to just use an [AbstractFuture].
334 * - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
335 * around coroutine's result as a state engine to establish happens-after-completion. This
336 * could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
337 * cost of the implementation's readability.
338 */
339 private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
340 /**
341 * Serves as a state machine for [Future] cancellation.
342 *
343 * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
344 * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to
345 * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned.
346 *
347 * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
348 */
349 private val auxFuture = SettableFuture.create<Any?>()
350
351 /**
352 * `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
353 *
354 * Note: this is eventually consistent with the state of [auxFuture].
355 *
356 * Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException]
357 * apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException] allocation
358 * we use this field as an optimization.
359 */
360 private var auxFutureIsFailed: Boolean = false
361
362 /**
363 * When the attached coroutine [isCompleted][Job.isCompleted] successfully
364 * its outcome should be passed to this method.
365 *
366 * This should succeed barring a race with external cancellation.
367 */
completenull368 fun complete(result: T): Boolean = auxFuture.set(result)
369
370 /**
371 * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled]
372 * its outcome should be passed to this method.
373 *
374 * This method will map coroutine's exception into corresponding Future's exception.
375 *
376 * This should succeed barring a race with external cancellation.
377 */
378 // CancellationException is wrapped into `Cancelled` to preserve original cause and message.
379 // All the other exceptions are delegated to SettableFuture.setException.
380 fun completeExceptionallyOrCancel(t: Throwable): Boolean =
381 if (t is CancellationException) auxFuture.set(Cancelled(t))
382 else auxFuture.setException(t).also { if (it) auxFutureIsFailed = true }
383
384 /**
385 * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
386 * [Job.isCancelled].
387 *
388 * When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture]
389 * contains [CancellationException].
390 *
391 * See [cancel].
392 */
isCancellednull393 override fun isCancelled(): Boolean {
394 // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
395 // In the case that the deferred has completed with cancellation, completing `this`, its
396 // reaching the "cancelled" state with a cause of CancellationException is treated as the
397 // same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and
398 // this Future hasn't itself been successfully cancelled, the Future will return
399 // isCancelled() == false. This is the only discovered way to reconcile the two different
400 // cancellation contracts.
401 return auxFuture.isCancelled || isDone && !auxFutureIsFailed && try {
402 Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
403 } catch (e: CancellationException) {
404 // `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false.
405 true
406 } catch (e: ExecutionException) {
407 // `auxFutureIsFailed` hasn't been updated yet.
408 auxFutureIsFailed = true
409 false
410 }
411 }
412
413 /**
414 * Waits for [auxFuture] to complete by blocking, then uses its `result`
415 * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException].
416 * This establishes happens-after ordering for completion of the entangled coroutine.
417 *
418 * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
419 * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine.
420 *
421 * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
422 * contract of [Future] to be correctly followed.
423 */
getnull424 override fun get(): T {
425 return getInternal(auxFuture.get())
426 }
427
428 /** See [get()]. */
getnull429 override fun get(timeout: Long, unit: TimeUnit): T {
430 return getInternal(auxFuture.get(timeout, unit))
431 }
432
433 /** See [get()]. */
getInternalnull434 private fun getInternal(result: Any?): T = if (result is Cancelled) {
435 throw CancellationException().initCause(result.exception)
436 } else {
437 // We know that `auxFuture` can contain either `T` or `Cancelled`.
438 @Suppress("UNCHECKED_CAST")
439 result as T
440 }
441
addListenernull442 override fun addListener(listener: Runnable, executor: Executor) {
443 auxFuture.addListener(listener, executor)
444 }
445
isDonenull446 override fun isDone(): Boolean {
447 return auxFuture.isDone
448 }
449
450 /**
451 * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
452 *
453 * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture]
454 * succeeds, [jobToCancel] will have its [Job.cancel] called.
455 *
456 * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
457 * in a particular way. [jobToCancel] may also be in its "cancelling" state while this
458 * ListenableFuture is complete and cancelled.
459 */
cancelnull460 override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
461 // TODO: call jobToCancel.cancel() _before_ running the listeners.
462 // `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of
463 // `jobToCancel` until after auxFuture's listeners have already run.
464 // Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
465 return if (auxFuture.cancel(mayInterruptIfRunning)) {
466 jobToCancel.cancel()
467 true
468 } else {
469 false
470 }
471 }
472
<lambda>null473 override fun toString(): String = buildString {
474 append(super.toString())
475 append("[status=")
476 if (isDone) {
477 try {
478 when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) {
479 is Cancelled -> append("CANCELLED, cause=[${result.exception}]")
480 else -> append("SUCCESS, result=[$result]")
481 }
482 } catch (e: CancellationException) {
483 // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
484 append("CANCELLED")
485 } catch (e: ExecutionException) {
486 append("FAILURE, cause=[${e.cause}]")
487 } catch (t: Throwable) {
488 // Violation of Future's contract, should never happen.
489 append("UNKNOWN, cause=[${t.javaClass} thrown from get()]")
490 }
491 } else {
492 append("PENDING, delegate=[$auxFuture]")
493 }
494 append(']')
495 }
496 }
497
498 /**
499 * A wrapper for `Coroutine`'s [CancellationException].
500 *
501 * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately,
502 * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this
503 * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture].
504 */
505 private class Cancelled(@JvmField val exception: CancellationException)
506