<lambda>null1 package kotlinx.coroutines.internal
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 import kotlin.jvm.*
7
/**
 * Sentinel stored in `DispatchedContinuation._state` while no resumption state is pending:
 * it is the initial value of that field and the value `takeState` writes back after reading it.
 */
private val UNDEFINED = Symbol("UNDEFINED")

/**
 * Marker placed into the reusability slot of `DispatchedContinuation` while the reusable
 * cancellable continuation is claimed by a running `suspendCancellableCoroutineReusable` block.
 */
@JvmField
internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
11
/**
 * A [Continuation] that wraps another [continuation] together with the [dispatcher] used to resume it.
 *
 * On resumption the dispatcher is asked whether a dispatch is needed. If so, the resumption state and
 * mode are stored in this object and the object itself is handed to the dispatcher as the task to run;
 * otherwise the resumption goes through `executeUnconfined`.
 *
 * The class additionally owns a single atomic slot ([_reusableCancellableContinuation]) that tracks the
 * cancellable continuation instance cached for `suspendCancellableCoroutineReusable`.
 *
 * @param dispatcher dispatcher consulted via `safeIsDispatchNeeded` and used via `safeDispatch`/`dispatchYield`.
 * @param continuation the wrapped continuation; [Continuation] members are delegated to it unless overridden here.
 */
internal class DispatchedContinuation<in T>(
    @JvmField internal val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    /**
     * Resumption state waiting to be picked up by [takeState].
     * Equals [UNDEFINED] when nothing is pending (initially and right after [takeState]).
     */
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED

    /** The wrapped [continuation] if it is itself a [CoroutineStackFrame], `null` otherwise. */
    override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame

    /** This frame contributes no stack trace element of its own: always `null`. */
    override fun getStackTraceElement(): StackTraceElement? = null

    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /**
     * Possible states of reusability:
     *
     * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
     *    was used and then invalidated (e.g. because of the cancellation).
     * 2) [CancellableContinuation]. Continuation to be/that is being reused.
     * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
     *    ```
     *    // state == null | CC
     *    suspendCancellableCoroutineReusable { cont ->
     *        // state == REUSABLE_CLAIMED
     *        block(cont)
     *    }
     *    // state == CC
     *    ```
     * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
     *    [CancellableContinuationImpl.getResult] will check for cancellation later.
     *
     * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
     * In the `getResult`, we have the following code:
     * ```
     * if (trySuspend()) {
     *     // <- at this moment current continuation can be redispatched and claimed again.
     *     attachChildToParent()
     *     releaseClaimedContinuation()
     * }
     * ```
     */
    private val _reusableCancellableContinuation = atomic<Any?>(null)

    /**
     * Typed view of the slot: the stored [CancellableContinuationImpl], or `null` when the slot
     * currently holds anything else (`null`, [REUSABLE_CLAIMED] or a [Throwable]).
     */
    private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
        get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>

    /**
     * Returns `true` when the reusability slot holds any non-`null` value, `false` when it is `null`.
     */
    internal fun isReusable(): Boolean {
        /*
        Invariant: caller.resumeMode.isReusableMode
         * Reusability control:
         * `null` -> no reusability at all, `false`
         * anything else -> reusable.
         */
        return _reusableCancellableContinuation.value != null
    }

    /**
     * Awaits until previous call to `suspendCancellableCoroutineReusable` will
     * stop mutating cached instance.
     *
     * Implemented as a loop over the slot that returns as soon as the observed value
     * is not [REUSABLE_CLAIMED].
     */
    internal fun awaitReusability() {
        _reusableCancellableContinuation.loop {
            if (it !== REUSABLE_CLAIMED) return
        }
    }

    /**
     * Waits for the slot to leave the [REUSABLE_CLAIMED] state and then calls `detachChild()`
     * on the cached cancellable continuation, if the slot holds one.
     */
    internal fun release() {
        /*
         * Called from `releaseInterceptedContinuation`, can be concurrent with
         * the code in `getResult` right after `trySuspend` returned `true`, so we have
         * to wait for a release here.
         */
        awaitReusability()
        reusableCancellableContinuation?.detachChild()
    }

    /**
     * Claims the continuation for [suspendCancellableCoroutineReusable] block,
     * so all cancellations will be postponed.
     *
     * @return the previously cached instance that the caller should reuse, or `null` when the slot
     * was empty and the caller is expected to create a new instance. In both cases the slot is left
     * in the [REUSABLE_CLAIMED] state.
     */
    @Suppress("UNCHECKED_CAST")
    internal fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
        /*
         * Transitions:
         * 1) `null` -> claimed, caller will instantiate CC instance
         * 2) `CC` -> claimed, caller will reuse CC instance
         */
        _reusableCancellableContinuation.loop { state ->
            when {
                state === null -> {
                    /*
                     * null -> CC was not yet published -> we do not compete with cancel
                     * -> can use plain store instead of CAS
                     */
                    _reusableCancellableContinuation.value = REUSABLE_CLAIMED
                    return null
                }
                // potentially competing with cancel
                state is CancellableContinuationImpl<*> -> {
                    // On a failed CAS nothing is returned, so the loop re-reads the slot and retries
                    if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
                        return state as CancellableContinuationImpl<T>
                    }
                }
                state === REUSABLE_CLAIMED -> {
                    // Do nothing, wait until reusable instance will be returned from
                    // getResult() of a previous `suspendCancellableCoroutineReusable`
                }
                state is Throwable -> {
                    // Also do nothing, Throwable can only indicate that the CC
                    // is in REUSABLE_CLAIMED state, but with postponed cancellation
                }
                else -> error("Inconsistent state $state")
            }
        }
    }

    /**
     * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
     * and returns cancellation cause if so, `null` otherwise.
     * If continuation was cancelled, it becomes non-reusable.
     *
     * ```
     * suspendCancellableCoroutineReusable { // <- claimed
     *     // Any asynchronous cancellation is "postponed" while this block
     *     // is being executed
     * } // postponed cancellation is checked here in `getResult`
     * ```
     *
     * See [CancellableContinuationImpl.getResult].
     *
     * @param continuation instance that is stored back into the slot when no cancellation was postponed.
     * @return the postponed cancellation cause (the slot is then reset to `null`), or `null` when the
     * slot was moved from [REUSABLE_CLAIMED] to [continuation].
     */
    internal fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
        _reusableCancellableContinuation.loop { state ->
            // not when(state) to avoid Intrinsics.equals call
            when {
                state === REUSABLE_CLAIMED -> {
                    if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
                }
                state is Throwable -> {
                    // The CAS is required to succeed here; a failure surfaces as an exception from `require`
                    require(_reusableCancellableContinuation.compareAndSet(state, null))
                    return state
                }
                else -> error("Inconsistent state $state")
            }
        }
    }

    /**
     * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
     * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
     *
     * When the slot holds anything other than [REUSABLE_CLAIMED] or a [Throwable], the slot is
     * reset to `null` before `false` is returned.
     *
     * @param cause cancellation cause recorded in the slot when postponing succeeds.
     */
    internal fun postponeCancellation(cause: Throwable): Boolean {
        _reusableCancellableContinuation.loop { state ->
            when (state) {
                REUSABLE_CLAIMED -> {
                    if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
                        return true
                }
                // A cause is already recorded: keep it and report the cancellation as postponed
                is Throwable -> return true
                else -> {
                    // Invalidate
                    if (_reusableCancellableContinuation.compareAndSet(state, null))
                        return false
                }
            }
        }
    }

    /**
     * Returns the pending resumption state and resets [_state] to [UNDEFINED].
     * Asserts that a state is actually pending.
     */
    override fun takeState(): Any? {
        val state = _state
        assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    /** This object acts as its own delegate. */
    override val delegate: Continuation<T>
        get() = this

    /**
     * Resumes with [result] using [MODE_ATOMIC].
     *
     * If the dispatcher reports that a dispatch is needed, the converted state and the mode are stored
     * in this object and the object is passed to the dispatcher. Otherwise the resumption goes through
     * `executeUnconfined`, whose block resumes the wrapped [continuation] inside
     * `withCoroutineContext(context, countOrElement)`.
     */
    override fun resumeWith(result: Result<T>) {
        val state = result.toState()
        if (dispatcher.safeIsDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.safeDispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    /**
     * Same shape as [resumeWith], but uses [MODE_CANCELLABLE]; on the undispatched path the block
     * first checks [resumeCancelled] and only resumes via [resumeUndispatchedWith] when it returned `false`.
     */
    // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
    // It is used only in Continuation<T>.resumeCancellableWith
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
        if (dispatcher.safeIsDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.safeDispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

    /**
     * If the context contains a [Job] that is not active, takes its cancellation exception, passes
     * [state] and that exception to `cancelCompletedResult`, resumes with the exception and returns `true`.
     * Returns `false` (doing nothing) when there is no job or the job is still active.
     */
    // inline here is to save us an entry on the stack for the sake of better stacktraces
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeCancelled(state: Any?): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            val cause = job.getCancellationException()
            cancelCompletedResult(state, cause)
            resumeWithException(cause)
            return true
        }
        return false
    }

    /**
     * Resumes the wrapped [continuation] with [result] directly (no dispatch), inside
     * `withContinuationContext(continuation, countOrElement)`.
     */
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeUndispatchedWith(result: Result<T>) {
        withContinuationContext(continuation, countOrElement) {
            continuation.resumeWith(result)
        }
    }

    /**
     * Stores [value] as the pending state with [MODE_CANCELLABLE] and hands this object to
     * `dispatcher.dispatchYield` with the given [context].
     */
    // used by "yield" implementation
    internal fun dispatchYield(context: CoroutineContext, value: T) {
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    /** Debug representation containing the dispatcher and the debug string of the wrapped continuation. */
    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}
251
/**
 * Invokes `dispatch(context, runnable)` on the receiver.
 *
 * Anything thrown by that call is rethrown as a [DispatchException] constructed from the
 * original throwable, this dispatcher and [context].
 */
internal fun CoroutineDispatcher.safeDispatch(context: CoroutineContext, runnable: Runnable): Unit =
    try {
        dispatch(context, runnable)
    } catch (failure: Throwable) {
        throw DispatchException(failure, this, context)
    }
259
/**
 * Returns the result of `isDispatchNeeded(context)` called on the receiver.
 *
 * Anything thrown by that call is rethrown as a [DispatchException] constructed from the
 * original throwable, this dispatcher and [context].
 */
internal fun CoroutineDispatcher.safeIsDispatchNeeded(context: CoroutineContext): Boolean =
    try {
        isDispatchNeeded(context)
    } catch (failure: Throwable) {
        throw DispatchException(failure, this, context)
    }
267
/**
 * Resumes the receiver with [result]: a [DispatchedContinuation] is resumed through its own
 * `resumeCancellableWith` member, any other continuation through plain [Continuation.resumeWith].
 *
 * It is not inline to save bytecode (it is pretty big and used in many places),
 * and it is left public so that its name is not mangled in user stack traces if it shows there.
 * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
): Unit {
    if (this is DispatchedContinuation) {
        // Smart cast: resolves to the member function, not to this extension
        resumeCancellableWith(result)
    } else {
        resumeWith(result)
    }
}
281
/**
 * Passes this continuation to `executeUnconfined` in yielding mode, with [Unit] as the state,
 * [MODE_CANCELLABLE] as the mode and a block that calls `run()` on the receiver.
 * Returns whatever `executeUnconfined` returns.
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    return executeUnconfined(contState = Unit, mode = MODE_CANCELLABLE, doYield = true) {
        run()
    }
}
286
/**
 * Runs [block] as part of the current thread's event loop, or queues this continuation into that
 * loop when it is already running unconfined tasks.
 *
 * - When [doYield] is set and the unconfined queue is empty, nothing is done and `false` is returned.
 * - When the unconfined loop is already active, [contState] and [mode] are stored in the continuation
 *   and the continuation is queued into the loop; the result is `true`.
 * - Otherwise [block] is executed through `runUnconfinedEventLoop` and the result is `false`.
 *
 * @return `true` if execution of the continuation was queued (trampolined), `false` otherwise.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED } // invalid execution mode
    val unconfinedLoop = ThreadLocalEventLoop.eventLoop
    // Fast path for yield: the unconfined queue is empty, so bail out right away
    if (doYield && unconfinedLoop.isUnconfinedQueueEmpty) return false
    if (!unconfinedLoop.isUnconfinedLoopActive) {
        // Loop is idle -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(unconfinedLoop, block = block)
        return false
    }
    // Loop is already active -- enqueue instead of nesting the call to avoid stack overflow
    _state = contState
    resumeMode = mode
    unconfinedLoop.dispatchUnconfined(this)
    return true // queued into the active loop
}
313