• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.selects
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.internal.*
7 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
8 import kotlin.contracts.*
9 import kotlin.coroutines.*
10 import kotlin.internal.*
11 import kotlin.jvm.*
12 
/**
 * Suspends the caller until one of the _clauses_ registered in the [builder] scope of this invocation
 * is either _selected_ or _fails_, thereby waiting for the result of multiple suspending functions
 * simultaneously.
 *
 * The selection is *atomic*: at most one clause is selected and its block is executed. The result of
 * that clause becomes the result of the whole select. If any clause _fails_, no clause is selected
 * and the select invocation produces the corresponding exception.
 *
 * This function is _biased_ to the first clause: when several clauses can be selected at the same time,
 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
 * the clauses.
 *
 * A select expression has no `default` clause. Instead, each selectable suspending function has a
 * corresponding non-suspending version that can be used with a regular `when` expression to select one
 * of the alternatives or to perform the default (`else`) action if none of them can be selected immediately.
 *
 * ### List of supported select methods
 *
 * | **Receiver**     | **Suspending function**                           | **Select clause**
 * | ---------------- | ---------------------------------------------     | -----------------------------------------------------
 * | [Job]            | [join][Job.join]                                  | [onJoin][Job.onJoin]
 * | [Deferred]       | [await][Deferred.await]                           | [onAwait][Deferred.onAwait]
 * | [SendChannel]    | [send][SendChannel.send]                          | [onSend][SendChannel.onSend]
 * | [ReceiveChannel] | [receive][ReceiveChannel.receive]                 | [onReceive][ReceiveChannel.onReceive]
 * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
 * | none             | [delay]                                           | [onTimeout][SelectBuilder.onTimeout]
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
 * function is waiting, it immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: if the coroutine was cancelled while this function was
 * suspended, [CancellationException] is thrown even if the function is ready to return the result.
 * See [suspendCancellableCoroutine] for low-level details.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 */
@OptIn(ExperimentalContracts::class)
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    contract {
        callsInPlace(builder, InvocationKind.EXACTLY_ONCE)
    }
    val implementation = SelectImplementation<R>(coroutineContext)
    // Apply the user-specified builder so that all the listed clauses get registered.
    implementation.builder()
    // TAIL-CALL OPTIMIZATION: `doSelect` is the only suspend call,
    // and it is at the last position.
    return implementation.doSelect()
}
61 
/**
 * The receiver scope of a [select] invocation, in which the clauses are registered.
 *
 * A [SelectBuilder] instance can only be obtained as the receiver of a [select] block call,
 * and it is valid only during the registration phase of the select builder.
 * Any use outside of it is prohibited and leads to unspecified behaviour.
 *
 * As a rule of thumb, instances of this type should always be used implicitly:
 * no signature should mention this type, neither explicitly (e.g. a function signature)
 * nor implicitly (e.g. an inferred `val` type).
 */
public sealed interface SelectBuilder<in R> {
    /**
     * Registers a clause of this [select] expression that takes no additional parameters
     * and does not select any value.
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Registers a clause of this [select] expression that takes no additional parameters
     * and selects a value of type [Q].
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Registers a clause of this [select] expression that takes an additional parameter of type [P]
     * and selects a value of type [Q].
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Registers a clause of this [select] expression whose additional parameter of type [P] is nullable,
     * passing `null` as the value of that parameter. The clause selects a value of type [Q].
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) {
        invoke(null, block)
    }

    /**
     * Clause that selects the given [block] once the specified timeout has passed.
     * A negative or zero timeout selects the [block] immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * @param timeMillis timeout time in milliseconds.
     */
    @ExperimentalCoroutinesApi
    @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
    @LowPriorityInOverloadResolution
    @Deprecated(
        message = "Replaced with the same extension function",
        level = DeprecationLevel.ERROR,
        replaceWith = ReplaceWith(expression = "onTimeout", imports = ["kotlinx.coroutines.selects.onTimeout"])
    ) // Since 1.7.0, was experimental
    public fun onTimeout(timeMillis: Long, block: suspend () -> R) {
        onTimeout(timeMillis, block)
    }
}
113 
/**
 * The common description of a [select] clause. Every clause consists of four parts,
 * each of which is exposed as a property of this interface:
 * the [object of the clause][clauseObject], the [registration function][regFunc],
 * the [result-processing function][processResFunc], and the optional
 * [cancellation handler constructor][onCancellationConstructor].
 *
 * @suppress **This is unstable API, and it is subject to change.**
 */
@InternalCoroutinesApi
public sealed interface SelectClause {
    /**
     * The object of this clause, such as the channel instance for [SendChannel.onSend].
     */
    public val clauseObject: Any

    /**
     * The function that specifies how this clause should be registered in the [clauseObject].
     */
    public val regFunc: RegistrationFunction

    /**
     * The function that transforms the internal result (passed via [SelectInstance.trySelect] or
     * [SelectInstance.selectInRegistrationPhase]) into the argument of the user-specified block.
     */
    public val processResFunc: ProcessResultFunction

    /**
     * The function that specifies how the internal result provided via [SelectInstance.trySelect]
     * or [SelectInstance.selectInRegistrationPhase] should be processed if this `select`
     * is cancelled while dispatching; `null` when no such processing is specified.
     */
    public val onCancellationConstructor: OnCancellationConstructor?
}
137 
/**
 * Specifies how a `select` instance is registered into the given clause object.
 * For channels, the registration logic is the same as that of the plain `send/receive` operation,
 * except that the `select` instance is stored as a waiter in place of a continuation.
 *
 * @suppress **This is unstable API, and it is subject to change.**
 */
@InternalCoroutinesApi
public typealias RegistrationFunction = (
    clauseObject: Any,
    select: SelectInstance<*>,
    param: Any?
) -> Unit
148 
/**
 * Specifies how the _internal_ result, provided via [SelectInstance.selectInRegistrationPhase]
 * or [SelectInstance.trySelect], is processed. As an example, the [ReceiveChannel.onReceive] and
 * [ReceiveChannel.onReceiveCatching] clauses share exactly the same synchronization logic
 * and differ only in what happens when the channel is discovered in the closed or cancelled state.
 *
 * @suppress **This is unstable API, and it is subject to change.**
 */
@InternalCoroutinesApi
public typealias ProcessResultFunction = (
    clauseObject: Any,
    param: Any?,
    clauseResult: Any?
) -> Any?
159 
/**
 * Specifies how the internal result, provided via [SelectInstance.trySelect]
 * or [SelectInstance.selectInRegistrationPhase], is processed when this `select`
 * is cancelled while dispatching. This function cannot be passed only in [SelectInstance.trySelect],
 * because [SelectInstance.selectInRegistrationPhase] can be called when the coroutine is already cancelled.
 *
 * @suppress **This is unstable API, and it is subject to change.**
 */
@InternalCoroutinesApi
public typealias OnCancellationConstructor = (
    select: SelectInstance<*>,
    param: Any?,
    internalResult: Any?
) -> (Throwable, Any?, CoroutineContext) -> Unit
171 
/**
 * A [select] expression clause that takes no additional parameters and does not select any value.
 */
public sealed interface SelectClause0 : SelectClause
176 
/**
 * The implementation of [SelectClause0]. As such a clause does not select any value,
 * its [processResFunc] is always [DUMMY_PROCESS_RESULT_FUNCTION].
 */
internal class SelectClause0Impl(
    override val clauseObject: Any,
    override val regFunc: RegistrationFunction,
    override val onCancellationConstructor: OnCancellationConstructor? = null
) : SelectClause0 {
    // There is no value to process, so the shared no-op function is used.
    override val processResFunc: ProcessResultFunction = DUMMY_PROCESS_RESULT_FUNCTION
}
184 
/**
 * A [ProcessResultFunction] that ignores all of its arguments and always returns `null`.
 */
private val DUMMY_PROCESS_RESULT_FUNCTION: ProcessResultFunction = { _, _, _ ->
    null
}
186 
/**
 * A [select] expression clause that takes no additional parameters and selects a value of type [Q].
 */
public sealed interface SelectClause1<out Q> : SelectClause
191 
/**
 * The implementation of [SelectClause1] that simply stores the parts of the clause
 * passed to its constructor.
 */
internal class SelectClause1Impl<Q>(
    override val clauseObject: Any,
    override val regFunc: RegistrationFunction,
    override val processResFunc: ProcessResultFunction,
    override val onCancellationConstructor: OnCancellationConstructor? = null
) : SelectClause1<Q>
198 
/**
 * A [select] expression clause that takes an additional parameter of type [P]
 * and selects a value of type [Q].
 */
public sealed interface SelectClause2<in P, out Q> : SelectClause
203 
/**
 * The implementation of [SelectClause2] that simply stores the parts of the clause
 * passed to its constructor.
 */
internal class SelectClause2Impl<P, Q>(
    override val clauseObject: Any,
    override val regFunc: RegistrationFunction,
    override val processResFunc: ProcessResultFunction,
    override val onCancellationConstructor: OnCancellationConstructor? = null
) : SelectClause2<P, Q>
210 
/**
 * The internal representation of a `select` instance.
 *
 * @suppress **This is unstable API, and it is subject to change.**
 */
@InternalCoroutinesApi
public sealed interface SelectInstance<in R> {
    /**
     * The context of the coroutine that performs this `select` operation.
     */
    public val context: CoroutineContext

    /**
     * Called by other operations that try to perform a rendezvous with this `select`.
     * Returns `true` when the rendezvous succeeds and `false` otherwise.
     *
     * In the current implementation, a rendezvous attempt can fail for two reasons:
     * either another clause is already selected, or this `select` is still in
     * REGISTRATION phase. The [SelectImplementation.trySelectDetailed] function
     * can be used instead to tell these reasons apart.
     */
    public fun trySelect(clauseObject: Any, result: Any?): Boolean

    /**
     * Specifies, for the case when this `select` instance is stored as a waiter,
     * how the stored `select` should be removed on cancellation or when another clause
     * gets selected: the removal is defined by the given [handle][disposableHandle].
     */
    public fun disposeOnCompletion(disposableHandle: DisposableHandle)

    /**
     * Provides the internal result of a clause that became selected during registration;
     * this result is later passed to the [ProcessResultFunction] of the clause.
     * Once this function is called, other clause registrations are ignored and [trySelect] fails.
     */
    public fun selectInRegistrationPhase(internalResult: Any?)
}
248 
/**
 * A [SelectInstance] that is also a [Waiter].
 */
internal interface SelectInstanceInternal<R> : SelectInstance<R>, Waiter
250 
251 @PublishedApi
252 internal open class SelectImplementation<R>(
253     override val context: CoroutineContext
254 ) : CancelHandler, SelectBuilder<R>, SelectInstanceInternal<R> {
255 
256     /**
257      * Essentially, the `select` operation is split into three phases: REGISTRATION, WAITING, and COMPLETION.
258      *
259      * == Phase 1: REGISTRATION ==
260      * In the first REGISTRATION phase, the user-specified [SelectBuilder] is applied, and all the listed clauses
261      * are registered via the provided [registration functions][SelectClause.regFunc]. Intuitively, `select` clause
262      * registration is similar to the plain blocking operation, with the only difference that this [SelectInstance]
263      * is stored as a waiter instead of continuation, and [SelectInstance.trySelect] is used to make a rendezvous.
264      * Also, when registering, it is possible for the operation to complete immediately, without waiting. In this case,
265      * [SelectInstance.selectInRegistrationPhase] should be used. Otherwise, when no rendezvous happens and this `select`
266      * instance is stored as a waiter, a completion handler for the registering clause should be specified via
267      * [SelectInstance.disposeOnCompletion]; this handler specifies how to remove this `select` instance from the
268      * clause object when another clause becomes selected or the operation cancels.
269      *
270      * After a clause registration is completed, another coroutine can attempt to make a rendezvous with this `select`.
271      * However, to resolve a race between clauses registration and [SelectInstance.trySelect], the latter fails when
272      * this `select` is still in REGISTRATION phase. Thus, the corresponding clause has to be registered again.
273      *
274      * In this phase, the `state` field stores either a special [STATE_REG] marker or
275      * a list of clauses to be re-registered due to failed rendezvous attempts.
276      *
277      * == Phase 2: WAITING ==
278      * If no rendezvous happens in REGISTRATION phase, the `select` operation moves to WAITING one and suspends until
279      * [SelectInstance.trySelect] is called. Also, when waiting, this `select` can be cancelled. In the latter case,
280      * further [SelectInstance.trySelect] attempts fail, and all the completion handlers, specified via
281      * [SelectInstance.disposeOnCompletion], are invoked to remove this `select` instance from the corresponding
282      * clause objects.
283      *
284      * In this phase, the `state` field stores either the continuation to be later resumed or a special `Cancelled`
285      * object (with the cancellation cause inside) when this `select` becomes cancelled.
286      *
287      * == Phase 3: COMPLETION ==
288      * Once a rendezvous happens either in REGISTRATION phase (via [SelectInstance.selectInRegistrationPhase]) or
289      * in WAITING phase (via [SelectInstance.trySelect]), this `select` moves to the final `COMPLETION` phase.
290      * First, the provided internal result is processed via the [ProcessResultFunction] of the selected clause;
291      * it returns the argument for the user-specified block or throws an exception (see [SendChannel.onSend] as
292      * an example). After that, this `select` should be removed from all other clause objects by calling the
293      * corresponding [DisposableHandle]-s, provided via [SelectInstance.disposeOnCompletion] during registration.
294      * At the end, the user-specified block is called and this `select` finishes.
295      *
296      * In this phase, once a rendezvous has happened, the `state` field stores the corresponding clause.
297      * After that, it moves to [STATE_COMPLETED] to avoid memory leaks.
298      *
299      *
300      *
301      * The state machine is listed below:
302      *
303      *            REGISTRATION PHASE                   WAITING PHASE             COMPLETION PHASE
304      *       ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢             ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢         ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢
305      *
306      *                                                 +-----------+                 +-----------+
307      *                                                 | CANCELLED |                 | COMPLETED |
308      *                                                 +-----------+                 +-----------+
309      *                                                       ^                             ^
310      *     INITIAL STATE                                     |                             | this `select`
311      *     ------------+                                     |  cancelled                  | is completed
312      *                  \                                    |                             |
313      *                   +=============+     move to     +------+    successful   +------------+
314      *                +--|  STATE_REG  |---------------> | cont |-----------------| ClauseData |
315      *                |  +=============+  WAITING phase  +------+  trySelect(..)  +------------+
316      *                |    ^     |                                                       ^
317      *                |    |     |    some clause has been selected during registration  |
318      *         add a  |    |     +-------------------------------------------------------+
319      *   clause to be |    |                                                             |
320      *  re-registered |    | re-register                   some clause has been selected |
321      *                |    | clauses                     during registration while there |
322      *                v    |                            are clauses to be re-registered; |
323      *          +------------------+                                   ignore the latter |
324      *       +--| List<ClauseData> |-----------------------------------------------------+
325      *       |  +------------------+
326      *       |            ^
327      *       |            |  add one more clause
328      *       |            |  for re-registration
329      *       +------------+
330      *
331      * One of the most valuable benefits of this `select` design is that it allows processing clauses
332      * in a way similar to plain operations, such as `send` or `receive` on channels. The only difference
333      * is that instead of continuation, the operation should store the provided `select` instance object.
334      * Thus, this design makes it possible to support the `select` expression for any blocking data structure
335      * in Kotlin Coroutines.
336      *
337      * It is worth mentioning that the algorithm above provides "obstruction-freedom" non-blocking guarantee
338      * instead of the standard "lock-freedom" to avoid using heavy descriptors. In practice, this relaxation
339      * does not make significant difference. However, it is vital for Kotlin Coroutines to provide some
340      * non-blocking guarantee, as users may add blocking code in [SelectBuilder], and this blocking code
341      * should not cause blocking behaviour in other places, such as an attempt to make a rendezvous with
342      * the `select` that hangs in REGISTRATION phase.
343      *
344      * Also, this implementation is NOT linearizable under some circumstances. The reason is that a rendezvous
345      * attempt with `select` (via [SelectInstance.trySelect]) may fail when this `select` operation is still
346      * in REGISTRATION phase. Consider the following situation on two empty rendezvous channels `c1` and `c2`
347      * and the `select` operation that tries to send an element to one of these channels. First, this `select`
348      * instance is registered as a waiter in `c1`. After that, another thread can observe that `c1` is no longer
349      * empty and try to receive an element from `c1` -- this receive attempt fails due to the `select` operation
350      * being in REGISTRATION phase.
351      * It is also possible to observe that this `select` operation registered in `c2` first, and only after that in
352      * `c1` (it has to re-register in `c1` after the unsuccessful rendezvous attempt), which is also non-linearizable.
353      * We, however, find such a non-linearizable behaviour not so important in practice and leverage the correctness
354      * relaxation for the algorithm simplicity and the non-blocking progress guarantee.
355      */
356 
357     /**
358      * The state of this `select` operation. See the description above for details.
359      */
360     private val state = atomic<Any>(STATE_REG)
361 
362     /**
363      * Returns `true` if this `select` instance is in the REGISTRATION phase;
364      * otherwise, returns `false`.
365      */
366     private val inRegistrationPhase
367         get() = state.value.let {
368             it === STATE_REG || it is List<*>
369         }
370 
371     /**
372      * Returns `true` if this `select` is already selected;
373      * thus, other parties are bound to fail when making a rendezvous with it.
374      */
375     private val isSelected
376         get() = state.value is SelectImplementation<*>.ClauseData
377 
378     /**
379      * Returns `true` if this `select` is cancelled.
380      */
381     private val isCancelled
382         get() = state.value === STATE_CANCELLED
383 
384     /**
385      * List of clauses waiting on this `select` instance.
386      *
387      * This property is the subject to bening data race: concurrent cancellation might null-out this property
388      * while [trySelect] operation reads it and iterates over its content.
389      * A logical race is resolved by the consensus on [state] property.
390      */
391     @BenignDataRace
392     private var clauses: MutableList<ClauseData>? = ArrayList(2)
393 
394     /**
395      * Stores the completion action provided through [disposeOnCompletion] or [invokeOnCancellation]
396      * during clause registration. After that, if the clause is successfully registered
397      * (so, it has not completed immediately), this handler is stored into
398      * the corresponding [ClauseData] instance.
399      *
400      * Note that either [DisposableHandle] is provided, or a [Segment] instance with
401      * the index in it, which specify the location of storing this `select`.
402      * In the latter case, [Segment.onCancellation] should be called on completion/cancellation.
403      */
404     private var disposableHandleOrSegment: Any? = null
405 
406     /**
407      * In case the disposable handle is specified via [Segment]
408      * and index in it, implying calling [Segment.onCancellation],
409      * the corresponding index is stored in this field.
410      * The segment is stored in [disposableHandleOrSegment].
411      */
412     private var indexInSegment: Int = -1
413 
414     /**
415      * Stores the result passed via [selectInRegistrationPhase] during clause registration
416      * or [trySelect], which is called by another coroutine trying to make a rendezvous
417      * with this `select` instance. Further, this result is processed via the
418      * [ProcessResultFunction] of the selected clause.
419      *
420      * Unfortunately, we cannot store the result in the [state] field, as the latter stores
421      * the clause object upon selection (see [ClauseData.clauseObject] and [SelectClause.clauseObject]).
422      * Instead, it is possible to merge the [internalResult] and [disposableHandle] fields into
423      * one that stores either result when the clause is successfully registered ([inRegistrationPhase] is `true`),
424      * or [DisposableHandle] instance when the clause is completed during registration ([inRegistrationPhase] is `false`).
425      * Yet, this optimization is omitted for code simplicity.
426      *
427      * This property is the subject to benign data race:
428      * [Cleanup][cleanup] procedure can be invoked both as part of the completion sequence
429      * and as a cancellation handler triggered by an external cancellation.
430      * In both scenarios, [NO_RESULT] is written to this property via race.
431      */
432     @BenignDataRace
433     private var internalResult: Any? = NO_RESULT
434 
435     /**
436      * This function is called after the [SelectBuilder] is applied. In case one of the clauses is already selected,
437      * the algorithm applies the corresponding [ProcessResultFunction] and invokes the user-specified [block][ClauseData.block].
438      * Otherwise, it moves this `select` to WAITING phase (re-registering clauses if needed), suspends until a rendezvous
439      * is happened, and then completes the operation by applying the corresponding [ProcessResultFunction] and
440      * invoking the user-specified [block][ClauseData.block].
441      */
442     @PublishedApi
443     internal open suspend fun doSelect(): R =
444         if (isSelected) complete()  // Fast path
445         else doSelectSuspend()      // Slow path
446 
447     // We separate the following logic as it has two suspension points
448     // and, therefore, breaks the tail-call optimization if it were
449     // inlined in [doSelect]
450     private suspend fun doSelectSuspend(): R {
451         // In case no clause has been selected during registration,
452         // the `select` operation suspends and waits for a rendezvous.
453         waitUntilSelected() // <-- suspend call => no tail-call optimization here
454         // There is a selected clause! Apply the corresponding
455         // [ProcessResultFunction] and invoke the user-specified block.
456         return complete() // <-- one more suspend call
457     }
458 
459     // ========================
460     // = CLAUSES REGISTRATION =
461     // ========================
462 
463     override fun SelectClause0.invoke(block: suspend () -> R) =
464         ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor).register()
465 
466     override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) =
467         ClauseData(clauseObject, regFunc, processResFunc, null, block, onCancellationConstructor).register()
468 
469     override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) =
470         ClauseData(clauseObject, regFunc, processResFunc, param, block, onCancellationConstructor).register()
471 
472     /**
473      * Attempts to register this `select` clause. If another clause is already selected,
474      * this function does nothing and completes immediately.
475      * Otherwise, it registers this `select` instance in
476      * the [clause object][ClauseData.clauseObject]
477      * according to the provided [registration function][ClauseData.regFunc].
478      * On success, this `select` instance is stored as a waiter
479      * in the clause object -- the algorithm also stores
480      * the provided via [disposeOnCompletion] completion action
481      * and adds the clause to the list of registered one.
482      * In case of registration failure, the internal result
483      * (not processed by [ProcessResultFunction] yet) must be
484      * provided via [selectInRegistrationPhase] -- the algorithm
485      * updates the state to this clause reference.
486      */
487     @JvmName("register")
488     internal fun ClauseData.register(reregister: Boolean = false) {
489         assert { state.value !== STATE_CANCELLED }
490         // Is there already selected clause?
491         if (state.value.let { it is SelectImplementation<*>.ClauseData }) return
492         // For new clauses, check that there does not exist
493         // another clause with the same object.
494         if (!reregister) checkClauseObject(clauseObject)
495         // Try to register in the corresponding object.
496         if (tryRegisterAsWaiter(this@SelectImplementation)) {
497             // Successfully registered, and this `select` instance
498             // is stored as a waiter. Add this clause to the list
499             // of registered clauses and store the provided via
500             // [invokeOnCompletion] completion action into the clause.
501             //
502             // Importantly, the [waitUntilSelected] function is implemented
503             // carefully to ensure that the cancellation handler has not been
504             // installed when clauses re-register, so the logic below cannot
505             // be invoked concurrently with the clean-up procedure.
506             // This also guarantees that the list of clauses cannot be cleared
507             // in the registration phase, so it is safe to read it with "!!".
508             if (!reregister) clauses!! += this
509             disposableHandleOrSegment = this@SelectImplementation.disposableHandleOrSegment
510             indexInSegment = this@SelectImplementation.indexInSegment
511             this@SelectImplementation.disposableHandleOrSegment = null
512             this@SelectImplementation.indexInSegment = -1
513         } else {
514             // This clause has been selected!
515             // Update the state correspondingly.
516             state.value = this
517         }
518     }
519 
520     /**
521      * Checks that there does not exist another clause with the same object.
522      */
523     private fun checkClauseObject(clauseObject: Any) {
524         // Read the list of clauses, it is guaranteed that it is non-null.
525         // In fact, it can become `null` only in the clean-up phase, while
526         // this check can be called only in the registration one.
527         val clauses = clauses!!
528         // Check that there does not exist another clause with the same object.
529         check(clauses.none { it.clauseObject === clauseObject }) {
530             "Cannot use select clauses on the same object: $clauseObject"
531         }
532     }
533 
534     override fun disposeOnCompletion(disposableHandle: DisposableHandle) {
535         this.disposableHandleOrSegment = disposableHandle
536     }
537 
538     /**
539      * An optimized version for the code below that does not allocate
540      * a cancellation handler object and efficiently stores the specified
541      * [segment] and [index].
542      *
543      * ```
544      * disposeOnCompletion {
545      *     segment.onCancellation(index, null)
546      * }
547      * ```
548      */
549     override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
550         this.disposableHandleOrSegment = segment
551         this.indexInSegment = index
552     }
553 
554     override fun selectInRegistrationPhase(internalResult: Any?) {
555         this.internalResult = internalResult
556     }
557 
558     // =========================
559     // = WAITING FOR SELECTION =
560     // =========================
561 
    /**
     * Suspends and waits until some clause is selected. However, it is possible for a concurrent
     * coroutine to invoke [trySelect] while this `select` is still in REGISTRATION phase.
     * In this case, [trySelect] marks the corresponding select clause to be re-registered
     * (the [state] field then stores a list of clause objects), and this function performs
     * the registration of such clauses. After that, it atomically stores the continuation
     * into the [state] field if there are no more clauses to be re-registered.
     *
     * If the [state] field already stores the selected clause, the continuation
     * is resumed immediately.
     */
    private suspend fun waitUntilSelected() = suspendCancellableCoroutine<Unit> sc@{ cont ->
        // Update the state; every failed CAS below simply triggers another loop iteration.
        state.loop { curState ->
            when {
                // This `select` is in REGISTRATION phase, and there is no clause to be re-registered.
                // Perform a transition to WAITING phase by storing the current continuation.
                curState === STATE_REG -> if (state.compareAndSet(curState, cont)) {
                    // Perform a clean-up in case of cancellation.
                    //
                    // Importantly, we MUST install the cancellation handler
                    // only when the algorithm is bound to suspend. Otherwise,
                    // a race with [tryRegister] is possible, and the cancellation
                    // action provided via [disposeOnCompletion] can be ignored.
                    // Also, we MUST guarantee that this dispose handle is _visible_
                    // according to the memory model, and we CAN guarantee this when
                    // the state is updated.
                    cont.invokeOnCancellation(this)
                    return@sc
                }
                // This `select` is in REGISTRATION phase, but there are clauses that have to be
                // registered again. Reset the state back to STATE_REG, perform the required
                // registrations, and try again.
                curState is List<*> -> if (state.compareAndSet(curState, STATE_REG)) {
                    @Suppress("UNCHECKED_CAST")
                    curState as List<Any>
                    curState.forEach { reregisterClause(it) }
                }
                // This `select` operation became completed during clauses re-registration;
                // resume immediately, passing the cancellation action of the selected clause.
                curState is SelectImplementation<*>.ClauseData -> {
                    cont.resume(Unit, curState.createOnCancellationAction(this, internalResult))
                    return@sc
                }
                // This `select` cannot be in any other state.
                else -> error("unexpected state: $curState")
            }
        }
    }
605 
606     /**
607      * Re-registers the clause with the specified
608      * [clause object][clauseObject] after unsuccessful
609      * [trySelect] of this clause while the `select`
610      * was still in REGISTRATION phase.
611      */
612     private fun reregisterClause(clauseObject: Any) {
613         val clause = findClause(clauseObject)!! // it is guaranteed that the corresponding clause is presented
614         clause.disposableHandleOrSegment = null
615         clause.indexInSegment = -1
616         clause.register(reregister = true)
617     }
618 
619     // ==============
620     // = RENDEZVOUS =
621     // ==============
622 
623     override fun trySelect(clauseObject: Any, result: Any?): Boolean =
624         trySelectInternal(clauseObject, result) == TRY_SELECT_SUCCESSFUL
625 
626     /**
627      * Similar to [trySelect] but provides a failure reason
628      * if this rendezvous is unsuccessful. We need this function
629      * in the channel implementation.
630      */
631     fun trySelectDetailed(clauseObject: Any, result: Any?) =
632         TrySelectDetailedResult(trySelectInternal(clauseObject, result))
633 
    /**
     * Tries to perform a rendezvous with this `select` for the clause identified by
     * [clauseObject], providing [internalResult] as the selection result.
     *
     * Returns one of the `TRY_SELECT_*` codes:
     * - `TRY_SELECT_SUCCESSFUL` if the state was switched from the waiting continuation
     *   to the clause and the continuation was resumed;
     * - `TRY_SELECT_CANCELLED` if this `select` is already cancelled or the resumption failed;
     * - `TRY_SELECT_ALREADY_SELECTED` if a clause is already selected or this `select` is completed;
     * - `TRY_SELECT_REREGISTER` if this `select` is still in REGISTRATION phase and
     *   the clause has been scheduled for re-registration.
     *
     * Every failed CAS on the [state] field results in a retry.
     */
    private fun trySelectInternal(clauseObject: Any, internalResult: Any?): Int {
        while (true) {
            when (val curState = state.value) {
                // Perform a rendezvous with this select if it is in WAITING state.
                is CancellableContinuation<*> -> {
                    val clause = findClause(clauseObject) ?: continue // retry if `clauses` is already `null`
                    // Construct the cancellation action before publishing the clause via the CAS below.
                    val onCancellation = clause.createOnCancellationAction(this@SelectImplementation, internalResult)
                    if (state.compareAndSet(curState, clause)) {
                        @Suppress("UNCHECKED_CAST")
                        val cont = curState as CancellableContinuation<Unit>
                        // Success! Store the resumption value and
                        // try to resume the continuation.
                        this.internalResult = internalResult
                        if (cont.tryResume(onCancellation)) return TRY_SELECT_SUCCESSFUL
                        // If the resumption failed, we need to clean the [internalResult] field to avoid memory leaks.
                        this.internalResult = NO_RESULT
                        return TRY_SELECT_CANCELLED
                    }
                }
                // Already selected.
                STATE_COMPLETED, is SelectImplementation<*>.ClauseData -> return TRY_SELECT_ALREADY_SELECTED
                // Already cancelled.
                STATE_CANCELLED -> return TRY_SELECT_CANCELLED
                // This select is still in REGISTRATION phase, re-register the clause
                // in order not to wait until this select moves to WAITING phase.
                // This is a rare race, so we do not need to worry about performance here.
                STATE_REG -> if (state.compareAndSet(curState, listOf(clauseObject))) return TRY_SELECT_REREGISTER
                // This select is still in REGISTRATION phase, and the state stores a list of clauses
                // for re-registration, add the selecting clause to this list.
                // This is a rare race, so we do not need to worry about performance here.
                is List<*> -> if (state.compareAndSet(curState, curState + clauseObject)) return TRY_SELECT_REREGISTER
                // Another state? Something went really wrong.
                else -> error("Unexpected state: $curState")
            }
        }
    }
670 
671     /**
672      * Finds the clause with the corresponding [clause object][SelectClause.clauseObject].
673      * If the reference to the list of clauses is already cleared due to completion/cancellation,
674      * this function returns `null`
675      */
676     private fun findClause(clauseObject: Any): ClauseData? {
677         // Read the list of clauses. If the `clauses` field is already `null`,
678         // the clean-up phase has already completed, and this function returns `null`.
679         val clauses = this.clauses ?: return null
680         // Find the clause with the specified clause object.
681         return clauses.find { it.clauseObject === clauseObject }
682             ?: error("Clause with object $clauseObject is not found")
683     }
684 
685     // ==============
686     // = COMPLETION =
687     // ==============
688 
    /**
     * Completes this `select` operation after the internal result is provided
     * via [SelectInstance.trySelect] or [SelectInstance.selectInRegistrationPhase].
     * (1) First, the internal result is read and the [clean-up procedure][cleanup]
     * is called to remove this `select` instance from other clause objects, and
     * make it possible to collect it by GC after this `select` finishes.
     * (2) After that, this function applies the [ProcessResultFunction] of the
     * selected clause to the internal result.
     * (3) Finally, the user-specified block is invoked
     * with the processed result as an argument.
     */
    private suspend fun complete(): R {
        assert { isSelected }
        // Get the selected clause.
        @Suppress("UNCHECKED_CAST")
        val selectedClause = state.value as SelectImplementation<R>.ClauseData
        // Perform the clean-up before the internal result processing and
        // the user-specified block invocation to guarantee the absence
        // of memory leaks. Collect the internal result before that, as
        // the clean-up resets the `internalResult` field.
        val internalResult = this.internalResult
        cleanup(selectedClause)
        // Process the internal result and invoke the user's block.
        return if (!RECOVER_STACK_TRACES) {
            // TAIL-CALL OPTIMIZATION: the `suspend` block
            // is invoked at the very end.
            val blockArgument = selectedClause.processResult(internalResult)
            selectedClause.invokeBlock(blockArgument)
        } else {
            // TAIL-CALL OPTIMIZATION: the `suspend`
            // function is invoked at the very end.
            // However, internally this `suspend` function
            // constructs a state machine to recover a
            // possible stack-trace.
            processResultAndInvokeBlockRecoveringException(selectedClause, internalResult)
        }
    }
725 
726     private suspend fun processResultAndInvokeBlockRecoveringException(clause: ClauseData, internalResult: Any?): R =
727         try {
728             val blockArgument = clause.processResult(internalResult)
729             clause.invokeBlock(blockArgument)
730         } catch (e: Throwable) {
731             // In the debug mode, we need to properly recover
732             // the stack-trace of the exception; the tail-call
733             // optimization cannot be applied here.
734             recoverAndThrow(e)
735         }
736 
737     /**
738      * Invokes all [DisposableHandle]-s provided via
739      * [SelectInstance.disposeOnCompletion] during
740      * clause registrations.
741      */
742     private fun cleanup(selectedClause: ClauseData) {
743         assert { state.value == selectedClause }
744         // Read the list of clauses. If the `clauses` field is already `null`,
745         // a concurrent clean-up procedure has already completed, and it is safe to finish.
746         val clauses = this.clauses ?: return
747         // Invoke all cancellation handlers except for the
748         // one related to the selected clause, if specified.
749         clauses.forEach { clause ->
750             if (clause !== selectedClause) clause.dispose()
751         }
752         // We do need to clean all the data to avoid memory leaks.
753         this.state.value = STATE_COMPLETED
754         this.internalResult = NO_RESULT
755         this.clauses = null
756     }
757 
    /**
     * [CompletionHandler] implementation, must be invoked on cancellation.
     *
     * Moves this `select` to the cancelled state unless it is already completed,
     * disposes all the registered clauses, and clears the stored data.
     * The [cause] parameter is not used.
     */
    override fun invoke(cause: Throwable?) {
        // Update the state.
        state.update { cur ->
            // Finish immediately when this `select` is already completed,
            // i.e., the state is STATE_COMPLETED and the clean-up has been performed.
            // Notably, this select might be only logically completed
            // (the `state` field stores the selected `ClauseData`),
            // while the continuation is already cancelled.
            // In this case, we do NOT return here: the state is replaced
            // with STATE_CANCELLED, and the clauses are disposed below.
            if (cur === STATE_COMPLETED) return
            STATE_CANCELLED
        }
        // Read the list of clauses. If the `clauses` field is already `null`,
        // a concurrent clean-up procedure has already completed, and it is safe to finish.
        val clauses = this.clauses ?: return
        // Remove this `select` instance from all the clause objects (channels, mutexes, etc.).
        clauses.forEach { it.dispose() }
        // We do need to clean all the data to avoid memory leaks.
        this.internalResult = NO_RESULT
        this.clauses = null
    }
779 
780     /**
781      * Each `select` clause is internally represented with a [ClauseData] instance.
782      */
783     internal inner class ClauseData(
784         @JvmField val clauseObject: Any, // the object of this `select` clause: Channel, Mutex, Job, ...
785         private val regFunc: RegistrationFunction,
786         private val processResFunc: ProcessResultFunction,
787         private val param: Any?, // the user-specified param
788         private val block: Any, // the user-specified block, which should be called if this clause becomes selected
789         @JvmField val onCancellationConstructor: OnCancellationConstructor?
790     ) {
791         @JvmField
792         var disposableHandleOrSegment: Any? = null
793         @JvmField
794         var indexInSegment: Int = -1
795 
796         /**
797          * Tries to register the specified [select] instance in [clauseObject] and check
798          * whether the registration succeeded or a rendezvous has happened during the registration.
799          * This function returns `true` if this [select] is successfully registered and
800          * is _waiting_ for a rendezvous, or `false` when this clause becomes
801          * selected during registration.
802          *
803          * For example, the [Channel.onReceive] clause registration
804          * on a non-empty channel retrieves the first element and completes
805          * the corresponding [select] via [SelectInstance.selectInRegistrationPhase].
806          */
807         fun tryRegisterAsWaiter(select: SelectImplementation<R>): Boolean {
808             assert { select.inRegistrationPhase || select.isCancelled }
809             assert { select.internalResult === NO_RESULT }
810             regFunc(clauseObject, select, param)
811             return select.internalResult === NO_RESULT
812         }
813 
814         /**
815          * Processes the internal result provided via either
816          * [SelectInstance.selectInRegistrationPhase] or
817          * [SelectInstance.trySelect] and returns an argument
818          * for the user-specified [block].
819          *
820          * Importantly, this function may throw an exception
821          * (e.g., when the channel is closed in [Channel.onSend], the
822          * corresponding [ProcessResultFunction] is bound to fail).
823          */
824         fun processResult(result: Any?) = processResFunc(clauseObject, param, result)
825 
826         /**
827          * Invokes the user-specified block and returns
828          * the final result of this `select` clause.
829          */
830         @Suppress("UNCHECKED_CAST")
831         suspend fun invokeBlock(argument: Any?): R {
832             val block = block
833             // We distinguish no-argument and 1-argument
834             // lambdas via special markers for the clause
835             // parameters. Specifically, PARAM_CLAUSE_0
836             // is always used with [SelectClause0], which
837             // takes a no-argument lambda.
838             //
839             // TAIL-CALL OPTIMIZATION: we invoke
840             // the `suspend` block at the very end.
841             return if (this.param === PARAM_CLAUSE_0) {
842                 block as suspend () -> R
843                 block()
844             } else {
845                 block as suspend (Any?) -> R
846                 block(argument)
847             }
848         }
849 
850         fun dispose() {
851             with(disposableHandleOrSegment) {
852                 if (this is Segment<*>) {
853                     this.onCancellation(indexInSegment, null, context)
854                 } else {
855                     (this as? DisposableHandle)?.dispose()
856                 }
857             }
858         }
859 
860         fun createOnCancellationAction(select: SelectInstance<*>, internalResult: Any?) =
861             onCancellationConstructor?.invoke(select, param, internalResult)
862     }
863 }
864 
/**
 * Tries to resume this continuation with `Unit`, registering the given [onCancellation] action.
 * Returns `true` if the resumption has been performed and `false` if no resumption token was obtained.
 */
private fun CancellableContinuation<Unit>.tryResume(
    onCancellation: ((cause: Throwable, value: Any?, context: CoroutineContext) -> Unit)?
): Boolean {
    val token = tryResume(Unit, null, onCancellation)
    if (token == null) return false
    completeResume(token)
    return true
}
873 
// trySelectInternal(..) results.
// SUCCESSFUL: the rendezvous happened and the waiting continuation was resumed.
private const val TRY_SELECT_SUCCESSFUL = 0
// REREGISTER: the `select` is still in REGISTRATION phase; the clause must be registered again.
private const val TRY_SELECT_REREGISTER = 1
// CANCELLED: the `select` is already cancelled, or resuming its continuation failed.
private const val TRY_SELECT_CANCELLED = 2
// ALREADY_SELECTED: a clause is already selected, or the `select` is completed.
private const val TRY_SELECT_ALREADY_SELECTED = 3
879 
/**
 * The possible outcomes of `trySelectDetailed(..)`.
 */
internal enum class TrySelectDetailedResult {
    /** The rendezvous happened, and the waiting `select` was resumed. */
    SUCCESSFUL,

    /** The `select` is still in REGISTRATION phase, so the clause must be registered again. */
    REREGISTER,

    /** The `select` is already cancelled, or resuming its continuation failed. */
    CANCELLED,

    /** A clause is already selected, or the `select` is completed. */
    ALREADY_SELECTED
}
/**
 * Converts the integer code produced by `trySelectInternal(..)` into
 * the matching [TrySelectDetailedResult] entry; fails on an unknown code.
 */
private fun TrySelectDetailedResult(trySelectInternalResult: Int): TrySelectDetailedResult {
    return when (trySelectInternalResult) {
        TRY_SELECT_SUCCESSFUL -> SUCCESSFUL
        TRY_SELECT_REREGISTER -> REREGISTER
        TRY_SELECT_CANCELLED -> CANCELLED
        TRY_SELECT_ALREADY_SELECTED -> ALREADY_SELECTED
        else -> error("Unexpected internal result: $trySelectInternalResult")
    }
}
891 
// Markers for REGISTRATION, COMPLETED, and CANCELLED states.
// STATE_REG: the `select` is registering its clauses, and no clause awaits re-registration.
private val STATE_REG = Symbol("STATE_REG")
// STATE_COMPLETED: stored by the clean-up procedure after a clause has been selected.
private val STATE_COMPLETED = Symbol("STATE_COMPLETED")
// STATE_CANCELLED: stored by the cancellation handler.
private val STATE_CANCELLED = Symbol("STATE_CANCELLED")

// As the selection result is nullable, we use this special
// marker for the absence of a result.
private val NO_RESULT = Symbol("NO_RESULT")

// We use these marker parameter objects to distinguish
// SelectClause[0,1,2] and invoke the user-specified block correctly.
// PARAM_CLAUSE_0 marks clauses whose block takes no arguments.
internal val PARAM_CLAUSE_0 = Symbol("PARAM_CLAUSE_0")
904