1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.channels
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14 import kotlin.native.concurrent.*
15
16 /**
17 * Abstract send channel. It is a base class for all send channel implementations.
18 */
19 internal abstract class AbstractSendChannel<E>(
20 @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
21 ) : SendChannel<E> {
/**
 * Lock-free list this class operates on: [sendBuffered] and [enqueueSend] append nodes to it and
 * [close] appends the [Closed] token.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val queue = LockFreeLinkedListHead()

// ------ extension points for buffered channels ------

/**
 * Returns `true` if [isBufferFull] is always `true`.
 * When it is, [enqueueSend] adds the sender without consulting [isBufferFull].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysFull: Boolean

/**
 * Returns `true` if this channel's buffer is full.
 * This operation should be atomic if it is invoked by [enqueueSend].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferFull: Boolean

// Holds the handler registered through invokeOnClose.
// State transitions: null -> handler -> HANDLER_INVOKED
private val onCloseHandler = atomic<Any?>(null)
42
43 // ------ internal functions for override by buffered channels ------
44
/**
 * Attempts to hand [element] over to a receiver taken from the head of the queue
 * (subclasses may override this to use a buffer as well).
 * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        // nothing to take from the queue -> the offer fails
        val receiver = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
        val resumeToken = receiver.tryResumeReceive(element, null)
        // this node could not be resumed -> try the next one
        if (resumeToken == null) continue
        assert { resumeToken === RESUME_TOKEN }
        receiver.completeResumeReceive(element)
        return receiver.offerResult
    }
}
61
/**
 * Select-aware variant of [offerInternal]: the offer is performed only if the select clause
 * was not selected yet.
 * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
    val tryOffer = describeTryOffer(element)
    // the offer is performed atomically with the select; a non-null outcome is reported as is
    select.performAtomicTrySelect(tryOffer)?.let { return it }
    val receiver = tryOffer.result
    receiver.completeResumeReceive(element)
    return receiver.offerResult
}
76
77 // ------ state functions & helpers for concrete implementations ------
78
/**
 * The closed token when it is the last node of the queue, `null` otherwise.
 * Observing the token also helps to complete the close operation.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForSend: Closed<*>?
    get() {
        val last = queue.prevNode as? Closed<*> ?: return null
        helpClose(last)
        return last
    }
84
/**
 * The closed token when it is the first node of the queue, `null` otherwise.
 * Observing the token also helps to complete the close operation.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForReceive: Closed<*>?
    get() {
        val first = queue.nextNode as? Closed<*> ?: return null
        helpClose(first)
        return first
    }
90
/**
 * Removes and returns the first sending waiter of the queue, or returns the closed token
 * (which is only peeked, not removed).
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun takeFirstSendOrPeekClosed(): Send? {
    return queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { node -> node is Closed<*> }
}
97
/**
 * Appends [element] to the queue as a buffered node.
 * Returns `null` on success; otherwise returns the node found at the tail, which means the
 * channel was already closed or a receiver is waiting.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
    val buffered = SendBuffered(element)
    queue.addLastIfPrev(buffered) { last ->
        // a receiver or the closed token sits at the tail -> do not enqueue, report that node
        if (last is ReceiveOrClosed<*>) return last
        true
    }
    return null
}
110
/**
 * Creates a descriptor that appends [element] to the queue as a buffered node.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeSendBuffered(element: E): AddLastDesc<*> {
    return SendBufferedDesc(queue, element)
}
115
/**
 * Add-last descriptor for a [SendBuffered] node wrapping the given element.
 */
private open class SendBufferedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
    /**
     * Returns the closed token itself when [affected] is one, `OFFER_FAILED` for any other
     * [ReceiveOrClosed] node, and `null` (no failure) for everything else.
     */
    override fun failure(affected: LockFreeLinkedListNode): Any? {
        if (affected is Closed<*>) return affected
        if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
        return null
    }
}
126
127 // ------ SendChannel ------
128
/** `true` once the closed token is the last node of the queue. */
public final override val isClosedForSend: Boolean
    get() = closedForSend != null

/** Public view of [isFullImpl]. */
public override val isFull: Boolean
    get() = isFullImpl

/** `true` when the queue does not start with a receiver or closed token and the buffer is full. */
protected val isFullImpl: Boolean
    get() {
        // the queue head is checked first, the buffer is consulted only afterwards
        if (queue.nextNode is ReceiveOrClosed<*>) return false
        return isBufferFull
    }
132
/**
 * Sends [element]: first through the non-blocking [offerInternal], and if that does not
 * succeed through the suspending slow path.
 */
public final override suspend fun send(element: E) {
    // fast path -- the non-blocking offer went through
    val offered = offerInternal(element) === OFFER_SUCCESS
    if (offered) return
    // slow path -- suspends or throws an exception
    return sendSuspend(element)
}
139
/**
 * Non-suspending send. Returns `true` when [element] was accepted and `false` when it was not
 * and the channel is not closed; throws the send exception when the channel is closed.
 */
public final override fun offer(element: E): Boolean {
    val result = offerInternal(element)
    if (result === OFFER_SUCCESS) return true
    val closed: Closed<*> = when {
        // We should check for closed token on offer as well, otherwise offer won't be linearizable
        // in the face of concurrent close()
        // See https://github.com/Kotlin/kotlinx.coroutines/issues/359
        result === OFFER_FAILED -> closedForSend ?: return false
        result is Closed<*> -> result
        else -> error("offerInternal returned $result")
    }
    throw recoverStackTrace(helpCloseAndGetSendException(element, closed))
}
156
/**
 * Helps to complete the close operation, reports [element] as undelivered and returns the
 * exception a sender must fail with. If the undelivered-element handler itself fails, that
 * failure is thrown instead, carrying the send exception as suppressed.
 */
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
    // To ensure linearizability we must ALWAYS help close the channel when we observe that it was closed
    // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
    helpClose(closed)
    // Element was not delivered -> call onUndeliveredElement
    val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
    if (handlerFailure != null) {
        // If it crashes, add send exception as suppressed for better diagnostics
        handlerFailure.addSuppressed(closed.sendException)
        throw handlerFailure
    }
    return closed.sendException
}
169
/**
 * Slow path of [send]. Loops until the sender is enqueued as a waiter, the element is
 * offered successfully, or the channel turns out to be closed.
 */
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
    while (true) {
        if (isFullImpl) {
            val undelivered = onUndeliveredElement
            val sendNode =
                if (undelivered == null) SendElement(element, cont)
                else SendElementWithUndeliveredHandler(element, cont, undelivered)
            val enqueued = enqueueSend(sendNode)
            when {
                enqueued == null -> { // enqueued successfully
                    cont.removeOnCancellation(sendNode)
                    return@sc
                }
                enqueued is Closed<*> -> {
                    cont.helpCloseAndResumeWithSendException(element, enqueued)
                    return@sc
                }
                // buffer is not full or a receiver is waiting -> try to offer instead
                enqueued === ENQUEUE_FAILED || enqueued is Receive<*> -> Unit
                else -> error("enqueueSend returned $enqueued")
            }
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offered = offerInternal(element)
        when {
            offered === OFFER_SUCCESS -> {
                cont.resume(Unit)
                return@sc
            }
            offered is Closed<*> -> {
                cont.helpCloseAndResumeWithSendException(element, offered)
                return@sc
            }
            offered !== OFFER_FAILED -> error("offerInternal returned $offered")
        }
        // OFFER_FAILED -> go around the loop once more
    }
}
207
/**
 * Helps to complete the close operation, reports [element] as undelivered and resumes this
 * continuation with the send exception. If the undelivered-element handler fails, its failure
 * is used for resumption instead, with the send exception attached as suppressed.
 */
private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {
    helpClose(closed)
    val sendException = closed.sendException
    val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
    if (handlerFailure == null) {
        resumeWithException(sendException)
    } else {
        handlerFailure.addSuppressed(sendException)
        resumeWithException(handlerFailure)
    }
}
218
/**
 * Adds [send] to the end of the queue of waiting senders.
 *
 * Result is:
 * * null -- successfully enqueued
 * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
 * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
 */
protected open fun enqueueSend(send: Send): Any? {
    if (isBufferAlwaysFull) {
        // no buffer state to consult, only the node at the tail matters
        queue.addLastIfPrev(send) { last ->
            if (last is ReceiveOrClosed<*>) return last
            true
        }
        return null
    }
    val added = queue.addLastIfPrevAndIf(
        send,
        { last ->
            if (last is ReceiveOrClosed<*>) return last
            true
        },
        { isBufferFull }
    )
    return if (added) null else ENQUEUE_FAILED
}
240
/**
 * Closes this channel with the optional [cause].
 * Returns `true` only for the invocation that actually added the closed token to the queue.
 */
public override fun close(cause: Throwable?): Boolean {
    val token = Closed<E>(cause)
    /*
     * The close is committed by appending the token to the end of the queue.
     * Successful -> we're now responsible for closing receivers
     * Not successful -> help closing pending receivers to maintain invariant
     * "if (!close()) next send will throw"
     */
    val installed = queue.addLastIfPrev(token) { last -> last !is Closed<*> }
    if (installed) {
        helpClose(token)
        invokeOnCloseHandler(cause)
    } else {
        // another close won: help with the token that is already at the tail
        helpClose(queue.prevNode as Closed<*>)
    }
    return installed // true if we have closed
}
255
/**
 * Invokes the registered close handler with [cause], provided one is registered, was not
 * invoked yet, and this call wins the transition to `HANDLER_INVOKED`.
 */
private fun invokeOnCloseHandler(cause: Throwable?) {
    val registered = onCloseHandler.value
    if (registered === null || registered === HANDLER_INVOKED) return
    // a failed CAS means a concurrent invokeOnClose() has invoked the handler
    if (!onCloseHandler.compareAndSet(registered, HANDLER_INVOKED)) return
    @Suppress("UNCHECKED_CAST")
    (registered as Handler)(cause)
}
265
/**
 * Registers [handler] to be invoked when the channel is closed. If the channel is already
 * closed for send, the handler is invoked right away (unless a concurrent close did it).
 *
 * @throws IllegalStateException if another handler was already registered.
 */
override fun invokeOnClose(handler: Handler) {
    // Intricate dance for concurrent invokeOnClose and close calls
    if (onCloseHandler.compareAndSet(null, handler)) {
        val closedToken = closedForSend ?: return
        // a failed CAS means the close() call has invoked the handler
        if (onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
            handler(closedToken.closeCause)
        }
        return
    }
    val current = onCloseHandler.value
    val message =
        if (current === HANDLER_INVOKED) "Another handler was already registered and successfully invoked"
        else "Another handler was already registered: $current"
    throw IllegalStateException(message)
}
283
/**
 * Completes a close operation for the given [closed] token: removes every receiver queued
 * before the token, resumes them as closed, and finally calls [onClosedIdempotent].
 */
private fun helpClose(closed: Closed<*>) {
    /*
     * Receivers are unlinked walking from the closed token towards the head (right to left) to avoid
     * races with senders. Consider channel state: head -> [receive_1] -> [receive_2] -> head
     * - T1 calls receive()
     * - T2 calls close()
     * - T3 calls close() + send(value)
     *
     * With a left-to-right traversal by both closers the following non-linearizable history is possible:
     * [close -> false], [send -> transferred 'value' to receiver]
     *
     * Resumption is postponed until every receiver has been removed from the list for a similar reason.
     * Consider channel state: head -> [receive_1] -> [receive_2] -> head
     * - T1 called receive_2 and will call send() when its receive call resumes
     * - T2 calls close()
     *
     * If T2's close resumed receive_2 right away, T1 would get the "closed for receive" exception, yet its
     * subsequent send could still rendezvous with receive_1, producing a non-linearizable execution.
     */
    var removedReceivers = InlineList<Receive<E>>()
    while (true) {
        // Stop when the node before the token is not a receiver (channel is empty or has no receivers)
        @Suppress("UNCHECKED_CAST")
        val receiver = closed.prevNode as? Receive<E> ?: break
        if (receiver.remove()) {
            // collect removed nodes in a separate list
            removedReceivers += receiver
        } else {
            // failed to remove the node (due to race) -- retry finding non-removed prevNode
            // NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
            receiver.helpRemove() // make sure remove is complete before continuing
        }
    }
    /*
     * Notify the removed nodes that the channel was closed,
     * in the order they were added to the channel
     */
    removedReceivers.forEachReversed { it.resumeReceiveClosed(closed) }
    // and do other post-processing
    onClosedIdempotent(closed)
}
326
/**
 * Invoked when channel is closed as the last action of [close] invocation.
 * This method should be idempotent and can be called multiple times: it is called at the end of
 * the private `helpClose`, which runs not only from [close] but also whenever a closed token
 * is observed.
 * The default implementation does nothing.
 */
protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
332
/**
 * Removes and returns the first receiving waiter of the queue, or returns the closed token
 * (which is only peeked, not removed).
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    return queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { node -> node is Closed<*> }
}
339
340 // ------ registerSelectSend ------
341
/**
 * Creates a descriptor that tries to offer [element] to the first node of the queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryOffer(element: E): TryOfferDesc<E> {
    return TryOfferDesc(element, queue)
}
346
/**
 * Remove-first descriptor that offers [element] to the receiver at the head of the queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryOfferDesc<E>(
    @JvmField val element: E,
    queue: LockFreeLinkedListHead
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
    /**
     * Returns the closed token itself when [affected] is one, `OFFER_FAILED` when it is not a
     * receiver at all, and `null` (no failure) for a receiver.
     */
    override fun failure(affected: LockFreeLinkedListNode): Any? {
        if (affected is Closed<*>) return affected
        return if (affected is ReceiveOrClosed<*>) null else OFFER_FAILED
    }

    /**
     * Tries to resume the affected receiver with [element]. Returns `REMOVE_PREPARED` when the
     * receiver gave no token, `RETRY_ATOMIC` when it asked for a retry, and `null` on success.
     */
    @Suppress("UNCHECKED_CAST")
    override fun onPrepare(prepareOp: PrepareOp): Any? {
        val receiver = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
        val token = receiver.tryResumeReceive(element, prepareOp)
        return when {
            token == null -> REMOVE_PREPARED
            token === RETRY_ATOMIC -> RETRY_ATOMIC
            else -> {
                assert { token === RESUME_TOKEN }
                null
            }
        }
    }
}
369
/**
 * Select clause for sending; each access creates a new clause object that registers the
 * send with the select instance.
 */
final override val onSend: SelectClause2<E, SendChannel<E>>
    get() {
        val channel = this
        return object : SelectClause2<E, SendChannel<E>> {
            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
                channel.registerSelectSend(select, param, block)
            }
        }
    }
376
/**
 * Registers a send of [element] with [select]. Loops until the select is already selected,
 * the sender is enqueued as a waiter, the offer succeeds (then [block] is started), or the
 * channel turns out to be closed (then the send exception is thrown).
 */
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
    while (!select.isSelected) {
        if (isFullImpl) {
            val selectNode = SendSelect(element, this, select, block)
            val enqueued = enqueueSend(selectNode)
            when {
                enqueued == null -> { // enqueued successfully
                    select.disposeOnSelect(selectNode)
                    return
                }
                enqueued is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, enqueued))
                // buffer is not full or a receiver is waiting -> try to offer
                enqueued === ENQUEUE_FAILED || enqueued is Receive<*> -> Unit
                else -> error("enqueueSend returned $enqueued ")
            }
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offered = offerSelectInternal(element, select)
        when {
            offered === ALREADY_SELECTED -> return
            // both outcomes mean "retry"
            offered === OFFER_FAILED || offered === RETRY_ATOMIC -> Unit
            offered === OFFER_SUCCESS -> {
                block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
                return
            }
            offered is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, offered))
            else -> error("offerSelectInternal returned $offered")
        }
    }
}
409
410 // ------ debug ------
411
/** Debug representation: class name, hex address, queue state and [bufferDebugString]. */
public override fun toString(): String =
    classSimpleName + "@" + hexAddress + "{" + queueDebugStateString + "}" + bufferDebugString
414
/**
 * Debug description of the queue: what kind of node is first, and, when the queue has more
 * than one node, its size and the closed token at the tail (if any).
 */
private val queueDebugStateString: String
    get() {
        val first = queue.nextNode
        if (first === queue) return "EmptyQueue"
        val summary = when (first) {
            is Closed<*> -> first.toString()
            is Receive<*> -> "ReceiveQueued"
            is Send -> "SendQueued"
            else -> "UNEXPECTED:$first" // should not happen
        }
        val last = queue.prevNode
        if (last === first) return summary
        val withSize = "$summary,queueSize=${countQueueSize()}"
        return if (last is Closed<*>) "$withSize,closedForSend=$last" else withSize
    }
432
/** Counts the nodes visited by iterating over the queue. */
private fun countQueueSize(): Int {
    var count = 0
    queue.forEach<LockFreeLinkedListNode> { count += 1 }
    return count
}
438
/**
 * Extra text appended by [toString] after the queue state. Empty by default.
 */
protected open val bufferDebugString: String get() = ""
440
441 // ------ private ------
442
/**
 * Queue node for a sender registered through a select clause: holds the element to send,
 * the owning [channel], the [select] instance and the [block] to run once the send completes.
 */
private class SendSelect<E, R>(
    override val pollResult: E, // E | Closed - the result pollInternal returns when it rendezvous with this node
    @JvmField val channel: AbstractSendChannel<E>,
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (SendChannel<E>) -> R
) : Send(), DisposableHandle {
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        return select.trySelectOther(otherOp) as Symbol? // must return symbol
    }

    override fun completeResumeSend() {
        block.startCoroutineCancellable(receiver = channel, completion = select.completion)
    }

    override fun dispose() { // invoked on select completion
        // a successful removal means the node was added but never received -> element not delivered
        if (remove()) undeliveredElement()
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        if (!select.trySelect()) return
        select.resumeSelectWithException(closed.sendException)
    }

    override fun undeliveredElement() {
        val handler = channel.onUndeliveredElement ?: return
        handler.callUndeliveredElement(pollResult, select.completion.context)
    }

    override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
}
473
/**
 * Queue node that carries a buffered [element]. Resuming it always yields `RESUME_TOKEN`;
 * completion and close notifications are no-ops.
 */
internal class SendBuffered<out E>(
    @JvmField val element: E
) : Send() {
    override val pollResult: Any?
        get() = element

    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {}

    override fun resumeSendClosed(closed: Closed<*>) {}

    override fun toString(): String = "SendBuffered@$hexAddress($element)"
}
483 }
484
485 /**
486 * Abstract send/receive channel. It is a base class for all channel implementations.
487 */
488 internal abstract class AbstractChannel<E>(
489 onUndeliveredElement: OnUndeliveredElement<E>?
490 ) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {
491 // ------ extension points for buffered channels ------
492
/**
 * Returns `true` if [isBufferEmpty] is always `true`.
 * When it is, [enqueueReceiveInternal] adds the receiver without consulting [isBufferEmpty].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysEmpty: Boolean

/**
 * Returns `true` if this channel's buffer is empty.
 * This operation should be atomic if it is invoked by [enqueueReceive].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferEmpty: Boolean
505
506 // ------ internal functions for override by buffered channels ------
507
/**
 * Attempts to take an element from a sender at the head of the queue
 * (subclasses may override this to use a buffer as well).
 * Return type is `E | POLL_FAILED | Closed`
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val sender = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        val resumeToken = sender.tryResumeSend(null)
        if (resumeToken == null) {
            // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
            sender.undeliveredElement()
            continue
        }
        assert { resumeToken === RESUME_TOKEN }
        sender.completeResumeSend()
        return sender.pollResult
    }
}
526
/**
 * Select-aware variant of [pollInternal]: the poll is performed only if the select clause
 * was not selected yet.
 * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
    val tryPoll = describeTryPoll()
    // the poll is performed atomically with the select; a non-null outcome is reported as is
    select.performAtomicTrySelect(tryPoll)?.let { return it }
    tryPoll.result.completeResumeSend()
    return tryPoll.result.pollResult
}
541
542 // ------ state functions & helpers for concrete implementations ------
543
/**
 * `true` when the first node of the queue is a receiver or the closed token.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val hasReceiveOrClosed: Boolean
    get() {
        val first = queue.nextNode
        return first is ReceiveOrClosed<*>
    }
548
549 // ------ ReceiveChannel ------
550
/** `true` when the closed token is first in the queue and the buffer is empty. */
public override val isClosedForReceive: Boolean
    get() {
        if (closedForReceive == null) return false
        return isBufferEmpty
    }

/** Public view of [isEmptyImpl]. */
public override val isEmpty: Boolean
    get() = isEmptyImpl

/** `true` when the queue does not start with a sender and the buffer is empty. */
protected val isEmptyImpl: Boolean
    get() {
        // the queue head is checked first, the buffer is consulted only afterwards
        if (queue.nextNode is Send) return false
        return isBufferEmpty
    }
554
/**
 * Receives an element: first through the non-blocking [pollInternal], and otherwise through
 * the suspending slow path in "throw on close" mode.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
    // fast path -- try poll non-blocking
    val polled = pollInternal()
    /*
     * A Closed result also goes to the tail-call slow-path, which allows us to
     * properly recover stacktrace without paying a performance cost on fast path.
     * We prefer to recover stacktrace using suspending path to have a more precise stacktrace.
     */
    val needsSlowPath = polled === POLL_FAILED || polled is Closed<*>
    if (!needsSlowPath) return polled as E
    // slow-path does suspend
    return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}
568
/**
 * Slow path shared by the receive operations. Loops until the receiver is enqueued as a
 * waiter or a poll produces an element or the closed token; [receiveMode] selects how the
 * continuation is resumed.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
    val undelivered = onUndeliveredElement
    val receiveNode =
        if (undelivered == null) ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode)
        else ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, undelivered)
    while (true) {
        if (enqueueReceive(receiveNode)) {
            removeReceiveOnCancel(cont, receiveNode)
            return@sc
        }
        // hm... something is not right. try to poll
        val polled = pollInternal()
        if (polled is Closed<*>) {
            receiveNode.resumeReceiveClosed(polled)
            return@sc
        }
        if (polled !== POLL_FAILED) {
            val received = polled as E
            cont.resume(receiveNode.resumeValue(received), receiveNode.resumeOnCancellationFun(received))
            return@sc
        }
    }
}
591
/**
 * Adds [receive] to the end of the queue unless the tail is a sender and, for channels whose
 * buffer is not always empty, unless the buffer is non-empty. Returns `true` when added.
 */
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean {
    if (isBufferAlwaysEmpty) {
        return queue.addLastIfPrev(receive) { last -> last !is Send }
    }
    return queue.addLastIfPrevAndIf(receive, { last -> last !is Send }, { isBufferEmpty })
}
595
/** Enqueues [receive] and, when that succeeds, notifies [onReceiveEnqueued]. */
private fun enqueueReceive(receive: Receive<E>): Boolean {
    val enqueued = enqueueReceiveInternal(receive)
    if (enqueued) onReceiveEnqueued()
    return enqueued
}
599
/**
 * Receives an element: first through the non-blocking [pollInternal], and otherwise through
 * the suspending slow path in "null on close" mode.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrNull(): E? {
    // fast path -- try poll non-blocking
    val polled = pollInternal()
    val needsSlowPath = polled === POLL_FAILED || polled is Closed<*>
    if (!needsSlowPath) return polled as E
    // slow-path does suspend
    return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
}
608
/**
 * Converts a poll result into a nullable element: a closed token without a cause becomes
 * `null`, a closed token with a cause throws that cause, anything else is the element.
 */
@Suppress("UNCHECKED_CAST")
private fun receiveOrNullResult(result: Any?): E? {
    if (result !is Closed<*>) return result as E
    val cause = result.closeCause ?: return null
    throw recoverStackTrace(cause)
}
617
/**
 * Receives an element or the closed state as a [ValueOrClosed]: first through the
 * non-blocking [pollInternal], and otherwise through the suspending slow path.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
    val polled = pollInternal()
    // slow-path does suspend
    if (polled === POLL_FAILED) return receiveSuspend(RECEIVE_RESULT)
    // fast path -- the non-blocking poll produced an element or the closed token
    return polled.toResult()
}
626
/**
 * Non-suspending receive. Returns `null` when nothing could be polled; otherwise the result
 * is converted by [receiveOrNullResult].
 */
@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
    val polled = pollInternal()
    if (polled === POLL_FAILED) return null
    return receiveOrNullResult(polled)
}
632
/** Hidden binary-compatibility overload; delegates to [cancelInternal]. */
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
final override fun cancel(cause: Throwable?): Boolean {
    return cancelInternal(cause)
}
636
/**
 * Cancels the channel with [cause], or with a newly created [CancellationException] when
 * [cause] is `null`.
 */
final override fun cancel(cause: CancellationException?) {
    val effectiveCause = cause ?: CancellationException("$classSimpleName was cancelled")
    cancelInternal(effectiveCause)
}
640
/**
 * Closes the channel with [cause] and then runs [onCancelIdempotent].
 * Returns the value returned by [close].
 */
// It needs to be internal to support deprecated cancel(Throwable?) API
internal fun cancelInternal(cause: Throwable?): Boolean {
    val wasClosed = close(cause)
    onCancelIdempotent(wasClosed)
    return wasClosed
}
646
/**
 * Method that is invoked right after [close] in [cancel] sequence.
 * [wasClosed] is directly mapped to the value returned by [close].
 *
 * This implementation removes every node queued before the closed token and then resumes the
 * removed senders as closed.
 */
protected open fun onCancelIdempotent(wasClosed: Boolean) {
    /*
     * See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
     * has the same rationale.
     */
    val closed = closedForSend ?: error("Cannot happen")
    var removedSenders = InlineList<Send>()
    while (true) {
        val node = closed.prevNode
        // reached the head of the queue -> nothing left to remove
        if (node is LockFreeLinkedListHead) break
        assert { node is Send }
        if (node.remove()) {
            // Add to the list only **after** successful removal
            removedSenders += node as Send
        } else {
            node.helpRemove() // make sure remove is complete before continuing
        }
    }
    removedSenders.forEachReversed { it.resumeSendClosed(closed) }
}
673
/** Creates a new iterator over this channel. */
public final override fun iterator(): ChannelIterator<E> {
    return Itr(this)
}
675
676 // ------ registerSelectReceive ------
677
/**
 * Creates a descriptor that tries to poll from the first node of the queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryPoll(): TryPollDesc<E> {
    return TryPollDesc(queue)
}
682
/**
 * Remove-first descriptor that polls from the sender at the head of the queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
    /**
     * Returns the closed token itself when [affected] is one, `POLL_FAILED` when it is not a
     * sender at all, and `null` (no failure) for a sender.
     */
    override fun failure(affected: LockFreeLinkedListNode): Any? {
        if (affected is Closed<*>) return affected
        return if (affected is Send) null else POLL_FAILED
    }

    /**
     * Tries to resume the affected sender. Returns `REMOVE_PREPARED` when the sender gave no
     * token, `RETRY_ATOMIC` when it asked for a retry, and `null` on success.
     */
    @Suppress("UNCHECKED_CAST")
    override fun onPrepare(prepareOp: PrepareOp): Any? {
        val sender = prepareOp.affected as Send // see "failure" impl
        val token = sender.tryResumeSend(prepareOp)
        return when {
            token == null -> REMOVE_PREPARED
            token === RETRY_ATOMIC -> RETRY_ATOMIC
            else -> {
                assert { token === RESUME_TOKEN }
                null
            }
        }
    }

    override fun onRemoved(affected: LockFreeLinkedListNode) {
        // Called when we removed it from the queue but were too late to resume, so we have undelivered element
        val sender = affected as Send
        sender.undeliveredElement()
    }
}
707
/** Select clause for receiving in "throw on close" mode; each access creates a new clause object. */
final override val onReceive: SelectClause1<E>
    get() {
        val channel = this
        return object : SelectClause1<E> {
            @Suppress("UNCHECKED_CAST")
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
                val untypedBlock = block as suspend (Any?) -> R
                channel.registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, untypedBlock)
            }
        }
    }
715
/** Select clause for receiving in "null on close" mode; each access creates a new clause object. */
final override val onReceiveOrNull: SelectClause1<E?>
    get() {
        val channel = this
        return object : SelectClause1<E?> {
            @Suppress("UNCHECKED_CAST")
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
                val untypedBlock = block as suspend (Any?) -> R
                channel.registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, untypedBlock)
            }
        }
    }
723
/** Select clause for receiving a [ValueOrClosed] result; each access creates a new clause object. */
final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
    get() {
        val channel = this
        return object : SelectClause1<ValueOrClosed<E>> {
            @Suppress("UNCHECKED_CAST")
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
                val untypedBlock = block as suspend (Any?) -> R
                channel.registerSelectReceiveMode(select, RECEIVE_RESULT, untypedBlock)
            }
        }
    }
731
/**
 * Registers a receive with [select] in the given [receiveMode]. Loops until the select is
 * selected: while the channel looks empty it tries to enqueue a waiting receiver, otherwise
 * it polls atomically with the select and hands the outcome to [block].
 */
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
    while (!select.isSelected) {
        if (isEmptyImpl) {
            if (enqueueReceiveSelect(select, block, receiveMode)) return
            continue
        }
        val polled = pollSelectInternal(select)
        when {
            polled === ALREADY_SELECTED -> return
            // both outcomes mean "retry"
            polled === POLL_FAILED || polled === RETRY_ATOMIC -> Unit
            else -> block.tryStartBlockUnintercepted(select, receiveMode, polled)
        }
    }
}
748
/**
 * Starts this block with the outcome of a select poll.
 *
 * For an element the block is started with the element (wrapped into a result in
 * `RECEIVE_RESULT` mode). For a closed token the behaviour depends on [receiveMode]: the
 * receive exception is thrown, or the block is started with a closed result or `null`
 * provided that `select.trySelect()` succeeds.
 */
private fun <R> (suspend (Any?) -> R).tryStartBlockUnintercepted(select: SelectInstance<R>, receiveMode: Int, value: Any?) {
    if (value !is Closed<*>) {
        if (receiveMode == RECEIVE_RESULT) {
            startCoroutineUnintercepted(value.toResult<Any>(), select.completion)
        } else {
            startCoroutineUnintercepted(value, select.completion)
        }
        return
    }
    when (receiveMode) {
        RECEIVE_THROWS_ON_CLOSE -> throw recoverStackTrace(value.receiveException)
        RECEIVE_RESULT -> {
            if (!select.trySelect()) return
            startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion)
        }
        RECEIVE_NULL_ON_CLOSE -> {
            // closing with a cause is reported as an exception, not as null
            if (value.closeCause != null) throw recoverStackTrace(value.receiveException)
            if (!select.trySelect()) return
            startCoroutineUnintercepted(null, select.completion)
        }
    }
}
779
/**
 * Enqueues a select-based receiver node. On success the node is registered for disposal
 * with [select]. Returns whether the node was enqueued.
 */
private fun <R> enqueueReceiveSelect(
    select: SelectInstance<R>,
    block: suspend (Any?) -> R,
    receiveMode: Int
): Boolean {
    val selectNode = ReceiveSelect(this, select, block, receiveMode)
    if (!enqueueReceive(selectNode)) return false
    select.disposeOnSelect(selectNode)
    return true
}
790
791 // ------ protected ------
792
/**
 * Same as the base implementation, and additionally notifies [onReceiveDequeued] when an
 * actual receiver (not the closed token) was taken from the queue.
 */
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    val taken = super.takeFirstReceiveOrPeekClosed()
    if (taken != null && taken !is Closed<*>) onReceiveDequeued()
    return taken
}
797
/**
 * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveEnqueued() {}
803
/**
 * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveDequeued() {}
809
810 // ------ private ------
811
/** Installs a cancellation handler on [cont] that removes [receive] from the queue. */
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
    val onCancel = RemoveReceiveOnCancel(receive)
    cont.invokeOnCancellation(handler = onCancel.asHandler)
}
814
/**
 * Cancellation handler that removes [receive] from the queue and, when the removal
 * succeeds, notifies [onReceiveDequeued].
 */
private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : BeforeResumeCancelHandler() {
    override fun invoke(cause: Throwable?) {
        if (!receive.remove()) return
        onReceiveDequeued()
    }

    override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
}
822
/**
 * Iterator over [channel]. The outcome of the last poll is kept in [result] between
 * [hasNext] and [next].
 */
private class Itr<E>(@JvmField val channel: AbstractChannel<E>) : ChannelIterator<E> {
    var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed

    override suspend fun hasNext(): Boolean {
        // a stored result means hasNext was called repeatedly -> answer from it
        if (result === POLL_FAILED) {
            // fast path -- try poll non-blocking
            result = channel.pollInternal()
            // slow-path does suspend
            if (result === POLL_FAILED) return hasNextSuspend()
        }
        return hasNextResult(result)
    }

    /**
     * `true` for an element, `false` for a closed token without a cause; a closed token with
     * a cause throws the receive exception.
     */
    private fun hasNextResult(result: Any?): Boolean {
        if (result !is Closed<*>) return true
        if (result.closeCause != null) throw recoverStackTrace(result.receiveException)
        return false
    }

    private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
        val receiveNode = ReceiveHasNext(this, cont)
        while (true) {
            if (channel.enqueueReceive(receiveNode)) {
                channel.removeReceiveOnCancel(cont, receiveNode)
                return@sc
            }
            // hm... something is not right. try to poll
            val polled = channel.pollInternal()
            this.result = polled
            if (polled is Closed<*>) {
                if (polled.closeCause == null) {
                    cont.resume(false)
                } else {
                    cont.resumeWithException(polled.receiveException)
                }
                return@sc
            }
            if (polled !== POLL_FAILED) {
                @Suppress("UNCHECKED_CAST")
                cont.resume(true, channel.onUndeliveredElement?.bindCancellationFun(polled as E, cont.context))
                return@sc
            }
        }
    }

    @Suppress("UNCHECKED_CAST")
    override fun next(): E {
        val current = this.result
        if (current is Closed<*>) throw recoverStackTrace(current.receiveException)
        if (current === POLL_FAILED) {
            throw IllegalStateException("'hasNext' should be called prior to 'next' invocation")
        }
        // the element is consumed: clear the stored result before returning it
        this.result = POLL_FAILED
        return current as E
    }
}
881
/**
 * Waiting receiver backed by the continuation [cont]; [receiveMode] selects how values and
 * channel closure are delivered to it.
 */
private open class ReceiveElement<in E>(
    @JvmField val cont: CancellableContinuation<Any?>,
    @JvmField val receiveMode: Int
) : Receive<E>() {
    // In RECEIVE_RESULT mode the value is wrapped into ValueOrClosed, otherwise it is passed as is
    fun resumeValue(value: E): Any? =
        if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value

    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value))
        if (token == null) return null
        assert { token === RESUME_TOKEN } // the only other possible result
        // finishPrepare may be called only after a successful tryResume, so that only good affected node is saved
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        if (receiveMode == RECEIVE_RESULT) {
            cont.resume(closed.toResult<Any>())
            return
        }
        if (receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null) {
            cont.resume(null)
            return
        }
        cont.resumeWithException(closed.receiveException)
    }

    override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
}
910
/**
 * [ReceiveElement] that supplies an on-cancellation function built from [onUndeliveredElement]
 * for the value being delivered.
 */
private class ReceiveElementWithUndeliveredHandler<in E>(
    cont: CancellableContinuation<Any?>,
    receiveMode: Int,
    @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : ReceiveElement<E>(cont, receiveMode) {
    override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? {
        return onUndeliveredElement.bindCancellationFun(value, cont.context)
    }
}
919
/**
 * Waiting receiver created by a suspended `hasNext` call of [iterator]; it resumes [cont] with
 * the `hasNext` answer and stores the received element (or the close token) in the iterator.
 */
private open class ReceiveHasNext<E>(
    @JvmField val iterator: Itr<E>,
    @JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
        if (token == null) return null
        assert { token === RESUME_TOKEN } // the only other possible result
        // finishPrepare may be called only after a successful tryResume, so that only good affected node is saved
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {
        // With otherOp != null tryResumeReceive may be invoked several times and much later, while
        // completeResumeReceive is called once -- so the iterator result is stored here.
        iterator.result = value
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        val token = if (closed.closeCause != null) {
            cont.tryResumeWithException(closed.receiveException)
        } else {
            cont.tryResume(false)
        }
        if (token == null) return
        // Store the close token before the continuation is actually resumed
        iterator.result = closed
        cont.completeResume(token)
    }

    override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? {
        return iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)
    }

    override fun toString(): String = "ReceiveHasNext@$hexAddress"
}
959
/**
 * Waiting receiver registered by a `select` clause on [channel]. When selected it starts [block]
 * with the received value (or the close outcome) using the completion of [select].
 */
private class ReceiveSelect<R, E>(
    @JvmField val channel: AbstractChannel<E>,
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (Any?) -> R,
    @JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        return select.trySelectOther(otherOp) as Symbol?
    }

    @Suppress("UNCHECKED_CAST")
    override fun completeResumeReceive(value: E) {
        // In RECEIVE_RESULT mode the block gets the value wrapped into ValueOrClosed
        val argument: Any? = if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value
        block.startCoroutineCancellable(argument, select.completion, resumeOnCancellationFun(value))
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        val selected = select.trySelect()
        if (!selected) return
        val cause = closed.closeCause
        when {
            receiveMode == RECEIVE_RESULT ->
                block.startCoroutineCancellable(ValueOrClosed.closed<R>(cause), select.completion)
            receiveMode == RECEIVE_NULL_ON_CLOSE && cause == null ->
                block.startCoroutineCancellable(null, select.completion)
            receiveMode == RECEIVE_THROWS_ON_CLOSE || receiveMode == RECEIVE_NULL_ON_CLOSE ->
                select.resumeSelectWithException(closed.receiveException)
        }
    }

    override fun dispose() { // invoked on select completion
        val removed = remove()
        if (!removed) return
        channel.onReceiveDequeued() // notify cancellation of receive
    }

    override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? {
        return channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)
    }

    override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
}
1001 }
1002
// receiveMode values: they select how a waiting receiver is completed when the channel is closed
// The receiver is resumed with the close's receive exception.
internal const val RECEIVE_THROWS_ON_CLOSE = 0
// The receiver is resumed with `null` when the channel was closed without a cause, otherwise with the exception.
internal const val RECEIVE_NULL_ON_CLOSE = 1
// The receiver is resumed with a `ValueOrClosed` wrapper; received values are wrapped with `ValueOrClosed.value` too.
internal const val RECEIVE_RESULT = 2
1007
@JvmField
@SharedImmutable
internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels

// Result of `offerInternal` / `offerResult` of a waiting receiver: the element was accepted.
@JvmField
@SharedImmutable
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")

// Result of `offerInternal` when there is no waiting receiver (nor closed token) to take the element.
@JvmField
@SharedImmutable
internal val OFFER_FAILED = Symbol("OFFER_FAILED")

// "No result" marker: initial state of the channel iterator and the `pollInternal` result
// that makes `hasNext` fall back to its suspending slow path.
@JvmField
@SharedImmutable
internal val POLL_FAILED = Symbol("POLL_FAILED")

// NOTE(review): not referenced in this part of the file; presumably reported when a sender
// could not be enqueued -- confirm against the enqueue code.
@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")

// Final state of `onCloseHandler` (null -> handler -> HANDLER_INVOKED).
// Note that the symbol's debug name is "ON_CLOSE_HANDLER_INVOKED", not the property name.
@JvmField
@SharedImmutable
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
1031
// Callback that takes an optional Throwable and returns nothing.
// NOTE(review): presumably the type of the on-close handler kept in `onCloseHandler` -- confirm at usage sites.
internal typealias Handler = (Throwable?) -> Unit
1033
/**
 * Represents sending waiter in the queue.
 *
 * Implementations visible in this file are [SendElement] (a suspended sender holding an element)
 * and [Closed] (the closed token).
 */
internal abstract class Send : LockFreeLinkedListNode() {
    abstract val pollResult: Any? // E | Closed - the result pollInternal returns when it rendezvous with this node
    // Returns: null - failure,
    //          RETRY_ATOMIC for retry (only when otherOp != null),
    //          RESUME_TOKEN on success (call completeResumeSend)
    // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
    abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
    // Second step of resumption: to be called after tryResumeSend returned RESUME_TOKEN
    abstract fun completeResumeSend()
    // Notifies this waiting sender that the channel is closed (SendElement resumes with closed.sendException)
    abstract fun resumeSendClosed(closed: Closed<*>)
    // No-op by default; SendElementWithUndeliveredHandler overrides it to call its onUndeliveredElement handler
    open fun undeliveredElement() {}
}
1048
/**
 * Represents receiver waiter in the queue or closed token.
 *
 * Implemented in this file by [Receive] (a waiting receiver) and by [Closed] (the closed token).
 */
internal interface ReceiveOrClosed<in E> {
    // A waiting Receive reports OFFER_SUCCESS, the Closed token reports itself.
    // NOTE(review): presumably this is what offerInternal returns after handing an element to this node -- confirm.
    val offerResult: Any // OFFER_SUCCESS | Closed
    // Returns: null - failure,
    //          RETRY_ATOMIC for retry (only when otherOp != null),
    //          RESUME_TOKEN on success (call completeResumeReceive)
    // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
    fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol?
    // Second step of resumption: to be called after tryResumeReceive returned RESUME_TOKEN
    fun completeResumeReceive(value: E)
}
1061
/**
 * Represents sender for a specific element: [pollResult] is the element being sent and
 * [cont] is the continuation of the suspended sender.
 */
internal open class SendElement<E>(
    override val pollResult: E,
    @JvmField val cont: CancellableContinuation<Unit>
) : Send() {
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        val token = cont.tryResume(Unit, otherOp?.desc)
        if (token == null) return null
        assert { token === RESUME_TOKEN } // the only other possible result
        // finishPrepare may be called only after a successful tryResume, so that only good affected node is saved
        otherOp?.finishPrepare() // finish preparations
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        cont.resumeWithException(closed.sendException)
    }

    override fun toString(): String = "$classSimpleName@$hexAddress($pollResult)"
}
1081
/**
 * [SendElement] that reports its element to [onUndeliveredElement] when the node is removed
 * from the queue or when [undeliveredElement] is called.
 */
internal class SendElementWithUndeliveredHandler<E>(
    pollResult: E,
    cont: CancellableContinuation<Unit>,
    @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : SendElement<E>(pollResult, cont) {
    override fun remove(): Boolean {
        val removed = super.remove()
        // A successful removal means the node was added but was not received -- the element is undelivered
        if (removed) undeliveredElement()
        return removed
    }

    override fun undeliveredElement() {
        val context = cont.context
        onUndeliveredElement.callUndeliveredElement(pollResult, context)
    }
}
1098
/**
 * Represents closed channel. The token acts both as a [Send] and as a [ReceiveOrClosed] node and
 * carries the optional [closeCause].
 */
internal class Closed<in E>(
    @JvmField val closeCause: Throwable?
) : Send(), ReceiveOrClosed<E> {
    // Exception for senders: the close cause, or a fresh ClosedSendChannelException when there is none
    val sendException: Throwable
        get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)

    // Exception for receivers: the close cause, or a fresh ClosedReceiveChannelException when there is none
    val receiveException: Throwable
        get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)

    override val offerResult get() = this
    override val pollResult get() = this

    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {}

    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {}

    override fun resumeSendClosed(closed: Closed<*>) {
        assert { false } // "Should be never invoked"
    }

    override fun toString(): String = "Closed@$hexAddress[$closeCause]"
}
1117
/**
 * Base class for receiver waiters that are kept as nodes in the queue.
 */
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
    // A waiting receiver always reports OFFER_SUCCESS
    override val offerResult get() = OFFER_SUCCESS
    // Notifies this waiting receiver that the channel is closed
    abstract fun resumeReceiveClosed(closed: Closed<*>)
    // Function passed as the on-cancellation argument when the receiver is resumed with [value].
    // Returns null by default; the overrides in this file build it from the channel's onUndeliveredElement handler.
    open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}
1123
// Converts a raw poll outcome into ValueOrClosed: a Closed token becomes a closed result
// (keeping its cause), anything else is wrapped as a value.
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ValueOrClosed<E> = when (this) {
    is Closed<*> -> ValueOrClosed.closed(closeCause)
    else -> ValueOrClosed.value(this as E)
}
1127
// Converts a Closed token into a closed ValueOrClosed that carries the same close cause.
@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> {
    return ValueOrClosed.closed(closeCause)
}
1130