• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 
/**
 * Sentinel stored in [DispatchedContinuation._state] while no resumption state is pending.
 * It is compared by identity, so it can never be confused with a real resumption value.
 */
@Suppress("PrivatePropertyName")
@SharedImmutable
private val UNDEFINED: Symbol = Symbol("UNDEFINED")
14 
/**
 * Runs [block] within the current thread's event loop without going through a dispatcher.
 *
 * If an unconfined loop is already running on this thread, [block] is NOT invoked; instead
 * [contState] and [mode] are stored into this continuation and the continuation is queued
 * into that loop (trampolining, to avoid stack overflow). Otherwise a fresh unconfined loop
 * is started and [block] is executed as its first step.
 *
 * @param contState state to store into the continuation when it has to be queued.
 * @param mode resume mode to store into the continuation when it has to be queued.
 * @param doYield `true` when the caller is yielding; enables a fast path that does nothing
 * (and returns `false`) when the unconfined queue is empty.
 * @param block action to execute when no unconfined loop is active yet.
 * @return `true` if the continuation was queued (trampolined), `false` otherwise.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    val loop = ThreadLocalEventLoop.eventLoop
    // Yield fast path: nothing else is waiting in the unconfined queue, so there is nobody to yield to
    if (doYield && loop.isUnconfinedQueueEmpty) return false
    if (!loop.isUnconfinedLoopActive) {
        // No loop is running yet -- start one and keep it running until every unconfined task is done
        runUnconfinedEventLoop(loop, block)
        return false
    }
    // A loop is already running further up the stack -- enqueue instead of recursing into it
    _state = contState
    resumeMode = mode
    loop.dispatchUnconfined(this)
    return true // queued into the active loop
}
40 
/**
 * Resumes this task without a dispatcher, using the current thread's event loop.
 *
 * The task is queued when an unconfined loop is already active on this thread; otherwise a new
 * unconfined loop is started whose first step resumes [DispatchedTask.delegate] in
 * [MODE_UNDISPATCHED].
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No loop is running yet -- start one and keep it running until every unconfined task is done
        runUnconfinedEventLoop(loop) {
            resume(delegate, MODE_UNDISPATCHED)
        }
        return
    }
    // A loop is already running further up the stack -- enqueue to avoid stack overflow
    loop.dispatchUnconfined(this)
}
53 
/**
 * Marks [eventLoop] as being used for unconfined execution, invokes [block], and then keeps
 * processing queued unconfined events until [EventLoop.processUnconfinedEvent] returns `false`.
 *
 * Any [Throwable] escaping [block] or the processing loop is passed to
 * [DispatchedTask.handleFatalException]; the use count is decremented in all cases.
 */
private inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // Drain the queue; the loop ends once all unconfined continuations were executed
        while (eventLoop.processUnconfinedEvent()) {
            // nothing to do here -- the work happens inside processUnconfinedEvent()
        }
    } catch (e: Throwable) {
        /*
         * Not expected during normal operation: an exception here means there is a bug
         * in the implementation, so it is reported as a fatal exception.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
75 
/**
 * A [DispatchedTask] that wraps [continuation] and resumes it through [dispatcher].
 *
 * Every resumption first asks [CoroutineDispatcher.isDispatchNeeded]: when dispatch is needed the
 * resumption state is stored in [_state] and this task is handed to the dispatcher; otherwise the
 * wrapped continuation is resumed via the thread-local unconfined event loop.
 *
 * All [Continuation] members not overridden here are delegated to [continuation].
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    /**
     * Pending resumption state, or [UNDEFINED] when nothing is stored
     * (initially, and again after each [takeState]).
     */
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED
    override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /**
     * Returns the stored resumption state and resets [_state] back to [UNDEFINED].
     */
    override fun takeState(): Any? {
        val state = _state
        assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    /** This continuation is its own delegate. */
    override val delegate: Continuation<T>
        get() = this

    /**
     * Resumes the wrapped continuation with [result] in [MODE_ATOMIC_DEFAULT], either through
     * [dispatcher] or, when dispatch is not needed, through the unconfined event loop.
     */
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    /**
     * Resumes the wrapped continuation with [value] in [MODE_CANCELLABLE].
     * On the undispatched path the value is delivered only if [resumeCancelled] returns `false`.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
        if (dispatcher.isDispatchNeeded(context)) {
            _state = value
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(value, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatched(value)
                }
            }
        }
    }

    /**
     * Resumes the wrapped continuation with [exception] in [MODE_CANCELLABLE].
     * On the undispatched path the exception is delivered only if [resumeCancelled] returns `false`.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellableWithException(exception: Throwable) {
        val context = continuation.context
        val state = CompletedExceptionally(exception)
        if (dispatcher.isDispatchNeeded(context)) {
            // Reuse the wrapper created above instead of allocating a second, identical one
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatchedWithException(exception)
                }
            }
        }
    }

    /**
     * If the context contains a [Job] that is no longer active, resumes this continuation with that
     * job's cancellation exception and returns `true`; otherwise does nothing and returns `false`.
     */
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            resumeWithException(job.getCancellationException())
            return true
        }

        return false
    }

    /**
     * Resumes the wrapped continuation with [value] directly, inside [withCoroutineContext]
     * for this continuation's context.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatched(value: T) {
        withCoroutineContext(context, countOrElement) {
            continuation.resume(value)
        }
    }

    /**
     * Resumes the wrapped continuation with [exception] directly (via [resumeWithStackTrace]),
     * inside [withCoroutineContext] for this continuation's context.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatchedWithException(exception: Throwable) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWithStackTrace(exception)
        }
    }

    /**
     * Stores [value] as the pending state in [MODE_CANCELLABLE] and hands this task to
     * [CoroutineDispatcher.dispatchYield], without consulting `isDispatchNeeded`.
     */
    // used by "yield" implementation
    internal fun dispatchYield(value: T) {
        val context = continuation.context
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}
182 
/**
 * Resumes this continuation with [value]. A [DispatchedContinuation] is resumed through its
 * cancellable path ([DispatchedContinuation.resumeCancellable]); any other continuation is
 * resumed with a plain [resume].
 */
internal fun <T> Continuation<T>.resumeCancellable(value: T) {
    if (this is DispatchedContinuation) resumeCancellable(value) else resume(value)
}
187 
/**
 * Resumes this continuation with [exception]. A [DispatchedContinuation] is resumed through its
 * cancellable path ([DispatchedContinuation.resumeCancellableWithException]); any other
 * continuation is resumed via [resumeWithStackTrace].
 */
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) {
    if (this is DispatchedContinuation) {
        resumeCancellableWithException(exception)
    } else {
        resumeWithStackTrace(exception)
    }
}
192 
/**
 * Resumes with [value] bypassing dispatch: for a [DispatchedContinuation] the wrapped
 * continuation is resumed directly; any other continuation is resumed as is.
 */
internal fun <T> Continuation<T>.resumeDirect(value: T) {
    if (this is DispatchedContinuation) continuation.resume(value) else resume(value)
}
197 
/**
 * Resumes with [exception] bypassing dispatch: for a [DispatchedContinuation] the wrapped
 * continuation is resumed directly; any other continuation is resumed as is.
 * Both paths go through [resumeWithStackTrace].
 */
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) {
    if (this is DispatchedContinuation) {
        continuation.resumeWithStackTrace(exception)
    } else {
        resumeWithStackTrace(exception)
    }
}
202 
/**
 * Base class for schedulable tasks that, when [run], take their stored state via [takeState]
 * and use it to resume a continuation.
 *
 * @param resumeMode the mode this task is resumed in; [run] consults it to decide whether
 * cancellation of the context's [Job] must be checked before resuming.
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** The continuation this task resumes. */
    internal abstract val delegate: Continuation<T>

    /** Takes the state this task should be resumed with. */
    internal abstract fun takeState(): Any?

    /**
     * Called by [run] with the taken [state] when the task turns out to be cancelled with [cause]
     * before its result could be delivered. Does nothing by default.
     */
    internal open fun cancelResult(state: Any?, cause: Throwable) {}

    /** Interprets [state] as a successful result; by default it is just an unchecked cast. */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T =
        state as T

    /** Returns the failure cause if [state] is a [CompletedExceptionally], or `null` otherwise. */
    internal fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    /**
     * Takes the stored state and resumes the wrapped continuation of [delegate] with it inside
     * [withCoroutineContext].
     *
     * Any exception thrown along the way, as well as an exception from `taskContext.afterTask()`,
     * is reported through [handleFatalException].
     */
    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val continuation = dispatched.continuation
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            withCoroutineContext(context, dispatched.countOrElement) {
                val failure = getExceptionalResult(state)
                val job = if (resumeMode.isCancellableMode) context[Job] else null
                when {
                    /*
                     * The continuation was originally resumed with an exception: it dominates
                     * cancellation, otherwise the original exception would be silently lost.
                     */
                    failure != null -> continuation.resumeWithStackTrace(failure)
                    // Cancellable mode and the job is no longer active: deliver cancellation instead
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelResult(state, cause)
                        continuation.resumeWithStackTrace(cause)
                    }
                    else -> continuation.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // Plain try/catch instead of runCatching for a nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { taskContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * There are two kinds of fatal exceptions:
     *
     * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
     *    the library or the compiler has a bug that breaks internal invariants.
     *    They usually have specific workarounds, but require careful study of the cause and should
     *    be reported to the maintainers and fixed on the library's side anyway.
     *
     * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
     *    While user code can trigger such an exception by providing an improper implementation of
     *    [ThreadContextElement], it can't be ignored because it may leave the coroutine in an inconsistent state.
     *    If you encounter such an exception, you can either disable this context element or wrap it into
     *    another context element that catches all exceptions and handles them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context of
     * a failed coroutine, but such exceptions should be reported anyway.
     *
     * @param exception the primary failure, if any.
     * @param finallyException a failure from the cleanup step, if any; it is attached to [exception]
     * as suppressed when both are present, and becomes the cause itself when [exception] is `null`.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Nothing to report when both are absent
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }

        val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
                "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}
283 
/**
 * Yields through the unconfined event loop in [MODE_CANCELLABLE].
 *
 * @return `true` if this continuation was queued into an already active unconfined loop,
 * `false` otherwise (including the fast path taken when the unconfined queue is empty).
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    return executeUnconfined(contState = Unit, mode = MODE_CANCELLABLE, doYield = true) { run() }
}
288 
/**
 * Resumes this task in the given [mode].
 *
 * The task itself is submitted for execution when all of the following hold: [mode] is a
 * dispatched mode, the delegate is a [DispatchedContinuation], and [mode] agrees with the task's
 * own `resumeMode` on cancellability. In every other case the delegate is resumed via [resume].
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val target = this.delegate
    if (!mode.isDispatchedMode ||
        target !is DispatchedContinuation<*> ||
        mode.isCancellableMode != resumeMode.isCancellableMode
    ) {
        // Cannot reuse this instance as the dispatched task -- go through the delegate
        resume(target, mode)
        return
    }
    // Dispatch directly using this instance's Runnable implementation
    val dispatcher = target.dispatcher
    val context = target.context
    if (dispatcher.isDispatchNeeded(context)) {
        dispatcher.dispatch(context, this)
    } else {
        resumeUnconfined()
    }
}
304 
/**
 * Slow path: takes this task's state and resumes [delegate] with it in [useMode],
 * as a value or as an exception depending on what the state holds.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    val state = takeState()
    val failure = getExceptionalResult(state)
    if (failure == null) {
        delegate.resumeMode(getSuccessfulResult(state), useMode)
        return
    }
    /*
     * Recover the stacktrace for non-dispatched tasks.
     * Normally `resume` does not need to do that, because resumes go through `DispatchedTask.run`
     * which recovers stacktraces itself. That is not the case for a `suspend fun main()`, which
     * knows nothing about kotlinx.coroutines and DispatchedTask.
     */
    val toDeliver = when (delegate) {
        is DispatchedTask<*> -> failure
        else -> recoverStackTrace(failure, delegate)
    }
    delegate.resumeWithExceptionMode(toDeliver, useMode)
}
322 
323 
/**
 * Resumes this continuation with a failure carrying [exception] after passing it
 * through [recoverStackTrace] for this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
328