• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12 
13 @SharedImmutable
14 private val UNDEFINED = Symbol("UNDEFINED")
15 @SharedImmutable
16 @JvmField
17 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
18 
19 internal class DispatchedContinuation<in T>(
20     @JvmField val dispatcher: CoroutineDispatcher,
21     @JvmField val continuation: Continuation<T>
22 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
23     @JvmField
24     @Suppress("PropertyName")
25     internal var _state: Any? = UNDEFINED
26     override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
27     override fun getStackTraceElement(): StackTraceElement? = null
28     @JvmField // pre-cached value to avoid ctx.fold on every resumption
29     internal val countOrElement = threadContextElements(context)
30 
31     /**
32      * Possible states of reusability:
33      *
34      * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
35      *     was used and then invalidated (e.g. because of the cancellation).
36      * 2) [CancellableContinuation]. Continuation to be/that is being reused.
37      * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
38      *    ```
39      *    // state == null | CC
40      *    suspendCancellableCoroutineReusable { cont ->
41      *        // state == REUSABLE_CLAIMED
42      *        block(cont)
43      *    }
44      *    // state == CC
45      *    ```
46      * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
47      *    [CancellableContinuationImpl.getResult] will check for cancellation later.
48      *
49      * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
50      * AbstractChannel.receive method relies on the fact that the following pattern
51      * ```
52      * suspendCancellableCoroutineReusable { cont ->
53      *     val result = pollFastPath()
54      *     if (result != null) cont.resume(result)
55      * }
56      * ```
57      * always succeeds.
58      * To make it always successful, we actually postpone "reusable" cancellation
59      * to this phase and set cancellation only at the moment of instantiation.
60      */
61     private val _reusableCancellableContinuation = atomic<Any?>(null)
62 
63     public val reusableCancellableContinuation: CancellableContinuationImpl<*>?
64         get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
65 
66     public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
67         /*
68          * Reusability control:
69          * `null` -> no reusability at all, false
70          * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
71          * Else, if result is CCI === requester.
72          * Identity check my fail for the following pattern:
73          * ```
74          * loop:
75          * suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
76          * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
77          * it will leak because it won't be freed by `releaseInterceptedContinuation`
78          * ```
79          */
80         val value = _reusableCancellableContinuation.value ?: return false
81         if (value is CancellableContinuationImpl<*>) return value === requester
82         return true
83     }
84 
85     /**
86      * Claims the continuation for [suspendCancellableCoroutineReusable] block,
87      * so all cancellations will be postponed.
88      */
89     @Suppress("UNCHECKED_CAST")
90     fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
91         /*
92          * Transitions:
93          * 1) `null` -> claimed, caller will instantiate CC instance
94          * 2) `CC` -> claimed, caller will reuse CC instance
95          */
96         _reusableCancellableContinuation.loop { state ->
97             when {
98                 state === null -> {
99                     /*
100                      * null -> CC was not yet published -> we do not compete with cancel
101                      * -> can use plain store instead of CAS
102                      */
103                     _reusableCancellableContinuation.value = REUSABLE_CLAIMED
104                     return null
105                 }
106                 state is CancellableContinuationImpl<*> -> {
107                     if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
108                         return state as CancellableContinuationImpl<T>
109                     }
110                 }
111                 else -> error("Inconsistent state $state")
112             }
113         }
114     }
115 
116     /**
117      * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
118      * and returns cancellation cause if so, `null` otherwise.
119      * If continuation was cancelled, it becomes non-reusable.
120      *
121      * ```
122      * suspendCancellableCoroutineReusable { // <- claimed
123      * // Any asynchronous cancellation is "postponed" while this block
124      * // is being executed
125      * } // postponed cancellation is checked here in `getResult`
126      * ```
127      *
128      * See [CancellableContinuationImpl.getResult].
129      */
130     fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
131         _reusableCancellableContinuation.loop { state ->
132             // not when(state) to avoid Intrinsics.equals call
133             when {
134                 state === REUSABLE_CLAIMED -> {
135                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
136                 }
137                 state === null -> return null
138                 state is Throwable -> {
139                     require(_reusableCancellableContinuation.compareAndSet(state, null))
140                     return state
141                 }
142                 else -> error("Inconsistent state $state")
143             }
144         }
145     }
146 
147     /**
148      * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
149      * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
150      */
151     fun postponeCancellation(cause: Throwable): Boolean {
152         _reusableCancellableContinuation.loop { state ->
153             when (state) {
154                 REUSABLE_CLAIMED -> {
155                     if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
156                         return true
157                 }
158                 is Throwable -> return true
159                 else -> {
160                     // Invalidate
161                     if (_reusableCancellableContinuation.compareAndSet(state, null))
162                         return false
163                 }
164             }
165         }
166     }
167 
168     override fun takeState(): Any? {
169         val state = _state
170         assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
171         _state = UNDEFINED
172         return state
173     }
174 
175     override val delegate: Continuation<T>
176         get() = this
177 
178     override fun resumeWith(result: Result<T>) {
179         val context = continuation.context
180         val state = result.toState()
181         if (dispatcher.isDispatchNeeded(context)) {
182             _state = state
183             resumeMode = MODE_ATOMIC
184             dispatcher.dispatch(context, this)
185         } else {
186             executeUnconfined(state, MODE_ATOMIC) {
187                 withCoroutineContext(this.context, countOrElement) {
188                     continuation.resumeWith(result)
189                 }
190             }
191         }
192     }
193 
194     // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
195     // It is used only in Continuation<T>.resumeCancellableWith
196     @Suppress("NOTHING_TO_INLINE")
197     inline fun resumeCancellableWith(
198         result: Result<T>,
199         noinline onCancellation: ((cause: Throwable) -> Unit)?
200     ) {
201         val state = result.toState(onCancellation)
202         if (dispatcher.isDispatchNeeded(context)) {
203             _state = state
204             resumeMode = MODE_CANCELLABLE
205             dispatcher.dispatch(context, this)
206         } else {
207             executeUnconfined(state, MODE_CANCELLABLE) {
208                 if (!resumeCancelled(state)) {
209                     resumeUndispatchedWith(result)
210                 }
211             }
212         }
213     }
214 
215     // takeState had already cleared the state so we cancel takenState here
216     override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
217         // It is Ok to call onCancellation here without try/catch around it, since this function only faces
218         // a "bound" cancellation handler that performs the safe call to the user-specified code.
219         if (takenState is CompletedWithCancellation) {
220             takenState.onCancellation(cause)
221         }
222     }
223 
224     @Suppress("NOTHING_TO_INLINE")
225     inline fun resumeCancelled(state: Any?): Boolean {
226         val job = context[Job]
227         if (job != null && !job.isActive) {
228             val cause = job.getCancellationException()
229             cancelCompletedResult(state, cause)
230             resumeWithException(cause)
231             return true
232         }
233         return false
234     }
235 
236     @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
237     inline fun resumeUndispatchedWith(result: Result<T>) {
238         withCoroutineContext(context, countOrElement) {
239             continuation.resumeWith(result)
240         }
241     }
242 
243     // used by "yield" implementation
244     internal fun dispatchYield(context: CoroutineContext, value: T) {
245         _state = value
246         resumeMode = MODE_CANCELLABLE
247         dispatcher.dispatchYield(context, this)
248     }
249 
250     override fun toString(): String =
251         "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
252 }
253 
254 /**
255  * It is not inline to save bytecode (it is pretty big and used in many places)
256  * and we leave it public so that its name is not mangled in use stack traces if it shows there.
257  * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
258  * @suppress **This an internal API and should not be used from general code.**
259  */
260 @InternalCoroutinesApi
resumeCancellableWithnull261 public fun <T> Continuation<T>.resumeCancellableWith(
262     result: Result<T>,
263     onCancellation: ((cause: Throwable) -> Unit)? = null
264 ): Unit = when (this) {
265     is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
266     else -> resumeWith(result)
267 }
268 
yieldUndispatchednull269 internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
270     executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
271         run()
272     }
273 
274 /**
275  * Executes given [block] as part of current event loop, updating current continuation
276  * mode and state if continuation is not resumed immediately.
277  * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
278  * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
279  */
executeUnconfinednull280 private inline fun DispatchedContinuation<*>.executeUnconfined(
281     contState: Any?, mode: Int, doYield: Boolean = false,
282     block: () -> Unit
283 ): Boolean {
284     assert { mode != MODE_UNINITIALIZED } // invalid execution mode
285     val eventLoop = ThreadLocalEventLoop.eventLoop
286     // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
287     if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
288     return if (eventLoop.isUnconfinedLoopActive) {
289         // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
290         _state = contState
291         resumeMode = mode
292         eventLoop.dispatchUnconfined(this)
293         true // queued into the active loop
294     } else {
295         // Was not active -- run event loop until all unconfined tasks are executed
296         runUnconfinedEventLoop(eventLoop, block = block)
297         false
298     }
299 }
300