/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.channels
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14 import kotlin.native.concurrent.*
15
16 /**
17 * Abstract send channel. It is a base class for all send channel implementations.
18 */
19 internal abstract class AbstractSendChannel<E>(
20 @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
21 ) : SendChannel<E> {
/**
 * Head of the lock-free linked list that backs this channel.
 * Code in this class appends [Send] nodes, [SendBuffered] elements and the [Closed] token to it
 * and removes waiting receivers from its front.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val queue = LockFreeLinkedListHead()

// ------ extension points for buffered channels ------

/**
 * Returns `true` if [isBufferFull] is always `true`.
 * When it is, [enqueueSend] skips the additional [isBufferFull] check while adding a sender.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysFull: Boolean

/**
 * Returns `true` if this channel's buffer is full.
 * This operation should be atomic if it is invoked by [enqueueSend].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferFull: Boolean

// Holds the handler registered with invokeOnClose.
// State transitions: null -> handler -> HANDLER_INVOKED
// The handler -> HANDLER_INVOKED step is performed with a CAS by whichever of close()/invokeOnClose() wins.
private val onCloseHandler = atomic<Any?>(null)
42
43 // ------ internal functions for override by buffered channels ------
44
/**
 * Attempts to hand [element] over to the first queued receiver without suspending.
 *
 * Keeps taking the first receiver from the queue until one of them accepts the element.
 * A receiver whose resume attempt yields `null` is skipped and the next one is tried.
 *
 * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`:
 * [OFFER_FAILED] is returned when [takeFirstReceiveOrPeekClosed] finds nothing to take,
 * otherwise the `offerResult` of the node that accepted the element is returned.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        val receiver = takeFirstReceiveOrPeekClosed()
        if (receiver == null) return OFFER_FAILED
        val token = receiver.tryResumeReceive(element, null)
        // This receiver could not be resumed -- move on to the next one in the queue
        if (token == null) continue
        assert { token === RESUME_TOKEN }
        receiver.completeResumeReceive(element)
        return receiver.offerResult
    }
}
61
/**
 * Select-aware counterpart of [offerInternal]: hands [element] to the first queued receiver
 * only if the clause of the given [select] instance has not been selected yet.
 *
 * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
 * Any non-null outcome of the atomic select attempt is returned to the caller as is.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
    // Taking the receiver and selecting the clause are performed as one atomic operation
    val tryOffer = describeTryOffer(element)
    val selectFailure = select.performAtomicTrySelect(tryOffer)
    if (selectFailure != null) return selectFailure
    // The atomic operation went through -- finish resuming the receiver it has taken
    return tryOffer.result.let { receiver ->
        receiver.completeResumeReceive(element)
        receiver.offerResult
    }
}
76
77 // ------ state functions & helpers for concrete implementations ------
78
/**
 * The closed token when it is the last node of the queue, `null` otherwise.
 * Observing the token also helps the close operation to complete (see [helpClose]).
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForSend: Closed<*>?
    get() {
        val last = queue.prevNode as? Closed<*> ?: return null
        helpClose(last)
        return last
    }
84
/**
 * The closed token when it is the first node of the queue, `null` otherwise.
 * Observing the token also helps the close operation to complete (see [helpClose]).
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForReceive: Closed<*>?
    get() {
        val first = queue.nextNode as? Closed<*> ?: return null
        helpClose(first)
        return first
    }
90
/**
 * Retrieves first sending waiter from the queue or returns closed token.
 *
 * Delegates to `removeFirstIfIsInstanceOfOrPeekIf` on [queue] with a predicate that matches [Closed] nodes,
 * so a closed token is peeked rather than removed (as the name of that helper suggests -- its
 * implementation is not part of this file).
 * Returns `null` when that helper finds no matching first node.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun takeFirstSendOrPeekClosed(): Send? =
    queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
97
/**
 * Queues buffered element, returns null on success or
 * returns node reference if it was already closed or is waiting for receive.
 *
 * The element is wrapped into a [SendBuffered] node that is appended to the [queue] only while
 * the node before it is not a [ReceiveOrClosed].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
    queue.addLastIfPrev(SendBuffered(element)) { prev ->
        // Non-local return: a receiver or the closed token is at the tail, hand it back to the caller
        if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
        true
    }
    return null
}
110
/**
 * Creates a descriptor ([SendBufferedDesc]) of the operation that appends [element]
 * to the [queue] as a [SendBuffered] node.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
115
/**
 * Descriptor of the "append a [SendBuffered] node with the given element" operation on the channel queue.
 */
private open class SendBufferedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
    /**
     * Decides whether the operation fails on the [affected] node:
     * the closed token itself is reported for a closed channel,
     * [OFFER_FAILED] for a waiting receiver, and `null` (no failure) for any other node.
     */
    override fun failure(affected: LockFreeLinkedListNode): Any? {
        // The order of checks matters: the Closed check comes before the ReceiveOrClosed one
        if (affected is Closed<*>) return affected
        if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
        return null
    }
}
126
127 // ------ SendChannel ------
128
// `true` when the closed token is the last node of the queue (see closedForSend).
public final override val isClosedForSend: Boolean get() = closedForSend != null
// `true` when the first node of the queue is neither a receiver nor the closed token and the buffer is full,
// i.e. a sender has to be enqueued instead of offering the element.
private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
131
/**
 * Sends [element] to this channel.
 * Completes right away when the non-blocking offer succeeds and falls back to [sendSuspend] otherwise.
 */
public final override suspend fun send(element: E) {
    // Anything but OFFER_SUCCESS on the fast path goes to the slow path that suspends or throws
    if (offerInternal(element) !== OFFER_SUCCESS) return sendSuspend(element)
}
138
/**
 * Delegates to the inherited `offer` and, when it throws, reports [element] to
 * [onUndeliveredElement] before propagating the failure.
 * Temporary migration aid for `offer` users who rely on [onUndeliveredElement].
 */
@Suppress("DEPRECATION", "DEPRECATION_ERROR")
override fun offer(element: E): Boolean {
    try {
        return super.offer(element)
    } catch (e: Throwable) {
        val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
        if (handlerFailure != null) {
            // The handler itself crashed: throw its failure and keep the send exception as suppressed
            handlerFailure.addSuppressed(e)
            throw handlerFailure
        }
        throw e
    }
}
153
/**
 * Non-suspending send attempt.
 * Maps the outcome of [offerInternal] to a [ChannelResult]: success, failure, or closed
 * (carrying the send exception of the closed token).
 */
public final override fun trySend(element: E): ChannelResult<Unit> {
    val result = offerInternal(element)
    if (result === OFFER_SUCCESS) return ChannelResult.success(Unit)
    if (result === OFFER_FAILED) {
        // The closed token has to be checked here as well, otherwise trySend is not linearizable
        // with respect to a concurrent close()
        // See https://github.com/Kotlin/kotlinx.coroutines/issues/359
        val closedToken = closedForSend ?: return ChannelResult.failure()
        return ChannelResult.closed(helpCloseAndGetSendException(closedToken))
    }
    if (result is Closed<*>) return ChannelResult.closed(helpCloseAndGetSendException(result))
    error("trySend returned $result")
}
171
/**
 * Helps the close operation represented by [closed] to complete and returns the exception
 * that a send on the closed channel fails with.
 * Unlike the two-argument overload, it does not report any element to [onUndeliveredElement].
 */
private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
    helpClose(closed)
    return closed.sendException
}
176
/**
 * Helps the close operation represented by [closed] to complete, reports the undelivered [element]
 * to [onUndeliveredElement] (if any) and returns the exception that the send fails with.
 *
 * If the undelivered-element handler itself fails, its failure is thrown from this function
 * with the send exception attached as suppressed.
 */
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
    // To ensure linearizability we must ALWAYS help close the channel when we observe that it was closed
    // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
    helpClose(closed)
    // Element was not delivered -> call onUndeliveredElement
    onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
        // If it crashes, add send exception as suppressed for better diagnostics
        it.addSuppressed(closed.sendException)
        throw it
    }
    return closed.sendException
}
189
/**
 * Slow path of [send]. Loops until one of the following happens:
 * - the sender node is enqueued (the coroutine stays suspended until that node is resumed elsewhere),
 * - the element is offered successfully (the continuation is resumed with `Unit`),
 * - the channel turns out to be closed (the continuation is resumed with an exception).
 */
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
    loop@ while (true) {
        if (isFullImpl) {
            // The node type depends on whether undelivered elements have to be reported
            val send = if (onUndeliveredElement == null)
                SendElement(element, cont) else
                SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
            val enqueueResult = enqueueSend(send)
            when {
                enqueueResult == null -> { // enqueued successfully
                    // presumably removes the node from the queue when the continuation is cancelled -- see removeOnCancellation
                    cont.removeOnCancellation(send)
                    return@sc
                }
                enqueueResult is Closed<*> -> {
                    cont.helpCloseAndResumeWithSendException(element, enqueueResult)
                    return@sc
                }
                enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
                enqueueResult is Receive<*> -> {} // try to offer instead
                else -> error("enqueueSend returned $enqueueResult")
            }
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offerResult = offerInternal(element)
        when {
            offerResult === OFFER_SUCCESS -> {
                cont.resume(Unit)
                return@sc
            }
            // Neither enqueued nor offered -- the state has changed in between, start over
            offerResult === OFFER_FAILED -> continue@loop
            offerResult is Closed<*> -> {
                cont.helpCloseAndResumeWithSendException(element, offerResult)
                return@sc
            }
            else -> error("offerInternal returned $offerResult")
        }
    }
}
227
/**
 * Helps the close operation represented by [closed] to complete and resumes the receiver continuation
 * with a failure: the send exception of [closed] or, when the [onUndeliveredElement] handler
 * crashes while being notified about [element], the handler failure with the send exception suppressed.
 */
private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {
    helpClose(closed)
    val sendException = closed.sendException
    val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
    if (handlerFailure == null) {
        resumeWithException(sendException)
        return
    }
    handlerFailure.addSuppressed(sendException)
    resumeWithException(handlerFailure)
}
238
/**
 * Tries to append the [send] node to the end of the [queue].
 *
 * Result is:
 * * null -- successfully enqueued
 * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
 * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
 */
protected open fun enqueueSend(send: Send): Any? {
    if (isBufferAlwaysFull) {
        // The buffer can never accept the element, so only the previous node has to be checked
        queue.addLastIfPrev(send) { prev ->
            if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
            true
        }
    } else {
        // In addition to the previous node check, the node is added only while the buffer is full
        if (!queue.addLastIfPrevAndIf(send, { prev ->
            if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
            true
        }, { isBufferFull }))
            return ENQUEUE_FAILED
    }
    return null
}
260
/**
 * Closes this channel with the given [cause] by appending a [Closed] token to the [queue].
 *
 * Returns `true` if this invocation has added the token and `false` if a closed token
 * was already at the end of the queue. The on-close handler is invoked only in the former case.
 */
public override fun close(cause: Throwable?): Boolean {
    val closed = Closed<E>(cause)
    /*
     * Try to commit close by adding a close token to the end of the queue.
     * Successful -> we're now responsible for closing receivers
     * Not successful -> help closing pending receivers to maintain invariant
     * "if (!close()) next send will throw"
     */
    val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }
    // When our token was rejected, the last node is the token of the close that has won
    val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed<*>
    helpClose(actuallyClosed)
    if (closeAdded) invokeOnCloseHandler(cause)
    return closeAdded // true if we have closed
}
275
/**
 * Invokes the handler registered with [invokeOnClose], if there is one that was not invoked yet,
 * passing it the close [cause]. Called by [close] after the closed token was added.
 */
private fun invokeOnCloseHandler(cause: Throwable?) {
    val handler = onCloseHandler.value
    if (handler !== null && handler !== HANDLER_INVOKED
        && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
        // The CAS succeeded, so this call is the one that invokes the handler.
        // Had it failed, a concurrent invokeOnClose() would be the one that invokes it.
        @Suppress("UNCHECKED_CAST")
        (handler as Handler)(cause)
    }
}
285
/**
 * Registers the [handler] to be invoked when this channel gets closed.
 * If the channel is already closed for send, the handler is invoked right away unless
 * a concurrent [close] call has already taken care of it.
 *
 * @throws IllegalStateException if another handler was registered before.
 */
override fun invokeOnClose(handler: Handler) {
    // Intricate dance for concurrent invokeOnClose and close calls
    if (onCloseHandler.compareAndSet(null, handler)) {
        val closedToken = closedForSend
        if (closedToken == null) return
        // When this CAS fails, the close() call has invoked the handler
        if (onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
            (handler)(closedToken.closeCause)
        }
        return
    }
    // Registration was refused -- figure out which message describes the current state
    val value = onCloseHandler.value
    val message = if (value === HANDLER_INVOKED)
        "Another handler was already registered and successfully invoked"
    else
        "Another handler was already registered: $value"
    throw IllegalStateException(message)
}
303
/**
 * Helps the close operation represented by the [closed] token to complete:
 * removes all receivers that precede the token, resumes them with the closed token
 * and then calls [onClosedIdempotent].
 */
private fun helpClose(closed: Closed<*>) {
    /*
     * It's important to traverse list from right to left to avoid races with sender.
     * Consider channel state: head -> [receive_1] -> [receive_2] -> head
     * - T1 calls receive()
     * - T2 calls close()
     * - T3 calls close() + send(value)
     *
     * If both will traverse list from left to right, following non-linearizable history is possible:
     * [close -> false], [send -> transferred 'value' to receiver]
     *
     * Another problem with linearizability of close is that we cannot resume closed receives until all
     * receivers are removed from the list.
     * Consider channel state: head -> [receive_1] -> [receive_2] -> head
     * - T1 called receive_2, and will call send() when its receive call resumes
     * - T2 calls close()
     *
     * Now if T2's close resumes T1's receive_2 then its receive gets "closed for receive" exception, but
     * its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
     */
    var closedList = InlineList<Receive<E>>()
    while (true) {
        // Break when channel is empty or has no receivers
        @Suppress("UNCHECKED_CAST")
        val previous = closed.prevNode as? Receive<E> ?: break
        if (!previous.remove()) {
            // failed to remove the node (due to race) -- retry finding non-removed prevNode
            // NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
            previous.helpRemove() // make sure remove is complete before continuing
            continue
        }
        // add removed nodes to a separate list
        closedList += previous
    }
    /*
     * Now notify all removed nodes that the channel was closed
     * in the order they were added to the channel
     */
    closedList.forEachReversed { it.resumeReceiveClosed(closed) }
    // and do other post-processing
    onClosedIdempotent(closed)
}
346
/**
 * Invoked when channel is closed as the last action of [close] invocation.
 * This method should be idempotent and can be called multiple times.
 *
 * It is called at the end of every `helpClose` run, so it is also reached from the other places
 * that help closing, not only from [close]. The default implementation does nothing.
 */
protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
352
/**
 * Retrieves first receiving waiter from the queue or returns closed token.
 *
 * Delegates to `removeFirstIfIsInstanceOfOrPeekIf` on [queue] with a predicate that matches [Closed] nodes.
 * Returns `null` when that helper finds no matching first node.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
    queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
359
360 // ------ registerSelectSend ------
361
/**
 * Creates a descriptor ([TryOfferDesc]) of the operation that removes the first receiver
 * from the [queue] and resumes it with [element].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
366
/**
 * Descriptor of the "remove the first receiver from the queue and resume it with [element]" operation.
 * It is used by `offerSelectInternal` to perform the offer atomically with a select.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryOfferDesc<E>(
    @JvmField val element: E,
    queue: LockFreeLinkedListHead
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
    /**
     * Decides whether the operation fails on the [affected] node:
     * the closed token itself is reported for a closed channel, [OFFER_FAILED] for a node
     * that is not a receiver, and `null` (no failure) for a waiting receiver.
     */
    override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
        is Closed<*> -> affected
        !is ReceiveOrClosed<*> -> OFFER_FAILED
        else -> null
    }

    /**
     * Tries to resume the affected receiver with [element] as a part of the operation described by [prepareOp].
     * Returns [REMOVE_PREPARED] when the receiver could not be resumed, [RETRY_ATOMIC] when the
     * resume attempt asks to retry, and `null` when the receiver accepted the element.
     */
    @Suppress("UNCHECKED_CAST")
    override fun onPrepare(prepareOp: PrepareOp): Any? {
        val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
        val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
        if (token === RETRY_ATOMIC) return RETRY_ATOMIC
        assert { token === RESUME_TOKEN }
        return null
    }
}
389
/**
 * Select clause for sending an element to this channel.
 * Every access creates a new clause object that forwards registration to [registerSelectSend].
 */
final override val onSend: SelectClause2<E, SendChannel<E>>
    get() = object : SelectClause2<E, SendChannel<E>> {
        override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
            registerSelectSend(select, param, block)
        }
    }
396
/**
 * Registers a send of [element] in the given [select] instance.
 *
 * Loops until the select instance is already selected, the [SendSelect] node is enqueued,
 * the element is offered successfully (then [block] is started), or the channel turns out
 * to be closed (then the send exception is thrown).
 */
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
    while (true) {
        if (select.isSelected) return
        if (isFullImpl) {
            val node = SendSelect(element, this, select, block)
            val enqueueResult = enqueueSend(node)
            when {
                enqueueResult == null -> { // enqueued successfully
                    // The node is a DisposableHandle: its dispose() removes it from the queue
                    select.disposeOnSelect(node)
                    return
                }
                enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, enqueueResult))
                enqueueResult === ENQUEUE_FAILED -> {} // try to offer
                enqueueResult is Receive<*> -> {} // try to offer
                else -> error("enqueueSend returned $enqueueResult ")
            }
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offerResult = offerSelectInternal(element, select)
        when {
            offerResult === ALREADY_SELECTED -> return
            offerResult === OFFER_FAILED -> {} // retry
            offerResult === RETRY_ATOMIC -> {} // retry
            offerResult === OFFER_SUCCESS -> {
                // The element was delivered -- run the clause block with this channel as its argument
                block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
                return
            }
            offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, offerResult))
            else -> error("offerSelectInternal returned $offerResult")
        }
    }
}
429
430 // ------ debug ------
431
// Debug representation: class name and hex address followed by the queue state and the buffer state.
public override fun toString() =
    "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
434
// Debug description of the queue: what its first node is, plus the queue size and the closed token
// at its end when the queue consists of more than that single node.
private val queueDebugStateString: String
    get() {
        val head = queue.nextNode
        if (head === queue) return "EmptyQueue"
        return buildString {
            append(
                when (head) {
                    is Closed<*> -> head.toString()
                    is Receive<*> -> "ReceiveQueued"
                    is Send -> "SendQueued"
                    else -> "UNEXPECTED:$head" // should not happen
                }
            )
            val tail = queue.prevNode
            if (tail !== head) {
                append(",queueSize=${countQueueSize()}")
                if (tail is Closed<*>) append(",closedForSend=$tail")
            }
        }
    }
452
// Counts the nodes visited by a single traversal of the queue. Used for debug output only.
private fun countQueueSize(): Int {
    var size = 0
    queue.forEach<LockFreeLinkedListNode> { size++ }
    return size
}
458
// Suffix appended to the toString() output; empty by default, subclasses may override it.
protected open val bufferDebugString: String get() = ""
460
461 // ------ private ------
462
/**
 * Queue node of a sender that was registered through a select clause (see `registerSelectSend`).
 * It also serves as the [DisposableHandle] that removes the node when the select completes.
 */
private class SendSelect<E, R>(
    override val pollResult: E, // E | Closed - the result pollInternal returns when it rendezvous with this node
    @JvmField val channel: AbstractSendChannel<E>,
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (SendChannel<E>) -> R
) : Send(), DisposableHandle {
    // Resuming this sender means selecting its clause, so the decision is delegated to the select instance
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
        select.trySelectOther(otherOp) as Symbol? // must return symbol

    // Starts the clause block with the channel as its argument
    override fun completeResumeSend() {
        block.startCoroutineCancellable(receiver = channel, completion = select.completion)
    }

    override fun dispose() { // invoked on select completion
        if (!remove()) return
        // if the node was successfully removed (meaning it was added but was not received) then element not delivered
        undeliveredElement()
    }

    // Fails the select with the send exception, provided that its clause can still be selected
    override fun resumeSendClosed(closed: Closed<*>) {
        if (select.trySelect())
            select.resumeSelectWithException(closed.sendException)
    }

    // Reports the element of this node to the channel's onUndeliveredElement handler, if there is one
    override fun undeliveredElement() {
        channel.onUndeliveredElement?.callUndeliveredElement(pollResult, select.completion.context)
    }

    override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
}
493
/**
 * Queue node that carries a buffered [element] without any suspended sender behind it:
 * resuming it always succeeds and completing the resume does nothing.
 */
internal class SendBuffered<out E>(
    @JvmField val element: E
) : Send() {
    override val pollResult: Any? get() = element
    // Always succeeds; lets the other operation (if any) finish its prepare phase
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
    override fun completeResumeSend() {}

    /**
     * This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList].
     */
    override fun resumeSendClosed(closed: Closed<*>) {
        assert { false }
    }

    override fun toString(): String = "SendBuffered@$hexAddress($element)"
}
510 }
511
512 /**
513 * Abstract send/receive channel. It is a base class for all channel implementations.
514 */
515 internal abstract class AbstractChannel<E>(
516 onUndeliveredElement: OnUndeliveredElement<E>?
517 ) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {
518 // ------ extension points for buffered channels ------
519
/**
 * Returns `true` if [isBufferEmpty] is always `true`.
 * When it is, [enqueueReceiveInternal] skips the additional [isBufferEmpty] check while adding a receiver.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysEmpty: Boolean

/**
 * Returns `true` if this channel's buffer is empty.
 * This operation should be atomic if it is invoked by [enqueueReceive].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferEmpty: Boolean
532
533 // ------ internal functions for override by buffered channels ------
534
/**
 * Attempts to take an element from the first queued sender without suspending.
 *
 * Keeps taking the first sender from the queue until one of them can be resumed.
 * A sender whose resume attempt yields `null` is reported as an undelivered element and skipped.
 *
 * Return type is `E | POLL_FAILED | Closed`:
 * [POLL_FAILED] is returned when [takeFirstSendOrPeekClosed] finds nothing to take,
 * otherwise the `pollResult` of the node that was resumed is returned.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val sender = takeFirstSendOrPeekClosed()
        if (sender == null) return POLL_FAILED
        val token = sender.tryResumeSend(null)
        if (token == null) {
            // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
            sender.undeliveredElement()
            continue
        }
        assert { token === RESUME_TOKEN }
        sender.completeResumeSend()
        return sender.pollResult
    }
}
553
/**
 * Select-aware counterpart of [pollInternal]: takes an element from the first queued sender
 * only if the clause of the given [select] instance has not been selected yet.
 *
 * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`.
 * Any non-null outcome of the atomic select attempt is returned to the caller as is.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
    // Taking the sender and selecting the clause are performed as one atomic operation
    val tryPoll = describeTryPoll()
    val selectFailure = select.performAtomicTrySelect(tryPoll)
    if (selectFailure != null) return selectFailure
    // The atomic operation went through -- finish resuming the sender it has taken
    tryPoll.result.completeResumeSend()
    return tryPoll.result.pollResult
}
568
569 // ------ state functions & helpers for concrete implementations ------
570
/**
 * `true` when the first node of the queue is a waiting receiver or the closed token.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val hasReceiveOrClosed: Boolean get() = queue.nextNode is ReceiveOrClosed<*>
575
576 // ------ ReceiveChannel ------
577
// `true` when the closed token is the first node of the queue and the buffer is empty.
public override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
// Public view of isEmptyImpl.
public override val isEmpty: Boolean get() = isEmptyImpl
// `true` when the first node of the queue is not a sender and the buffer is empty,
// i.e. there is nothing to poll at the moment.
protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty
581
/**
 * Receives an element from this channel.
 * Returns right away when the non-blocking poll yields an element and falls back to
 * [receiveSuspend] in `RECEIVE_THROWS_ON_CLOSE` mode otherwise.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
    // fast path -- try poll non-blocking
    val polled = pollInternal()
    /*
     * A Closed result deliberately takes the tail-call slow path as well: it recovers the stacktrace
     * properly without a performance cost on the fast path, and the suspending path
     * gives a more precise stacktrace.
     */
    if (polled === POLL_FAILED || polled is Closed<*>) {
        // slow-path does suspend
        return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
    }
    return polled as E
}
595
/**
 * Slow path of the receive operations. Loops until one of the following happens:
 * - the receiver node is enqueued (the coroutine stays suspended until that node is resumed elsewhere),
 * - the channel turns out to be closed (the node is resumed as closed),
 * - an element is polled (the continuation is resumed with it).
 *
 * [receiveMode] is passed to the receiver node, which uses it to shape the resumed value.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
    // The node type depends on whether undelivered elements have to be reported
    val receive = if (onUndeliveredElement == null)
        ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
        ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
    while (true) {
        if (enqueueReceive(receive)) {
            // Make sure the node gets removed from the queue if the continuation is cancelled
            removeReceiveOnCancel(cont, receive)
            return@sc
        }
        // hm... something is not right. try to poll
        val result = pollInternal()
        if (result is Closed<*>) {
            receive.resumeReceiveClosed(result)
            return@sc
        }
        if (result !== POLL_FAILED) {
            // The second argument is the node's on-cancellation function for the polled element
            cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
            return@sc
        }
    }
}
618
/**
 * Tries to append the [receive] node to the end of the queue and returns `true` on success.
 * The node is added only while the node before it is not a [Send]; unless [isBufferAlwaysEmpty],
 * the buffer additionally has to be empty.
 */
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean = if (isBufferAlwaysEmpty)
    queue.addLastIfPrev(receive) { it !is Send } else
    queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
622
/**
 * Enqueues the [receive] node via [enqueueReceiveInternal] and notifies [onReceiveEnqueued] on success.
 * Returns `true` if the node was enqueued.
 */
private fun enqueueReceive(receive: Receive<E>): Boolean {
    val enqueued = enqueueReceiveInternal(receive)
    if (enqueued) onReceiveEnqueued()
    return enqueued
}
626
/**
 * Receives an element from this channel as a [ChannelResult].
 * Any outcome of the non-blocking poll other than `POLL_FAILED` is converted with `toResult`;
 * otherwise the call falls back to [receiveSuspend] in `RECEIVE_RESULT` mode.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveCatching(): ChannelResult<E> {
    // fast path -- try poll non-blocking
    val result = pollInternal()
    if (result !== POLL_FAILED) return result.toResult()
    // slow-path does suspend
    return receiveSuspend(RECEIVE_RESULT)
}
635
/**
 * Non-suspending receive attempt.
 * Maps the outcome of [pollInternal] to a [ChannelResult]: failure when there is nothing to poll,
 * closed (carrying the close cause) for the closed token, success with the element otherwise.
 */
@Suppress("UNCHECKED_CAST")
public final override fun tryReceive(): ChannelResult<E> {
    val polled = pollInternal()
    return when {
        polled === POLL_FAILED -> ChannelResult.failure()
        polled is Closed<*> -> ChannelResult.closed(polled.closeCause)
        else -> ChannelResult.success(polled as E)
    }
}
643
// Hidden overload kept for binary compatibility only; it forwards to cancelInternal.
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
final override fun cancel(cause: Throwable?): Boolean =
    cancelInternal(cause)
647
/**
 * Cancels this channel with the given [cause]; a [CancellationException] with a default message
 * is created when [cause] is `null`. Does nothing if the channel is already closed for receive.
 */
final override fun cancel(cause: CancellationException?) {
    /*
     * Do not create an exception if channel is already cancelled.
     * Channel is closed for receive when either it is cancelled (then we are free to bail out)
     * or was closed and elements were received.
     * Then `onCancelIdempotent` does nothing for all implementations.
     */
    if (isClosedForReceive) return
    cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}
658
/**
 * Closes the channel with the given [cause] and then runs the cancellation post-processing
 * ([onCancelIdempotent]). Returns what [close] has returned.
 *
 * It needs to be internal to support deprecated cancel(Throwable?) API.
 */
internal fun cancelInternal(cause: Throwable?): Boolean {
    val wasClosed = close(cause)
    onCancelIdempotent(wasClosed)
    return wasClosed
}
664
/**
 * Method that is invoked right after [close] in [cancel] sequence.
 * [wasClosed] is directly mapped to the value returned by [close].
 *
 * This implementation removes every node that precedes the closed token from the queue
 * and passes the removed nodes to [onCancelIdempotentList]; it does not read [wasClosed].
 */
protected open fun onCancelIdempotent(wasClosed: Boolean) {
    /*
     * See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
     * has the same rationale.
     */
    val closed = closedForSend ?: error("Cannot happen")
    var list = InlineList<Send>()
    while (true) {
        val previous = closed.prevNode
        // Reached the head of the queue -- nothing is left in front of the closed token
        if (previous is LockFreeLinkedListHead) {
            break
        }
        assert { previous is Send }
        if (!previous.remove()) {
            previous.helpRemove() // make sure remove is complete before continuing
            continue
        }
        // Add to the list only **after** successful removal
        list += previous as Send
    }
    onCancelIdempotentList(list, closed)
}
691
/**
 * Processes the senders that [onCancelIdempotent] has removed from the queue.
 * This implementation resumes each of them with the [closed] token, iterating the [list] in reverse.
 *
 * This method is overridden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list.
 */
protected open fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
    list.forEachReversed { it.resumeSendClosed(closed) }
}
698
// Every call creates a new iterator object bound to this channel.
public final override fun iterator(): ChannelIterator<E> = Itr(this)
700
701 // ------ registerSelectReceive ------
702
/**
 * Creates a descriptor ([TryPollDesc]) of the operation that removes the first sender
 * from the [queue] and resumes it.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
707
/**
 * Descriptor of the "remove the first sender from the queue and resume it" operation.
 * It is used by `pollSelectInternal` to perform the poll atomically with a select.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
    /**
     * Decides whether the operation fails on the [affected] node:
     * the closed token itself is reported for a closed channel, [POLL_FAILED] for a node
     * that is not a sender, and `null` (no failure) for a sender.
     */
    override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
        is Closed<*> -> affected
        !is Send -> POLL_FAILED
        else -> null
    }

    /**
     * Tries to resume the affected sender as a part of the operation described by [prepareOp].
     * Returns [REMOVE_PREPARED] when the sender could not be resumed, [RETRY_ATOMIC] when the
     * resume attempt asks to retry, and `null` when the sender was resumed.
     */
    @Suppress("UNCHECKED_CAST")
    override fun onPrepare(prepareOp: PrepareOp): Any? {
        val affected = prepareOp.affected as Send // see "failure" impl
        val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
        if (token === RETRY_ATOMIC) return RETRY_ATOMIC
        assert { token === RESUME_TOKEN }
        return null
    }

    override fun onRemoved(affected: LockFreeLinkedListNode) {
        // Called when we removed it from the queue but were too late to resume, so we have undelivered element
        (affected as Send).undeliveredElement()
    }
}
732
/**
 * Select clause for receiving an element from this channel.
 * Every access creates a new clause object that registers the receive in `RECEIVE_THROWS_ON_CLOSE` mode.
 */
final override val onReceive: SelectClause1<E>
    get() = object : SelectClause1<E> {
        @Suppress("UNCHECKED_CAST")
        override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
            registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
        }
    }
740
/**
 * Select clause for receiving a [ChannelResult] from this channel.
 * Every access creates a new clause object that registers the receive in `RECEIVE_RESULT` mode.
 */
final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
    get() = object : SelectClause1<ChannelResult<E>> {
        @Suppress("UNCHECKED_CAST")
        override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
            registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
        }
    }
748
/**
 * Registers a receive in the given [select] instance using the given [receiveMode].
 *
 * Loops until the select instance is already selected or a receiver node is enqueued.
 * When the channel is not empty, it polls atomically with the select instead and hands
 * the polled value (an element or the closed token) over to [tryStartBlockUnintercepted].
 */
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
    while (true) {
        if (select.isSelected) return
        if (isEmptyImpl) {
            if (enqueueReceiveSelect(select, block, receiveMode)) return
        } else {
            val pollResult = pollSelectInternal(select)
            when {
                pollResult === ALREADY_SELECTED -> return
                pollResult === POLL_FAILED -> {} // retry
                pollResult === RETRY_ATOMIC -> {} // retry
                // No explicit return here: the loop goes around once more.
                // NOTE(review): presumably select.isSelected is true by then and ends the loop -- confirm
                else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
            }
        }
    }
}
765
/**
 * Starts this select clause block with the polled [value], shaped according to [receiveMode].
 *
 * For the closed token: `RECEIVE_THROWS_ON_CLOSE` throws the receive exception, `RECEIVE_RESULT`
 * starts the block with a closed [ChannelResult] provided that the clause can still be selected,
 * and any other mode does nothing.
 * For an element: `RECEIVE_RESULT` starts the block with the element converted by `toResult`,
 * any other mode starts it with the element itself.
 */
private fun <R> (suspend (Any?) -> R).tryStartBlockUnintercepted(select: SelectInstance<R>, receiveMode: Int, value: Any?) {
    when (value) {
        is Closed<*> -> {
            when (receiveMode) {
                RECEIVE_THROWS_ON_CLOSE -> {
                    throw recoverStackTrace(value.receiveException)
                }
                RECEIVE_RESULT -> {
                    // The closed token was only peeked, so the clause still has to be selected here
                    if (!select.trySelect()) return
                    startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
                }
            }
        }
        else -> {
            if (receiveMode == RECEIVE_RESULT) {
                startCoroutineUnintercepted(value.toResult<Any>(), select.completion)
            } else {
                startCoroutineUnintercepted(value, select.completion)
            }
        }
    }
}
788
/**
 * Creates a [ReceiveSelect] node for the given select clause and tries to enqueue it.
 * On success the node is registered for disposal when the select completes.
 * Returns `true` if the node was enqueued.
 */
private fun <R> enqueueReceiveSelect(
    select: SelectInstance<R>,
    block: suspend (Any?) -> R,
    receiveMode: Int
): Boolean {
    val node = ReceiveSelect(this, select, block, receiveMode)
    if (!enqueueReceive(node)) return false
    select.disposeOnSelect(node)
    return true
}
799
800 // ------ protected ------
801
/**
 * Same as the inherited implementation, but additionally notifies [onReceiveDequeued]
 * when what was taken is an actual receiver rather than the closed token.
 */
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    val first = super.takeFirstReceiveOrPeekClosed()
    if (first != null && first !is Closed<*>) onReceiveDequeued()
    return first
}
806
/**
 * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveEnqueued() {}
812
/**
 * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveDequeued() {}
818
819 // ------ private ------
820
// Installs a cancellation handler on the continuation that removes the receiver node from the queue.
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
    cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
823
/**
 * Cancellation handler that removes the [receive] node from the queue and,
 * if the removal succeeds, notifies [onReceiveDequeued].
 */
private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : BeforeResumeCancelHandler() {
    override fun invoke(cause: Throwable?) {
        if (receive.remove())
            onReceiveDequeued()
    }
    override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
}
831
/**
 * Iterator over the elements of the [channel].
 * [hasNext] stores what it has polled into [result], and [next] hands that element out
 * and resets [result] back to `POLL_FAILED`.
 */
private class Itr<E>(@JvmField val channel: AbstractChannel<E>) : ChannelIterator<E> {
    // NOTE(review): not private -- presumably also assigned by the ReceiveHasNext node; confirm
    var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed

    /**
     * Returns `true` if there is an element for [next], `false` if the channel was closed without a cause,
     * and throws the receive exception if it was closed with a cause.
     */
    override suspend fun hasNext(): Boolean {
        // check for repeated hasNext
        if (result !== POLL_FAILED) return hasNextResult(result)
        // fast path -- try poll non-blocking
        result = channel.pollInternal()
        if (result !== POLL_FAILED) return hasNextResult(result)
        // slow-path does suspend
        return hasNextSuspend()
    }

    // Interprets a polled value: the closed token means "no more elements" (or a failure, if it has a cause)
    private fun hasNextResult(result: Any?): Boolean {
        if (result is Closed<*>) {
            if (result.closeCause != null) throw recoverStackTrace(result.receiveException)
            return false
        }
        return true
    }

    /**
     * Slow path of [hasNext]: loops until the receiver node is enqueued
     * or a poll yields an element or the closed token.
     */
    private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
        val receive = ReceiveHasNext(this, cont)
        while (true) {
            if (channel.enqueueReceive(receive)) {
                channel.removeReceiveOnCancel(cont, receive)
                return@sc
            }
            // hm... something is not right. try to poll
            val result = channel.pollInternal()
            // Remember the polled value so that next() can hand it out
            this.result = result
            if (result is Closed<*>) {
                if (result.closeCause == null)
                    cont.resume(false)
                else
                    cont.resumeWithException(result.receiveException)
                return@sc
            }
            if (result !== POLL_FAILED) {
                // The second argument binds the undelivered-element handler (if any) to the polled element
                @Suppress("UNCHECKED_CAST")
                cont.resume(true, channel.onUndeliveredElement?.bindCancellationFun(result as E, cont.context))
                return@sc
            }
        }
    }

    /**
     * Returns the element stored by the preceding [hasNext] call.
     * Throws the receive exception if the stored value is the closed token and
     * [IllegalStateException] if there is no stored value.
     */
    @Suppress("UNCHECKED_CAST")
    override fun next(): E {
        val result = this.result
        if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
        if (result !== POLL_FAILED) {
            // Reset the state so that the same element is not returned twice
            this.result = POLL_FAILED
            return result as E
        }

        throw IllegalStateException("'hasNext' should be called prior to 'next' invocation")
    }
}
890
/**
 * Receiver waiting for a single element on behalf of the continuation [cont].
 *
 * [receiveMode] selects how outcomes are reported: with `RECEIVE_RESULT` values and close tokens are
 * delivered wrapped in a [ChannelResult]; otherwise the raw value is delivered and a close resumes
 * [cont] with the receive exception.
 */
private open class ReceiveElement<in E>(
    @JvmField val cont: CancellableContinuation<Any?>,
    @JvmField val receiveMode: Int
) : Receive<E>() {
    /** Shapes [value] according to [receiveMode] before it is handed to [cont]. */
    fun resumeValue(value: E): Any? =
        if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value

    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        val resumed = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value))
        if (resumed == null) return null
        assert { resumed === RESUME_TOKEN } // the only other possible result
        // finishPrepare may be called only after a successful tryResume, so that only a good affected node is saved
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        if (receiveMode == RECEIVE_RESULT) {
            cont.resume(closed.toResult<Any>())
        } else {
            cont.resumeWithException(closed.receiveException)
        }
    }

    override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
}
918
/**
 * [ReceiveElement] whose cancellation function is bound to [onUndeliveredElement].
 */
private class ReceiveElementWithUndeliveredHandler<in E>(
    cont: CancellableContinuation<Any?>,
    receiveMode: Int,
    @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : ReceiveElement<E>(cont, receiveMode) {
    /** Binds [onUndeliveredElement] to [value] and the context of the waiting continuation. */
    override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? {
        val context = cont.context
        return onUndeliveredElement.bindCancellationFun(value, context)
    }
}
927
/**
 * Receiver enqueued on behalf of a suspended `hasNext` call of [iterator].
 *
 * It resumes [cont] with `true` when an element arrives and with `false` (or an exception)
 * when the channel is closed, storing the outcome in the iterator's `result` field.
 */
private open class ReceiveHasNext<E>(
    @JvmField val iterator: Itr<E>,
    @JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
    /**
     * Tries to resume [cont] with `true`. Returns `null` when `tryResume` fails and
     * `RESUME_TOKEN` otherwise; in the latter case [completeResumeReceive] finishes the resumption.
     */
    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
            ?: return null
        assert { token === RESUME_TOKEN } // the only other possible result
        // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    /** Publishes [value] as the iterator's cached result and then completes the resumption of [cont]. */
    override fun completeResumeReceive(value: E) {
        /*
           When otherOp != null invocation of tryResumeReceive can happen multiple times and much later,
           but completeResumeReceive is called once so we set iterator result here.
         */
        iterator.result = value
        cont.completeResume(RESUME_TOKEN)
    }

    /**
     * Resumes [cont] with `false` when the channel was closed without a cause and with the receive
     * exception otherwise. The close token is stored in the iterator only if the resumption attempt succeeded.
     */
    override fun resumeReceiveClosed(closed: Closed<*>) {
        val token = if (closed.closeCause == null) {
            cont.tryResume(false)
        } else {
            cont.tryResumeWithException(closed.receiveException)
        }
        if (token != null) {
            iterator.result = closed
            cont.completeResume(token)
        }
    }

    /** Binds the channel's `onUndeliveredElement` handler, if there is one, to [value] and the context of [cont]. */
    override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
        iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)

    override fun toString(): String = "ReceiveHasNext@$hexAddress"
}
967
/**
 * Receiver registered by a `select` clause on [channel].
 *
 * When selected, it starts [block] with the received value (wrapped in a [ChannelResult] when
 * [receiveMode] is `RECEIVE_RESULT`). It is also the [DisposableHandle] that removes the waiter
 * from the queue.
 */
private class ReceiveSelect<R, E>(
    @JvmField val channel: AbstractChannel<E>,
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (Any?) -> R,
    @JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        return select.trySelectOther(otherOp) as Symbol?
    }

    @Suppress("UNCHECKED_CAST")
    override fun completeResumeReceive(value: E) {
        // Shape the value according to the receive mode before starting the select block with it.
        val delivered: Any? = if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value
        block.startCoroutineCancellable(delivered, select.completion, resumeOnCancellationFun(value))
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        if (!select.trySelect()) return
        if (receiveMode == RECEIVE_THROWS_ON_CLOSE) {
            select.resumeSelectWithException(closed.receiveException)
        } else if (receiveMode == RECEIVE_RESULT) {
            block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
        }
    }

    override fun dispose() { // invoked on select completion
        val removed = remove()
        if (removed) channel.onReceiveDequeued() // notify cancellation of receive
    }

    override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? {
        val context = select.completion.context
        return channel.onUndeliveredElement?.bindCancellationFun(value, context)
    }

    override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
}
1004 }
1005
// receiveMode values: they select how a receive operation reports its outcome to the waiting receiver.
// RECEIVE_THROWS_ON_CLOSE: the element is delivered as is; on close the receiver is resumed with the receive exception.
internal const val RECEIVE_THROWS_ON_CLOSE = 0
// RECEIVE_RESULT: the element is delivered wrapped in ChannelResult.success; on close a closed ChannelResult is delivered.
internal const val RECEIVE_RESULT = 1
1009
// Marker symbols below are compared by identity (`===`) by the channel implementation.

@JvmField
@SharedImmutable
internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels

// Result of a successful offer: it is the `offerResult` of every Receive node.
@JvmField
@SharedImmutable
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")

// Result of offerInternal when the element could not be handed over (e.g. there is no waiting receiver).
@JvmField
@SharedImmutable
internal val OFFER_FAILED = Symbol("OFFER_FAILED")

// Result of pollInternal when there is nothing to receive; the iterator also uses it as its "nothing cached" marker.
@JvmField
@SharedImmutable
internal val POLL_FAILED = Symbol("POLL_FAILED")

// NOTE(review): presumably returned when enqueueing a waiter fails; usages are outside this part of the file - confirm.
@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")

// Final state of onCloseHandler (state transitions: null -> handler -> HANDLER_INVOKED).
@JvmField
@SharedImmutable
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
1033
// Function type that accepts an optional cause and returns nothing.
// NOTE(review): presumably the type of the close handler kept in onCloseHandler - confirm against its usages.
internal typealias Handler = (Throwable?) -> Unit
1035
/**
 * Represents a sending waiter in the queue.
 *
 * Resumption is a two-step protocol: [tryResumeSend] reserves the resumption and, when it
 * returns `RESUME_TOKEN`, [completeResumeSend] must be called to finish it.
 */
internal abstract class Send : LockFreeLinkedListNode() {
    abstract val pollResult: Any? // E | Closed - the result pollInternal returns when it rendezvous with this node
    // Returns: null - failure,
    //          RETRY_ATOMIC for retry (only when otherOp != null),
    //          RESUME_TOKEN on success (call completeResumeSend)
    // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
    abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
    // Finishes the resumption that a successful tryResumeSend has started.
    abstract fun completeResumeSend()
    // Notifies this waiter that the channel was closed; `closed` is the close token.
    abstract fun resumeSendClosed(closed: Closed<*>)
    // Hook for reporting the element of this sender as undelivered. Does nothing by default.
    open fun undeliveredElement() {}
}
1050
/**
 * Represents a receiving waiter in the queue or the closed token.
 *
 * Resumption is a two-step protocol: [tryResumeReceive] reserves the resumption and, when it
 * returns `RESUME_TOKEN`, [completeResumeReceive] must be called with the same value to finish it.
 */
internal interface ReceiveOrClosed<in E> {
    // What an offer that reaches this node results in.
    val offerResult: Any // OFFER_SUCCESS | Closed
    // Returns: null - failure,
    //          RETRY_ATOMIC for retry (only when otherOp != null),
    //          RESUME_TOKEN on success (call completeResumeReceive)
    // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
    fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol?
    // Finishes the resumption that a successful tryResumeReceive has started.
    fun completeResumeReceive(value: E)
}
1063
/**
 * Sender waiting to hand over one specific element ([pollResult]) on behalf of the continuation [cont].
 */
internal open class SendElement<E>(
    override val pollResult: E,
    @JvmField val cont: CancellableContinuation<Unit>
) : Send() {
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        val resumed = cont.tryResume(Unit, otherOp?.desc)
        if (resumed == null) return null
        assert { resumed === RESUME_TOKEN } // the only other possible result
        // finishPrepare may be called only after a successful tryResume, so that only a good affected node is saved
        otherOp?.finishPrepare() // finish preparations
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        cont.resumeWithException(closed.sendException)
    }

    override fun toString(): String = "$classSimpleName@$hexAddress($pollResult)"
}
1083
/**
 * [SendElement] that reports its element through [onUndeliveredElement] when the element is not delivered.
 */
internal class SendElementWithUndeliveredHandler<E>(
    pollResult: E,
    cont: CancellableContinuation<Unit>,
    @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : SendElement<E>(pollResult, cont) {
    override fun remove(): Boolean {
        val removed = super.remove()
        // A successful removal means the node was added but never received, so its element is undelivered.
        if (removed) undeliveredElement()
        return removed
    }

    override fun undeliveredElement() {
        onUndeliveredElement.callUndeliveredElement(pollResult, cont.context)
    }
}
1100
/**
 * Token that represents a closed channel. It acts both as a [Send] and as a [ReceiveOrClosed] node,
 * and it is its own offer and poll result.
 *
 * [closeCause] is the cause the channel was closed with, or `null` for a close without a cause.
 */
internal class Closed<in E>(
    @JvmField val closeCause: Throwable?
) : Send(), ReceiveOrClosed<E> {
    /** The close cause, or a new [ClosedSendChannelException] when there is no cause. */
    val sendException: Throwable
        get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)

    /** The close cause, or a new [ClosedReceiveChannelException] when there is no cause. */
    val receiveException: Throwable
        get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)

    override val offerResult get() = this
    override val pollResult get() = this

    // Resuming the closed token always succeeds; there is nothing to complete afterwards.
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {}

    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {}

    override fun resumeSendClosed(closed: Closed<*>) {
        assert { false } // "Should be never invoked"
    }

    override fun toString(): String = "Closed@$hexAddress[$closeCause]"
}
1119
/**
 * Base class for receiving waiters in the queue.
 */
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
    // An offer that reaches a waiting receiver always succeeds.
    override val offerResult get() = OFFER_SUCCESS
    // Notifies this waiter that the channel was closed; `closed` is the close token.
    abstract fun resumeReceiveClosed(closed: Closed<*>)
    // Cancellation function to pass along when resuming with `value`. It is null by default;
    // receivers that know an onUndeliveredElement handler override it to bind the handler to the value.
    open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}
1125
/**
 * Converts a raw receive outcome into a [ChannelResult]: a [Closed] token becomes a closed result
 * carrying its close cause, any other value becomes a successful result.
 */
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ChannelResult<E> = when (this) {
    is Closed<*> -> ChannelResult.closed(closeCause)
    else -> ChannelResult.success(this as E)
}
1129
/**
 * Converts this close token into a closed [ChannelResult] that carries its close cause.
 */
@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> {
    return ChannelResult.closed<E>(closeCause)
}
1132