1 /*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10
/** Marker stored in `DispatchedContinuation._state` while there is no result waiting to be taken. */
@SharedImmutable
@Suppress("PrivatePropertyName")
private val UNDEFINED = Symbol("UNDEFINED")
14
/**
 * Runs [block] as part of the current thread's event loop, or postpones this continuation when it
 * cannot be run right away.
 *
 * - When [doYield] is `true` and the unconfined queue is empty, nothing is done (yield fast-path).
 * - When an unconfined loop is already active, [contState] and [mode] are stored in this continuation
 *   and the continuation is queued into that loop; [block] is not invoked.
 * - Otherwise [block] is invoked and the loop is run until no unconfined tasks are left.
 *
 * Returns `true` only if the continuation was queued (trampolined) and `false` in every other case.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    val loop = ThreadLocalEventLoop.eventLoop
    // Yield fast-path: with an empty unconfined queue there is nothing to yield to
    if (doYield && loop.isUnconfinedQueueEmpty) return false
    if (!loop.isUnconfinedLoopActive) {
        // No loop is running yet -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(loop, block = block)
        return false
    }
    // A loop is already running -- queue the continuation instead of nesting, to avoid stack overflow
    _state = contState
    resumeMode = mode
    loop.dispatchUnconfined(this)
    return true // queued into the active loop
}
40
/**
 * Resumes this task without going through a dispatcher: queues it into the unconfined loop when one is
 * already active, otherwise resumes its delegate in [MODE_UNDISPATCHED] and runs the loop until the
 * unconfined queue is drained.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No loop is running yet -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(loop) { resume(delegate, MODE_UNDISPATCHED) }
        return
    }
    // A loop is already running -- queue the task for execution to avoid stack overflow
    loop.dispatchUnconfined(this)
}
53
/**
 * Registers an unconfined use of [eventLoop], invokes [block] and then keeps processing unconfined
 * events until none are left. Anything thrown along the way is reported through
 * [DispatchedTask.handleFatalException]; the use count is decremented in all cases.
 */
private inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // keep going until all unconfined continuations were executed
        while (eventLoop.processUnconfinedEvent()) {
            // all the work happens inside processUnconfinedEvent
        }
    } catch (failure: Throwable) {
        // Does not happen normally, only if there is a bug in the implementation -- report it as fatal
        handleFatalException(failure, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
75
/**
 * A [Continuation] wrapper that resumes the wrapped [continuation] through [dispatcher].
 *
 * Every `resume*` entry point first asks [CoroutineDispatcher.isDispatchNeeded]. When a dispatch is needed,
 * the result is stored in [_state] together with the resume mode and this object is passed to the dispatcher
 * as the task to run. Otherwise the resumption goes through [executeUnconfined], which either performs it
 * in place or queues this continuation into the already active unconfined loop.
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    /** Result waiting to be picked up by [takeState]; equals [UNDEFINED] when nothing is pending. */
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED
    override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /** Returns the pending state and resets [_state] back to [UNDEFINED]. */
    override fun takeState(): Any? {
        val state = _state
        assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    override val delegate: Continuation<T>
        get() = this

    /**
     * Resumes with [result] in [MODE_ATOMIC_DEFAULT]: dispatches this task when the dispatcher requires it,
     * otherwise resumes the wrapped [continuation] via [executeUnconfined].
     */
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    /**
     * Resumes with [value] in [MODE_CANCELLABLE]. On the non-dispatched path the value is delivered
     * only if [resumeCancelled] did not already resume this continuation with a cancellation exception.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
        if (dispatcher.isDispatchNeeded(context)) {
            _state = value
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(value, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatched(value)
                }
            }
        }
    }

    /**
     * Resumes with [exception] in [MODE_CANCELLABLE]. On the non-dispatched path the exception is delivered
     * only if [resumeCancelled] did not already resume this continuation with a cancellation exception.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellableWithException(exception: Throwable) {
        val context = continuation.context
        val state = CompletedExceptionally(exception)
        if (dispatcher.isDispatchNeeded(context)) {
            // Reuse the wrapper created above instead of allocating a second, identical one
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatchedWithException(exception)
                }
            }
        }
    }

    /**
     * If the context contains a [Job] that is no longer active, resumes this continuation with
     * that job's cancellation exception and returns `true`; otherwise does nothing and returns `false`.
     */
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            resumeWithException(job.getCancellationException())
            return true
        }

        return false
    }

    /** Resumes the wrapped [continuation] with [value] right away, inside [withCoroutineContext]. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatched(value: T) {
        withCoroutineContext(context, countOrElement) {
            continuation.resume(value)
        }
    }

    /** Resumes the wrapped [continuation] with [exception] right away, inside [withCoroutineContext]. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatchedWithException(exception: Throwable) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWithStackTrace(exception)
        }
    }

    // used by "yield" implementation
    /** Stores [value] in [MODE_CANCELLABLE] and hands this task to [CoroutineDispatcher.dispatchYield]. */
    internal fun dispatchYield(value: T) {
        val context = continuation.context
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}
182
/**
 * Resumes this continuation with [value]: a [DispatchedContinuation] goes through its own
 * cancellable resume, any other continuation is resumed directly.
 */
internal fun <T> Continuation<T>.resumeCancellable(value: T) {
    if (this is DispatchedContinuation) resumeCancellable(value) else resume(value)
}
187
/**
 * Resumes this continuation with [exception]: a [DispatchedContinuation] goes through its own
 * cancellable resume, any other continuation is resumed via [resumeWithStackTrace].
 */
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) {
    if (this is DispatchedContinuation) resumeCancellableWithException(exception) else resumeWithStackTrace(exception)
}
192
/**
 * Resumes with [value] bypassing dispatch: for a [DispatchedContinuation] the wrapped continuation
 * is resumed, any other continuation is resumed as is.
 */
internal fun <T> Continuation<T>.resumeDirect(value: T) {
    if (this is DispatchedContinuation) continuation.resume(value) else resume(value)
}
197
/**
 * Resumes with [exception] bypassing dispatch: for a [DispatchedContinuation] the wrapped continuation
 * is resumed, any other continuation is resumed as is. Both paths use [resumeWithStackTrace].
 */
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) {
    if (this is DispatchedContinuation) {
        continuation.resumeWithStackTrace(exception)
    } else {
        resumeWithStackTrace(exception)
    }
}
202
/**
 * Base class for tasks that deliver a previously stored result to a continuation.
 * [resumeMode] holds the mode in which the continuation is to be resumed.
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** Continuation this task resumes. */
    internal abstract val delegate: Continuation<T>

    /** Hands over the stored state of this task. */
    internal abstract fun takeState(): Any?

    /** Called by [run] when the taken [state] is replaced by the job's cancellation [cause]; does nothing by default. */
    internal open fun cancelResult(state: Any?, cause: Throwable) {}

    /** Interprets [state] as a successful result; by default it is an unchecked cast. */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T {
        return state as T
    }

    /** Returns the cause carried by [state] if it is a [CompletedExceptionally], or `null` otherwise. */
    internal fun getExceptionalResult(state: Any?): Throwable? {
        val failed = state as? CompletedExceptionally
        return failed?.cause
    }

    /**
     * Takes the stored state and resumes the continuation wrapped by [delegate] with it.
     * Anything thrown while doing so, or by `afterTask`, is passed to [handleFatalException].
     */
    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val target = dispatched.continuation
            val targetContext = target.context
            val taken = takeState() // NOTE: Must take state in any case, even if cancelled
            withCoroutineContext(targetContext, dispatched.countOrElement) {
                val failure = getExceptionalResult(taken)
                val job = if (resumeMode.isCancellableMode) targetContext[Job] else null
                when {
                    /*
                     * Cancellation is only delivered when the continuation was not originally resumed
                     * with an exception. An original exception dominates cancellation, otherwise it
                     * would be silently lost.
                     */
                    failure == null && job != null && !job.isActive -> {
                        val cancellation = job.getCancellationException()
                        cancelResult(taken, cancellation)
                        target.resumeWithStackTrace(cancellation)
                    }
                    failure != null -> target.resumeWithStackTrace(failure)
                    else -> target.resume(getSuccessfulResult(taken))
                }
            }
        } catch (e: Throwable) {
            // Plain try/catch rather than runCatching, for a nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { taskContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Reports fatal exceptions of the kotlinx.coroutines machinery.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions thrown by kotlinx.coroutines code itself. They mean that either the library or
     *    the compiler has a bug that breaks internal invariants. There is usually a specific workaround,
     *    but the cause needs careful study, and the problem should be reported to the maintainers and
     *    fixed on the library's side in any case.
     *
     * 2) Exceptions thrown by [ThreadContextElement.updateThreadContext] and
     *    [ThreadContextElement.restoreThreadContext]. User code can trigger them with an improper
     *    implementation of [ThreadContextElement], yet they cannot be ignored because the coroutine
     *    may be left in an inconsistent state. If you run into one, either disable that context element
     *    or wrap it into another context element that catches all exceptions and handles them
     *    in an application-specific manner.
     *
     * Handling of fatal exceptions can be intercepted with a [CoroutineExceptionHandler] element in the
     * context of the failed coroutine, but such exceptions should be reported anyway.
     *
     * Does nothing when both arguments are `null`. When both are present, [finallyException] is added
     * to [exception] as suppressed and [exception] is the one reported.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Primary failure wins; bail out when there is nothing to report at all
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }

        val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
                "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}
283
/**
 * Yield for a continuation that is not dispatched: goes through [executeUnconfined] with `doYield = true`
 * in [MODE_CANCELLABLE], using this task's own `run` as the block.
 * Returns `true` if the continuation was queued into the active unconfined loop, `false` otherwise.
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    return executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) { run() }
}
288
/**
 * Delivers the state of this task to its delegate using [mode].
 *
 * The task itself is scheduled only when [mode] is a dispatched mode, the delegate is
 * a [DispatchedContinuation], and [mode] agrees with the task's own `resumeMode` on cancellability.
 * In every other case the delegate is resumed through the slow path, [resume].
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val target = this.delegate
    if (!mode.isDispatchedMode || target !is DispatchedContinuation<*> ||
        mode.isCancellableMode != resumeMode.isCancellableMode
    ) {
        resume(target, mode)
        return
    }
    // dispatch directly using this instance's Runnable implementation
    val targetDispatcher = target.dispatcher
    val targetContext = target.context
    if (targetDispatcher.isDispatchNeeded(targetContext)) {
        targetDispatcher.dispatch(targetContext, this)
    } else {
        resumeUnconfined()
    }
}
304
/**
 * Slow path: takes the state of this task and resumes [delegate] with it in [useMode],
 * either with the successful result or with the exception the state carries.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    if (failure == null) {
        delegate.resumeMode(getSuccessfulResult(taken), useMode)
        return
    }
    /*
     * Stacktrace recovery for non-dispatched tasks.
     * Normally `resume` does not recover stacktraces, because all resumes go through `DispatchedTask.run`
     * and recovery happens there. That is not the case for a `suspend fun main()`, which knows nothing
     * about kotlinx.coroutines and DispatchedTask.
     */
    val toDeliver = when (delegate) {
        is DispatchedTask<*> -> failure
        else -> recoverStackTrace(failure, delegate)
    }
    delegate.resumeWithExceptionMode(toDeliver, useMode)
}
322
323
/** Resumes this continuation with a failure holding [exception] after passing it through [recoverStackTrace]. */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
328