• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.selects
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.intrinsics.*
12 import kotlinx.coroutines.sync.*
13 import kotlin.coroutines.*
14 import kotlin.coroutines.intrinsics.*
15 import kotlin.jvm.*
16 
/**
 * Receiver scope of a [select] invocation. Clauses are registered by invoking them inside this scope.
 */
public interface SelectBuilder<in R> {
    /**
     * Adds a parameterless clause that does not produce a value to this [select] expression.
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Adds a parameterless clause that produces a value of type [Q] to this [select] expression.
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Adds a clause that takes an extra parameter of type [P] and produces a value of type [Q]
     * to this [select] expression.
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Adds a clause whose extra parameter of type [P] is nullable, passing `null` for that parameter.
     * The clause produces a value of type [Q].
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) {
        invoke(null, block)
    }

    /**
     * Clause that selects the given [block] once the specified timeout has elapsed.
     * A zero or negative timeout selects [block] immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * @param timeMillis timeout time in milliseconds.
     */
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
53 
/**
 * A [select] clause that takes no additional parameters and does not produce a value.
 */
public interface SelectClause0 {
    /**
     * Attaches this clause to the given [select] instance together with the [block] to run for it.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
}
65 
/**
 * A [select] clause that takes no additional parameters and produces a value of type [Q].
 */
public interface SelectClause1<out Q> {
    /**
     * Attaches this clause to the given [select] instance together with the [block] to run for it.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
}
77 
/**
 * A [select] clause that takes an additional parameter of type [P] and produces a value of type [Q].
 */
public interface SelectClause2<in P, out Q> {
    /**
     * Attaches this clause to the given [select] instance together with its [param] and the [block] to run for it.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
}
89 
/**
 * Internal view of a running [select] expression. The instance is said to be _selected_ once
 * the clause to execute has been picked.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@InternalCoroutinesApi
public interface SelectInstance<in R> {
    /**
     * `true` once this [select] statement has picked the clause it is going to execute.
     */
    public val isSelected: Boolean

    /**
     * Attempts to select this instance and reports whether the attempt succeeded.
     */
    public fun trySelect(idempotent: Any?): Boolean

    /**
     * Performs the action described by [desc] atomically with [trySelect].
     */
    public fun performAtomicTrySelect(desc: AtomicDesc): Any?

    /**
     * Completion continuation of this select instance.
     * The instance must already be _selected_ before it is used.
     * Every resumption through this continuation happens _directly_, bypassing the dispatcher ([MODE_DIRECT]).
     */
    public val completion: Continuation<R>

    /**
     * Resumes this instance with [exception] in a cancellable way ([MODE_CANCELLABLE]).
     */
    public fun resumeSelectCancellableWithException(exception: Throwable)

    /**
     * Arranges for [handle] to be disposed when this instance is selected.
     * Note that [DisposableHandle.dispose] may end up being called more than once.
     */
    public fun disposeOnSelect(handle: DisposableHandle)
}
131 
132 /**
133  * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
134  * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
135  * is either _selected_ or _fails_.
136  *
137  * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
138  * becomes the result of the select. If any clause _fails_, then the select invocation produces the
139  * corresponding exception. No clause is selected in this case.
140  *
141  * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
142  * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
143  * the clauses.
 *
145  * There is no `default` clause for select expression. Instead, each selectable suspending function has the
146  * corresponding non-suspending version that can be used with a regular `when` expression to select one
147  * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
148  *
149  * | **Receiver**     | **Suspending function**                       | **Select clause**                                | **Non-suspending version**
150  * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
151  * | [Job]            | [join][Job.join]                              | [onJoin][Job.onJoin]                             | [isCompleted][Job.isCompleted]
152  * | [Deferred]       | [await][Deferred.await]                       | [onAwait][Deferred.onAwait]                      | [isCompleted][Job.isCompleted]
153  * | [SendChannel]    | [send][SendChannel.send]                      | [onSend][SendChannel.onSend]                     | [offer][SendChannel.offer]
154  * | [ReceiveChannel] | [receive][ReceiveChannel.receive]             | [onReceive][ReceiveChannel.onReceive]            | [poll][ReceiveChannel.poll]
155  * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]| [poll][ReceiveChannel.poll]
156  * | [Mutex]          | [lock][Mutex.lock]                            | [onLock][Mutex.onLock]                           | [tryLock][Mutex.tryLock]
157  * | none             | [delay]                                       | [onTimeout][SelectBuilder.onTimeout]             | none
158  *
159  * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
160  * function is suspended, this function immediately resumes with [CancellationException].
161  *
162  * Atomicity of cancellation depends on the clause: [onSend][SendChannel.onSend], [onReceive][ReceiveChannel.onReceive],
163  * [onReceiveOrNull][ReceiveChannel.onReceiveOrNull], and [onLock][Mutex.onLock] clauses are
164  * *atomically cancellable*. When select throws [CancellationException] it means that those clauses had not performed
165  * their respective operations.
166  * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
167  * continue to execute even after it was cancelled from the same thread in the case when this select operation
168  * was already resumed on atomically cancellable clause and the continuation was posted for execution to the thread's queue.
169  *
170  * Note that this function does not check for cancellation when it is not suspended.
171  * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
172  */
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    return suspendCoroutineUninterceptedOrReturn { continuation ->
        // The implementation wraps the raw (unintercepted) continuation of the caller.
        val selectScope = SelectBuilderImpl(continuation)
        try {
            // Run the user's builder so that every clause registers itself with the scope.
            selectScope.builder()
        } catch (failure: Throwable) {
            // A failing builder is routed through the scope instead of escaping directly.
            selectScope.handleBuilderException(failure)
        }
        // Yields either the already available result or COROUTINE_SUSPENDED.
        selectScope.getResult()
    }
}
183 
184 
// Returned from an atomic select attempt when the select instance turns out to be selected already.
@SharedImmutable
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")

// Initial value of the result slot: no value has been stored and getResult has not suspended yet.
@SharedImmutable
private val UNDECIDED: Any = Symbol("UNDECIDED")

// Value of the result slot after a suspended select has been resumed.
@SharedImmutable
private val RESUMED: Any = Symbol("RESUMED")
191 
/**
 * Implementation of both [SelectBuilder] and [SelectInstance] that backs a single [select] call.
 *
 * It wraps the unintercepted continuation of the caller, tracks the selection state in `_state`
 * and the result state in `_result`, and, being a list head itself, stores the [DisposeNode]s
 * that were registered through [disposeOnSelect].
 */
@PublishedApi
internal class SelectBuilderImpl<in R>(
    private val uCont: Continuation<R> // unintercepted delegate continuation
) : LockFreeLinkedListHead(), SelectBuilder<R>,
    SelectInstance<R>, Continuation<R>, CoroutineStackFrame {
    // The caller frame is the delegate continuation, when it exposes stack-frame information.
    override val callerFrame: CoroutineStackFrame?
        get() = uCont as? CoroutineStackFrame

    // This frame contributes no stack trace element of its own.
    override fun getStackTraceElement(): StackTraceElement? = null

    // selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
    private val _state = atomic<Any?>(this)

    // this is basically our own SafeContinuation
    private val _result = atomic<Any?>(UNDECIDED)

    // cancellability support
    @Volatile
    private var parentHandle: DisposableHandle? = null

    /* Result state machine

        +-----------+   getResult   +---------------------+   resume   +---------+
        | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
        +-----------+               +---------------------+            +---------+
              |
              | resume
              V
        +------------+  getResult
        | value/Fail | -----------+
        +------------+            |
              ^                   |
              |                   |
              +-------------------+
     */

    override val context: CoroutineContext get() = uCont.context

    override val completion: Continuation<R> get() = this

    /**
     * Shared resumption logic for all resume paths.
     *
     * While the result is still `UNDECIDED`, the outcome produced by [value] is stored for a later
     * [getResult] call. If [getResult] has already returned `COROUTINE_SUSPENDED`, the result is marked
     * `RESUMED` and [block] is run to resume the delegate continuation.
     *
     * @throws IllegalStateException if the result is in any other state ("Already resumed").
     */
    private inline fun doResume(value: () -> Any?, block: () -> Unit) {
        assert { isSelected } // "Must be selected first"
        _result.loop { result ->
            when {
                result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return
                result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
                    block()
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    // Resumes in MODE_DIRECT
    override fun resumeWith(result: Result<R>) {
        doResume({ result.toState() }) {
            if (result.isFailure) {
                uCont.resumeWithStackTrace(result.exceptionOrNull()!!)
            } else {
                uCont.resumeWith(result)
            }
        }
    }

    // Resumes in MODE_CANCELLABLE
    override fun resumeSelectCancellableWithException(exception: Throwable) {
        doResume({ CompletedExceptionally(exception) }) {
            uCont.intercepted().resumeCancellableWithException(exception)
        }
    }

    /**
     * Produces the outcome of the select for the suspension point in [select].
     *
     * Registers the cancellation handler first if no clause is selected yet. Returns
     * `COROUTINE_SUSPENDED` when no result has been stored so far, otherwise returns the stored value
     * or throws the cause of the stored [CompletedExceptionally].
     *
     * @throws IllegalStateException if the select has already been resumed.
     */
    @PublishedApi
    internal fun getResult(): Any? {
        if (!isSelected) initCancellability()
        var result = _result.value // atomic read
        if (result === UNDECIDED) {
            if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = _result.value // reread volatile var
        }
        when {
            result === RESUMED -> throw IllegalStateException("Already resumed")
            result is CompletedExceptionally -> throw result.cause
            else -> return result // either COROUTINE_SUSPENDED or data
        }
    }

    /**
     * Registers a cancelling handler on the [Job] of the context, if there is one, and keeps its handle
     * in [parentHandle] so that it can be disposed once a clause is selected.
     */
    private fun initCancellability() {
        val parent = context[Job] ?: return
        val newRegistration = parent.invokeOnCompletion(
            onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
        parentHandle = newRegistration
        // now check our state _after_ registering
        if (isSelected) newRegistration.dispose()
    }

    /**
     * Handler installed on the parent job: when invoked, it tries to select this instance and, on success,
     * resumes the select with the job's cancellation exception.
     */
    private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
        // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
        override fun invoke(cause: Throwable?) {
            if (trySelect(null))
                resumeSelectCancellableWithException(job.getCancellationException())
        }
        override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]"
    }

    // Current selection state; any operation descriptor found in the slot is performed first.
    private val state: Any? get() {
        _state.loop { state ->
            if (state !is OpDescriptor) return state
            state.perform(this)
        }
    }

    /**
     * Handles an exception thrown by the builder lambda of [select].
     *
     * If this instance can still be selected, the select is resumed with [e]. Otherwise a
     * non-cancellation exception is passed to [handleCoroutineException], unless it is the very exception
     * the select has already been completed with.
     */
    @PublishedApi
    internal fun handleBuilderException(e: Throwable) {
        if (trySelect(null)) {
            resumeWithException(e)
        } else if (e !is CancellationException) {
            /*
             * Cannot handle this exception -- builder was already resumed with a different exception,
             * so treat it as "unhandled exception". But only if it is not the completion reason
             * and it's not the cancellation. Otherwise, in the face of structured concurrency
             * the same exception will be reported to the global exception handler.
             */
            // Peek at the raw result slot instead of calling getResult(): getResult() rethrows the cause of
            // a CompletedExceptionally, which would make the check below vacuous and silently drop `e`.
            val result = _result.value
            if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
                handleCoroutineException(context, e)
            }
        }
    }

    // Selected as soon as the state is anything other than this instance itself.
    override val isSelected: Boolean get() = state !== this

    override fun disposeOnSelect(handle: DisposableHandle) {
        val node = DisposeNode(handle)
        // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
        if (!isSelected) {
            addLast(node) // add handle to list
            // double-check node after adding
            if (!isSelected) return // all ok - still not selected
        }
        // already selected
        handle.dispose()
    }

    // Disposes the parent job registration and every handle registered through disposeOnSelect.
    private fun doAfterSelect() {
        parentHandle?.dispose()
        forEach<DisposeNode> {
            it.handle.dispose()
        }
    }

    /**
     * Tries to select this instance.
     *
     * Returns `true` when this call performed the selection, or when the instance had already been
     * selected with the same non-null [idempotent] marker; returns `false` otherwise.
     */
    // it is just like start(), but support idempotent start
    override fun trySelect(idempotent: Any?): Boolean {
        assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker"
        while (true) { // lock-free loop on state
            val state = this.state
            when {
                state === this -> {
                    if (_state.compareAndSet(this, idempotent)) {
                        doAfterSelect()
                        return true
                    }
                }
                // otherwise -- already selected
                idempotent == null -> return false // already selected
                state === idempotent -> return true // was selected with this marker
                else -> return false
            }
        }
    }

    override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
        AtomicSelectOp(desc).perform(null)

    /**
     * Atomic operation that couples selecting this instance with the operation described by [desc].
     */
    private inner class AtomicSelectOp(
        @JvmField val desc: AtomicDesc
    ) : AtomicOp<Any?>() {
        override fun prepare(affected: Any?): Any? {
            // only originator of operation makes preparation move of installing descriptor into this selector's state
            // helpers should never do it, or risk ruining progress when they come late
            if (affected == null) {
                // we are originator (affected reference is not null if helping)
                prepareIfNotSelected()?.let { return it }
            }
            return desc.prepare(this)
        }

        override fun complete(affected: Any?, failure: Any?) {
            completeSelect(failure)
            desc.complete(this, failure)
        }

        /**
         * Installs this operation into the selection state.
         * Returns `null` once it is installed (or was installed before), or [ALREADY_SELECTED]
         * if the instance is already selected.
         */
        fun prepareIfNotSelected(): Any? {
            _state.loop { state ->
                when {
                    state === this@AtomicSelectOp -> return null // already in progress
                    state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
                    state === this@SelectBuilderImpl -> {
                        if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
                            return null // success
                    }
                    else -> return ALREADY_SELECTED
                }
            }
        }

        // On success the state becomes null (selected); on failure it reverts to the unselected state.
        private fun completeSelect(failure: Any?) {
            val selectSuccess = failure == null
            val update = if (selectSuccess) null else this@SelectBuilderImpl
            if (_state.compareAndSet(this@AtomicSelectOp, update)) {
                if (selectSuccess)
                    doAfterSelect()
            }
        }
    }

    override fun SelectClause0.invoke(block: suspend () -> R) {
        registerSelectClause0(this@SelectBuilderImpl, block)
    }

    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
        registerSelectClause1(this@SelectBuilderImpl, block)
    }

    override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
        registerSelectClause2(this@SelectBuilderImpl, param, block)
    }

    override fun onTimeout(timeMillis: Long, block: suspend () -> R) {
        // A non-positive timeout is selected right away, without scheduling anything.
        if (timeMillis <= 0L) {
            if (trySelect(null))
                block.startCoroutineUnintercepted(completion)
            return
        }
        val action = Runnable {
            // todo: we could have replaced startCoroutine with startCoroutineUndispatched
            // But we need a way to know that Delay.invokeOnTimeout had used the right thread
            if (trySelect(null))
                block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
        }
        // The scheduled timeout is disposed as soon as any clause is selected.
        disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
    }

    // List node that carries a handle to dispose once the select instance is selected.
    private class DisposeNode(
        @JvmField val handle: DisposableHandle
    ) : LockFreeLinkedListNode()
}
441