/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.internal
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12
/**
 * Marker kept in `DispatchedContinuation._state` while no result is pending:
 * it is the initial value and the value written back by `takeState`.
 */
@SharedImmutable private val UNDEFINED = Symbol("UNDEFINED")

/**
 * Marker stored in the reusable-continuation slot of `DispatchedContinuation`
 * while the slot is claimed by `claimReusableCancellableContinuation`.
 */
@JvmField @SharedImmutable internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
18
/**
 * A [Continuation] that wraps the original [continuation] and resumes it through [dispatcher].
 *
 * On resumption the result is either stored in [_state] and this task is handed to the dispatcher,
 * or, when the dispatcher reports that no dispatch is needed, the wrapped continuation is resumed
 * via `executeUnconfined`.
 *
 * The class also owns the slot used to cache a reusable [CancellableContinuationImpl],
 * see [_reusableCancellableContinuation] for its state machine.
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    /** Pending resumption state; [UNDEFINED] when nothing is stored (initially and after [takeState]). */
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED
    override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /**
     * Possible states of reusability:
     *
     * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
     *    was used and then invalidated (e.g. because of the cancellation).
     * 2) [CancellableContinuation]. Continuation to be/that is being reused.
     * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
     *    ```
     *    // state == null | CC
     *    suspendCancellableCoroutineReusable { cont ->
     *        // state == REUSABLE_CLAIMED
     *        block(cont)
     *    }
     *    // state == CC
     *    ```
     * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
     *    [CancellableContinuationImpl.getResult] will check for cancellation later.
     *
     * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
     * AbstractChannel.receive method relies on the fact that the following pattern
     * ```
     * suspendCancellableCoroutineReusable { cont ->
     *     val result = pollFastPath()
     *     if (result != null) cont.resume(result)
     * }
     * ```
     * always succeeds.
     * To make it always successful, we actually postpone "reusable" cancellation
     * to this phase and set cancellation only at the moment of instantiation.
     */
    private val _reusableCancellableContinuation = atomic<Any?>(null)

    /**
     * The cached reusable continuation, or `null` when the slot currently holds anything else
     * (`null`, [REUSABLE_CLAIMED] or a postponed cancellation cause).
     */
    val reusableCancellableContinuation: CancellableContinuationImpl<*>?
        get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>

    /**
     * Returns `true` when [requester] may treat itself as the reusable continuation of this instance.
     */
    fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
        /*
         * Reusability control:
         * `null` -> no reusability at all, false
         * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
         * Else, if result is CCI === requester.
         * Identity check may fail for the following pattern:
         * ```
         * loop:
         *     suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
         *     suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
         *     it will leak because it won't be freed by `releaseInterceptedContinuation`
         * ```
         */
        val value = _reusableCancellableContinuation.value ?: return false
        if (value is CancellableContinuationImpl<*>) return value === requester
        return true
    }

    /**
     * Claims the continuation for [suspendCancellableCoroutineReusable] block,
     * so all cancellations will be postponed.
     *
     * Returns the previously cached continuation to reuse, or `null` when there was none
     * and the caller has to create a new instance.
     * Fails with [IllegalStateException] if the slot is already claimed or holds a cancellation cause.
     */
    @Suppress("UNCHECKED_CAST")
    fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
        /*
         * Transitions:
         * 1) `null` -> claimed, caller will instantiate CC instance
         * 2) `CC` -> claimed, caller will reuse CC instance
         */
        _reusableCancellableContinuation.loop { state ->
            when {
                state === null -> {
                    /*
                     * null -> CC was not yet published -> we do not compete with cancel
                     * -> can use plain store instead of CAS
                     */
                    _reusableCancellableContinuation.value = REUSABLE_CLAIMED
                    return null
                }
                state is CancellableContinuationImpl<*> -> {
                    // On CAS failure the loop re-reads the state and retries
                    if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
                        return state as CancellableContinuationImpl<T>
                    }
                }
                else -> error("Inconsistent state $state")
            }
        }
    }

    /**
     * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
     * and returns cancellation cause if so, `null` otherwise.
     * If continuation was cancelled, it becomes non-reusable.
     *
     * ```
     * suspendCancellableCoroutineReusable { // <- claimed
     *     // Any asynchronous cancellation is "postponed" while this block
     *     // is being executed
     * } // postponed cancellation is checked here in `getResult`
     * ```
     *
     * See [CancellableContinuationImpl.getResult].
     */
    fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
        _reusableCancellableContinuation.loop { state ->
            // not when(state) to avoid Intrinsics.equals call
            when {
                state === REUSABLE_CLAIMED -> {
                    // Release the claim by publishing the continuation for later reuse
                    if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
                }
                state === null -> return null
                state is Throwable -> {
                    // No method of this class replaces a stored cause other than this reset,
                    // so a failed CAS is a broken internal invariant, not a bad argument.
                    check(_reusableCancellableContinuation.compareAndSet(state, null)) {
                        "Inconsistent state: failed to reset postponed cancellation $state"
                    }
                    return state
                }
                else -> error("Inconsistent state $state")
            }
        }
    }

    /**
     * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
     * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
     */
    fun postponeCancellation(cause: Throwable): Boolean {
        _reusableCancellableContinuation.loop { state ->
            // not when(state) to avoid Intrinsics.equals call, same as in checkPostponedCancellation
            when {
                state === REUSABLE_CLAIMED -> {
                    if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
                        return true
                }
                state is Throwable -> return true
                else -> {
                    // Invalidate
                    if (_reusableCancellableContinuation.compareAndSet(state, null))
                        return false
                }
            }
        }
    }

    /** Returns the stored state and resets [_state] to [UNDEFINED]. */
    override fun takeState(): Any? {
        val state = _state
        assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    override val delegate: Continuation<T>
        get() = this

    /**
     * Resumes the wrapped [continuation] with [result] in `MODE_ATOMIC`: dispatches this task
     * when the [dispatcher] asks for it, otherwise goes through `executeUnconfined`.
     */
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
    // It is used only in Continuation<T>.resumeCancellableWith
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                // Resume normally only when the job was not found to be inactive
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

    // takeState had already cleared the state so we cancel takenState here
    override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
        // It is Ok to call onCancellation here without try/catch around it, since this function only faces
        // a "bound" cancellation handler that performs the safe call to the user-specified code.
        if (takenState is CompletedWithCancellation) {
            takenState.onCancellation(cause)
        }
    }

    /**
     * If the context has a [Job] that is no longer active, cancels [state] with the job's
     * cancellation exception, resumes with that exception and returns `true`.
     * Returns `false` without side effects otherwise.
     */
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(state: Any?): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            val cause = job.getCancellationException()
            cancelCompletedResult(state, cause)
            resumeWithException(cause)
            return true
        }
        return false
    }

    /** Resumes the wrapped [continuation] with [result] inside `withCoroutineContext`, without dispatching. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatchedWith(result: Result<T>) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWith(result)
        }
    }

    // used by "yield" implementation
    internal fun dispatchYield(context: CoroutineContext, value: T) {
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}
253
/**
 * Resumes this continuation with [result] in cancellable mode when it is a [DispatchedContinuation],
 * forwarding [onCancellation] to it; any other continuation is resumed with a plain `resumeWith`.
 *
 * Deliberately not inline: the body is large and used in many places, so keeping it a regular
 * function saves bytecode. It stays public so that its name is not mangled in user stack traces,
 * where it may show up when coroutines are started/resumed with the unconfined dispatcher.
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) {
    if (this is DispatchedContinuation) {
        resumeCancellableWith(result, onCancellation)
    } else {
        resumeWith(result)
    }
}
268
/**
 * Runs this continuation through `executeUnconfined` in yielding mode with `Unit` as its state.
 * Returns whatever `executeUnconfined` reports: `true` if the continuation was queued,
 * `false` otherwise.
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    return executeUnconfined(contState = Unit, mode = MODE_CANCELLABLE, doYield = true) {
        run()
    }
}
273
/**
 * Runs [block] within the current thread's event loop. When the continuation cannot be resumed
 * right away, [contState] and [mode] are stored in it and it is queued instead.
 * [doYield] marks a yielding continuation, which takes a fast path out when the unconfined queue is empty.
 * Returns `true` if the continuation was queued (trampolined) and `false` in every other case.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED } // invalid execution mode
    val loop = ThreadLocalEventLoop.eventLoop
    // Fast path: nothing to yield to when the unconfined queue is empty
    if (doYield && loop.isUnconfinedQueueEmpty) return false
    if (!loop.isUnconfinedLoopActive) {
        // No loop is running yet -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(loop, block = block)
        return false
    }
    // A loop is already running -- enqueue instead of recursing to avoid stack overflow
    _state = contState
    resumeMode = mode
    loop.dispatchUnconfined(this)
    return true // queued into the active loop
}
300