• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 @file:OptIn(ExperimentalContracts::class)
5 
6 package kotlinx.coroutines.selects
7 
8 import kotlinx.atomicfu.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.intrinsics.*
13 import kotlinx.coroutines.sync.*
14 import kotlin.contracts.*
15 import kotlin.coroutines.*
16 import kotlin.coroutines.intrinsics.*
17 import kotlin.jvm.*
18 import kotlin.native.concurrent.*
19 import kotlin.time.*
20 
/**
 * Receiver scope of the builder lambda that is passed to [select].
 * Clauses are registered by invoking them inside this scope.
 */
public interface SelectBuilder<in R> {
    /**
     * Registers a parameterless clause of this [select] expression that does not select any value.
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Registers a parameterless clause of this [select] expression that selects a value of type [Q].
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Registers a clause of this [select] expression that takes an additional parameter of type [P]
     * and selects a value of type [Q].
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Registers a clause of this [select] expression whose additional parameter of type [P] is nullable,
     * passing `null` for that parameter. The clause selects a value of type [Q].
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) {
        invoke(null, block)
    }

    /**
     * Clause that selects the given [block] once the specified timeout has passed.
     * A negative or zero timeout selects [block] immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * @param timeMillis timeout time in milliseconds.
     */
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
57 
/**
 * Clause that selects the given [block] once the specified [timeout] has passed.
 * A negative or zero timeout selects [block] immediately.
 *
 * The [timeout] is converted with `toDelayMillis()` and forwarded to the
 * millisecond-based [SelectBuilder.onTimeout] member.
 *
 * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
 */
@ExperimentalCoroutinesApi
@ExperimentalTime
public fun <R> SelectBuilder<R>.onTimeout(timeout: Duration, block: suspend () -> R) {
    val timeMillis = timeout.toDelayMillis()
    onTimeout(timeMillis, block)
}
68 
/**
 * A [select] clause that takes no additional parameters and does not select any value.
 */
public interface SelectClause0 {
    /**
     * Registers this clause with the given [select] instance, using [block] as the code of the clause.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause0(
        select: SelectInstance<R>,
        block: suspend () -> R
    )
}
80 
/**
 * A [select] clause that takes no additional parameters and selects a value of type [Q].
 */
public interface SelectClause1<out Q> {
    /**
     * Registers this clause with the given [select] instance, using [block] as the code of the clause.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause1(
        select: SelectInstance<R>,
        block: suspend (Q) -> R
    )
}
92 
/**
 * A [select] clause that takes an additional parameter of type [P] and selects a value of type [Q].
 */
public interface SelectClause2<in P, out Q> {
    /**
     * Registers this clause with the given [select] instance and [param], using [block] as the code of the clause.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause2(
        select: SelectInstance<R>,
        param: P,
        block: suspend (Q) -> R
    )
}
104 
/**
 * Internal representation of a select instance. The instance is said to be _selected_ once
 * the clause to execute has been picked.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@InternalCoroutinesApi // todo: sealed interface https://youtrack.jetbrains.com/issue/KT-22286
public interface SelectInstance<in R> {
    /**
     * `true` when this [select] statement has already picked a clause to execute.
     */
    public val isSelected: Boolean

    /**
     * Attempts to select this instance. Returns `true` on success.
     */
    public fun trySelect(): Boolean

    /**
     * Attempts to select this instance. Returns:
     * * [RESUME_TOKEN] on success,
     * * [RETRY_ATOMIC] on deadlock (a retry is needed; this is only possible when [otherOp] is not `null`),
     * * `null` on failure to select (already selected).
     *
     * [otherOp] is not `null` when the rendezvous with this select is attempted from inside of another select.
     * In that case [PrepareOp.finishPrepare] must be called before deciding on any value other than [RETRY_ATOMIC].
     *
     * Note that the actual return type of this method is `Symbol?`. It cannot be declared as such, because this
     * member is public while [Symbol] is internal. Once [SelectInstance] becomes a `sealed interface`
     * (see KT-22286) this method can be declared as internal.
     */
    public fun trySelectOther(otherOp: PrepareOp?): Any?

    /**
     * Performs the action described by [desc] atomically with [trySelect].
     * May return [RETRY_ATOMIC]; the caller shall then retry with a **fresh instance of desc**.
     */
    public fun performAtomicTrySelect(desc: AtomicDesc): Any?

    /**
     * Completion continuation of this select instance.
     * The select instance must be _selected_ first.
     * All resumptions through this continuation happen _directly_, without going through the dispatcher.
     */
    public val completion: Continuation<R>

    /**
     * Resumes this instance with the given [exception] in a dispatched way.
     * This method can be called from any context.
     */
    public fun resumeSelectWithException(exception: Throwable)

    /**
     * Disposes the specified [handle] when this instance is selected.
     * Note that [DisposableHandle.dispose] may end up being called multiple times.
     */
    public fun disposeOnSelect(handle: DisposableHandle)
}
162 
/**
 * Waits for the result of multiple suspending functions simultaneously. The functions are specified using
 * _clauses_ in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
 * is either _selected_ or _fails_.
 *
 * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
 * becomes the result of the select. If any clause _fails_, then the select invocation produces the
 * corresponding exception. No clause is selected in this case.
 *
 * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
 * the clauses.
 *
 * There is no `default` clause for select expression. Instead, each selectable suspending function has the
 * corresponding non-suspending version that can be used with a regular `when` expression to select one
 * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
 *
 * | **Receiver**     | **Suspending function**                       | **Select clause**                                | **Non-suspending version**
 * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
 * | [Job]            | [join][Job.join]                              | [onJoin][Job.onJoin]                             | [isCompleted][Job.isCompleted]
 * | [Deferred]       | [await][Deferred.await]                       | [onAwait][Deferred.onAwait]                      | [isCompleted][Job.isCompleted]
 * | [SendChannel]    | [send][SendChannel.send]                      | [onSend][SendChannel.onSend]                     | [offer][SendChannel.offer]
 * | [ReceiveChannel] | [receive][ReceiveChannel.receive]             | [onReceive][ReceiveChannel.onReceive]            | [poll][ReceiveChannel.poll]
 * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]| [poll][ReceiveChannel.poll]
 * | [Mutex]          | [lock][Mutex.lock]                            | [onLock][Mutex.onLock]                           | [tryLock][Mutex.tryLock]
 * | none             | [delay]                                       | [onTimeout][SelectBuilder.onTimeout]             | none
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
 * function is suspended, this function immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
 * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 */
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    contract {
        callsInPlace(builder, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { continuation ->
        val selectImpl = SelectBuilderImpl(continuation)
        try {
            // run the user's builder with the implementation as its receiver to register the clauses
            selectImpl.builder()
        } catch (failure: Throwable) {
            // any failure of the builder is routed through the select instance
            selectImpl.handleBuilderException(failure)
        }
        selectImpl.getResult()
    }
}
212 
213 
// Initial value of SelectBuilderImpl's selection state: no clause has been picked yet.
@SharedImmutable
internal val NOT_SELECTED: Any = Symbol("NOT_SELECTED")

// Returned by AtomicSelectOp's preparation step when the select instance turns out to be selected already.
@SharedImmutable
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")

// Initial value of SelectBuilderImpl's result slot: neither resumed nor suspended yet.
@SharedImmutable
private val UNDECIDED: Any = Symbol("UNDECIDED")

// Value of SelectBuilderImpl's result slot after a suspended select was resumed.
@SharedImmutable
private val RESUMED: Any = Symbol("RESUMED")
222 
// Global counter of all atomic select operations; the numbers are used for their deadlock resolution.
// It lives in a separate internal class as a work-around for Atomicfu's current implementation,
// which creates public classes for static atomics.
internal class SeqNumber {
    private val number = atomic(1L)

    // Increments the counter and returns the incremented value.
    fun next(): Long = number.incrementAndGet()
}

// The single shared sequence that numbers select operations.
@SharedImmutable
private val selectOpSequenceNumber = SeqNumber()
233 
/**
 * Implementation of both [SelectBuilder] (the scope in which clauses are registered) and [SelectInstance]
 * (the object that clauses use to compete for selection) that backs the [select] function.
 *
 * It keeps two independent atomic cells:
 * - `_state` -- the selection state: [NOT_SELECTED] initially, an [OpDescriptor] while an atomic operation
 *   is in progress, and any other value (idempotent marker or `null`) once selected;
 * - `_result` -- the result hand-off between [resumeWith]/[resumeSelectWithException] and [getResult]
 *   (see the "Result state machine" diagram below).
 *
 * The instance is also the head of a lock-free list of [DisposeNode]s that are disposed on selection.
 *
 * @param uCont unintercepted continuation of the coroutine that invoked `select`.
 */
@PublishedApi
internal class SelectBuilderImpl<in R>(
    private val uCont: Continuation<R> // unintercepted delegate continuation
) : LockFreeLinkedListHead(), SelectBuilder<R>,
    SelectInstance<R>, Continuation<R>, CoroutineStackFrame
{
    override val callerFrame: CoroutineStackFrame?
        get() = uCont as? CoroutineStackFrame

    override fun getStackTraceElement(): StackTraceElement? = null

    // selection state is NOT_SELECTED initially and is replaced by idempotent marker (or null) when selected
    private val _state = atomic<Any?>(NOT_SELECTED)

    // this is basically our own SafeContinuation
    private val _result = atomic<Any?>(UNDECIDED)

    // cancellability support
    private val _parentHandle = atomic<DisposableHandle?>(null)
    private var parentHandle: DisposableHandle?
        get() = _parentHandle.value
        set(value) { _parentHandle.value = value }

    /* Result state machine

        +-----------+   getResult   +---------------------+   resume   +---------+
        | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
        +-----------+               +---------------------+            +---------+
              |
              | resume
              V
        +------------+  getResult
        | value/Fail | -----------+
        +------------+            |
              ^                   |
              |                   |
              +-------------------+
     */

    override val context: CoroutineContext get() = uCont.context

    override val completion: Continuation<R> get() = this

    /**
     * Common resumption logic over the result state machine.
     *
     * - If the result is still [UNDECIDED], stores the state produced by [value]; it is picked up by [getResult].
     * - If [getResult] has already returned `COROUTINE_SUSPENDED`, moves to [RESUMED] and runs [block],
     *   which is expected to resume the delegate continuation.
     * - Any other state means a second resumption.
     *
     * @throws IllegalStateException if this instance was already resumed.
     */
    private inline fun doResume(value: () -> Any?, block: () -> Unit) {
        assert { isSelected } // "Must be selected first"
        _result.loop { result ->
            when {
                result === UNDECIDED -> {
                    val update = value()
                    if (_result.compareAndSet(UNDECIDED, update)) return
                }
                result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
                    block()
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    // Resumes in direct mode, without going through dispatcher. Should be called in the same context.
    override fun resumeWith(result: Result<R>) {
        doResume({ result.toState() }) {
            if (result.isFailure) {
                uCont.resumeWithStackTrace(result.exceptionOrNull()!!)
            } else {
                uCont.resumeWith(result)
            }
        }
    }

    // Resumes in dispatched way so that it can be called from an arbitrary context
    override fun resumeSelectWithException(exception: Throwable) {
        doResume({ CompletedExceptionally(recoverStackTrace(exception, uCont)) }) {
            uCont.intercepted().resumeWith(Result.failure(exception))
        }
    }

    /**
     * Produces the value to return from `suspendCoroutineUninterceptedOrReturn` in [select].
     *
     * Registers the cancellation handler first if this instance is not selected yet. Then it either
     * marks the result as suspended and returns `COROUTINE_SUSPENDED`, or returns the result that was
     * already stored by a resumption.
     *
     * @throws IllegalStateException if the result was already consumed by a resumption ([RESUMED]).
     * @throws Throwable the stored cause, if the stored result is a [CompletedExceptionally].
     */
    @PublishedApi
    internal fun getResult(): Any? {
        if (!isSelected) initCancellability()
        var result = _result.value // atomic read
        if (result === UNDECIDED) {
            if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = _result.value // reread volatile var
        }
        when {
            result === RESUMED -> throw IllegalStateException("Already resumed")
            result is CompletedExceptionally -> throw result.cause
            else -> return result // either COROUTINE_SUSPENDED or data
        }
    }

    // Subscribes to cancellation of the parent Job (if the context has one) so that it can select this instance.
    private fun initCancellability() {
        val parent = context[Job] ?: return
        val newRegistration = parent.invokeOnCompletion(
            onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
        parentHandle = newRegistration
        // now check our state _after_ registering
        if (isSelected) newRegistration.dispose()
    }

    // Cancellation handler: tries to select this instance and, on success, resumes it with the job's exception.
    private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
        // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
        override fun invoke(cause: Throwable?) {
            if (trySelect())
                resumeSelectWithException(job.getCancellationException())
        }
        override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]"
    }

    /**
     * Handles exception [e] thrown by the builder lambda of [select].
     *
     * If this instance can still be selected, the select is resumed with [e]. Otherwise some clause
     * (or cancellation) has already won, and [e] is reported via [handleCoroutineException], unless it is
     * a [CancellationException] or is the very exception this select has already completed with.
     */
    @PublishedApi
    internal fun handleBuilderException(e: Throwable) {
        if (trySelect()) {
            resumeWithException(e)
        } else if (e !is CancellationException) {
            /*
             * Cannot handle this exception -- builder was already resumed with a different exception,
             * so treat it as "unhandled exception". But only if it is not the completion reason
             * and it's not the cancellation. Otherwise, in the face of structured concurrency
             * the same exception will be reported to the global exception handler.
             */
            // Peek at the stored result instead of calling getResult(): getResult() throws when the result
            // is CompletedExceptionally (so the comparison below could never observe it and a distinct
            // exception was silently lost), and it also moves UNDECIDED to COROUTINE_SUSPENDED as a side effect.
            val result = _result.value
            if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
                handleCoroutineException(context, e)
            }
        }
    }

    // Helps any operation descriptor found in the state to complete before answering.
    override val isSelected: Boolean get() = _state.loop { state ->
        when {
            state === NOT_SELECTED -> return false
            state is OpDescriptor -> state.perform(this) // help
            else -> return true // already selected
        }
    }

    override fun disposeOnSelect(handle: DisposableHandle) {
        val node = DisposeNode(handle)
        // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
        if (!isSelected) {
            addLast(node) // add handle to list
            // double-check node after adding
            if (!isSelected) return // all ok - still not selected
        }
        // already selected
        handle.dispose()
    }

    // Invoked once this instance becomes selected: disposes the parent handle and all registered handles.
    private fun doAfterSelect() {
        parentHandle?.dispose()
        forEach<DisposeNode> {
            it.handle.dispose()
        }
    }

    override fun trySelect(): Boolean {
        val result = trySelectOther(null)
        return when {
            result === RESUME_TOKEN -> true
            result == null -> false
            else -> error("Unexpected trySelectOther result $result")
        }
    }

    /*
       Diagram for rendezvous between two select operations:

       +---------+         +------------------------+ state(c)
       | Channel |         |  SelectBuilderImpl(1)  | -----------------------------------+
       +---------+         +------------------------+                                    |
            | queue                   ^                                                  |
            V                         | select                                           |
       +---------+  next   +------------------------+  next   +--------------+           |
       | LLHead  | ------> |  Send/ReceiveSelect(3) | -+----> | NextNode ... |           |
       +---------+         +------------------------+  |      +--------------+           |
            ^                              ^           | next(b)     ^                   |
            |                     affected |           V             |                   |
            |                          +-----------------+  next     |                   V
            |                          | PrepareOp(6)    | ----------+           +-----------------+
            |                          +-----------------+ <-------------------- | PairSelectOp(7) |
            |                                 | desc                             +-----------------+
            |                                 V
            |                  queue   +----------------------+
            +------------------------- | TryPoll/OfferDesc(5) |
                                       +----------------------+
                                     atomicOp |    ^
                                              V    | desc
       +----------------------+  impl  +---------------------+
       | SelectBuilderImpl(2) | <----- |  AtomicSelectOp(4)  |
       +----------------------+        +---------------------+
                    | state(a)                   ^
                    |                            |
                    +----------------------------+


       0. The first select operation SelectBuilderImpl(1) had already registered Send/ReceiveSelect(3) node
          in the channel.
       1. The second select operation SelectBuilderImpl(2) is trying to rendezvous calling
          performAtomicTrySelect(TryPoll/TryOfferDesc).
       2. A linked pair of AtomicSelectOp(4) and TryPoll/OfferDesc(5) is created to initiate this operation.
       3. AtomicSelectOp.prepareSelectOp installs a reference to AtomicSelectOp(4) in SelectBuilderImpl(2).state(a)
          field. STARTING AT THIS MOMENT CONCURRENT HELPERS CAN DISCOVER AND TRY TO HELP PERFORM THIS OPERATION.
       4. Then TryPoll/OfferDesc.prepare discovers "affectedNode" for this operation as Send/ReceiveSelect(3) and
          creates PrepareOp(6) that references it. It installs reference to PrepareOp(6) in Send/ReceiveSelect(3).next(b)
          instead of its original next pointer that was stored in PrepareOp(6).next.
       5. PrepareOp(6).perform calls TryPoll/OfferDesc(5).onPrepare which validates that PrepareOp(6).affected node
          is of the correct type and tries to secure ability to resume it by calling affected.tryResumeSend/Receive.
          Note, that different PrepareOp instances can be repeatedly created for different candidate nodes. If node is
          found to be resumed/selected, then REMOVE_PREPARED result causes Send/ReceiveSelect(3).next change to be
          undone and new PrepareOp is created with a different candidate node. Different concurrent helpers may end up
          creating different PrepareOp instances, so it is important that they ultimately come to consensus about
          node on which perform operation upon.
       6. Send/ReceiveSelect(3).affected.tryResumeSend/Receive forwards this call to SelectBuilderImpl.trySelectOther,
          passing it a reference to PrepareOp(6) as an indication of the other select instance rendezvous.
       7. SelectBuilderImpl.trySelectOther creates PairSelectOp(7) and installs it as SelectBuilderImpl(1).state(c)
          to secure the state of the first builder and commit ability to make it selected for this operation.
       8. NOW THE RENDEZVOUS IS FULLY PREPARED via descriptors installed at
          - SelectBuilderImpl(2).state(a)
          - Send/ReceiveSelect(3).next(b)
          - SelectBuilderImpl(1).state(c)
          Any concurrent operation that is trying to access any of the select instances or the queue is going to help.
          Any helper that helps AtomicSelectOp(4) calls TryPoll/OfferDesc(5).prepare which tries to determine
          "affectedNode" but is bound to discover the same Send/ReceiveSelect(3) node that cannot become
          non-first node until this operation completes (there are no insertions to the head of the queue!)
          We have not yet decided to complete this operation, but we cannot ever decide to complete this operation
          on any other node but Send/ReceiveSelect(3), so it is now safe to perform the next step.
       9. PairSelectOp(7).perform calls PrepareOp(6).finishPrepare which copies PrepareOp(6).affected and PrepareOp(6).next
          to the corresponding TryPoll/OfferDesc(5) fields.
       10. PairSelectOp(7).perform calls AtomicSelectOp(4).decide to reach consensus on successful completion of this
          operation. This consensus is important in light of dead-lock resolution algorithm, because a stale helper
          could have stumbled upon a higher-numbered atomic operation and had decided to abort this atomic operation,
          reaching decision on RETRY_ATOMIC status of it. We cannot proceed with completion in this case and must abort,
          all objects including AtomicSelectOp(4) will be dropped, reverting all the three updated pointers to
          their original values and atomic operation will retry from scratch.
       11. NOW WITH SUCCESSFUL UPDATE OF AtomicSelectOp(4).consensus to null THE RENDEZVOUS IS COMMITTED. The rest
           of the code proceeds to update:
           - SelectBuilderImpl(1).state to TryPoll/OfferDesc(5) so that late helpers would know that we have
             already successfully completed rendezvous.
           - Send/ReceiveSelect(3).next to Removed(next) so that this node becomes marked as removed.
           - SelectBuilderImpl(2).state to null to mark this select instance as selected.

       Note, that very late helper may try to perform this AtomicSelectOp(4) when it is already completed.
       It can proceed as far as finding affected node, creating PrepareOp, installing this new PrepareOp into the
       node's next pointer, but PrepareOp.perform checks that AtomicSelectOp(4) is already decided and undoes all
       the preparations.
     */

    // it is just like plain trySelect, but support idempotent start
    // Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected)
    override fun trySelectOther(otherOp: PrepareOp?): Any? {
        _state.loop { state -> // lock-free loop on state
            when {
                // Found initial state (not selected yet) -- try to make it selected
                state === NOT_SELECTED -> {
                    if (otherOp == null) {
                        // regular trySelect -- just mark as select
                        if (!_state.compareAndSet(NOT_SELECTED, null)) return@loop
                    } else {
                        // Rendezvous with another select instance -- install PairSelectOp
                        val pairSelectOp = PairSelectOp(otherOp)
                        if (!_state.compareAndSet(NOT_SELECTED, pairSelectOp)) return@loop
                        val decision = pairSelectOp.perform(this)
                        if (decision !== null) return decision
                    }
                    doAfterSelect()
                    return RESUME_TOKEN
                }
                state is OpDescriptor -> { // state is either AtomicSelectOp or PairSelectOp
                    // Found descriptor of ongoing operation while working in the context of other select operation
                    if (otherOp != null) {
                        val otherAtomicOp = otherOp.atomicOp
                        when {
                            // It is the same select instance
                            otherAtomicOp is AtomicSelectOp && otherAtomicOp.impl === this -> {
                                /*
                                 * We cannot do state.perform(this) here and "help" it since it is the same
                                 * select and we'll get StackOverflowError.
                                 * See https://github.com/Kotlin/kotlinx.coroutines/issues/1411
                                 * We cannot support this because select { ... } is an expression and its clauses
                                 * have a result that shall be returned from the select.
                                 */
                                error("Cannot use matching select clauses on the same object")
                            }
                            // The other select (that is trying to proceed) had started earlier
                            otherAtomicOp.isEarlierThan(state) -> {
                                /*
                                 * Abort to prevent deadlock by returning a failure to it.
                                 * See https://github.com/Kotlin/kotlinx.coroutines/issues/504
                                 * The other select operation will receive a failure and will restart itself with a
                                 * larger sequence number. This guarantees obstruction-freedom of this algorithm.
                                 */
                                return RETRY_ATOMIC
                            }
                        }
                    }
                    // Otherwise (not a special descriptor)
                    state.perform(this) // help it
                }
                // otherwise -- already selected
                otherOp == null -> return null // already selected
                state === otherOp.desc -> return RESUME_TOKEN // was selected with this marker
                else -> return null // selected with different marker
            }
        }
    }

    // The very last step of rendezvous between two select operations
    private class PairSelectOp(
        @JvmField val otherOp: PrepareOp
    ) : OpDescriptor() {
        override fun perform(affected: Any?): Any? {
            val impl = affected as SelectBuilderImpl<*>
            // here we are definitely not going to RETRY_ATOMIC, so
            // we must finish preparation of another operation before attempting to reach decision to select
            otherOp.finishPrepare()
            val decision = otherOp.atomicOp.decide(null) // try decide for success of operation
            // on success leave the other operation's descriptor as the selection marker, otherwise roll back
            val update: Any = if (decision == null) otherOp.desc else NOT_SELECTED
            impl._state.compareAndSet(this, update)
            return decision
        }

        override val atomicOp: AtomicOp<*>?
            get() = otherOp.atomicOp
    }

    override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
        AtomicSelectOp(this, desc).perform(null)

    override fun toString(): String = "SelectInstance(state=${_state.value}, result=${_result.value})"

    // Atomic operation that selects [impl] together with performing [desc]
    private class AtomicSelectOp(
        @JvmField val impl: SelectBuilderImpl<*>,
        @JvmField val desc: AtomicDesc
    ) : AtomicOp<Any?>() {
        // all select operations are totally ordered by their creating time using selectOpSequenceNumber
        override val opSequence = selectOpSequenceNumber.next()

        init {
            desc.atomicOp = this
        }

        override fun prepare(affected: Any?): Any? {
            // only originator of operation makes preparation move of installing descriptor into this selector's state
            // helpers should never do it, or risk ruining progress when they come late
            if (affected == null) {
                // we are originator (affected reference is not null if helping)
                prepareSelectOp()?.let { return it }
            }
            try {
                return desc.prepare(this)
            } catch (e: Throwable) {
                // undo prepareSelectOp on crash (for example if IllegalStateException is thrown)
                if (affected == null) undoPrepare()
                throw e
            }
        }

        override fun complete(affected: Any?, failure: Any?) {
            completeSelect(failure)
            desc.complete(this, failure)
        }

        // Installs this operation into impl's state; returns null on success or ALREADY_SELECTED otherwise
        private fun prepareSelectOp(): Any? {
            impl._state.loop { state ->
                when {
                    state === this -> return null // already in progress
                    state is OpDescriptor -> state.perform(impl) // help
                    state === NOT_SELECTED -> {
                        if (impl._state.compareAndSet(NOT_SELECTED, this))
                            return null // success
                    }
                    else -> return ALREADY_SELECTED
                }
            }
        }

        // reverts the change done by prepareSelectOp
        private fun undoPrepare() {
            impl._state.compareAndSet(this, NOT_SELECTED)
        }

        // Replaces this operation in impl's state with the final outcome: selected (null) or NOT_SELECTED
        private fun completeSelect(failure: Any?) {
            val selectSuccess = failure == null
            val update = if (selectSuccess) null else NOT_SELECTED
            if (impl._state.compareAndSet(this, update)) {
                if (selectSuccess)
                    impl.doAfterSelect()
            }
        }

        override fun toString(): String = "AtomicSelectOp(sequence=$opSequence)"
    }

    override fun SelectClause0.invoke(block: suspend () -> R) {
        registerSelectClause0(this@SelectBuilderImpl, block)
    }

    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
        registerSelectClause1(this@SelectBuilderImpl, block)
    }

    override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
        registerSelectClause2(this@SelectBuilderImpl, param, block)
    }

    override fun onTimeout(timeMillis: Long, block: suspend () -> R) {
        if (timeMillis <= 0L) {
            // non-positive timeout: select right away, in the current call stack
            if (trySelect())
                block.startCoroutineUnintercepted(completion)
            return
        }
        val action = Runnable {
            // todo: we could have replaced startCoroutine with startCoroutineUndispatched
            // But we need a way to know that Delay.invokeOnTimeout had used the right thread
            if (trySelect())
                block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
        }
        // the timer handle is disposed as soon as this instance is selected
        disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context))
    }

    // List node that carries a handle to dispose when this instance is selected
    private class DisposeNode(
        @JvmField val handle: DisposableHandle
    ) : LockFreeLinkedListNode()
}
659