• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 @file:Suppress("PrivatePropertyName")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
8 import kotlinx.coroutines.channels.ChannelResult.Companion.failure
9 import kotlinx.coroutines.channels.ChannelResult.Companion.success
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.selects.*
12 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
13 import kotlin.coroutines.*
14 import kotlin.js.*
15 import kotlin.jvm.*
16 import kotlin.math.*
17 import kotlin.reflect.*
18 
19 /**
20  * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
21  * The high-level structure bases on a conceptually infinite array for storing elements and waiting requests,
22  * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
23  * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
24  * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
25  * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
26  * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
27  * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
28  * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
29  *
30  * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
31  * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
32  */
33 internal open class BufferedChannel<E>(
34     /**
35      * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
36      * and `Channel.UNLIMITED` for unlimited capacity.
37      */
38     private val capacity: Int,
39     @JvmField
40     internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
41 ) : Channel<E> {
42     init {
43         require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
44         // This implementation has second `init`.
45     }
46 
47     // Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
48 
49     /*
50       The counters indicate the total numbers of send, receive, and buffer expansion calls
51       ever performed. The counters are incremented in the beginning of the corresponding
52       operation; thus, acquiring a unique (for the operation type) cell to process.
53       The segments reference to the last working one for each operation type.
54 
55       Notably, the counter for send is combined with the channel closing status
56       for synchronization simplicity and performance reasons.
57 
58       The logical end of the buffer is initialized with the channel capacity.
59       If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
60       or `BUFFER_END_RENDEZVOUS`, respectively, and never updates. The `bufferEndSegment`
61       point to a special `NULL_SEGMENT` in this case.
62      */
63     private val sendersAndCloseStatus = atomic(0L)
64     private val receivers = atomic(0L)
65     private val bufferEnd = atomic(initialBufferEnd(capacity))
66 
67     internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
68     internal val receiversCounter: Long get() = receivers.value
69     private val bufferEndCounter: Long get() = bufferEnd.value
70 
71     /*
72       Additionally to the counters above, we need an extra one that
73       tracks the number of cells processed by `expandBuffer()`.
74       When a receiver aborts, the corresponding cell might be
75       physically removed from the data structure to avoid memory
76       leaks, while it still can be unprocessed by `expandBuffer()`.
77       In this case, `expandBuffer()` cannot know whether the
78       removed cell contained sender or receiver and, therefore,
79       cannot proceed. To solve the race, we ensure that cells
80       correspond to cancelled receivers cannot be physically
81       removed until the cell is processed.
82       This additional counter enables the synchronization,
83      */
84     private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)
85 
86     private val isRendezvousOrUnlimited
87         get() = bufferEndCounter.let { it == BUFFER_END_RENDEZVOUS || it == BUFFER_END_UNLIMITED }
88 
89     private val sendSegment: AtomicRef<ChannelSegment<E>>
90     private val receiveSegment: AtomicRef<ChannelSegment<E>>
91     private val bufferEndSegment: AtomicRef<ChannelSegment<E>>
92 
93     init {
94         @Suppress("LeakingThis")
95         val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
96         sendSegment = atomic(firstSegment)
97         receiveSegment = atomic(firstSegment)
98         // If this channel is rendezvous or has unlimited capacity, the algorithm never
99         // invokes the buffer expansion procedure, and the corresponding segment reference
100         // points to a special `NULL_SEGMENT` one and never updates.
101         @Suppress("UNCHECKED_CAST")
102         bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
103     }
104 
105     // #########################
106     // ## The send operations ##
107     // #########################
108 
109     override suspend fun send(element: E): Unit =
110         sendImpl( // <-- this is an inline function
111             element = element,
112             // Do not create a continuation until it is required;
113             // it is created later via [onNoWaiterSuspend], if needed.
114             waiter = null,
115             // Finish immediately if a rendezvous happens
116             // or the element has been buffered.
117             onRendezvousOrBuffered = {},
118             // As no waiter is provided, suspension is impossible.
119             onSuspend = { _, _ -> assert { false } },
120             // According to the `send(e)` contract, we need to call
121             // `onUndeliveredElement(..)` handler and throw an exception
122             // if the channel is already closed.
123             onClosed = { onClosedSend(element) },
124             // When `send(e)` decides to suspend, the corresponding
125             // `onNoWaiterSuspend` function that creates a continuation
126             // is called. The tail-call optimization is applied here.
127             onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
128         )
129 
130     // NB: return type could've been Nothing, but it breaks TCO
131     private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
132         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
133             // If it crashes, add send exception as suppressed for better diagnostics
134             it.addSuppressed(sendException)
135             continuation.resumeWithStackTrace(it)
136             return@suspendCancellableCoroutine
137         }
138         continuation.resumeWithStackTrace(sendException)
139     }
140 
141     private suspend fun sendOnNoWaiterSuspend(
142         /* The working cell is specified by
143         the segment and the index in it. */
144         segment: ChannelSegment<E>,
145         index: Int,
146         /** The element to be inserted. */
147         element: E,
148         /** The global index of the cell. */
149         s: Long
150     ) = suspendCancellableCoroutineReusable sc@{ cont ->
151         sendImplOnNoWaiter( // <-- this is an inline function
152             segment = segment, index = index, element = element, s = s,
153             // Store the created continuation as a waiter.
154             waiter = cont,
155             // If a rendezvous happens or the element has been buffered,
156             // resume the continuation and finish. In case of prompt
157             // cancellation, it is guaranteed that the element
158             // has been already buffered or passed to receiver.
159             onRendezvousOrBuffered = { cont.resume(Unit) },
160             // If the channel is closed, call `onUndeliveredElement(..)` and complete the
161             // continuation with the corresponding exception.
162             onClosed = { onClosedSendOnNoWaiterSuspend(element, cont) },
163         )
164     }
165 
166     private fun Waiter.prepareSenderForSuspension(
167         /* The working cell is specified by
168         the segment and the index in it. */
169         segment: ChannelSegment<E>,
170         index: Int
171     ) {
172         // To distinguish cancelled senders and receivers,
173         // senders equip the index value with an additional marker,
174         // adding `SEGMENT_SIZE` to the value.
175         invokeOnCancellation(segment, index + SEGMENT_SIZE)
176     }
177 
178     private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
179         onUndeliveredElement?.callUndeliveredElement(element, cont.context)
180         cont.resumeWithException(recoverStackTrace(sendException, cont))
181     }
182 
183     override fun trySend(element: E): ChannelResult<Unit> {
184         // Do not try to send the element if the plain `send(e)` operation would suspend.
185         if (shouldSendSuspend(sendersAndCloseStatus.value)) return failure()
186         // This channel either has waiting receivers or is closed.
187         // Let's try to send the element!
188         // The logic is similar to the plain `send(e)` operation, with
189         // the only difference that we install `INTERRUPTED_SEND` in case
190         // the operation decides to suspend.
191         return sendImpl( // <-- this is an inline function
192             element = element,
193             // Store an already interrupted sender in case of suspension.
194             waiter = INTERRUPTED_SEND,
195             // Finish successfully when a rendezvous happens
196             // or the element has been buffered.
197             onRendezvousOrBuffered = { success(Unit) },
198             // On suspension, the `INTERRUPTED_SEND` token has been installed,
199             // and this `trySend(e)` must fail. According to the contract,
200             // we do not need to call the [onUndeliveredElement] handler.
201             onSuspend = { segm, _ ->
202                 segm.onSlotCleaned()
203                 failure()
204             },
205             // If the channel is closed, return the corresponding result.
206             onClosed = { closed(sendException) }
207         )
208     }
209 
210     /**
211      * This is a special `send(e)` implementation that returns `true` if the element
212      * has been successfully sent, and `false` if the channel is closed.
213      *
214      * In case of coroutine cancellation, the element may be undelivered --
215      * the [onUndeliveredElement] feature is unsupported in this implementation.
216      *
217      */
218     internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont ->
219         check(onUndeliveredElement == null) {
220             "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
221         }
222         sendImpl(
223             element = element,
224             waiter = SendBroadcast(cont),
225             onRendezvousOrBuffered = { cont.resume(true) },
226             onSuspend = { _, _ -> },
227             onClosed = { cont.resume(false) }
228         )
229     }
230 
231     /**
232      * Specifies waiting [sendBroadcast] operation.
233      */
234     private class SendBroadcast(
235         val cont: CancellableContinuation<Boolean>
236     ) : Waiter by cont as CancellableContinuationImpl<Boolean>
237 
238     /**
239      * Abstract send implementation.
240      */
241     private inline fun <R> sendImpl(
242         /* The element to be sent. */
243         element: E,
244         /* The waiter to be stored in case of suspension,
245         or `null` if the waiter is not created yet.
246         In the latter case, when the algorithm decides
247         to suspend, [onNoWaiterSuspend] is called. */
248         waiter: Any?,
249         /* This lambda is invoked when the element has been
250         buffered or a rendezvous with a receiver happens. */
251         onRendezvousOrBuffered: () -> R,
252         /* This lambda is called when the operation suspends in the
253         cell specified by the segment and the index in it. */
254         onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
255         /* This lambda is called when the channel
256         is observed in the closed state. */
257         onClosed: () -> R,
258         /* This lambda is called when the operation decides
259         to suspend, but the waiter is not provided (equals `null`).
260         It should create a waiter and delegate to `sendImplOnNoWaiter`. */
261         onNoWaiterSuspend: (
262             segm: ChannelSegment<E>,
263             i: Int,
264             element: E,
265             s: Long
266         ) -> R = { _, _, _, _ -> error("unexpected") }
267     ): R {
268         // Read the segment reference before the counter increment;
269         // it is crucial to be able to find the required segment later.
270         var segment = sendSegment.value
271         while (true) {
272             // Atomically increment the `senders` counter and obtain the
273             // value right before the increment along with the close status.
274             val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
275             val s = sendersAndCloseStatusCur.sendersCounter
276             // Is this channel already closed? Keep the information.
277             val closed = sendersAndCloseStatusCur.isClosedForSend0
278             // Count the required segment id and the cell index in it.
279             val id = s / SEGMENT_SIZE
280             val i = (s % SEGMENT_SIZE).toInt()
281             // Try to find the required segment if the initially obtained
282             // one (in the beginning of this function) has lower id.
283             if (segment.id != id) {
284                 // Find the required segment.
285                 segment = findSegmentSend(id, segment) ?:
286                     // The required segment has not been found.
287                     // Finish immediately if this channel is closed,
288                     // restarting the operation otherwise.
289                     // In the latter case, the required segment was full
290                     // of interrupted waiters and, therefore, removed
291                     // physically to avoid memory leaks.
292                     if (closed) {
293                         return onClosed()
294                     } else {
295                         continue
296                     }
297             }
298             // Update the cell according to the algorithm. Importantly, when
299             // the channel is already closed, storing a waiter is illegal, so
300             // the algorithm stores the `INTERRUPTED_SEND` token in this case.
301             when (updateCellSend(segment, i, element, s, waiter, closed)) {
302                 RESULT_RENDEZVOUS -> {
303                     // A rendezvous with a receiver has happened.
304                     // The previous segments are no longer needed
305                     // for the upcoming requests, so the algorithm
306                     // resets the link to the previous segment.
307                     segment.cleanPrev()
308                     return onRendezvousOrBuffered()
309                 }
310                 RESULT_BUFFERED -> {
311                     // The element has been buffered.
312                     return onRendezvousOrBuffered()
313                 }
314                 RESULT_SUSPEND -> {
315                     // The operation has decided to suspend and installed the
316                     // specified waiter. If the channel was already closed,
317                     // and the `INTERRUPTED_SEND` token has been installed as a waiter,
318                     // this request finishes with the `onClosed()` action.
319                     if (closed) {
320                         segment.onSlotCleaned()
321                         return onClosed()
322                     }
323                     (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
324                     return onSuspend(segment, i)
325                 }
326                 RESULT_CLOSED -> {
327                     // This channel is closed.
328                     // In case this segment is already or going to be
329                     // processed by a receiver, ensure that all the
330                     // previous segments are unreachable.
331                     if (s < receiversCounter) segment.cleanPrev()
332                     return onClosed()
333                 }
334                 RESULT_FAILED -> {
335                     // Either the cell stores an interrupted receiver,
336                     // or it was poisoned by a concurrent receiver.
337                     // In both cases, all the previous segments are already processed,
338                     segment.cleanPrev()
339                     continue
340                 }
341                 RESULT_SUSPEND_NO_WAITER -> {
342                     // The operation has decided to suspend,
343                     // but no waiter has been provided.
344                     return onNoWaiterSuspend(segment, i, element, s)
345                 }
346             }
347         }
348     }
349 
350     // Note: this function is temporarily moved from ConflatedBufferedChannel to BufferedChannel class, because of this issue: KT-65554.
351     // For now, an inline function, which invokes atomic operations, may only be called within a parent class.
352     protected fun trySendDropOldest(element: E): ChannelResult<Unit> =
353         sendImpl( // <-- this is an inline function
354             element = element,
355             // Put the element into the logical buffer even
356             // if this channel is already full, the `onSuspend`
357             // callback below extract the first (oldest) element.
358             waiter = BUFFERED,
359             // Finish successfully when a rendezvous has happened
360             // or the element has been buffered.
361             onRendezvousOrBuffered = { return success(Unit) },
362             // In case the algorithm decided to suspend, the element
363             // was added to the buffer. However, as the buffer is now
364             // overflowed, the first (oldest) element has to be extracted.
365             onSuspend = { segm, i ->
366                 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
367                 return success(Unit)
368             },
369             // If the channel is closed, return the corresponding result.
370             onClosed = { return closed(sendException) }
371         )
372 
373     private inline fun sendImplOnNoWaiter(
374         /* The working cell is specified by
375         the segment and the index in it. */
376         segment: ChannelSegment<E>,
377         index: Int,
378         /* The element to be sent. */
379         element: E,
380         /* The global index of the cell. */
381         s: Long,
382         /* The waiter to be stored in case of suspension. */
383         waiter: Waiter,
384         /* This lambda is invoked when the element has been
385         buffered or a rendezvous with a receiver happens.*/
386         onRendezvousOrBuffered: () -> Unit,
387         /* This lambda is called when the channel
388         is observed in the closed state. */
389         onClosed: () -> Unit,
390     ) {
391         // Update the cell again, now with the non-null waiter,
392         // restarting the operation from the beginning on failure.
393         // Check the `sendImpl(..)` function for the comments.
394         when (updateCellSend(segment, index, element, s, waiter, false)) {
395             RESULT_RENDEZVOUS -> {
396                 segment.cleanPrev()
397                 onRendezvousOrBuffered()
398             }
399             RESULT_BUFFERED -> {
400                 onRendezvousOrBuffered()
401             }
402             RESULT_SUSPEND -> {
403                 waiter.prepareSenderForSuspension(segment, index)
404             }
405             RESULT_CLOSED -> {
406                 if (s < receiversCounter) segment.cleanPrev()
407                 onClosed()
408             }
409             RESULT_FAILED -> {
410                 segment.cleanPrev()
411                 sendImpl(
412                     element = element,
413                     waiter = waiter,
414                     onRendezvousOrBuffered = onRendezvousOrBuffered,
415                     onSuspend = { _, _ -> },
416                     onClosed = onClosed,
417                 )
418             }
419             else -> error("unexpected")
420         }
421     }
422 
423     private fun updateCellSend(
424         /* The working cell is specified by
425         the segment and the index in it. */
426         segment: ChannelSegment<E>,
427         index: Int,
428         /* The element to be sent. */
429         element: E,
430         /* The global index of the cell. */
431         s: Long,
432         /* The waiter to be stored in case of suspension. */
433         waiter: Any?,
434         closed: Boolean
435     ): Int {
436         // This is a fast-path of `updateCellSendSlow(..)`.
437         //
438         // First, the algorithm stores the element,
439         // performing the synchronization after that.
440         // This way, receivers safely retrieve the
441         // element, following the safe publication pattern.
442         segment.storeElement(index, element)
443         if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
444         // Read the current cell state.
445         val state = segment.getState(index)
446         when {
447             // The cell is empty.
448             state === null -> {
449                 // If the element should be buffered, or a rendezvous should happen
450                 // while the receiver is still coming, try to buffer the element.
451                 // Otherwise, try to store the specified waiter in the cell.
452                 if (bufferOrRendezvousSend(s)) {
453                     // Move the cell state to `BUFFERED`.
454                     if (segment.casState(index, null, BUFFERED)) {
455                         // The element has been successfully buffered, finish.
456                         return RESULT_BUFFERED
457                     }
458                 } else {
459                     // This `send(e)` operation should suspend.
460                     // However, in case the channel has already
461                     // been observed closed, `INTERRUPTED_SEND`
462                     // is installed instead.
463                     if (waiter == null) {
464                         // The waiter is not specified; return the corresponding result.
465                         return RESULT_SUSPEND_NO_WAITER
466                     } else {
467                         // Try to install the waiter.
468                         if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
469                     }
470                 }
471             }
472             // A waiting receiver is stored in the cell.
473             state is Waiter -> {
474                 // As the element will be passed directly to the waiter,
475                 // the algorithm cleans the element slot in the cell.
476                 segment.cleanElement(index)
477                 // Try to make a rendezvous with the suspended receiver.
478                 return if (state.tryResumeReceiver(element)) {
479                     // Rendezvous! Move the cell state to `DONE_RCV` and finish.
480                     segment.setState(index, DONE_RCV)
481                     onReceiveDequeued()
482                     RESULT_RENDEZVOUS
483                 } else {
484                     // The resumption has failed. Update the cell state correspondingly
485                     // and clean the element field. It is also possible for a concurrent
486                     // cancellation handler to update the cell state; we can safely
487                     // ignore these updates.
488                     if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
489                         segment.onCancelledRequest(index, true)
490                     }
491                     RESULT_FAILED
492                 }
493             }
494         }
495         return updateCellSendSlow(segment, index, element, s, waiter, closed)
496     }
497 
498     /**
499      * Updates the working cell of an abstract send operation.
500      */
501     private fun updateCellSendSlow(
502         /* The working cell is specified by
503         the segment and the index in it. */
504         segment: ChannelSegment<E>,
505         index: Int,
506         /* The element to be sent. */
507         element: E,
508         /* The global index of the cell. */
509         s: Long,
510         /* The waiter to be stored in case of suspension. */
511         waiter: Any?,
512         closed: Boolean
513     ): Int {
514         // Then, the cell state should be updated according to
515         // its state machine; see the paper mentioned in the very
516         // beginning for the cell life-cycle and the algorithm details.
517         while (true) {
518             // Read the current cell state.
519             val state = segment.getState(index)
520             when {
521                 // The cell is empty.
522                 state === null -> {
523                     // If the element should be buffered, or a rendezvous should happen
524                     // while the receiver is still coming, try to buffer the element.
525                     // Otherwise, try to store the specified waiter in the cell.
526                     if (bufferOrRendezvousSend(s) && !closed) {
527                         // Move the cell state to `BUFFERED`.
528                         if (segment.casState(index, null, BUFFERED)) {
529                             // The element has been successfully buffered, finish.
530                             return RESULT_BUFFERED
531                         }
532                     } else {
533                         // This `send(e)` operation should suspend.
534                         // However, in case the channel has already
535                         // been observed closed, `INTERRUPTED_SEND`
536                         // is installed instead.
537                         when {
538                             // The channel is closed
539                             closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
540                                 segment.onCancelledRequest(index, false)
541                                 return RESULT_CLOSED
542                             }
543                             // The waiter is not specified; return the corresponding result.
544                             waiter == null -> return RESULT_SUSPEND_NO_WAITER
545                             // Try to install the waiter.
546                             else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
547                         }
548                     }
549                 }
550                 // This cell is in the logical buffer.
551                 state === IN_BUFFER -> {
552                     // Try to buffer the element.
553                     if (segment.casState(index, state, BUFFERED)) {
554                         // The element has been successfully buffered, finish.
555                         return RESULT_BUFFERED
556                     }
557                 }
558                 // The cell stores a cancelled receiver.
559                 state === INTERRUPTED_RCV -> {
560                     // Clean the element slot to avoid memory leaks and finish.
561                     segment.cleanElement(index)
562                     return RESULT_FAILED
563                 }
564                 // The cell is poisoned by a concurrent receive.
565                 state === POISONED -> {
566                     // Clean the element slot to avoid memory leaks and finish.
567                     segment.cleanElement(index)
568                     return RESULT_FAILED
569                 }
570                 // The channel is already closed.
571                 state === CHANNEL_CLOSED -> {
572                     // Clean the element slot to avoid memory leaks,
573                     // ensure that the closing/cancellation procedure
574                     // has been completed, and finish.
575                     segment.cleanElement(index)
576                     completeCloseOrCancel()
577                     return RESULT_CLOSED
578                 }
579                 // A waiting receiver is stored in the cell.
580                 else -> {
581                     assert { state is Waiter || state is WaiterEB }
582                     // As the element will be passed directly to the waiter,
583                     // the algorithm cleans the element slot in the cell.
584                     segment.cleanElement(index)
585                     // Unwrap the waiting receiver from `WaiterEB` if needed.
586                     // As a receiver is stored in the cell, the buffer expansion
587                     // procedure would finish, so senders simply ignore the "EB" marker.
588                     val receiver = if (state is WaiterEB) state.waiter else state
589                     // Try to make a rendezvous with the suspended receiver.
590                     return if (receiver.tryResumeReceiver(element)) {
591                         // Rendezvous! Move the cell state to `DONE_RCV` and finish.
592                         segment.setState(index, DONE_RCV)
593                         onReceiveDequeued()
594                         RESULT_RENDEZVOUS
595                     } else {
596                         // The resumption has failed. Update the cell state correspondingly
597                         // and clean the element field. It is also possible for a concurrent
598                         // `expandBuffer()` or the cancellation handler to update the cell state;
599                         // we can safely ignore these updates as senders do not help `expandBuffer()`.
600                         if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
601                             segment.onCancelledRequest(index, true)
602                         }
603                         RESULT_FAILED
604                     }
605                 }
606             }
607         }
608     }
609 
610     /**
611      * Checks whether a [send] invocation is bound to suspend if it is called
612      * with the specified [sendersAndCloseStatus], [receivers], and [bufferEnd]
613      * values. When this channel is already closed, the function returns `false`.
614      *
615      * Specifically, [send] suspends if the channel is not unlimited,
616      * the number of receivers is greater than then index of the working cell of the
617      * potential [send] invocation, and the buffer does not cover this cell
618      * in case of buffered channel.
619      * When the channel is already closed, [send] does not suspend.
620      */
621     @JsName("shouldSendSuspend0")
622     private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean {
623         // Does not suspend if the channel is already closed.
624         if (curSendersAndCloseStatus.isClosedForSend0) return false
625         // Does not suspend if a rendezvous may happen or the buffer is not full.
626         return !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
627     }
628 
629     /**
630      * Returns `true` when the specified [send] should place
631      * its element to the working cell without suspension.
632      */
633     private fun bufferOrRendezvousSend(curSenders: Long): Boolean =
634         curSenders < bufferEndCounter || curSenders < receiversCounter + capacity
635 
636     /**
637      * Checks whether a [send] invocation is bound to suspend if it is called
638      * with the current counter and close status values. See [shouldSendSuspend] for details.
639      *
640      * Note that this implementation is _false positive_ in case of rendezvous channels,
641      * so it can return `false` when a [send] invocation is bound to suspend. Specifically,
642      * the counter of `receive()` operations may indicate that there is a waiting receiver,
643      * while it has already been cancelled, so the potential rendezvous is bound to fail.
644      */
645     internal open fun shouldSendSuspend(): Boolean = shouldSendSuspend(sendersAndCloseStatus.value)
646 
647     /**
648      * Tries to resume this receiver with the specified [element] as a result.
649      * Returns `true` on success and `false` otherwise.
650      */
651     @Suppress("UNCHECKED_CAST")
652     private fun Any.tryResumeReceiver(element: E): Boolean = when(this) {
653         is SelectInstance<*> -> { // `onReceiveXXX` select clause
654             trySelect(this@BufferedChannel, element)
655         }
656         is ReceiveCatching<*> -> {
657             this as ReceiveCatching<E>
658             cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFunResult())
659         }
660         is BufferedChannel<*>.BufferedChannelIterator -> {
661             this as BufferedChannel<E>.BufferedChannelIterator
662             tryResumeHasNext(element)
663         }
664         is CancellableContinuation<*> -> { // `receive()`
665             this as CancellableContinuation<E>
666             tryResume0(element, onUndeliveredElement?.bindCancellationFun())
667         }
668         else -> error("Unexpected receiver type: $this")
669     }
670 
671     // ##########################
672     // # The receive operations #
673     // ##########################
674 
675     /**
676      * This function is invoked when a receiver is added as a waiter in this channel.
677      */
678     protected open fun onReceiveEnqueued() {}
679 
680     /**
681      * This function is invoked when a waiting receiver is no longer stored in this channel;
682      * independently on whether it is caused by rendezvous, cancellation, or channel closing.
683      */
684     protected open fun onReceiveDequeued() {}
685 
686     override suspend fun receive(): E =
687         receiveImpl( // <-- this is an inline function
688             // Do not create a continuation until it is required;
689             // it is created later via [onNoWaiterSuspend], if needed.
690             waiter = null,
691             // Return the received element on successful retrieval from
692             // the buffer or rendezvous with a suspended sender.
693             // Also, inform `BufferedChannel` extensions that
694             // synchronization of this receive operation is completed.
695             onElementRetrieved = { element ->
696                 return element
697             },
698             // As no waiter is provided, suspension is impossible.
699             onSuspend = { _, _, _ -> error("unexpected") },
700             // Throw an exception if the channel is already closed.
701             onClosed = { throw recoverStackTrace(receiveException) },
702             // If `receive()` decides to suspend, the corresponding
703             // `suspend` function that creates a continuation is called.
704             // The tail-call optimization is applied here.
705             onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
706         )
707 
708     private suspend fun receiveOnNoWaiterSuspend(
709         /* The working cell is specified by
710         the segment and the index in it. */
711         segment: ChannelSegment<E>,
712         index: Int,
713         /* The global index of the cell. */
714         r: Long
715     ) = suspendCancellableCoroutineReusable { cont ->
716         receiveImplOnNoWaiter( // <-- this is an inline function
717             segment = segment, index = index, r = r,
718             // Store the created continuation as a waiter.
719             waiter = cont,
720             // In case of successful element retrieval, resume
721             // the continuation with the element and inform the
722             // `BufferedChannel` extensions that the synchronization
723             // is completed. Importantly, the receiver coroutine
724             // may be cancelled after it is successfully resumed but
725             // not dispatched yet. In case `onUndeliveredElement` is
726             // specified, we need to invoke it in the latter case.
727             onElementRetrieved = { element ->
728                 val onCancellation = onUndeliveredElement?.bindCancellationFun()
729                 cont.resume(element, onCancellation)
730             },
731             onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
732         )
733     }
734 
735     private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
736         onReceiveEnqueued()
737         invokeOnCancellation(segment, index)
738     }
739 
740     private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
741         cont.resumeWithException(receiveException)
742     }
743 
744     /*
745     The implementation is exactly the same as of `receive()`,
746     with the only difference that this function returns a `ChannelResult`
747     instance and does not throw exception explicitly in case the channel
748     is already closed for receiving. Please refer the plain `receive()`
749     implementation for the comments.
750     */
751     override suspend fun receiveCatching(): ChannelResult<E> =
752         receiveImpl( // <-- this is an inline function
753             waiter = null,
754             onElementRetrieved = { element ->
755                 success(element)
756             },
757             onSuspend = { _, _, _ -> error("unexpected") },
758             onClosed = { closed(closeCause) },
759             onNoWaiterSuspend = { segm, i, r -> receiveCatchingOnNoWaiterSuspend(segm, i, r) }
760         )
761 
762     private suspend fun receiveCatchingOnNoWaiterSuspend(
763         segment: ChannelSegment<E>,
764         index: Int,
765         r: Long
766     ) = suspendCancellableCoroutineReusable { cont ->
767         val waiter = ReceiveCatching(cont as CancellableContinuationImpl<ChannelResult<E>>)
768         receiveImplOnNoWaiter(
769             segment, index, r,
770             waiter = waiter,
771             onElementRetrieved = { element ->
772                 cont.resume(success(element), onUndeliveredElement?.bindCancellationFunResult())
773             },
774             onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
775         )
776     }
777 
778     private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>) {
779         cont.resume(closed(closeCause))
780     }
781 
782     override fun tryReceive(): ChannelResult<E> {
783         // Read the `receivers` counter first.
784         val r = receivers.value
785         val sendersAndCloseStatusCur = sendersAndCloseStatus.value
786         // Is this channel closed for receive?
787         if (sendersAndCloseStatusCur.isClosedForReceive0) {
788             return closed(closeCause)
789         }
790         // Do not try to receive an element if the plain `receive()` operation would suspend.
791         val s = sendersAndCloseStatusCur.sendersCounter
792         if (r >= s) return failure()
793         // Let's try to retrieve an element!
794         // The logic is similar to the plain `receive()` operation, with
795         // the only difference that we store `INTERRUPTED_RCV` in case
796         // the operation decides to suspend. This way, we can leverage
797         // the unconditional `Fetch-and-Add` instruction.
798         // One may consider storing `INTERRUPTED_RCV` instead of an actual waiter
799         // on suspension (a.k.a. "no elements to retrieve") as a short-cut of
800         // "suspending and cancelling immediately".
801         return receiveImpl( // <-- this is an inline function
802             // Store an already interrupted receiver in case of suspension.
803             waiter = INTERRUPTED_RCV,
804             // Finish when an element is successfully retrieved.
805             onElementRetrieved = { element -> success(element) },
806             // On suspension, the `INTERRUPTED_RCV` token has been
807             // installed, and this `tryReceive()` must fail.
808             onSuspend = { segm, _, globalIndex ->
809                 // Emulate "cancelled" receive, thus invoking 'waitExpandBufferCompletion' manually,
810                 // because effectively there were no cancellation
811                 waitExpandBufferCompletion(globalIndex)
812                 segm.onSlotCleaned()
813                 failure()
814             },
815             // If the channel is closed, return the corresponding result.
816             onClosed = { closed(closeCause) }
817         )
818     }
819 
820     /**
821      * Extracts the first element from this channel until the cell with the specified
822      * index is moved to the logical buffer. This is a key procedure for the _conflated_
823      * channel implementation, see [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
824      * strategy on buffer overflowing.
825      */
826     protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
827         assert { isConflatedDropOldest }
828         // Read the segment reference before the counter increment;
829         // it is crucial to be able to find the required segment later.
830         var segment = receiveSegment.value
831         while (true) {
832             // Read the receivers counter to check whether the specified cell is already in the buffer
833             // or should be moved to the buffer in a short time, due to the already started `receive()`.
834             val r = this.receivers.value
835             if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
836             // The cell is outside the buffer. Try to extract the first element
837             // if the `receivers` counter has not been changed.
838             if (!this.receivers.compareAndSet(r, r + 1)) continue
839             // Count the required segment id and the cell index in it.
840             val id = r / SEGMENT_SIZE
841             val i = (r % SEGMENT_SIZE).toInt()
842             // Try to find the required segment if the initially obtained
843             // segment (in the beginning of this function) has lower id.
844             if (segment.id != id) {
845                 // Find the required segment, restarting the operation if it has not been found.
846                 segment = findSegmentReceive(id, segment) ?:
847                     // The required segment has not been found. It is possible that the channel is already
848                     // closed for receiving, so the linked list of segments is closed as well.
849                     // In the latter case, the operation will finish eventually after incrementing
850                     // the `receivers` counter sufficient times. Note that it is impossible to check
851                     // whether this channel is closed for receiving (we do this in `receive`),
852                     // as it may call this function when helping to complete closing the channel.
853                     continue
854             }
855             // Update the cell according to the cell life-cycle.
856             val updCellResult = updateCellReceive(segment, i, r, null)
857             when {
858                 updCellResult === FAILED -> {
859                     // The cell is poisoned; restart from the beginning.
860                     // To avoid memory leaks, we also need to reset
861                     // the `prev` pointer of the working segment.
862                     if (r < sendersCounter) segment.cleanPrev()
863                 }
864                 else -> { // element
865                     // A buffered element was retrieved from the cell.
866                     // Clean the reference to the previous segment.
867                     segment.cleanPrev()
868                     @Suppress("UNCHECKED_CAST")
869                     onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
870                 }
871             }
872         }
873     }
874 
875     /**
876      * Abstract receive implementation.
877      */
878     private inline fun <R> receiveImpl(
879         /* The waiter to be stored in case of suspension,
880         or `null` if the waiter is not created yet.
881         In the latter case, if the algorithm decides
882         to suspend, [onNoWaiterSuspend] is called. */
883         waiter: Any?,
884         /* This lambda is invoked when an element has been
885         successfully retrieved, either from the buffer or
886         by making a rendezvous with a suspended sender. */
887         onElementRetrieved: (element: E) -> R,
888         /* This lambda is called when the operation suspends in the cell
889         specified by the segment and its global and in-segment indices. */
890         onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
891         /* This lambda is called when the channel is observed
892         in the closed state and no waiting sender is found,
893         which means that it is closed for receiving. */
894         onClosed: () -> R,
895         /* This lambda is called when the operation decides
896         to suspend, but the waiter is not provided (equals `null`).
897         It should create a waiter and delegate to `sendImplOnNoWaiter`. */
898         onNoWaiterSuspend: (
899             segm: ChannelSegment<E>,
900             i: Int,
901             r: Long
902         ) -> R = { _, _, _ -> error("unexpected") }
903     ): R {
904         // Read the segment reference before the counter increment;
905         // it is crucial to be able to find the required segment later.
906         var segment = receiveSegment.value
907         while (true) {
908             // Similar to the `send(e)` operation, `receive()` first checks
909             // whether the channel is already closed for receiving.
910             if (isClosedForReceive) return onClosed()
911             // Atomically increments the `receivers` counter
912             // and obtain the value right before the increment.
913             val r = this.receivers.getAndIncrement()
914             // Count the required segment id and the cell index in it.
915             val id = r / SEGMENT_SIZE
916             val i = (r % SEGMENT_SIZE).toInt()
917             // Try to find the required segment if the initially obtained
918             // segment (in the beginning of this function) has lower id.
919             if (segment.id != id) {
920                 // Find the required segment, restarting the operation if it has not been found.
921                 segment = findSegmentReceive(id, segment) ?:
922                     // The required segment is not found. It is possible that the channel is already
923                     // closed for receiving, so the linked list of segments is closed as well.
924                     // In the latter case, the operation fails with the corresponding check at the beginning.
925                     continue
926             }
927             // Update the cell according to the cell life-cycle.
928             val updCellResult = updateCellReceive(segment, i, r, waiter)
929             return when {
930                 updCellResult === SUSPEND -> {
931                     // The operation has decided to suspend and
932                     // stored the specified waiter in the cell.
933                     (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
934                     onSuspend(segment, i, r)
935                 }
936                 updCellResult === FAILED -> {
937                     // The operation has tried to make a rendezvous
938                     // but failed: either the opposite request has
939                     // already been cancelled or the cell is poisoned.
940                     // Restart from the beginning in this case.
941                     // To avoid memory leaks, we also need to reset
942                     // the `prev` pointer of the working segment.
943                     if (r < sendersCounter) segment.cleanPrev()
944                     continue
945                 }
946                 updCellResult === SUSPEND_NO_WAITER -> {
947                     // The operation has decided to suspend,
948                     // but no waiter has been provided.
949                     onNoWaiterSuspend(segment, i, r)
950                 }
951                 else -> { // element
952                     // Either a buffered element was retrieved from the cell
953                     // or a rendezvous with a waiting sender has happened.
954                     // Clean the reference to the previous segment before finishing.
955                     segment.cleanPrev()
956                     @Suppress("UNCHECKED_CAST")
957                     onElementRetrieved(updCellResult as E)
958                 }
959             }
960         }
961     }
962 
963     private inline fun receiveImplOnNoWaiter(
964         /* The working cell is specified by
965         the segment and the index in it. */
966         segment: ChannelSegment<E>,
967         index: Int,
968         /* The global index of the cell. */
969         r: Long,
970         /* The waiter to be stored in case of suspension. */
971         waiter: Waiter,
972         /* This lambda is invoked when an element has been
973         successfully retrieved, either from the buffer or
974         by making a rendezvous with a suspended sender. */
975         onElementRetrieved: (element: E) -> Unit,
976         /* This lambda is called when the channel is observed
977         in the closed state and no waiting senders is found,
978         which means that it is closed for receiving. */
979         onClosed: () -> Unit
980     ) {
981         // Update the cell with the non-null waiter,
982         // restarting from the beginning on failure.
983         // Check the `receiveImpl(..)` function for the comments.
984         val updCellResult = updateCellReceive(segment, index, r, waiter)
985         when {
986             updCellResult === SUSPEND -> {
987                 waiter.prepareReceiverForSuspension(segment, index)
988             }
989             updCellResult === FAILED -> {
990                 if (r < sendersCounter) segment.cleanPrev()
991                 receiveImpl(
992                     waiter = waiter,
993                     onElementRetrieved = onElementRetrieved,
994                     onSuspend = { _, _, _ -> },
995                     onClosed = onClosed
996                 )
997             }
998             else -> {
999                 segment.cleanPrev()
1000                 @Suppress("UNCHECKED_CAST")
1001                 onElementRetrieved(updCellResult as E)
1002             }
1003         }
1004     }
1005 
1006     private fun updateCellReceive(
1007         /* The working cell is specified by
1008         the segment and the index in it. */
1009         segment: ChannelSegment<E>,
1010         index: Int,
1011         /* The global index of the cell. */
1012         r: Long,
1013         /* The waiter to be stored in case of suspension. */
1014         waiter: Any?,
1015     ): Any? {
1016         // This is a fast-path of `updateCellReceiveSlow(..)`.
1017         //
1018         // Read the current cell state.
1019         val state = segment.getState(index)
1020         when {
1021             // The cell is empty.
1022             state === null -> {
1023                 // If a rendezvous must happen, the operation does not wait
1024                 // until the cell stores a buffered element or a suspended
1025                 // sender, poisoning the cell and restarting instead.
1026                 // Otherwise, try to store the specified waiter in the cell.
1027                 val senders = sendersAndCloseStatus.value.sendersCounter
1028                 if (r >= senders) {
1029                     // This `receive()` operation should suspend.
1030                     if (waiter === null) {
1031                         // The waiter is not specified;
1032                         // return the corresponding result.
1033                         return SUSPEND_NO_WAITER
1034                     }
1035                     // Try to install the waiter.
1036                     if (segment.casState(index, state, waiter)) {
1037                         // The waiter has been successfully installed.
1038                         // Invoke the `expandBuffer()` procedure and finish.
1039                         expandBuffer()
1040                         return SUSPEND
1041                     }
1042                 }
1043             }
1044             // The cell stores a buffered element.
1045             state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
1046                 // Retrieve the element and expand the buffer.
1047                 expandBuffer()
1048                 return segment.retrieveElement(index)
1049             }
1050         }
1051         return updateCellReceiveSlow(segment, index, r, waiter)
1052     }
1053 
1054     private fun updateCellReceiveSlow(
1055         /* The working cell is specified by
1056         the segment and the index in it. */
1057         segment: ChannelSegment<E>,
1058         index: Int,
1059         /* The global index of the cell. */
1060         r: Long,
1061         /* The waiter to be stored in case of suspension. */
1062         waiter: Any?,
1063     ): Any? {
1064         // The cell state should be updated according to  its state machine;
1065         // see the paper mentioned in the very beginning for the algorithm details.
1066         while (true) {
1067             // Read the current cell state.
1068             val state = segment.getState(index)
1069             when {
1070                 // The cell is empty.
1071                 state === null || state === IN_BUFFER -> {
1072                     // If a rendezvous must happen, the operation does not wait
1073                     // until the cell stores a buffered element or a suspended
1074                     // sender, poisoning the cell and restarting instead.
1075                     // Otherwise, try to store the specified waiter in the cell.
1076                     val senders = sendersAndCloseStatus.value.sendersCounter
1077                     if (r < senders) {
1078                         // The cell is already covered by sender,
1079                         // so a rendezvous must happen. Unfortunately,
1080                         // the cell is empty, so the operation poisons it.
1081                         if (segment.casState(index, state, POISONED)) {
1082                             // When the cell becomes poisoned, it is essentially
1083                             // the same as storing an already cancelled receiver.
1084                             // Thus, the `expandBuffer()` procedure should be invoked.
1085                             expandBuffer()
1086                             return FAILED
1087                         }
1088                     } else {
1089                         // This `receive()` operation should suspend.
1090                         if (waiter === null) {
1091                             // The waiter is not specified;
1092                             // return the corresponding result.
1093                             return SUSPEND_NO_WAITER
1094                         }
1095                         // Try to install the waiter.
1096                         if (segment.casState(index, state, waiter)) {
1097                             // The waiter has been successfully installed.
1098                             // Invoke the `expandBuffer()` procedure and finish.
1099                             expandBuffer()
1100                             return SUSPEND
1101                         }
1102                     }
1103                 }
1104                 // The cell stores a buffered element.
1105                 state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
1106                     // Retrieve the element and expand the buffer.
1107                     expandBuffer()
1108                     return segment.retrieveElement(index)
1109                 }
1110                 // The cell stores an interrupted sender.
1111                 state === INTERRUPTED_SEND -> return FAILED
1112                 // The cell is already poisoned by a concurrent
1113                 // `hasElements` call. Restart in this case.
1114                 state === POISONED -> return FAILED
1115                 // This channel is already closed.
1116                 state === CHANNEL_CLOSED -> {
1117                     // Although the channel is closed, it is still required
1118                     // to call the `expandBuffer()` procedure to keep
1119                     // `waitForExpandBufferCompletion()` correct.
1120                     expandBuffer()
1121                     return FAILED
1122                 }
1123                 // A concurrent `expandBuffer()` is resuming a
1124                 // suspended sender. Wait in a spin-loop until
1125                 // the resumption attempt completes: the cell
1126                 // state must change to either `BUFFERED` or
1127                 // `INTERRUPTED_SEND`.
1128                 state === RESUMING_BY_EB -> continue
1129                 // The cell stores a suspended sender; try to resume it.
1130                 else -> {
1131                     // To synchronize with expandBuffer(), the algorithm
1132                     // first moves the cell to an intermediate `S_RESUMING_BY_RCV`
1133                     // state, updating it to either `BUFFERED` (on success) or
1134                     // `INTERRUPTED_SEND` (on failure).
1135                     if (segment.casState(index, state, RESUMING_BY_RCV)) {
1136                         // Has a concurrent `expandBuffer()` delegated its completion?
1137                         val helpExpandBuffer = state is WaiterEB
1138                         // Extract the sender if needed and try to resume it.
1139                         val sender = if (state is WaiterEB) state.waiter else state
1140                         return if (sender.tryResumeSender(segment, index)) {
1141                             // The sender has been resumed successfully!
1142                             // Update the cell state correspondingly,
1143                             // expand the buffer, and return the element
1144                             // stored in the cell.
1145                             // In case a concurrent `expandBuffer()` has delegated
1146                             // its completion, the procedure should finish, as the
1147                             // sender is resumed. Thus, no further action is required.
1148                             segment.setState(index, DONE_RCV)
1149                             expandBuffer()
1150                             segment.retrieveElement(index)
1151                         } else {
1152                             // The resumption has failed. Update the cell correspondingly.
1153                             // In case a concurrent `expandBuffer()` has delegated
1154                             // its completion, the procedure should skip this cell, so
1155                             // `expandBuffer()` should be called once again.
1156                             segment.setState(index, INTERRUPTED_SEND)
1157                             segment.onCancelledRequest(index, false)
1158                             if (helpExpandBuffer) expandBuffer()
1159                             FAILED
1160                         }
1161                     }
1162                 }
1163             }
1164         }
1165     }
1166 
1167     private fun Any.tryResumeSender(segment: ChannelSegment<E>, index: Int): Boolean = when (this) {
1168         is CancellableContinuation<*> -> { // suspended `send(e)` operation
1169             @Suppress("UNCHECKED_CAST")
1170             this as CancellableContinuation<Unit>
1171             tryResume0(Unit)
1172         }
1173         is SelectInstance<*> -> {
1174             this as SelectImplementation<*>
1175             val trySelectResult = trySelectDetailed(clauseObject = this@BufferedChannel, result = Unit)
1176             // Clean the element slot to avoid memory leaks
1177             // if this `select` clause should be re-registered.
1178             if (trySelectResult === REREGISTER) segment.cleanElement(index)
1179             // Was the resumption successful?
1180             trySelectResult === SUCCESSFUL
1181         }
1182         is SendBroadcast -> cont.tryResume0(true) // // suspended `sendBroadcast(e)` operation
1183         else -> error("Unexpected waiter: $this")
1184     }
1185 
1186     // ################################
1187     // # The expandBuffer() procedure #
1188     // ################################
1189 
1190     private fun expandBuffer() {
1191         // Do not need to take any action if
1192         // this channel is rendezvous or unlimited.
1193         if (isRendezvousOrUnlimited) return
1194         // Read the current segment of
1195         // the `expandBuffer()` procedure.
1196         var segment = bufferEndSegment.value
1197         // Try to expand the buffer until succeed.
1198         try_again@ while (true) {
1199             // Increment the logical end of the buffer.
1200             // The `b`-th cell is going to be added to the buffer.
1201             val b = bufferEnd.getAndIncrement()
1202             val id = b / SEGMENT_SIZE
1203             // After that, read the current `senders` counter.
1204             // In case its value is lower than `b`, the `send(e)`
1205             // invocation that will work with this `b`-th cell
1206             // will detect that the cell is already a part of the
1207             // buffer when comparing with the `bufferEnd` counter.
1208             // However, `bufferEndSegment` may reference an outdated
1209             // segment, which should be updated to avoid memory leaks.
1210             val s = sendersCounter
1211             if (s <= b) {
1212                 // Should `bufferEndSegment` be moved forward to avoid memory leaks?
1213                 if (segment.id < id && segment.next != null)
1214                     moveSegmentBufferEndToSpecifiedOrLast(id, segment)
1215                 // Increment the number of completed `expandBuffer()`-s and finish.
1216                 incCompletedExpandBufferAttempts()
1217                 return
1218             }
1219             // Is `bufferEndSegment` outdated or is the segment with the required id already removed?
1220             // Find the required segment, creating new ones if needed.
1221             if (segment.id != id) {
1222                 segment = findSegmentBufferEnd(id, segment, b)
1223                     // Restart if the required segment is removed, or
1224                     // the linked list of segments is already closed,
1225                     // and the required one will never be created.
1226                     // Please note that `findSegmentBuffer(..)` updates
1227                     // the number of completed `expandBuffer()` attempt
1228                     // in this case.
1229                     ?: continue@try_again
1230             }
1231             // Try to add the cell to the logical buffer,
1232             // updating the cell state according to the state-machine.
1233             val i = (b % SEGMENT_SIZE).toInt()
1234             if (updateCellExpandBuffer(segment, i, b)) {
1235                 // The cell has been added to the logical buffer!
1236                 // Increment the number of completed `expandBuffer()`-s and finish.
1237                 //
1238                 // Note that it is possible to increment the number of
1239                 // completed `expandBuffer()` attempts earlier, right
1240                 // after the segment is obtained. We find this change
1241                 // counter-intuitive and prefer to avoid it.
1242                 incCompletedExpandBufferAttempts()
1243                 return
1244             } else {
1245                 // The cell has not been added to the buffer.
1246                 // Increment the number of completed `expandBuffer()`
1247                 // attempts and restart.
1248                 incCompletedExpandBufferAttempts()
1249                 continue@try_again
1250             }
1251         }
1252     }
1253 
1254     private fun updateCellExpandBuffer(
1255         /* The working cell is specified by
1256         the segment and the index in it. */
1257         segment: ChannelSegment<E>,
1258         index: Int,
1259         /* The global index of the cell. */
1260         b: Long
1261     ): Boolean {
1262         // This is a fast-path of `updateCellExpandBufferSlow(..)`.
1263         //
1264         // Read the current cell state.
1265         val state = segment.getState(index)
1266         if (state is Waiter) {
1267             // Usually, a sender is stored in the cell.
1268             // However, it is possible for a concurrent
1269             // receiver to be already suspended there.
1270             // Try to distinguish whether the waiter is a
1271             // sender by comparing the global cell index with
1272             // the `receivers` counter. In case the cell is not
1273             // covered by a receiver, a sender is stored in the cell.
1274             if (b >= receivers.value) {
1275                 // The cell stores a suspended sender. Try to resume it.
1276                 // To synchronize with a concurrent `receive()`, the algorithm
1277                 // first moves the cell state to an intermediate `RESUMING_BY_EB`
1278                 // state, updating it to either `BUFFERED` (on successful resumption)
1279                 // or `INTERRUPTED_SEND` (on failure).
1280                 if (segment.casState(index, state, RESUMING_BY_EB)) {
1281                     return if (state.tryResumeSender(segment, index)) {
1282                         // The sender has been resumed successfully!
1283                         // Move the cell to the logical buffer and finish.
1284                         segment.setState(index, BUFFERED)
1285                         true
1286                     } else {
1287                         // The resumption has failed.
1288                         segment.setState(index, INTERRUPTED_SEND)
1289                         segment.onCancelledRequest(index, false)
1290                         false
1291                     }
1292                 }
1293             }
1294         }
1295         return updateCellExpandBufferSlow(segment, index, b)
1296     }
1297 
1298     private fun updateCellExpandBufferSlow(
1299         /* The working cell is specified by
1300         the segment and the index in it. */
1301         segment: ChannelSegment<E>,
1302         index: Int,
1303         /* The global index of the cell. */
1304         b: Long
1305     ): Boolean {
1306         // Update the cell state according to its state machine.
1307         // See the paper mentioned in the very beginning for
1308         // the cell life-cycle and the algorithm details.
1309         while (true) {
1310             // Read the current cell state.
1311             val state = segment.getState(index)
1312             when {
1313                 // A suspended waiter, sender or receiver.
1314                 state is Waiter -> {
1315                     // Usually, a sender is stored in the cell.
1316                     // However, it is possible for a concurrent
1317                     // receiver to be already suspended there.
1318                     // Try to distinguish whether the waiter is a
1319                     // sender by comparing the global cell index with
1320                     // the `receivers` counter. In case the cell is not
1321                     // covered by a receiver, a sender is stored in the cell.
1322                     if (b < receivers.value) {
1323                         // The algorithm cannot distinguish whether the
1324                         // suspended in the cell operation is sender or receiver.
1325                         // To make progress, `expandBuffer()` delegates its completion
1326                         // to an upcoming pairwise request, atomically wrapping
1327                         // the waiter in `WaiterEB`. In case a sender is stored
1328                         // in the cell, the upcoming receiver will call `expandBuffer()`
1329                         // if the sender resumption fails; thus, effectively, skipping
1330                         // this cell. Otherwise, if a receiver is stored in the cell,
1331                         // this `expandBuffer()` procedure must finish; therefore,
1332                         // sender ignore the `WaiterEB` wrapper.
1333                         if (segment.casState(index, state, WaiterEB(waiter = state)))
1334                             return true
1335                     } else {
1336                         // The cell stores a suspended sender. Try to resume it.
1337                         // To synchronize with a concurrent `receive()`, the algorithm
1338                         // first moves the cell state to an intermediate `RESUMING_BY_EB`
1339                         // state, updating it to either `BUFFERED` (on successful resumption)
1340                         // or `INTERRUPTED_SEND` (on failure).
1341                         if (segment.casState(index, state, RESUMING_BY_EB)) {
1342                             return if (state.tryResumeSender(segment, index)) {
1343                                 // The sender has been resumed successfully!
1344                                 // Move the cell to the logical buffer and finish.
1345                                 segment.setState(index, BUFFERED)
1346                                 true
1347                             } else {
1348                                 // The resumption has failed.
1349                                 segment.setState(index, INTERRUPTED_SEND)
1350                                 segment.onCancelledRequest(index, false)
1351                                 false
1352                             }
1353                         }
1354                     }
1355                 }
1356                 // The cell stores an interrupted sender, skip it.
1357                 state === INTERRUPTED_SEND -> return false
1358                 // The cell is empty, a concurrent sender is coming.
1359                 state === null -> {
1360                     // To inform a concurrent sender that this cell is
1361                     // already a part of the buffer, the algorithm moves
1362                     // it to a special `IN_BUFFER` state.
1363                     if (segment.casState(index, state, IN_BUFFER)) return true
1364                 }
1365                 // The cell is already a part of the buffer, finish.
1366                 state === BUFFERED -> return true
1367                 // The cell is already processed by a receiver, no further action is required.
1368                 state === POISONED || state === DONE_RCV || state === INTERRUPTED_RCV -> return true
1369                 // The channel is closed, all the following
1370                 // cells are already in the same state, finish.
1371                 state === CHANNEL_CLOSED -> return true
1372                 // A concurrent receiver is resuming the suspended sender.
1373                 // Wait in a spin-loop until it changes the cell state
1374                 // to either `DONE_RCV` or `INTERRUPTED_SEND`.
1375                 state === RESUMING_BY_RCV -> continue // spin wait
1376                 else -> error("Unexpected cell state: $state")
1377             }
1378         }
1379     }
1380 
1381     /**
1382      * Increments the counter of completed [expandBuffer] invocations.
1383      * To guarantee starvation-freedom for [waitExpandBufferCompletion],
1384      * which waits until the counters of started and completed [expandBuffer] calls
1385      * coincide and become greater or equal to the specified value,
1386      * [waitExpandBufferCompletion] may set a flag that pauses further progress.
1387      */
1388     private fun incCompletedExpandBufferAttempts(nAttempts: Long = 1) {
1389         // Increment the number of completed `expandBuffer()` calls.
1390         completedExpandBuffersAndPauseFlag.addAndGet(nAttempts).also {
1391             // Should further `expandBuffer()`-s be paused?
1392             // If so, this thread should wait in a spin-loop
1393             // until the flag is unset.
1394             if (it.ebPauseExpandBuffers) {
1395                 @Suppress("ControlFlowWithEmptyBody")
1396                 while (completedExpandBuffersAndPauseFlag.value.ebPauseExpandBuffers) {}
1397             }
1398         }
1399     }
1400 
1401     /**
1402      * Waits in a spin-loop until the [expandBuffer] call that
1403      * should process the [globalIndex]-th cell is completed.
1404      * Essentially, it waits until the numbers of started ([bufferEnd])
1405      * and completed ([completedExpandBuffersAndPauseFlag]) [expandBuffer]
1406      * attempts coincide and become equal or greater than [globalIndex].
1407      * To avoid starvation, this function may set a flag
1408      * that pauses further progress.
1409      */
1410     internal fun waitExpandBufferCompletion(globalIndex: Long) {
1411         // Do nothing if this channel is rendezvous or unlimited;
1412         // `expandBuffer()` is not used in these cases.
1413         if (isRendezvousOrUnlimited) return
1414         // Wait in an infinite loop until the number of started
1415         // buffer expansion calls become not lower than the cell index.
1416         @Suppress("ControlFlowWithEmptyBody")
1417         while (bufferEndCounter <= globalIndex) {}
1418         // Now it is guaranteed that the `expandBuffer()` call that
1419         // should process the required cell has been started.
1420         // Wait in a fixed-size spin-loop until the numbers of
1421         // started and completed buffer expansion calls coincide.
1422         repeat(EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS) {
1423             // Read the number of started buffer expansion calls.
1424             val b = bufferEndCounter
1425             // Read the number of completed buffer expansion calls.
1426             val ebCompleted = completedExpandBuffersAndPauseFlag.value.ebCompletedCounter
1427             // Do the numbers of started and completed calls coincide?
1428             // Note that we need to re-read the number of started `expandBuffer()`
1429             // calls to obtain a correct snapshot.
1430             // Here we wait to a precise match in order to ensure that **our matching expandBuffer()**
1431             // completed. The only way to ensure that is to check that number of started expands == number of finished expands
1432             if (b == ebCompleted && b == bufferEndCounter) return
1433         }
1434         // To avoid starvation, pause further `expandBuffer()` calls.
1435         completedExpandBuffersAndPauseFlag.update {
1436             constructEBCompletedAndPauseFlag(it.ebCompletedCounter, true)
1437         }
1438         // Now wait in an infinite spin-loop until the counters coincide.
1439         while (true) {
1440             // Read the number of started buffer expansion calls.
1441             val b = bufferEndCounter
1442             // Read the number of completed buffer expansion calls
1443             // along with the flag that pauses further progress.
1444             val ebCompletedAndBit = completedExpandBuffersAndPauseFlag.value
1445             val ebCompleted = ebCompletedAndBit.ebCompletedCounter
1446             val pauseExpandBuffers = ebCompletedAndBit.ebPauseExpandBuffers
1447             // Do the numbers of started and completed calls coincide?
1448             // Note that we need to re-read the number of started `expandBuffer()`
1449             // calls to obtain a correct snapshot.
1450             if (b == ebCompleted && b == bufferEndCounter) {
1451                 // Unset the flag, which pauses progress, and finish.
1452                 completedExpandBuffersAndPauseFlag.update {
1453                     constructEBCompletedAndPauseFlag(it.ebCompletedCounter, false)
1454                 }
1455                 return
1456             }
1457             // It is possible that a concurrent caller of this function
1458             // has unset the flag, which pauses further progress to avoid
1459             // starvation. In this case, set the flag back.
1460             if (!pauseExpandBuffers) {
1461                 completedExpandBuffersAndPauseFlag.compareAndSet(
1462                     ebCompletedAndBit,
1463                     constructEBCompletedAndPauseFlag(ebCompleted, true)
1464                 )
1465             }
1466         }
1467     }
1468 
1469 
1470     // #######################
1471     // ## Select Expression ##
1472     // #######################
1473 
1474     @Suppress("UNCHECKED_CAST")
1475     override val onSend: SelectClause2<E, BufferedChannel<E>>
1476         get() = SelectClause2Impl(
1477             clauseObject = this@BufferedChannel,
1478             regFunc = BufferedChannel<*>::registerSelectForSend as RegistrationFunction,
1479             processResFunc = BufferedChannel<*>::processResultSelectSend as ProcessResultFunction
1480         )
1481 
1482     @Suppress("UNCHECKED_CAST")
1483     protected open fun registerSelectForSend(select: SelectInstance<*>, element: Any?) =
1484         sendImpl( // <-- this is an inline function
1485             element = element as E,
1486             waiter = select,
1487             onRendezvousOrBuffered = { select.selectInRegistrationPhase(Unit) },
1488             onSuspend = { _, _ -> },
1489             onClosed = { onClosedSelectOnSend(element, select) }
1490         )
1491 
1492 
1493     private fun onClosedSelectOnSend(element: E, select: SelectInstance<*>) {
1494         onUndeliveredElement?.callUndeliveredElement(element, select.context)
1495         select.selectInRegistrationPhase(CHANNEL_CLOSED)
1496     }
1497 
1498     @Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
1499     private fun processResultSelectSend(ignoredParam: Any?, selectResult: Any?): Any? =
1500         if (selectResult === CHANNEL_CLOSED) throw sendException
1501         else this
1502 
1503     @Suppress("UNCHECKED_CAST")
1504     override val onReceive: SelectClause1<E>
1505         get() = SelectClause1Impl(
1506             clauseObject = this@BufferedChannel,
1507             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1508             processResFunc = BufferedChannel<*>::processResultSelectReceive as ProcessResultFunction,
1509             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1510         )
1511 
1512     @Suppress("UNCHECKED_CAST")
1513     override val onReceiveCatching: SelectClause1<ChannelResult<E>>
1514         get() = SelectClause1Impl(
1515             clauseObject = this@BufferedChannel,
1516             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1517             processResFunc = BufferedChannel<*>::processResultSelectReceiveCatching as ProcessResultFunction,
1518             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1519         )
1520 
1521     @Suppress("OVERRIDE_DEPRECATION", "UNCHECKED_CAST")
1522     override val onReceiveOrNull: SelectClause1<E?>
1523         get() = SelectClause1Impl(
1524             clauseObject = this@BufferedChannel,
1525             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1526             processResFunc = BufferedChannel<*>::processResultSelectReceiveOrNull as ProcessResultFunction,
1527             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1528         )
1529 
1530     @Suppress("UNUSED_PARAMETER")
1531     private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
1532         receiveImpl( // <-- this is an inline function
1533             waiter = select,
1534             onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
1535             onSuspend = { _, _, _ -> },
1536             onClosed = { onClosedSelectOnReceive(select) }
1537         )
1538 
1539     private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
1540         select.selectInRegistrationPhase(CHANNEL_CLOSED)
1541     }
1542 
1543     @Suppress("UNUSED_PARAMETER")
1544     private fun processResultSelectReceive(ignoredParam: Any?, selectResult: Any?): Any? =
1545         if (selectResult === CHANNEL_CLOSED) throw receiveException
1546         else selectResult
1547 
1548     @Suppress("UNUSED_PARAMETER")
1549     private fun processResultSelectReceiveOrNull(ignoredParam: Any?, selectResult: Any?): Any? =
1550         if (selectResult === CHANNEL_CLOSED) {
1551             if (closeCause == null) null
1552             else throw receiveException
1553         } else selectResult
1554 
1555     @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER", "RedundantNullableReturnType")
1556     private fun processResultSelectReceiveCatching(ignoredParam: Any?, selectResult: Any?): Any? =
1557         if (selectResult === CHANNEL_CLOSED) closed(closeCause)
1558         else success(selectResult as E)
1559 
1560     @Suppress("UNCHECKED_CAST")
1561     private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
1562         { select: SelectInstance<*>, _: Any?, element: Any? ->
1563             { _, _, _ ->
1564                 if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context)
1565             }
1566         }
1567     }
1568 
1569     // ######################
1570     // ## Iterator Support ##
1571     // ######################
1572 
1573     override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
1574 
1575     /**
1576      * The key idea is that an iterator is a special receiver type,
1577      * which should be resumed differently to [receive] and [onReceive]
1578      * operations, but can be served as a waiter in a way similar to
1579      * [CancellableContinuation] and [SelectInstance].
1580      *
1581      * Roughly, [hasNext] is a [receive] sibling, while [next] simply
1582      * returns the already retrieved element and [hasNext] being idempotent.
1583      * From the implementation side, [receiveResult] stores the element retrieved by [hasNext]
1584      * (or a special [CHANNEL_CLOSED] token if the channel is closed).
1585      *
1586      * The [invoke] function is a [CancelHandler] implementation,
1587      * which requires knowing the [segment] and the [index] in it
1588      * that specify the location of the stored iterator.
1589      *
1590      * To resume the suspended [hasNext] call, a special [tryResumeHasNext]
1591      * function should be used in a way similar to [CancellableContinuation.tryResume]
1592      * and [SelectInstance.trySelect]. When the channel becomes closed,
1593      * [tryResumeHasNextOnClosedChannel] should be used instead.
1594      */
1595     private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
1596         /**
1597          * Stores the element retrieved by [hasNext] or
1598          * a special [CHANNEL_CLOSED] token if this channel is closed.
1599          * If [hasNext] has not been invoked yet, [NO_RECEIVE_RESULT] is stored.
1600          */
1601         private var receiveResult: Any? = NO_RECEIVE_RESULT
1602 
1603         /**
1604          * When [hasNext] suspends, this field stores the corresponding
1605          * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
1606          * function resume this continuation when the [hasNext] invocation should complete.
1607          *
1608          * This property is the subject to bening data race:
1609          * It is nulled-out on both completion and cancellation paths that
1610          * could happen concurrently.
1611          */
1612         @BenignDataRace
1613         private var continuation: CancellableContinuationImpl<Boolean>? = null
1614 
1615         // `hasNext()` is just a special receive operation.
1616         override suspend fun hasNext(): Boolean {
1617             return if (this.receiveResult !== NO_RECEIVE_RESULT && this.receiveResult !== CHANNEL_CLOSED) {
1618                 true
1619             } else receiveImpl( // <-- this is an inline function
1620                 // Do not create a continuation until it is required;
1621                 // it is created later via [onNoWaiterSuspend], if needed.
1622                 waiter = null,
1623                 // Store the received element in `receiveResult` on successful
1624                 // retrieval from the buffer or rendezvous with a suspended sender.
1625                 // Also, inform the `BufferedChannel` extensions that
1626                 // the synchronization of this receive operation is completed.
1627                 onElementRetrieved = { element ->
1628                     this.receiveResult = element
1629                     true
1630                 },
1631                 // As no waiter is provided, suspension is impossible.
1632                 onSuspend = { _, _, _ -> error("unreachable") },
1633                 // Return `false` or throw an exception if the channel is already closed.
1634                 onClosed = { onClosedHasNext() },
1635                 // If `hasNext()` decides to suspend, the corresponding
1636                 // `suspend` function that creates a continuation is called.
1637                 // The tail-call optimization is applied here.
1638                 onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
1639             )
1640         }
1641 
1642         private fun onClosedHasNext(): Boolean {
1643             this.receiveResult = CHANNEL_CLOSED
1644             val cause = closeCause ?: return false
1645             throw recoverStackTrace(cause)
1646         }
1647 
1648         private suspend fun hasNextOnNoWaiterSuspend(
1649             /* The working cell is specified by
1650             the segment and the index in it. */
1651             segment: ChannelSegment<E>,
1652             index: Int,
1653             /* The global index of the cell. */
1654             r: Long
1655         ): Boolean = suspendCancellableCoroutineReusable { cont ->
1656             this.continuation = cont
1657             receiveImplOnNoWaiter( // <-- this is an inline function
1658                 segment = segment, index = index, r = r,
1659                 waiter = this, // store this iterator as a waiter
1660                 // In case of successful element retrieval, store
1661                 // it in `receiveResult` and resume the continuation.
1662                 // Importantly, the receiver coroutine may be cancelled
1663                 // after it is successfully resumed but not dispatched yet.
1664                 // In case `onUndeliveredElement` is present, we must
1665                 // invoke it in the latter case.
1666                 onElementRetrieved = { element ->
1667                     this.receiveResult = element
1668                     this.continuation = null
1669                     cont.resume(true, onUndeliveredElement?.bindCancellationFun(element))
1670                 },
1671                 onClosed = { onClosedHasNextNoWaiterSuspend() }
1672             )
1673         }
1674 
1675         override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
1676             this.continuation?.invokeOnCancellation(segment, index)
1677         }
1678 
1679         private fun onClosedHasNextNoWaiterSuspend() {
1680             // Read the current continuation and clean
1681             // the corresponding field to avoid memory leaks.
1682             val cont = this.continuation!!
1683             this.continuation = null
1684             // Update the `hasNext()` internal result.
1685             this.receiveResult = CHANNEL_CLOSED
1686             // If this channel was closed without exception,
1687             // `hasNext()` should return `false`; otherwise,
1688             // it throws the closing exception.
1689             val cause = closeCause
1690             if (cause == null) {
1691                 cont.resume(false)
1692             } else {
1693                 cont.resumeWithException(recoverStackTrace(cause, cont))
1694             }
1695         }
1696 
1697         @Suppress("UNCHECKED_CAST")
1698         override fun next(): E {
1699             // Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
1700             val result = receiveResult
1701             check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
1702             receiveResult = NO_RECEIVE_RESULT
1703             // Is this channel closed?
1704             if (result === CHANNEL_CLOSED) throw recoverStackTrace(receiveException)
1705             // Return the element.
1706             return result as E
1707         }
1708 
1709         fun tryResumeHasNext(element: E): Boolean {
1710             // Read the current continuation and clean
1711             // the corresponding field to avoid memory leaks.
1712             val cont = this.continuation!!
1713             this.continuation = null
1714             // Store the retrieved element in `receiveResult`.
1715             this.receiveResult = element
1716             // Try to resume this `hasNext()`. Importantly, the receiver coroutine
1717             // may be cancelled after it is successfully resumed but not dispatched yet.
1718             // In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
1719             return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element))
1720         }
1721 
1722         fun tryResumeHasNextOnClosedChannel() {
1723             /*
1724              * Read the current continuation of the suspended `hasNext()` call and clean the corresponding field to avoid memory leaks.
1725              * While this nulling out is unnecessary, it eliminates memory leaks (through the continuation)
1726              * if the channel iterator accidentally remains GC-reachable after the channel is closed.
1727              */
1728             val cont = this.continuation!!
1729             this.continuation = null
1730             // Update the `hasNext()` internal result and inform
1731             // `BufferedChannel` extensions that synchronization
1732             // of this receive operation is completed.
1733             this.receiveResult = CHANNEL_CLOSED
1734             // If this channel was closed without exception,
1735             // `hasNext()` should return `false`; otherwise,
1736             // it throws the closing exception.
1737             val cause = closeCause
1738             if (cause == null) {
1739                 cont.resume(false)
1740             } else {
1741                 cont.resumeWithException(recoverStackTrace(cause, cont))
1742             }
1743         }
1744     }
1745 
1746     // ##############################
1747     // ## Closing and Cancellation ##
1748     // ##############################
1749 
1750     /**
1751      * Store the cause of closing this channel, either via [close] or [cancel] call.
1752      * The closing cause can be set only once.
1753      */
1754     private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
1755     // Should be called only if this channel is closed or cancelled.
1756     protected val closeCause get() = _closeCause.value as Throwable?
1757 
1758     /** Returns the closing cause if it is non-null, or [ClosedSendChannelException] otherwise. */
1759     protected val sendException get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
1760 
1761     /** Returns the closing cause if it is non-null, or [ClosedReceiveChannelException] otherwise. */
1762     private val receiveException get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1763 
1764     /**
1765       Stores the closed handler installed by [invokeOnClose].
1766       To synchronize [invokeOnClose] and [close], two additional
1767       marker states, [CLOSE_HANDLER_INVOKED] and [CLOSE_HANDLER_CLOSED]
1768       are used. The resulting state diagram is presented below.
1769 
1770       +------+  install handler  +---------+  close(..)  +---------+
1771       | null |------------------>| handler |------------>| INVOKED |
1772       +------+                   +---------+             +---------+
1773          |
1774          | close(..)  +--------+
1775          +----------->| CLOSED |
1776                       +--------+
1777      */
1778     private val closeHandler = atomic<Any?>(null)
1779 
1780     /**
1781      * Invoked when channel is closed as the last action of [close] invocation.
1782      * This method should be idempotent and can be called multiple times.
1783      */
1784     protected open fun onClosedIdempotent() {}
1785 
1786     override fun close(cause: Throwable?): Boolean =
1787         closeOrCancelImpl(cause, cancel = false)
1788 
1789     @Suppress("OVERRIDE_DEPRECATION")
1790     final override fun cancel(cause: Throwable?): Boolean = cancelImpl(cause)
1791 
1792     @Suppress("OVERRIDE_DEPRECATION")
1793     final override fun cancel() { cancelImpl(null) }
1794 
1795     final override fun cancel(cause: CancellationException?) { cancelImpl(cause) }
1796 
1797     internal open fun cancelImpl(cause: Throwable?): Boolean =
1798         closeOrCancelImpl(cause ?: CancellationException("Channel was cancelled"), cancel = true)
1799 
1800     /**
1801      * This is a common implementation for [close] and [cancel]. It first tries
1802      * to install the specified cause; the invocation that successfully installs
1803      * the cause returns `true` as a results of this function, while all further
1804      * [close] and [cancel] calls return `false`.
1805      *
1806      * After the closing/cancellation cause is installed, the channel should be marked
1807      * as closed or cancelled, which bounds further `send(e)`-s to fails.
1808      *
1809      * Then, [completeCloseOrCancel] is called, which cancels waiting `receive()`
1810      * requests ([cancelSuspendedReceiveRequests]) and removes unprocessed elements
1811      * ([removeUnprocessedElements]) in case this channel is cancelled.
1812      *
1813      * Finally, if this [closeOrCancelImpl] has installed the cause, therefore,
1814      * has closed the channel, [closeHandler] and [onClosedIdempotent] should be invoked.
1815      */
1816     protected open fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
1817         // If this is a `cancel(..)` invocation, set a bit that the cancellation
1818         // has been started. This is crucial for ensuring linearizability,
1819         // when concurrent `close(..)` and `isClosedFor[Send,Receive]` operations
1820         // help this `cancel(..)`.
1821         if (cancel) markCancellationStarted()
1822         // Try to install the specified cause. On success, this invocation will
1823         // return `true` as a result; otherwise, it will complete with `false`.
1824         val closedByThisOperation = _closeCause.compareAndSet(NO_CLOSE_CAUSE, cause)
1825         // Mark this channel as closed or cancelled, depending on this operation type.
1826         if (cancel) markCancelled() else markClosed()
1827         // Complete the closing or cancellation procedure.
1828         completeCloseOrCancel()
1829         // Finally, if this operation has installed the cause,
1830         // it should invoke the close handlers.
1831         return closedByThisOperation.also {
1832             onClosedIdempotent()
1833             if (it) invokeCloseHandler()
1834         }
1835     }
1836 
1837     /**
1838      * Invokes the installed close handler,
1839      * updating the [closeHandler] state correspondingly.
1840      */
1841     private fun invokeCloseHandler() {
1842         val closeHandler = closeHandler.getAndUpdate {
1843             if (it === null) {
1844                 // Inform concurrent `invokeOnClose`
1845                 // that this channel is already closed.
1846                 CLOSE_HANDLER_CLOSED
1847             } else {
1848                 // Replace the handler with a special
1849                 // `INVOKED` marker to avoid memory leaks.
1850                 CLOSE_HANDLER_INVOKED
1851             }
1852         } ?: return // no handler was installed, finish.
1853         // Invoke the handler.
1854         @Suppress("UNCHECKED_CAST")
1855         closeHandler as (cause: Throwable?) -> Unit
1856         closeHandler(closeCause)
1857     }
1858 
1859     override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
1860         // Try to install the handler, finishing on success.
1861         if (closeHandler.compareAndSet(null, handler)) {
1862             // Handler has been successfully set, finish the operation.
1863             return
1864         }
1865         // Either another handler is already set, or this channel is closed.
1866         // In the latter case, the current handler should be invoked.
1867         // However, the implementation must ensure that at most one
1868         // handler is called, throwing an `IllegalStateException`
1869         // if another close handler has been invoked.
1870         closeHandler.loop { cur ->
1871             when {
1872                 cur === CLOSE_HANDLER_CLOSED -> {
1873                     // Try to update the state from `CLOSED` to `INVOKED`.
1874                     // This is crucial to guarantee that at most one handler can be called.
1875                     // On success, invoke the handler and finish.
1876                     if (closeHandler.compareAndSet(CLOSE_HANDLER_CLOSED, CLOSE_HANDLER_INVOKED)) {
1877                         handler(closeCause)
1878                         return
1879                     }
1880                 }
1881                 cur === CLOSE_HANDLER_INVOKED -> error("Another handler was already registered and successfully invoked")
1882                 else -> error("Another handler is already registered: $cur")
1883             }
1884         }
1885     }
1886 
1887     /**
1888      * Marks this channel as closed.
1889      * In case [cancelImpl] has already been invoked,
1890      * and this channel is marked with [CLOSE_STATUS_CANCELLATION_STARTED],
1891      * this function marks the channel as cancelled.
1892      *
1893      * All operation that notice this channel in the closed state,
1894      * must help to complete the closing via [completeCloseOrCancel].
1895      */
1896     private fun markClosed(): Unit =
1897         sendersAndCloseStatus.update { cur ->
1898             when (cur.sendersCloseStatus) {
1899                 CLOSE_STATUS_ACTIVE -> // the channel is neither closed nor cancelled
1900                     constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CLOSED)
1901                 CLOSE_STATUS_CANCELLATION_STARTED -> // the channel is going to be cancelled
1902                     constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1903                 else -> return // the channel is already marked as closed or cancelled.
1904             }
1905         }
1906 
1907     /**
1908      * Marks this channel as cancelled.
1909      *
1910      * All operation that notice this channel in the cancelled state,
1911      * must help to complete the cancellation via [completeCloseOrCancel].
1912      */
1913     private fun markCancelled(): Unit =
1914         sendersAndCloseStatus.update { cur ->
1915             constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1916         }
1917 
1918     /**
1919      * When the cancellation procedure starts, it is critical
1920      * to mark the closing status correspondingly. Thus, other
1921      * operations, which may help to complete the cancellation,
1922      * always correctly update the status to `CANCELLED`.
1923      */
1924     private fun markCancellationStarted(): Unit =
1925         sendersAndCloseStatus.update { cur ->
1926             if (cur.sendersCloseStatus == CLOSE_STATUS_ACTIVE)
1927                 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLATION_STARTED)
1928             else return // this channel is already closed or cancelled
1929         }
1930 
1931     /**
1932      * Completes the started [close] or [cancel] procedure.
1933      */
1934     private fun completeCloseOrCancel() {
1935         isClosedForSend // must finish the started close/cancel if one is detected.
1936     }
1937 
1938     protected open val isConflatedDropOldest get() = false
1939 
1940     /**
1941      * Completes the channel closing procedure.
1942      */
1943     private fun completeClose(sendersCur: Long): ChannelSegment<E> {
1944         // Close the linked list for further segment addition,
1945         // obtaining the last segment in the data structure.
1946         val lastSegment = closeLinkedList()
1947         // In the conflated channel implementation (with the DROP_OLDEST
1948         // elements conflation strategy), it is critical to mark all empty
1949         // cells as closed to prevent in-progress `send(e)`-s, which have not
1950         // put their elements yet, completions after this channel is closed.
1951         // Otherwise, it is possible for a `send(e)` to put an element when
1952         // the buffer is already full, while a concurrent receiver may extract
1953         // the oldest element. When the channel is not closed, we can linearize
1954         // this `receive()` before the `send(e)`, but after the channel is closed,
1955         // `send(e)` must fails. Marking all unprocessed cells as `CLOSED` solves the issue.
1956         if (isConflatedDropOldest) {
1957             val lastBufferedCellGlobalIndex = markAllEmptyCellsAsClosed(lastSegment)
1958             if (lastBufferedCellGlobalIndex != -1L)
1959                 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBufferedCellGlobalIndex)
1960         }
1961         // Resume waiting `receive()` requests,
1962         // informing them that the channel is closed.
1963         cancelSuspendedReceiveRequests(lastSegment, sendersCur)
1964         // Return the last segment in the linked list as a result
1965         // of this function; we need it in `completeCancel(..)`.
1966         return lastSegment
1967     }
1968 
1969     /**
1970      * Completes the channel cancellation procedure.
1971      */
1972     private fun completeCancel(sendersCur: Long) {
1973         // First, ensure that this channel is closed,
1974         // obtaining the last segment in the linked list.
1975         val lastSegment = completeClose(sendersCur)
1976         // Cancel suspended `send(e)` requests and
1977         // remove buffered elements in the reverse order.
1978         removeUnprocessedElements(lastSegment)
1979     }
1980 
1981     /**
1982      * Closes the underlying linked list of segments for further segment addition.
1983      */
1984     private fun closeLinkedList(): ChannelSegment<E> {
1985         // Choose the last segment.
1986         var lastSegment = bufferEndSegment.value
1987         sendSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1988         receiveSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1989         // Close the linked list of segment for new segment addition
1990         // and return the last segment in the linked list.
1991         return lastSegment.close()
1992     }
1993 
1994     /**
1995      * This function marks all empty cells, in the `null` and [IN_BUFFER] state,
1996      * as closed. Notably, it processes the cells from right to left, and finishes
1997      * immediately when the processing cell is already covered by `receive()` or
1998      * contains a buffered elements ([BUFFERED] state).
1999      *
2000      * This function returns the global index of the last buffered element,
2001      * or `-1` if this channel does not contain buffered elements.
2002      */
2003     private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
2004         // Process the cells in reverse order, from right to left.
2005         var segment = lastSegment
2006         while (true) {
2007             for (index in SEGMENT_SIZE - 1 downTo 0) {
2008                 // Is this cell already covered by `receive()`?
2009                 val globalIndex = segment.id * SEGMENT_SIZE + index
2010                 if (globalIndex < receiversCounter) return -1
2011                 // Process the cell `segment[index]`.
2012                 cell_update@ while (true) {
2013                     val state = segment.getState(index)
2014                     when {
2015                         // The cell is empty.
2016                         state === null || state === IN_BUFFER -> {
2017                             // Inform a possibly upcoming sender that this channel is already closed.
2018                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2019                                 segment.onSlotCleaned()
2020                                 break@cell_update
2021                             }
2022                         }
2023                         // The cell stores a buffered element.
2024                         state === BUFFERED -> return globalIndex
2025                         // Skip this cell if it is not empty and does not store a buffered element.
2026                         else -> break@cell_update
2027                     }
2028                 }
2029             }
2030             // Process the next segment, finishing if the linked list ends.
2031             segment = segment.prev ?: return -1
2032         }
2033     }
2034 
2035     /**
2036      * Cancels suspended `send(e)` requests and removes buffered elements
2037      * starting from the last cell in the specified [lastSegment] (it must
2038      * be the physical tail of the underlying linked list) and updating
2039      * the cells in reverse order.
2040      */
2041     private fun removeUnprocessedElements(lastSegment: ChannelSegment<E>) {
2042         // Read the `onUndeliveredElement` lambda at once. In case it
2043         // throws an exception, this exception is handled and stored in
2044         // the variable below. If multiple exceptions are thrown, the first
2045         // one is stored in the variable, while the others are suppressed.
2046         val onUndeliveredElement = onUndeliveredElement
2047         var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
2048         // To perform synchronization correctly, it is critical to
2049         // process the cells in reverse order, from right to left.
2050         // However, according to the API, suspended senders should
2051         // be cancelled in the order of their suspension. Therefore,
2052         // we need to collect all of them and cancel in the reverse
2053         // order after that.
2054         var suspendedSenders = InlineList<Waiter>()
2055         var segment = lastSegment
2056         process_segments@ while (true) {
2057             for (index in SEGMENT_SIZE - 1 downTo 0) {
2058                 // Process the cell `segment[index]`.
2059                 val globalIndex = segment.id * SEGMENT_SIZE + index
2060                 // Update the cell state.
2061                 update_cell@ while (true) {
2062                     // Read the current state of the cell.
2063                     val state = segment.getState(index)
2064                     when {
2065                         // The cell is already processed by a receiver.
2066                         state === DONE_RCV -> break@process_segments
2067                         // The cell stores a buffered element.
2068                         state === BUFFERED -> {
2069                             // Is the cell already covered by a receiver?
2070                             if (globalIndex < receiversCounter) break@process_segments
2071                             // Update the cell state to `CHANNEL_CLOSED`.
2072                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2073                                 // If `onUndeliveredElement` lambda is non-null, call it.
2074                                 if (onUndeliveredElement != null) {
2075                                     val element = segment.getElement(index)
2076                                     undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2077                                 }
2078                                 // Clean the element field and inform the segment
2079                                 // that the slot is cleaned to avoid memory leaks.
2080                                 segment.cleanElement(index)
2081                                 segment.onSlotCleaned()
2082                                 break@update_cell
2083                             }
2084                         }
2085                         // The cell is empty.
2086                         state === IN_BUFFER || state === null -> {
2087                             // Update the cell state to `CHANNEL_CLOSED`.
2088                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2089                                 // Inform the segment that the slot is cleaned to avoid memory leaks.
2090                                 segment.onSlotCleaned()
2091                                 break@update_cell
2092                             }
2093                         }
2094                         // The cell stores a suspended waiter.
2095                         state is Waiter || state is WaiterEB -> {
2096                             // Is the cell already covered by a receiver?
2097                             if (globalIndex < receiversCounter) break@process_segments
2098                             // Obtain the sender.
2099                             val sender: Waiter = if (state is WaiterEB) state.waiter
2100                                                  else state as Waiter
2101                             // Update the cell state to `CHANNEL_CLOSED`.
2102                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2103                                 // If `onUndeliveredElement` lambda is non-null, call it.
2104                                 if (onUndeliveredElement != null) {
2105                                     val element = segment.getElement(index)
2106                                     undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2107                                 }
2108                                 // Save the sender for further cancellation.
2109                                 suspendedSenders += sender
2110                                 // Clean the element field and inform the segment
2111                                 // that the slot is cleaned to avoid memory leaks.
2112                                 segment.cleanElement(index)
2113                                 segment.onSlotCleaned()
2114                                 break@update_cell
2115                             }
2116                         }
2117                         // A concurrent receiver is resuming a suspended sender.
2118                         // As the cell is covered by a receiver, finish immediately.
2119                         state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
2120                         // A concurrent `expandBuffer()` is resuming a suspended sender.
2121                         // Wait in a spin-loop until the cell state changes.
2122                         state === RESUMING_BY_EB -> continue@update_cell
2123                         else -> break@update_cell
2124                     }
2125                 }
2126             }
2127             // Process the previous segment.
2128             segment = segment.prev ?: break
2129         }
2130         // Cancel suspended senders in their order of addition to this channel.
2131         suspendedSenders.forEachReversed { it.resumeSenderOnCancelledChannel() }
2132         // Throw `UndeliveredElementException` at the end if there was one.
2133         undeliveredElementException?.let { throw it }
2134     }
2135 
2136     /**
2137      * Cancels suspended `receive` requests from the end to the beginning,
2138      * also moving empty cells to the `CHANNEL_CLOSED` state.
2139      */
2140     private fun cancelSuspendedReceiveRequests(lastSegment: ChannelSegment<E>, sendersCounter: Long) {
2141         // To perform synchronization correctly, it is critical to
2142         // extract suspended requests in the reverse order,
2143         // from the end to the beginning.
2144         // However, according to the API, they should be cancelled
2145         // in the order of their suspension. Therefore, we need to
2146         // collect the suspended requests first, cancelling them
2147         // in the reverse order after that.
2148         var suspendedReceivers = InlineList<Waiter>()
2149         var segment: ChannelSegment<E>? = lastSegment
2150         process_segments@ while (segment != null) {
2151             for (index in SEGMENT_SIZE - 1 downTo 0) {
2152                 // Is the cell already covered by a sender? Finish immediately in this case.
2153                 if (segment.id * SEGMENT_SIZE + index < sendersCounter) break@process_segments
2154                 // Try to move the cell state to `CHANNEL_CLOSED`.
2155                 cell_update@ while (true) {
2156                     val state = segment.getState(index)
2157                     when {
2158                         state === null || state === IN_BUFFER -> {
2159                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2160                                 segment.onSlotCleaned()
2161                                 break@cell_update
2162                             }
2163                         }
2164                         state is WaiterEB -> {
2165                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2166                                 suspendedReceivers += state.waiter // save for cancellation.
2167                                 segment.onCancelledRequest(index = index, receiver = true)
2168                                 break@cell_update
2169                             }
2170                         }
2171                         state is Waiter -> {
2172                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2173                                 suspendedReceivers += state // save for cancellation.
2174                                 segment.onCancelledRequest(index = index, receiver = true)
2175                                 break@cell_update
2176                             }
2177                         }
2178                         else -> break@cell_update // nothing to cancel.
2179                     }
2180                 }
2181             }
2182             // Process the previous segment.
2183             segment = segment.prev
2184         }
2185         // Cancel the suspended requests in their order of addition to this channel.
2186         suspendedReceivers.forEachReversed { it.resumeReceiverOnClosedChannel() }
2187     }
2188 
2189     /**
2190      * Resumes this receiver because this channel is closed.
2191      * This function does not take any effect if the operation has already been resumed or cancelled.
2192      */
2193     private fun Waiter.resumeReceiverOnClosedChannel() = resumeWaiterOnClosedChannel(receiver = true)
2194 
2195     /**
2196      * Resumes this sender because this channel is cancelled.
2197      * This function does not take any effect if the operation has already been resumed or cancelled.
2198      */
2199     private fun Waiter.resumeSenderOnCancelledChannel() = resumeWaiterOnClosedChannel(receiver = false)
2200 
2201     private fun Waiter.resumeWaiterOnClosedChannel(receiver: Boolean) {
2202         when (this) {
2203             is SendBroadcast -> cont.resume(false)
2204             is CancellableContinuation<*> -> resumeWithException(if (receiver) receiveException else sendException)
2205             is ReceiveCatching<*> -> cont.resume(closed(closeCause))
2206             is BufferedChannel<*>.BufferedChannelIterator -> tryResumeHasNextOnClosedChannel()
2207             is SelectInstance<*> -> trySelect(this@BufferedChannel, CHANNEL_CLOSED)
2208             else -> error("Unexpected waiter: $this")
2209         }
2210     }
2211 
2212     @ExperimentalCoroutinesApi
2213     override val isClosedForSend: Boolean
2214         get() = sendersAndCloseStatus.value.isClosedForSend0
2215 
2216     private val Long.isClosedForSend0 get() =
2217         isClosed(this, isClosedForReceive = false)
2218 
2219     @ExperimentalCoroutinesApi
2220     override val isClosedForReceive: Boolean
2221         get() = sendersAndCloseStatus.value.isClosedForReceive0
2222 
2223     private val Long.isClosedForReceive0 get() =
2224         isClosed(this, isClosedForReceive = true)
2225 
2226     private fun isClosed(
2227         sendersAndCloseStatusCur: Long,
2228         isClosedForReceive: Boolean
2229     ) = when (sendersAndCloseStatusCur.sendersCloseStatus) {
2230         // This channel is active and has not been closed.
2231         CLOSE_STATUS_ACTIVE -> false
2232         // The cancellation procedure has been started but
2233         // not linearized yet, so this channel should be
2234         // considered as active.
2235         CLOSE_STATUS_CANCELLATION_STARTED -> false
2236         // This channel has been successfully closed.
2237         // Help to complete the closing procedure to
2238         // guarantee linearizability, and return `true`
2239         // for senders or the flag whether there still
2240         // exist elements to retrieve for receivers.
2241         CLOSE_STATUS_CLOSED -> {
2242             completeClose(sendersAndCloseStatusCur.sendersCounter)
2243             // When `isClosedForReceive` is `false`, always return `true`.
2244             // Otherwise, it is possible that the channel is closed but
2245             // still has elements to retrieve.
2246             if (isClosedForReceive) !hasElements() else true
2247         }
2248         // This channel has been successfully cancelled.
2249         // Help to complete the cancellation procedure to
2250         // guarantee linearizability and return `true`.
2251         CLOSE_STATUS_CANCELLED -> {
2252             completeCancel(sendersAndCloseStatusCur.sendersCounter)
2253             true
2254         }
2255         else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
2256     }
2257 
2258     @ExperimentalCoroutinesApi
2259     override val isEmpty: Boolean get() {
2260         // This function should return `false` if
2261         // this channel is closed for `receive`.
2262         if (isClosedForReceive) return false
2263         // Does this channel has elements to retrieve?
2264         if (hasElements()) return false
2265         // This channel does not have elements to retrieve;
2266         // Check that it is still not closed for `receive`.
2267         return !isClosedForReceive
2268     }
2269 
2270     /**
2271      * Checks whether this channel contains elements to retrieve.
2272      * Unfortunately, simply comparing the counters is insufficient,
2273      * as some cells can be in the `INTERRUPTED` state due to cancellation.
2274      * This function tries to find the first "alive" element,
2275      * updating the `receivers` counter to skip empty cells.
2276      *
2277      * The implementation is similar to `receive()`.
2278      */
2279     internal fun hasElements(): Boolean {
2280         while (true) {
2281             // Read the segment before obtaining the `receivers` counter value.
2282             var segment = receiveSegment.value
2283             // Obtains the `receivers` and `senders` counter values.
2284             val r = receiversCounter
2285             val s = sendersCounter
2286             // Is there a chance that this channel has elements?
2287             if (s <= r) return false // no elements
2288             // The `r`-th cell is covered by a sender; check whether it contains an element.
2289             // First, try to find the required segment if the initially
2290             // obtained segment (in the beginning of this function) has lower id.
2291             val id = r / SEGMENT_SIZE
2292             if (segment.id != id) {
2293                 // Try to find the required segment.
2294                 segment = findSegmentReceive(id, segment) ?:
2295                     // The required segment has not been found. Either it has already
2296                     // been removed, or the underlying linked list is already closed
2297                     // for segment additions. In the latter case, the channel is closed
2298                     // and does not contain elements, so this operation returns `false`.
2299                     // Otherwise, if the required segment is removed, the operation restarts.
2300                     if (receiveSegment.value.id < id) return false else continue
2301             }
2302             segment.cleanPrev() // all the previous segments are no longer needed.
2303             // Does the `r`-th cell contain waiting sender or buffered element?
2304             val i = (r % SEGMENT_SIZE).toInt()
2305             if (isCellNonEmpty(segment, i, r)) return true
2306             // The cell is empty. Update `receivers` counter and try again.
2307             receivers.compareAndSet(r, r + 1) // if this CAS fails, the counter has already been updated.
2308         }
2309     }
2310 
2311     /**
2312      * Checks whether this cell contains a buffered element or a waiting sender,
2313      * returning `true` in this case. Otherwise, if this cell is empty
2314      * (due to waiter cancellation, cell poisoning, or channel closing),
2315      * this function returns `false`.
2316      *
2317      * Notably, this function must be called only if the cell is covered by a sender.
2318      */
2319     private fun isCellNonEmpty(
2320         segment: ChannelSegment<E>,
2321         index: Int,
2322         globalIndex: Long
2323     ): Boolean {
2324         // The logic is similar to `updateCellReceive` with the only difference
2325         // that this function neither changes the cell state nor retrieves the element.
2326         while (true) {
2327             // Read the current cell state.
2328             val state = segment.getState(index)
2329             when {
2330                 // The cell is empty but a sender is coming.
2331                 state === null || state === IN_BUFFER -> {
2332                     // Poison the cell to ensure correctness.
2333                     if (segment.casState(index, state, POISONED)) {
2334                         // When the cell becomes poisoned, it is essentially
2335                         // the same as storing an already cancelled receiver.
2336                         // Thus, the `expandBuffer()` procedure should be invoked.
2337                         expandBuffer()
2338                         return false
2339                     }
2340                 }
2341                 // The cell stores a buffered element.
2342                 state === BUFFERED -> return true
2343                 // The cell stores an interrupted sender.
2344                 state === INTERRUPTED_SEND -> return false
2345                 // This channel is already closed.
2346                 state === CHANNEL_CLOSED -> return false
2347                 // The cell is already processed
2348                 // by a concurrent receiver.
2349                 state === DONE_RCV -> return false
2350                 // The cell is already poisoned
2351                 // by a concurrent receiver.
2352                 state === POISONED -> return false
2353                 // A concurrent `expandBuffer()` is resuming
2354                 // a suspended sender. This function is eligible
2355                 // to linearize before the buffer expansion procedure.
2356                 state === RESUMING_BY_EB -> return true
2357                 // A concurrent receiver is resuming
2358                 // a suspended sender. The element
2359                 // is no longer available for retrieval.
2360                 state === RESUMING_BY_RCV -> return false
2361                 // The cell stores a suspended request.
2362                 // However, it is possible that this request
2363                 // is receiver if the cell is covered by both
2364                 // send and receive operations.
2365                 // In case the cell is already covered by
2366                 // a receiver, the element is no longer
2367                 // available for retrieval, and this function
2368                 // return `false`. Otherwise, it is guaranteed
2369                 // that the suspended request is sender, so
2370                 // this function returns `true`.
2371                 else -> return globalIndex == receiversCounter
2372             }
2373         }
2374     }
2375 
2376     // #######################
2377     // # Segments Management #
2378     // #######################
2379 
2380     /**
2381      * Finds the segment with the specified [id] starting by the [startFrom]
2382      * segment and following the [ChannelSegment.next] references. In case
2383      * the required segment has not been created yet, this function attempts
2384      * to add it to the underlying linked list. Finally, it updates [sendSegment]
2385      * to the found segment if its [ChannelSegment.id] is greater than the one
2386      * of the already stored segment.
2387      *
2388      * In case the requested segment is already removed, or if it should be allocated
2389      * but the linked list structure is closed for new segments addition, this function
2390      * returns `null`. The implementation also efficiently skips a sequence of removed
2391      * segments, updating the counter value in [sendersAndCloseStatus] correspondingly.
2392      */
2393     private fun findSegmentSend(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
2394         return sendSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2395             if (it.isClosed) {
2396                 // The required segment has not been found and new segments
2397                 // cannot be added, as the linked listed in already added.
2398                 // This channel is already closed or cancelled; help to complete
2399                 // the closing or cancellation procedure.
2400                 completeCloseOrCancel()
2401                 // Clean the `prev` reference of the provided segment
2402                 // if all the previous cells are already covered by senders.
2403                 // It is important to clean the `prev` reference only in
2404                 // this case, as the closing/cancellation procedure may
2405                 // need correct value to traverse the linked list from right to left.
2406                 if (startFrom.id * SEGMENT_SIZE <  receiversCounter) startFrom.cleanPrev()
2407                 // As the required segment is not found and cannot be allocated, return `null`.
2408                 null
2409             } else {
2410                 // Get the found segment.
2411                 val segment = it.segment
2412                 // Is the required segment removed?
2413                 if (segment.id > id) {
2414                     // The required segment has been removed; `segment` is the first
2415                     // segment with `id` not lower than the required one.
2416                     // Skip the sequence of removed cells in O(1).
2417                     updateSendersCounterIfLower(segment.id * SEGMENT_SIZE)
2418                     // Clean the `prev` reference of the provided segment
2419                     // if all the previous cells are already covered by senders.
2420                     // It is important to clean the `prev` reference only in
2421                     // this case, as the closing/cancellation procedure may
2422                     // need correct value to traverse the linked list from right to left.
2423                     if (segment.id * SEGMENT_SIZE <  receiversCounter) segment.cleanPrev()
2424                     // As the required segment is not found and cannot be allocated, return `null`.
2425                     null
2426                 } else {
2427                     assert { segment.id == id }
2428                     // The required segment has been found; return it!
2429                     segment
2430                 }
2431             }
2432         }
2433     }
2434 
2435     /**
2436      * Finds the segment with the specified [id] starting by the [startFrom]
2437      * segment and following the [ChannelSegment.next] references. In case
2438      * the required segment has not been created yet, this function attempts
2439      * to add it to the underlying linked list. Finally, it updates [receiveSegment]
2440      * to the found segment if its [ChannelSegment.id] is greater than the one
2441      * of the already stored segment.
2442      *
2443      * In case the requested segment is already removed, or if it should be allocated
2444      * but the linked list structure is closed for new segments addition, this function
2445      * returns `null`. The implementation also efficiently skips a sequence of removed
2446      * segments, updating the [receivers] counter correspondingly.
2447      */
2448     private fun findSegmentReceive(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? =
2449         receiveSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2450             if (it.isClosed) {
2451                 // The required segment has not been found and new segments
2452                 // cannot be added, as the linked listed in already added.
2453                 // This channel is already closed or cancelled; help to complete
2454                 // the closing or cancellation procedure.
2455                 completeCloseOrCancel()
2456                 // Clean the `prev` reference of the provided segment
2457                 // if all the previous cells are already covered by senders.
2458                 // It is important to clean the `prev` reference only in
2459                 // this case, as the closing/cancellation procedure may
2460                 // need correct value to traverse the linked list from right to left.
2461                 if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
2462                 // As the required segment is not found and cannot be allocated, return `null`.
2463                 null
2464             } else {
2465                 // Get the found segment.
2466                 val segment = it.segment
2467                 // Advance the `bufferEnd` segment if required.
2468                 if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) {
2469                     bufferEndSegment.moveForward(segment)
2470                 }
2471                 // Is the required segment removed?
2472                 if (segment.id > id) {
2473                     // The required segment has been removed; `segment` is the first
2474                     // segment with `id` not lower than the required one.
2475                     // Skip the sequence of removed cells in O(1).
2476                     updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE)
2477                     // Clean the `prev` reference of the provided segment
2478                     // if all the previous cells are already covered by senders.
2479                     // It is important to clean the `prev` reference only in
2480                     // this case, as the closing/cancellation procedure may
2481                     // need correct value to traverse the linked list from right to left.
2482                     if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev()
2483                     // As the required segment is already removed, return `null`.
2484                     null
2485                 } else {
2486                     assert { segment.id == id }
2487                     // The required segment has been found; return it!
2488                     segment
2489                 }
2490             }
2491         }
2492 
2493     /**
2494      * Importantly, when this function does not find the requested segment,
2495      * it always updates the number of completed `expandBuffer()` attempts.
2496      */
2497     private fun findSegmentBufferEnd(id: Long, startFrom: ChannelSegment<E>, currentBufferEndCounter: Long): ChannelSegment<E>? =
2498         bufferEndSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2499             if (it.isClosed) {
2500                 // The required segment has not been found and new segments
2501                 // cannot be added, as the linked listed in already added.
2502                 // This channel is already closed or cancelled; help to complete
2503                 // the closing or cancellation procedure.
2504                 completeCloseOrCancel()
2505                 // Update `bufferEndSegment` to the last segment
2506                 // in the linked list to avoid memory leaks.
2507                 moveSegmentBufferEndToSpecifiedOrLast(id, startFrom)
2508                 // When this function does not find the requested segment,
2509                 // it should update the number of completed `expandBuffer()` attempts.
2510                 incCompletedExpandBufferAttempts()
2511                 null
2512             } else {
2513                 // Get the found segment.
2514                 val segment = it.segment
2515                 // Is the required segment removed?
2516                 if (segment.id > id) {
2517                     // The required segment has been removed; `segment` is the first segment
2518                     // with `id` not lower than the required one.
2519                     // Try to skip the sequence of removed cells in O(1) by increasing the `bufferEnd` counter.
2520                     // Importantly, when this function does not find the requested segment,
2521                     // it should update the number of completed `expandBuffer()` attempts.
2522                     if (bufferEnd.compareAndSet(currentBufferEndCounter + 1, segment.id * SEGMENT_SIZE)) {
2523                         incCompletedExpandBufferAttempts(segment.id * SEGMENT_SIZE - currentBufferEndCounter)
2524                     } else {
2525                         incCompletedExpandBufferAttempts()
2526                     }
2527                     // As the required segment is already removed, return `null`.
2528                     null
2529                 } else {
2530                     assert { segment.id == id }
2531                     // The required segment has been found; return it!
2532                     segment
2533                 }
2534             }
2535         }
2536 
2537     /**
2538      * Updates [bufferEndSegment] to the one with the specified [id] or
2539      * to the last existing segment, if the required segment is not yet created.
2540      *
2541      * Unlike [findSegmentBufferEnd], this function does not allocate new segments.
2542      */
2543     private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment<E>) {
2544         // Start searching the required segment from the specified one.
2545         var segment: ChannelSegment<E> = startFrom
2546         while (segment.id < id) {
2547             segment = segment.next ?: break
2548         }
2549         // Skip all removed segments and try to update `bufferEndSegment`
2550         // to the first non-removed one. This part should succeed eventually,
2551         // as the tail segment is never removed.
2552         while (true) {
2553             while (segment.isRemoved) {
2554                 segment = segment.next ?: break
2555             }
2556             // Try to update `bufferEndSegment`. On failure,
2557             // the found segment is already removed, so it
2558             // should be skipped.
2559             if (bufferEndSegment.moveForward(segment)) return
2560         }
2561     }
2562 
2563     /**
2564      * Updates the `senders` counter if its value
2565      * is lower that the specified one.
2566      *
2567      * Senders use this function to efficiently skip
2568      * a sequence of cancelled receivers.
2569      */
2570     private fun updateSendersCounterIfLower(value: Long): Unit =
2571         sendersAndCloseStatus.loop { cur ->
2572             val curCounter = cur.sendersCounter
2573             if (curCounter >= value) return
2574             val update = constructSendersAndCloseStatus(curCounter, cur.sendersCloseStatus)
2575             if (sendersAndCloseStatus.compareAndSet(cur, update)) return
2576         }
2577 
2578     /**
2579      * Updates the `receivers` counter if its value
2580      * is lower that the specified one.
2581      *
2582      * Receivers use this function to efficiently skip
2583      * a sequence of cancelled senders.
2584      */
2585     private fun updateReceiversCounterIfLower(value: Long): Unit =
2586         receivers.loop { cur ->
2587             if (cur >= value) return
2588             if (receivers.compareAndSet(cur, value)) return
2589         }
2590 
2591     // ###################
2592     // # Debug Functions #
2593     // ###################
2594 
2595     @Suppress("ConvertTwoComparisonsToRangeCheck")
2596     override fun toString(): String {
2597         val sb = StringBuilder()
2598         // Append the close status
2599         when (sendersAndCloseStatus.value.sendersCloseStatus) {
2600             CLOSE_STATUS_CLOSED -> sb.append("closed,")
2601             CLOSE_STATUS_CANCELLED -> sb.append("cancelled,")
2602         }
2603         // Append the buffer capacity
2604         sb.append("capacity=$capacity,")
2605         // Append the data
2606         sb.append("data=[")
2607         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2608             .filter { it !== NULL_SEGMENT }
2609             .minBy { it.id }
2610         val r = receiversCounter
2611         val s = sendersCounter
2612         var segment = firstSegment
2613         append_elements@ while (true) {
2614             process_cell@ for (i in 0 until SEGMENT_SIZE) {
2615                 val globalCellIndex = segment.id * SEGMENT_SIZE + i
2616                 if (globalCellIndex >= s && globalCellIndex >= r) break@append_elements
2617                 val cellState = segment.getState(i)
2618                 val element = segment.getElement(i)
2619                 val cellStateString = when (cellState) {
2620                     is CancellableContinuation<*> -> {
2621                         when {
2622                             globalCellIndex < r && globalCellIndex >= s -> "receive"
2623                             globalCellIndex < s && globalCellIndex >= r -> "send"
2624                             else -> "cont"
2625                         }
2626                     }
2627                     is SelectInstance<*> -> {
2628                         when {
2629                             globalCellIndex < r && globalCellIndex >= s -> "onReceive"
2630                             globalCellIndex < s && globalCellIndex >= r -> "onSend"
2631                             else -> "select"
2632                         }
2633                     }
2634                     is ReceiveCatching<*> -> "receiveCatching"
2635                     is SendBroadcast -> "sendBroadcast"
2636                     is WaiterEB -> "EB($cellState)"
2637                     RESUMING_BY_RCV, RESUMING_BY_EB -> "resuming_sender"
2638                     null, IN_BUFFER, DONE_RCV, POISONED, INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> continue@process_cell
2639                     else -> cellState.toString() // leave it just in case something is missed.
2640                 }
2641                 if (element != null) {
2642                     sb.append("($cellStateString,$element),")
2643                 } else {
2644                     sb.append("$cellStateString,")
2645                 }
2646             }
2647             // Process the next segment if exists.
2648             segment = segment.next ?: break
2649         }
2650         if (sb.last() == ',') sb.deleteAt(sb.length - 1)
2651         sb.append("]")
2652         // The string representation is constructed.
2653         return sb.toString()
2654     }
2655 
2656     // Returns a debug representation of this channel,
2657     // which is actively used in Lincheck tests.
2658     internal fun toStringDebug(): String {
2659         val sb = StringBuilder()
2660         // Append the counter values and the close status
2661         sb.append("S=${sendersCounter},R=${receiversCounter},B=${bufferEndCounter},B'=${completedExpandBuffersAndPauseFlag.value},C=${sendersAndCloseStatus.value.sendersCloseStatus},")
2662         when (sendersAndCloseStatus.value.sendersCloseStatus) {
2663             CLOSE_STATUS_CANCELLATION_STARTED -> sb.append("CANCELLATION_STARTED,")
2664             CLOSE_STATUS_CLOSED -> sb.append("CLOSED,")
2665             CLOSE_STATUS_CANCELLED -> sb.append("CANCELLED,")
2666         }
2667         // Append the segment references
2668         sb.append("SEND_SEGM=${sendSegment.value.hexAddress},RCV_SEGM=${receiveSegment.value.hexAddress}")
2669         if (!isRendezvousOrUnlimited) sb.append(",EB_SEGM=${bufferEndSegment.value.hexAddress}")
2670         sb.append("  ") // add some space
2671         // Append the linked list of segments.
2672         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2673             .filter { it !== NULL_SEGMENT }
2674             .minBy { it.id }
2675         var segment = firstSegment
2676         while (true) {
2677             sb.append("${segment.hexAddress}=[${if (segment.isRemoved) "*" else ""}${segment.id},prev=${segment.prev?.hexAddress},")
2678             repeat(SEGMENT_SIZE) { i ->
2679                 val cellState = segment.getState(i)
2680                 val element = segment.getElement(i)
2681                 val cellStateString = when (cellState) {
2682                     is CancellableContinuation<*> -> "cont"
2683                     is SelectInstance<*> -> "select"
2684                     is ReceiveCatching<*> -> "receiveCatching"
2685                     is SendBroadcast -> "send(broadcast)"
2686                     is WaiterEB -> "EB($cellState)"
2687                     else -> cellState.toString()
2688                 }
2689                 sb.append("[$i]=($cellStateString,$element),")
2690             }
2691             sb.append("next=${segment.next?.hexAddress}]  ")
2692             // Process the next segment if exists.
2693             segment = segment.next ?: break
2694         }
2695         // The string representation of this channel is now constructed!
2696         return sb.toString()
2697     }
2698 
2699 
2700     // This is an internal methods for tests.
2701     fun checkSegmentStructureInvariants() {
2702         if (isRendezvousOrUnlimited) {
2703             check(bufferEndSegment.value === NULL_SEGMENT) {
2704                 "bufferEndSegment must be NULL_SEGMENT for rendezvous and unlimited channels; they do not manipulate it.\n" +
2705                     "Channel state: $this"
2706             }
2707         } else {
2708             check(receiveSegment.value.id <= bufferEndSegment.value.id) {
2709                 "bufferEndSegment should not have lower id than receiveSegment.\n" +
2710                     "Channel state: $this"
2711             }
2712         }
2713         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2714             .filter { it !== NULL_SEGMENT }
2715             .minBy { it.id }
2716         check(firstSegment.prev == null) {
2717             "All processed segments should be unreachable from the data structure, but the `prev` link of the leftmost segment is non-null.\n" +
2718                 "Channel state: $this"
2719         }
2720         // Check that the doubly-linked list of segments does not
2721         // contain full-of-cancelled-cells segments.
2722         var segment = firstSegment
2723         while (segment.next != null) {
2724             // Note that the `prev` reference can be `null` if this channel is closed.
2725             check(segment.next!!.prev == null || segment.next!!.prev === segment) {
2726                 "The `segment.next.prev === segment` invariant is violated.\n" +
2727                     "Channel state: $this"
2728             }
2729             // Count the number of closed/interrupted cells
2730             // and check that all cells are in expected states.
2731             var interruptedOrClosedCells = 0
2732             for (i in 0 until SEGMENT_SIZE) {
2733                 when (val state = segment.getState(i)) {
2734                     BUFFERED -> {} // The cell stores a buffered element.
2735                     is Waiter -> {} // The cell stores a suspended request.
2736                     INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> {
2737                         // The cell stored an interrupted request or indicates
2738                         // that this channel is already closed.
2739                         // Check that the element slot is cleaned and increment
2740                         // the number of cells in closed/interrupted state.
2741                         check(segment.getElement(i) == null)
2742                         interruptedOrClosedCells++
2743                     }
2744                     POISONED, DONE_RCV -> {
2745                         // The cell is successfully processed or poisoned.
2746                         // Check that the element slot is cleaned.
2747                         check(segment.getElement(i) == null)
2748                     }
2749                     // Other states are illegal after all running operations finish.
2750                     else -> error("Unexpected segment cell state: $state.\nChannel state: $this")
2751                 }
2752             }
2753             // Is this segment full of cancelled/closed cells?
2754             // If so, this segment should be removed from the
2755             // linked list if nether `receiveSegment`, nor
2756             // `sendSegment`, nor `bufferEndSegment` reference it.
2757             if (interruptedOrClosedCells == SEGMENT_SIZE) {
2758                 check(segment === receiveSegment.value || segment === sendSegment.value || segment === bufferEndSegment.value) {
2759                     "Logically removed segment is reachable.\nChannel state: $this"
2760                 }
2761             }
2762             // Process the next segment.
2763             segment = segment.next!!
2764         }
2765     }
2766 
2767     private fun OnUndeliveredElement<E>.bindCancellationFunResult() = ::onCancellationChannelResultImplDoNotCall
2768 
2769     /**
2770      * Do not call directly. Go through [bindCancellationFunResult] to ensure the callback isn't null.
2771      * [bindCancellationFunResult] could have just returned a lambda as well, but there would be a risk of that
2772      * lambda capturing the environment.
2773      */
2774     private fun onCancellationChannelResultImplDoNotCall(
2775         cause: Throwable, element: ChannelResult<E>, context: CoroutineContext
2776     ) {
2777         onUndeliveredElement!!.callUndeliveredElement(element.getOrNull()!!, context)
2778     }
2779 
2780     private fun OnUndeliveredElement<E>.bindCancellationFun(element: E):
2781             (Throwable, Any?, CoroutineContext) -> Unit =
2782         { _: Throwable, _, context: CoroutineContext -> callUndeliveredElement(element, context) }
2783 
2784     private fun OnUndeliveredElement<E>.bindCancellationFun() = ::onCancellationImplDoNotCall
2785 
2786     /**
2787      * Do not call directly. Go through [bindCancellationFun] to ensure the callback isn't null.
2788      * [bindCancellationFun] could have just returned a lambda as well, but there would be a risk of that
2789      * lambda capturing the environment.
2790      */
2791     private fun onCancellationImplDoNotCall(cause: Throwable, element: E, context: CoroutineContext) {
2792         onUndeliveredElement!!.callUndeliveredElement(element, context)
2793     }
2794 }
2795 
2796 /**
2797  * The channel is represented as a list of segments, which simulates an infinite array.
2798  * Each segment has its own [id], which increase from the beginning. These [id]s help
2799  * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment],
2800  * and [BufferedChannel.bufferEndSegment] correctly.
2801  */
2802 internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: BufferedChannel<E>?, pointers: Int) : Segment<ChannelSegment<E>>(id, prev, pointers) {
2803     private val _channel: BufferedChannel<E>? = channel
2804     val channel get() = _channel!! // always non-null except for `NULL_SEGMENT`
2805 
2806     private val data = atomicArrayOfNulls<Any?>(SEGMENT_SIZE * 2) // 2 registers per slot: state + element
2807     override val numberOfSlots: Int get() = SEGMENT_SIZE
2808 
2809     // ########################################
2810     // # Manipulation with the Element Fields #
2811     // ########################################
2812 
storeElementnull2813     internal fun storeElement(index: Int, element: E) {
2814         setElementLazy(index, element)
2815     }
2816 
2817     @Suppress("UNCHECKED_CAST")
getElementnull2818     internal fun getElement(index: Int) = data[index * 2].value as E
2819 
2820     internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }
2821 
cleanElementnull2822     internal fun cleanElement(index: Int) {
2823         setElementLazy(index, null)
2824     }
2825 
setElementLazynull2826     private fun setElementLazy(index: Int, value: Any?) {
2827         data[index * 2].lazySet(value)
2828     }
2829 
2830     // ######################################
2831     // # Manipulation with the State Fields #
2832     // ######################################
2833 
getStatenull2834     internal fun getState(index: Int): Any? = data[index * 2 + 1].value
2835 
2836     internal fun setState(index: Int, value: Any?) {
2837         data[index * 2 + 1].value = value
2838     }
2839 
casStatenull2840     internal fun casState(index: Int, from: Any?, to: Any?) = data[index * 2 + 1].compareAndSet(from, to)
2841 
2842     internal fun getAndSetState(index: Int, update: Any?) = data[index * 2 + 1].getAndSet(update)
2843 
2844 
2845     // ########################
2846     // # Cancellation Support #
2847     // ########################
2848 
2849     override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
2850         // To distinguish cancelled senders and receivers, senders equip the index value with
2851         // an additional marker, adding `SEGMENT_SIZE` to the value.
2852         val isSender = index >= SEGMENT_SIZE
2853         // Unwrap the index.
2854         @Suppress("NAME_SHADOWING") val index = if (isSender) index - SEGMENT_SIZE else index
2855         // Read the element, which may be needed further to call `onUndeliveredElement`.
2856         val element = getElement(index)
2857         // Update the cell state.
2858         while (true) {
2859             // CAS-loop
2860             // Read the current state of the cell.
2861             val cur = getState(index)
2862             when {
2863                 // The cell stores a waiter.
2864                 cur is Waiter || cur is WaiterEB -> {
2865                     // The cancelled request is either send or receive.
2866                     // Update the cell state correspondingly.
2867                     val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
2868                     if (casState(index, cur, update)) {
2869                         // The waiter has been successfully cancelled.
2870                         // Clean the element slot and invoke `onSlotCleaned()`,
2871                         // which may cause deleting the whole segment from the linked list.
2872                         // In case the cancelled request is receiver, it is critical to ensure
2873                         // that the `expandBuffer()` attempt that processes this cell is completed,
2874                         // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
2875                         cleanElement(index)
2876                         onCancelledRequest(index, !isSender)
2877                         // Call `onUndeliveredElement` if needed.
2878                         if (isSender) {
2879                             channel.onUndeliveredElement?.callUndeliveredElement(element, context)
2880                         }
2881                         return
2882                     }
2883                 }
2884                 // The cell already indicates that the operation is cancelled.
2885                 cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> {
2886                     // Clean the element slot to avoid memory leaks,
2887                     // invoke `onUndeliveredElement` if needed, and finish
2888                     cleanElement(index)
2889                     // Call `onUndeliveredElement` if needed.
2890                     if (isSender) {
2891                         channel.onUndeliveredElement?.callUndeliveredElement(element, context)
2892                     }
2893                     return
2894                 }
2895                 // An opposite operation is resuming this request;
2896                 // wait until the cell state updates.
2897                 // It is possible that an opposite operation has already
2898                 // resumed this request, which will result in updating
2899                 // the cell state to `DONE_RCV` or `BUFFERED`, while the
2900                 // current cancellation is caused by prompt cancellation.
2901                 cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue
2902                 // This request was successfully resumed, so this cancellation
2903                 // is caused by the prompt cancellation feature and should be ignored.
2904                 cur === DONE_RCV || cur === BUFFERED -> return
2905                 // The cell state indicates that the channel is closed;
2906                 // this cancellation should be ignored.
2907                 cur === CHANNEL_CLOSED -> return
2908                 else -> error("unexpected state: $cur")
2909             }
2910         }
2911     }
2912 
2913     /**
2914      * Invokes `onSlotCleaned()` preceded by a `waitExpandBufferCompletion(..)` call
2915      * in case the cancelled request is receiver.
2916      */
onCancelledRequestnull2917     fun onCancelledRequest(index: Int, receiver: Boolean) {
2918         if (receiver) channel.waitExpandBufferCompletion(id * SEGMENT_SIZE + index)
2919         onSlotCleaned()
2920     }
2921 }
2922 
2923 // WA for atomicfu + JVM_IR compiler bug that lead to SMAP-related compiler crashes: KT-55983
createSegmentFunctionnull2924 internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, ChannelSegment<E>> = ::createSegment
2925 
2926 private fun <E> createSegment(id: Long, prev: ChannelSegment<E>) = ChannelSegment(
2927     id = id,
2928     prev = prev,
2929     channel = prev.channel,
2930     pointers = 0
2931 )
2932 private val NULL_SEGMENT = ChannelSegment<Any?>(id = -1, prev = null, channel = null, pointers = 0)
2933 
2934 /**
2935  * Number of cells in each segment.
2936  */
2937 @JvmField
2938 internal val SEGMENT_SIZE = systemProp("kotlinx.coroutines.bufferedChannel.segmentSize", 32)
2939 
2940 /**
2941  * Number of iterations to wait in [BufferedChannel.waitExpandBufferCompletion] until the numbers of started and completed
2942  * [BufferedChannel.expandBuffer] calls coincide. When the limit is reached, [BufferedChannel.waitExpandBufferCompletion]
2943  * blocks further [BufferedChannel.expandBuffer]-s to avoid starvation.
2944  */
2945 private val EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS = systemProp("kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations", 10_000)
2946 
2947 /**
2948  * Tries to resume this continuation with the specified
2949  * value. Returns `true` on success and `false` on failure.
2950  */
2951 private fun <T> CancellableContinuation<T>.tryResume0(
2952     value: T,
2953     onCancellation: ((cause: Throwable, value: T, context: CoroutineContext) -> Unit)? = null
2954 ): Boolean =
2955     tryResume(value, null, onCancellation).let { token ->
2956         if (token != null) {
2957             completeResume(token)
2958             true
2959         } else false
2960     }
2961 
2962 /*
2963   If the channel is rendezvous or unlimited, the `bufferEnd` counter
2964   should be initialized with the corresponding value below and never change.
2965   In this case, the `expandBuffer(..)` operation does nothing.
2966  */
2967 private const val BUFFER_END_RENDEZVOUS = 0L // no buffer
2968 private const val BUFFER_END_UNLIMITED = Long.MAX_VALUE // infinite buffer
initialBufferEndnull2969 private fun initialBufferEnd(capacity: Int): Long = when (capacity) {
2970     Channel.RENDEZVOUS -> BUFFER_END_RENDEZVOUS
2971     Channel.UNLIMITED -> BUFFER_END_UNLIMITED
2972     else -> capacity.toLong()
2973 }
2974 
2975 /*
2976   Cell states. The initial "empty" state is represented with `null`,
2977   and suspended operations are represented with [Waiter] instances.
2978  */
2979 
2980 // The cell stores a buffered element.
2981 @JvmField
2982 internal val BUFFERED = Symbol("BUFFERED")
2983 // Concurrent `expandBuffer(..)` can inform the
2984 // upcoming sender that it should buffer the element.
2985 private val IN_BUFFER = Symbol("SHOULD_BUFFER")
2986 // Indicates that a receiver (RCV suffix) is resuming
2987 // the suspended sender; after that, it should update
2988 // the state to either `DONE_RCV` (on success) or
2989 // `INTERRUPTED_SEND` (on failure).
2990 private val RESUMING_BY_RCV = Symbol("S_RESUMING_BY_RCV")
2991 // Indicates that `expandBuffer(..)` (RCV suffix) is resuming
2992 // the suspended sender; after that, it should update
2993 // the state to either `BUFFERED` (on success) or
2994 // `INTERRUPTED_SEND` (on failure).
2995 private val RESUMING_BY_EB = Symbol("RESUMING_BY_EB")
2996 // When a receiver comes to the cell already covered by
2997 // a sender (according to the counters), but the cell
2998 // is still in `EMPTY` or `IN_BUFFER` state, it breaks
2999 // the cell by changing its state to `POISONED`.
3000 private val POISONED = Symbol("POISONED")
3001 // When the element is successfully transferred
3002 // to a receiver, the cell changes to `DONE_RCV`.
3003 private val DONE_RCV = Symbol("DONE_RCV")
3004 // Cancelled sender.
3005 private val INTERRUPTED_SEND = Symbol("INTERRUPTED_SEND")
3006 // Cancelled receiver.
3007 private val INTERRUPTED_RCV = Symbol("INTERRUPTED_RCV")
3008 // Indicates that the channel is closed.
3009 internal val CHANNEL_CLOSED = Symbol("CHANNEL_CLOSED")
3010 // When the cell is already covered by both sender and
3011 // receiver (`sender` and `receivers` counters are greater
3012 // than the cell number), the `expandBuffer(..)` procedure
3013 // cannot distinguish which kind of operation is stored
3014 // in the cell. Thus, it wraps the waiter with this descriptor,
3015 // informing the possibly upcoming receiver that it should
3016 // complete the `expandBuffer(..)` procedure if the waiter stored
3017 // in the cell is sender. In turn, senders ignore this information.
3018 private class WaiterEB(@JvmField val waiter: Waiter) {
toStringnull3019     override fun toString() = "WaiterEB($waiter)"
3020 }
3021 
3022 
3023 
3024 /**
3025  * To distinguish suspended [BufferedChannel.receive] and
3026  * [BufferedChannel.receiveCatching] operations, the latter
3027  * uses this wrapper for its continuation.
3028  */
3029 private class ReceiveCatching<E>(
3030     @JvmField val cont: CancellableContinuationImpl<ChannelResult<E>>
3031 ) : Waiter by cont
3032 
3033 /*
3034   Internal results for [BufferedChannel.updateCellReceive].
3035   On successful rendezvous with waiting sender or
3036   buffered element retrieval, the corresponding element
3037   is returned as result of [BufferedChannel.updateCellReceive].
3038  */
3039 private val SUSPEND = Symbol("SUSPEND")
3040 private val SUSPEND_NO_WAITER = Symbol("SUSPEND_NO_WAITER")
3041 private val FAILED = Symbol("FAILED")
3042 
3043 /*
3044   Internal results for [BufferedChannel.updateCellSend]
3045  */
3046 private const val RESULT_RENDEZVOUS = 0
3047 private const val RESULT_BUFFERED = 1
3048 private const val RESULT_SUSPEND = 2
3049 private const val RESULT_SUSPEND_NO_WAITER = 3
3050 private const val RESULT_CLOSED = 4
3051 private const val RESULT_FAILED = 5
3052 
3053 /**
3054  * Special value for [BufferedChannel.BufferedChannelIterator.receiveResult]
3055  * that indicates the absence of pre-received result.
3056  */
3057 private val NO_RECEIVE_RESULT = Symbol("NO_RECEIVE_RESULT")
3058 
3059 /*
3060   As [BufferedChannel.invokeOnClose] can be invoked concurrently
3061   with channel closing, we have to synchronize them. These two
3062   markers help with the synchronization.
3063  */
3064 private val CLOSE_HANDLER_CLOSED = Symbol("CLOSE_HANDLER_CLOSED")
3065 private val CLOSE_HANDLER_INVOKED = Symbol("CLOSE_HANDLER_INVOKED")
3066 
3067 /**
3068  * Specifies the absence of closing cause, stored in [BufferedChannel._closeCause].
3069  * When the channel is closed or cancelled without exception, this [NO_CLOSE_CAUSE]
3070  * marker should be replaced with `null`.
3071  */
3072 private val NO_CLOSE_CAUSE = Symbol("NO_CLOSE_CAUSE")
3073 
3074 /*
3075   The channel close statuses. The transition scheme is the following:
3076     +--------+   +----------------------+   +-----------+
3077     | ACTIVE |-->| CANCELLATION_STARTED |-->| CANCELLED |
3078     +--------+   +----------------------+   +-----------+
3079         |                                         ^
3080         |             +--------+                  |
3081         +------------>| CLOSED |------------------+
3082                       +--------+
3083   We need `CANCELLATION_STARTED` to synchronize
3084   concurrent closing and cancellation.
3085  */
3086 private const val CLOSE_STATUS_ACTIVE = 0
3087 private const val CLOSE_STATUS_CANCELLATION_STARTED = 1
3088 private const val CLOSE_STATUS_CLOSED = 2
3089 private const val CLOSE_STATUS_CANCELLED = 3
3090 
3091 /*
3092   The `senders` counter and the channel close status
3093   are stored in a single 64-bit register to save the space
3094   and reduce the number of reads in sending operations.
3095   The code below encapsulates the required bit arithmetics.
3096  */
3097 private const val SENDERS_CLOSE_STATUS_SHIFT = 60
3098 private const val SENDERS_COUNTER_MASK = (1L shl SENDERS_CLOSE_STATUS_SHIFT) - 1
3099 private inline val Long.sendersCounter get() = this and SENDERS_COUNTER_MASK
3100 private inline val Long.sendersCloseStatus: Int get() = (this shr SENDERS_CLOSE_STATUS_SHIFT).toInt()
3101 private fun constructSendersAndCloseStatus(counter: Long, closeStatus: Int): Long =
3102     (closeStatus.toLong() shl SENDERS_CLOSE_STATUS_SHIFT) + counter
3103 
3104 /*
3105   The `completedExpandBuffersAndPauseFlag` 64-bit counter contains
3106   the number of completed `expandBuffer()` attempts along with a special
3107   flag that pauses progress to avoid starvation in `waitExpandBufferCompletion(..)`.
3108   The code below encapsulates the required bit arithmetics.
3109  */
3110 private const val EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT = 1L shl 62
3111 private const val EB_COMPLETED_COUNTER_MASK = EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT - 1
3112 private inline val Long.ebCompletedCounter get() = this and EB_COMPLETED_COUNTER_MASK
3113 private inline val Long.ebPauseExpandBuffers: Boolean get() = (this and EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT) != 0L
3114 private fun constructEBCompletedAndPauseFlag(counter: Long, pauseEB: Boolean): Long =
3115     (if (pauseEB) EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT else 0) + counter
3116