/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:JvmMultifileClass
@file:JvmName("BuildersKt")

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*

// --------------- launch ---------------

/**
 * Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
 * The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
 *
 * The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with
 * [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default]
 * is used.
 * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
 * with a corresponding [coroutineContext] element.
 *
 * By default, the coroutine is immediately scheduled for execution.
 * Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
 * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
 * the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
 * and will be started implicitly on the first invocation of [join][Job.join].
 *
 * Uncaught exceptions in this coroutine cancel the parent job in the context by default
 * (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with
 * the context of another coroutine, then any uncaught exception leads to the cancellation of the parent coroutine.
 *
 * See [newCoroutineContext] for a description of debugging facilities that are available for a newly created
 * coroutine.
 *
 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code which will be invoked in the context of the provided scope.
 **/
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    // A lazy coroutine has to remember its block until it is started; an eager one is created active.
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

// --------------- async ---------------

/**
 * Creates a coroutine and returns its future result as an implementation of [Deferred].
 * The running coroutine is cancelled when the resulting deferred is [cancelled][Job.cancel].
 * The resulting coroutine has a key difference compared with similar primitives in other languages
 * and frameworks: it cancels the parent job (or outer scope) on failure to enforce *structured concurrency* paradigm.
 * To change that behaviour, supervising parent ([SupervisorJob] or [supervisorScope]) can be used.
 *
 * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with
 * [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default]
 * is used.
 * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
 * with corresponding [coroutineContext] element.
 *
 * By default, the coroutine is immediately scheduled for execution.
 * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
 * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
 * the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
 * function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await]
 * or [awaitAll].
 *
 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    // A lazy coroutine has to remember its block until it is started; an eager one is created active.
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

/**
 * Coroutine implementation behind [async]. It exposes the [Deferred] and [SelectClause1] APIs by delegating
 * to the `*Internal` functions and casting their untyped results to [T].
 */
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
    override fun getCompleted(): T = getCompletedInternal() as T
    override suspend fun await(): T = awaitInternal() as T
    override val onAwait: SelectClause1<T> get() = this
    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
        registerSelectClause1Internal(select, block)
}

/**
 * Lazily started variant of [DeferredCoroutine]: it is created inactive and keeps [block] until [onStart]
 * launches it.
 */
private class LazyDeferredCoroutine<T>(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
    // Set to null once the coroutine has been started, so a repeated onStart is detected.
    private var block: (suspend CoroutineScope.() -> T)? = block

    /**
     * Starts the stored block in a cancellable way.
     *
     * @throws IllegalStateException if the block has already been consumed by a previous start.
     */
    override fun onStart() {
        val block = checkNotNull(this.block) { "Already started" }
        this.block = null
        block.startCoroutineCancellable(this, this)
    }
}

// --------------- withContext ---------------

/**
 * Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
 * the result.
 *
 * The resulting context for the [block] is derived by merging the current [coroutineContext] with the
 * specified [context] using `coroutineContext + context` (see [CoroutineContext.plus]).
 * This suspending function is cancellable. It immediately checks for cancellation of
 * the resulting context and throws [CancellationException] if it is not [active][CoroutineContext.isActive].
 *
 * This function uses dispatcher from the new context, shifting execution of the [block] into the
 * different thread if a new dispatcher is specified, and back to the original dispatcher
 * when it completes. Note that the result of `withContext` invocation is
 * dispatched into the original context in a cancellable way, which means that if the original [coroutineContext],
 * in which `withContext` was invoked, is cancelled by the time its dispatcher starts to execute the code,
 * it discards the result of `withContext` and throws [CancellationException].
 *
 * @param context context elements merged on top of the current [coroutineContext].
 * @param block the code to run in the merged context.
 * @return the value produced by [block].
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    // compute new context
    val oldContext = uCont.context
    val newContext = oldContext + context
    // always check for cancellation of new context
    newContext.checkCompletion()
    // FAST PATH #1 -- new context is the same as the old one
    if (newContext === oldContext) {
        val coroutine = ScopeCoroutine(newContext, uCont) // MODE_DIRECT
        return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
    }
    // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
    // `equals` is used by design (see equals implementation in wrapper contexts like ExecutorCoroutineDispatcher)
    if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
        val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
        // There are changes in the context, so this thread needs to be updated
        withCoroutineContext(newContext, null) {
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
    // SLOW PATH -- use new dispatcher
    val coroutine = DispatchedCoroutine(newContext, uCont) // MODE_CANCELLABLE
    coroutine.initParentJob()
    block.startCoroutineCancellable(coroutine, coroutine)
    coroutine.getResult()
}

/**
 * Calls the specified suspending block with the given [CoroutineDispatcher], suspends until it
 * completes, and returns the result.
 *
 * This inline function calls [withContext].
 */
@ExperimentalCoroutinesApi
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)

// --------------- implementation ---------------

/**
 * Coroutine implementation behind [launch].
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    /**
     * Forwards [exception] to [handleCoroutineException] using this coroutine's context and always returns `true`.
     * NOTE(review): `true` presumably reports the exception as handled -- confirm against the overridden member.
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

/**
 * Lazily started variant of [StandaloneCoroutine]: it is created inactive and keeps [block] until [onStart]
 * launches it.
 */
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    // Set to null once the coroutine has been started, so a repeated onStart is detected.
    private var block: (suspend CoroutineScope.() -> Unit)? = block

    /**
     * Starts the stored block in a cancellable way.
     *
     * @throws IllegalStateException if the block has already been consumed by a previous start.
     */
    override fun onStart() {
        val block = checkNotNull(this.block) { "Already started" }
        this.block = null
        block.startCoroutineCancellable(this, this)
    }
}

// Used by withContext when context changes, but dispatcher stays the same
private class UndispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    override val defaultResumeMode: Int get() = MODE_UNDISPATCHED
}

// States of the DispatchedCoroutine decision state machine
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

// Used by withContext when context dispatcher changes
private class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    override val defaultResumeMode: Int get() = MODE_CANCELLABLE

    // this is copy-and-paste of a decision state machine inside AbstractionContinuation
    // todo: we may some-how abstract it via inline class
    private val _decision = atomic(UNDECIDED)

    /**
     * Tries to move the decision from [UNDECIDED] to [SUSPENDED].
     *
     * @return `true` if this call made the transition, `false` if the decision is already [RESUMED].
     * @throws IllegalStateException if the decision is already [SUSPENDED].
     */
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    /**
     * Tries to move the decision from [UNDECIDED] to [RESUMED].
     *
     * @return `true` if this call made the transition, `false` if the decision is already [SUSPENDED].
     * @throws IllegalStateException if the decision is already [RESUMED].
     */
    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    override fun afterCompletionInternal(state: Any?, mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // otherwise, getResult has already commenced, i.e. completed later or in other thread
        super.afterCompletionInternal(state, mode)
    }

    /**
     * Returns [COROUTINE_SUSPENDED] when the coroutine has not completed yet; otherwise returns the completed
     * state cast to [T], or rethrows the cause when the state is [CompletedExceptionally].
     */
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, afterCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    }
}