• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 
/**
 * Dispatch mode that is not cancellable.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. The value might already be inlined into legacy user code that
 * called the inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
 */
internal const val MODE_ATOMIC = 0

/**
 * Dispatch mode that is cancellable. This is the mode of the user-facing [suspendCancellableCoroutine].
 * The cancellability implementation recognises this mode through the [Int.isCancellableMode] extension.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. It is inlined into the user code from [suspendCancellableCoroutine].
 */
@PublishedApi
internal const val MODE_CANCELLABLE = 1

/**
 * Cancellable dispatch mode used by [suspendCancellableCoroutineReusable].
 * The cancellability implementation recognises this mode through the [Int.isCancellableMode] extension,
 * and the reuse implementation recognises it through the [Int.isReusableMode] extension.
 */
internal const val MODE_CANCELLABLE_REUSABLE = 2

/**
 * Undispatched mode used by [CancellableContinuation.resumeUndispatched].
 * It applies when the current thread is already the right one, but it still has to be marked
 * with the current coroutine.
 */
internal const val MODE_UNDISPATCHED = 4

/**
 * Initial mode of the [DispatchedContinuation] implementation. It must never be used for an actual dispatch,
 * because it is always overwritten with the real resume mode when the continuation is resumed.
 */
internal const val MODE_UNINITIALIZED = -1

/** `true` for the two cancellable modes: [MODE_CANCELLABLE] and [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isCancellableMode: Boolean
    get() = when (this) {
        MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> true
        else -> false
    }

/** `true` only for [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isReusableMode: Boolean
    get() = this == MODE_CANCELLABLE_REUSABLE
49 
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /**
     * The continuation this task works on behalf of.
     * [run] requires it to be a [DispatchedContinuation] and resumes the continuation wrapped by it.
     */
    internal abstract val delegate: Continuation<T>

    /**
     * Hands over the state that is about to be delivered. The returned value is later interpreted
     * with [getExceptionalResult] and [getSuccessfulResult].
     */
    internal abstract fun takeState(): Any?

    /**
     * Hook invoked by [run] when the task turns out to be cancelled while it was being dispatched,
     * right before the continuation is resumed with the cancellation [cause]. Does nothing by default.
     */
    internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}

    /**
     * Extracts the successful value from the taken [state].
     *
     * `DispatchedTask` has two implementations:
     * * [DispatchedContinuation] stores only plain values as successful results, so the state is the value.
     * * [CancellableContinuationImpl] stores extra data next to the value and overrides this method to unwrap it.
     */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T {
        return state as T
    }

    /**
     * Extracts the failure from the taken [state], or returns `null` when the state is not exceptional.
     *
     * `DispatchedTask` has two implementations:
     * * [DispatchedContinuation] is merely an intermediate storage: the exception it holds already has its
     *   stack-trace recovered and can be passed to the [delegate] continuation as is.
     * * [CancellableContinuationImpl] keeps the raw cause of the failure in its state; the stack-trace has to be
     *   recovered at dispatch time, which is why it overrides this method.
     */
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // the mode must be assigned before the task is dispatched
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val continuation = dispatched.continuation
            val context = continuation.context
            // NOTE: the state has to be taken unconditionally, even if cancellation is delivered below
            val state = takeState()
            withCoroutineContext(context, dispatched.countOrElement) {
                val exception = getExceptionalResult(state)
                /*
                 * The job is consulted only when the continuation was NOT originally resumed with an exception:
                 * an original exception takes precedence over cancellation, otherwise it would be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                when {
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelCompletedResult(state, cause)
                        continuation.resumeWithStackTrace(cause)
                    }
                    exception != null -> continuation.resumeWithException(exception)
                    else -> continuation.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // Plain try/catch rather than runCatching: it gives a nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { taskContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions thrown by kotlinx.coroutines code itself. They indicate a bug in either the library or
     *    the compiler that breaks internal invariants. Specific workarounds usually exist, but the cause
     *    requires careful study, and the problem should be reported to the maintainers and fixed
     *    on the library's side anyway.
     *
     * 2) Exceptions thrown by [ThreadContextElement.updateThreadContext] and
     *    [ThreadContextElement.restoreThreadContext]. User code can trigger them with an improper implementation
     *    of [ThreadContextElement], yet they cannot be ignored because the coroutine may be left in an
     *    inconsistent state. If you run into such an exception, either disable the offending context element
     *    or wrap it into another context element that catches all exceptions and handles them in an
     *    application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context of
     * the failed coroutine, but such exceptions should be reported anyway.
     *
     * When both arguments are `null` nothing happens. When both are present, [finallyException] is attached
     * to [exception] as a suppressed exception and [exception] is reported as the cause.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Nothing to report when neither the task nor its cleanup has failed
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }

        val message = "Fatal exception in coroutines machinery for $this. " +
            "Please read KDoc to 'handleFatalException' method and report this incident to maintainers"
        val reason = CoroutinesInternalError(message, cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}
149 
/**
 * Delivers the result of this task to its delegate according to [mode].
 *
 * The task is dispatched through the delegate's dispatcher (or resumed via the unconfined event loop when
 * the dispatcher reports that no dispatch is needed) only if all of the following hold:
 * undispatched mode was not requested, the delegate is a [DispatchedContinuation], and the cancellability
 * of [mode] matches the cancellability of the task's own `resumeMode`.
 * In every other case the delegate is resumed directly.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    assert { mode != MODE_UNINITIALIZED } // this mode value is not valid for this method
    val target = this.delegate
    val undispatched = mode == MODE_UNDISPATCHED
    if (undispatched || target !is DispatchedContinuation<*> || mode.isCancellableMode != resumeMode.isCancellableMode) {
        // either the delegate comes from a 3rd-party interceptor implementation (and does not support
        // cancellation), or undispatched mode was requested
        resume(target, undispatched)
        return
    }
    // dispatch directly, using this instance's own Runnable implementation
    val dispatcher = target.dispatcher
    val context = target.context
    when {
        dispatcher.isDispatchNeeded(context) -> dispatcher.dispatch(context, this)
        else -> resumeUnconfined()
    }
}
169 
/**
 * Takes the state of this task and delivers it to [delegate] as a [Result].
 *
 * This resume is never cancellable: the result always reaches the delegate continuation.
 * When [undispatched] is `true` the [delegate] is cast to [DispatchedContinuation] and resumed through
 * `resumeUndispatchedWith`; otherwise the plain `resumeWith` is used.
 */
@Suppress("UNCHECKED_CAST")
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    val outcome = if (failure == null) Result.success(getSuccessfulResult<T>(taken)) else Result.failure(failure)
    if (undispatched) {
        (delegate as DispatchedContinuation).resumeUndispatchedWith(outcome)
    } else {
        delegate.resumeWith(outcome)
    }
}
181 
/**
 * Resumes this task through the thread-local event loop without going through a dispatcher.
 *
 * If an unconfined loop is already running, the task is only enqueued into it; otherwise the loop is
 * started here, with the delegate resumed in undispatched mode as its first action.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No loop is active yet -- run the event loop until every unconfined task has been executed
        runUnconfinedEventLoop(loop) {
            resume(delegate, undispatched = true)
        }
        return
    }
    // A loop is already active -- hand the continuation over to it to avoid stack overflow
    loop.dispatchUnconfined(this)
}
194 
/**
 * Runs [block] and then drains all unconfined events of [eventLoop], keeping the loop's unconfined
 * use count incremented for the whole duration.
 *
 * Any exception escaping [block] or the draining is passed to `handleFatalException` instead of
 * being rethrown.
 */
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // keep going until all unconfined continuations were executed
        while (eventLoop.processUnconfinedEvent()) {
            // the call in the condition does all the work
        }
    } catch (e: Throwable) {
        /*
         * Not expected during normal operation; it can happen only because of a bug in the implementation,
         * so it is reported as a fatal exception.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
216 
/**
 * Resumes this continuation with a failure whose exception is [exception] after it has been passed
 * through `recoverStackTrace` for this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
221