• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("BuildersKt")
7 
8 package kotlinx.coroutines
9 
10 import kotlinx.atomicfu.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.intrinsics.*
13 import kotlinx.coroutines.selects.*
14 import kotlin.coroutines.*
15 import kotlin.coroutines.intrinsics.*
16 import kotlin.jvm.*
17 
18 // --------------- launch ---------------
19 
/**
 * Starts a new coroutine without blocking the current thread and hands back a [Job] that represents it.
 * [Cancelling][Job.cancel] the returned job cancels the coroutine.
 *
 * The coroutine context comes from the receiver [CoroutineScope]; extra elements may be supplied through the
 * [context] argument. When the resulting context has neither a dispatcher nor any other [ContinuationInterceptor],
 * [Dispatchers.Default] is used. The parent job is taken from the [CoroutineScope] too, unless it is overridden
 * by a corresponding [coroutineContext] element.
 *
 * By default the coroutine is scheduled for execution immediately. Other behaviours are selected with the
 * [start] parameter (see [CoroutineStart]). With [CoroutineStart.LAZY] the coroutine is started _lazily_:
 * its [Job] is created in the _new_ state, can be started explicitly with [start][Job.start], and is started
 * implicitly by the first call to [join][Job.join].
 *
 * An uncaught exception in this coroutine cancels the parent job in the context by default
 * (unless a [CoroutineExceptionHandler] is explicitly specified). Consequently, when `launch` is used with the
 * context of another coroutine, any uncaught exception cancels that parent coroutine.
 *
 * See [newCoroutineContext] for the debugging facilities available to a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the coroutine code, invoked in the context of the provided scope.
 * @return the [Job] of the newly created coroutine.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val combinedContext = newCoroutineContext(context)
    // Lazy start needs the variant that holds on to the block until the job is actually started.
    val job: StandaloneCoroutine = when {
        start.isLazy -> LazyStandaloneCoroutine(combinedContext, block)
        else -> StandaloneCoroutine(combinedContext, active = true)
    }
    // The coroutine acts both as the receiver scope of the block and as its completion.
    job.start(start, job, block)
    return job
}
57 
58 // --------------- async ---------------
59 
/**
 * Creates a coroutine and returns its future result as a [Deferred].
 * [Cancelling][Job.cancel] the returned deferred cancels the running coroutine.
 *
 * Unlike similar primitives in other languages and frameworks, the resulting coroutine cancels its parent job
 * (or outer scope) when it fails, which enforces the *structured concurrency* paradigm.
 * Use a supervising parent ([SupervisorJob] or [supervisorScope]) to change that behaviour.
 *
 * The coroutine context comes from the receiver [CoroutineScope]; extra elements may be supplied through the
 * [context] argument. When the resulting context has neither a dispatcher nor any other [ContinuationInterceptor],
 * [Dispatchers.Default] is used. The parent job is taken from the [CoroutineScope] too, unless it is overridden
 * by a corresponding [coroutineContext] element.
 *
 * By default the coroutine is scheduled for execution immediately. Other behaviours are selected with the
 * [start] parameter (see [CoroutineStart]). With [CoroutineStart.LAZY] the coroutine is started _lazily_:
 * the resulting [Deferred] is created in the _new_ state, can be started explicitly with [start][Job.start],
 * and is started implicitly by the first call to [join][Job.join], [await][Deferred.await] or [awaitAll].
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 * @return a [Deferred] that completes with the result of [block].
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val combinedContext = newCoroutineContext(context)
    // Lazy start needs the variant that holds on to the block until the deferred is actually started.
    val deferred: DeferredCoroutine<T> = when {
        start.isLazy -> LazyDeferredCoroutine(combinedContext, block)
        else -> DeferredCoroutine<T>(combinedContext, active = true)
    }
    // The coroutine acts both as the receiver scope of the block and as its completion.
    deferred.start(start, deferred, block)
    return deferred
}
92 
/**
 * Coroutine behind [async]: an [AbstractCoroutine] that also exposes its result through [Deferred]
 * and serves as its own [onAwait] select clause.
 *
 * The unchecked casts to [T] rely on the internal accessors returning the value this coroutine completed with.
 */
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {

    // This object is the select clause itself, so no separate clause instance is created.
    override val onAwait: SelectClause1<T>
        get() = this

    override fun getCompleted(): T {
        return getCompletedInternal() as T
    }

    override suspend fun await(): T {
        return awaitInternal() as T
    }

    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
        registerSelectClause1Internal(select, block)
}
104 
/**
 * Lazily started flavour of [DeferredCoroutine]: it is created inactive and keeps the user block
 * until [onStart] is invoked.
 */
private class LazyDeferredCoroutine<T>(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
    // Holds the block until the coroutine is started; cleared afterwards so a second start is detected.
    private var pendingBlock: (suspend CoroutineScope.() -> T)? = block

    /**
     * Starts the stored block in a cancellable way, with this coroutine as both receiver and completion.
     *
     * @throws IllegalStateException if the block has already been consumed by a previous start.
     */
    override fun onStart() {
        val body = pendingBlock ?: error("Already started")
        pendingBlock = null
        body.startCoroutineCancellable(this, this)
    }
}
117 
118 // --------------- withContext ---------------
119 
/**
 * Runs the given suspending [block] with the given coroutine [context], suspends until it completes,
 * and returns its result.
 *
 * The context used for the [block] is obtained by merging the current [coroutineContext] with the
 * supplied [context] as `coroutineContext + context` (see [CoroutineContext.plus]).
 * This suspending function is cancellable: it immediately checks the resulting context for cancellation
 * and throws [CancellationException] if it is not [active][CoroutineContext.isActive].
 *
 * The dispatcher of the new context is used, so execution of the [block] is shifted to a different thread
 * if a new dispatcher is specified, and shifted back to the original dispatcher when the block completes.
 * The result of `withContext` is dispatched into the original context in a cancellable way: if the original
 * [coroutineContext] in which `withContext` was invoked is cancelled by the time its dispatcher starts
 * executing the code, the result is discarded and [CancellationException] is thrown.
 *
 * @param context context elements merged on top of the caller's context.
 * @param block the code to run in the merged context.
 * @return the value produced by [block].
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    // Derive the context the block will run in from the caller's context.
    val callerContext = uCont.context
    val mergedContext = callerContext + context
    // Cancellation of the merged context is checked unconditionally, before picking a path.
    mergedContext.checkCompletion()
    when {
        // FAST PATH #1 -- merging changed nothing, so the block runs in place.
        mergedContext === callerContext -> {
            val scoped = ScopeCoroutine(mergedContext, uCont) // MODE_DIRECT
            scoped.startUndispatchedOrReturn(scoped, block)
        }
        // FAST PATH #2 -- something changed, but the dispatcher did not.
        // `equals` is used by design (see the equals implementation in wrapper contexts
        // such as ExecutorCoroutineDispatcher).
        mergedContext[ContinuationInterceptor] == callerContext[ContinuationInterceptor] -> {
            val undispatched = UndispatchedCoroutine(mergedContext, uCont) // MODE_UNDISPATCHED
            // The context differs, so this thread's context has to be updated around the call.
            withCoroutineContext(mergedContext, null) {
                return@sc undispatched.startUndispatchedOrReturn(undispatched, block)
            }
        }
        // SLOW PATH -- a different dispatcher is in effect, so the block is dispatched through it.
        else -> {
            val dispatched = DispatchedCoroutine(mergedContext, uCont) // MODE_CANCELLABLE
            dispatched.initParentJob()
            block.startCoroutineCancellable(dispatched, dispatched)
            dispatched.getResult()
        }
    }
}
165 
/**
 * Runs the given suspending [block] with this [CoroutineDispatcher], suspends until it
 * completes, and returns its result.
 *
 * This inline operator simply delegates to [withContext], passing the dispatcher as the context.
 *
 * @param block the code to run with this dispatcher.
 * @return the value produced by [block].
 */
@ExperimentalCoroutinesApi
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T {
    return withContext(this, block)
}
176 
177 // --------------- implementation ---------------
178 
/**
 * Coroutine behind [launch]: an [AbstractCoroutine] producing [Unit] that forwards job exceptions
 * to [handleCoroutineException].
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    /**
     * Passes [exception] to [handleCoroutineException] together with this coroutine's context.
     *
     * @return always `true` (presumably signalling that the exception was handled here).
     */
    override fun handleJobException(exception: Throwable): Boolean =
        true.also { handleCoroutineException(context, exception) }
}
188 
/**
 * Lazily started flavour of [StandaloneCoroutine]: it is created inactive and keeps the user block
 * until [onStart] is invoked.
 */
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    // Holds the block until the coroutine is started; cleared afterwards so a second start is detected.
    private var pendingBlock: (suspend CoroutineScope.() -> Unit)? = block

    /**
     * Starts the stored block in a cancellable way, with this coroutine as both receiver and completion.
     *
     * @throws IllegalStateException if the block has already been consumed by a previous start.
     */
    override fun onStart() {
        val body = pendingBlock ?: error("Already started")
        pendingBlock = null
        body.startCoroutineCancellable(this, this)
    }
}
201 
/**
 * Coroutine used by [withContext] when the context changes but the dispatcher stays the same.
 * It differs from [ScopeCoroutine] only in resuming with [MODE_UNDISPATCHED] by default.
 */
private class UndispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    override val defaultResumeMode: Int
        get() = MODE_UNDISPATCHED
}
209 
// Decision states shared by DispatchedCoroutine.trySuspend / tryResume:
// UNDECIDED -- neither getResult nor completion has claimed the decision yet
private const val UNDECIDED = 0
// SUSPENDED -- getResult claimed it first, so the caller is told to suspend
private const val SUSPENDED = 1
// RESUMED -- completion claimed it first, so getResult returns the result directly
private const val RESUMED = 2

/**
 * Coroutine used by [withContext] when the dispatcher of the context changes.
 * It resumes with [MODE_CANCELLABLE] by default and arbitrates, through an atomic decision,
 * between [getResult] suspending the caller and completion arriving before that.
 */
private class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    override val defaultResumeMode: Int
        get() = MODE_CANCELLABLE

    // This is a copy-and-paste of the decision state machine inside AbstractContinuation.
    // todo: we may somehow abstract it via inline class
    private val _decision = atomic(UNDECIDED)

    /**
     * Attempts to move the decision from [UNDECIDED] to [SUSPENDED].
     *
     * @return `true` if this call made the transition, `false` if the decision was already [RESUMED].
     * @throws IllegalStateException if the decision was already [SUSPENDED].
     */
    private fun trySuspend(): Boolean {
        _decision.loop { current ->
            if (current == RESUMED) return false
            check(current == UNDECIDED) { "Already suspended" }
            // A lost CAS race falls through and re-reads the decision on the next iteration.
            if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
        }
    }

    /**
     * Attempts to move the decision from [UNDECIDED] to [RESUMED].
     *
     * @return `true` if this call made the transition, `false` if the decision was already [SUSPENDED].
     * @throws IllegalStateException if the decision was already [RESUMED].
     */
    private fun tryResume(): Boolean {
        _decision.loop { current ->
            if (current == SUSPENDED) return false
            check(current == UNDECIDED) { "Already resumed" }
            // A lost CAS race falls through and re-reads the decision on the next iteration.
            if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
        }
    }

    /**
     * Delegates to the superclass only when [getResult] has already suspended the caller.
     * If completion happened before [getResult] was invoked, there is nothing to resume,
     * because [getResult] will pick the result up from the state.
     */
    override fun afterCompletionInternal(state: Any?, mode: Int) {
        // getResult has already commenced, i.e. completion came later or from another thread
        if (!tryResume()) super.afterCompletionInternal(state, mode)
    }

    /**
     * Returns [COROUTINE_SUSPENDED] if the coroutine has not completed yet; otherwise returns the
     * completed value or rethrows the cause of an exceptional completion.
     */
    @Suppress("UNCHECKED_CAST")
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, afterCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val finalState = this.state.unboxState()
        if (finalState is CompletedExceptionally) throw finalState.cause
        return finalState as T
    }
}
260