/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.selects
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.intrinsics.*
12 import kotlinx.coroutines.sync.*
13 import kotlin.coroutines.*
14 import kotlin.coroutines.intrinsics.*
15 import kotlin.jvm.*
16
/**
 * Scope for [select] invocation.
 *
 * The members of this interface are the clause-registration operators that are available inside the
 * `select { ... }` builder lambda. Each of them pairs a clause with the block of code to run
 * if that clause is the one that gets selected.
 */
public interface SelectBuilder<in R> {
    /**
     * Registers a clause in this [select] expression without additional parameters that does not select any value.
     *
     * @param block code to execute when this clause is selected.
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Registers a clause in this [select] expression without additional parameters that selects a value of type [Q].
     *
     * @param block code to execute with the selected value when this clause is selected.
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Registers a clause in this [select] expression with an additional parameter of type [P]
     * that selects a value of type [Q].
     *
     * @param param additional parameter that is passed to the clause.
     * @param block code to execute with the selected value when this clause is selected.
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Registers a clause in this [select] expression with an additional nullable parameter of type [P],
     * using the `null` value for this parameter, that selects a value of type [Q].
     *
     * This is a shorthand that delegates to the two-argument `invoke` with `null` as the parameter.
     *
     * @param block code to execute with the selected value when this clause is selected.
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)

    /**
     * Clause that selects the given [block] after a specified timeout passes.
     * If timeout is negative or zero, [block] is selected immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * @param timeMillis timeout time in milliseconds.
     * @param block code to execute when the timeout clause is selected.
     */
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
53
/**
 * Clause for [select] expression without additional parameters that does not select any value.
 */
public interface SelectClause0 {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is being registered with.
     * @param block the code associated with this clause; it takes no arguments and produces the select result.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
}
65
/**
 * Clause for [select] expression without additional parameters that selects value of type [Q].
 */
public interface SelectClause1<out Q> {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is being registered with.
     * @param block the code associated with this clause; it receives the selected value of type [Q].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
}
77
/**
 * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
 */
public interface SelectClause2<in P, out Q> {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is being registered with.
     * @param param the additional parameter of this clause.
     * @param block the code associated with this clause; it receives the selected value of type [Q].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
}
89
/**
 * Internal representation of select instance. This instance is called _selected_ when
 * the clause to execute is already picked.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@InternalCoroutinesApi
public interface SelectInstance<in R> {
    /**
     * Returns `true` when this [select] statement had already picked a clause to execute.
     */
    public val isSelected: Boolean

    /**
     * Tries to select this instance.
     *
     * In the implementation in this file, the call succeeds when it moves the instance into the selected state.
     * It also succeeds when the instance had already been selected with the very same non-null
     * [idempotent] marker.
     *
     * @param idempotent marker that makes repeated attempts with the same marker succeed, or `null`.
     * @return `true` if this instance is selected on behalf of the caller, `false` otherwise.
     */
    public fun trySelect(idempotent: Any?): Boolean

    /**
     * Performs action atomically with [trySelect].
     *
     * @param desc descriptor of the action to perform.
     * @return the outcome of the atomic operation; the implementation in this file produces
     * [ALREADY_SELECTED] when the instance was selected before the operation could start.
     */
    public fun performAtomicTrySelect(desc: AtomicDesc): Any?

    /**
     * Returns completion continuation of this select instance.
     * This select instance must be _selected_ first.
     * All resumption through this instance happen _directly_ without going through dispatcher ([MODE_DIRECT]).
     */
    public val completion: Continuation<R>

    /**
     * Resumes this instance in a cancellable way ([MODE_CANCELLABLE]).
     *
     * @param exception the failure to resume this instance with.
     */
    public fun resumeSelectCancellableWithException(exception: Throwable)

    /**
     * Disposes the specified handle when this instance is selected.
     * Note, that [DisposableHandle.dispose] could be called multiple times.
     *
     * @param handle the handle to dispose upon selection.
     */
    public fun disposeOnSelect(handle: DisposableHandle)
}
131
/**
 * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
 * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
 * is either _selected_ or _fails_.
 *
 * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
 * becomes the result of the select. If any clause _fails_, then the select invocation produces the
 * corresponding exception. No clause is selected in this case.
 *
 * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
 * the clauses.
 *
 * There is no `default` clause for select expression. Instead, each selectable suspending function has the
 * corresponding non-suspending version that can be used with a regular `when` expression to select one
 * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
 *
 * | **Receiver**     | **Suspending function**                       | **Select clause**                                | **Non-suspending version**
 * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
 * | [Job]            | [join][Job.join]                              | [onJoin][Job.onJoin]                             | [isCompleted][Job.isCompleted]
 * | [Deferred]       | [await][Deferred.await]                       | [onAwait][Deferred.onAwait]                      | [isCompleted][Job.isCompleted]
 * | [SendChannel]    | [send][SendChannel.send]                      | [onSend][SendChannel.onSend]                     | [offer][SendChannel.offer]
 * | [ReceiveChannel] | [receive][ReceiveChannel.receive]             | [onReceive][ReceiveChannel.onReceive]            | [poll][ReceiveChannel.poll]
 * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]| [poll][ReceiveChannel.poll]
 * | [Mutex]          | [lock][Mutex.lock]                            | [onLock][Mutex.onLock]                           | [tryLock][Mutex.tryLock]
 * | none             | [delay]                                       | [onTimeout][SelectBuilder.onTimeout]             | none
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
 * function is suspended, this function immediately resumes with [CancellationException].
 *
 * Atomicity of cancellation depends on the clause: [onSend][SendChannel.onSend], [onReceive][ReceiveChannel.onReceive],
 * [onReceiveOrNull][ReceiveChannel.onReceiveOrNull], and [onLock][Mutex.onLock] clauses are
 * *atomically cancellable*. When select throws [CancellationException] it means that those clauses had not performed
 * their respective operations.
 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
 * continue to execute even after it was cancelled from the same thread in the case when this select operation
 * was already resumed on atomically cancellable clause and the continuation was posted for execution to the thread's queue.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 *
 * @param builder lambda that registers the clauses of this select expression on its [SelectBuilder] receiver.
 * @return the result produced by the block of the selected clause.
 */
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val scope = SelectBuilderImpl(uCont)
        try {
            builder(scope)
        } catch (e: Throwable) {
            // A failure while registering clauses is routed through the select instance itself
            scope.handleBuilderException(e)
        }
        // Either COROUTINE_SUSPENDED or the already available result of the selected clause
        scope.getResult()
    }
183
184
// Marker produced by SelectBuilderImpl's atomic select operation when the instance had already been selected
@SharedImmutable
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")

// Initial value of SelectBuilderImpl's result slot: neither getResult nor a resume has happened yet
@SharedImmutable
private val UNDECIDED: Any = Symbol("UNDECIDED")

// Value of SelectBuilderImpl's result slot after a resume that found it in the COROUTINE_SUSPENDED state
@SharedImmutable
private val RESUMED: Any = Symbol("RESUMED")
191
/**
 * Implementation of [SelectBuilder] and [SelectInstance] that backs a single [select] invocation.
 *
 * The instance plays several roles at once:
 * - it is the list head ([LockFreeLinkedListHead]) of [DisposeNode]s whose handles are disposed once a clause wins;
 * - it keeps the selection state in `_state` (initially `this`, later the idempotent marker or `null`);
 * - it is the completion [Continuation] handed to the selected clause, storing or forwarding the outcome
 *   through `_result`.
 *
 * @param uCont unintercepted continuation of the caller of [select].
 */
@PublishedApi
internal class SelectBuilderImpl<in R>(
    private val uCont: Continuation<R> // unintercepted delegate continuation
) : LockFreeLinkedListHead(), SelectBuilder<R>,
    SelectInstance<R>, Continuation<R>, CoroutineStackFrame {
    // Stack-frame support: expose the delegate continuation as the caller frame when it is a stack frame itself
    override val callerFrame: CoroutineStackFrame?
        get() = uCont as? CoroutineStackFrame

    // This frame contributes no stack trace element of its own
    override fun getStackTraceElement(): StackTraceElement? = null

    // selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
    private val _state = atomic<Any?>(this)

    // this is basically our own SafeContinuation
    private val _result = atomic<Any?>(UNDECIDED)

    // cancellability support
    @Volatile
    private var parentHandle: DisposableHandle? = null

    /* Result state machine

        +-----------+   getResult   +---------------------+   resume   +---------+
        | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
        +-----------+               +---------------------+            +---------+
              |
              | resume
              V
        +------------+  getResult
        | value/Fail | -----------+
        +------------+            |
              ^                   |
              |                   |
              +-------------------+
     */

    override val context: CoroutineContext get() = uCont.context

    override val completion: Continuation<R> get() = this

    /**
     * Common resume logic driven by the result state machine above.
     *
     * - When the result is still [UNDECIDED], the outcome computed by [value] is stored for `getResult` to pick up.
     * - When the result is [COROUTINE_SUSPENDED], the state becomes [RESUMED] and [block] is invoked to resume
     *   the delegate continuation.
     * - Any other state means a second resume and fails with [IllegalStateException].
     *
     * A lost compare-and-set simply retries with the freshly read state.
     */
    private inline fun doResume(value: () -> Any?, block: () -> Unit) {
        assert { isSelected } // "Must be selected first"
        _result.loop { result ->
            when {
                result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return
                result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
                    block()
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    // Resumes in MODE_DIRECT
    override fun resumeWith(result: Result<R>) {
        doResume({ result.toState() }) {
            if (result.isFailure) {
                // isFailure was checked just above, so exceptionOrNull() is not null here
                uCont.resumeWithStackTrace(result.exceptionOrNull()!!)
            } else {
                uCont.resumeWith(result)
            }
        }
    }

    // Resumes in MODE_CANCELLABLE
    override fun resumeSelectCancellableWithException(exception: Throwable) {
        doResume({ CompletedExceptionally(exception) }) {
            uCont.intercepted().resumeCancellableWithException(exception)
        }
    }

    /**
     * Produces the value to return from the `suspendCoroutineUninterceptedOrReturn` block of [select].
     *
     * Sets up cancellation support first if no clause has been selected yet. Then it either marks the result
     * as [COROUTINE_SUSPENDED] (nothing was resumed so far) or delivers the outcome that is already stored.
     *
     * @return [COROUTINE_SUSPENDED] or the stored value.
     * @throws IllegalStateException if the result is already in the [RESUMED] state.
     * @throws Throwable the stored cause, when the stored result is a [CompletedExceptionally].
     */
    @PublishedApi
    internal fun getResult(): Any? {
        if (!isSelected) initCancellability()
        var result = _result.value // atomic read
        if (result === UNDECIDED) {
            if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = _result.value // reread volatile var
        }
        when {
            result === RESUMED -> throw IllegalStateException("Already resumed")
            result is CompletedExceptionally -> throw result.cause
            else -> return result // either COROUTINE_SUSPENDED or data
        }
    }

    /**
     * Registers a [SelectOnCancelling] handler on the [Job] of [context], if there is one, and remembers
     * the registration in [parentHandle] so that it can be disposed on selection.
     */
    private fun initCancellability() {
        val parent = context[Job] ?: return
        val newRegistration = parent.invokeOnCompletion(
            onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
        parentHandle = newRegistration
        // now check our state _after_ registering
        if (isSelected) newRegistration.dispose()
    }

    /**
     * Handler installed on the parent job by [initCancellability]: when invoked, it tries to select this instance
     * and, on success, resumes it with the cancellation exception of the job.
     */
    private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
        // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
        override fun invoke(cause: Throwable?) {
            if (trySelect(null))
                resumeSelectCancellableWithException(job.getCancellationException())
        }
        override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]"
    }

    // Current selection state; any operation descriptor found in _state is performed (helped) first,
    // so the value returned is never an OpDescriptor
    private val state: Any? get() {
        _state.loop { state ->
            if (state !is OpDescriptor) return state
            state.perform(this)
        }
    }

    /**
     * Handles an exception thrown by the builder lambda of [select].
     *
     * If this instance can still be selected, the exception becomes the result of the select.
     * Otherwise a non-cancellation exception is passed to [handleCoroutineException], subject to the check below.
     */
    @PublishedApi
    internal fun handleBuilderException(e: Throwable) {
        if (trySelect(null)) {
            resumeWithException(e)
        } else if (e !is CancellationException) {
            /*
             * Cannot handle this exception -- builder was already resumed with a different exception,
             * so treat it as "unhandled exception". But only if it is not the completion reason
             * and it's not the cancellation. Otherwise, in the face of structured concurrency
             * the same exception will be reported to the global exception handler.
             */
            // NOTE(review): getResult() rethrows a stored CompletedExceptionally instead of returning it, so the
            // `result !is CompletedExceptionally` test below looks always true when this line is reached -- confirm
            val result = getResult()
            if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
                handleCoroutineException(context, e)
            }
        }
    }

    // Selected as soon as the state is anything but the initial "this"
    override val isSelected: Boolean get() = state !== this

    /**
     * Arranges for [handle] to be disposed when this instance gets selected, or disposes it right away
     * when the instance is selected already.
     */
    override fun disposeOnSelect(handle: DisposableHandle) {
        val node = DisposeNode(handle)
        // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
        if (!isSelected) {
            addLast(node) // add handle to list
            // double-check node after adding
            if (!isSelected) return // all ok - still not selected
        }
        // already selected
        handle.dispose()
    }

    // Cleanup performed once after a successful selection: drop the parent-job registration
    // and dispose every handle that was added through disposeOnSelect
    private fun doAfterSelect() {
        parentHandle?.dispose()
        forEach<DisposeNode> {
            it.handle.dispose()
        }
    }

    /**
     * Tries to move this instance into the selected state, recording [idempotent] as the new state.
     *
     * @return `true` if this call performed the selection, or if the instance had been selected before with the
     * very same non-null [idempotent] marker; `false` if it was selected in any other way.
     */
    // it is just like start(), but support idempotent start
    override fun trySelect(idempotent: Any?): Boolean {
        assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker"
        while (true) { // lock-free loop on state
            val state = this.state
            when {
                state === this -> {
                    if (_state.compareAndSet(this, idempotent)) {
                        doAfterSelect()
                        return true
                    }
                    // lost the race -- reread the state and try again
                }
                // otherwise -- already selected
                idempotent == null -> return false // already selected
                state === idempotent -> return true // was selected with this marker
                else -> return false
            }
        }
    }

    // Runs the given descriptor as an AtomicSelectOp; null is passed as "affected" because we are the originator
    override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
        AtomicSelectOp(desc).perform(null)

    /**
     * Atomic operation that couples the selection of this instance with the operation described by [desc].
     */
    private inner class AtomicSelectOp(
        @JvmField val desc: AtomicDesc
    ) : AtomicOp<Any?>() {
        override fun prepare(affected: Any?): Any? {
            // only originator of operation makes preparation move of installing descriptor into this selector's state
            // helpers should never do it, or risk ruining progress when they come late
            if (affected == null) {
                // we are originator (affected reference is not null if helping)
                prepareIfNotSelected()?.let { return it }
            }
            return desc.prepare(this)
        }

        override fun complete(affected: Any?, failure: Any?) {
            completeSelect(failure)
            desc.complete(this, failure)
        }

        /**
         * Installs this operation into the selection state if the instance is not selected yet.
         *
         * @return `null` when this operation is (or already was) installed,
         * [ALREADY_SELECTED] when the instance had been selected before.
         */
        fun prepareIfNotSelected(): Any? {
            _state.loop { state ->
                when {
                    state === this@AtomicSelectOp -> return null // already in progress
                    state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
                    state === this@SelectBuilderImpl -> {
                        if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
                            return null // success
                    }
                    else -> return ALREADY_SELECTED
                }
            }
        }

        // Replaces this operation in the selection state: with null (selected) when there was no failure,
        // or with the initial "this" state (not selected) otherwise
        private fun completeSelect(failure: Any?) {
            val selectSuccess = failure == null
            val update = if (selectSuccess) null else this@SelectBuilderImpl
            if (_state.compareAndSet(this@AtomicSelectOp, update)) {
                if (selectSuccess)
                    doAfterSelect()
            }
        }
    }

    // The three invoke operators below delegate the registration to the clause itself

    override fun SelectClause0.invoke(block: suspend () -> R) {
        registerSelectClause0(this@SelectBuilderImpl, block)
    }

    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
        registerSelectClause1(this@SelectBuilderImpl, block)
    }

    override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
        registerSelectClause2(this@SelectBuilderImpl, param, block)
    }

    /**
     * Registers the timeout clause.
     *
     * A non-positive [timeMillis] tries to select immediately and, on success, starts [block] right away
     * without interception. Otherwise a timeout action is scheduled with the delay of [context]; the returned
     * handle is registered with [disposeOnSelect] so that it is disposed when this instance gets selected.
     */
    override fun onTimeout(timeMillis: Long, block: suspend () -> R) {
        if (timeMillis <= 0L) {
            if (trySelect(null))
                block.startCoroutineUnintercepted(completion)
            return
        }
        val action = Runnable {
            // todo: we could have replaced startCoroutine with startCoroutineUndispatched
            // But we need a way to know that Delay.invokeOnTimeout had used the right thread
            if (trySelect(null))
                block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
        }
        disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
    }

    // List node that carries a handle to dispose once this select instance is selected
    private class DisposeNode(
        @JvmField val handle: DisposableHandle
    ) : LockFreeLinkedListNode()
}
441