1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.internal
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11
12 private val UNDEFINED = Symbol("UNDEFINED")
13 @JvmField
14 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
15
16 @PublishedApi
17 internal class DispatchedContinuation<in T>(
18 @JvmField internal val dispatcher: CoroutineDispatcher,
19 // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
20 @JvmField val continuation: Continuation<T>
21 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
22 @JvmField
23 @Suppress("PropertyName")
24 internal var _state: Any? = UNDEFINED
25 override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
26 override fun getStackTraceElement(): StackTraceElement? = null
27 @JvmField // pre-cached value to avoid ctx.fold on every resumption
28 internal val countOrElement = threadContextElements(context)
29
30 /**
31 * Possible states of reusability:
32 *
33 * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
34 * was used and then invalidated (e.g. because of the cancellation).
35 * 2) [CancellableContinuation]. Continuation to be/that is being reused.
36 * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
37 * ```
38 * // state == null | CC
39 * suspendCancellableCoroutineReusable { cont ->
40 * // state == REUSABLE_CLAIMED
41 * block(cont)
42 * }
43 * // state == CC
44 * ```
45 * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
46 * [CancellableContinuationImpl.getResult] will check for cancellation later.
47 *
48 * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
49 * In the `getResult`, we have the following code:
50 * ```
51 * if (trySuspend()) {
52 * // <- at this moment current continuation can be redispatched and claimed again.
53 * attachChildToParent()
54 * releaseClaimedContinuation()
55 * }
56 * ```
57 */
58 private val _reusableCancellableContinuation = atomic<Any?>(null)
59
60 private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
61 get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
62
63 internal fun isReusable(): Boolean {
64 /*
65 Invariant: caller.resumeMode.isReusableMode
66 * Reusability control:
67 * `null` -> no reusability at all, `false`
68 * anything else -> reusable.
69 */
70 return _reusableCancellableContinuation.value != null
71 }
72
73 /**
74 * Awaits until previous call to `suspendCancellableCoroutineReusable` will
75 * stop mutating cached instance
76 */
77 internal fun awaitReusability() {
78 _reusableCancellableContinuation.loop {
79 if (it !== REUSABLE_CLAIMED) return
80 }
81 }
82
83 internal fun release() {
84 /*
85 * Called from `releaseInterceptedContinuation`, can be concurrent with
86 * the code in `getResult` right after `trySuspend` returned `true`, so we have
87 * to wait for a release here.
88 */
89 awaitReusability()
90 reusableCancellableContinuation?.detachChild()
91 }
92
93 /**
94 * Claims the continuation for [suspendCancellableCoroutineReusable] block,
95 * so all cancellations will be postponed.
96 */
97 @Suppress("UNCHECKED_CAST")
98 internal fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
99 /*
100 * Transitions:
101 * 1) `null` -> claimed, caller will instantiate CC instance
102 * 2) `CC` -> claimed, caller will reuse CC instance
103 */
104 _reusableCancellableContinuation.loop { state ->
105 when {
106 state === null -> {
107 /*
108 * null -> CC was not yet published -> we do not compete with cancel
109 * -> can use plain store instead of CAS
110 */
111 _reusableCancellableContinuation.value = REUSABLE_CLAIMED
112 return null
113 }
114 // potentially competing with cancel
115 state is CancellableContinuationImpl<*> -> {
116 if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
117 return state as CancellableContinuationImpl<T>
118 }
119 }
120 state === REUSABLE_CLAIMED -> {
121 // Do nothing, wait until reusable instance will be returned from
122 // getResult() of a previous `suspendCancellableCoroutineReusable`
123 }
124 state is Throwable -> {
125 // Also do nothing, Throwable can only indicate that the CC
126 // is in REUSABLE_CLAIMED state, but with postponed cancellation
127 }
128 else -> error("Inconsistent state $state")
129 }
130 }
131 }
132
133 /**
134 * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
135 * and returns cancellation cause if so, `null` otherwise.
136 * If continuation was cancelled, it becomes non-reusable.
137 *
138 * ```
139 * suspendCancellableCoroutineReusable { // <- claimed
140 * // Any asynchronous cancellation is "postponed" while this block
141 * // is being executed
142 * } // postponed cancellation is checked here in `getResult`
143 * ```
144 *
145 * See [CancellableContinuationImpl.getResult].
146 */
147 internal fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
148 _reusableCancellableContinuation.loop { state ->
149 // not when(state) to avoid Intrinsics.equals call
150 when {
151 state === REUSABLE_CLAIMED -> {
152 if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
153 }
154 state is Throwable -> {
155 require(_reusableCancellableContinuation.compareAndSet(state, null))
156 return state
157 }
158 else -> error("Inconsistent state $state")
159 }
160 }
161 }
162
163 /**
164 * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
165 * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
166 */
167 internal fun postponeCancellation(cause: Throwable): Boolean {
168 _reusableCancellableContinuation.loop { state ->
169 when (state) {
170 REUSABLE_CLAIMED -> {
171 if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
172 return true
173 }
174 is Throwable -> return true
175 else -> {
176 // Invalidate
177 if (_reusableCancellableContinuation.compareAndSet(state, null))
178 return false
179 }
180 }
181 }
182 }
183
184 override fun takeState(): Any? {
185 val state = _state
186 assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
187 _state = UNDEFINED
188 return state
189 }
190
191 override val delegate: Continuation<T>
192 get() = this
193
194 override fun resumeWith(result: Result<T>) {
195 val context = continuation.context
196 val state = result.toState()
197 if (dispatcher.isDispatchNeeded(context)) {
198 _state = state
199 resumeMode = MODE_ATOMIC
200 dispatcher.dispatch(context, this)
201 } else {
202 executeUnconfined(state, MODE_ATOMIC) {
203 withCoroutineContext(this.context, countOrElement) {
204 continuation.resumeWith(result)
205 }
206 }
207 }
208 }
209
210 // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
211 // It is used only in Continuation<T>.resumeCancellableWith
212 @Suppress("NOTHING_TO_INLINE")
213 internal inline fun resumeCancellableWith(
214 result: Result<T>,
215 noinline onCancellation: ((cause: Throwable) -> Unit)?
216 ) {
217 val state = result.toState(onCancellation)
218 if (dispatcher.isDispatchNeeded(context)) {
219 _state = state
220 resumeMode = MODE_CANCELLABLE
221 dispatcher.dispatch(context, this)
222 } else {
223 executeUnconfined(state, MODE_CANCELLABLE) {
224 if (!resumeCancelled(state)) {
225 resumeUndispatchedWith(result)
226 }
227 }
228 }
229 }
230
231 // takeState had already cleared the state so we cancel takenState here
232 override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
233 // It is Ok to call onCancellation here without try/catch around it, since this function only faces
234 // a "bound" cancellation handler that performs the safe call to the user-specified code.
235 if (takenState is CompletedWithCancellation) {
236 takenState.onCancellation(cause)
237 }
238 }
239
240 // inline here is to save us an entry on the stack for the sake of better stacktraces
241 @Suppress("NOTHING_TO_INLINE")
242 internal inline fun resumeCancelled(state: Any?): Boolean {
243 val job = context[Job]
244 if (job != null && !job.isActive) {
245 val cause = job.getCancellationException()
246 cancelCompletedResult(state, cause)
247 resumeWithException(cause)
248 return true
249 }
250 return false
251 }
252
253 @Suppress("NOTHING_TO_INLINE")
254 internal inline fun resumeUndispatchedWith(result: Result<T>) {
255 withContinuationContext(continuation, countOrElement) {
256 continuation.resumeWith(result)
257 }
258 }
259
260 // used by "yield" implementation
261 internal fun dispatchYield(context: CoroutineContext, value: T) {
262 _state = value
263 resumeMode = MODE_CANCELLABLE
264 dispatcher.dispatchYield(context, this)
265 }
266
267 override fun toString(): String =
268 "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
269 }
270
271 /**
272 * It is not inline to save bytecode (it is pretty big and used in many places)
273 * and we leave it public so that its name is not mangled in use stack traces if it shows there.
274 * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
275 * @suppress **This an internal API and should not be used from general code.**
276 */
277 @InternalCoroutinesApi
resumeCancellableWithnull278 public fun <T> Continuation<T>.resumeCancellableWith(
279 result: Result<T>,
280 onCancellation: ((cause: Throwable) -> Unit)? = null
281 ): Unit = when (this) {
282 is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
283 else -> resumeWith(result)
284 }
285
yieldUndispatchednull286 internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
287 executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
288 run()
289 }
290
291 /**
292 * Executes given [block] as part of current event loop, updating current continuation
293 * mode and state if continuation is not resumed immediately.
294 * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
295 * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
296 */
executeUnconfinednull297 private inline fun DispatchedContinuation<*>.executeUnconfined(
298 contState: Any?, mode: Int, doYield: Boolean = false,
299 block: () -> Unit
300 ): Boolean {
301 assert { mode != MODE_UNINITIALIZED } // invalid execution mode
302 val eventLoop = ThreadLocalEventLoop.eventLoop
303 // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
304 if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
305 return if (eventLoop.isUnconfinedLoopActive) {
306 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
307 _state = contState
308 resumeMode = mode
309 eventLoop.dispatchUnconfined(this)
310 true // queued into the active loop
311 } else {
312 // Was not active -- run event loop until all unconfined tasks are executed
313 runUnconfinedEventLoop(eventLoop, block = block)
314 false
315 }
316 }
317