• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 @file:Suppress("PrivatePropertyName")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
8 import kotlinx.coroutines.channels.ChannelResult.Companion.failure
9 import kotlinx.coroutines.channels.ChannelResult.Companion.success
10 import kotlinx.coroutines.flow.internal.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
14 import kotlin.contracts.*
15 import kotlin.coroutines.*
16 import kotlin.js.*
17 import kotlin.jvm.*
18 import kotlin.math.*
19 import kotlin.random.*
20 import kotlin.reflect.*
21 
22 /**
23  * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
 * The high-level structure is based on a conceptually infinite array for storing elements and waiting requests,
25  * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
26  * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
27  * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
28  * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
29  * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
30  * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
31  * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
32  *
33  * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
34  * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
35  */
36 internal open class BufferedChannel<E>(
37     /**
38      * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
39      * and `Channel.UNLIMITED` for unlimited capacity.
40      */
41     private val capacity: Int,
42     @JvmField
43     internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
44 ) : Channel<E> {
    // Validates the constructor argument: `require` rejects a negative capacity
    // with an `IllegalArgumentException` carrying the message below.
    init {
        require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
        // Note: this class has a second `init` block further below.
    }
49 
50     // Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
51 
    /*
      The counters indicate the total numbers of send, receive, and buffer expansion calls
      ever performed. The counters are incremented at the beginning of the corresponding
      operation, thus acquiring a unique (for the operation type) cell to process.
      The segment references point to the last working segment for each operation type.

      Notably, the counter for send is combined with the channel closing status
      for synchronization simplicity and performance reasons.

      The logical end of the buffer is initialized with the channel capacity.
      If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
      or `BUFFER_END_UNLIMITED`, respectively, and never updates. The `bufferEndSegment`
      points to a special `NULL_SEGMENT` in this case.
     */
    private val sendersAndCloseStatus = atomic(0L)
    private val receivers = atomic(0L)
    private val bufferEnd = atomic(initialBufferEnd(capacity))

    // Read-only snapshots of the counters above; the senders counter is
    // extracted from the combined "senders + close status" value.
    internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
    internal val receiversCounter: Long get() = receivers.value
    private val bufferEndCounter: Long get() = bufferEnd.value
73 
    /*
      In addition to the counters above, we need an extra one that
      tracks the number of cells processed by `expandBuffer()`.
      When a receiver aborts, the corresponding cell might be
      physically removed from the data structure to avoid memory
      leaks, while it still can be unprocessed by `expandBuffer()`.
      In this case, `expandBuffer()` cannot know whether the
      removed cell contained a sender or a receiver and, therefore,
      cannot proceed. To solve the race, we ensure that cells
      corresponding to cancelled receivers cannot be physically
      removed until the cell is processed.
      This additional counter enables the synchronization.
      It starts from the initial value of the buffer end counter.
     */
    private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)
88 
89     private val isRendezvousOrUnlimited
90         get() = bufferEndCounter.let { it == BUFFER_END_RENDEZVOUS || it == BUFFER_END_UNLIMITED }
91 
    // References to the segments currently used by the send, receive,
    // and buffer expansion operations; all three are assigned in the `init` block below.
    private val sendSegment: AtomicRef<ChannelSegment<E>>
    private val receiveSegment: AtomicRef<ChannelSegment<E>>
    private val bufferEndSegment: AtomicRef<ChannelSegment<E>>

    init {
        // The very first segment (id = 0, no predecessor) is shared by all the references;
        // it is created with `pointers = 3`.
        @Suppress("LeakingThis")
        val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
        sendSegment = atomic(firstSegment)
        receiveSegment = atomic(firstSegment)
        // If this channel is rendezvous or has unlimited capacity, the algorithm never
        // invokes the buffer expansion procedure, and the corresponding segment reference
        // points to a special `NULL_SEGMENT` one and never updates.
        @Suppress("UNCHECKED_CAST")
        bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
    }
107 
108     // #########################
109     // ## The send operations ##
110     // #########################
111 
    /**
     * Sends [element] by delegating to the inline [sendImpl] without a pre-created waiter.
     *
     * A continuation is created only when the operation decides to suspend, in which case
     * [sendOnNoWaiterSuspend] takes over. When the channel is observed closed, [onClosedSend]
     * completes the call.
     */
    override suspend fun send(element: E): Unit =
        sendImpl( // <-- this is an inline function
            element = element,
            // Do not create a continuation until it is required;
            // it is created later via [onNoWaiterSuspend], if needed.
            waiter = null,
            // Finish immediately if a rendezvous happens
            // or the element has been buffered.
            onRendezvousOrBuffered = {},
            // As no waiter is provided, suspension is impossible.
            onSuspend = { _, _ -> assert { false } },
            // According to the `send(e)` contract, we need to call
            // `onUndeliveredElement(..)` handler and throw an exception
            // if the channel is already closed.
            onClosed = { onClosedSend(element) },
            // When `send(e)` decides to suspend, the corresponding
            // `onNoWaiterSuspend` function that creates a continuation
            // is called. The tail-call optimization is applied here.
            onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
        )
132 
133     // NB: return type could've been Nothing, but it breaks TCO
134     private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
135         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
136             // If it crashes, add send exception as suppressed for better diagnostics
137             it.addSuppressed(sendException)
138             continuation.resumeWithStackTrace(it)
139             return@suspendCancellableCoroutine
140         }
141         continuation.resumeWithStackTrace(sendException)
142     }
143 
    /**
     * Continues a `send(e)` that decided to suspend without having a waiter:
     * creates a reusable cancellable continuation and passes it to
     * [sendImplOnNoWaiter] as the waiter for the already reserved cell.
     */
    private suspend fun sendOnNoWaiterSuspend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /** The element to be inserted. */
        element: E,
        /** The global index of the cell. */
        s: Long
    ) = suspendCancellableCoroutineReusable sc@{ cont ->
        sendImplOnNoWaiter( // <-- this is an inline function
            segment = segment, index = index, element = element, s = s,
            // Store the created continuation as a waiter.
            waiter = cont,
            // If a rendezvous happens or the element has been buffered,
            // resume the continuation and finish. In case of prompt
            // cancellation, it is guaranteed that the element
            // has been already buffered or passed to receiver.
            onRendezvousOrBuffered = { cont.resume(Unit) },
            // If the channel is closed, call `onUndeliveredElement(..)` and complete the
            // continuation with the corresponding exception.
            onClosed = { onClosedSendOnNoWaiterSuspend(element, cont) },
        )
    }
168 
169     private fun Waiter.prepareSenderForSuspension(
170         /* The working cell is specified by
171         the segment and the index in it. */
172         segment: ChannelSegment<E>,
173         index: Int
174     ) {
175         // To distinguish cancelled senders and receivers,
176         // senders equip the index value with an additional marker,
177         // adding `SEGMENT_SIZE` to the value.
178         invokeOnCancellation(segment, index + SEGMENT_SIZE)
179     }
180 
181     private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
182         onUndeliveredElement?.callUndeliveredElement(element, cont.context)
183         cont.resumeWithException(recoverStackTrace(sendException, cont))
184     }
185 
    /**
     * Non-suspending send attempt. Returns a failed result right away when
     * [shouldSendSuspend] reports that a plain `send(e)` would suspend; otherwise
     * runs [sendImpl] with the `INTERRUPTED_SEND` token as the waiter and maps
     * the outcome to a [ChannelResult].
     */
    override fun trySend(element: E): ChannelResult<Unit> {
        // Do not try to send the element if the plain `send(e)` operation would suspend.
        if (shouldSendSuspend(sendersAndCloseStatus.value)) return failure()
        // This channel either has waiting receivers or is closed.
        // Let's try to send the element!
        // The logic is similar to the plain `send(e)` operation, with
        // the only difference that we install `INTERRUPTED_SEND` in case
        // the operation decides to suspend.
        return sendImpl( // <-- this is an inline function
            element = element,
            // Store an already interrupted sender in case of suspension.
            waiter = INTERRUPTED_SEND,
            // Finish successfully when a rendezvous happens
            // or the element has been buffered.
            onRendezvousOrBuffered = { success(Unit) },
            // On suspension, the `INTERRUPTED_SEND` token has been installed,
            // and this `trySend(e)` must fail. According to the contract,
            // we do not need to call the [onUndeliveredElement] handler.
            onSuspend = { segm, _ ->
                segm.onSlotCleaned()
                failure()
            },
            // If the channel is closed, return the corresponding result.
            onClosed = { closed(sendException) }
        )
    }
212 
    /**
     * This is a special `send(e)` implementation that returns `true` if the element
     * has been successfully sent, and `false` if the channel is closed.
     *
     * In case of coroutine cancellation, the element may be undelivered --
     * the [onUndeliveredElement] feature is unsupported in this implementation.
     *
     * @throws IllegalStateException if an [onUndeliveredElement] handler is set (see the `check` below).
     */
    internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont ->
        check(onUndeliveredElement == null) {
            "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
        }
        sendImpl(
            element = element,
            // The continuation is wrapped so that it is stored in the cell as a `SendBroadcast` waiter.
            waiter = SendBroadcast(cont),
            onRendezvousOrBuffered = { cont.resume(true) },
            // Nothing to do on suspension: the waiter has already been stored by `sendImpl`.
            onSuspend = { _, _ -> },
            onClosed = { cont.resume(false) }
        )
    }
233 
    /**
     * Specifies waiting [sendBroadcast] operation.
     *
     * Wraps the continuation of the suspended call and implements [Waiter]
     * by delegating to it; note that [cont] is cast to `CancellableContinuationImpl`
     * for the delegation, so any other continuation implementation fails the cast.
     */
    private class SendBroadcast(
        val cont: CancellableContinuation<Boolean>
    ) : Waiter by cont as CancellableContinuationImpl<Boolean>
240 
    /**
     * Abstract send implementation shared by the send-like operations of this channel.
     *
     * Each loop iteration increments the senders counter to reserve a cell, locates the
     * segment that contains this cell, and updates the cell via [updateCellSend]. The
     * iteration is repeated with a fresh cell when the required segment cannot be found
     * on a non-closed channel or when the cell update reports `RESULT_FAILED`.
     *
     * @param R the result type produced by the callbacks.
     * @return the value returned by the callback that finished the operation.
     */
    protected inline fun <R> sendImpl(
        /* The element to be sent. */
        element: E,
        /* The waiter to be stored in case of suspension,
        or `null` if the waiter is not created yet.
        In the latter case, when the algorithm decides
        to suspend, [onNoWaiterSuspend] is called. */
        waiter: Any?,
        /* This lambda is invoked when the element has been
        buffered or a rendezvous with a receiver happens. */
        onRendezvousOrBuffered: () -> R,
        /* This lambda is called when the operation suspends in the
        cell specified by the segment and the index in it. */
        onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
        /* This lambda is called when the channel
        is observed in the closed state. */
        onClosed: () -> R,
        /* This lambda is called when the operation decides
        to suspend, but the waiter is not provided (equals `null`).
        It should create a waiter and delegate to `sendImplOnNoWaiter`.
        The default implementation fails with an "unexpected" error. */
        onNoWaiterSuspend: (
            segm: ChannelSegment<E>,
            i: Int,
            element: E,
            s: Long
        ) -> R = { _, _, _, _ -> error("unexpected") }
    ): R {
        // Read the segment reference before the counter increment;
        // it is crucial to be able to find the required segment later.
        var segment = sendSegment.value
        while (true) {
            // Atomically increment the `senders` counter and obtain the
            // value right before the increment along with the close status.
            val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
            val s = sendersAndCloseStatusCur.sendersCounter
            // Is this channel already closed? Keep the information.
            val closed = sendersAndCloseStatusCur.isClosedForSend0
            // Compute the required segment id and the cell index in it.
            val id = s / SEGMENT_SIZE
            val i = (s % SEGMENT_SIZE).toInt()
            // Try to find the required segment if the initially obtained
            // one (in the beginning of this function) has a different id.
            if (segment.id != id) {
                // Find the required segment.
                segment = findSegmentSend(id, segment) ?:
                    // The required segment has not been found.
                    // Finish immediately if this channel is closed,
                    // restarting the operation otherwise.
                    // In the latter case, the required segment was full
                    // of interrupted waiters and, therefore, removed
                    // physically to avoid memory leaks.
                    if (closed) {
                        return onClosed()
                    } else {
                        continue
                    }
            }
            // Update the cell according to the algorithm. Importantly, when
            // the channel is already closed, storing a waiter is illegal, so
            // the algorithm stores the `INTERRUPTED_SEND` token in this case.
            when (updateCellSend(segment, i, element, s, waiter, closed)) {
                RESULT_RENDEZVOUS -> {
                    // A rendezvous with a receiver has happened.
                    // The previous segments are no longer needed
                    // for the upcoming requests, so the algorithm
                    // resets the link to the previous segment.
                    segment.cleanPrev()
                    return onRendezvousOrBuffered()
                }
                RESULT_BUFFERED -> {
                    // The element has been buffered.
                    return onRendezvousOrBuffered()
                }
                RESULT_SUSPEND -> {
                    // The operation has decided to suspend and installed the
                    // specified waiter. If the channel was already closed,
                    // and the `INTERRUPTED_SEND` token has been installed as a waiter,
                    // this request finishes with the `onClosed()` action.
                    if (closed) {
                        segment.onSlotCleaned()
                        return onClosed()
                    }
                    // Only real `Waiter` instances get a cancellation handler;
                    // other tokens are skipped by the safe cast.
                    (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
                    return onSuspend(segment, i)
                }
                RESULT_CLOSED -> {
                    // This channel is closed.
                    // In case this segment is already or going to be
                    // processed by a receiver, ensure that all the
                    // previous segments are unreachable.
                    if (s < receiversCounter) segment.cleanPrev()
                    return onClosed()
                }
                RESULT_FAILED -> {
                    // Either the cell stores an interrupted receiver,
                    // or it was poisoned by a concurrent receiver.
                    // In both cases, all the previous segments are already processed,
                    // so the link to them is reset and the operation restarts.
                    segment.cleanPrev()
                    continue
                }
                RESULT_SUSPEND_NO_WAITER -> {
                    // The operation has decided to suspend,
                    // but no waiter has been provided.
                    return onNoWaiterSuspend(segment, i, element, s)
                }
            }
        }
    }
352 
    /**
     * Second stage of a send that initially had no waiter: retries the update of the
     * already reserved cell, this time with a real [waiter]. If the cell update fails,
     * the whole operation is restarted through [sendImpl] with the same waiter.
     */
    private inline fun sendImplOnNoWaiter(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The element to be sent. */
        element: E,
        /* The global index of the cell. */
        s: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Waiter,
        /* This lambda is invoked when the element has been
        buffered or a rendezvous with a receiver happens.*/
        onRendezvousOrBuffered: () -> Unit,
        /* This lambda is called when the channel
        is observed in the closed state. */
        onClosed: () -> Unit,
    ) {
        // Update the cell again, now with the non-null waiter,
        // restarting the operation from the beginning on failure.
        // Check the `sendImpl(..)` function for the comments.
        // The channel is passed as not closed (`closed = false`) here.
        when (updateCellSend(segment, index, element, s, waiter, false)) {
            RESULT_RENDEZVOUS -> {
                segment.cleanPrev()
                onRendezvousOrBuffered()
            }
            RESULT_BUFFERED -> {
                onRendezvousOrBuffered()
            }
            RESULT_SUSPEND -> {
                // The waiter has been stored in the cell; register its cancellation handler.
                waiter.prepareSenderForSuspension(segment, index)
            }
            RESULT_CLOSED -> {
                if (s < receiversCounter) segment.cleanPrev()
                onClosed()
            }
            RESULT_FAILED -> {
                segment.cleanPrev()
                // Restart with a new cell; the waiter is already available,
                // so no additional action is needed on suspension.
                sendImpl(
                    element = element,
                    waiter = waiter,
                    onRendezvousOrBuffered = onRendezvousOrBuffered,
                    onSuspend = { _, _ -> },
                    onClosed = onClosed,
                )
            }
            // `RESULT_SUSPEND_NO_WAITER` is impossible as the waiter is non-null.
            else -> error("unexpected")
        }
    }
402 
    /**
     * Fast path of the cell update for an abstract send operation.
     *
     * Stores the element, then handles the two cell states covered here (an empty
     * cell and a cell with a plain [Waiter]) with a single attempt. If the channel is
     * closed, the state is different, or a CAS fails, it delegates to [updateCellSendSlow].
     *
     * @return one of the `RESULT_*` codes interpreted by [sendImpl].
     */
    private fun updateCellSend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The element to be sent. */
        element: E,
        /* The global index of the cell. */
        s: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
        /* Whether the channel was observed closed by the caller. */
        closed: Boolean
    ): Int {
        // This is a fast-path of `updateCellSendSlow(..)`.
        //
        // First, the algorithm stores the element,
        // performing the synchronization after that.
        // This way, receivers safely retrieve the
        // element, following the safe publication pattern.
        segment.storeElement(index, element)
        // A closed channel is always processed by the slow path.
        if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null -> {
                // If the element should be buffered, or a rendezvous should happen
                // while the receiver is still coming, try to buffer the element.
                // Otherwise, try to store the specified waiter in the cell.
                if (bufferOrRendezvousSend(s)) {
                    // Move the cell state to `BUFFERED`.
                    if (segment.casState(index, null, BUFFERED)) {
                        // The element has been successfully buffered, finish.
                        return RESULT_BUFFERED
                    }
                } else {
                    // This `send(e)` operation should suspend.
                    // The channel is known to be non-closed here (see the
                    // `closed` check above), so the waiter itself is installed.
                    if (waiter == null) {
                        // The waiter is not specified; return the corresponding result.
                        return RESULT_SUSPEND_NO_WAITER
                    } else {
                        // Try to install the waiter.
                        if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                    }
                }
            }
            // A waiting receiver is stored in the cell.
            state is Waiter -> {
                // As the element will be passed directly to the waiter,
                // the algorithm cleans the element slot in the cell.
                segment.cleanElement(index)
                // Try to make a rendezvous with the suspended receiver.
                return if (state.tryResumeReceiver(element)) {
                    // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                    segment.setState(index, DONE_RCV)
                    onReceiveDequeued()
                    RESULT_RENDEZVOUS
                } else {
                    // The resumption has failed. Update the cell state correspondingly
                    // and clean the element field. It is also possible for a concurrent
                    // cancellation handler to update the cell state; we can safely
                    // ignore these updates.
                    if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                        segment.onCancelledRequest(index, true)
                    }
                    RESULT_FAILED
                }
            }
        }
        // Either a CAS above has failed or the cell is in another state: take the slow path.
        return updateCellSendSlow(segment, index, element, s, waiter, closed)
    }
477 
    /**
     * Updates the working cell of an abstract send operation.
     *
     * This is the slow path of [updateCellSend]: it re-reads the cell state in a loop
     * and retries after every failed CAS until one of the transitions succeeds.
     *
     * @return one of the `RESULT_*` codes interpreted by [sendImpl].
     */
    private fun updateCellSendSlow(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The element to be sent. */
        element: E,
        /* The global index of the cell. */
        s: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
        /* Whether the channel was observed closed by the caller. */
        closed: Boolean
    ): Int {
        // Then, the cell state should be updated according to
        // its state machine; see the paper mentioned in the very
        // beginning for the cell life-cycle and the algorithm details.
        while (true) {
            // Read the current cell state.
            val state = segment.getState(index)
            when {
                // The cell is empty.
                state === null -> {
                    // If the element should be buffered, or a rendezvous should happen
                    // while the receiver is still coming, try to buffer the element.
                    // Otherwise, try to store the specified waiter in the cell.
                    // Buffering is not allowed when the channel was observed closed.
                    if (bufferOrRendezvousSend(s) && !closed) {
                        // Move the cell state to `BUFFERED`.
                        if (segment.casState(index, null, BUFFERED)) {
                            // The element has been successfully buffered, finish.
                            return RESULT_BUFFERED
                        }
                    } else {
                        // This `send(e)` operation should suspend.
                        // However, in case the channel has already
                        // been observed closed, `INTERRUPTED_SEND`
                        // is installed instead.
                        when {
                            // The channel is closed
                            closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
                                segment.onCancelledRequest(index, false)
                                return RESULT_CLOSED
                            }
                            // The waiter is not specified; return the corresponding result.
                            waiter == null -> return RESULT_SUSPEND_NO_WAITER
                            // Try to install the waiter.
                            else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                        }
                    }
                }
                // This cell is in the logical buffer.
                state === IN_BUFFER -> {
                    // Try to buffer the element.
                    if (segment.casState(index, state, BUFFERED)) {
                        // The element has been successfully buffered, finish.
                        return RESULT_BUFFERED
                    }
                }
                // The cell stores a cancelled receiver.
                state === INTERRUPTED_RCV -> {
                    // Clean the element slot to avoid memory leaks and finish.
                    segment.cleanElement(index)
                    return RESULT_FAILED
                }
                // The cell is poisoned by a concurrent receive.
                state === POISONED -> {
                    // Clean the element slot to avoid memory leaks and finish.
                    segment.cleanElement(index)
                    return RESULT_FAILED
                }
                // The channel is already closed.
                state === CHANNEL_CLOSED -> {
                    // Clean the element slot to avoid memory leaks,
                    // ensure that the closing/cancellation procedure
                    // has been completed, and finish.
                    segment.cleanElement(index)
                    completeCloseOrCancel()
                    return RESULT_CLOSED
                }
                // A waiting receiver (possibly wrapped into `WaiterEB`) is stored in the cell.
                else -> {
                    assert { state is Waiter || state is WaiterEB }
                    // As the element will be passed directly to the waiter,
                    // the algorithm cleans the element slot in the cell.
                    segment.cleanElement(index)
                    // Unwrap the waiting receiver from `WaiterEB` if needed.
                    // As a receiver is stored in the cell, the buffer expansion
                    // procedure would finish, so senders simply ignore the "EB" marker.
                    val receiver = if (state is WaiterEB) state.waiter else state
                    // Try to make a rendezvous with the suspended receiver.
                    return if (receiver.tryResumeReceiver(element)) {
                        // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                        segment.setState(index, DONE_RCV)
                        onReceiveDequeued()
                        RESULT_RENDEZVOUS
                    } else {
                        // The resumption has failed. Update the cell state correspondingly
                        // and clean the element field. It is also possible for a concurrent
                        // `expandBuffer()` or the cancellation handler to update the cell state;
                        // we can safely ignore these updates as senders do not help `expandBuffer()`.
                        if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                            segment.onCancelledRequest(index, true)
                        }
                        RESULT_FAILED
                    }
                }
            }
        }
    }
589 
590     /**
591      * Checks whether a [send] invocation is bound to suspend if it is called
592      * with the specified [sendersAndCloseStatus], [receivers], and [bufferEnd]
593      * values. When this channel is already closed, the function returns `false`.
594      *
595      * Specifically, [send] suspends if the channel is not unlimited,
596      * the number of receivers is greater than then index of the working cell of the
597      * potential [send] invocation, and the buffer does not cover this cell
598      * in case of buffered channel.
599      * When the channel is already closed, [send] does not suspend.
600      */
601     @JsName("shouldSendSuspend0")
602     private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean {
603         // Does not suspend if the channel is already closed.
604         if (curSendersAndCloseStatus.isClosedForSend0) return false
605         // Does not suspend if a rendezvous may happen or the buffer is not full.
606         return !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
607     }
608 
609     /**
610      * Returns `true` when the specified [send] should place
611      * its element to the working cell without suspension.
612      */
613     private fun bufferOrRendezvousSend(curSenders: Long): Boolean =
614         curSenders < bufferEndCounter || curSenders < receiversCounter + capacity
615 
616     /**
617      * Checks whether a [send] invocation is bound to suspend if it is called
618      * with the current counter and close status values. See [shouldSendSuspend] for details.
619      *
620      * Note that this implementation is _false positive_ in case of rendezvous channels,
621      * so it can return `false` when a [send] invocation is bound to suspend. Specifically,
622      * the counter of `receive()` operations may indicate that there is a waiting receiver,
623      * while it has already been cancelled, so the potential rendezvous is bound to fail.
624      */
625     internal open fun shouldSendSuspend(): Boolean = shouldSendSuspend(sendersAndCloseStatus.value)
626 
627     /**
628      * Tries to resume this receiver with the specified [element] as a result.
629      * Returns `true` on success and `false` otherwise.
630      */
631     @Suppress("UNCHECKED_CAST")
632     private fun Any.tryResumeReceiver(element: E): Boolean = when(this) {
633         is SelectInstance<*> -> { // `onReceiveXXX` select clause
634             trySelect(this@BufferedChannel, element)
635         }
636         is ReceiveCatching<*> -> {
637             this as ReceiveCatching<E>
638             cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
639         }
640         is BufferedChannel<*>.BufferedChannelIterator -> {
641             this as BufferedChannel<E>.BufferedChannelIterator
642             tryResumeHasNext(element)
643         }
644         is CancellableContinuation<*> -> { // `receive()`
645             this as CancellableContinuation<E>
646             tryResume0(element, onUndeliveredElement?.bindCancellationFun(element, context))
647         }
648         else -> error("Unexpected receiver type: $this")
649     }
650 
651     // ##########################
652     // # The receive operations #
653     // ##########################
654 
655     /**
656      * This function is invoked when a receiver is added as a waiter in this channel.
657      */
658     protected open fun onReceiveEnqueued() {}
659 
660     /**
661      * This function is invoked when a waiting receiver is no longer stored in this channel;
662      * independently on whether it is caused by rendezvous, cancellation, or channel closing.
663      */
664     protected open fun onReceiveDequeued() {}
665 
    /**
     * Retrieves an element from this channel, suspending via [receiveOnNoWaiterSuspend]
     * when the operation decides that it has to wait.
     * Throws [receiveException] (with a recovered stack trace) if the channel
     * is observed as closed for receiving.
     */
    override suspend fun receive(): E =
        receiveImpl( // <-- this is an inline function
            // Do not create a continuation until it is required;
            // it is created later via [onNoWaiterSuspend], if needed.
            waiter = null,
            // Return the received element on successful retrieval from
            // the buffer or rendezvous with a suspended sender.
            // Note that this is a non-local return from `receive()` itself,
            // which is possible because `receiveImpl` is inline.
            onElementRetrieved = { element ->
                return element
            },
            // As no waiter is provided, no waiter can be stored in the cell,
            // so the "suspended with the waiter installed" outcome is impossible.
            onSuspend = { _, _, _ -> error("unexpected") },
            // Throw an exception if the channel is already closed.
            onClosed = { throw recoverStackTrace(receiveException) },
            // If `receive()` decides to suspend, the corresponding
            // `suspend` function that creates a continuation is called.
            // The tail-call optimization is applied here.
            onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
        )
687 
688     private suspend fun receiveOnNoWaiterSuspend(
689         /* The working cell is specified by
690         the segment and the index in it. */
691         segment: ChannelSegment<E>,
692         index: Int,
693         /* The global index of the cell. */
694         r: Long
695     ) = suspendCancellableCoroutineReusable { cont ->
696         receiveImplOnNoWaiter( // <-- this is an inline function
697             segment = segment, index = index, r = r,
698             // Store the created continuation as a waiter.
699             waiter = cont,
700             // In case of successful element retrieval, resume
701             // the continuation with the element and inform the
702             // `BufferedChannel` extensions that the synchronization
703             // is completed. Importantly, the receiver coroutine
704             // may be cancelled after it is successfully resumed but
705             // not dispatched yet. In case `onUndeliveredElement` is
706             // specified, we need to invoke it in the latter case.
707             onElementRetrieved = { element ->
708                 val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
709                 cont.resume(element, onCancellation)
710             },
711             onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
712         )
713     }
714 
715     private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
716         onReceiveEnqueued()
717         invokeOnCancellation(segment, index)
718     }
719 
720     private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
721         cont.resumeWithException(receiveException)
722     }
723 
    /*
    The implementation mirrors `receive()`. The only difference is that
    this function returns a `ChannelResult` instance: a successful result
    wrapping the retrieved element, or a closed result carrying `closeCause`
    when the channel is already closed for receiving, instead of throwing
    an exception explicitly. Please refer to the plain `receive()`
    implementation for the detailed comments.
    */
    override suspend fun receiveCatching(): ChannelResult<E> =
        receiveImpl( // <-- this is an inline function
            // No continuation is created until suspension is required.
            waiter = null,
            // Wrap the retrieved element into a successful result.
            onElementRetrieved = { element ->
                success(element)
            },
            // No waiter is provided, so no waiter can be stored in the cell.
            onSuspend = { _, _, _ -> error("unexpected") },
            // Report the closed channel as a result instead of throwing.
            onClosed = { closed(closeCause) },
            // Delegate to the `suspend` function that creates a continuation.
            onNoWaiterSuspend = { segm, i, r -> receiveCatchingOnNoWaiterSuspend(segm, i, r) }
        )
741 
742     private suspend fun receiveCatchingOnNoWaiterSuspend(
743         segment: ChannelSegment<E>,
744         index: Int,
745         r: Long
746     ) = suspendCancellableCoroutineReusable { cont ->
747         val waiter = ReceiveCatching(cont as CancellableContinuationImpl<ChannelResult<E>>)
748         receiveImplOnNoWaiter(
749             segment, index, r,
750             waiter = waiter,
751             onElementRetrieved = { element ->
752                 cont.resume(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
753             },
754             onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
755         )
756     }
757 
758     private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>) {
759         cont.resume(closed(closeCause))
760     }
761 
762     override fun tryReceive(): ChannelResult<E> {
763         // Read the `receivers` counter first.
764         val r = receivers.value
765         val sendersAndCloseStatusCur = sendersAndCloseStatus.value
766         // Is this channel closed for receive?
767         if (sendersAndCloseStatusCur.isClosedForReceive0) {
768             return closed(closeCause)
769         }
770         // Do not try to receive an element if the plain `receive()` operation would suspend.
771         val s = sendersAndCloseStatusCur.sendersCounter
772         if (r >= s) return failure()
773         // Let's try to retrieve an element!
774         // The logic is similar to the plain `receive()` operation, with
775         // the only difference that we store `INTERRUPTED_RCV` in case
776         // the operation decides to suspend. This way, we can leverage
777         // the unconditional `Fetch-and-Add` instruction.
778         // One may consider storing `INTERRUPTED_RCV` instead of an actual waiter
779         // on suspension (a.k.a. "no elements to retrieve") as a short-cut of
780         // "suspending and cancelling immediately".
781         return receiveImpl( // <-- this is an inline function
782             // Store an already interrupted receiver in case of suspension.
783             waiter = INTERRUPTED_RCV,
784             // Finish when an element is successfully retrieved.
785             onElementRetrieved = { element -> success(element) },
786             // On suspension, the `INTERRUPTED_RCV` token has been
787             // installed, and this `tryReceive()` must fail.
788             onSuspend = { segm, _, globalIndex ->
789                 // Emulate "cancelled" receive, thus invoking 'waitExpandBufferCompletion' manually,
790                 // because effectively there were no cancellation
791                 waitExpandBufferCompletion(globalIndex)
792                 segm.onSlotCleaned()
793                 failure()
794             },
795             // If the channel is closed, return the corresponding result.
796             onClosed = { closed(closeCause) }
797         )
798     }
799 
    /**
     * Repeatedly extracts (drops) the first element of this channel until the cell with
     * the specified global index [globalCellIndex] is covered by the logical buffer.
     * This is a key procedure for the _conflated_ channel implementation,
     * see [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
     * strategy on buffer overflowing.
     *
     * Each dropped element is passed to `onUndeliveredElement`, if it is specified;
     * an exception reported by that call is thrown from this function.
     */
    protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
        // This procedure is expected to be used only in the "conflated, drop oldest" mode.
        assert { isConflatedDropOldest }
        // Read the segment reference before the counter increment;
        // it is crucial to be able to find the required segment later.
        var segment = receiveSegment.value
        while (true) {
            // Read the receivers counter to check whether the specified cell is already in the buffer
            // or should be moved to the buffer in a short time, due to the already started `receive()`.
            val r = this.receivers.value
            if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
            // The cell is outside the buffer. Try to extract the first element
            // if the `receivers` counter has not been changed; otherwise, re-read it and retry.
            if (!this.receivers.compareAndSet(r, r + 1)) continue
            // Count the required segment id and the cell index in it.
            val id = r / SEGMENT_SIZE
            val i = (r % SEGMENT_SIZE).toInt()
            // Try to find the required segment if the id of the currently
            // held segment differs from the required one.
            if (segment.id != id) {
                // Find the required segment, restarting the operation if it has not been found.
                segment = findSegmentReceive(id, segment) ?:
                    // The required segment has not been found. It is possible that the channel is already
                    // closed for receiving, so the linked list of segments is closed as well.
                    // In the latter case, the operation will finish eventually after incrementing
                    // the `receivers` counter sufficient times. Note that it is impossible to check
                    // whether this channel is closed for receiving (we do this in `receive`),
                    // as it may call this function when helping to complete closing the channel.
                    continue
            }
            // Update the cell according to the cell life-cycle.
            // No waiter is provided, as this operation never suspends.
            val updCellResult = updateCellReceive(segment, i, r, null)
            when {
                updCellResult === FAILED -> {
                    // The cell is poisoned; restart from the beginning.
                    // To avoid memory leaks, we also need to reset
                    // the `prev` pointer of the working segment.
                    if (r < sendersCounter) segment.cleanPrev()
                }
                else -> { // element
                    // NOTE(review): with a `null` waiter, `updateCellReceive` can also return
                    // `SUSPEND_NO_WAITER` when the cell is not covered by a sender; this branch
                    // presumably relies on that being impossible here -- confirm.
                    //
                    // A buffered element was retrieved from the cell.
                    // Clean the reference to the previous segment.
                    segment.cleanPrev()
                    // Report the dropped element as undelivered, rethrowing the reported exception, if any.
                    @Suppress("UNCHECKED_CAST")
                    onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
                }
            }
        }
    }
854 
    /**
     * The common implementation of all receive operations.
     *
     * In a loop, it reserves the next cell by incrementing the `receivers` counter,
     * locates the segment that contains this cell, and updates the cell via
     * [updateCellReceive]. Depending on the outcome, one of the provided lambdas
     * is invoked and its result is returned, or the operation restarts with a new cell.
     */
    private inline fun <R> receiveImpl(
        /* The waiter to be stored in case of suspension,
        or `null` if the waiter is not created yet.
        In the latter case, if the algorithm decides
        to suspend, [onNoWaiterSuspend] is called. */
        waiter: Any?,
        /* This lambda is invoked when an element has been
        successfully retrieved, either from the buffer or
        by making a rendezvous with a suspended sender. */
        onElementRetrieved: (element: E) -> R,
        /* This lambda is called when the operation suspends in the cell
        specified by the segment and its in-segment and global indices. */
        onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
        /* This lambda is called when the channel is observed
        in the closed state and no waiting sender is found,
        which means that it is closed for receiving. */
        onClosed: () -> R,
        /* This lambda is called when the operation decides
        to suspend, but the waiter is not provided (equals `null`).
        It should create a waiter and delegate to `receiveImplOnNoWaiter`. */
        onNoWaiterSuspend: (
            segm: ChannelSegment<E>,
            i: Int,
            r: Long
        ) -> R = { _, _, _ -> error("unexpected") }
    ): R {
        // Read the segment reference before the counter increment;
        // it is crucial to be able to find the required segment later.
        var segment = receiveSegment.value
        while (true) {
            // Similar to the `send(e)` operation, `receive()` first checks
            // whether the channel is already closed for receiving.
            if (isClosedForReceive) return onClosed()
            // Atomically increment the `receivers` counter
            // and obtain the value right before the increment.
            val r = this.receivers.getAndIncrement()
            // Count the required segment id and the cell index in it.
            val id = r / SEGMENT_SIZE
            val i = (r % SEGMENT_SIZE).toInt()
            // Try to find the required segment if the id of the currently
            // held segment differs from the required one.
            if (segment.id != id) {
                // Find the required segment, restarting the operation if it has not been found.
                segment = findSegmentReceive(id, segment) ?:
                    // The required segment is not found. It is possible that the channel is already
                    // closed for receiving, so the linked list of segments is closed as well.
                    // In the latter case, the operation fails with the corresponding check at the beginning.
                    continue
            }
            // Update the cell according to the cell life-cycle.
            val updCellResult = updateCellReceive(segment, i, r, waiter)
            return when {
                updCellResult === SUSPEND -> {
                    // The operation has decided to suspend and
                    // stored the specified waiter in the cell.
                    // The preparation is skipped when the stored object is not a `Waiter`
                    // (presumably the case for special tokens such as `INTERRUPTED_RCV`).
                    (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
                    onSuspend(segment, i, r)
                }
                updCellResult === FAILED -> {
                    // The operation has tried to make a rendezvous
                    // but failed: either the opposite request has
                    // already been cancelled or the cell is poisoned.
                    // Restart from the beginning in this case.
                    // To avoid memory leaks, we also need to reset
                    // the `prev` pointer of the working segment.
                    if (r < sendersCounter) segment.cleanPrev()
                    continue
                }
                updCellResult === SUSPEND_NO_WAITER -> {
                    // The operation has decided to suspend,
                    // but no waiter has been provided.
                    onNoWaiterSuspend(segment, i, r)
                }
                else -> { // element
                    // Either a buffered element was retrieved from the cell
                    // or a rendezvous with a waiting sender has happened.
                    // Clean the reference to the previous segment before finishing.
                    segment.cleanPrev()
                    @Suppress("UNCHECKED_CAST")
                    onElementRetrieved(updCellResult as E)
                }
            }
        }
    }
942 
    /**
     * Continues a receive operation that has decided to suspend before its waiter
     * was created: retries the update of the already reserved cell with the now
     * available [waiter], falling back to the full [receiveImpl] procedure
     * if the update of this cell fails.
     */
    private inline fun receiveImplOnNoWaiter(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Waiter,
        /* This lambda is invoked when an element has been
        successfully retrieved, either from the buffer or
        by making a rendezvous with a suspended sender. */
        onElementRetrieved: (element: E) -> Unit,
        /* This lambda is called when the channel is observed
        in the closed state and no waiting sender is found,
        which means that it is closed for receiving. */
        onClosed: () -> Unit
    ) {
        // Update the cell with the non-null waiter,
        // restarting from the beginning on failure.
        // Check the `receiveImpl(..)` function for the comments.
        val updCellResult = updateCellReceive(segment, index, r, waiter)
        when {
            updCellResult === SUSPEND -> {
                // The waiter has been stored in the cell; finish the suspension.
                waiter.prepareReceiverForSuspension(segment, index)
            }
            updCellResult === FAILED -> {
                // The update of this cell has failed; reset the `prev` pointer
                // if needed and restart with a new cell, keeping the same waiter.
                if (r < sendersCounter) segment.cleanPrev()
                receiveImpl(
                    waiter = waiter,
                    onElementRetrieved = onElementRetrieved,
                    // Nothing to do on suspension here: `receiveImpl` has already
                    // prepared the stored waiter before invoking this lambda.
                    onSuspend = { _, _, _ -> },
                    onClosed = onClosed
                )
            }
            else -> {
                // An element has been retrieved from the cell.
                segment.cleanPrev()
                @Suppress("UNCHECKED_CAST")
                onElementRetrieved(updCellResult as E)
            }
        }
    }
985 
    /**
     * Updates the working cell on behalf of a receive operation.
     *
     * Returns `SUSPEND` if [waiter] has been stored in the cell, `SUSPEND_NO_WAITER`
     * if the operation should suspend but [waiter] is `null`, the retrieved element
     * if a buffered one has been extracted, or whatever [updateCellReceiveSlow]
     * returns when the fast path below does not apply or loses a race.
     */
    private fun updateCellReceive(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
    ): Any? {
        // This is a fast-path of `updateCellReceiveSlow(..)`.
        //
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null -> {
                // The fast path handles only the case when the cell is not
                // covered by a sender yet, so this operation should suspend.
                // The opposite case (a rendezvous must happen, but the cell
                // is still empty) is left to the slow path.
                val senders = sendersAndCloseStatus.value.sendersCounter
                if (r >= senders) {
                    // This `receive()` operation should suspend.
                    if (waiter === null) {
                        // The waiter is not specified;
                        // return the corresponding result.
                        return SUSPEND_NO_WAITER
                    }
                    // Try to install the waiter.
                    if (segment.casState(index, state, waiter)) {
                        // The waiter has been successfully installed.
                        // Invoke the `expandBuffer()` procedure and finish.
                        expandBuffer()
                        return SUSPEND
                    }
                }
            }
            // The cell stores a buffered element.
            state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
                // Retrieve the element and expand the buffer.
                expandBuffer()
                return segment.retrieveElement(index)
            }
        }
        // The fast path did not apply or a CAS has failed; use the general procedure.
        return updateCellReceiveSlow(segment, index, r, waiter)
    }
1033 
    /**
     * The general (slow-path) procedure that updates the working cell on behalf
     * of a receive operation, looping until one of the cell state transitions succeeds.
     *
     * Returns `SUSPEND` if [waiter] has been stored in the cell, `SUSPEND_NO_WAITER`
     * if the operation should suspend but [waiter] is `null`, `FAILED` if the
     * operation should restart with another cell, or the retrieved element otherwise.
     */
    private fun updateCellReceiveSlow(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
    ): Any? {
        // The cell state should be updated according to its state machine;
        // see the paper mentioned in the very beginning for the algorithm details.
        while (true) {
            // Read the current cell state.
            val state = segment.getState(index)
            when {
                // The cell is empty.
                state === null || state === IN_BUFFER -> {
                    // If a rendezvous must happen, the operation does not wait
                    // until the cell stores a buffered element or a suspended
                    // sender, poisoning the cell and restarting instead.
                    // Otherwise, try to store the specified waiter in the cell.
                    val senders = sendersAndCloseStatus.value.sendersCounter
                    if (r < senders) {
                        // The cell is already covered by sender,
                        // so a rendezvous must happen. Unfortunately,
                        // the cell is empty, so the operation poisons it.
                        if (segment.casState(index, state, POISONED)) {
                            // When the cell becomes poisoned, it is essentially
                            // the same as storing an already cancelled receiver.
                            // Thus, the `expandBuffer()` procedure should be invoked.
                            expandBuffer()
                            return FAILED
                        }
                    } else {
                        // This `receive()` operation should suspend.
                        if (waiter === null) {
                            // The waiter is not specified;
                            // return the corresponding result.
                            return SUSPEND_NO_WAITER
                        }
                        // Try to install the waiter.
                        if (segment.casState(index, state, waiter)) {
                            // The waiter has been successfully installed.
                            // Invoke the `expandBuffer()` procedure and finish.
                            expandBuffer()
                            return SUSPEND
                        }
                    }
                }
                // The cell stores a buffered element.
                state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
                    // Retrieve the element and expand the buffer.
                    expandBuffer()
                    return segment.retrieveElement(index)
                }
                // The cell stores an interrupted sender.
                state === INTERRUPTED_SEND -> return FAILED
                // The cell is already poisoned by a concurrent
                // `hasElements` call. Restart in this case.
                state === POISONED -> return FAILED
                // This channel is already closed.
                state === CHANNEL_CLOSED -> {
                    // Although the channel is closed, it is still required
                    // to call the `expandBuffer()` procedure to keep
                    // `waitForExpandBufferCompletion()` correct.
                    expandBuffer()
                    return FAILED
                }
                // A concurrent `expandBuffer()` is resuming a
                // suspended sender. Wait in a spin-loop until
                // the resumption attempt completes: the cell
                // state must change to either `BUFFERED` or
                // `INTERRUPTED_SEND`.
                state === RESUMING_BY_EB -> continue
                // The cell stores a suspended sender; try to resume it.
                else -> {
                    // To synchronize with expandBuffer(), the algorithm
                    // first moves the cell to the intermediate `RESUMING_BY_RCV`
                    // state, updating it to either `DONE_RCV` (on success) or
                    // `INTERRUPTED_SEND` (on failure). If the CAS below fails,
                    // the loop re-reads the cell state and retries.
                    if (segment.casState(index, state, RESUMING_BY_RCV)) {
                        // Has a concurrent `expandBuffer()` delegated its completion?
                        val helpExpandBuffer = state is WaiterEB
                        // Extract the sender if needed and try to resume it.
                        val sender = if (state is WaiterEB) state.waiter else state
                        return if (sender.tryResumeSender(segment, index)) {
                            // The sender has been resumed successfully!
                            // Update the cell state correspondingly,
                            // expand the buffer, and return the element
                            // stored in the cell.
                            // In case a concurrent `expandBuffer()` has delegated
                            // its completion, the procedure should finish, as the
                            // sender is resumed. Thus, no further action is required.
                            segment.setState(index, DONE_RCV)
                            expandBuffer()
                            segment.retrieveElement(index)
                        } else {
                            // The resumption has failed. Update the cell correspondingly.
                            // In case a concurrent `expandBuffer()` has delegated
                            // its completion, the procedure should skip this cell, so
                            // `expandBuffer()` should be called once again.
                            segment.setState(index, INTERRUPTED_SEND)
                            segment.onCancelledRequest(index, false)
                            if (helpExpandBuffer) expandBuffer()
                            FAILED
                        }
                    }
                }
            }
        }
    }
1146 
1147     private fun Any.tryResumeSender(segment: ChannelSegment<E>, index: Int): Boolean = when (this) {
1148         is CancellableContinuation<*> -> { // suspended `send(e)` operation
1149             @Suppress("UNCHECKED_CAST")
1150             this as CancellableContinuation<Unit>
1151             tryResume0(Unit)
1152         }
1153         is SelectInstance<*> -> {
1154             this as SelectImplementation<*>
1155             val trySelectResult = trySelectDetailed(clauseObject = this@BufferedChannel, result = Unit)
1156             // Clean the element slot to avoid memory leaks
1157             // if this `select` clause should be re-registered.
1158             if (trySelectResult === REREGISTER) segment.cleanElement(index)
1159             // Was the resumption successful?
1160             trySelectResult === SUCCESSFUL
1161         }
1162         is SendBroadcast -> cont.tryResume0(true) // // suspended `sendBroadcast(e)` operation
1163         else -> error("Unexpected waiter: $this")
1164     }
1165 
1166     // ################################
1167     // # The expandBuffer() procedure #
1168     // ################################
1169 
    /**
     * Moves the logical end of the buffer forward, retrying with the next cell
     * until a cell is successfully added to the buffer or the reserved cell turns
     * out to be not yet covered by a sender. Every attempt is counted via
     * `incCompletedExpandBufferAttempts()` (or, when the required segment is not
     * found, by `findSegmentBufferEnd(..)`, as the comment inside the loop states).
     *
     * Does nothing for rendezvous and unlimited channels.
     */
    private fun expandBuffer() {
        // Do not need to take any action if
        // this channel is rendezvous or unlimited.
        if (isRendezvousOrUnlimited) return
        // Read the current segment of
        // the `expandBuffer()` procedure.
        var segment = bufferEndSegment.value
        // Try to expand the buffer until succeed.
        try_again@ while (true) {
            // Increment the logical end of the buffer.
            // The `b`-th cell is going to be added to the buffer.
            val b = bufferEnd.getAndIncrement()
            val id = b / SEGMENT_SIZE
            // After that, read the current `senders` counter.
            // In case its value is not greater than `b`, the `send(e)`
            // invocation that will work with this `b`-th cell
            // will detect that the cell is already a part of the
            // buffer when comparing with the `bufferEnd` counter.
            // However, `bufferEndSegment` may reference an outdated
            // segment, which should be updated to avoid memory leaks.
            val s = sendersCounter
            if (s <= b) {
                // Should `bufferEndSegment` be moved forward to avoid memory leaks?
                if (segment.id < id && segment.next != null)
                    moveSegmentBufferEndToSpecifiedOrLast(id, segment)
                // Increment the number of completed `expandBuffer()`-s and finish.
                incCompletedExpandBufferAttempts()
                return
            }
            // Is `bufferEndSegment` outdated or is the segment with the required id already removed?
            // Find the required segment, creating new ones if needed.
            if (segment.id != id) {
                segment = findSegmentBufferEnd(id, segment, b)
                    // Restart if the required segment is removed, or
                    // the linked list of segments is already closed,
                    // and the required one will never be created.
                    // Please note that `findSegmentBufferEnd(..)` updates
                    // the number of completed `expandBuffer()` attempts
                    // in this case.
                    ?: continue@try_again
            }
            // Try to add the cell to the logical buffer,
            // updating the cell state according to the state-machine.
            val i = (b % SEGMENT_SIZE).toInt()
            if (updateCellExpandBuffer(segment, i, b)) {
                // The cell has been added to the logical buffer!
                // Increment the number of completed `expandBuffer()`-s and finish.
                //
                // Note that it is possible to increment the number of
                // completed `expandBuffer()` attempts earlier, right
                // after the segment is obtained. We find this change
                // counter-intuitive and prefer to avoid it.
                incCompletedExpandBufferAttempts()
                return
            } else {
                // The cell has not been added to the buffer.
                // Increment the number of completed `expandBuffer()`
                // attempts and restart.
                incCompletedExpandBufferAttempts()
                continue@try_again
            }
        }
    }
1233 
1234     private fun updateCellExpandBuffer(
1235         /* The working cell is specified by
1236         the segment and the index in it. */
1237         segment: ChannelSegment<E>,
1238         index: Int,
1239         /* The global index of the cell. */
1240         b: Long
1241     ): Boolean {
1242         // This is a fast-path of `updateCellExpandBufferSlow(..)`.
1243         //
1244         // Read the current cell state.
1245         val state = segment.getState(index)
1246         if (state is Waiter) {
1247             // Usually, a sender is stored in the cell.
1248             // However, it is possible for a concurrent
1249             // receiver to be already suspended there.
1250             // Try to distinguish whether the waiter is a
1251             // sender by comparing the global cell index with
1252             // the `receivers` counter. In case the cell is not
1253             // covered by a receiver, a sender is stored in the cell.
1254             if (b >= receivers.value) {
1255                 // The cell stores a suspended sender. Try to resume it.
1256                 // To synchronize with a concurrent `receive()`, the algorithm
1257                 // first moves the cell state to an intermediate `RESUMING_BY_EB`
1258                 // state, updating it to either `BUFFERED` (on successful resumption)
1259                 // or `INTERRUPTED_SEND` (on failure).
1260                 if (segment.casState(index, state, RESUMING_BY_EB)) {
1261                     return if (state.tryResumeSender(segment, index)) {
1262                         // The sender has been resumed successfully!
1263                         // Move the cell to the logical buffer and finish.
1264                         segment.setState(index, BUFFERED)
1265                         true
1266                     } else {
1267                         // The resumption has failed.
1268                         segment.setState(index, INTERRUPTED_SEND)
1269                         segment.onCancelledRequest(index, false)
1270                         false
1271                     }
1272                 }
1273             }
1274         }
1275         return updateCellExpandBufferSlow(segment, index, b)
1276     }
1277 
    /**
     * Slow path of [updateCellExpandBuffer]: tries to add the specified cell to
     * the logical buffer by updating its state according to the cell state machine,
     * retrying in a loop whenever a CAS fails or the cell is in a transient state.
     *
     * @return `true` if the cell is (or already was) a part of the buffer or needs
     * no further action; `false` if the cell stores an interrupted sender or the
     * sender resumption has failed.
     */
    private fun updateCellExpandBufferSlow(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        b: Long
    ): Boolean {
        // Update the cell state according to its state machine.
        // See the paper mentioned in the very beginning for
        // the cell life-cycle and the algorithm details.
        while (true) {
            // Read the current cell state.
            val state = segment.getState(index)
            when {
                // A suspended waiter, sender or receiver.
                state is Waiter -> {
                    // Usually, a sender is stored in the cell.
                    // However, it is possible for a concurrent
                    // receiver to be already suspended there.
                    // Try to distinguish whether the waiter is a
                    // sender by comparing the global cell index with
                    // the `receivers` counter. In case the cell is not
                    // covered by a receiver, a sender is stored in the cell.
                    if (b < receivers.value) {
                        // The algorithm cannot distinguish whether the operation
                        // suspended in the cell is a sender or a receiver.
                        // To make progress, `expandBuffer()` delegates its completion
                        // to an upcoming pairwise request, atomically wrapping
                        // the waiter in `WaiterEB`. In case a sender is stored
                        // in the cell, the upcoming receiver will call `expandBuffer()`
                        // if the sender resumption fails; thus, effectively, skipping
                        // this cell. Otherwise, if a receiver is stored in the cell,
                        // this `expandBuffer()` procedure must finish; therefore,
                        // senders ignore the `WaiterEB` wrapper.
                        // If the CAS fails, the loop re-reads the state and retries.
                        if (segment.casState(index, state, WaiterEB(waiter = state)))
                            return true
                    } else {
                        // The cell stores a suspended sender. Try to resume it.
                        // To synchronize with a concurrent `receive()`, the algorithm
                        // first moves the cell state to an intermediate `RESUMING_BY_EB`
                        // state, updating it to either `BUFFERED` (on successful resumption)
                        // or `INTERRUPTED_SEND` (on failure).
                        // If the CAS fails, the loop re-reads the state and retries.
                        if (segment.casState(index, state, RESUMING_BY_EB)) {
                            return if (state.tryResumeSender(segment, index)) {
                                // The sender has been resumed successfully!
                                // Move the cell to the logical buffer and finish.
                                segment.setState(index, BUFFERED)
                                true
                            } else {
                                // The resumption has failed.
                                segment.setState(index, INTERRUPTED_SEND)
                                segment.onCancelledRequest(index, false)
                                false
                            }
                        }
                    }
                }
                // The cell stores an interrupted sender, skip it.
                state === INTERRUPTED_SEND -> return false
                // The cell is empty, a concurrent sender is coming.
                state === null -> {
                    // To inform a concurrent sender that this cell is
                    // already a part of the buffer, the algorithm moves
                    // it to a special `IN_BUFFER` state.
                    // If the CAS fails, the loop re-reads the state and retries.
                    if (segment.casState(index, state, IN_BUFFER)) return true
                }
                // The cell is already a part of the buffer, finish.
                state === BUFFERED -> return true
                // The cell is already processed by a receiver, no further action is required.
                state === POISONED || state === DONE_RCV || state === INTERRUPTED_RCV -> return true
                // The channel is closed, all the following
                // cells are already in the same state, finish.
                state === CHANNEL_CLOSED -> return true
                // A concurrent receiver is resuming the suspended sender.
                // Wait in a spin-loop until it changes the cell state
                // to either `DONE_RCV` or `INTERRUPTED_SEND`.
                state === RESUMING_BY_RCV -> continue // spin wait
                // Any other state is not expected here and indicates a bug.
                else -> error("Unexpected cell state: $state")
            }
        }
    }
1360 
1361     /**
1362      * Increments the counter of completed [expandBuffer] invocations.
1363      * To guarantee starvation-freedom for [waitExpandBufferCompletion],
1364      * which waits until the counters of started and completed [expandBuffer] calls
1365      * coincide and become greater or equal to the specified value,
1366      * [waitExpandBufferCompletion] may set a flag that pauses further progress.
1367      */
1368     private fun incCompletedExpandBufferAttempts(nAttempts: Long = 1) {
1369         // Increment the number of completed `expandBuffer()` calls.
1370         completedExpandBuffersAndPauseFlag.addAndGet(nAttempts).also {
1371             // Should further `expandBuffer()`-s be paused?
1372             // If so, this thread should wait in a spin-loop
1373             // until the flag is unset.
1374             if (it.ebPauseExpandBuffers) {
1375                 @Suppress("ControlFlowWithEmptyBody")
1376                 while (completedExpandBuffersAndPauseFlag.value.ebPauseExpandBuffers) {}
1377             }
1378         }
1379     }
1380 
    /**
     * Waits in a spin-loop until the [expandBuffer] call that
     * should process the [globalIndex]-th cell is completed.
     * Essentially, it first waits until the number of started [expandBuffer]
     * attempts ([bufferEnd]) exceeds [globalIndex], and then until the numbers
     * of started and completed ([completedExpandBuffersAndPauseFlag]) attempts coincide.
     * To avoid starvation, this function may set a flag
     * that pauses further progress.
     *
     * Returns immediately for rendezvous and unlimited channels.
     */
    internal fun waitExpandBufferCompletion(globalIndex: Long) {
        // Do nothing if this channel is rendezvous or unlimited;
        // `expandBuffer()` is not used in these cases.
        if (isRendezvousOrUnlimited) return
        // Wait in an infinite loop until the number of started
        // buffer expansion calls becomes greater than the cell index.
        @Suppress("ControlFlowWithEmptyBody")
        while (bufferEndCounter <= globalIndex) {}
        // Now it is guaranteed that the `expandBuffer()` call that
        // should process the required cell has been started.
        // Wait in a fixed-size spin-loop until the numbers of
        // started and completed buffer expansion calls coincide.
        repeat(EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS) {
            // Read the number of started buffer expansion calls.
            val b = bufferEndCounter
            // Read the number of completed buffer expansion calls.
            val ebCompleted = completedExpandBuffersAndPauseFlag.value.ebCompletedCounter
            // Do the numbers of started and completed calls coincide?
            // Note that we need to re-read the number of started `expandBuffer()`
            // calls to obtain a correct snapshot.
            // Here we wait for a precise match in order to ensure that **our matching expandBuffer()**
            // has completed. The only way to ensure that is to check that the number of started
            // expansions equals the number of finished ones.
            if (b == ebCompleted && b == bufferEndCounter) return
        }
        // The bounded spin-loop did not observe a match.
        // To avoid starvation, pause further `expandBuffer()` calls.
        completedExpandBuffersAndPauseFlag.update {
            constructEBCompletedAndPauseFlag(it.ebCompletedCounter, true)
        }
        // Now wait in an infinite spin-loop until the counters coincide.
        while (true) {
            // Read the number of started buffer expansion calls.
            val b = bufferEndCounter
            // Read the number of completed buffer expansion calls
            // along with the flag that pauses further progress.
            val ebCompletedAndBit = completedExpandBuffersAndPauseFlag.value
            val ebCompleted = ebCompletedAndBit.ebCompletedCounter
            val pauseExpandBuffers = ebCompletedAndBit.ebPauseExpandBuffers
            // Do the numbers of started and completed calls coincide?
            // Note that we need to re-read the number of started `expandBuffer()`
            // calls to obtain a correct snapshot.
            if (b == ebCompleted && b == bufferEndCounter) {
                // Unset the flag, which pauses progress, and finish.
                completedExpandBuffersAndPauseFlag.update {
                    constructEBCompletedAndPauseFlag(it.ebCompletedCounter, false)
                }
                return
            }
            // It is possible that a concurrent caller of this function
            // has unset the flag, which pauses further progress to avoid
            // starvation. In this case, set the flag back.
            // A failed CAS is ignored here: the next iteration re-reads the value.
            if (!pauseExpandBuffers) {
                completedExpandBuffersAndPauseFlag.compareAndSet(
                    ebCompletedAndBit,
                    constructEBCompletedAndPauseFlag(ebCompleted, true)
                )
            }
        }
    }
1448 
1449 
1450     // #######################
1451     // ## Select Expression ##
1452     // #######################
1453 
1454     @Suppress("UNCHECKED_CAST")
1455     override val onSend: SelectClause2<E, BufferedChannel<E>>
1456         get() = SelectClause2Impl(
1457             clauseObject = this@BufferedChannel,
1458             regFunc = BufferedChannel<*>::registerSelectForSend as RegistrationFunction,
1459             processResFunc = BufferedChannel<*>::processResultSelectSend as ProcessResultFunction
1460         )
1461 
1462     @Suppress("UNCHECKED_CAST")
1463     protected open fun registerSelectForSend(select: SelectInstance<*>, element: Any?) =
1464         sendImpl( // <-- this is an inline function
1465             element = element as E,
1466             waiter = select,
1467             onRendezvousOrBuffered = { select.selectInRegistrationPhase(Unit) },
1468             onSuspend = { _, _ -> },
1469             onClosed = { onClosedSelectOnSend(element, select) }
1470         )
1471 
1472 
1473     private fun onClosedSelectOnSend(element: E, select: SelectInstance<*>) {
1474         onUndeliveredElement?.callUndeliveredElement(element, select.context)
1475         select.selectInRegistrationPhase(CHANNEL_CLOSED)
1476     }
1477 
1478     @Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
1479     private fun processResultSelectSend(ignoredParam: Any?, selectResult: Any?): Any? =
1480         if (selectResult === CHANNEL_CLOSED) throw sendException
1481         else this
1482 
1483     @Suppress("UNCHECKED_CAST")
1484     override val onReceive: SelectClause1<E>
1485         get() = SelectClause1Impl(
1486             clauseObject = this@BufferedChannel,
1487             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1488             processResFunc = BufferedChannel<*>::processResultSelectReceive as ProcessResultFunction,
1489             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1490         )
1491 
1492     @Suppress("UNCHECKED_CAST")
1493     override val onReceiveCatching: SelectClause1<ChannelResult<E>>
1494         get() = SelectClause1Impl(
1495             clauseObject = this@BufferedChannel,
1496             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1497             processResFunc = BufferedChannel<*>::processResultSelectReceiveCatching as ProcessResultFunction,
1498             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1499         )
1500 
1501     @Suppress("OVERRIDE_DEPRECATION", "UNCHECKED_CAST")
1502     override val onReceiveOrNull: SelectClause1<E?>
1503         get() = SelectClause1Impl(
1504             clauseObject = this@BufferedChannel,
1505             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1506             processResFunc = BufferedChannel<*>::processResultSelectReceiveOrNull as ProcessResultFunction,
1507             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1508         )
1509 
1510     @Suppress("UNUSED_PARAMETER")
1511     private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
1512         receiveImpl( // <-- this is an inline function
1513             waiter = select,
1514             onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
1515             onSuspend = { _, _, _ -> },
1516             onClosed = { onClosedSelectOnReceive(select) }
1517         )
1518 
1519     private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
1520         select.selectInRegistrationPhase(CHANNEL_CLOSED)
1521     }
1522 
1523     @Suppress("UNUSED_PARAMETER")
1524     private fun processResultSelectReceive(ignoredParam: Any?, selectResult: Any?): Any? =
1525         if (selectResult === CHANNEL_CLOSED) throw receiveException
1526         else selectResult
1527 
1528     @Suppress("UNUSED_PARAMETER")
1529     private fun processResultSelectReceiveOrNull(ignoredParam: Any?, selectResult: Any?): Any? =
1530         if (selectResult === CHANNEL_CLOSED) {
1531             if (closeCause == null) null
1532             else throw receiveException
1533         } else selectResult
1534 
1535     @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER", "RedundantNullableReturnType")
1536     private fun processResultSelectReceiveCatching(ignoredParam: Any?, selectResult: Any?): Any? =
1537         if (selectResult === CHANNEL_CLOSED) closed(closeCause)
1538         else success(selectResult as E)
1539 
    /**
     * The cancellation-handler constructor shared by the receiving `select` clauses
     * ([onReceive], [onReceiveCatching], and [onReceiveOrNull]).
     *
     * It is `null` when no `onUndeliveredElement` handler is specified. Otherwise,
     * for the given select instance and internal result, it builds a handler that
     * passes the result to `onUndeliveredElement` together with the select context,
     * unless the result is the [CHANNEL_CLOSED] token.
     */
    @Suppress("UNCHECKED_CAST")
    private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
        // The second parameter is not used by this constructor.
        { select: SelectInstance<*>, _: Any?, element: Any? ->
            // The `CHANNEL_CLOSED` token is not an element, so there is nothing to report for it.
            { if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context) }
        }
    }
1546 
1547     // ######################
1548     // ## Iterator Support ##
1549     // ######################
1550 
1551     override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
1552 
1553     /**
1554      * The key idea is that an iterator is a special receiver type,
1555      * which should be resumed differently to [receive] and [onReceive]
1556      * operations, but can be served as a waiter in a way similar to
1557      * [CancellableContinuation] and [SelectInstance].
1558      *
1559      * Roughly, [hasNext] is a [receive] sibling, while [next] simply
1560      * returns the already retrieved element. From the implementation
1561      * side, [receiveResult] stores the element retrieved by [hasNext]
1562      * (or a special [CHANNEL_CLOSED] token if the channel is closed).
1563      *
1564      * The [invoke] function is a [CancelHandler] implementation,
1565      * which requires knowing the [segment] and the [index] in it
1566      * that specify the location of the stored iterator.
1567      *
1568      * To resume the suspended [hasNext] call, a special [tryResumeHasNext]
1569      * function should be used in a way similar to [CancellableContinuation.tryResume]
1570      * and [SelectInstance.trySelect]. When the channel becomes closed,
1571      * [tryResumeHasNextOnClosedChannel] should be used instead.
1572      */
1573     private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
1574         /**
1575          * Stores the element retrieved by [hasNext] or
1576          * a special [CHANNEL_CLOSED] token if this channel is closed.
1577          * If [hasNext] has not been invoked yet, [NO_RECEIVE_RESULT] is stored.
1578          */
1579         private var receiveResult: Any? = NO_RECEIVE_RESULT
1580 
1581         /**
1582          * When [hasNext] suspends, this field stores the corresponding
1583          * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
1584          * function resume this continuation when the [hasNext] invocation should complete.
1585          */
1586         private var continuation: CancellableContinuationImpl<Boolean>? = null
1587 
1588         // `hasNext()` is just a special receive operation.
1589         override suspend fun hasNext(): Boolean =
1590             receiveImpl( // <-- this is an inline function
1591                 // Do not create a continuation until it is required;
1592                 // it is created later via [onNoWaiterSuspend], if needed.
1593                 waiter = null,
1594                 // Store the received element in `receiveResult` on successful
1595                 // retrieval from the buffer or rendezvous with a suspended sender.
1596                 // Also, inform the `BufferedChannel` extensions that
1597                 // the synchronization of this receive operation is completed.
1598                 onElementRetrieved = { element ->
1599                     this.receiveResult = element
1600                     true
1601                 },
1602                 // As no waiter is provided, suspension is impossible.
1603                 onSuspend = { _, _, _ -> error("unreachable") },
1604                 // Return `false` or throw an exception if the channel is already closed.
1605                 onClosed = { onClosedHasNext() },
1606                 // If `hasNext()` decides to suspend, the corresponding
1607                 // `suspend` function that creates a continuation is called.
1608                 // The tail-call optimization is applied here.
1609                 onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
1610             )
1611 
1612         private fun onClosedHasNext(): Boolean {
1613             this.receiveResult = CHANNEL_CLOSED
1614             val cause = closeCause ?: return false
1615             throw recoverStackTrace(cause)
1616         }
1617 
1618         private suspend fun hasNextOnNoWaiterSuspend(
1619             /* The working cell is specified by
1620             the segment and the index in it. */
1621             segment: ChannelSegment<E>,
1622             index: Int,
1623             /* The global index of the cell. */
1624             r: Long
1625         ): Boolean = suspendCancellableCoroutineReusable { cont ->
1626             this.continuation = cont
1627             receiveImplOnNoWaiter( // <-- this is an inline function
1628                 segment = segment, index = index, r = r,
1629                 waiter = this, // store this iterator as a waiter
1630                 // In case of successful element retrieval, store
1631                 // it in `receiveResult` and resume the continuation.
1632                 // Importantly, the receiver coroutine may be cancelled
1633                 // after it is successfully resumed but not dispatched yet.
1634                 // In case `onUndeliveredElement` is present, we must
1635                 // invoke it in the latter case.
1636                 onElementRetrieved = { element ->
1637                     this.receiveResult = element
1638                     this.continuation = null
1639                     cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1640                 },
1641                 onClosed = { onClosedHasNextNoWaiterSuspend() }
1642             )
1643         }
1644 
1645         override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
1646             this.continuation?.invokeOnCancellation(segment, index)
1647         }
1648 
1649         private fun onClosedHasNextNoWaiterSuspend() {
1650             // Read the current continuation and clean
1651             // the corresponding field to avoid memory leaks.
1652             val cont = this.continuation!!
1653             this.continuation = null
1654             // Update the `hasNext()` internal result.
1655             this.receiveResult = CHANNEL_CLOSED
1656             // If this channel was closed without exception,
1657             // `hasNext()` should return `false`; otherwise,
1658             // it throws the closing exception.
1659             val cause = closeCause
1660             if (cause == null) {
1661                 cont.resume(false)
1662             } else {
1663                 cont.resumeWithException(recoverStackTrace(cause, cont))
1664             }
1665         }
1666 
1667         @Suppress("UNCHECKED_CAST")
1668         override fun next(): E {
1669             // Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
1670             val result = receiveResult
1671             check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
1672             receiveResult = NO_RECEIVE_RESULT
1673             // Is this channel closed?
1674             if (result === CHANNEL_CLOSED) throw recoverStackTrace(receiveException)
1675             // Return the element.
1676             return result as E
1677         }
1678 
1679         fun tryResumeHasNext(element: E): Boolean {
1680             // Read the current continuation and clean
1681             // the corresponding field to avoid memory leaks.
1682             val cont = this.continuation!!
1683             this.continuation = null
1684             // Store the retrieved element in `receiveResult`.
1685             this.receiveResult = element
1686             // Try to resume this `hasNext()`. Importantly, the receiver coroutine
1687             // may be cancelled after it is successfully resumed but not dispatched yet.
1688             // In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
1689             return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1690         }
1691 
1692         fun tryResumeHasNextOnClosedChannel() {
1693             // Read the current continuation and clean
1694             // the corresponding field to avoid memory leaks.
1695             val cont = this.continuation!!
1696             this.continuation = null
1697             // Update the `hasNext()` internal result and inform
1698             // `BufferedChannel` extensions that synchronization
1699             // of this receive operation is completed.
1700             this.receiveResult = CHANNEL_CLOSED
1701             // If this channel was closed without exception,
1702             // `hasNext()` should return `false`; otherwise,
1703             // it throws the closing exception.
1704             val cause = closeCause
1705             if (cause == null) {
1706                 cont.resume(false)
1707             } else {
1708                 cont.resumeWithException(recoverStackTrace(cause, cont))
1709             }
1710         }
1711     }
1712 
1713     // ##############################
1714     // ## Closing and Cancellation ##
1715     // ##############################
1716 
1717     /**
1718      * Store the cause of closing this channel, either via [close] or [cancel] call.
1719      * The closing cause can be set only once.
1720      */
1721     private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
1722     // Should be called only if this channel is closed or cancelled.
1723     protected val closeCause get() = _closeCause.value as Throwable?
1724 
1725     /** Returns the closing cause if it is non-null, or [ClosedSendChannelException] otherwise. */
1726     protected val sendException get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
1727 
1728     /** Returns the closing cause if it is non-null, or [ClosedReceiveChannelException] otherwise. */
1729     private val receiveException get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1730 
    /**
     * Stores the close handler installed by [invokeOnClose].
     * To synchronize [invokeOnClose] and [close], two additional
     * marker states, [CLOSE_HANDLER_INVOKED] and [CLOSE_HANDLER_CLOSED],
     * are used. The resulting state diagram is presented below.
     *
     * ```
     * +------+  install handler  +---------+  close(..)  +---------+
     * | null |------------------>| handler |------------>| INVOKED |
     * +------+                   +---------+             +---------+
     *    |
     *    | close(..)  +--------+
     *    +----------->| CLOSED |
     *                 +--------+
     * ```
     *
     * NOTE(review): [invokeOnClose] also appears to move the state from `CLOSED`
     * to `INVOKED` when a handler is installed after closing; this transition
     * is not shown in the diagram. Confirm against its implementation.
     */
    private val closeHandler = atomic<Any?>(null)
1746 
    /**
     * Invoked when the channel is closed, as one of the last actions of the
     * closing procedure. [closeOrCancelImpl] calls it on every invocation, not only
     * on the one that has installed the closing cause; therefore, this method
     * should be idempotent, as it can be called multiple times.
     *
     * The default implementation does nothing.
     */
    protected open fun onClosedIdempotent() {}
1752 
1753     override fun close(cause: Throwable?): Boolean =
1754         closeOrCancelImpl(cause, cancel = false)
1755 
1756     @Suppress("OVERRIDE_DEPRECATION")
1757     final override fun cancel(cause: Throwable?): Boolean = cancelImpl(cause)
1758 
1759     @Suppress("OVERRIDE_DEPRECATION")
1760     final override fun cancel() { cancelImpl(null) }
1761 
1762     final override fun cancel(cause: CancellationException?) { cancelImpl(cause) }
1763 
1764     internal open fun cancelImpl(cause: Throwable?): Boolean =
1765         closeOrCancelImpl(cause ?: CancellationException("Channel was cancelled"), cancel = true)
1766 
1767     /**
1768      * This is a common implementation for [close] and [cancel]. It first tries
1769      * to install the specified cause; the invocation that successfully installs
1770      * the cause returns `true` as a results of this function, while all further
1771      * [close] and [cancel] calls return `false`.
1772      *
1773      * After the closing/cancellation cause is installed, the channel should be marked
1774      * as closed or cancelled, which bounds further `send(e)`-s to fails.
1775      *
1776      * Then, [completeCloseOrCancel] is called, which cancels waiting `receive()`
1777      * requests ([cancelSuspendedReceiveRequests]) and removes unprocessed elements
1778      * ([removeUnprocessedElements]) in case this channel is cancelled.
1779      *
1780      * Finally, if this [closeOrCancelImpl] has installed the cause, therefore,
1781      * has closed the channel, [closeHandler] and [onClosedIdempotent] should be invoked.
1782      */
1783     protected open fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
1784         // If this is a `cancel(..)` invocation, set a bit that the cancellation
1785         // has been started. This is crucial for ensuring linearizability,
1786         // when concurrent `close(..)` and `isClosedFor[Send,Receive]` operations
1787         // help this `cancel(..)`.
1788         if (cancel) markCancellationStarted()
1789         // Try to install the specified cause. On success, this invocation will
1790         // return `true` as a result; otherwise, it will complete with `false`.
1791         val closedByThisOperation = _closeCause.compareAndSet(NO_CLOSE_CAUSE, cause)
1792         // Mark this channel as closed or cancelled, depending on this operation type.
1793         if (cancel) markCancelled() else markClosed()
1794         // Complete the closing or cancellation procedure.
1795         completeCloseOrCancel()
1796         // Finally, if this operation has installed the cause,
1797         // it should invoke the close handlers.
1798         return closedByThisOperation.also {
1799             onClosedIdempotent()
1800             if (it) invokeCloseHandler()
1801         }
1802     }
1803 
1804     /**
1805      * Invokes the installed close handler,
1806      * updating the [closeHandler] state correspondingly.
1807      */
1808     private fun invokeCloseHandler() {
1809         val closeHandler = closeHandler.getAndUpdate {
1810             if (it === null) {
1811                 // Inform concurrent `invokeOnClose`
1812                 // that this channel is already closed.
1813                 CLOSE_HANDLER_CLOSED
1814             } else {
1815                 // Replace the handler with a special
1816                 // `INVOKED` marker to avoid memory leaks.
1817                 CLOSE_HANDLER_INVOKED
1818             }
1819         } ?: return // no handler was installed, finish.
1820         // Invoke the handler.
1821         @Suppress("UNCHECKED_CAST")
1822         closeHandler as (cause: Throwable?) -> Unit
1823         closeHandler(closeCause)
1824     }
1825 
1826     override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
1827         // Try to install the handler, finishing on success.
1828         if (closeHandler.compareAndSet(null, handler)) {
1829             // Handler has been successfully set, finish the operation.
1830             return
1831         }
1832         // Either another handler is already set, or this channel is closed.
1833         // In the latter case, the current handler should be invoked.
1834         // However, the implementation must ensure that at most one
1835         // handler is called, throwing an `IllegalStateException`
1836         // if another close handler has been invoked.
1837         closeHandler.loop { cur ->
1838             when {
1839                 cur === CLOSE_HANDLER_CLOSED -> {
1840                     // Try to update the state from `CLOSED` to `INVOKED`.
1841                     // This is crucial to guarantee that at most one handler can be called.
1842                     // On success, invoke the handler and finish.
1843                     if (closeHandler.compareAndSet(CLOSE_HANDLER_CLOSED, CLOSE_HANDLER_INVOKED)) {
1844                         handler(closeCause)
1845                         return
1846                     }
1847                 }
1848                 cur === CLOSE_HANDLER_INVOKED -> error("Another handler was already registered and successfully invoked")
1849                 else -> error("Another handler is already registered: $cur")
1850             }
1851         }
1852     }
1853 
1854     /**
1855      * Marks this channel as closed.
1856      * In case [cancelImpl] has already been invoked,
1857      * and this channel is marked with [CLOSE_STATUS_CANCELLATION_STARTED],
1858      * this function marks the channel as cancelled.
1859      *
1860      * All operation that notice this channel in the closed state,
1861      * must help to complete the closing via [completeCloseOrCancel].
1862      */
1863     private fun markClosed(): Unit =
1864         sendersAndCloseStatus.update { cur ->
1865             when (cur.sendersCloseStatus) {
1866                 CLOSE_STATUS_ACTIVE -> // the channel is neither closed nor cancelled
1867                     constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CLOSED)
1868                 CLOSE_STATUS_CANCELLATION_STARTED -> // the channel is going to be cancelled
1869                     constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1870                 else -> return // the channel is already marked as closed or cancelled.
1871             }
1872         }
1873 
1874     /**
1875      * Marks this channel as cancelled.
1876      *
1877      * All operation that notice this channel in the cancelled state,
1878      * must help to complete the cancellation via [completeCloseOrCancel].
1879      */
1880     private fun markCancelled(): Unit =
1881         sendersAndCloseStatus.update { cur ->
1882             constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1883         }
1884 
1885     /**
1886      * When the cancellation procedure starts, it is critical
1887      * to mark the closing status correspondingly. Thus, other
1888      * operations, which may help to complete the cancellation,
1889      * always correctly update the status to `CANCELLED`.
1890      */
1891     private fun markCancellationStarted(): Unit =
1892         sendersAndCloseStatus.update { cur ->
1893             if (cur.sendersCloseStatus == CLOSE_STATUS_ACTIVE)
1894                 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLATION_STARTED)
1895             else return // this channel is already closed or cancelled
1896         }
1897 
    /**
     * Completes the started [close] or [cancel] procedure.
     *
     * The work is delegated to the [isClosedForSend] getter; its result is
     * ignored here, the getter is read only for its helping side effect.
     */
    private fun completeCloseOrCancel() {
        isClosedForSend // must finish the started close/cancel if one is detected.
    }
1904 
    /**
     * Whether this channel is conflated with the `DROP_OLDEST` conflation strategy.
     * The default is `false`; when it is `true`, [completeClose] additionally marks
     * all empty cells as closed.
     */
    protected open val isConflatedDropOldest get() = false
1906 
1907     /**
1908      * Completes the channel closing procedure.
1909      */
1910     private fun completeClose(sendersCur: Long): ChannelSegment<E> {
1911         // Close the linked list for further segment addition,
1912         // obtaining the last segment in the data structure.
1913         val lastSegment = closeLinkedList()
1914         // In the conflated channel implementation (with the DROP_OLDEST
1915         // elements conflation strategy), it is critical to mark all empty
1916         // cells as closed to prevent in-progress `send(e)`-s, which have not
1917         // put their elements yet, completions after this channel is closed.
1918         // Otherwise, it is possible for a `send(e)` to put an element when
1919         // the buffer is already full, while a concurrent receiver may extract
1920         // the oldest element. When the channel is not closed, we can linearize
1921         // this `receive()` before the `send(e)`, but after the channel is closed,
1922         // `send(e)` must fails. Marking all unprocessed cells as `CLOSED` solves the issue.
1923         if (isConflatedDropOldest) {
1924             val lastBufferedCellGlobalIndex = markAllEmptyCellsAsClosed(lastSegment)
1925             if (lastBufferedCellGlobalIndex != -1L)
1926                 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBufferedCellGlobalIndex)
1927         }
1928         // Resume waiting `receive()` requests,
1929         // informing them that the channel is closed.
1930         cancelSuspendedReceiveRequests(lastSegment, sendersCur)
1931         // Return the last segment in the linked list as a result
1932         // of this function; we need it in `completeCancel(..)`.
1933         return lastSegment
1934     }
1935 
1936     /**
1937      * Completes the channel cancellation procedure.
1938      */
1939     private fun completeCancel(sendersCur: Long) {
1940         // First, ensure that this channel is closed,
1941         // obtaining the last segment in the linked list.
1942         val lastSegment = completeClose(sendersCur)
1943         // Cancel suspended `send(e)` requests and
1944         // remove buffered elements in the reverse order.
1945         removeUnprocessedElements(lastSegment)
1946     }
1947 
1948     /**
1949      * Closes the underlying linked list of segments for further segment addition.
1950      */
1951     private fun closeLinkedList(): ChannelSegment<E> {
1952         // Choose the last segment.
1953         var lastSegment = bufferEndSegment.value
1954         sendSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1955         receiveSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1956         // Close the linked list of segment for new segment addition
1957         // and return the last segment in the linked list.
1958         return lastSegment.close()
1959     }
1960 
1961     /**
1962      * This function marks all empty cells, in the `null` and [IN_BUFFER] state,
1963      * as closed. Notably, it processes the cells from right to left, and finishes
1964      * immediately when the processing cell is already covered by `receive()` or
1965      * contains a buffered elements ([BUFFERED] state).
1966      *
1967      * This function returns the global index of the last buffered element,
1968      * or `-1` if this channel does not contain buffered elements.
1969      */
1970     private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
1971         // Process the cells in reverse order, from right to left.
1972         var segment = lastSegment
1973         while (true) {
1974             for (index in SEGMENT_SIZE - 1 downTo 0) {
1975                 // Is this cell already covered by `receive()`?
1976                 val globalIndex = segment.id * SEGMENT_SIZE + index
1977                 if (globalIndex < receiversCounter) return -1
1978                 // Process the cell `segment[index]`.
1979                 cell_update@ while (true) {
1980                     val state = segment.getState(index)
1981                     when {
1982                         // The cell is empty.
1983                         state === null || state === IN_BUFFER -> {
1984                             // Inform a possibly upcoming sender that this channel is already closed.
1985                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
1986                                 segment.onSlotCleaned()
1987                                 break@cell_update
1988                             }
1989                         }
1990                         // The cell stores a buffered element.
1991                         state === BUFFERED -> return globalIndex
1992                         // Skip this cell if it is not empty and does not store a buffered element.
1993                         else -> break@cell_update
1994                     }
1995                 }
1996             }
1997             // Process the next segment, finishing if the linked list ends.
1998             segment = segment.prev ?: return -1
1999         }
2000     }
2001 
2002     /**
2003      * Cancels suspended `send(e)` requests and removes buffered elements
2004      * starting from the last cell in the specified [lastSegment] (it must
2005      * be the physical tail of the underlying linked list) and updating
2006      * the cells in reverse order.
2007      */
2008     private fun removeUnprocessedElements(lastSegment: ChannelSegment<E>) {
2009         // Read the `onUndeliveredElement` lambda at once. In case it
2010         // throws an exception, this exception is handled and stored in
2011         // the variable below. If multiple exceptions are thrown, the first
2012         // one is stored in the variable, while the others are suppressed.
2013         val onUndeliveredElement = onUndeliveredElement
2014         var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
2015         // To perform synchronization correctly, it is critical to
2016         // process the cells in reverse order, from right to left.
2017         // However, according to the API, suspended senders should
2018         // be cancelled in the order of their suspension. Therefore,
2019         // we need to collect all of them and cancel in the reverse
2020         // order after that.
2021         var suspendedSenders = InlineList<Waiter>()
2022         var segment = lastSegment
2023         process_segments@ while (true) {
2024             for (index in SEGMENT_SIZE - 1 downTo 0) {
2025                 // Process the cell `segment[index]`.
2026                 val globalIndex = segment.id * SEGMENT_SIZE + index
2027                 // Update the cell state.
2028                 update_cell@ while (true) {
2029                     // Read the current state of the cell.
2030                     val state = segment.getState(index)
2031                     when {
2032                         // The cell is already processed by a receiver.
2033                         state === DONE_RCV -> break@process_segments
2034                         // The cell stores a buffered element.
2035                         state === BUFFERED -> {
2036                             // Is the cell already covered by a receiver?
2037                             if (globalIndex < receiversCounter) break@process_segments
2038                             // Update the cell state to `CHANNEL_CLOSED`.
2039                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2040                                 // If `onUndeliveredElement` lambda is non-null, call it.
2041                                 if (onUndeliveredElement != null) {
2042                                     val element = segment.getElement(index)
2043                                     undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2044                                 }
2045                                 // Clean the element field and inform the segment
2046                                 // that the slot is cleaned to avoid memory leaks.
2047                                 segment.cleanElement(index)
2048                                 segment.onSlotCleaned()
2049                                 break@update_cell
2050                             }
2051                         }
2052                         // The cell is empty.
2053                         state === IN_BUFFER || state === null -> {
2054                             // Update the cell state to `CHANNEL_CLOSED`.
2055                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2056                                 // Inform the segment that the slot is cleaned to avoid memory leaks.
2057                                 segment.onSlotCleaned()
2058                                 break@update_cell
2059                             }
2060                         }
2061                         // The cell stores a suspended waiter.
2062                         state is Waiter || state is WaiterEB -> {
2063                             // Is the cell already covered by a receiver?
2064                             if (globalIndex < receiversCounter) break@process_segments
2065                             // Obtain the sender.
2066                             val sender: Waiter = if (state is WaiterEB) state.waiter
2067                                                  else state as Waiter
2068                             // Update the cell state to `CHANNEL_CLOSED`.
2069                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2070                                 // If `onUndeliveredElement` lambda is non-null, call it.
2071                                 if (onUndeliveredElement != null) {
2072                                     val element = segment.getElement(index)
2073                                     undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2074                                 }
2075                                 // Save the sender for further cancellation.
2076                                 suspendedSenders += sender
2077                                 // Clean the element field and inform the segment
2078                                 // that the slot is cleaned to avoid memory leaks.
2079                                 segment.cleanElement(index)
2080                                 segment.onSlotCleaned()
2081                                 break@update_cell
2082                             }
2083                         }
2084                         // A concurrent receiver is resuming a suspended sender.
2085                         // As the cell is covered by a receiver, finish immediately.
2086                         state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
2087                         // A concurrent `expandBuffer()` is resuming a suspended sender.
2088                         // Wait in a spin-loop until the cell state changes.
2089                         state === RESUMING_BY_EB -> continue@update_cell
2090                         else -> break@update_cell
2091                     }
2092                 }
2093             }
2094             // Process the previous segment.
2095             segment = segment.prev ?: break
2096         }
2097         // Cancel suspended senders in their order of addition to this channel.
2098         suspendedSenders.forEachReversed { it.resumeSenderOnCancelledChannel() }
2099         // Throw `UndeliveredElementException` at the end if there was one.
2100         undeliveredElementException?.let { throw it }
2101     }
2102 
2103     /**
2104      * Cancels suspended `receive` requests from the end to the beginning,
2105      * also moving empty cells to the `CHANNEL_CLOSED` state.
2106      */
2107     private fun cancelSuspendedReceiveRequests(lastSegment: ChannelSegment<E>, sendersCounter: Long) {
2108         // To perform synchronization correctly, it is critical to
2109         // extract suspended requests in the reverse order,
2110         // from the end to the beginning.
2111         // However, according to the API, they should be cancelled
2112         // in the order of their suspension. Therefore, we need to
2113         // collect the suspended requests first, cancelling them
2114         // in the reverse order after that.
2115         var suspendedReceivers = InlineList<Waiter>()
2116         var segment: ChannelSegment<E>? = lastSegment
2117         process_segments@ while (segment != null) {
2118             for (index in SEGMENT_SIZE - 1 downTo 0) {
2119                 // Is the cell already covered by a sender? Finish immediately in this case.
2120                 if (segment.id * SEGMENT_SIZE + index < sendersCounter) break@process_segments
2121                 // Try to move the cell state to `CHANNEL_CLOSED`.
2122                 cell_update@ while (true) {
2123                     val state = segment.getState(index)
2124                     when {
2125                         state === null || state === IN_BUFFER -> {
2126                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2127                                 segment.onSlotCleaned()
2128                                 break@cell_update
2129                             }
2130                         }
2131                         state is WaiterEB -> {
2132                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2133                                 suspendedReceivers += state.waiter // save for cancellation.
2134                                 segment.onCancelledRequest(index = index, receiver = true)
2135                                 break@cell_update
2136                             }
2137                         }
2138                         state is Waiter -> {
2139                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2140                                 suspendedReceivers += state // save for cancellation.
2141                                 segment.onCancelledRequest(index = index, receiver = true)
2142                                 break@cell_update
2143                             }
2144                         }
2145                         else -> break@cell_update // nothing to cancel.
2146                     }
2147                 }
2148             }
2149             // Process the previous segment.
2150             segment = segment.prev
2151         }
2152         // Cancel the suspended requests in their order of addition to this channel.
2153         suspendedReceivers.forEachReversed { it.resumeReceiverOnClosedChannel() }
2154     }
2155 
2156     /**
2157      * Resumes this receiver because this channel is closed.
2158      * This function does not take any effect if the operation has already been resumed or cancelled.
2159      */
2160     private fun Waiter.resumeReceiverOnClosedChannel() = resumeWaiterOnClosedChannel(receiver = true)
2161 
2162     /**
2163      * Resumes this sender because this channel is cancelled.
2164      * This function does not take any effect if the operation has already been resumed or cancelled.
2165      */
2166     private fun Waiter.resumeSenderOnCancelledChannel() = resumeWaiterOnClosedChannel(receiver = false)
2167 
2168     private fun Waiter.resumeWaiterOnClosedChannel(receiver: Boolean) {
2169         when (this) {
2170             is SendBroadcast -> cont.resume(false)
2171             is CancellableContinuation<*> -> resumeWithException(if (receiver) receiveException else sendException)
2172             is ReceiveCatching<*> -> cont.resume(closed(closeCause))
2173             is BufferedChannel<*>.BufferedChannelIterator -> tryResumeHasNextOnClosedChannel()
2174             is SelectInstance<*> -> trySelect(this@BufferedChannel, CHANNEL_CLOSED)
2175             else -> error("Unexpected waiter: $this")
2176         }
2177     }
2178 
2179     @ExperimentalCoroutinesApi
2180     override val isClosedForSend: Boolean
2181         get() = sendersAndCloseStatus.value.isClosedForSend0
2182 
2183     private val Long.isClosedForSend0 get() =
2184         isClosed(this, isClosedForReceive = false)
2185 
2186     @ExperimentalCoroutinesApi
2187     override val isClosedForReceive: Boolean
2188         get() = sendersAndCloseStatus.value.isClosedForReceive0
2189 
2190     private val Long.isClosedForReceive0 get() =
2191         isClosed(this, isClosedForReceive = true)
2192 
2193     private fun isClosed(
2194         sendersAndCloseStatusCur: Long,
2195         isClosedForReceive: Boolean
2196     ) = when (sendersAndCloseStatusCur.sendersCloseStatus) {
2197         // This channel is active and has not been closed.
2198         CLOSE_STATUS_ACTIVE -> false
2199         // The cancellation procedure has been started but
2200         // not linearized yet, so this channel should be
2201         // considered as active.
2202         CLOSE_STATUS_CANCELLATION_STARTED -> false
2203         // This channel has been successfully closed.
2204         // Help to complete the closing procedure to
2205         // guarantee linearizability, and return `true`
2206         // for senders or the flag whether there still
2207         // exist elements to retrieve for receivers.
2208         CLOSE_STATUS_CLOSED -> {
2209             completeClose(sendersAndCloseStatusCur.sendersCounter)
2210             // When `isClosedForReceive` is `false`, always return `true`.
2211             // Otherwise, it is possible that the channel is closed but
2212             // still has elements to retrieve.
2213             if (isClosedForReceive) !hasElements() else true
2214         }
2215         // This channel has been successfully cancelled.
2216         // Help to complete the cancellation procedure to
2217         // guarantee linearizability and return `true`.
2218         CLOSE_STATUS_CANCELLED -> {
2219             completeCancel(sendersAndCloseStatusCur.sendersCounter)
2220             true
2221         }
2222         else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
2223     }
2224 
2225     @ExperimentalCoroutinesApi
2226     override val isEmpty: Boolean get() {
2227         // This function should return `false` if
2228         // this channel is closed for `receive`.
2229         if (isClosedForReceive) return false
2230         // Does this channel has elements to retrieve?
2231         if (hasElements()) return false
2232         // This channel does not have elements to retrieve;
2233         // Check that it is still not closed for `receive`.
2234         return !isClosedForReceive
2235     }
2236 
2237     /**
2238      * Checks whether this channel contains elements to retrieve.
2239      * Unfortunately, simply comparing the counters is insufficient,
2240      * as some cells can be in the `INTERRUPTED` state due to cancellation.
2241      * This function tries to find the first "alive" element,
2242      * updating the `receivers` counter to skip empty cells.
2243      *
2244      * The implementation is similar to `receive()`.
2245      */
2246     internal fun hasElements(): Boolean {
2247         while (true) {
2248             // Read the segment before obtaining the `receivers` counter value.
2249             var segment = receiveSegment.value
2250             // Obtains the `receivers` and `senders` counter values.
2251             val r = receiversCounter
2252             val s = sendersCounter
2253             // Is there a chance that this channel has elements?
2254             if (s <= r) return false // no elements
2255             // The `r`-th cell is covered by a sender; check whether it contains an element.
2256             // First, try to find the required segment if the initially
2257             // obtained segment (in the beginning of this function) has lower id.
2258             val id = r / SEGMENT_SIZE
2259             if (segment.id != id) {
2260                 // Try to find the required segment.
2261                 segment = findSegmentReceive(id, segment) ?:
2262                     // The required segment has not been found. Either it has already
2263                     // been removed, or the underlying linked list is already closed
2264                     // for segment additions. In the latter case, the channel is closed
2265                     // and does not contain elements, so this operation returns `false`.
2266                     // Otherwise, if the required segment is removed, the operation restarts.
2267                     if (receiveSegment.value.id < id) return false else continue
2268             }
2269             segment.cleanPrev() // all the previous segments are no longer needed.
2270             // Does the `r`-th cell contain waiting sender or buffered element?
2271             val i = (r % SEGMENT_SIZE).toInt()
2272             if (isCellNonEmpty(segment, i, r)) return true
2273             // The cell is empty. Update `receivers` counter and try again.
2274             receivers.compareAndSet(r, r + 1) // if this CAS fails, the counter has already been updated.
2275         }
2276     }
2277 
2278     /**
2279      * Checks whether this cell contains a buffered element or a waiting sender,
2280      * returning `true` in this case. Otherwise, if this cell is empty
2281      * (due to waiter cancellation, cell poisoning, or channel closing),
2282      * this function returns `false`.
2283      *
2284      * Notably, this function must be called only if the cell is covered by a sender.
2285      */
2286     private fun isCellNonEmpty(
2287         segment: ChannelSegment<E>,
2288         index: Int,
2289         globalIndex: Long
2290     ): Boolean {
2291         // The logic is similar to `updateCellReceive` with the only difference
2292         // that this function neither changes the cell state nor retrieves the element.
2293         while (true) {
2294             // Read the current cell state.
2295             val state = segment.getState(index)
2296             when {
2297                 // The cell is empty but a sender is coming.
2298                 state === null || state === IN_BUFFER -> {
2299                     // Poison the cell to ensure correctness.
2300                     if (segment.casState(index, state, POISONED)) {
2301                         // When the cell becomes poisoned, it is essentially
2302                         // the same as storing an already cancelled receiver.
2303                         // Thus, the `expandBuffer()` procedure should be invoked.
2304                         expandBuffer()
2305                         return false
2306                     }
2307                 }
2308                 // The cell stores a buffered element.
2309                 state === BUFFERED -> return true
2310                 // The cell stores an interrupted sender.
2311                 state === INTERRUPTED_SEND -> return false
2312                 // This channel is already closed.
2313                 state === CHANNEL_CLOSED -> return false
2314                 // The cell is already processed
2315                 // by a concurrent receiver.
2316                 state === DONE_RCV -> return false
2317                 // The cell is already poisoned
2318                 // by a concurrent receiver.
2319                 state === POISONED -> return false
2320                 // A concurrent `expandBuffer()` is resuming
2321                 // a suspended sender. This function is eligible
2322                 // to linearize before the buffer expansion procedure.
2323                 state === RESUMING_BY_EB -> return true
2324                 // A concurrent receiver is resuming
2325                 // a suspended sender. The element
2326                 // is no longer available for retrieval.
2327                 state === RESUMING_BY_RCV -> return false
2328                 // The cell stores a suspended request.
2329                 // However, it is possible that this request
2330                 // is receiver if the cell is covered by both
2331                 // send and receive operations.
2332                 // In case the cell is already covered by
2333                 // a receiver, the element is no longer
2334                 // available for retrieval, and this function
2335                 // return `false`. Otherwise, it is guaranteed
2336                 // that the suspended request is sender, so
2337                 // this function returns `true`.
2338                 else -> return globalIndex == receiversCounter
2339             }
2340         }
2341     }
2342 
2343     // #######################
2344     // # Segments Management #
2345     // #######################
2346 
2347     /**
2348      * Finds the segment with the specified [id] starting by the [startFrom]
2349      * segment and following the [ChannelSegment.next] references. In case
2350      * the required segment has not been created yet, this function attempts
2351      * to add it to the underlying linked list. Finally, it updates [sendSegment]
2352      * to the found segment if its [ChannelSegment.id] is greater than the one
2353      * of the already stored segment.
2354      *
2355      * In case the requested segment is already removed, or if it should be allocated
2356      * but the linked list structure is closed for new segments addition, this function
2357      * returns `null`. The implementation also efficiently skips a sequence of removed
2358      * segments, updating the counter value in [sendersAndCloseStatus] correspondingly.
2359      */
2360     private fun findSegmentSend(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
2361         return sendSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2362             if (it.isClosed) {
2363                 // The required segment has not been found and new segments
2364                 // cannot be added, as the linked listed in already added.
2365                 // This channel is already closed or cancelled; help to complete
2366                 // the closing or cancellation procedure.
2367                 completeCloseOrCancel()
2368                 // Clean the `prev` reference of the provided segment
2369                 // if all the previous cells are already covered by senders.
2370                 // It is important to clean the `prev` reference only in
2371                 // this case, as the closing/cancellation procedure may
2372                 // need correct value to traverse the linked list from right to left.
2373                 if (startFrom.id * SEGMENT_SIZE <  receiversCounter) startFrom.cleanPrev()
2374                 // As the required segment is not found and cannot be allocated, return `null`.
2375                 null
2376             } else {
2377                 // Get the found segment.
2378                 val segment = it.segment
2379                 // Is the required segment removed?
2380                 if (segment.id > id) {
2381                     // The required segment has been removed; `segment` is the first
2382                     // segment with `id` not lower than the required one.
2383                     // Skip the sequence of removed cells in O(1).
2384                     updateSendersCounterIfLower(segment.id * SEGMENT_SIZE)
2385                     // Clean the `prev` reference of the provided segment
2386                     // if all the previous cells are already covered by senders.
2387                     // It is important to clean the `prev` reference only in
2388                     // this case, as the closing/cancellation procedure may
2389                     // need correct value to traverse the linked list from right to left.
2390                     if (segment.id * SEGMENT_SIZE <  receiversCounter) segment.cleanPrev()
2391                     // As the required segment is not found and cannot be allocated, return `null`.
2392                     null
2393                 } else {
2394                     assert { segment.id == id }
2395                     // The required segment has been found; return it!
2396                     segment
2397                 }
2398             }
2399         }
2400     }
2401 
2402     /**
2403      * Finds the segment with the specified [id] starting by the [startFrom]
2404      * segment and following the [ChannelSegment.next] references. In case
2405      * the required segment has not been created yet, this function attempts
2406      * to add it to the underlying linked list. Finally, it updates [receiveSegment]
2407      * to the found segment if its [ChannelSegment.id] is greater than the one
2408      * of the already stored segment.
2409      *
2410      * In case the requested segment is already removed, or if it should be allocated
2411      * but the linked list structure is closed for new segments addition, this function
2412      * returns `null`. The implementation also efficiently skips a sequence of removed
2413      * segments, updating the [receivers] counter correspondingly.
2414      */
2415     private fun findSegmentReceive(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? =
2416         receiveSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2417             if (it.isClosed) {
2418                 // The required segment has not been found and new segments
2419                 // cannot be added, as the linked listed in already added.
2420                 // This channel is already closed or cancelled; help to complete
2421                 // the closing or cancellation procedure.
2422                 completeCloseOrCancel()
2423                 // Clean the `prev` reference of the provided segment
2424                 // if all the previous cells are already covered by senders.
2425                 // It is important to clean the `prev` reference only in
2426                 // this case, as the closing/cancellation procedure may
2427                 // need correct value to traverse the linked list from right to left.
2428                 if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
2429                 // As the required segment is not found and cannot be allocated, return `null`.
2430                 null
2431             } else {
2432                 // Get the found segment.
2433                 val segment = it.segment
2434                 // Advance the `bufferEnd` segment if required.
2435                 if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) {
2436                     bufferEndSegment.moveForward(segment)
2437                 }
2438                 // Is the required segment removed?
2439                 if (segment.id > id) {
2440                     // The required segment has been removed; `segment` is the first
2441                     // segment with `id` not lower than the required one.
2442                     // Skip the sequence of removed cells in O(1).
2443                     updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE)
2444                     // Clean the `prev` reference of the provided segment
2445                     // if all the previous cells are already covered by senders.
2446                     // It is important to clean the `prev` reference only in
2447                     // this case, as the closing/cancellation procedure may
2448                     // need correct value to traverse the linked list from right to left.
2449                     if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev()
2450                     // As the required segment is already removed, return `null`.
2451                     null
2452                 } else {
2453                     assert { segment.id == id }
2454                     // The required segment has been found; return it!
2455                     segment
2456                 }
2457             }
2458         }
2459 
2460     /**
2461      * Importantly, when this function does not find the requested segment,
2462      * it always updates the number of completed `expandBuffer()` attempts.
2463      */
2464     private fun findSegmentBufferEnd(id: Long, startFrom: ChannelSegment<E>, currentBufferEndCounter: Long): ChannelSegment<E>? =
2465         bufferEndSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2466             if (it.isClosed) {
2467                 // The required segment has not been found and new segments
2468                 // cannot be added, as the linked listed in already added.
2469                 // This channel is already closed or cancelled; help to complete
2470                 // the closing or cancellation procedure.
2471                 completeCloseOrCancel()
2472                 // Update `bufferEndSegment` to the last segment
2473                 // in the linked list to avoid memory leaks.
2474                 moveSegmentBufferEndToSpecifiedOrLast(id, startFrom)
2475                 // When this function does not find the requested segment,
2476                 // it should update the number of completed `expandBuffer()` attempts.
2477                 incCompletedExpandBufferAttempts()
2478                 null
2479             } else {
2480                 // Get the found segment.
2481                 val segment = it.segment
2482                 // Is the required segment removed?
2483                 if (segment.id > id) {
2484                     // The required segment has been removed; `segment` is the first segment
2485                     // with `id` not lower than the required one.
2486                     // Try to skip the sequence of removed cells in O(1) by increasing the `bufferEnd` counter.
2487                     // Importantly, when this function does not find the requested segment,
2488                     // it should update the number of completed `expandBuffer()` attempts.
2489                     if (bufferEnd.compareAndSet(currentBufferEndCounter + 1, segment.id * SEGMENT_SIZE)) {
2490                         incCompletedExpandBufferAttempts(segment.id * SEGMENT_SIZE - currentBufferEndCounter)
2491                     } else {
2492                         incCompletedExpandBufferAttempts()
2493                     }
2494                     // As the required segment is already removed, return `null`.
2495                     null
2496                 } else {
2497                     assert { segment.id == id }
2498                     // The required segment has been found; return it!
2499                     segment
2500                 }
2501             }
2502         }
2503 
2504     /**
2505      * Updates [bufferEndSegment] to the one with the specified [id] or
2506      * to the last existing segment, if the required segment is not yet created.
2507      *
2508      * Unlike [findSegmentBufferEnd], this function does not allocate new segments.
2509      */
2510     private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment<E>) {
2511         // Start searching the required segment from the specified one.
2512         var segment: ChannelSegment<E> = startFrom
2513         while (segment.id < id) {
2514             segment = segment.next ?: break
2515         }
2516         // Skip all removed segments and try to update `bufferEndSegment`
2517         // to the first non-removed one. This part should succeed eventually,
2518         // as the tail segment is never removed.
2519         while (true) {
2520             while (segment.isRemoved) {
2521                 segment = segment.next ?: break
2522             }
2523             // Try to update `bufferEndSegment`. On failure,
2524             // the found segment is already removed, so it
2525             // should be skipped.
2526             if (bufferEndSegment.moveForward(segment)) return
2527         }
2528     }
2529 
2530     /**
2531      * Updates the `senders` counter if its value
2532      * is lower that the specified one.
2533      *
2534      * Senders use this function to efficiently skip
2535      * a sequence of cancelled receivers.
2536      */
2537     private fun updateSendersCounterIfLower(value: Long): Unit =
2538         sendersAndCloseStatus.loop { cur ->
2539             val curCounter = cur.sendersCounter
2540             if (curCounter >= value) return
2541             val update = constructSendersAndCloseStatus(curCounter, cur.sendersCloseStatus)
2542             if (sendersAndCloseStatus.compareAndSet(cur, update)) return
2543         }
2544 
2545     /**
2546      * Updates the `receivers` counter if its value
2547      * is lower that the specified one.
2548      *
2549      * Receivers use this function to efficiently skip
2550      * a sequence of cancelled senders.
2551      */
2552     private fun updateReceiversCounterIfLower(value: Long): Unit =
2553         receivers.loop { cur ->
2554             if (cur >= value) return
2555             if (receivers.compareAndSet(cur, value)) return
2556         }
2557 
2558     // ###################
2559     // # Debug Functions #
2560     // ###################
2561 
2562     @Suppress("ConvertTwoComparisonsToRangeCheck")
2563     override fun toString(): String {
2564         val sb = StringBuilder()
2565         // Append the close status
2566         when (sendersAndCloseStatus.value.sendersCloseStatus) {
2567             CLOSE_STATUS_CLOSED -> sb.append("closed,")
2568             CLOSE_STATUS_CANCELLED -> sb.append("cancelled,")
2569         }
2570         // Append the buffer capacity
2571         sb.append("capacity=$capacity,")
2572         // Append the data
2573         sb.append("data=[")
2574         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2575             .filter { it !== NULL_SEGMENT }
2576             .minBy { it.id }
2577         val r = receiversCounter
2578         val s = sendersCounter
2579         var segment = firstSegment
2580         append_elements@ while (true) {
2581             process_cell@ for (i in 0 until SEGMENT_SIZE) {
2582                 val globalCellIndex = segment.id * SEGMENT_SIZE + i
2583                 if (globalCellIndex >= s && globalCellIndex >= r) break@append_elements
2584                 val cellState = segment.getState(i)
2585                 val element = segment.getElement(i)
2586                 val cellStateString = when (cellState) {
2587                     is CancellableContinuation<*> -> {
2588                         when {
2589                             globalCellIndex < r && globalCellIndex >= s -> "receive"
2590                             globalCellIndex < s && globalCellIndex >= r -> "send"
2591                             else -> "cont"
2592                         }
2593                     }
2594                     is SelectInstance<*> -> {
2595                         when {
2596                             globalCellIndex < r && globalCellIndex >= s -> "onReceive"
2597                             globalCellIndex < s && globalCellIndex >= r -> "onSend"
2598                             else -> "select"
2599                         }
2600                     }
2601                     is ReceiveCatching<*> -> "receiveCatching"
2602                     is SendBroadcast -> "sendBroadcast"
2603                     is WaiterEB -> "EB($cellState)"
2604                     RESUMING_BY_RCV, RESUMING_BY_EB -> "resuming_sender"
2605                     null, IN_BUFFER, DONE_RCV, POISONED, INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> continue@process_cell
2606                     else -> cellState.toString() // leave it just in case something is missed.
2607                 }
2608                 if (element != null) {
2609                     sb.append("($cellStateString,$element),")
2610                 } else {
2611                     sb.append("$cellStateString,")
2612                 }
2613             }
2614             // Process the next segment if exists.
2615             segment = segment.next ?: break
2616         }
2617         if (sb.last() == ',') sb.deleteAt(sb.length - 1)
2618         sb.append("]")
2619         // The string representation is constructed.
2620         return sb.toString()
2621     }
2622 
2623     // Returns a debug representation of this channel,
2624     // which is actively used in Lincheck tests.
2625     internal fun toStringDebug(): String {
2626         val sb = StringBuilder()
2627         // Append the counter values and the close status
2628         sb.append("S=${sendersCounter},R=${receiversCounter},B=${bufferEndCounter},B'=${completedExpandBuffersAndPauseFlag.value},C=${sendersAndCloseStatus.value.sendersCloseStatus},")
2629         when (sendersAndCloseStatus.value.sendersCloseStatus) {
2630             CLOSE_STATUS_CANCELLATION_STARTED -> sb.append("CANCELLATION_STARTED,")
2631             CLOSE_STATUS_CLOSED -> sb.append("CLOSED,")
2632             CLOSE_STATUS_CANCELLED -> sb.append("CANCELLED,")
2633         }
2634         // Append the segment references
2635         sb.append("SEND_SEGM=${sendSegment.value.hexAddress},RCV_SEGM=${receiveSegment.value.hexAddress}")
2636         if (!isRendezvousOrUnlimited) sb.append(",EB_SEGM=${bufferEndSegment.value.hexAddress}")
2637         sb.append("  ") // add some space
2638         // Append the linked list of segments.
2639         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2640             .filter { it !== NULL_SEGMENT }
2641             .minBy { it.id }
2642         var segment = firstSegment
2643         while (true) {
2644             sb.append("${segment.hexAddress}=[${if (segment.isRemoved) "*" else ""}${segment.id},prev=${segment.prev?.hexAddress},")
2645             repeat(SEGMENT_SIZE) { i ->
2646                 val cellState = segment.getState(i)
2647                 val element = segment.getElement(i)
2648                 val cellStateString = when (cellState) {
2649                     is CancellableContinuation<*> -> "cont"
2650                     is SelectInstance<*> -> "select"
2651                     is ReceiveCatching<*> -> "receiveCatching"
2652                     is SendBroadcast -> "send(broadcast)"
2653                     is WaiterEB -> "EB($cellState)"
2654                     else -> cellState.toString()
2655                 }
2656                 sb.append("[$i]=($cellStateString,$element),")
2657             }
2658             sb.append("next=${segment.next?.hexAddress}]  ")
2659             // Process the next segment if exists.
2660             segment = segment.next ?: break
2661         }
2662         // The string representation of this channel is now constructed!
2663         return sb.toString()
2664     }
2665 
2666 
2667     // This is an internal methods for tests.
2668     fun checkSegmentStructureInvariants() {
2669         if (isRendezvousOrUnlimited) {
2670             check(bufferEndSegment.value === NULL_SEGMENT) {
2671                 "bufferEndSegment must be NULL_SEGMENT for rendezvous and unlimited channels; they do not manipulate it.\n" +
2672                     "Channel state: $this"
2673             }
2674         } else {
2675             check(receiveSegment.value.id <= bufferEndSegment.value.id) {
2676                 "bufferEndSegment should not have lower id than receiveSegment.\n" +
2677                     "Channel state: $this"
2678             }
2679         }
2680         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2681             .filter { it !== NULL_SEGMENT }
2682             .minBy { it.id }
2683         check(firstSegment.prev == null) {
2684             "All processed segments should be unreachable from the data structure, but the `prev` link of the leftmost segment is non-null.\n" +
2685                 "Channel state: $this"
2686         }
2687         // Check that the doubly-linked list of segments does not
2688         // contain full-of-cancelled-cells segments.
2689         var segment = firstSegment
2690         while (segment.next != null) {
2691             // Note that the `prev` reference can be `null` if this channel is closed.
2692             check(segment.next!!.prev == null || segment.next!!.prev === segment) {
2693                 "The `segment.next.prev === segment` invariant is violated.\n" +
2694                     "Channel state: $this"
2695             }
2696             // Count the number of closed/interrupted cells
2697             // and check that all cells are in expected states.
2698             var interruptedOrClosedCells = 0
2699             for (i in 0 until SEGMENT_SIZE) {
2700                 when (val state = segment.getState(i)) {
2701                     BUFFERED -> {} // The cell stores a buffered element.
2702                     is Waiter -> {} // The cell stores a suspended request.
2703                     INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> {
2704                         // The cell stored an interrupted request or indicates
2705                         // that this channel is already closed.
2706                         // Check that the element slot is cleaned and increment
2707                         // the number of cells in closed/interrupted state.
2708                         check(segment.getElement(i) == null)
2709                         interruptedOrClosedCells++
2710                     }
2711                     POISONED, DONE_RCV -> {
2712                         // The cell is successfully processed or poisoned.
2713                         // Check that the element slot is cleaned.
2714                         check(segment.getElement(i) == null)
2715                     }
2716                     // Other states are illegal after all running operations finish.
2717                     else -> error("Unexpected segment cell state: $state.\nChannel state: $this")
2718                 }
2719             }
2720             // Is this segment full of cancelled/closed cells?
2721             // If so, this segment should be removed from the
2722             // linked list if nether `receiveSegment`, nor
2723             // `sendSegment`, nor `bufferEndSegment` reference it.
2724             if (interruptedOrClosedCells == SEGMENT_SIZE) {
2725                 check(segment === receiveSegment.value || segment === sendSegment.value || segment === bufferEndSegment.value) {
2726                     "Logically removed segment is reachable.\nChannel state: $this"
2727                 }
2728             }
2729             // Process the next segment.
2730             segment = segment.next!!
2731         }
2732     }
2733 }
2734 
2735 /**
2736  * The channel is represented as a list of segments, which simulates an infinite array.
2737  * Each segment has its own [id], which increase from the beginning. These [id]s help
2738  * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment],
2739  * and [BufferedChannel.bufferEndSegment] correctly.
2740  */
internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: BufferedChannel<E>?, pointers: Int) : Segment<ChannelSegment<E>>(id, prev, pointers) {
    // The owning channel, kept nullable because `NULL_SEGMENT` has none.
    private val _channel: BufferedChannel<E>? = channel
    val channel get() = _channel!! // always non-null except for `NULL_SEGMENT`

    // Two registers per cell: the element lives at `index * 2`,
    // the state at `index * 2 + 1`.
    private val data = atomicArrayOfNulls<Any?>(SEGMENT_SIZE * 2)
    override val numberOfSlots: Int get() = SEGMENT_SIZE

    // ########################################
    // # Manipulation with the Element Fields #
    // ########################################

    // Stores [element] into the element register of the cell (lazy set).
    internal fun storeElement(index: Int, element: E) = setElementLazy(index, element)

    // Reads the element register of the cell without modifying it.
    @Suppress("UNCHECKED_CAST")
    internal fun getElement(index: Int): E = data[index * 2].value as E

    // Reads the element register of the cell and then cleans it.
    internal fun retrieveElement(index: Int): E {
        val element = getElement(index)
        cleanElement(index)
        return element
    }

    // Resets the element register of the cell to `null` (lazy set).
    internal fun cleanElement(index: Int) = setElementLazy(index, null)

    private fun setElementLazy(index: Int, value: Any?) = data[index * 2].lazySet(value)

    // ######################################
    // # Manipulation with the State Fields #
    // ######################################

    internal fun getState(index: Int): Any? = data[index * 2 + 1].value

    internal fun setState(index: Int, value: Any?) {
        data[index * 2 + 1].value = value
    }

    internal fun casState(index: Int, from: Any?, to: Any?): Boolean =
        data[index * 2 + 1].compareAndSet(from, to)

    internal fun getAndSetState(index: Int, update: Any?): Any? =
        data[index * 2 + 1].getAndSet(update)

    // ########################
    // # Cancellation Support #
    // ########################

    override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
        // Senders mark themselves by adding `SEGMENT_SIZE` to the cell index,
        // which makes cancelled senders and receivers distinguishable here.
        val cancelledBySender = index >= SEGMENT_SIZE
        // The real cell index, with the sender marker removed.
        val cell = if (cancelledBySender) index - SEGMENT_SIZE else index
        // Read the element up-front; `onUndeliveredElement` may need it later.
        val element = getElement(cell)
        // CAS-loop over the cell state.
        while (true) {
            val state = getState(cell)
            when {
                // The cell still stores the waiter: try to mark it as interrupted.
                state is Waiter || state is WaiterEB -> {
                    val interrupted = if (cancelledBySender) INTERRUPTED_SEND else INTERRUPTED_RCV
                    // On a lost race, re-read the state and decide again.
                    if (!casState(cell, state, interrupted)) continue
                    // The waiter has been cancelled. Clean the element slot and
                    // report the cleaned slot, which may remove the whole segment
                    // from the linked list. For a cancelled receiver,
                    // `onCancelledRequest(..)` first waits until the `expandBuffer()`
                    // attempt that processes this cell is completed.
                    cleanElement(cell)
                    onCancelledRequest(cell, !cancelledBySender)
                    // The element of a cancelled sender is undelivered.
                    if (cancelledBySender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // The cell is already marked as interrupted: clean the element
                // slot to avoid memory leaks and report the undelivered element.
                state === INTERRUPTED_SEND || state === INTERRUPTED_RCV -> {
                    cleanElement(cell)
                    if (cancelledBySender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // An opposite operation is resuming this request; spin until the
                // state changes. The request may already be resumed, in which
                // case the state becomes `DONE_RCV` or `BUFFERED` and this
                // cancellation is caused by prompt cancellation.
                state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> continue
                // Either the request has been successfully resumed (so this is a
                // prompt cancellation), or the cell indicates that the channel
                // is closed; the cancellation is ignored in both cases.
                state === DONE_RCV || state === BUFFERED || state === CHANNEL_CLOSED -> return
                else -> error("unexpected state: $state")
            }
        }
    }

    /**
     * Invokes `onSlotCleaned()`; when the cancelled request is a receiver,
     * a `waitExpandBufferCompletion(..)` call for the global index of the
     * cell precedes it.
     */
    fun onCancelledRequest(index: Int, receiver: Boolean) {
        if (receiver) {
            val globalIndex = id * SEGMENT_SIZE + index
            channel.waitExpandBufferCompletion(globalIndex)
        }
        onSlotCleaned()
    }
}
2861 
// Workaround (WA) for an atomicfu + JVM_IR compiler bug that led to SMAP-related compiler crashes: KT-55983.
// Returns a reference to `createSegment`; in this file the result is passed as the
// segment factory argument of the `findSegmentAndMoveForward(..)` calls.
internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, ChannelSegment<E>> = ::createSegment
2864 
// Creates a segment with the given [id] that is linked to [prev], belongs
// to the same channel as [prev], and starts with zero pointers.
private fun <E> createSegment(id: Long, prev: ChannelSegment<E>): ChannelSegment<E> =
    ChannelSegment(id, prev, prev.channel, 0)
// Sentinel segment with `id = -1` and no owning channel. `BufferedChannel` compares
// segment references against it by identity; e.g., `bufferEndSegment` must stay
// `NULL_SEGMENT` for rendezvous and unlimited channels.
private val NULL_SEGMENT = ChannelSegment<Any?>(id = -1, prev = null, channel = null, pointers = 0)
2872 
/**
 * Number of cells in each segment.
 *
 * The value is obtained via `systemProp` with the
 * `kotlinx.coroutines.bufferedChannel.segmentSize` key and `32` as the second
 * argument (presumably the default used when the property is not set — confirm
 * against `systemProp`).
 */
@JvmField
internal val SEGMENT_SIZE = systemProp("kotlinx.coroutines.bufferedChannel.segmentSize", 32)
2878 
/**
 * Number of iterations to wait in [BufferedChannel.waitExpandBufferCompletion] until the numbers of started and completed
 * [BufferedChannel.expandBuffer] calls coincide. When the limit is reached, [BufferedChannel.waitExpandBufferCompletion]
 * blocks further [BufferedChannel.expandBuffer]-s to avoid starvation.
 *
 * The value is obtained via `systemProp` with the
 * `kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations` key and `10_000` as the second
 * argument (presumably the default used when the property is not set — confirm against `systemProp`).
 */
private val EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS = systemProp("kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations", 10_000)
2885 
/**
 * Attempts to resume this continuation with [value], passing
 * [onCancellation] to `tryResume`. When a resumption token is obtained,
 * the resumption is completed and `true` is returned; otherwise,
 * the function returns `false`.
 */
private fun <T> CancellableContinuation<T>.tryResume0(
    value: T,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Boolean {
    // A `null` token means that the continuation cannot be resumed.
    val token = tryResume(value, null, onCancellation) ?: return false
    completeResume(token)
    return true
}
2900 
/*
  If the channel is rendezvous or unlimited, the `bufferEnd` counter
  is initialized with the corresponding value below and never changes.
  In this case, the `expandBuffer(..)` operation does nothing.
  `initialBufferEnd(..)` selects between these values and the plain capacity.
 */
private const val BUFFER_END_RENDEZVOUS = 0L // no buffer
private const val BUFFER_END_UNLIMITED = Long.MAX_VALUE // infinite buffer
// Maps the channel capacity to the initial value of the `bufferEnd` counter:
// the dedicated constants for rendezvous and unlimited channels,
// and the capacity itself for any other value.
private fun initialBufferEnd(capacity: Int): Long =
    if (capacity == Channel.RENDEZVOUS) BUFFER_END_RENDEZVOUS
    else if (capacity == Channel.UNLIMITED) BUFFER_END_UNLIMITED
    else capacity.toLong()
2913 
/*
  Cell states. The initial "empty" state is represented with `null`,
  and suspended operations are represented with [Waiter] instances.
  The remaining states are the marker symbols declared below.
 */

// The cell stores a buffered element.
@JvmField
internal val BUFFERED = Symbol("BUFFERED")
// Concurrent `expandBuffer(..)` can inform the
// upcoming sender that it should buffer the element.
// Note that the symbol text is "SHOULD_BUFFER", not the constant name.
private val IN_BUFFER = Symbol("SHOULD_BUFFER")
// Indicates that a receiver (RCV suffix) is resuming
// the suspended sender; after that, it should update
// the state to either `DONE_RCV` (on success) or
// `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_RCV = Symbol("S_RESUMING_BY_RCV")
// Indicates that `expandBuffer(..)` (EB suffix) is resuming
// the suspended sender; after that, it should update
// the state to either `BUFFERED` (on success) or
// `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_EB = Symbol("RESUMING_BY_EB")
// When a receiver comes to the cell already covered by
// a sender (according to the counters), but the cell
// is still in the empty (`null`) or `IN_BUFFER` state,
// it breaks the cell by changing its state to `POISONED`.
private val POISONED = Symbol("POISONED")
// When the element is successfully transferred
// to a receiver, the cell changes to `DONE_RCV`.
private val DONE_RCV = Symbol("DONE_RCV")
// Cancelled sender.
private val INTERRUPTED_SEND = Symbol("INTERRUPTED_SEND")
// Cancelled receiver.
private val INTERRUPTED_RCV = Symbol("INTERRUPTED_RCV")
// Indicates that the channel is closed.
internal val CHANNEL_CLOSED = Symbol("CHANNEL_CLOSED")
// When the cell is already covered by both a sender and
// a receiver (the `senders` and `receivers` counters are
// greater than the cell number), the `expandBuffer(..)`
// procedure cannot distinguish which kind of operation is
// stored in the cell. Thus, it wraps the waiter with this
// descriptor, informing the possibly upcoming receiver that
// it should complete the `expandBuffer(..)` procedure if the
// waiter stored in the cell is a sender. In turn, senders
// ignore this information.
private class WaiterEB(@JvmField val waiter: Waiter) {
    override fun toString(): String {
        return "WaiterEB(" + waiter + ")"
    }
}
2960 
2961 
2962 
/**
 * To distinguish suspended [BufferedChannel.receive] and
 * [BufferedChannel.receiveCatching] operations, the latter
 * uses this wrapper for its continuation.
 *
 * The wrapper implements [Waiter] by delegating to [cont].
 */
private class ReceiveCatching<E>(
    @JvmField val cont: CancellableContinuationImpl<ChannelResult<E>>
) : Waiter by cont
2971 
/*
  Special outcomes of [BufferedChannel.updateCellReceive].
  When the call succeeds, either by a rendezvous with
  a waiting sender or by retrieving a buffered element,
  [BufferedChannel.updateCellReceive] returns that element
  itself instead of one of these markers.
 */
private val SUSPEND: Symbol = Symbol("SUSPEND")
private val SUSPEND_NO_WAITER: Symbol = Symbol("SUSPEND_NO_WAITER")
private val FAILED: Symbol = Symbol("FAILED")
2981 
/*
  Integer codes that [BufferedChannel.updateCellSend]
  uses as its internal results.
 */
private const val RESULT_RENDEZVOUS: Int = 0
private const val RESULT_BUFFERED: Int = 1
private const val RESULT_SUSPEND: Int = 2
private const val RESULT_SUSPEND_NO_WAITER: Int = 3
private const val RESULT_CLOSED: Int = 4
private const val RESULT_FAILED: Int = 5
2991 
/**
 * Marker stored in [BufferedChannel.BufferedChannelIterator.receiveResult]
 * to indicate that no result has been pre-received.
 */
private val NO_RECEIVE_RESULT: Symbol = Symbol("NO_RECEIVE_RESULT")
2997 
/*
  [BufferedChannel.invokeOnClose] may be invoked concurrently
  with the channel being closed, so the two have to be
  synchronized. The two markers below help with that
  synchronization.
 */
private val CLOSE_HANDLER_CLOSED: Symbol = Symbol("CLOSE_HANDLER_CLOSED")
private val CLOSE_HANDLER_INVOKED: Symbol = Symbol("CLOSE_HANDLER_INVOKED")
3005 
/**
 * Marker kept in [BufferedChannel._closeCause] while there is no closing cause.
 * If the channel gets closed or cancelled without an exception, this
 * [NO_CLOSE_CAUSE] marker should be replaced with `null`.
 */
private val NO_CLOSE_CAUSE: Symbol = Symbol("NO_CLOSE_CAUSE")
3012 
/*
  Close statuses of the channel. Transitions between them
  follow this scheme:
    +--------+   +----------------------+   +-----------+
    | ACTIVE |-->| CANCELLATION_STARTED |-->| CANCELLED |
    +--------+   +----------------------+   +-----------+
        |                                         ^
        |             +--------+                  |
        +------------>| CLOSED |------------------+
                      +--------+
  The intermediate `CANCELLATION_STARTED` status is needed
  to synchronize concurrent closing and cancellation.
 */
private const val CLOSE_STATUS_ACTIVE: Int = 0
private const val CLOSE_STATUS_CANCELLATION_STARTED: Int = 1
private const val CLOSE_STATUS_CLOSED: Int = 2
private const val CLOSE_STATUS_CANCELLED: Int = 3
3029 
/*
  A single 64-bit register holds both the `senders` counter
  and the channel close status; this saves space and reduces
  the number of reads in sending operations.
  The declarations below encapsulate the bit arithmetic:
  the counter occupies the bits below `SENDERS_CLOSE_STATUS_SHIFT`,
  and the close status is stored starting at that bit.
 */
private const val SENDERS_CLOSE_STATUS_SHIFT: Int = 60
private const val SENDERS_COUNTER_MASK: Long = (1L shl SENDERS_CLOSE_STATUS_SHIFT) - 1

// Extracts the `senders` counter part of the packed value.
private inline val Long.sendersCounter: Long
    get() = this.and(SENDERS_COUNTER_MASK)

// Extracts the close status part of the packed value.
private inline val Long.sendersCloseStatus: Int
    get() = this.shr(SENDERS_CLOSE_STATUS_SHIFT).toInt()

// Packs the counter and the close status into a single value.
private fun constructSendersAndCloseStatus(counter: Long, closeStatus: Int): Long {
    val shiftedStatus = closeStatus.toLong() shl SENDERS_CLOSE_STATUS_SHIFT
    return shiftedStatus + counter
}
3042 
/*
  The 64-bit `completedExpandBuffersAndPauseFlag` counter stores
  the number of completed `expandBuffer()` attempts together with
  a special flag that pauses progress to avoid starvation in
  `waitExpandBufferCompletion(..)`.
  The declarations below encapsulate the bit arithmetic:
  the flag is bit 62, and the counter occupies the bits below it.
 */
private const val EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT: Long = 1L shl 62
private const val EB_COMPLETED_COUNTER_MASK: Long = EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT - 1

// Extracts the number of completed `expandBuffer()` attempts.
private inline val Long.ebCompletedCounter: Long
    get() = this.and(EB_COMPLETED_COUNTER_MASK)

// Tells whether the pause flag is set in the packed value.
private inline val Long.ebPauseExpandBuffers: Boolean
    get() = this.and(EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT) != 0L

// Packs the counter and the pause flag into a single value.
private fun constructEBCompletedAndPauseFlag(counter: Long, pauseEB: Boolean): Long {
    val pauseBit = if (pauseEB) EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT else 0L
    return pauseBit + counter
}
3055