1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.internal
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12
13 @SharedImmutable
14 private val UNDEFINED = Symbol("UNDEFINED")
15 @SharedImmutable
16 @JvmField
17 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
18
19 internal class DispatchedContinuation<in T>(
20 @JvmField val dispatcher: CoroutineDispatcher,
21 @JvmField val continuation: Continuation<T>
22 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
23 @JvmField
24 @Suppress("PropertyName")
25 internal var _state: Any? = UNDEFINED
26 override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
27 override fun getStackTraceElement(): StackTraceElement? = null
28 @JvmField // pre-cached value to avoid ctx.fold on every resumption
29 internal val countOrElement = threadContextElements(context)
30
31 /**
32 * Possible states of reusability:
33 *
34 * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
35 * was used and then invalidated (e.g. because of the cancellation).
36 * 2) [CancellableContinuation]. Continuation to be/that is being reused.
37 * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
38 * ```
39 * // state == null | CC
40 * suspendCancellableCoroutineReusable { cont ->
41 * // state == REUSABLE_CLAIMED
42 * block(cont)
43 * }
44 * // state == CC
45 * ```
46 * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
47 * [CancellableContinuationImpl.getResult] will check for cancellation later.
48 *
49 * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
50 * In the `getResult`, we have the following code:
51 * ```
52 * if (trySuspend()) {
53 * // <- at this moment current continuation can be redispatched and claimed again.
54 * attachChildToParent()
55 * releaseClaimedContinuation()
56 * }
57 * ```
58 */
59 private val _reusableCancellableContinuation = atomic<Any?>(null)
60
61 private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
62 get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
63
64 fun isReusable(): Boolean {
65 /*
66 Invariant: caller.resumeMode.isReusableMode
67 * Reusability control:
68 * `null` -> no reusability at all, `false`
69 * anything else -> reusable.
70 */
71 return _reusableCancellableContinuation.value != null
72 }
73
74 /**
75 * Awaits until previous call to `suspendCancellableCoroutineReusable` will
76 * stop mutating cached instance
77 */
78 fun awaitReusability() {
79 _reusableCancellableContinuation.loop {
80 if (it !== REUSABLE_CLAIMED) return
81 }
82 }
83
84 fun release() {
85 /*
86 * Called from `releaseInterceptedContinuation`, can be concurrent with
87 * the code in `getResult` right after `trySuspend` returned `true`, so we have
88 * to wait for a release here.
89 */
90 awaitReusability()
91 reusableCancellableContinuation?.detachChild()
92 }
93
94 /**
95 * Claims the continuation for [suspendCancellableCoroutineReusable] block,
96 * so all cancellations will be postponed.
97 */
98 @Suppress("UNCHECKED_CAST")
99 fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
100 /*
101 * Transitions:
102 * 1) `null` -> claimed, caller will instantiate CC instance
103 * 2) `CC` -> claimed, caller will reuse CC instance
104 */
105 _reusableCancellableContinuation.loop { state ->
106 when {
107 state === null -> {
108 /*
109 * null -> CC was not yet published -> we do not compete with cancel
110 * -> can use plain store instead of CAS
111 */
112 _reusableCancellableContinuation.value = REUSABLE_CLAIMED
113 return null
114 }
115 // potentially competing with cancel
116 state is CancellableContinuationImpl<*> -> {
117 if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
118 return state as CancellableContinuationImpl<T>
119 }
120 }
121 state === REUSABLE_CLAIMED -> {
122 // Do nothing, wait until reusable instance will be returned from
123 // getResult() of a previous `suspendCancellableCoroutineReusable`
124 }
125 state is Throwable -> {
126 // Also do nothing, Throwable can only indicate that the CC
127 // is in REUSABLE_CLAIMED state, but with postponed cancellation
128 }
129 else -> error("Inconsistent state $state")
130 }
131 }
132 }
133
134 /**
135 * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
136 * and returns cancellation cause if so, `null` otherwise.
137 * If continuation was cancelled, it becomes non-reusable.
138 *
139 * ```
140 * suspendCancellableCoroutineReusable { // <- claimed
141 * // Any asynchronous cancellation is "postponed" while this block
142 * // is being executed
143 * } // postponed cancellation is checked here in `getResult`
144 * ```
145 *
146 * See [CancellableContinuationImpl.getResult].
147 */
148 fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
149 _reusableCancellableContinuation.loop { state ->
150 // not when(state) to avoid Intrinsics.equals call
151 when {
152 state === REUSABLE_CLAIMED -> {
153 if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
154 }
155 state is Throwable -> {
156 require(_reusableCancellableContinuation.compareAndSet(state, null))
157 return state
158 }
159 else -> error("Inconsistent state $state")
160 }
161 }
162 }
163
164 /**
165 * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
166 * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
167 */
168 fun postponeCancellation(cause: Throwable): Boolean {
169 _reusableCancellableContinuation.loop { state ->
170 when (state) {
171 REUSABLE_CLAIMED -> {
172 if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
173 return true
174 }
175 is Throwable -> return true
176 else -> {
177 // Invalidate
178 if (_reusableCancellableContinuation.compareAndSet(state, null))
179 return false
180 }
181 }
182 }
183 }
184
185 override fun takeState(): Any? {
186 val state = _state
187 assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
188 _state = UNDEFINED
189 return state
190 }
191
192 override val delegate: Continuation<T>
193 get() = this
194
195 override fun resumeWith(result: Result<T>) {
196 val context = continuation.context
197 val state = result.toState()
198 if (dispatcher.isDispatchNeeded(context)) {
199 _state = state
200 resumeMode = MODE_ATOMIC
201 dispatcher.dispatch(context, this)
202 } else {
203 executeUnconfined(state, MODE_ATOMIC) {
204 withCoroutineContext(this.context, countOrElement) {
205 continuation.resumeWith(result)
206 }
207 }
208 }
209 }
210
211 // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
212 // It is used only in Continuation<T>.resumeCancellableWith
213 @Suppress("NOTHING_TO_INLINE")
214 inline fun resumeCancellableWith(
215 result: Result<T>,
216 noinline onCancellation: ((cause: Throwable) -> Unit)?
217 ) {
218 val state = result.toState(onCancellation)
219 if (dispatcher.isDispatchNeeded(context)) {
220 _state = state
221 resumeMode = MODE_CANCELLABLE
222 dispatcher.dispatch(context, this)
223 } else {
224 executeUnconfined(state, MODE_CANCELLABLE) {
225 if (!resumeCancelled(state)) {
226 resumeUndispatchedWith(result)
227 }
228 }
229 }
230 }
231
232 // takeState had already cleared the state so we cancel takenState here
233 override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
234 // It is Ok to call onCancellation here without try/catch around it, since this function only faces
235 // a "bound" cancellation handler that performs the safe call to the user-specified code.
236 if (takenState is CompletedWithCancellation) {
237 takenState.onCancellation(cause)
238 }
239 }
240
241 @Suppress("NOTHING_TO_INLINE")
242 inline fun resumeCancelled(state: Any?): Boolean {
243 val job = context[Job]
244 if (job != null && !job.isActive) {
245 val cause = job.getCancellationException()
246 cancelCompletedResult(state, cause)
247 resumeWithException(cause)
248 return true
249 }
250 return false
251 }
252
253 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
254 inline fun resumeUndispatchedWith(result: Result<T>) {
255 withContinuationContext(continuation, countOrElement) {
256 continuation.resumeWith(result)
257 }
258 }
259
260 // used by "yield" implementation
261 internal fun dispatchYield(context: CoroutineContext, value: T) {
262 _state = value
263 resumeMode = MODE_CANCELLABLE
264 dispatcher.dispatchYield(context, this)
265 }
266
267 override fun toString(): String =
268 "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
269 }
270
271 /**
272 * It is not inline to save bytecode (it is pretty big and used in many places)
273 * and we leave it public so that its name is not mangled in use stack traces if it shows there.
274 * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
275 * @suppress **This an internal API and should not be used from general code.**
276 */
277 @InternalCoroutinesApi
resumeCancellableWithnull278 public fun <T> Continuation<T>.resumeCancellableWith(
279 result: Result<T>,
280 onCancellation: ((cause: Throwable) -> Unit)? = null
281 ): Unit = when (this) {
282 is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
283 else -> resumeWith(result)
284 }
285
yieldUndispatchednull286 internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
287 executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
288 run()
289 }
290
291 /**
292 * Executes given [block] as part of current event loop, updating current continuation
293 * mode and state if continuation is not resumed immediately.
294 * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
295 * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
296 */
executeUnconfinednull297 private inline fun DispatchedContinuation<*>.executeUnconfined(
298 contState: Any?, mode: Int, doYield: Boolean = false,
299 block: () -> Unit
300 ): Boolean {
301 assert { mode != MODE_UNINITIALIZED } // invalid execution mode
302 val eventLoop = ThreadLocalEventLoop.eventLoop
303 // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
304 if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
305 return if (eventLoop.isUnconfinedLoopActive) {
306 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
307 _state = contState
308 resumeMode = mode
309 eventLoop.dispatchUnconfined(this)
310 true // queued into the active loop
311 } else {
312 // Was not active -- run event loop until all unconfined tasks are executed
313 runUnconfinedEventLoop(eventLoop, block = block)
314 false
315 }
316 }
317