1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.future
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import java.util.concurrent.*
10 import java.util.function.*
11 import kotlin.coroutines.*
12
/**
 * Launches a new coroutine and exposes its outcome through a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the running coroutine.
 *
 * The coroutine context is taken from the receiver [CoroutineScope] and may be extended with the [context] argument.
 * When the resulting context has neither a dispatcher nor any other [ContinuationInterceptor],
 * [Dispatchers.Default] is used. The parent job also comes from the [CoroutineScope], unless the [context]
 * argument supplies a corresponding element of its own.
 *
 * By default the coroutine is scheduled for execution immediately; the `start` parameter selects other
 * options (see [CoroutineStart] for details). [CoroutineStart.LAZY] is not supported, because the
 * `CompletableFuture` framework does not provide the corresponding capability, and is rejected with an
 * [IllegalArgumentException].
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of the debugging facilities
 * that are available for a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val effectiveContext = newCoroutineContext(context)
    return CompletableFuture<T>().also { result ->
        val worker = CompletableFutureCoroutine(effectiveContext, result)
        // The callback is registered before the coroutine starts, so that completing
        // the future from the outside cancels the coroutine.
        result.whenComplete(worker)
        worker.start(start, worker, block)
    }
}
47
/**
 * Coroutine that publishes its outcome to [future].
 *
 * It also implements [BiConsumer] so that the same instance can be registered on the future
 * as a completion callback that cancels the coroutine.
 */
private class CompletableFutureCoroutine<T>(
    context: CoroutineContext,
    private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> {
    /** Completion callback of the future: cancels this coroutine whatever the outcome was. */
    override fun accept(value: T?, exception: Throwable?) {
        cancel()
    }

    /** Publishes the coroutine's result to [future]. */
    override fun onCompleted(value: T) {
        future.complete(value)
    }

    /**
     * Publishes the failure to [future]. When the future did not accept it and it was not
     * [handled] either, the exception is passed to the coroutine exception handling so it is not lost.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val acceptedByFuture = future.completeExceptionally(cause)
        if (acceptedByFuture || handled) return
        handleCoroutineException(context, cause)
    }
}
67
/**
 * Exposes this deferred value as a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the deferred value.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> =
    CompletableFuture<T>().also { result ->
        setupCancellation(result)
        invokeOnCompletion {
            // Any throwable raised while fetching or publishing the value completes the future exceptionally.
            try {
                result.complete(getCompleted())
            } catch (failure: Throwable) {
                result.completeExceptionally(failure)
            }
        }
    }
84
/**
 * Exposes this job as a [CompletableFuture] that carries no value.
 * Cancelling the returned future, or completing it in any other way, cancels the job.
 */
public fun Job.asCompletableFuture(): CompletableFuture<Unit> {
    val result = CompletableFuture<Unit>()
    setupCancellation(result)
    invokeOnCompletion { failure ->
        // A non-null cause is forwarded to the future; otherwise the future completes with Unit.
        if (failure != null) {
            result.completeExceptionally(failure)
        } else {
            result.complete(Unit)
        }
    }
    return result
}
98
/**
 * Cancels this job as soon as [future] completes, whether normally or exceptionally.
 *
 * A [CancellationException] outcome is used directly as the cancellation cause; any other
 * exception is wrapped into a new [CancellationException]; a normal completion cancels without a cause.
 */
private fun Job.setupCancellation(future: CompletableFuture<*>) {
    future.whenComplete { _, failure ->
        val cancellation = when (failure) {
            null -> null
            is CancellationException -> failure
            else -> CancellationException("CompletableFuture was completed exceptionally", failure)
        }
        cancel(cancellation)
    }
}
106
/**
 * Converts this [CompletionStage] to an instance of [Deferred].
 *
 * The [CompletableFuture] that corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
 * is cancelled when the resulting deferred is cancelled.
 *
 * An exceptional outcome is unwrapped before it is published: the cause of an [ExecutionException]
 * (already completed stage) or of a [CompletionException] (stage completing later) is used when present.
 */
@Suppress("DeferredIsResult")
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    val future = toCompletableFuture() // retrieve the future
    // Fast path if already completed
    if (future.isDone) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(future.get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    val result = CompletableDeferred<T>()
    // handle() is used rather than whenComplete(): the stage returned by whenComplete() completes with the
    // same exception as this stage, which would leave a failed dependent stage that nobody observes.
    // The stage returned by handle() completes normally with the callback's result instead.
    handle { value, exception ->
        try {
            if (exception == null) {
                // the future has completed normally
                result.complete(value)
            } else {
                // the future has completed with an exception, unwrap it consistently with fast path
                // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
                result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
            }
        } catch (e: Throwable) {
            // We come here iff the internals of Deferred threw an exception during its completion
            handleCoroutineException(EmptyCoroutineContext, e)
        }
    }
    result.cancelFutureOnCompletion(future)
    return result
}
146
/**
 * Awaits for completion of [CompletionStage] without blocking a thread.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
 * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
 *
 * This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that
 * corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
 * is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    val future = toCompletableFuture() // retrieve the future
    // fast path when CompletableFuture is already done (does not suspend)
    if (future.isDone) {
        try {
            @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext")
            return future.get() as T
        } catch (e: ExecutionException) {
            throw e.cause ?: e // unwrap original cause from ExecutionException
        }
    }
    // slow path -- suspend
    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
        val consumer = ContinuationConsumer(cont)
        // handle() is used rather than whenComplete(): the stage returned by whenComplete() completes with the
        // same exception as this stage, which would leave a failed dependent stage that nobody observes.
        // The stage returned by handle() completes normally with the callback's result instead.
        handle(consumer)
        cont.invokeOnCancellation {
            future.cancel(false)
            consumer.cont = null // shall clear reference to continuation to aid GC
        }
    }
}

/**
 * Completion callback that resumes [cont] with the outcome of a [CompletionStage].
 *
 * It is a [BiFunction] so that it can be attached with [CompletionStage.handle].
 * The [cont] reference is set to `null` on cancellation, after which the callback does nothing.
 */
private class ContinuationConsumer<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiFunction<T?, Throwable?, Unit> {
    @Suppress("UNCHECKED_CAST")
    override fun apply(result: T?, exception: Throwable?) {
        val cont = this.cont ?: return // atomically read current value unless null
        if (exception == null) {
            // the future has completed normally
            cont.resume(result as T)
        } else {
            // the future has completed with an exception, unwrap it to provide consistent view of .await() result and to propagate only original exception
            cont.resumeWithException((exception as? CompletionException)?.cause ?: exception)
        }
    }
}
195