• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("BuildersKt")
3 @file:OptIn(ExperimentalContracts::class)
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlinx.coroutines.intrinsics.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.contracts.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 import kotlin.jvm.*
15 
16 // --------------- launch ---------------
17 
18 /**
19  * Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
20  * The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
21  *
22  * The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with [context] argument.
23  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
24  * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
25  * with a corresponding [context] element.
26  *
27  * By default, the coroutine is immediately scheduled for execution.
28  * Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
29  * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
30  * the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
31  * and will be started implicitly on the first invocation of [join][Job.join].
32  *
33  * Uncaught exceptions in this coroutine cancel the parent job in the context by default
34  * (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with
35  * the context of another coroutine, then any uncaught exception leads to the cancellation of the parent coroutine.
36  *
37  * See [newCoroutineContext] for a description of debugging facilities that are available for a newly created coroutine.
38  *
39  * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
40  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
41  * @param block the coroutine code which will be invoked in the context of the provided scope.
42  **/
43 public fun CoroutineScope.launch(
44     context: CoroutineContext = EmptyCoroutineContext,
45     start: CoroutineStart = CoroutineStart.DEFAULT,
46     block: suspend CoroutineScope.() -> Unit
47 ): Job {
48     val newContext = newCoroutineContext(context)
49     val coroutine = if (start.isLazy)
50         LazyStandaloneCoroutine(newContext, block) else
51         StandaloneCoroutine(newContext, active = true)
52     coroutine.start(start, coroutine, block)
53     return coroutine
54 }
55 
56 // --------------- async ---------------
57 
58 /**
59  * Creates a coroutine and returns its future result as an implementation of [Deferred].
60  * The running coroutine is cancelled when the resulting deferred is [cancelled][Job.cancel].
61  * The resulting coroutine has a key difference compared with similar primitives in other languages
62  * and frameworks: it cancels the parent job (or outer scope) on failure to enforce *structured concurrency* paradigm.
63  * To change that behaviour, supervising parent ([SupervisorJob] or [supervisorScope]) can be used.
64  *
65  * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
66  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
67  * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
68  * with corresponding [context] element.
69  *
70  * By default, the coroutine is immediately scheduled for execution.
71  * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
72  * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
73  * the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
74  * function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
75  *
76  * @param block the coroutine code.
77  */
asyncnull78 public fun <T> CoroutineScope.async(
79     context: CoroutineContext = EmptyCoroutineContext,
80     start: CoroutineStart = CoroutineStart.DEFAULT,
81     block: suspend CoroutineScope.() -> T
82 ): Deferred<T> {
83     val newContext = newCoroutineContext(context)
84     val coroutine = if (start.isLazy)
85         LazyDeferredCoroutine(newContext, block) else
86         DeferredCoroutine<T>(newContext, active = true)
87     coroutine.start(start, coroutine, block)
88     return coroutine
89 }
90 
91 @OptIn(InternalForInheritanceCoroutinesApi::class)
92 @Suppress("UNCHECKED_CAST")
93 private open class DeferredCoroutine<T>(
94     parentContext: CoroutineContext,
95     active: Boolean
96 ) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T> {
getCompletednull97     override fun getCompleted(): T = getCompletedInternal() as T
98     override suspend fun await(): T = awaitInternal() as T
99     override val onAwait: SelectClause1<T> get() = onAwaitInternal as SelectClause1<T>
100 }
101 
102 private class LazyDeferredCoroutine<T>(
103     parentContext: CoroutineContext,
104     block: suspend CoroutineScope.() -> T
105 ) : DeferredCoroutine<T>(parentContext, active = false) {
106     private val continuation = block.createCoroutineUnintercepted(this, this)
107 
108     override fun onStart() {
109         continuation.startCoroutineCancellable(this)
110     }
111 }
112 
113 // --------------- withContext ---------------
114 
115 /**
116  * Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
117  * the result.
118  *
119  * The resulting context for the [block] is derived by merging the current [coroutineContext] with the
120  * specified [context] using `coroutineContext + context` (see [CoroutineContext.plus]).
121  * This suspending function is cancellable. It immediately checks for cancellation of
122  * the resulting context and throws [CancellationException] if it is not [active][CoroutineContext.isActive].
123  *
124  * Calls to [withContext] whose [context] argument provides a [CoroutineDispatcher] that is
125  * different from the current one, by necessity, perform additional dispatches: the [block]
126  * can not be executed immediately and needs to be dispatched for execution on
127  * the passed [CoroutineDispatcher], and then when the [block] completes, the execution
128  * has to shift back to the original dispatcher.
129  *
130  * Note that the result of `withContext` invocation is dispatched into the original context in a cancellable way
131  * with a **prompt cancellation guarantee**, which means that if the original [coroutineContext]
132  * in which `withContext` was invoked is cancelled by the time its dispatcher starts to execute the code,
133  * it discards the result of `withContext` and throws [CancellationException].
134  *
135  * The cancellation behaviour described above is enabled if and only if the dispatcher is being changed.
136  * For example, when using `withContext(NonCancellable) { ... }` there is no change in dispatcher and
137  * this call will not be cancelled neither on entry to the block inside `withContext` nor on exit from it.
138  */
withContextnull139 public suspend fun <T> withContext(
140     context: CoroutineContext,
141     block: suspend CoroutineScope.() -> T
142 ): T {
143     contract {
144         callsInPlace(block, InvocationKind.EXACTLY_ONCE)
145     }
146     return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
147         // compute new context
148         val oldContext = uCont.context
149         // Copy CopyableThreadContextElement if necessary
150         val newContext = oldContext.newCoroutineContext(context)
151         // always check for cancellation of new context
152         newContext.ensureActive()
153         // FAST PATH #1 -- new context is the same as the old one
154         if (newContext === oldContext) {
155             val coroutine = ScopeCoroutine(newContext, uCont)
156             @Suppress("LEAKED_IN_PLACE_LAMBDA") // Contract is preserved, invoked immediately or throws
157             return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
158         }
159         // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
160         // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
161         if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
162             val coroutine = UndispatchedCoroutine(newContext, uCont)
163             // There are changes in the context, so this thread needs to be updated
164             withCoroutineContext(coroutine.context, null) {
165                 @Suppress("LEAKED_IN_PLACE_LAMBDA") // Contract is preserved, invoked immediately or throws
166                 return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
167             }
168         }
169         // SLOW PATH -- use new dispatcher
170         val coroutine = DispatchedCoroutine(newContext, uCont)
171         @Suppress("LEAKED_IN_PLACE_LAMBDA")  // Contract is preserved, invoked immediately or throws
172         block.startCoroutineCancellable(coroutine, coroutine)
173         coroutine.getResult()
174     }
175 }
176 
177 /**
178  * Calls the specified suspending block with the given [CoroutineDispatcher], suspends until it
179  * completes, and returns the result.
180  *
181  * This inline function calls [withContext].
182  */
invokenull183 public suspend inline operator fun <T> CoroutineDispatcher.invoke(
184     noinline block: suspend CoroutineScope.() -> T
185 ): T = withContext(this, block)
186 
187 // --------------- implementation ---------------
188 
189 private open class StandaloneCoroutine(
190     parentContext: CoroutineContext,
191     active: Boolean
192 ) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
193     override fun handleJobException(exception: Throwable): Boolean {
194         handleCoroutineException(context, exception)
195         return true
196     }
197 }
198 
199 private class LazyStandaloneCoroutine(
200     parentContext: CoroutineContext,
201     block: suspend CoroutineScope.() -> Unit
202 ) : StandaloneCoroutine(parentContext, active = false) {
203     private val continuation = block.createCoroutineUnintercepted(this, this)
204 
onStartnull205     override fun onStart() {
206         continuation.startCoroutineCancellable(this)
207     }
208 }
209 
210 // Used by withContext when context changes, but dispatcher stays the same
211 internal expect class UndispatchedCoroutine<in T>(
212     context: CoroutineContext,
213     uCont: Continuation<T>
214 ) : ScopeCoroutine<T>
215 
216 private const val UNDECIDED = 0
217 private const val SUSPENDED = 1
218 private const val RESUMED = 2
219 
220 // Used by withContext when context dispatcher changes
221 internal class DispatchedCoroutine<in T>(
222     context: CoroutineContext,
223     uCont: Continuation<T>
224 ) : ScopeCoroutine<T>(context, uCont) {
225     // this is copy-and-paste of a decision state machine inside AbstractionContinuation
226     // todo: we may some-how abstract it via inline class
227     private val _decision = atomic(UNDECIDED)
228 
trySuspendnull229     private fun trySuspend(): Boolean {
230         _decision.loop { decision ->
231             when (decision) {
232                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
233                 RESUMED -> return false
234                 else -> error("Already suspended")
235             }
236         }
237     }
238 
tryResumenull239     private fun tryResume(): Boolean {
240         _decision.loop { decision ->
241             when (decision) {
242                 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
243                 SUSPENDED -> return false
244                 else -> error("Already resumed")
245             }
246         }
247     }
248 
afterCompletionnull249     override fun afterCompletion(state: Any?) {
250         // Call afterResume from afterCompletion and not vice-versa, because stack-size is more
251         // important for afterResume implementation
252         afterResume(state)
253     }
254 
afterResumenull255     override fun afterResume(state: Any?) {
256         if (tryResume()) return // completed before getResult invocation -- bail out
257         // Resume in a cancellable way because we have to switch back to the original dispatcher
258         uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
259     }
260 
getResultnull261     internal fun getResult(): Any? {
262         if (trySuspend()) return COROUTINE_SUSPENDED
263         // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
264         val state = this.state.unboxState()
265         if (state is CompletedExceptionally) throw state.cause
266         @Suppress("UNCHECKED_CAST")
267         return state as T
268     }
269 }
270