/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

// Sentinel stored in DispatchedContinuation._state while no state is pending
@Suppress("PrivatePropertyName")
@SharedImmutable
private val UNDEFINED = Symbol("UNDEFINED")

/**
 * Executes given [block] as part of current event loop, updating current continuation
 * mode and state if continuation is not resumed immediately.
 * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
 * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?,
    mode: Int,
    doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    val eventLoop = ThreadLocalEventLoop.eventLoop
    // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    return if (eventLoop.isUnconfinedLoopActive) {
        // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
        _state = contState
        resumeMode = mode
        eventLoop.dispatchUnconfined(this)
        true // queued into the active loop
    } else {
        // Was not active -- run event loop until all unconfined tasks are executed
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

/**
 * Resumes this task without a dispatcher: the task is queued into the thread-local event loop when
 * its unconfined loop is already active, otherwise it is resumed in place in [MODE_UNDISPATCHED]
 * and the loop is drained.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val eventLoop = ThreadLocalEventLoop.eventLoop
    if (eventLoop.isUnconfinedLoopActive) {
        // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
        eventLoop.dispatchUnconfined(this)
    } else {
        // Was not active -- run event loop until all unconfined tasks are executed
        runUnconfinedEventLoop(eventLoop) {
            resume(delegate, MODE_UNDISPATCHED)
        }
    }
}

/**
 * Runs [block] and then processes unconfined events of [eventLoop] until none are left.
 * The unconfined use count of the loop is incremented for the duration of the call and is always
 * decremented afterwards; any exception thrown meanwhile is reported via `handleFatalException`.
 */
private inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        while (true) {
            // break when all unconfined continuations were executed
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
        /*
         * This exception doesn't happen normally, only if we have a bug in implementation.
         * Report it as a fatal exception.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}

/**
 * A [continuation] wrapper that resumes it through [dispatcher] when
 * [CoroutineDispatcher.isDispatchNeeded] says so, and through the unconfined event loop otherwise.
 * The value (or failure) to resume with is parked in [_state] until [takeState] is called.
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED
    override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /** Returns the pending state and resets [_state] back to the `UNDEFINED` sentinel. */
    override fun takeState(): Any? {
        val state = _state
        assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    override val delegate: Continuation<T>
        get() = this

    /** Resumes with [result] in [MODE_ATOMIC_DEFAULT], dispatching only if the dispatcher requires it. */
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    /** Resumes with [value] in [MODE_CANCELLABLE]; the undispatched path checks [resumeCancelled] first. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
        if (dispatcher.isDispatchNeeded(context)) {
            _state = value
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(value, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatched(value)
                }
            }
        }
    }

    /** Resumes with [exception] in [MODE_CANCELLABLE]; the undispatched path checks [resumeCancelled] first. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellableWithException(exception: Throwable) {
        val context = continuation.context
        val state = CompletedExceptionally(exception)
        if (dispatcher.isDispatchNeeded(context)) {
            // Reuse the wrapper built above instead of allocating a second one
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatchedWithException(exception)
                }
            }
        }
    }

    /**
     * If the context has a [Job] that is no longer active, resumes with its cancellation exception
     * and returns `true`; returns `false` without resuming otherwise.
     */
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            resumeWithException(job.getCancellationException())
            return true
        }
        return false
    }

    /** Resumes the wrapped [continuation] with [value] in place, inside `withCoroutineContext`. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatched(value: T) {
        withCoroutineContext(context, countOrElement) {
            continuation.resume(value)
        }
    }

    /** Resumes the wrapped [continuation] with [exception] in place, inside `withCoroutineContext`. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatchedWithException(exception: Throwable) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWithStackTrace(exception)
        }
    }

    // used by "yield" implementation
    internal fun dispatchYield(value: T) {
        val context = continuation.context
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}

/** Resumes with [value], using the cancellable path when the receiver is a [DispatchedContinuation]. */
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
    is DispatchedContinuation -> resumeCancellable(value)
    else -> resume(value)
}

/** Resumes with [exception], using the cancellable path when the receiver is a [DispatchedContinuation]. */
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
    is DispatchedContinuation -> resumeCancellableWithException(exception)
    else -> resumeWithStackTrace(exception)
}

/** Resumes with [value], going straight to the wrapped continuation of a [DispatchedContinuation]. */
internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
    is DispatchedContinuation -> continuation.resume(value)
    else -> resume(value)
}

/** Resumes with [exception], going straight to the wrapped continuation of a [DispatchedContinuation]. */
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
    is DispatchedContinuation -> continuation.resumeWithStackTrace(exception)
    else -> resumeWithStackTrace(exception)
}

/**
 * A schedulable task that, when [run], takes its pending state and resumes the continuation wrapped
 * by its [delegate]. [resumeMode] selects whether job cancellation is checked before resuming.
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    internal abstract val delegate: Continuation<T>

    internal abstract fun takeState(): Any?

    internal open fun cancelResult(state: Any?, cause: Throwable) {}

    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T =
        state as T

    internal fun getExceptionalResult(state: Any?): Throwable? =
        (state as? CompletedExceptionally)?.cause

    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            withCoroutineContext(context, delegate.countOrElement) {
                val exception = getExceptionalResult(state)
                val job = if (resumeMode.isCancellableMode) context[Job] else null
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                if (exception == null && job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) continuation.resumeWithStackTrace(exception)
                    else continuation.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * There are two kinds of fatal exceptions:
     *
     * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
     *    the library or the compiler has a bug that breaks internal invariants.
     *    They usually have specific workarounds, but require careful study of the cause and should
     *    be reported to the maintainers and fixed on the library's side anyway.
     *
     * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
     *    While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
     *    we can't ignore it because it may leave coroutine in the inconsistent state.
     *    If you encounter such exception, you can either disable this context element or wrap it into
     *    another context element that catches all exceptions and handles it in the application specific manner.
     *
     * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
     * a failed coroutine, but such exceptions should be reported anyway.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Nothing to report when both are null; otherwise the primary exception takes precedence as the cause
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }

        val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
                "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}

/**
 * Yields through the unconfined event loop in [MODE_CANCELLABLE].
 * Returns `true` if this continuation was queued into an already active loop and `false` otherwise
 * (including the fast path where the unconfined queue is empty).
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
    executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
        run()
    }

/**
 * Resumes the delegate of this task in the given [mode]. When the mode is a dispatched one, the
 * delegate is a [DispatchedContinuation] and cancellability of [mode] matches that of `resumeMode`,
 * the task itself is submitted to the delegate's dispatcher (or resumed unconfined if no dispatch is
 * needed); in every other case the delegate is resumed via the slow path.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val delegate = this.delegate
    if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
        // dispatch directly using this instance's Runnable implementation
        val dispatcher = delegate.dispatcher
        val context = delegate.context
        if (dispatcher.isDispatchNeeded(context)) {
            dispatcher.dispatch(context, this)
        } else {
            resumeUnconfined()
        }
    } else {
        resume(delegate, mode)
    }
}

/** Takes the pending state of this task and resumes [delegate] with it using [useMode]. */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    // slow-path - use delegate
    val state = takeState()
    val exception = getExceptionalResult(state)
    if (exception != null) {
        /*
         * Recover stacktrace for non-dispatched tasks.
         * We usually do not recover stacktrace in a `resume` as all resumes go through `DispatchedTask.run`
         * and we recover stacktraces there, but this is not the case for a `suspend fun main()` that knows nothing about
         * kotlinx.coroutines and DispatchedTask
         */
        val recovered = if (delegate is DispatchedTask<*>) exception else recoverStackTrace(exception, delegate)
        delegate.resumeWithExceptionMode(recovered, useMode)
    } else {
        delegate.resumeMode(getSuccessfulResult(state), useMode)
    }
}

/** Resumes this continuation with a failure holding [exception] after stack-trace recovery. */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    resumeWith(Result.failure(recoverStackTrace(exception, this)))
}