1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.selects
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
12 import kotlin.contracts.*
13 import kotlin.coroutines.*
14 import kotlin.internal.*
15 import kotlin.jvm.*
16
17 /**
18 * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
19 * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
20 * is either _selected_ or _fails_.
21 *
22 * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
23 * becomes the result of the select. If any clause _fails_, then the select invocation produces the
24 * corresponding exception. No clause is selected in this case.
25 *
26 * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
27 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
28 * the clauses.
 *
30 * There is no `default` clause for select expression. Instead, each selectable suspending function has the
31 * corresponding non-suspending version that can be used with a regular `when` expression to select one
32 * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
33 *
34 * ### List of supported select methods
35 *
36 * | **Receiver** | **Suspending function** | **Select clause**
37 * | ---------------- | --------------------------------------------- | -----------------------------------------------------
38 * | [Job] | [join][Job.join] | [onJoin][Job.onJoin]
39 * | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait]
40 * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
41 * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
42 * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
43 * | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
44 *
45 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
46 * function is suspended, this function immediately resumes with [CancellationException].
47 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
48 * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
49 *
50 * Note that this function does not check for cancellation when it is not suspended.
51 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
52 */
@OptIn(ExperimentalContracts::class)
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    contract {
        callsInPlace(builder, InvocationKind.EXACTLY_ONCE)
    }
    // Create a fresh `select` instance bound to the caller's coroutine context
    // and let the user-supplied builder register its clauses on it.
    val selectInstance = SelectImplementation<R>(coroutineContext)
    selectInstance.builder()
    // TAIL-CALL OPTIMIZATION: `doSelect` is the only suspending call
    // and it is deliberately kept in the last position.
    return selectInstance.doSelect()
}
65
66 /**
67 * Scope for [select] invocation.
68 *
69 * An instance of [SelectBuilder] can only be retrieved as a receiver of a [select] block call,
70 * and it is only valid during the registration phase of the select builder.
71 * Any uses outside it lead to unspecified behaviour and are prohibited.
72 *
73 * The general rule of thumb is that instances of this type should always be used
74 * implicitly and there shouldn't be any signatures mentioning this type,
75 * whether explicitly (e.g. function signature) or implicitly (e.g. inferred `val` type).
76 */
public sealed interface SelectBuilder<in R> {
    /**
     * Registers a clause in this [select] expression without additional parameters that does not select any value.
     *
     * @param block the code executed if this clause is the one that gets selected;
     * its result becomes the result of the [select].
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Registers clause in this [select] expression without additional parameters that selects value of type [Q].
     *
     * @param block the code executed with the selected value if this clause is the one that gets selected;
     * its result becomes the result of the [select].
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Registers clause in this [select] expression with additional parameter of type [P] that selects value of type [Q].
     *
     * @param param the additional parameter of the clause.
     * @param block the code executed with the selected value if this clause is the one that gets selected;
     * its result becomes the result of the [select].
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Registers clause in this [select] expression with additional nullable parameter of type [P]
     * with the `null` value for this parameter that selects value of type [Q].
     *
     * This is a convenience overload: it simply delegates to the two-argument `invoke` with `null` as the parameter.
     *
     * @param block the code executed with the selected value if this clause is the one that gets selected.
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R): Unit = invoke(null, block)

    /**
     * Clause that selects the given [block] after a specified timeout passes.
     * If timeout is negative or zero, [block] is selected immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * This member is deprecated with [DeprecationLevel.ERROR] in favour of the `onTimeout` extension function.
     * NOTE(review): the body looks self-recursive, but it presumably resolves to the extension function
     * because this member is marked with `@LowPriorityInOverloadResolution` -- confirm before touching it.
     *
     * @param timeMillis timeout time in milliseconds.
     * @param block the code executed if the timeout clause is selected.
     */
    @ExperimentalCoroutinesApi
    @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
    @LowPriorityInOverloadResolution
    @Deprecated(
        message = "Replaced with the same extension function",
        level = DeprecationLevel.ERROR, replaceWith = ReplaceWith(expression = "onTimeout", imports = ["kotlinx.coroutines.selects.onTimeout"])
    ) // Since 1.7.0, was experimental
    public fun onTimeout(timeMillis: Long, block: suspend () -> R): Unit = onTimeout(timeMillis, block)
}
116
117 /**
118 * Each [select] clause is specified with:
119 * 1) the [object of this clause][clauseObject],
120 * such as the channel instance for [SendChannel.onSend];
121 * 2) the function that specifies how this clause
122 * should be registered in the object above;
123 * 3) the function that modifies the internal result
124 * (passed via [SelectInstance.trySelect] or
125 * [SelectInstance.selectInRegistrationPhase])
126 * to the argument of the user-specified block.
127 * 4) the function that specifies how the internal result provided via
128 * [SelectInstance.trySelect] or [SelectInstance.selectInRegistrationPhase]
129 * should be processed in case of this `select` cancellation while dispatching.
130 */
@InternalCoroutinesApi
public sealed interface SelectClause {
    /** The object this clause operates on, such as the channel instance for [SendChannel.onSend]. */
    public val clauseObject: Any

    /** Specifies how this clause should be registered in [clauseObject]. */
    public val regFunc: RegistrationFunction

    /**
     * Converts the internal result (passed via [SelectInstance.trySelect] or
     * [SelectInstance.selectInRegistrationPhase]) to the argument of the user-specified block.
     */
    public val processResFunc: ProcessResultFunction

    /**
     * Specifies how the internal result should be processed if this `select` is cancelled
     * while dispatching; `null` when the clause needs no such processing.
     */
    public val onCancellationConstructor: OnCancellationConstructor?
}
138
/**
 * The registration function specifies how the `select` instance should be registered into
 * the specified clause object. In case of channels, the registration logic
 * coincides with the plain `send/receive` operation with the only difference that
 * the `select` instance is stored as a waiter instead of continuation.
 *
 * Arguments, in order:
 * - `clauseObject` -- the object to register in (see [SelectClause.clauseObject]);
 * - `select` -- the `select` instance that is being registered;
 * - `param` -- the parameter the clause was registered with
 *   (presumably the extra argument of [SelectClause2] clauses -- verify against the clause implementations).
 */
@InternalCoroutinesApi
public typealias RegistrationFunction = (clauseObject: Any, select: SelectInstance<*>, param: Any?) -> Unit
147
/**
 * This function specifies how the _internal_ result, provided via [SelectInstance.selectInRegistrationPhase]
 * or [SelectInstance.trySelect] should be processed. For example, both [ReceiveChannel.onReceive] and
 * [ReceiveChannel.onReceiveCatching] clauses perform exactly the same synchronization logic,
 * but differ when the channel has been discovered in the closed or cancelled state.
 *
 * Arguments, in order:
 * - `clauseObject` -- the object of the selected clause (see [SelectClause.clauseObject]);
 * - `param` -- the parameter the clause was registered with;
 * - `clauseResult` -- the internal result to be processed.
 *
 * The returned value is the processed result.
 */
@InternalCoroutinesApi
public typealias ProcessResultFunction = (clauseObject: Any, param: Any?, clauseResult: Any?) -> Any?
156
/**
 * This function specifies how the internal result, provided via [SelectInstance.trySelect]
 * or [SelectInstance.selectInRegistrationPhase], should be processed in case of this `select`
 * cancellation while dispatching. Unfortunately, we cannot pass this function only in [SelectInstance.trySelect],
 * as [SelectInstance.selectInRegistrationPhase] can be called when the coroutine is already cancelled.
 *
 * Arguments, in order:
 * - `select` -- the `select` instance being resumed;
 * - `param` -- the parameter the clause was registered with;
 * - `internalResult` -- the internal (not yet processed) result of the clause.
 *
 * The returned function accepts a [Throwable] (presumably the cancellation cause -- confirm at the call sites).
 */
@InternalCoroutinesApi
public typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) -> (Throwable) -> Unit
165
/**
 * Clause for [select] expression without additional parameters that does not select any value.
 *
 * Such a clause is registered in a [SelectBuilder] with a block that takes no arguments.
 */
public sealed interface SelectClause0 : SelectClause
170
/**
 * Plain holder implementation of [SelectClause0].
 *
 * The clause does not select any value, so [processResFunc] is not configurable
 * and always refers to the shared [DUMMY_PROCESS_RESULT_FUNCTION].
 */
internal class SelectClause0Impl(
    override val clauseObject: Any,
    override val regFunc: RegistrationFunction,
    // Optional; `null` by default.
    override val onCancellationConstructor: OnCancellationConstructor? = null
) : SelectClause0 {
    override val processResFunc: ProcessResultFunction = DUMMY_PROCESS_RESULT_FUNCTION
}
// Shared [ProcessResultFunction] used by [SelectClause0Impl]:
// it ignores all three arguments and always returns `null`.
private val DUMMY_PROCESS_RESULT_FUNCTION: ProcessResultFunction = { _, _, _ -> null }
179
/**
 * Clause for [select] expression without additional parameters that selects value of type [Q].
 *
 * Such a clause is registered in a [SelectBuilder] with a block that receives the selected value.
 */
public sealed interface SelectClause1<out Q> : SelectClause
184
/**
 * Plain holder implementation of [SelectClause1]: every [SelectClause] property
 * is supplied through the constructor and stored as is.
 */
internal class SelectClause1Impl<Q>(
    override val clauseObject: Any,
    override val regFunc: RegistrationFunction,
    override val processResFunc: ProcessResultFunction,
    // Optional; `null` by default.
    override val onCancellationConstructor: OnCancellationConstructor? = null
) : SelectClause1<Q>
191
/**
 * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
 *
 * Such a clause is registered in a [SelectBuilder] with the parameter value and a block
 * that receives the selected value.
 */
public sealed interface SelectClause2<in P, out Q> : SelectClause
196
/**
 * Plain holder implementation of [SelectClause2]: every [SelectClause] property
 * is supplied through the constructor and stored as is.
 */
internal class SelectClause2Impl<P, Q>(
    override val clauseObject: Any,
    override val regFunc: RegistrationFunction,
    override val processResFunc: ProcessResultFunction,
    // Optional; `null` by default.
    override val onCancellationConstructor: OnCancellationConstructor? = null
) : SelectClause2<P, Q>
203
204 /**
205 * Internal representation of `select` instance.
206 *
207 * @suppress **This is unstable API, and it is subject to change.**
208 */
@InternalCoroutinesApi
public sealed interface SelectInstance<in R> {
    /**
     * The context of the coroutine that is performing this `select` operation.
     */
    public val context: CoroutineContext

    /**
     * This function should be called by other operations,
     * which are trying to perform a rendezvous with this `select`.
     * Returns `true` if the rendezvous succeeds, `false` otherwise.
     *
     * Note that according to the current implementation, a rendezvous attempt can fail
     * when either another clause is already selected or this `select` is still in
     * REGISTRATION phase. To distinguish the reasons, [SelectImplementation.trySelectDetailed]
     * function can be used instead.
     *
     * @param clauseObject the object of the clause that attempts the rendezvous (see [SelectClause.clauseObject]).
     * @param result the internal result of the clause, which is further passed
     * to the clause's [ProcessResultFunction].
     * @return `true` if the rendezvous succeeds, `false` otherwise.
     */
    public fun trySelect(clauseObject: Any, result: Any?): Boolean

    /**
     * When this `select` instance is stored as a waiter, the specified [handle][disposableHandle]
     * defines how the stored `select` should be removed in case of cancellation or another clause selection.
     *
     * @param disposableHandle the handle that removes this `select` from the clause object.
     */
    public fun disposeOnCompletion(disposableHandle: DisposableHandle)

    /**
     * When a clause becomes selected during registration, the corresponding internal result
     * (which is further passed to the clause's [ProcessResultFunction]) should be provided
     * via this function. After that, other clause registrations are ignored and [trySelect] fails.
     *
     * @param internalResult the internal, not yet processed result of the selected clause.
     */
    public fun selectInRegistrationPhase(internalResult: Any?)
}
/**
 * Internal flavour of [SelectInstance] that is additionally a [Waiter],
 * combining both contracts in a single type.
 */
internal interface SelectInstanceInternal<R>: SelectInstance<R>, Waiter
242
243 @PublishedApi
244 internal open class SelectImplementation<R>(
245 override val context: CoroutineContext
246 ) : CancelHandler(), SelectBuilder<R>, SelectInstanceInternal<R> {
247
248 /**
249 * Essentially, the `select` operation is split into three phases: REGISTRATION, WAITING, and COMPLETION.
250 *
251 * == Phase 1: REGISTRATION ==
252 * In the first REGISTRATION phase, the user-specified [SelectBuilder] is applied, and all the listed clauses
253 * are registered via the provided [registration functions][SelectClause.regFunc]. Intuitively, `select` clause
254 * registration is similar to the plain blocking operation, with the only difference that this [SelectInstance]
255 * is stored as a waiter instead of continuation, and [SelectInstance.trySelect] is used to make a rendezvous.
256 * Also, when registering, it is possible for the operation to complete immediately, without waiting. In this case,
257 * [SelectInstance.selectInRegistrationPhase] should be used. Otherwise, when no rendezvous happens and this `select`
258 * instance is stored as a waiter, a completion handler for the registering clause should be specified via
259 * [SelectInstance.disposeOnCompletion]; this handler specifies how to remove this `select` instance from the
260 * clause object when another clause becomes selected or the operation cancels.
261 *
262 * After a clause registration is completed, another coroutine can attempt to make a rendezvous with this `select`.
263 * However, to resolve a race between clauses registration and [SelectInstance.trySelect], the latter fails when
264 * this `select` is still in REGISTRATION phase. Thus, the corresponding clause has to be registered again.
265 *
266 * In this phase, the `state` field stores either a special [STATE_REG] marker or
267 * a list of clauses to be re-registered due to failed rendezvous attempts.
268 *
269 * == Phase 2: WAITING ==
270 * If no rendezvous happens in REGISTRATION phase, the `select` operation moves to WAITING one and suspends until
271 * [SelectInstance.trySelect] is called. Also, when waiting, this `select` can be cancelled. In the latter case,
272 * further [SelectInstance.trySelect] attempts fail, and all the completion handlers, specified via
273 * [SelectInstance.disposeOnCompletion], are invoked to remove this `select` instance from the corresponding
274 * clause objects.
275 *
276 * In this phase, the `state` field stores either the continuation to be later resumed or a special `Cancelled`
277 * object (with the cancellation cause inside) when this `select` becomes cancelled.
278 *
279 * == Phase 3: COMPLETION ==
280 * Once a rendezvous happens either in REGISTRATION phase (via [SelectInstance.selectInRegistrationPhase]) or
281 * in WAITING phase (via [SelectInstance.trySelect]), this `select` moves to the final `COMPLETION` phase.
282 * First, the provided internal result is processed via the [ProcessResultFunction] of the selected clause;
283 * it returns the argument for the user-specified block or throws an exception (see [SendChannel.onSend] as
284 * an example). After that, this `select` should be removed from all other clause objects by calling the
285 * corresponding [DisposableHandle]-s, provided via [SelectInstance.disposeOnCompletion] during registration.
286 * At the end, the user-specified block is called and this `select` finishes.
287 *
 * In this phase, once a rendezvous has happened, the `state` field stores the corresponding clause.
289 * After that, it moves to [STATE_COMPLETED] to avoid memory leaks.
290 *
291 *
292 *
293 * The state machine is listed below:
294 *
295 * REGISTRATION PHASE WAITING PHASE COMPLETION PHASE
296 * ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢ ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢ ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢
297 *
298 * +-----------+ +-----------+
299 * | CANCELLED | | COMPLETED |
300 * +-----------+ +-----------+
301 * ^ ^
302 * INITIAL STATE | | this `select`
303 * ------------+ | cancelled | is completed
304 * \ | |
305 * +=============+ move to +------+ successful +------------+
306 * +--| STATE_REG |---------------> | cont |-----------------| ClauseData |
307 * | +=============+ WAITING phase +------+ trySelect(..) +------------+
308 * | ^ | ^
309 * | | | some clause has been selected during registration |
310 * add a | | +-------------------------------------------------------+
311 * clause to be | | |
312 * re-registered | | re-register some clause has been selected |
313 * | | clauses during registration while there |
314 * v | are clauses to be re-registered; |
315 * +------------------+ ignore the latter |
316 * +--| List<ClauseData> |-----------------------------------------------------+
317 * | +------------------+
318 * | ^
319 * | | add one more clause
320 * | | for re-registration
321 * +------------+
322 *
323 * One of the most valuable benefits of this `select` design is that it allows processing clauses
324 * in a way similar to plain operations, such as `send` or `receive` on channels. The only difference
325 * is that instead of continuation, the operation should store the provided `select` instance object.
326 * Thus, this design makes it possible to support the `select` expression for any blocking data structure
327 * in Kotlin Coroutines.
328 *
329 * It is worth mentioning that the algorithm above provides "obstruction-freedom" non-blocking guarantee
330 * instead of the standard "lock-freedom" to avoid using heavy descriptors. In practice, this relaxation
331 * does not make significant difference. However, it is vital for Kotlin Coroutines to provide some
332 * non-blocking guarantee, as users may add blocking code in [SelectBuilder], and this blocking code
333 * should not cause blocking behaviour in other places, such as an attempt to make a rendezvous with
 * the `select` that hangs in REGISTRATION phase.
335 *
336 * Also, this implementation is NOT linearizable under some circumstances. The reason is that a rendezvous
337 * attempt with `select` (via [SelectInstance.trySelect]) may fail when this `select` operation is still
338 * in REGISTRATION phase. Consider the following situation on two empty rendezvous channels `c1` and `c2`
339 * and the `select` operation that tries to send an element to one of these channels. First, this `select`
340 * instance is registered as a waiter in `c1`. After that, another thread can observe that `c1` is no longer
341 * empty and try to receive an element from `c1` -- this receive attempt fails due to the `select` operation
342 * being in REGISTRATION phase.
343 * It is also possible to observe that this `select` operation registered in `c2` first, and only after that in
344 * `c1` (it has to re-register in `c1` after the unsuccessful rendezvous attempt), which is also non-linearizable.
345 * We, however, find such a non-linearizable behaviour not so important in practice and leverage the correctness
346 * relaxation for the algorithm simplicity and the non-blocking progress guarantee.
347 */
348
/**
 * Holds the current state of this `select` operation;
 * the state machine is described in the comment above.
 */
private val state = atomic<Any>(STATE_REG)

/**
 * `true` while this `select` instance is still in the REGISTRATION phase,
 * that is, the state is either the [STATE_REG] marker or a list of clauses
 * to be re-registered; `false` otherwise.
 */
private val inRegistrationPhase: Boolean
    get() {
        val current = state.value
        return current === STATE_REG || current is List<*>
    }

/**
 * `true` once some clause of this `select` has been selected, so
 * other parties are bound to fail when making a rendezvous with it.
 */
private val isSelected: Boolean
    get() {
        val current = state.value
        return current is SelectImplementation<*>.ClauseData
    }

/**
 * `true` if this `select` has been cancelled.
 */
private val isCancelled: Boolean
    get() {
        val current = state.value
        return current === STATE_CANCELLED
    }
/**
 * List of clauses waiting on this `select` instance.
 *
 * A clause is appended by [register] only when this `select` was stored as a waiter in the clause object.
 * According to the comments in [checkClauseObject] and [findClause], the reference may become `null`
 * only in the clean-up phase.
 */
private var clauses: MutableList<ClauseData>? = ArrayList(2)

/**
 * Stores the completion action provided through [disposeOnCompletion] or [invokeOnCancellation]
 * during clause registration. After that, if the clause is successfully registered
 * (so, it has not completed immediately), this handler is stored into
 * the corresponding [ClauseData] instance, and this field is reset to `null` (see [register]).
 *
 * Note that either [DisposableHandle] is provided, or a [Segment] instance with
 * the index in it, which specify the location of storing this `select`.
 * In the latter case, [Segment.onCancellation] should be called on completion/cancellation.
 */
private var disposableHandleOrSegment: Any? = null

/**
 * In case the disposable handle is specified via [Segment]
 * and index in it, implying calling [Segment.onCancellation],
 * the corresponding index is stored in this field.
 * The segment is stored in [disposableHandleOrSegment].
 *
 * The value `-1` means that no index is stored.
 */
private var indexInSegment: Int = -1

/**
 * Stores the result passed via [selectInRegistrationPhase] during clause registration
 * or [trySelect], which is called by another coroutine trying to make a rendezvous
 * with this `select` instance. Further, this result is processed via the
 * [ProcessResultFunction] of the selected clause.
 *
 * Unfortunately, we cannot store the result in the [state] field, as the latter stores
 * the clause object upon selection (see [ClauseData.clauseObject] and [SelectClause.clauseObject]).
 * Instead, it is possible to merge the [internalResult] and [disposableHandleOrSegment] fields into
 * one that stores either result when the clause is successfully registered ([inRegistrationPhase] is `true`),
 * or [DisposableHandle] instance when the clause is completed during registration ([inRegistrationPhase] is `false`).
 * Yet, this optimization is omitted for code simplicity.
 *
 * NOTE(review): the two conditions in the sentence above look swapped relative to [register], where
 * the handle is recorded for a clause stored as a waiter and the result belongs to a clause that
 * completed immediately -- confirm and reword.
 */
private var internalResult: Any? = NO_RESULT
412
/**
 * Runs the `select` operation once the [SelectBuilder] has been applied.
 *
 * When some clause has already been selected during registration, the operation is completed
 * right away: the corresponding [ProcessResultFunction] is applied and the user-specified
 * [block][ClauseData.block] is invoked. Otherwise, this `select` moves to WAITING phase
 * (re-registering clauses if needed), suspends until a rendezvous happens, and only then
 * completes in the same way.
 *
 * Both branches keep their single suspending call in tail position.
 */
@PublishedApi
internal open suspend fun doSelect(): R = when {
    // Fast path: a clause was selected during registration.
    isSelected -> complete()
    // Slow path: suspend until some clause gets selected.
    else -> doSelectSuspend()
}
424
// Slow path of [doSelect]: waits for a clause to be selected and then completes the operation.
//
// We separate the following logic as it has two suspension points
// and, therefore, breaks the tail-call optimization if it were
// inlined in [doSelect]
private suspend fun doSelectSuspend(): R {
    // In case no clause has been selected during registration,
    // the `select` operation suspends and waits for a rendezvous.
    waitUntilSelected() // <-- suspend call => no tail-call optimization here
    // There is a selected clause! Apply the corresponding
    // [ProcessResultFunction] and invoke the user-specified block.
    return complete() // <-- one more suspend call
}
436
437 // ========================
438 // = CLAUSES REGISTRATION =
439 // ========================
440
// Each `invoke` overload wraps the clause description together with the user-specified
// block into a [ClauseData] and immediately registers it in this `select`.

// Clause without a value: the dedicated PARAM_CLAUSE_0 marker is passed as the parameter.
override fun SelectClause0.invoke(block: suspend () -> R) {
    val clause = ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor)
    clause.register()
}

// Clause with a value but without a parameter: `null` is passed as the parameter.
override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
    val clause = ClauseData(clauseObject, regFunc, processResFunc, null, block, onCancellationConstructor)
    clause.register()
}

// Clause with both a value and a user-provided parameter.
override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
    val clause = ClauseData(clauseObject, regFunc, processResFunc, param, block, onCancellationConstructor)
    clause.register()
}
447
/**
 * Attempts to register this `select` clause. If another clause is already selected,
 * this function does nothing and completes immediately.
 * Otherwise, it registers this `select` instance in
 * the [clause object][ClauseData.clauseObject]
 * according to the provided [registration function][ClauseData.regFunc].
 * On success, this `select` instance is stored as a waiter
 * in the clause object -- the algorithm also stores
 * the provided via [disposeOnCompletion] completion action
 * and adds the clause to the list of registered ones.
 * In case of registration failure, the internal result
 * (not processed by [ProcessResultFunction] yet) must be
 * provided via [selectInRegistrationPhase] -- the algorithm
 * updates the state to this clause reference.
 *
 * @param reregister `true` when the clause is registered again after an unsuccessful
 * [trySelect]; in this case, the duplicate-object check is skipped and the clause
 * is not added to the list of clauses one more time.
 */
@JvmName("register")
internal fun ClauseData.register(reregister: Boolean = false) {
    assert { state.value !== STATE_CANCELLED }
    // Is there already selected clause?
    if (state.value.let { it is SelectImplementation<*>.ClauseData }) return
    // For new clauses, check that there does not exist
    // another clause with the same object.
    if (!reregister) checkClauseObject(clauseObject)
    // Try to register in the corresponding object.
    if (tryRegisterAsWaiter(this@SelectImplementation)) {
        // Successfully registered, and this `select` instance
        // is stored as a waiter. Add this clause to the list
        // of registered clauses and store the provided via
        // [disposeOnCompletion] completion action into the clause.
        //
        // Importantly, the [waitUntilSelected] function is implemented
        // carefully to ensure that the cancellation handler has not been
        // installed when clauses re-register, so the logic below cannot
        // be invoked concurrently with the clean-up procedure.
        // This also guarantees that the list of clauses cannot be cleared
        // in the registration phase, so it is safe to read it with "!!".
        if (!reregister) clauses!! += this
        // Move the completion action from the `select` instance to this clause ...
        disposableHandleOrSegment = this@SelectImplementation.disposableHandleOrSegment
        indexInSegment = this@SelectImplementation.indexInSegment
        // ... and reset the `select`-level fields for the next clause registration.
        this@SelectImplementation.disposableHandleOrSegment = null
        this@SelectImplementation.indexInSegment = -1
    } else {
        // This clause has been selected!
        // Update the state correspondingly.
        state.value = this
    }
}
495
/**
 * Ensures that no already registered clause uses the given [clauseObject];
 * otherwise, fails with [IllegalStateException].
 */
private fun checkClauseObject(clauseObject: Any) {
    // The list is guaranteed to be non-null here: it can become `null`
    // only in the clean-up phase, whereas this check is performed
    // exclusively in the registration one.
    val registered = clauses!!
    // Compare by reference: the very same object must not be used twice.
    val alreadyUsed = registered.any { it.clauseObject === clauseObject }
    check(!alreadyUsed) {
        "Cannot use select clauses on the same object: $clauseObject"
    }
}
509
/**
 * Remembers the given [disposableHandle] as the completion action
 * of the clause that is being registered at the moment.
 */
override fun disposeOnCompletion(disposableHandle: DisposableHandle) {
    disposableHandleOrSegment = disposableHandle
}
513
/**
 * Allocation-free alternative to
 *
 * ```
 * disposeOnCompletion {
 *   segment.onCancellation(index, null)
 * }
 * ```
 *
 * Instead of creating a cancellation handler object, the specified
 * [segment] and [index] are stored directly in the fields of this `select`.
 */
override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
    disposableHandleOrSegment = segment
    indexInSegment = index
}
529
/**
 * Records the [internalResult] of the clause that is being selected during registration.
 */
override fun selectInRegistrationPhase(internalResult: Any?) {
    this@SelectImplementation.internalResult = internalResult
}
533
534 // =========================
535 // = WAITING FOR SELECTION =
536 // =========================
537
/**
 * Suspends and waits until some clause is selected. However, it is possible for a concurrent
 * coroutine to invoke [trySelect] while this `select` is still in REGISTRATION phase.
 * In this case, [trySelect] marks the corresponding select clause to be re-registered, and
 * this function performs registration of such clauses. After that, it atomically stores
 * the continuation into the [state] field if there is no more clause to be re-registered.
 *
 * If a clause becomes selected during such a re-registration, the continuation
 * is resumed right away instead of being stored.
 */
private suspend fun waitUntilSelected() = suspendCancellableCoroutine<Unit> sc@ { cont ->
    // Update the state. Every failed CAS below simply leads to one more loop iteration.
    state.loop { curState ->
        when {
            // This `select` is in REGISTRATION phase, and there is no clause to be re-registered.
            // Perform a transition to WAITING phase by storing the current continuation.
            curState === STATE_REG -> if (state.compareAndSet(curState, cont)) {
                // Perform a clean-up in case of cancellation.
                //
                // Importantly, we MUST install the cancellation handler
                // only when the algorithm is bound to suspend. Otherwise,
                // a race with clause (re-)registration (see [register]) is possible,
                // and the provided via [disposeOnCompletion] cancellation action can be ignored.
                // Also, we MUST guarantee that this dispose handle is _visible_
                // according to the memory model, and we CAN guarantee this when
                // the state is updated.
                cont.invokeOnCancellation(this.asHandler)
                return@sc
            }
            // This `select` is in REGISTRATION phase, but there are clauses that have to be registered again.
            // Perform the required registrations and try again.
            curState is List<*> -> if (state.compareAndSet(curState, STATE_REG)) {
                @Suppress("UNCHECKED_CAST")
                curState as List<Any>
                curState.forEach { reregisterClause(it) }
            }
            // This `select` operation became completed during clauses re-registration.
            curState is SelectImplementation<*>.ClauseData -> {
                cont.resume(Unit, curState.createOnCancellationAction(this, internalResult))
                return@sc
            }
            // This `select` cannot be in any other state.
            else -> error("unexpected state: $curState")
        }
    }
}
581
/**
 * Registers the clause with the given [clause object][clauseObject] once again.
 * This is required after a [trySelect] on this clause has failed because
 * the `select` was still in REGISTRATION phase.
 */
private fun reregisterClause(clauseObject: Any) {
    // The clause is guaranteed to be present in the list of clauses.
    findClause(clauseObject)!!.also { staleClause ->
        // Forget the completion action recorded by the previous registration ...
        staleClause.disposableHandleOrSegment = null
        staleClause.indexInSegment = -1
        // ... and go through the registration procedure one more time.
        staleClause.register(reregister = true)
    }
}
594
595 // ==============
596 // = RENDEZVOUS =
597 // ==============
598
// Boolean view of [trySelectInternal]: only TRY_SELECT_SUCCESSFUL counts as a successful rendezvous.
override fun trySelect(clauseObject: Any, result: Any?): Boolean {
    val outcome = trySelectInternal(clauseObject, result)
    return outcome == TRY_SELECT_SUCCESSFUL
}
601
/**
 * Similar to [trySelect] but provides a failure reason
 * if this rendezvous is unsuccessful. We need this function
 * in the channel implementation.
 *
 * The integer code returned by [trySelectInternal] is converted
 * via `TrySelectDetailedResult(..)`.
 *
 * @param clauseObject the object of the clause that attempts the rendezvous.
 * @param result the internal result of the clause.
 */
fun trySelectDetailed(clauseObject: Any, result: Any?) =
    TrySelectDetailedResult(trySelectInternal(clauseObject, result))
609
/**
 * Attempts to make a rendezvous with this `select` on behalf of the clause with
 * the specified [clauseObject], providing the [internalResult] of that clause.
 *
 * The attempt is retried in a loop until one of the outcomes below is reached:
 * - `TRY_SELECT_SUCCESSFUL` -- the clause is selected and the waiting continuation is resumed;
 * - `TRY_SELECT_REREGISTER` -- this `select` is still in REGISTRATION phase,
 *   so the clause is scheduled for re-registration;
 * - `TRY_SELECT_CANCELLED` -- this `select` is cancelled, or the continuation could not be resumed;
 * - `TRY_SELECT_ALREADY_SELECTED` -- another clause is already selected or the `select` is completed.
 */
private fun trySelectInternal(clauseObject: Any, internalResult: Any?): Int {
    while (true) {
        when (val curState = state.value) {
            // Perform a rendezvous with this select if it is in WAITING state.
            is CancellableContinuation<*> -> {
                val clause = findClause(clauseObject) ?: continue // retry if `clauses` is already `null`
                val onCancellation = clause.createOnCancellationAction(this@SelectImplementation, internalResult)
                if (state.compareAndSet(curState, clause)) {
                    @Suppress("UNCHECKED_CAST")
                    val cont = curState as CancellableContinuation<Unit>
                    // Success! Store the resumption value and
                    // try to resume the continuation.
                    this.internalResult = internalResult
                    if (cont.tryResume(onCancellation)) return TRY_SELECT_SUCCESSFUL
                    // If the resumption failed, we need to clean
                    // the [internalResult] field to avoid memory leaks.
                    this.internalResult = null
                    return TRY_SELECT_CANCELLED
                }
            }
            // Already selected.
            STATE_COMPLETED, is SelectImplementation<*>.ClauseData -> return TRY_SELECT_ALREADY_SELECTED
            // Already cancelled.
            STATE_CANCELLED -> return TRY_SELECT_CANCELLED
            // This select is still in REGISTRATION phase, re-register the clause
            // in order not to wait until this select moves to WAITING phase.
            // This is a rare race, so we do not need to worry about performance here.
            STATE_REG -> if (state.compareAndSet(curState, listOf(clauseObject))) return TRY_SELECT_REREGISTER
            // This select is still in REGISTRATION phase, and the state stores a list of clauses
            // for re-registration, add the selecting clause to this list.
            // This is a rare race, so we do not need to worry about performance here.
            is List<*> -> if (state.compareAndSet(curState, curState + clauseObject)) return TRY_SELECT_REREGISTER
            // Another state? Something went really wrong.
            else -> error("Unexpected state: $curState")
        }
    }
}
647
/**
 * Looks up the registered clause whose [clause object][SelectClause.clauseObject]
 * is the very same instance as [clauseObject].
 *
 * Returns `null` when the list of clauses has already been cleared
 * (the field is `null`); fails with an error when the list is present
 * but contains no matching clause.
 */
private fun findClause(clauseObject: Any): ClauseData? {
    // A `null` list means the clean-up has already finished.
    val registered = this.clauses ?: return null
    // Scan in registration order and stop at the first identity match.
    for (candidate in registered) {
        if (candidate.clauseObject === clauseObject) return candidate
    }
    error("Clause with object $clauseObject is not found")
}
661
662 // ==============
663 // = COMPLETION =
664 // ==============
665
/**
 * Completes this `select` operation after the internal result is provided
 * via [SelectInstance.trySelect] or [SelectInstance.selectInRegistrationPhase].
 * (1) First, the internal result is read and the [clean-up procedure][cleanup]
 *     is called to remove this `select` instance from other clause objects, and
 *     make it possible to collect it by GC after this `select` finishes.
 * (2) After that, this function applies the [ProcessResultFunction]
 *     of the selected clause to the internal result.
 * (3) Finally, the user-specified block is invoked
 *     with the processed result as an argument.
 */
private suspend fun complete(): R {
    assert { isSelected }
    // Get the selected clause.
    @Suppress("UNCHECKED_CAST")
    val selectedClause = state.value as SelectImplementation<R>.ClauseData
    // Perform the clean-up before the internal result processing and
    // the user-specified block invocation to guarantee the absence
    // of memory leaks. The clean-up resets the `internalResult` field,
    // so the internal result is collected before that.
    val internalResult = this.internalResult
    cleanup(selectedClause)
    // Process the internal result and invoke the user's block.
    return if (!RECOVER_STACK_TRACES) {
        // TAIL-CALL OPTIMIZATION: the `suspend` block
        // is invoked at the very end.
        val blockArgument = selectedClause.processResult(internalResult)
        selectedClause.invokeBlock(blockArgument)
    } else {
        // TAIL-CALL OPTIMIZATION: the `suspend`
        // function is invoked at the very end.
        // However, internally this `suspend` function
        // constructs a state machine to recover a
        // possible stack-trace.
        processResultAndInvokeBlockRecoveringException(selectedClause, internalResult)
    }
}
702
/**
 * Processes [internalResult] with the given [clause] and then runs the
 * user's block on the outcome. Any [Throwable] raised by either step is
 * passed to `recoverAndThrow` so that its stack-trace is recovered before
 * it is rethrown; the surrounding `try` is why the tail-call
 * optimization cannot be applied to this function.
 */
private suspend fun processResultAndInvokeBlockRecoveringException(clause: ClauseData, internalResult: Any?): R {
    try {
        return clause.invokeBlock(clause.processResult(internalResult))
    } catch (failure: Throwable) {
        // Debug mode: rethrow with a properly recovered stack-trace.
        recoverAndThrow(failure)
    }
}
713
/**
 * Disposes every registered clause except [selectedClause]
 * (see [ClauseData.dispose]), then moves the state to `STATE_COMPLETED`
 * and drops the stored internal result and the list of clauses.
 *
 * Does nothing when the list of clauses is already `null`.
 */
private fun cleanup(selectedClause: ClauseData) {
    assert { state.value == selectedClause }
    // A `null` list means a concurrent clean-up procedure
    // has already completed, so there is nothing left to do.
    val registered = this.clauses ?: return
    // The selected clause is skipped; all the others are disposed.
    for (clause in registered) {
        if (clause === selectedClause) continue
        clause.dispose()
    }
    // Release all the references to avoid memory leaks.
    state.value = STATE_COMPLETED
    internalResult = NO_RESULT
    clauses = null
}
734
// [CompletionHandler] implementation, must be invoked on cancellation.
override fun invoke(cause: Throwable?) {
    // Move the state to STATE_CANCELLED, unless this `select` is already
    // completed -- in that case there is nothing to do. Note that the
    // `select` may be only logically completed (the `state` field holds
    // the selected `ClauseData`) while the continuation is already
    // cancelled; the cancellation handling below must still run then.
    while (true) {
        val observed = state.value
        if (observed === STATE_COMPLETED) return
        if (state.compareAndSet(observed, STATE_CANCELLED)) break
    }
    // A `null` list means a concurrent clean-up procedure
    // has already completed, so it is safe to finish here.
    val registered = this.clauses ?: return
    // Detach this `select` from every clause object (channels, mutexes, etc.).
    for (clause in registered) {
        clause.dispose()
    }
    // Release all the references to avoid memory leaks.
    internalResult = NO_RESULT
    clauses = null
}
756
/**
 * The internal representation of a single `select` clause:
 * every registered clause is stored as one [ClauseData] instance.
 */
internal inner class ClauseData(
    @JvmField val clauseObject: Any, // what this clause operates on: Channel, Mutex, Job, ...
    private val regFunc: RegistrationFunction,
    private val processResFunc: ProcessResultFunction,
    private val param: Any?, // the parameter supplied by the user
    private val block: Any, // the user's block to run once this clause is selected
    @JvmField val onCancellationConstructor: OnCancellationConstructor?
) {
    @JvmField var disposableHandleOrSegment: Any? = null
    @JvmField var indexInSegment: Int = -1

    /**
     * Registers the given [select] instance in [clauseObject] and reports
     * how the registration ended: `true` means that [select] is registered
     * and keeps _waiting_ for a rendezvous, `false` means that this clause
     * got selected right during the registration.
     *
     * As an illustration, registering a [Channel.onReceive] clause on
     * a non-empty channel takes the first element and completes
     * the [select] through [SelectInstance.selectInRegistrationPhase].
     */
    fun tryRegisterAsWaiter(select: SelectImplementation<R>): Boolean {
        assert { select.inRegistrationPhase || select.isCancelled }
        assert { select.internalResult === NO_RESULT }
        regFunc.invoke(clauseObject, select, param)
        // The result stays unset only if no rendezvous happened.
        val stillWaiting = select.internalResult === NO_RESULT
        return stillWaiting
    }

    /**
     * Converts the internal [result], supplied through either
     * [SelectInstance.selectInRegistrationPhase] or
     * [SelectInstance.trySelect], into the argument
     * of the user-specified [block].
     *
     * Note that this function is allowed to throw: for instance,
     * the [ProcessResultFunction] of [Channel.onSend] is bound
     * to fail when the channel is closed.
     */
    fun processResult(result: Any?) = processResFunc.invoke(clauseObject, param, result)

    /**
     * Runs the user-specified block and returns its value
     * as the final result of this `select` clause.
     */
    @Suppress("UNCHECKED_CAST")
    suspend fun invokeBlock(argument: Any?): R {
        val userBlock = block
        // The PARAM_CLAUSE_0 marker is always paired with [SelectClause0],
        // whose lambda takes no arguments; any other parameter means
        // a 1-argument lambda.
        //
        // TAIL-CALL OPTIMIZATION: the `suspend` block
        // is invoked at the very end in both branches.
        return if (this.param === PARAM_CLAUSE_0) {
            (userBlock as suspend () -> R).invoke()
        } else {
            (userBlock as suspend (Any?) -> R).invoke(argument)
        }
    }

    /**
     * Releases whatever is stored in [disposableHandleOrSegment]:
     * a [Segment] is notified via `onCancellation` for [indexInSegment],
     * a [DisposableHandle] is disposed, and any other value
     * (including `null`) is ignored.
     */
    fun dispose() {
        when (val handle = disposableHandleOrSegment) {
            is Segment<*> -> handle.onCancellation(indexInSegment, null, context)
            is DisposableHandle -> handle.dispose()
        }
    }

    /**
     * Builds the cancellation action for this clause with
     * [onCancellationConstructor], or returns `null` when
     * no constructor is specified.
     */
    fun createOnCancellationAction(select: SelectInstance<*>, internalResult: Any?) =
        onCancellationConstructor?.let { construct -> construct(select, param, internalResult) }
}
838 }
839
/**
 * Tries to resume this continuation with `Unit`, passing [onCancellation] along.
 * Returns `true` when a resumption token was obtained and the resumption
 * was completed with it, and `false` when no token could be obtained.
 */
private fun CancellableContinuation<Unit>.tryResume(onCancellation: ((cause: Throwable) -> Unit)?): Boolean {
    val resumeToken = tryResume(Unit, null, onCancellation)
    if (resumeToken == null) return false
    completeResume(resumeToken)
    return true
}
845
// Integer codes returned by trySelectInternal(..).
private const val TRY_SELECT_SUCCESSFUL: Int = 0
private const val TRY_SELECT_REREGISTER: Int = 1
private const val TRY_SELECT_CANCELLED: Int = 2
private const val TRY_SELECT_ALREADY_SELECTED: Int = 3
// Outcomes reported by trySelectDetailed(..).
internal enum class TrySelectDetailedResult {
    SUCCESSFUL,
    REREGISTER,
    CANCELLED,
    ALREADY_SELECTED
}
/**
 * Maps an integer code produced by `trySelectInternal(..)` to the matching
 * [TrySelectDetailedResult] entry; any other value is reported as an error.
 */
private fun TrySelectDetailedResult(trySelectInternalResult: Int): TrySelectDetailedResult {
    return when (trySelectInternalResult) {
        TRY_SELECT_SUCCESSFUL -> TrySelectDetailedResult.SUCCESSFUL
        TRY_SELECT_REREGISTER -> TrySelectDetailedResult.REREGISTER
        TRY_SELECT_CANCELLED -> TrySelectDetailedResult.CANCELLED
        TRY_SELECT_ALREADY_SELECTED -> TrySelectDetailedResult.ALREADY_SELECTED
        else -> error("Unexpected internal result: $trySelectInternalResult")
    }
}
862
// Marker objects for the REGISTRATION, COMPLETED, and CANCELLED states.
private val STATE_REG: Symbol = Symbol("STATE_REG")
private val STATE_COMPLETED: Symbol = Symbol("STATE_COMPLETED")
private val STATE_CANCELLED: Symbol = Symbol("STATE_CANCELLED")
// The selection result may legitimately be `null`, so a dedicated
// marker object stands for "no result has been provided".
private val NO_RESULT: Symbol = Symbol("NO_RESULT")
// A marker parameter object that lets us tell SelectClause[0,1,2]
// apart and invoke the user-specified block with the right arity.
internal val PARAM_CLAUSE_0: Symbol = Symbol("PARAM_CLAUSE_0")
873