• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("BuildersKt")
7 @file:OptIn(ExperimentalContracts::class)
8 
9 package kotlinx.coroutines
10 
11 import kotlinx.atomicfu.*
12 import kotlinx.coroutines.internal.*
13 import kotlinx.coroutines.intrinsics.*
14 import kotlinx.coroutines.selects.*
15 import kotlin.contracts.*
16 import kotlin.coroutines.*
17 import kotlin.coroutines.intrinsics.*
18 import kotlin.jvm.*
19 
20 // --------------- launch ---------------
21 
/**
 * Starts a new coroutine without blocking the current thread and hands back a [Job] that refers to it.
 * Cancelling the returned job (see [Job.cancel]) cancels the coroutine.
 *
 * The coroutine context comes from the receiver [CoroutineScope]; extra context elements may be supplied
 * through the [context] argument. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used.
 * The parent job is taken from the [CoroutineScope] too, unless [context] carries an element that overrides it.
 *
 * With the default [start] value the coroutine is scheduled for execution right away; see [CoroutineStart]
 * for the other options. Passing [CoroutineStart.LAZY] starts the coroutine _lazily_: its [Job] is created in
 * the _new_ state, can be started explicitly with [start][Job.start], and is started implicitly by the first
 * call to [join][Job.join].
 *
 * By default an uncaught exception in this coroutine cancels the parent job found in the context
 * (unless a [CoroutineExceptionHandler] is explicitly specified). Consequently, when `launch` runs with the
 * context of another coroutine, any uncaught exception cancels that parent coroutine.
 *
 * See [newCoroutineContext] for the debugging facilities available to a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code, invoked in the context of the provided scope.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val combinedContext = newCoroutineContext(context)
    // A lazy start needs the variant that keeps the block until the job is started.
    val job: StandaloneCoroutine = when {
        start.isLazy -> LazyStandaloneCoroutine(combinedContext, block)
        else -> StandaloneCoroutine(combinedContext, active = true)
    }
    // The coroutine object is passed as the receiver of the block.
    job.start(start, job, block)
    return job
}
59 
60 // --------------- async ---------------
61 
/**
 * Creates a coroutine and returns its future result as an implementation of [Deferred].
 * Cancelling the returned deferred (see [Job.cancel]) cancels the running coroutine.
 * Unlike similar primitives in other languages and frameworks, the resulting coroutine cancels its parent job
 * (or outer scope) on failure in order to enforce the *structured concurrency* paradigm.
 * A supervising parent ([SupervisorJob] or [supervisorScope]) can be used to change that behaviour.
 *
 * The coroutine context comes from the receiver [CoroutineScope]; extra context elements may be supplied
 * through the [context] argument. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used.
 * The parent job is taken from the [CoroutineScope] too, unless [context] carries an element that overrides it.
 *
 * With the default [start] value the coroutine is scheduled for execution right away; see [CoroutineStart]
 * for the other options. Passing [CoroutineStart.LAZY] starts the coroutine _lazily_: the resulting [Deferred]
 * is created in the _new_ state, can be started explicitly with [start][Job.start], and is started implicitly by
 * the first call to [join][Job.join], [await][Deferred.await] or [awaitAll].
 *
 * @param context context elements added on top of the scope's own context.
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 * @return the [Deferred] representing the coroutine and its eventual result.
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val combinedContext = newCoroutineContext(context)
    // A lazy start needs the variant that keeps the block until the deferred is started.
    val deferred: DeferredCoroutine<T> = when {
        start.isLazy -> LazyDeferredCoroutine(combinedContext, block)
        else -> DeferredCoroutine<T>(combinedContext, active = true)
    }
    // The coroutine object is passed as the receiver of the block.
    deferred.start(start, deferred, block)
    return deferred
}
94 
/**
 * Coroutine implementation behind [async]: an [AbstractCoroutine] that exposes its result through
 * the [Deferred] and [SelectClause1] interfaces by delegating to the `*Internal` operations it inherits.
 */
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, initParentJob = true, active = active), Deferred<T>, SelectClause1<T> {
    /** This coroutine serves as its own select clause. */
    override val onAwait: SelectClause1<T>
        get() = this

    /** Returns the value of [getCompletedInternal], cast to [T] (unchecked). */
    override fun getCompleted(): T {
        return getCompletedInternal() as T
    }

    /** Suspends in [awaitInternal] and returns its value, cast to [T] (unchecked). */
    override suspend fun await(): T {
        return awaitInternal() as T
    }

    /** Forwards select registration to [registerSelectClause1Internal]. */
    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) {
        registerSelectClause1Internal(select, block)
    }
}
106 
/**
 * Lazily started flavour of [DeferredCoroutine]: it is constructed with `active = false`, and the block
 * only begins to run from [onStart].
 */
private class LazyDeferredCoroutine<T>(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
    // Built eagerly (unintercepted) with this coroutine as both receiver and completion; run later in onStart.
    private val pendingBody = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        pendingBody.startCoroutineCancellable(this)
    }
}
117 
118 // --------------- withContext ---------------
119 
/**
 * Runs the given suspending [block] with the supplied coroutine [context], suspends until the block
 * completes, and returns its result.
 *
 * The context seen by [block] is obtained by merging the current [coroutineContext] with the supplied
 * [context] as `coroutineContext + context` (see [CoroutineContext.plus]).
 * This suspending function is cancellable. It immediately checks the resulting context for cancellation
 * and throws [CancellationException] if that context is not [active][CoroutineContext.isActive].
 *
 * When the [context] argument supplies a [CoroutineDispatcher] different from the current one, extra
 * dispatches are unavoidable: the [block] cannot run immediately and has to be dispatched to the passed
 * [CoroutineDispatcher], and once the [block] completes, execution has to move back to the original dispatcher.
 *
 * The result of a `withContext` call is dispatched into the original context in a cancellable way with a
 * **prompt cancellation guarantee**: if the original [coroutineContext] in which `withContext` was invoked
 * is cancelled by the time its dispatcher starts to execute the code, the result of `withContext` is
 * discarded and [CancellationException] is thrown.
 *
 * This cancellation behaviour applies if and only if the dispatcher is being changed.
 * For example, `withContext(NonCancellable) { ... }` does not change the dispatcher, so the call is
 * cancelled neither on entry to the block inside `withContext` nor on exit from it.
 *
 * @param context context elements merged on top of the caller's context.
 * @param block the code to run in the merged context; the contract states it is invoked exactly once, in place.
 * @return the value produced by [block].
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val callerContext = uCont.context
        // Merge the contexts (this also copies CopyableThreadContextElement when necessary).
        val targetContext = callerContext.newCoroutineContext(context)
        // The merged context is checked for cancellation unconditionally.
        targetContext.ensureActive()

        // Fast path 1: merging produced the very same context instance.
        if (targetContext === callerContext) {
            val scoped = ScopeCoroutine(targetContext, uCont)
            return@sc scoped.startUndispatchedOrReturn(scoped, block)
        }

        // Fast path 2: something changed, but not the dispatcher.
        // `equals` is deliberate here (wrapper contexts such as ExecutorCoroutineDispatcher implement it).
        if (targetContext[ContinuationInterceptor] == callerContext[ContinuationInterceptor]) {
            val undispatched = UndispatchedCoroutine(targetContext, uCont)
            // The context differs, so the current thread has to be updated while the block runs.
            withCoroutineContext(targetContext, null) {
                return@sc undispatched.startUndispatchedOrReturn(undispatched, block)
            }
        }

        // Slow path: the block has to run on a different dispatcher.
        val dispatched = DispatchedCoroutine(targetContext, uCont)
        block.startCoroutineCancellable(dispatched, dispatched)
        dispatched.getResult()
    }
}
178 
/**
 * Runs the given suspending [block] with this [CoroutineDispatcher], suspends until the block
 * completes, and returns its result.
 *
 * This inline operator simply calls [withContext] with the dispatcher as the context.
 *
 * @param block the code to run with this dispatcher.
 * @return the value produced by [block].
 */
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T {
    return withContext(this, block)
}
188 
189 // --------------- implementation ---------------
190 
/**
 * Coroutine implementation behind [launch]: an [AbstractCoroutine] producing `Unit` that passes
 * job exceptions to [handleCoroutineException].
 */
private open class StandaloneCoroutine(parentContext: CoroutineContext, active: Boolean) :
    AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {

    /**
     * Forwards [exception], together with this coroutine's context, to [handleCoroutineException].
     *
     * @return always `true`.
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
200 
/**
 * Lazily started flavour of [StandaloneCoroutine]: it is constructed with `active = false`, and the block
 * only begins to run from [onStart].
 */
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    // Built eagerly (unintercepted) with this coroutine as both receiver and completion; run later in onStart.
    private val pendingBody = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        pendingBody.startCoroutineCancellable(this)
    }
}
211 
/**
 * Platform-specific (`expect`) coroutine that [withContext] uses when the context changes
 * while the dispatcher stays the same.
 */
internal expect class UndispatchedCoroutine<in T>(context: CoroutineContext, uCont: Continuation<T>) : ScopeCoroutine<T>
217 
// Values of the DispatchedCoroutine decision state machine.
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

/**
 * Coroutine that [withContext] uses when the context's dispatcher changes.
 *
 * A small atomic "decision" decides the race between [getResult] (the caller wants to suspend) and
 * [afterResume] (the block has already finished): whichever side moves the decision away from
 * [UNDECIDED] first wins.
 */
internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // This is a copy-and-paste of the decision state machine inside AbstractionContinuation.
    // todo: it may be possible to abstract it somehow via an inline class
    private val _decision = atomic(UNDECIDED)

    /**
     * Tries to move the decision from [UNDECIDED] to [SUSPENDED].
     *
     * @return `true` if this call recorded the suspension, `false` if the decision is already [RESUMED].
     * @throws IllegalStateException if the decision is already [SUSPENDED].
     */
    private fun trySuspend(): Boolean {
        _decision.loop { current ->
            if (current == RESUMED) return false
            if (current != UNDECIDED) error("Already suspended")
            // A failed CAS means the decision changed concurrently; the loop re-reads it.
            if (_decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
        }
    }

    /**
     * Tries to move the decision from [UNDECIDED] to [RESUMED].
     *
     * @return `true` if this call recorded the resumption, `false` if the decision is already [SUSPENDED].
     * @throws IllegalStateException if the decision is already [RESUMED].
     */
    private fun tryResume(): Boolean {
        _decision.loop { current ->
            if (current == SUSPENDED) return false
            if (current != UNDECIDED) error("Already resumed")
            // A failed CAS means the decision changed concurrently; the loop re-reads it.
            if (_decision.compareAndSet(UNDECIDED, RESUMED)) return true
        }
    }

    override fun afterCompletion(state: Any?) {
        // afterCompletion delegates to afterResume (and not the other way round), because stack size
        // matters more for the afterResume implementation.
        afterResume(state)
    }

    override fun afterResume(state: Any?) {
        // When tryResume succeeds, completion came before getResult was invoked and there is nothing to resume.
        if (!tryResume()) {
            // Resume in a cancellable way, because execution has to switch back to the original dispatcher.
            uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
        }
    }

    /**
     * Returns [COROUTINE_SUSPENDED] when the suspension is recorded before completion; otherwise returns
     * the completed result taken from the coroutine state.
     *
     * @throws Throwable the cause of the failure when the state is [CompletedExceptionally].
     */
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // Otherwise completion already ran and invoked tryResume, so the result is in the state.
        val outcome = state.unboxState()
        if (outcome is CompletedExceptionally) throw outcome.cause
        @Suppress("UNCHECKED_CAST")
        return outcome as T
    }
}
272