/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 @file:OptIn(ExperimentalContracts::class)
5
6 package kotlinx.coroutines.selects
7
8 import kotlinx.atomicfu.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.intrinsics.*
13 import kotlinx.coroutines.sync.*
14 import kotlin.contracts.*
15 import kotlin.coroutines.*
16 import kotlin.coroutines.intrinsics.*
17 import kotlin.jvm.*
18 import kotlin.native.concurrent.*
19 import kotlin.time.*
20
/**
 * Receiver scope of the builder lambda passed to [select]; clauses are registered through its members.
 */
public interface SelectBuilder<in R> {
    /**
     * Adds a clause that takes no additional parameters and does not select any value.
     * The [block] is the code associated with this clause.
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Adds a clause that takes no additional parameters and selects a value of type [Q],
     * which is the argument of [block].
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Adds a clause that takes an additional parameter [param] of type [P] and selects a value of type [Q],
     * which is the argument of [block].
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Shorthand for a clause whose additional parameter of type [P] is nullable:
     * the clause is registered with `null` as the parameter value and selects a value of type [Q].
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) {
        invoke(null, block)
    }

    /**
     * Adds a clause that selects the given [block] once the specified timeout has passed.
     * A zero or negative timeout selects [block] immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * @param timeMillis timeout time in milliseconds.
     */
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
57
/**
 * [Duration]-based counterpart of [SelectBuilder.onTimeout]: selects the given [block] once [timeout] has passed.
 * A zero or negative timeout selects [block] immediately.
 *
 * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
 */
@ExperimentalCoroutinesApi
public fun <R> SelectBuilder<R>.onTimeout(timeout: Duration, block: suspend () -> R) {
    // Convert to milliseconds and delegate to the Long-based member.
    val timeoutMillis = timeout.toDelayMillis()
    onTimeout(timeoutMillis, block)
}
67
/**
 * Clause for [select] expression without additional parameters that does not select any value.
 */
public interface SelectClause0 {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is registered with.
     * @param block the code associated with this clause; it takes no arguments and returns a value of type [R].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
}
79
/**
 * Clause for [select] expression without additional parameters that selects value of type [Q].
 */
public interface SelectClause1<out Q> {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is registered with.
     * @param block the code associated with this clause; it takes a value of type [Q] and returns a value of type [R].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
}
91
/**
 * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
 */
public interface SelectClause2<in P, out Q> {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is registered with.
     * @param param the additional parameter of this clause.
     * @param block the code associated with this clause; it takes a value of type [Q] and returns a value of type [R].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
}
103
/**
 * Internal representation of select instance. This instance is called _selected_ when
 * the clause to execute is already picked.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@InternalCoroutinesApi // todo: sealed interface https://youtrack.jetbrains.com/issue/KT-22286
public interface SelectInstance<in R> {
    /**
     * Returns `true` when this [select] statement had already picked a clause to execute.
     */
    public val isSelected: Boolean

    /**
     * Tries to select this instance. Returns `true` on success.
     */
    public fun trySelect(): Boolean

    /**
     * Tries to select this instance. Returns:
     * * [RESUME_TOKEN] on success,
     * * [RETRY_ATOMIC] on deadlock (needs retry, it is only possible when [otherOp] is not `null`),
     * * `null` on failure to select (already selected).
     *
     * [otherOp] is not null when trying to rendezvous with this select from inside of another select.
     * In this case, [PrepareOp.finishPrepare] must be called before deciding on any value other than [RETRY_ATOMIC].
     *
     * Note, that this method's actual return type is `Symbol?` but we cannot declare it as such, because this
     * member is public, but [Symbol] is internal. When [SelectInstance] becomes a `sealed interface`
     * (see KT-22286) we can declare this method as internal.
     */
    public fun trySelectOther(otherOp: PrepareOp?): Any?

    /**
     * Performs action atomically with [trySelect].
     * May return [RETRY_ATOMIC], caller shall retry with **fresh instance of desc**.
     *
     * @param desc the descriptor of the action to perform together with the selection.
     */
    public fun performAtomicTrySelect(desc: AtomicDesc): Any?

    /**
     * Returns completion continuation of this select instance.
     * This select instance must be _selected_ first.
     * All resumptions through this instance happen _directly_ without going through dispatcher.
     */
    public val completion: Continuation<R>

    /**
     * Resumes this instance in a dispatched way with exception.
     * This method can be called from any context.
     */
    public fun resumeSelectWithException(exception: Throwable)

    /**
     * Disposes the specified handle when this instance is selected.
     * Note, that [DisposableHandle.dispose] could be called multiple times.
     */
    public fun disposeOnSelect(handle: DisposableHandle)
}
161
/**
 * Waits for the result of multiple suspending functions simultaneously. The functions are specified using
 * _clauses_ in the [builder] scope of this select invocation, and the caller is suspended until one of the
 * clauses is either _selected_ or _fails_.
 *
 * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
 * becomes the result of the select. If any clause _fails_, then the select invocation produces the
 * corresponding exception. No clause is selected in this case.
 *
 * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
 * the clauses.
 *
 * There is no `default` clause for select expression. Instead, each selectable suspending function has the
 * corresponding non-suspending version that can be used with a regular `when` expression to select one
 * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
 *
 * ### List of supported select methods
 *
 * | **Receiver**     | **Suspending function**                           | **Select clause**
 * | ---------------- | ------------------------------------------------- | -----------------------------------------------------
 * | [Job]            | [join][Job.join]                                  | [onJoin][Job.onJoin]
 * | [Deferred]       | [await][Deferred.await]                           | [onAwait][Deferred.onAwait]
 * | [SendChannel]    | [send][SendChannel.send]                          | [onSend][SendChannel.onSend]
 * | [ReceiveChannel] | [receive][ReceiveChannel.receive]                 | [onReceive][ReceiveChannel.onReceive]
 * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
 * | none             | [delay]                                           | [onTimeout][SelectBuilder.onTimeout]
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
 * function is suspended, this function immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
 * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 */
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    contract { callsInPlace(builder, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { continuation ->
        val impl = SelectBuilderImpl(continuation)
        // Register the clauses; a failure thrown by the builder is routed to the select instance.
        try {
            impl.builder()
        } catch (failure: Throwable) {
            impl.handleBuilderException(failure)
        }
        // Either a ready result, a rethrown failure, or COROUTINE_SUSPENDED.
        impl.getResult()
    }
}
212
213
// Initial value of SelectBuilderImpl._state: no clause has been selected yet.
@SharedImmutable
internal val NOT_SELECTED: Any = Symbol("NOT_SELECTED")
// Returned by AtomicSelectOp.prepareSelectOp when the select instance turns out to be selected already.
@SharedImmutable
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")
// Initial value of SelectBuilderImpl._result: neither getResult nor a resume has claimed the result yet.
@SharedImmutable
private val UNDECIDED: Any = Symbol("UNDECIDED")
// Value of SelectBuilderImpl._result once a resume has replaced COROUTINE_SUSPENDED.
@SharedImmutable
private val RESUMED: Any = Symbol("RESUMED")
222
// Global counter of all atomic select operations, used for their deadlock resolution.
// It lives in a separate internal class as a work-around for Atomicfu's current implementation,
// which creates public classes for static atomics.
internal class SeqNumber {
    private val number = atomic(1L)

    // Increments the counter and returns the new value.
    fun next(): Long {
        return number.incrementAndGet()
    }
}

@SharedImmutable
private val selectOpSequenceNumber = SeqNumber()
233
234 @PublishedApi
235 internal class SelectBuilderImpl<in R>(
236 private val uCont: Continuation<R> // unintercepted delegate continuation
237 ) : LockFreeLinkedListHead(), SelectBuilder<R>,
238 SelectInstance<R>, Continuation<R>, CoroutineStackFrame
239 {
// CoroutineStackFrame support: the caller frame is the delegate continuation, when it is a stack frame itself.
override val callerFrame: CoroutineStackFrame?
    get() {
        return uCont as? CoroutineStackFrame
    }

// This frame contributes no stack trace element of its own.
override fun getStackTraceElement(): StackTraceElement? {
    return null
}
244
// selection state is NOT_SELECTED initially and is replaced by idempotent marker (or null) when selected;
// while an atomic operation is in progress it holds that operation's OpDescriptor
private val _state = atomic<Any?>(NOT_SELECTED)

// this is basically our own SafeContinuation; starts as UNDECIDED
private val _result = atomic<Any?>(UNDECIDED)

// cancellability support: handle of the handler registered on the parent Job, null until it is registered
private val _parentHandle = atomic<DisposableHandle?>(null)
private var parentHandle: DisposableHandle?
    get() = _parentHandle.value
    set(value) { _parentHandle.value = value }
256
/* Result state machine

    +-----------+   getResult   +---------------------+   resume   +---------+
    | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
    +-----------+               +---------------------+            +---------+
          |
          | resume
          V
    +------------+  getResult
    | value/Fail | -----------+
    +------------+            |
          ^                   |
          |                   |
          +-------------------+
 */
272
// The coroutine context is the one of the delegate continuation.
override val context: CoroutineContext
    get() {
        return uCont.context
    }

// This instance serves as its own completion continuation.
override val completion: Continuation<R>
    get() {
        return this
    }
276
/*
 * Publishes the outcome of this select following the result state machine:
 *  - UNDECIDED: the outcome computed by [value] is stored in the result;
 *  - COROUTINE_SUSPENDED: the result becomes RESUMED and [block] is run;
 *  - anything else: IllegalStateException("Already resumed") is thrown.
 * When a compareAndSet fails the state is read again and the decision is repeated.
 */
private inline fun doResume(value: () -> Any?, block: () -> Unit) {
    assert { isSelected } // "Must be selected first"
    _result.loop { current ->
        if (current === UNDECIDED) {
            val outcome = value()
            if (_result.compareAndSet(UNDECIDED, outcome)) return
        } else if (current === COROUTINE_SUSPENDED) {
            if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
                block()
                return
            }
        } else {
            throw IllegalStateException("Already resumed")
        }
    }
}
293
// Resumes in direct mode, without going through dispatcher. Should be called in the same context.
override fun resumeWith(result: Result<R>) {
    doResume({ result.toState() }) {
        // A non-null exception means the result is a failure.
        val failure = result.exceptionOrNull()
        if (failure == null) {
            uCont.resumeWith(result)
        } else {
            uCont.resumeWithStackTrace(failure)
        }
    }
}
304
// Resumes in dispatched way so that it can be called from an arbitrary context
override fun resumeSelectWithException(exception: Throwable) {
    doResume(
        value = { CompletedExceptionally(recoverStackTrace(exception, uCont)) },
        block = { uCont.intercepted().resumeWith(Result.failure(exception)) }
    )
}
311
/*
 * Produces the value that `select` hands back to suspendCoroutineUninterceptedOrReturn:
 * COROUTINE_SUSPENDED when no outcome has been stored yet, the stored value otherwise.
 * A stored CompletedExceptionally is rethrown; a RESUMED result is reported as IllegalStateException.
 * Cancellation support is initialized first, unless the instance is already selected.
 */
@PublishedApi
internal fun getResult(): Any? {
    if (!isSelected) initCancellability()
    var outcome = _result.value // atomic read
    if (outcome === UNDECIDED) {
        if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
        outcome = _result.value // lost the race against a resume -- reread volatile var
    }
    if (outcome === RESUMED) throw IllegalStateException("Already resumed")
    if (outcome is CompletedExceptionally) throw outcome.cause
    return outcome // either COROUTINE_SUSPENDED or data
}
326
// Registers a SelectOnCancelling handler on the Job of this context (if there is one) and remembers its handle.
private fun initCancellability() {
    val parentJob = context[Job] ?: return
    val registration = parentJob.invokeOnCompletion(
        onCancelling = true,
        handler = SelectOnCancelling().asHandler
    )
    parentHandle = registration
    // the state is checked _after_ registering: if we got selected in the meantime, drop the registration
    if (isSelected) registration.dispose()
}
335
// Handler registered on the parent job by initCancellability.
private inner class SelectOnCancelling : JobCancellingNode() {
    // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
    override fun invoke(cause: Throwable?) {
        if (!trySelect()) return
        resumeSelectWithException(job.getCancellationException())
    }
}
343
/*
 * Handles an exception thrown by the select builder lambda.
 * When this instance can still be selected, the select is resumed with that exception.
 */
@PublishedApi
internal fun handleBuilderException(e: Throwable) {
    if (trySelect()) {
        resumeWithException(e)
        return
    }
    if (e is CancellationException) return
    /*
     * Cannot handle this exception -- builder was already resumed with a different exception,
     * so treat it as "unhandled exception". But only if it is not the completion reason
     * and it's not the cancellation. Otherwise, in the face of structured concurrency
     * the same exception will be reported to the global exception handler.
     */
    // NOTE(review): getResult() itself throws when the stored result is CompletedExceptionally, so the
    // comparison with the completion reason below looks unreachable -- confirm whether that is intended.
    val result = getResult()
    val isCompletionReason = result is CompletedExceptionally && unwrap(result.cause) === unwrap(e)
    if (isCompletionReason) return
    handleCoroutineException(context, e)
}
361
// `false` while the state is NOT_SELECTED; a pending operation descriptor is helped to completion
// and the state is examined again; any other state means a clause was already picked.
override val isSelected: Boolean
    get() {
        _state.loop { current ->
            if (current === NOT_SELECTED) return false
            if (current !is OpDescriptor) return true // already selected
            current.perform(this) // help
        }
    }
369
override fun disposeOnSelect(handle: DisposableHandle) {
    val node = DisposeNode(handle)
    // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
    val selectedBeforeAdd = isSelected
    if (!selectedBeforeAdd) addLast(node) // add handle to list
    // dispose right away when we were selected before the add, or got selected while adding
    if (selectedBeforeAdd || isSelected) handle.dispose()
}
381
// Disposes the parent job registration (if any) and every handle registered through disposeOnSelect.
private fun doAfterSelect() {
    parentHandle?.dispose()
    forEach<DisposeNode> { node -> node.handle.dispose() }
}
388
// Plain (non-rendezvous) selection attempt: delegates to trySelectOther without another operation.
// RETRY_ATOMIC is documented as possible only when otherOp is not null, so any result other than
// RESUME_TOKEN or null is reported as an internal error.
override fun trySelect(): Boolean {
    val result = trySelectOther(null)
    return when {
        result === RESUME_TOKEN -> true
        result == null -> false
        // the message names the method that was actually called
        else -> error("Unexpected trySelectOther result $result")
    }
}
397
/*
   Diagram for rendezvous between two select operations:

   +---------+         +------------------------+ state(c)
   | Channel |         |  SelectBuilderImpl(1)  | -----------------------------------+
   +---------+         +------------------------+                                    |
        | queue                   ^                                                  |
        V                         | select                                           |
   +---------+  next   +------------------------+  next   +--------------+           |
   | LLHead  | ------> |  Send/ReceiveSelect(3) | -+----> | NextNode ... |           |
   +---------+         +------------------------+  |      +--------------+           |
        ^                              ^           | next(b)     ^                   |
        |                     affected |           V             |                   |
        |                          +-----------------+  next     |                   V
        |                          |  PrepareOp(6)   | ----------+           +-----------------+
        |                          +-----------------+ <-------------------- | PairSelectOp(7) |
        |                                 | desc                             +-----------------+
        |                                 V
        |                  queue   +----------------------+
        +------------------------- | TryPoll/OfferDesc(5) |
                                   +----------------------+
                                 atomicOp |    ^
                                          V    | desc
   +----------------------+  impl  +---------------------+
   | SelectBuilderImpl(2) | <----- |  AtomicSelectOp(4)  |
   +----------------------+        +---------------------+
                | state(a)                   ^
                |                            |
                +----------------------------+


   0. The first select operation SelectBuilderImpl(1) had already registered Send/ReceiveSelect(3) node
      in the channel.
   1. The second select operation SelectBuilderImpl(2) is trying to rendezvous calling
      performAtomicTrySelect(TryPoll/TryOfferDesc).
   2. A linked pair of AtomicSelectOp(4) and TryPoll/OfferDesc(5) is created to initiate this operation.
   3. AtomicSelectOp.prepareSelectOp installs a reference to AtomicSelectOp(4) in SelectBuilderImpl(2).state(a)
      field. STARTING AT THIS MOMENT CONCURRENT HELPERS CAN DISCOVER AND TRY TO HELP PERFORM THIS OPERATION.
   4. Then TryPoll/OfferDesc.prepare discovers "affectedNode" for this operation as Send/ReceiveSelect(3) and
      creates PrepareOp(6) that references it. It installs reference to PrepareOp(6) in Send/ReceiveSelect(3).next(b)
      instead of its original next pointer that was stored in PrepareOp(6).next.
   5. PrepareOp(6).perform calls TryPoll/OfferDesc(5).onPrepare which validates that PrepareOp(6).affected node
      is of the correct type and tries to secure ability to resume it by calling affected.tryResumeSend/Receive.
      Note, that different PrepareOp instances can be repeatedly created for different candidate nodes. If node is
      found to be resumed/selected, then REMOVE_PREPARED result causes Send/ReceiveSelect(3).next change to be
      undone and new PrepareOp is created with a different candidate node. Different concurrent helpers may end up
      creating different PrepareOp instances, so it is important that they ultimately come to consensus about
      the node on which to perform the operation.
   6. Send/ReceiveSelect(3).affected.tryResumeSend/Receive forwards this call to SelectBuilderImpl.trySelectOther,
      passing it a reference to PrepareOp(6) as an indication of the other select instance rendezvous.
   7. SelectBuilderImpl.trySelectOther creates PairSelectOp(7) and installs it as SelectBuilderImpl(1).state(c)
      to secure the state of the first builder and commit ability to make it selected for this operation.
   8. NOW THE RENDEZVOUS IS FULLY PREPARED via descriptors installed at
      - SelectBuilderImpl(2).state(a)
      - Send/ReceiveSelect(3).next(b)
      - SelectBuilderImpl(1).state(c)
      Any concurrent operation that is trying to access any of the select instances or the queue is going to help.
      Any helper that helps AtomicSelectOp(4) calls TryPoll/OfferDesc(5).prepare which tries to determine
      "affectedNode" but is bound to discover the same Send/ReceiveSelect(3) node that cannot become
      non-first node until this operation completes (there are no insertions to the head of the queue!)
      We have not yet decided to complete this operation, but we cannot ever decide to complete this operation
      on any other node but Send/ReceiveSelect(3), so it is now safe to perform the next step.
   9. PairSelectOp(7).perform calls PrepareOp(6).finishPrepare which copies PrepareOp(6).affected and PrepareOp(6).next
      to the corresponding TryPoll/OfferDesc(5) fields.
   10. PairSelectOp(7).perform calls AtomicSelectOp(4).decide to reach consensus on successful completion of this
      operation. This consensus is important in light of dead-lock resolution algorithm, because a stale helper
      could have stumbled upon a higher-numbered atomic operation and had decided to abort this atomic operation,
      reaching decision on RETRY_ATOMIC status of it. We cannot proceed with completion in this case and must abort,
      all objects including AtomicSelectOp(4) will be dropped, reverting all the three updated pointers to
      their original values and atomic operation will retry from scratch.
   11. NOW WITH SUCCESSFUL UPDATE OF AtomicSelectOp(4).consensus to null THE RENDEZVOUS IS COMMITTED. The rest
      of the code proceeds to update:
      - SelectBuilderImpl(1).state to TryPoll/OfferDesc(5) so that late helpers would know that we have
        already successfully completed rendezvous.
      - Send/ReceiveSelect(3).next to Removed(next) so that this node becomes marked as removed.
      - SelectBuilderImpl(2).state to null to mark this select instance as selected.

   Note, that very late helper may try to perform this AtomicSelectOp(4) when it is already completed.
   It can proceed as far as finding affected node, creating PrepareOp, installing this new PrepareOp into the
   node's next pointer, but PrepareOp.perform checks that AtomicSelectOp(4) is already decided and undoes all
   the preparations.
 */
480
// it is just like plain trySelect, but supports idempotent start
// Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected)
// otherOp is null for a regular trySelect and non-null when called on behalf of another select's operation.
override fun trySelectOther(otherOp: PrepareOp?): Any? {
    _state.loop { state -> // lock-free loop on state
        when {
            // Found initial state (not selected yet) -- try to make it selected
            state === NOT_SELECTED -> {
                if (otherOp == null) {
                    // regular trySelect -- just mark as selected; on a lost CAS race re-read the state
                    if (!_state.compareAndSet(NOT_SELECTED, null)) return@loop
                } else {
                    // Rendezvous with another select instance -- install PairSelectOp
                    val pairSelectOp = PairSelectOp(otherOp)
                    if (!_state.compareAndSet(NOT_SELECTED, pairSelectOp)) return@loop
                    val decision = pairSelectOp.perform(this)
                    // a non-null decision means the other operation was not decided successfully -- pass it on
                    if (decision !== null) return decision
                }
                doAfterSelect()
                return RESUME_TOKEN
            }
            state is OpDescriptor -> { // state is either AtomicSelectOp or PairSelectOp
                // Found descriptor of ongoing operation while working in the context of other select operation
                if (otherOp != null) {
                    val otherAtomicOp = otherOp.atomicOp
                    when {
                        // It is the same select instance
                        otherAtomicOp is AtomicSelectOp && otherAtomicOp.impl === this -> {
                            /*
                             * We cannot do state.perform(this) here and "help" it since it is the same
                             * select and we'll get StackOverflowError.
                             * See https://github.com/Kotlin/kotlinx.coroutines/issues/1411
                             * We cannot support this because select { ... } is an expression and its clauses
                             * have a result that shall be returned from the select.
                             */
                            error("Cannot use matching select clauses on the same object")
                        }
                        // The other select (that is trying to proceed) had started earlier
                        otherAtomicOp.isEarlierThan(state) -> {
                            /*
                             * Abort to prevent deadlock by returning a failure to it.
                             * See https://github.com/Kotlin/kotlinx.coroutines/issues/504
                             * The other select operation will receive a failure and will restart itself with a
                             * larger sequence number. This guarantees obstruction-freedom of this algorithm.
                             */
                            return RETRY_ATOMIC
                        }
                    }
                }
                // Otherwise (not a special descriptor)
                state.perform(this) // help it
            }
            // otherwise -- already selected
            otherOp == null -> return null // already selected
            state === otherOp.desc -> return RESUME_TOKEN // was selected with this marker
            else -> return null // selected with different marker
        }
    }
}
539
// The very last step of rendezvous between two select operations.
// otherOp is the prepare operation of the other select that is trying to rendezvous with this one.
private class PairSelectOp(
    @JvmField val otherOp: PrepareOp
) : OpDescriptor() {
    // `affected` is the SelectBuilderImpl whose state holds this descriptor.
    // Returns the decision of the other atomic operation (null when it was decided successfully).
    override fun perform(affected: Any?): Any? {
        val impl = affected as SelectBuilderImpl<*>
        // here we are definitely not going to RETRY_ATOMIC, so
        // we must finish preparation of another operation before attempting to reach decision to select
        otherOp.finishPrepare()
        val decision = otherOp.atomicOp.decide(null) // try decide for success of operation
        // on success the state becomes the other operation's desc (the marker this select was selected with),
        // otherwise the state is reverted to NOT_SELECTED
        val update: Any = if (decision == null) otherOp.desc else NOT_SELECTED
        impl._state.compareAndSet(this, update)
        return decision
    }

    // This descriptor belongs to the atomic operation of the other select.
    override val atomicOp: AtomicOp<*>
        get() = otherOp.atomicOp
}
558
// Wraps desc into a new AtomicSelectOp for this instance and performs it as the originator (null affected).
override fun performAtomicTrySelect(desc: AtomicDesc): Any? {
    val selectOp = AtomicSelectOp(this, desc)
    return selectOp.perform(null)
}
561
// Debug representation showing the current selection state and result.
override fun toString(): String {
    val state = _state.value
    val result = _result.value
    return "SelectInstance(state=$state, result=$result)"
}
563
// Atomic operation that selects `impl` together with performing `desc`.
// While it is in progress, a reference to it is installed in impl._state (see prepareSelectOp).
private class AtomicSelectOp(
    @JvmField val impl: SelectBuilderImpl<*>,
    @JvmField val desc: AtomicDesc
) : AtomicOp<Any?>() {
    // all select operations are totally ordered by their creating time using selectOpSequenceNumber
    override val opSequence = selectOpSequenceNumber.next()

    init {
        // link the descriptor back to this operation
        desc.atomicOp = this
    }

    override fun prepare(affected: Any?): Any? {
        // only originator of operation makes preparation move of installing descriptor into this selector's state
        // helpers should never do it, or risk ruining progress when they come late
        if (affected == null) {
            // we are originator (affected reference is not null if helping)
            prepareSelectOp()?.let { return it }
        }
        try {
            return desc.prepare(this)
        } catch (e: Throwable) {
            // undo prepareSelectOp on crash (for example if IllegalStateException is thrown)
            if (affected == null) undoPrepare()
            throw e
        }
    }

    override fun complete(affected: Any?, failure: Any?) {
        // first settle the select state, then let the descriptor complete its own part
        completeSelect(failure)
        desc.complete(this, failure)
    }

    // Installs this operation into impl._state.
    // Returns null on success (or when it is already installed) and ALREADY_SELECTED when impl is selected.
    private fun prepareSelectOp(): Any? {
        impl._state.loop { state ->
            when {
                state === this -> return null // already in progress
                state is OpDescriptor -> state.perform(impl) // help
                state === NOT_SELECTED -> {
                    if (impl._state.compareAndSet(NOT_SELECTED, this))
                        return null // success
                }
                else -> return ALREADY_SELECTED
            }
        }
    }

    // reverts the change done by prepareSelectOp
    private fun undoPrepare() {
        impl._state.compareAndSet(this, NOT_SELECTED)
    }

    // Replaces this operation in impl._state with the final state: null (selected) when there is no failure,
    // NOT_SELECTED otherwise. The after-select actions run only if this call made the successful update.
    private fun completeSelect(failure: Any?) {
        val selectSuccess = failure == null
        val update = if (selectSuccess) null else NOT_SELECTED
        if (impl._state.compareAndSet(this, update)) {
            if (selectSuccess)
                impl.doAfterSelect()
        }
    }

    override fun toString(): String = "AtomicSelectOp(sequence=$opSequence)"
}
626
// Clause registration: each clause kind is handed this select instance together with its block.
override fun SelectClause0.invoke(block: suspend () -> R) {
    this.registerSelectClause0(select = this@SelectBuilderImpl, block = block)
}

override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
    this.registerSelectClause1(select = this@SelectBuilderImpl, block = block)
}

override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
    this.registerSelectClause2(select = this@SelectBuilderImpl, param = param, block = block)
}
638
// Timeout clause: a positive timeout schedules a delayed selection attempt,
// while a zero or negative timeout tries to select and start the block right away.
override fun onTimeout(timeMillis: Long, block: suspend () -> R) {
    if (timeMillis > 0L) {
        val onExpired = Runnable {
            // todo: we could have replaced startCoroutine with startCoroutineUndispatched
            // But we need a way to know that Delay.invokeOnTimeout had used the right thread
            if (trySelect()) {
                block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
            }
        }
        // the timeout handle is disposed as soon as this instance becomes selected
        val timeoutHandle = context.delay.invokeOnTimeout(timeMillis, onExpired, context)
        disposeOnSelect(timeoutHandle)
        return
    }
    if (trySelect()) {
        block.startCoroutineUnintercepted(completion)
    }
}
653
// List node that carries a handle registered through disposeOnSelect;
// doAfterSelect walks these nodes and disposes every handle.
private class DisposeNode(
    @JvmField val handle: DisposableHandle
) : LockFreeLinkedListNode()
657 }
658