/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("BuildersKt")
7 @file:OptIn(ExperimentalContracts::class)
8
9 package kotlinx.coroutines
10
11 import kotlinx.atomicfu.*
12 import kotlinx.coroutines.internal.*
13 import kotlinx.coroutines.intrinsics.*
14 import kotlinx.coroutines.selects.*
15 import kotlin.contracts.*
16 import kotlin.coroutines.*
17 import kotlin.coroutines.intrinsics.*
18 import kotlin.jvm.*
19
20 // --------------- launch ---------------
21
/**
 * Starts a new coroutine without blocking the current thread and hands back a [Job] that refers to it.
 * Cancelling the returned job (see [Job.cancel]) cancels the coroutine.
 *
 * The coroutine context is inherited from the [CoroutineScope]; extra context elements can be supplied
 * through the [context] argument. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used. The parent job is inherited from
 * the [CoroutineScope] too, but a corresponding element in [context] overrides it.
 *
 * By default, the coroutine is immediately scheduled for execution; other start options are selected
 * with the [start] parameter (see [CoroutineStart]). With [CoroutineStart.LAZY] the coroutine is started
 * _lazily_: its [Job] is created in the _new_ state, can be started explicitly with [start][Job.start],
 * and is started implicitly by the first invocation of [join][Job.join].
 *
 * By default, an uncaught exception in this coroutine cancels the parent job in the context
 * (unless a [CoroutineExceptionHandler] is explicitly specified). Consequently, when `launch` is used
 * with the context of another coroutine, any uncaught exception cancels that parent coroutine.
 *
 * See [newCoroutineContext] for a description of the debugging facilities available for a newly
 * created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code, invoked in the context of the provided scope.
 * @return the [Job] of the newly created coroutine.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val combinedContext = newCoroutineContext(context)
    // The lazy flavour is created inactive and captures the block itself.
    val standalone: StandaloneCoroutine = when {
        start.isLazy -> LazyStandaloneCoroutine(combinedContext, block)
        else -> StandaloneCoroutine(combinedContext, active = true)
    }
    // The coroutine object serves as the receiver of the block.
    standalone.start(start, standalone, block)
    return standalone
}
59
60 // --------------- async ---------------
61
/**
 * Creates a coroutine and returns its future result as an implementation of [Deferred].
 * Cancelling the resulting deferred (see [Job.cancel]) cancels the running coroutine.
 *
 * Unlike similar primitives in other languages and frameworks, the resulting coroutine cancels
 * its parent job (or outer scope) on failure in order to enforce the *structured concurrency* paradigm.
 * A supervising parent ([SupervisorJob] or [supervisorScope]) can be used to change that behaviour.
 *
 * The coroutine context is inherited from the [CoroutineScope]; extra context elements can be supplied
 * through the [context] argument. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used. The parent job is inherited from
 * the [CoroutineScope] too, but a corresponding element in [context] overrides it.
 *
 * By default, the coroutine is immediately scheduled for execution; other options are selected with
 * the [start] parameter (see [CoroutineStart]). With [CoroutineStart.LAZY] the coroutine is started
 * _lazily_: the resulting [Deferred] is created in the _new_ state, can be started explicitly with
 * [start][Job.start], and is started implicitly by the first invocation of [join][Job.join],
 * [await][Deferred.await] or [awaitAll].
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 * @return the [Deferred] that represents the future result of [block].
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val combinedContext = newCoroutineContext(context)
    // The lazy flavour is created inactive and captures the block itself.
    val deferred: DeferredCoroutine<T> = if (start.isLazy) {
        LazyDeferredCoroutine(combinedContext, block)
    } else {
        DeferredCoroutine<T>(combinedContext, active = true)
    }
    // The coroutine object serves as the receiver of the block.
    deferred.start(start, deferred, block)
    return deferred
}
94
/**
 * Coroutine created by [async]: an [AbstractCoroutine] that also implements [Deferred].
 *
 * Every [Deferred] member delegates to the matching `*Internal` member and casts its untyped
 * result to [T]; the casts are unchecked, hence the class-level suppression.
 */
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, initParentJob = true, active = active), Deferred<T> {

    override fun getCompleted(): T {
        return getCompletedInternal() as T
    }

    override suspend fun await(): T = awaitInternal() as T

    override val onAwait: SelectClause1<T>
        get() = onAwaitInternal as SelectClause1<T>
}
104
/**
 * Lazily started variant of [DeferredCoroutine]: it is constructed with `active = false`, and
 * the continuation for [block] is created up front but started only from [onStart].
 */
private class LazyDeferredCoroutine<T>(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
    // This coroutine is both the receiver of the block and its completion.
    private val continuation: Continuation<Unit> =
        block.createCoroutineUnintercepted(this, this)

    // Presumably invoked by the job machinery when the lazy coroutine is started -- confirm in the base class.
    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}
115
116 // --------------- withContext ---------------
117
/**
 * Calls the specified suspending [block] with a given coroutine [context], suspends until it completes,
 * and returns the result.
 *
 * The context for the [block] is derived by merging the current [coroutineContext] with the specified
 * [context] using `coroutineContext + context` (see [CoroutineContext.plus]).
 * This suspending function is cancellable. It immediately checks the resulting context for cancellation
 * and throws [CancellationException] if it is not [active][CoroutineContext.isActive].
 *
 * Calls to [withContext] whose [context] argument provides a [CoroutineDispatcher] that differs from
 * the current one necessarily perform additional dispatches: the [block] cannot be executed immediately
 * and has to be dispatched for execution on the passed [CoroutineDispatcher], and once the [block]
 * completes, execution has to shift back to the original dispatcher.
 *
 * The result of a `withContext` invocation is dispatched into the original context in a cancellable way
 * with a **prompt cancellation guarantee**: if the original [coroutineContext] in which `withContext`
 * was invoked is cancelled by the time its dispatcher starts to execute the code, the result of
 * `withContext` is discarded and [CancellationException] is thrown.
 *
 * The cancellation behaviour described above is enabled if and only if the dispatcher is being changed.
 * For example, `withContext(NonCancellable) { ... }` does not change the dispatcher, so the call is
 * cancelled neither on entry to the block inside `withContext` nor on exit from it.
 *
 * @param context context elements merged into the current coroutine context for the duration of [block].
 * @param block the suspending code to run; it is invoked exactly once.
 * @return the value produced by [block].
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // Context of the caller, taken from its continuation
        val callerContext = uCont.context
        // Merged context for the block (copies CopyableThreadContextElement if necessary)
        val targetContext = callerContext.newCoroutineContext(context)
        // Cancellation of the merged context is always checked first
        targetContext.ensureActive()
        // FAST PATH #1 -- the merged context is the very same instance as the caller's context
        if (targetContext === callerContext) {
            val scoped = ScopeCoroutine(targetContext, uCont)
            return@sc scoped.startUndispatchedOrReturn(scoped, block)
        }
        // FAST PATH #2 -- the dispatcher is unchanged (some other context element changed)
        // `equals` is used by design (see the equals implementation in wrapper contexts
        // like ExecutorCoroutineDispatcher)
        if (targetContext[ContinuationInterceptor] == callerContext[ContinuationInterceptor]) {
            val undispatched = UndispatchedCoroutine(targetContext, uCont)
            // The context has changed, so this thread needs to be updated
            withCoroutineContext(undispatched.context, null) {
                return@sc undispatched.startUndispatchedOrReturn(undispatched, block)
            }
        }
        // SLOW PATH -- the block runs on a new dispatcher
        val dispatched = DispatchedCoroutine(targetContext, uCont)
        block.startCoroutineCancellable(dispatched, dispatched)
        dispatched.getResult()
    }
}
176
/**
 * Runs the suspending [block] with this [CoroutineDispatcher] as the context, suspends until the block
 * completes, and returns its result.
 *
 * This inline operator is a thin wrapper that calls [withContext].
 *
 * @param block the suspending code to run with this dispatcher.
 * @return the value produced by [block].
 */
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T {
    return withContext(this, block)
}
186
187 // --------------- implementation ---------------
188
/**
 * Coroutine created by [launch]: an [AbstractCoroutine] whose result type is [Unit].
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {

    /**
     * Forwards [exception] to [handleCoroutineException] together with this coroutine's context and
     * then returns `true` (presumably telling the job machinery the exception was handled --
     * confirm against the base class contract).
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
198
/**
 * Lazily started variant of [StandaloneCoroutine]: it is constructed with `active = false`, and
 * the continuation for [block] is created up front but started only from [onStart].
 */
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    // This coroutine is both the receiver of the block and its completion.
    private val continuation: Continuation<Unit> =
        block.createCoroutineUnintercepted(this, this)

    // Presumably invoked by the job machinery when the lazy coroutine is started -- confirm in the base class.
    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}
209
/**
 * Coroutine used by [withContext] when the context changes but the dispatcher stays the same.
 *
 * This is an `expect` declaration: the actual implementation is supplied per platform.
 */
internal expect class UndispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>
215
// Values of the `_decision` state machine in DispatchedCoroutine.
// Initial value: neither getResult nor afterResume has claimed the decision yet.
private const val UNDECIDED: Int = 0
// Set by trySuspend: getResult got there first and returns COROUTINE_SUSPENDED.
private const val SUSPENDED: Int = 1
// Set by tryResume: the coroutine completed before getResult was invoked.
private const val RESUMED: Int = 2
219
/**
 * Coroutine used by [withContext] when the dispatcher of the context changes.
 *
 * Completion of the coroutine can race with the [getResult] call made by `withContext`.
 * The race is settled through [_decision]: whichever of `trySuspend` (from [getResult]) and
 * `tryResume` (from [afterResume]) moves it away from `UNDECIDED` first determines whether
 * the caller suspends or picks the result up directly.
 */
@PublishedApi
internal class DispatchedCoroutine<in T> internal constructor(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // This is a copy-and-paste of the decision state machine inside AbstractContinuation
    // todo: we may somehow abstract it via an inline class
    // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
    @JvmField
    public val _decision = atomic(UNDECIDED)

    /**
     * Tries to move the decision from `UNDECIDED` to `SUSPENDED`.
     * Returns `true` on success and `false` if the decision is already `RESUMED`;
     * fails with "Already suspended" for any other value.
     */
    private fun trySuspend(): Boolean {
        _decision.loop { current ->
            when (current) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    /**
     * Tries to move the decision from `UNDECIDED` to `RESUMED`.
     * Returns `true` on success and `false` if the decision is already `SUSPENDED`;
     * fails with "Already resumed" for any other value.
     */
    private fun tryResume(): Boolean {
        _decision.loop { current ->
            when (current) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    override fun afterCompletion(state: Any?) {
        // afterCompletion calls afterResume (and not the other way round), because stack size
        // matters more for the afterResume implementation
        afterResume(state)
    }

    override fun afterResume(state: Any?) {
        // Completed before getResult was invoked -- getResult will read the state itself
        if (tryResume()) return
        // The caller is suspended: resume it in a cancellable way, because execution
        // has to switch back to the original dispatcher
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }

    /**
     * Returns [COROUTINE_SUSPENDED] when the coroutine has not completed yet; otherwise returns
     * the completed value, or throws the cause if the coroutine completed exceptionally.
     */
    internal fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // Otherwise, onCompletionInternal was already invoked and called tryResume,
        // so the result is in the state
        val finalState = this.state.unboxState()
        if (finalState is CompletedExceptionally) throw finalState.cause
        @Suppress("UNCHECKED_CAST")
        return finalState as T
    }
}
273