<lambda>null1 @file:Suppress("PrivatePropertyName")
2
3 package kotlinx.coroutines.channels
4
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
8 import kotlinx.coroutines.channels.ChannelResult.Companion.failure
9 import kotlinx.coroutines.channels.ChannelResult.Companion.success
10 import kotlinx.coroutines.flow.internal.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
14 import kotlin.contracts.*
15 import kotlin.coroutines.*
16 import kotlin.js.*
17 import kotlin.jvm.*
18 import kotlin.math.*
19 import kotlin.random.*
20 import kotlin.reflect.*
21
22 /**
23 * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
24 * The high-level structure bases on a conceptually infinite array for storing elements and waiting requests,
25 * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
26 * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
27 * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
28 * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
29 * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
30 * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
31 * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
32 *
33 * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
34 * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
35 */
36 internal open class BufferedChannel<E>(
37 /**
38 * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
39 * and `Channel.UNLIMITED` for unlimited capacity.
40 */
41 private val capacity: Int,
42 @JvmField
43 internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
44 ) : Channel<E> {
45 init {
46 require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
47 // This implementation has second `init`.
48 }
49
50 // Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
51
52 /*
53 The counters indicate the total numbers of send, receive, and buffer expansion calls
54 ever performed. The counters are incremented in the beginning of the corresponding
55 operation; thus, acquiring a unique (for the operation type) cell to process.
56 The segments reference to the last working one for each operation type.
57
58 Notably, the counter for send is combined with the channel closing status
59 for synchronization simplicity and performance reasons.
60
61 The logical end of the buffer is initialized with the channel capacity.
62 If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
63 or `BUFFER_END_RENDEZVOUS`, respectively, and never updates. The `bufferEndSegment`
64 point to a special `NULL_SEGMENT` in this case.
65 */
66 private val sendersAndCloseStatus = atomic(0L)
67 private val receivers = atomic(0L)
68 private val bufferEnd = atomic(initialBufferEnd(capacity))
69
70 internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
71 internal val receiversCounter: Long get() = receivers.value
72 private val bufferEndCounter: Long get() = bufferEnd.value
73
74 /*
75 Additionally to the counters above, we need an extra one that
76 tracks the number of cells processed by `expandBuffer()`.
77 When a receiver aborts, the corresponding cell might be
78 physically removed from the data structure to avoid memory
79 leaks, while it still can be unprocessed by `expandBuffer()`.
80 In this case, `expandBuffer()` cannot know whether the
81 removed cell contained sender or receiver and, therefore,
82 cannot proceed. To solve the race, we ensure that cells
83 correspond to cancelled receivers cannot be physically
84 removed until the cell is processed.
85 This additional counter enables the synchronization,
86 */
87 private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)
88
89 private val isRendezvousOrUnlimited
90 get() = bufferEndCounter.let { it == BUFFER_END_RENDEZVOUS || it == BUFFER_END_UNLIMITED }
91
92 private val sendSegment: AtomicRef<ChannelSegment<E>>
93 private val receiveSegment: AtomicRef<ChannelSegment<E>>
94 private val bufferEndSegment: AtomicRef<ChannelSegment<E>>
95
96 init {
97 @Suppress("LeakingThis")
98 val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
99 sendSegment = atomic(firstSegment)
100 receiveSegment = atomic(firstSegment)
101 // If this channel is rendezvous or has unlimited capacity, the algorithm never
102 // invokes the buffer expansion procedure, and the corresponding segment reference
103 // points to a special `NULL_SEGMENT` one and never updates.
104 @Suppress("UNCHECKED_CAST")
105 bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
106 }
107
108 // #########################
109 // ## The send operations ##
110 // #########################
111
112 override suspend fun send(element: E): Unit =
113 sendImpl( // <-- this is an inline function
114 element = element,
115 // Do not create a continuation until it is required;
116 // it is created later via [onNoWaiterSuspend], if needed.
117 waiter = null,
118 // Finish immediately if a rendezvous happens
119 // or the element has been buffered.
120 onRendezvousOrBuffered = {},
121 // As no waiter is provided, suspension is impossible.
122 onSuspend = { _, _ -> assert { false } },
123 // According to the `send(e)` contract, we need to call
124 // `onUndeliveredElement(..)` handler and throw an exception
125 // if the channel is already closed.
126 onClosed = { onClosedSend(element) },
127 // When `send(e)` decides to suspend, the corresponding
128 // `onNoWaiterSuspend` function that creates a continuation
129 // is called. The tail-call optimization is applied here.
130 onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
131 )
132
133 // NB: return type could've been Nothing, but it breaks TCO
134 private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
135 onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
136 // If it crashes, add send exception as suppressed for better diagnostics
137 it.addSuppressed(sendException)
138 continuation.resumeWithStackTrace(it)
139 return@suspendCancellableCoroutine
140 }
141 continuation.resumeWithStackTrace(sendException)
142 }
143
144 private suspend fun sendOnNoWaiterSuspend(
145 /* The working cell is specified by
146 the segment and the index in it. */
147 segment: ChannelSegment<E>,
148 index: Int,
149 /** The element to be inserted. */
150 element: E,
151 /** The global index of the cell. */
152 s: Long
153 ) = suspendCancellableCoroutineReusable sc@{ cont ->
154 sendImplOnNoWaiter( // <-- this is an inline function
155 segment = segment, index = index, element = element, s = s,
156 // Store the created continuation as a waiter.
157 waiter = cont,
158 // If a rendezvous happens or the element has been buffered,
159 // resume the continuation and finish. In case of prompt
160 // cancellation, it is guaranteed that the element
161 // has been already buffered or passed to receiver.
162 onRendezvousOrBuffered = { cont.resume(Unit) },
163 // If the channel is closed, call `onUndeliveredElement(..)` and complete the
164 // continuation with the corresponding exception.
165 onClosed = { onClosedSendOnNoWaiterSuspend(element, cont) },
166 )
167 }
168
169 private fun Waiter.prepareSenderForSuspension(
170 /* The working cell is specified by
171 the segment and the index in it. */
172 segment: ChannelSegment<E>,
173 index: Int
174 ) {
175 // To distinguish cancelled senders and receivers,
176 // senders equip the index value with an additional marker,
177 // adding `SEGMENT_SIZE` to the value.
178 invokeOnCancellation(segment, index + SEGMENT_SIZE)
179 }
180
181 private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
182 onUndeliveredElement?.callUndeliveredElement(element, cont.context)
183 cont.resumeWithException(recoverStackTrace(sendException, cont))
184 }
185
186 override fun trySend(element: E): ChannelResult<Unit> {
187 // Do not try to send the element if the plain `send(e)` operation would suspend.
188 if (shouldSendSuspend(sendersAndCloseStatus.value)) return failure()
189 // This channel either has waiting receivers or is closed.
190 // Let's try to send the element!
191 // The logic is similar to the plain `send(e)` operation, with
192 // the only difference that we install `INTERRUPTED_SEND` in case
193 // the operation decides to suspend.
194 return sendImpl( // <-- this is an inline function
195 element = element,
196 // Store an already interrupted sender in case of suspension.
197 waiter = INTERRUPTED_SEND,
198 // Finish successfully when a rendezvous happens
199 // or the element has been buffered.
200 onRendezvousOrBuffered = { success(Unit) },
201 // On suspension, the `INTERRUPTED_SEND` token has been installed,
202 // and this `trySend(e)` must fail. According to the contract,
203 // we do not need to call the [onUndeliveredElement] handler.
204 onSuspend = { segm, _ ->
205 segm.onSlotCleaned()
206 failure()
207 },
208 // If the channel is closed, return the corresponding result.
209 onClosed = { closed(sendException) }
210 )
211 }
212
213 /**
214 * This is a special `send(e)` implementation that returns `true` if the element
215 * has been successfully sent, and `false` if the channel is closed.
216 *
217 * In case of coroutine cancellation, the element may be undelivered --
218 * the [onUndeliveredElement] feature is unsupported in this implementation.
219 *
220 */
221 internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont ->
222 check(onUndeliveredElement == null) {
223 "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
224 }
225 sendImpl(
226 element = element,
227 waiter = SendBroadcast(cont),
228 onRendezvousOrBuffered = { cont.resume(true) },
229 onSuspend = { _, _ -> },
230 onClosed = { cont.resume(false) }
231 )
232 }
233
234 /**
235 * Specifies waiting [sendBroadcast] operation.
236 */
237 private class SendBroadcast(
238 val cont: CancellableContinuation<Boolean>
239 ) : Waiter by cont as CancellableContinuationImpl<Boolean>
240
241 /**
242 * Abstract send implementation.
243 */
244 protected inline fun <R> sendImpl(
245 /* The element to be sent. */
246 element: E,
247 /* The waiter to be stored in case of suspension,
248 or `null` if the waiter is not created yet.
249 In the latter case, when the algorithm decides
250 to suspend, [onNoWaiterSuspend] is called. */
251 waiter: Any?,
252 /* This lambda is invoked when the element has been
253 buffered or a rendezvous with a receiver happens. */
254 onRendezvousOrBuffered: () -> R,
255 /* This lambda is called when the operation suspends in the
256 cell specified by the segment and the index in it. */
257 onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
258 /* This lambda is called when the channel
259 is observed in the closed state. */
260 onClosed: () -> R,
261 /* This lambda is called when the operation decides
262 to suspend, but the waiter is not provided (equals `null`).
263 It should create a waiter and delegate to `sendImplOnNoWaiter`. */
264 onNoWaiterSuspend: (
265 segm: ChannelSegment<E>,
266 i: Int,
267 element: E,
268 s: Long
269 ) -> R = { _, _, _, _ -> error("unexpected") }
270 ): R {
271 // Read the segment reference before the counter increment;
272 // it is crucial to be able to find the required segment later.
273 var segment = sendSegment.value
274 while (true) {
275 // Atomically increment the `senders` counter and obtain the
276 // value right before the increment along with the close status.
277 val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
278 val s = sendersAndCloseStatusCur.sendersCounter
279 // Is this channel already closed? Keep the information.
280 val closed = sendersAndCloseStatusCur.isClosedForSend0
281 // Count the required segment id and the cell index in it.
282 val id = s / SEGMENT_SIZE
283 val i = (s % SEGMENT_SIZE).toInt()
284 // Try to find the required segment if the initially obtained
285 // one (in the beginning of this function) has lower id.
286 if (segment.id != id) {
287 // Find the required segment.
288 segment = findSegmentSend(id, segment) ?:
289 // The required segment has not been found.
290 // Finish immediately if this channel is closed,
291 // restarting the operation otherwise.
292 // In the latter case, the required segment was full
293 // of interrupted waiters and, therefore, removed
294 // physically to avoid memory leaks.
295 if (closed) {
296 return onClosed()
297 } else {
298 continue
299 }
300 }
301 // Update the cell according to the algorithm. Importantly, when
302 // the channel is already closed, storing a waiter is illegal, so
303 // the algorithm stores the `INTERRUPTED_SEND` token in this case.
304 when (updateCellSend(segment, i, element, s, waiter, closed)) {
305 RESULT_RENDEZVOUS -> {
306 // A rendezvous with a receiver has happened.
307 // The previous segments are no longer needed
308 // for the upcoming requests, so the algorithm
309 // resets the link to the previous segment.
310 segment.cleanPrev()
311 return onRendezvousOrBuffered()
312 }
313 RESULT_BUFFERED -> {
314 // The element has been buffered.
315 return onRendezvousOrBuffered()
316 }
317 RESULT_SUSPEND -> {
318 // The operation has decided to suspend and installed the
319 // specified waiter. If the channel was already closed,
320 // and the `INTERRUPTED_SEND` token has been installed as a waiter,
321 // this request finishes with the `onClosed()` action.
322 if (closed) {
323 segment.onSlotCleaned()
324 return onClosed()
325 }
326 (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
327 return onSuspend(segment, i)
328 }
329 RESULT_CLOSED -> {
330 // This channel is closed.
331 // In case this segment is already or going to be
332 // processed by a receiver, ensure that all the
333 // previous segments are unreachable.
334 if (s < receiversCounter) segment.cleanPrev()
335 return onClosed()
336 }
337 RESULT_FAILED -> {
338 // Either the cell stores an interrupted receiver,
339 // or it was poisoned by a concurrent receiver.
340 // In both cases, all the previous segments are already processed,
341 segment.cleanPrev()
342 continue
343 }
344 RESULT_SUSPEND_NO_WAITER -> {
345 // The operation has decided to suspend,
346 // but no waiter has been provided.
347 return onNoWaiterSuspend(segment, i, element, s)
348 }
349 }
350 }
351 }
352
353 private inline fun sendImplOnNoWaiter(
354 /* The working cell is specified by
355 the segment and the index in it. */
356 segment: ChannelSegment<E>,
357 index: Int,
358 /* The element to be sent. */
359 element: E,
360 /* The global index of the cell. */
361 s: Long,
362 /* The waiter to be stored in case of suspension. */
363 waiter: Waiter,
364 /* This lambda is invoked when the element has been
365 buffered or a rendezvous with a receiver happens.*/
366 onRendezvousOrBuffered: () -> Unit,
367 /* This lambda is called when the channel
368 is observed in the closed state. */
369 onClosed: () -> Unit,
370 ) {
371 // Update the cell again, now with the non-null waiter,
372 // restarting the operation from the beginning on failure.
373 // Check the `sendImpl(..)` function for the comments.
374 when (updateCellSend(segment, index, element, s, waiter, false)) {
375 RESULT_RENDEZVOUS -> {
376 segment.cleanPrev()
377 onRendezvousOrBuffered()
378 }
379 RESULT_BUFFERED -> {
380 onRendezvousOrBuffered()
381 }
382 RESULT_SUSPEND -> {
383 waiter.prepareSenderForSuspension(segment, index)
384 }
385 RESULT_CLOSED -> {
386 if (s < receiversCounter) segment.cleanPrev()
387 onClosed()
388 }
389 RESULT_FAILED -> {
390 segment.cleanPrev()
391 sendImpl(
392 element = element,
393 waiter = waiter,
394 onRendezvousOrBuffered = onRendezvousOrBuffered,
395 onSuspend = { _, _ -> },
396 onClosed = onClosed,
397 )
398 }
399 else -> error("unexpected")
400 }
401 }
402
403 private fun updateCellSend(
404 /* The working cell is specified by
405 the segment and the index in it. */
406 segment: ChannelSegment<E>,
407 index: Int,
408 /* The element to be sent. */
409 element: E,
410 /* The global index of the cell. */
411 s: Long,
412 /* The waiter to be stored in case of suspension. */
413 waiter: Any?,
414 closed: Boolean
415 ): Int {
416 // This is a fast-path of `updateCellSendSlow(..)`.
417 //
418 // First, the algorithm stores the element,
419 // performing the synchronization after that.
420 // This way, receivers safely retrieve the
421 // element, following the safe publication pattern.
422 segment.storeElement(index, element)
423 if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
424 // Read the current cell state.
425 val state = segment.getState(index)
426 when {
427 // The cell is empty.
428 state === null -> {
429 // If the element should be buffered, or a rendezvous should happen
430 // while the receiver is still coming, try to buffer the element.
431 // Otherwise, try to store the specified waiter in the cell.
432 if (bufferOrRendezvousSend(s)) {
433 // Move the cell state to `BUFFERED`.
434 if (segment.casState(index, null, BUFFERED)) {
435 // The element has been successfully buffered, finish.
436 return RESULT_BUFFERED
437 }
438 } else {
439 // This `send(e)` operation should suspend.
440 // However, in case the channel has already
441 // been observed closed, `INTERRUPTED_SEND`
442 // is installed instead.
443 if (waiter == null) {
444 // The waiter is not specified; return the corresponding result.
445 return RESULT_SUSPEND_NO_WAITER
446 } else {
447 // Try to install the waiter.
448 if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
449 }
450 }
451 }
452 // A waiting receiver is stored in the cell.
453 state is Waiter -> {
454 // As the element will be passed directly to the waiter,
455 // the algorithm cleans the element slot in the cell.
456 segment.cleanElement(index)
457 // Try to make a rendezvous with the suspended receiver.
458 return if (state.tryResumeReceiver(element)) {
459 // Rendezvous! Move the cell state to `DONE_RCV` and finish.
460 segment.setState(index, DONE_RCV)
461 onReceiveDequeued()
462 RESULT_RENDEZVOUS
463 } else {
464 // The resumption has failed. Update the cell state correspondingly
465 // and clean the element field. It is also possible for a concurrent
466 // cancellation handler to update the cell state; we can safely
467 // ignore these updates.
468 if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
469 segment.onCancelledRequest(index, true)
470 }
471 RESULT_FAILED
472 }
473 }
474 }
475 return updateCellSendSlow(segment, index, element, s, waiter, closed)
476 }
477
478 /**
479 * Updates the working cell of an abstract send operation.
480 */
481 private fun updateCellSendSlow(
482 /* The working cell is specified by
483 the segment and the index in it. */
484 segment: ChannelSegment<E>,
485 index: Int,
486 /* The element to be sent. */
487 element: E,
488 /* The global index of the cell. */
489 s: Long,
490 /* The waiter to be stored in case of suspension. */
491 waiter: Any?,
492 closed: Boolean
493 ): Int {
494 // Then, the cell state should be updated according to
495 // its state machine; see the paper mentioned in the very
496 // beginning for the cell life-cycle and the algorithm details.
497 while (true) {
498 // Read the current cell state.
499 val state = segment.getState(index)
500 when {
501 // The cell is empty.
502 state === null -> {
503 // If the element should be buffered, or a rendezvous should happen
504 // while the receiver is still coming, try to buffer the element.
505 // Otherwise, try to store the specified waiter in the cell.
506 if (bufferOrRendezvousSend(s) && !closed) {
507 // Move the cell state to `BUFFERED`.
508 if (segment.casState(index, null, BUFFERED)) {
509 // The element has been successfully buffered, finish.
510 return RESULT_BUFFERED
511 }
512 } else {
513 // This `send(e)` operation should suspend.
514 // However, in case the channel has already
515 // been observed closed, `INTERRUPTED_SEND`
516 // is installed instead.
517 when {
518 // The channel is closed
519 closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
520 segment.onCancelledRequest(index, false)
521 return RESULT_CLOSED
522 }
523 // The waiter is not specified; return the corresponding result.
524 waiter == null -> return RESULT_SUSPEND_NO_WAITER
525 // Try to install the waiter.
526 else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
527 }
528 }
529 }
530 // This cell is in the logical buffer.
531 state === IN_BUFFER -> {
532 // Try to buffer the element.
533 if (segment.casState(index, state, BUFFERED)) {
534 // The element has been successfully buffered, finish.
535 return RESULT_BUFFERED
536 }
537 }
538 // The cell stores a cancelled receiver.
539 state === INTERRUPTED_RCV -> {
540 // Clean the element slot to avoid memory leaks and finish.
541 segment.cleanElement(index)
542 return RESULT_FAILED
543 }
544 // The cell is poisoned by a concurrent receive.
545 state === POISONED -> {
546 // Clean the element slot to avoid memory leaks and finish.
547 segment.cleanElement(index)
548 return RESULT_FAILED
549 }
550 // The channel is already closed.
551 state === CHANNEL_CLOSED -> {
552 // Clean the element slot to avoid memory leaks,
553 // ensure that the closing/cancellation procedure
554 // has been completed, and finish.
555 segment.cleanElement(index)
556 completeCloseOrCancel()
557 return RESULT_CLOSED
558 }
559 // A waiting receiver is stored in the cell.
560 else -> {
561 assert { state is Waiter || state is WaiterEB }
562 // As the element will be passed directly to the waiter,
563 // the algorithm cleans the element slot in the cell.
564 segment.cleanElement(index)
565 // Unwrap the waiting receiver from `WaiterEB` if needed.
566 // As a receiver is stored in the cell, the buffer expansion
567 // procedure would finish, so senders simply ignore the "EB" marker.
568 val receiver = if (state is WaiterEB) state.waiter else state
569 // Try to make a rendezvous with the suspended receiver.
570 return if (receiver.tryResumeReceiver(element)) {
571 // Rendezvous! Move the cell state to `DONE_RCV` and finish.
572 segment.setState(index, DONE_RCV)
573 onReceiveDequeued()
574 RESULT_RENDEZVOUS
575 } else {
576 // The resumption has failed. Update the cell state correspondingly
577 // and clean the element field. It is also possible for a concurrent
578 // `expandBuffer()` or the cancellation handler to update the cell state;
579 // we can safely ignore these updates as senders do not help `expandBuffer()`.
580 if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
581 segment.onCancelledRequest(index, true)
582 }
583 RESULT_FAILED
584 }
585 }
586 }
587 }
588 }
589
590 /**
591 * Checks whether a [send] invocation is bound to suspend if it is called
592 * with the specified [sendersAndCloseStatus], [receivers], and [bufferEnd]
593 * values. When this channel is already closed, the function returns `false`.
594 *
595 * Specifically, [send] suspends if the channel is not unlimited,
596 * the number of receivers is greater than then index of the working cell of the
597 * potential [send] invocation, and the buffer does not cover this cell
598 * in case of buffered channel.
599 * When the channel is already closed, [send] does not suspend.
600 */
601 @JsName("shouldSendSuspend0")
602 private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean {
603 // Does not suspend if the channel is already closed.
604 if (curSendersAndCloseStatus.isClosedForSend0) return false
605 // Does not suspend if a rendezvous may happen or the buffer is not full.
606 return !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
607 }
608
609 /**
610 * Returns `true` when the specified [send] should place
611 * its element to the working cell without suspension.
612 */
613 private fun bufferOrRendezvousSend(curSenders: Long): Boolean =
614 curSenders < bufferEndCounter || curSenders < receiversCounter + capacity
615
616 /**
617 * Checks whether a [send] invocation is bound to suspend if it is called
618 * with the current counter and close status values. See [shouldSendSuspend] for details.
619 *
620 * Note that this implementation is _false positive_ in case of rendezvous channels,
621 * so it can return `false` when a [send] invocation is bound to suspend. Specifically,
622 * the counter of `receive()` operations may indicate that there is a waiting receiver,
623 * while it has already been cancelled, so the potential rendezvous is bound to fail.
624 */
625 internal open fun shouldSendSuspend(): Boolean = shouldSendSuspend(sendersAndCloseStatus.value)
626
627 /**
628 * Tries to resume this receiver with the specified [element] as a result.
629 * Returns `true` on success and `false` otherwise.
630 */
631 @Suppress("UNCHECKED_CAST")
632 private fun Any.tryResumeReceiver(element: E): Boolean = when(this) {
633 is SelectInstance<*> -> { // `onReceiveXXX` select clause
634 trySelect(this@BufferedChannel, element)
635 }
636 is ReceiveCatching<*> -> {
637 this as ReceiveCatching<E>
638 cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
639 }
640 is BufferedChannel<*>.BufferedChannelIterator -> {
641 this as BufferedChannel<E>.BufferedChannelIterator
642 tryResumeHasNext(element)
643 }
644 is CancellableContinuation<*> -> { // `receive()`
645 this as CancellableContinuation<E>
646 tryResume0(element, onUndeliveredElement?.bindCancellationFun(element, context))
647 }
648 else -> error("Unexpected receiver type: $this")
649 }
650
651 // ##########################
652 // # The receive operations #
653 // ##########################
654
655 /**
656 * This function is invoked when a receiver is added as a waiter in this channel.
657 */
658 protected open fun onReceiveEnqueued() {}
659
660 /**
661 * This function is invoked when a waiting receiver is no longer stored in this channel;
662 * independently on whether it is caused by rendezvous, cancellation, or channel closing.
663 */
664 protected open fun onReceiveDequeued() {}
665
666 override suspend fun receive(): E =
667 receiveImpl( // <-- this is an inline function
668 // Do not create a continuation until it is required;
669 // it is created later via [onNoWaiterSuspend], if needed.
670 waiter = null,
671 // Return the received element on successful retrieval from
672 // the buffer or rendezvous with a suspended sender.
673 // Also, inform `BufferedChannel` extensions that
674 // synchronization of this receive operation is completed.
675 onElementRetrieved = { element ->
676 return element
677 },
678 // As no waiter is provided, suspension is impossible.
679 onSuspend = { _, _, _ -> error("unexpected") },
680 // Throw an exception if the channel is already closed.
681 onClosed = { throw recoverStackTrace(receiveException) },
682 // If `receive()` decides to suspend, the corresponding
683 // `suspend` function that creates a continuation is called.
684 // The tail-call optimization is applied here.
685 onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
686 )
687
688 private suspend fun receiveOnNoWaiterSuspend(
689 /* The working cell is specified by
690 the segment and the index in it. */
691 segment: ChannelSegment<E>,
692 index: Int,
693 /* The global index of the cell. */
694 r: Long
695 ) = suspendCancellableCoroutineReusable { cont ->
696 receiveImplOnNoWaiter( // <-- this is an inline function
697 segment = segment, index = index, r = r,
698 // Store the created continuation as a waiter.
699 waiter = cont,
700 // In case of successful element retrieval, resume
701 // the continuation with the element and inform the
702 // `BufferedChannel` extensions that the synchronization
703 // is completed. Importantly, the receiver coroutine
704 // may be cancelled after it is successfully resumed but
705 // not dispatched yet. In case `onUndeliveredElement` is
706 // specified, we need to invoke it in the latter case.
707 onElementRetrieved = { element ->
708 val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
709 cont.resume(element, onCancellation)
710 },
711 onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
712 )
713 }
714
715 private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
716 onReceiveEnqueued()
717 invokeOnCancellation(segment, index)
718 }
719
720 private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
721 cont.resumeWithException(receiveException)
722 }
723
724 /*
725 The implementation is exactly the same as of `receive()`,
726 with the only difference that this function returns a `ChannelResult`
727 instance and does not throw exception explicitly in case the channel
728 is already closed for receiving. Please refer the plain `receive()`
729 implementation for the comments.
730 */
731 override suspend fun receiveCatching(): ChannelResult<E> =
732 receiveImpl( // <-- this is an inline function
733 waiter = null,
734 onElementRetrieved = { element ->
735 success(element)
736 },
737 onSuspend = { _, _, _ -> error("unexpected") },
738 onClosed = { closed(closeCause) },
739 onNoWaiterSuspend = { segm, i, r -> receiveCatchingOnNoWaiterSuspend(segm, i, r) }
740 )
741
742 private suspend fun receiveCatchingOnNoWaiterSuspend(
743 segment: ChannelSegment<E>,
744 index: Int,
745 r: Long
746 ) = suspendCancellableCoroutineReusable { cont ->
747 val waiter = ReceiveCatching(cont as CancellableContinuationImpl<ChannelResult<E>>)
748 receiveImplOnNoWaiter(
749 segment, index, r,
750 waiter = waiter,
751 onElementRetrieved = { element ->
752 cont.resume(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
753 },
754 onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
755 )
756 }
757
758 private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>) {
759 cont.resume(closed(closeCause))
760 }
761
762 override fun tryReceive(): ChannelResult<E> {
763 // Read the `receivers` counter first.
764 val r = receivers.value
765 val sendersAndCloseStatusCur = sendersAndCloseStatus.value
766 // Is this channel closed for receive?
767 if (sendersAndCloseStatusCur.isClosedForReceive0) {
768 return closed(closeCause)
769 }
770 // Do not try to receive an element if the plain `receive()` operation would suspend.
771 val s = sendersAndCloseStatusCur.sendersCounter
772 if (r >= s) return failure()
773 // Let's try to retrieve an element!
774 // The logic is similar to the plain `receive()` operation, with
775 // the only difference that we store `INTERRUPTED_RCV` in case
776 // the operation decides to suspend. This way, we can leverage
777 // the unconditional `Fetch-and-Add` instruction.
778 // One may consider storing `INTERRUPTED_RCV` instead of an actual waiter
779 // on suspension (a.k.a. "no elements to retrieve") as a short-cut of
780 // "suspending and cancelling immediately".
781 return receiveImpl( // <-- this is an inline function
782 // Store an already interrupted receiver in case of suspension.
783 waiter = INTERRUPTED_RCV,
784 // Finish when an element is successfully retrieved.
785 onElementRetrieved = { element -> success(element) },
786 // On suspension, the `INTERRUPTED_RCV` token has been
787 // installed, and this `tryReceive()` must fail.
788 onSuspend = { segm, _, globalIndex ->
789 // Emulate "cancelled" receive, thus invoking 'waitExpandBufferCompletion' manually,
790 // because effectively there were no cancellation
791 waitExpandBufferCompletion(globalIndex)
792 segm.onSlotCleaned()
793 failure()
794 },
795 // If the channel is closed, return the corresponding result.
796 onClosed = { closed(closeCause) }
797 )
798 }
799
800 /**
801 * Extracts the first element from this channel until the cell with the specified
802 * index is moved to the logical buffer. This is a key procedure for the _conflated_
803 * channel implementation, see [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
804 * strategy on buffer overflowing.
805 */
806 protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
807 assert { isConflatedDropOldest }
808 // Read the segment reference before the counter increment;
809 // it is crucial to be able to find the required segment later.
810 var segment = receiveSegment.value
811 while (true) {
812 // Read the receivers counter to check whether the specified cell is already in the buffer
813 // or should be moved to the buffer in a short time, due to the already started `receive()`.
814 val r = this.receivers.value
815 if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
816 // The cell is outside the buffer. Try to extract the first element
817 // if the `receivers` counter has not been changed.
818 if (!this.receivers.compareAndSet(r, r + 1)) continue
819 // Count the required segment id and the cell index in it.
820 val id = r / SEGMENT_SIZE
821 val i = (r % SEGMENT_SIZE).toInt()
822 // Try to find the required segment if the initially obtained
823 // segment (in the beginning of this function) has lower id.
824 if (segment.id != id) {
825 // Find the required segment, restarting the operation if it has not been found.
826 segment = findSegmentReceive(id, segment) ?:
827 // The required segment has not been found. It is possible that the channel is already
828 // closed for receiving, so the linked list of segments is closed as well.
829 // In the latter case, the operation will finish eventually after incrementing
830 // the `receivers` counter sufficient times. Note that it is impossible to check
831 // whether this channel is closed for receiving (we do this in `receive`),
832 // as it may call this function when helping to complete closing the channel.
833 continue
834 }
835 // Update the cell according to the cell life-cycle.
836 val updCellResult = updateCellReceive(segment, i, r, null)
837 when {
838 updCellResult === FAILED -> {
839 // The cell is poisoned; restart from the beginning.
840 // To avoid memory leaks, we also need to reset
841 // the `prev` pointer of the working segment.
842 if (r < sendersCounter) segment.cleanPrev()
843 }
844 else -> { // element
845 // A buffered element was retrieved from the cell.
846 // Clean the reference to the previous segment.
847 segment.cleanPrev()
848 @Suppress("UNCHECKED_CAST")
849 onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
850 }
851 }
852 }
853 }
854
855 /**
856 * Abstract receive implementation.
857 */
858 private inline fun <R> receiveImpl(
859 /* The waiter to be stored in case of suspension,
860 or `null` if the waiter is not created yet.
861 In the latter case, if the algorithm decides
862 to suspend, [onNoWaiterSuspend] is called. */
863 waiter: Any?,
864 /* This lambda is invoked when an element has been
865 successfully retrieved, either from the buffer or
866 by making a rendezvous with a suspended sender. */
867 onElementRetrieved: (element: E) -> R,
868 /* This lambda is called when the operation suspends in the cell
869 specified by the segment and its global and in-segment indices. */
870 onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
871 /* This lambda is called when the channel is observed
872 in the closed state and no waiting sender is found,
873 which means that it is closed for receiving. */
874 onClosed: () -> R,
875 /* This lambda is called when the operation decides
876 to suspend, but the waiter is not provided (equals `null`).
877 It should create a waiter and delegate to `sendImplOnNoWaiter`. */
878 onNoWaiterSuspend: (
879 segm: ChannelSegment<E>,
880 i: Int,
881 r: Long
882 ) -> R = { _, _, _ -> error("unexpected") }
883 ): R {
884 // Read the segment reference before the counter increment;
885 // it is crucial to be able to find the required segment later.
886 var segment = receiveSegment.value
887 while (true) {
888 // Similar to the `send(e)` operation, `receive()` first checks
889 // whether the channel is already closed for receiving.
890 if (isClosedForReceive) return onClosed()
891 // Atomically increments the `receivers` counter
892 // and obtain the value right before the increment.
893 val r = this.receivers.getAndIncrement()
894 // Count the required segment id and the cell index in it.
895 val id = r / SEGMENT_SIZE
896 val i = (r % SEGMENT_SIZE).toInt()
897 // Try to find the required segment if the initially obtained
898 // segment (in the beginning of this function) has lower id.
899 if (segment.id != id) {
900 // Find the required segment, restarting the operation if it has not been found.
901 segment = findSegmentReceive(id, segment) ?:
902 // The required segment is not found. It is possible that the channel is already
903 // closed for receiving, so the linked list of segments is closed as well.
904 // In the latter case, the operation fails with the corresponding check at the beginning.
905 continue
906 }
907 // Update the cell according to the cell life-cycle.
908 val updCellResult = updateCellReceive(segment, i, r, waiter)
909 return when {
910 updCellResult === SUSPEND -> {
911 // The operation has decided to suspend and
912 // stored the specified waiter in the cell.
913 (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
914 onSuspend(segment, i, r)
915 }
916 updCellResult === FAILED -> {
917 // The operation has tried to make a rendezvous
918 // but failed: either the opposite request has
919 // already been cancelled or the cell is poisoned.
920 // Restart from the beginning in this case.
921 // To avoid memory leaks, we also need to reset
922 // the `prev` pointer of the working segment.
923 if (r < sendersCounter) segment.cleanPrev()
924 continue
925 }
926 updCellResult === SUSPEND_NO_WAITER -> {
927 // The operation has decided to suspend,
928 // but no waiter has been provided.
929 onNoWaiterSuspend(segment, i, r)
930 }
931 else -> { // element
932 // Either a buffered element was retrieved from the cell
933 // or a rendezvous with a waiting sender has happened.
934 // Clean the reference to the previous segment before finishing.
935 segment.cleanPrev()
936 @Suppress("UNCHECKED_CAST")
937 onElementRetrieved(updCellResult as E)
938 }
939 }
940 }
941 }
942
943 private inline fun receiveImplOnNoWaiter(
944 /* The working cell is specified by
945 the segment and the index in it. */
946 segment: ChannelSegment<E>,
947 index: Int,
948 /* The global index of the cell. */
949 r: Long,
950 /* The waiter to be stored in case of suspension. */
951 waiter: Waiter,
952 /* This lambda is invoked when an element has been
953 successfully retrieved, either from the buffer or
954 by making a rendezvous with a suspended sender. */
955 onElementRetrieved: (element: E) -> Unit,
956 /* This lambda is called when the channel is observed
957 in the closed state and no waiting senders is found,
958 which means that it is closed for receiving. */
959 onClosed: () -> Unit
960 ) {
961 // Update the cell with the non-null waiter,
962 // restarting from the beginning on failure.
963 // Check the `receiveImpl(..)` function for the comments.
964 val updCellResult = updateCellReceive(segment, index, r, waiter)
965 when {
966 updCellResult === SUSPEND -> {
967 waiter.prepareReceiverForSuspension(segment, index)
968 }
969 updCellResult === FAILED -> {
970 if (r < sendersCounter) segment.cleanPrev()
971 receiveImpl(
972 waiter = waiter,
973 onElementRetrieved = onElementRetrieved,
974 onSuspend = { _, _, _ -> },
975 onClosed = onClosed
976 )
977 }
978 else -> {
979 segment.cleanPrev()
980 @Suppress("UNCHECKED_CAST")
981 onElementRetrieved(updCellResult as E)
982 }
983 }
984 }
985
986 private fun updateCellReceive(
987 /* The working cell is specified by
988 the segment and the index in it. */
989 segment: ChannelSegment<E>,
990 index: Int,
991 /* The global index of the cell. */
992 r: Long,
993 /* The waiter to be stored in case of suspension. */
994 waiter: Any?,
995 ): Any? {
996 // This is a fast-path of `updateCellReceiveSlow(..)`.
997 //
998 // Read the current cell state.
999 val state = segment.getState(index)
1000 when {
1001 // The cell is empty.
1002 state === null -> {
1003 // If a rendezvous must happen, the operation does not wait
1004 // until the cell stores a buffered element or a suspended
1005 // sender, poisoning the cell and restarting instead.
1006 // Otherwise, try to store the specified waiter in the cell.
1007 val senders = sendersAndCloseStatus.value.sendersCounter
1008 if (r >= senders) {
1009 // This `receive()` operation should suspend.
1010 if (waiter === null) {
1011 // The waiter is not specified;
1012 // return the corresponding result.
1013 return SUSPEND_NO_WAITER
1014 }
1015 // Try to install the waiter.
1016 if (segment.casState(index, state, waiter)) {
1017 // The waiter has been successfully installed.
1018 // Invoke the `expandBuffer()` procedure and finish.
1019 expandBuffer()
1020 return SUSPEND
1021 }
1022 }
1023 }
1024 // The cell stores a buffered element.
1025 state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
1026 // Retrieve the element and expand the buffer.
1027 expandBuffer()
1028 return segment.retrieveElement(index)
1029 }
1030 }
1031 return updateCellReceiveSlow(segment, index, r, waiter)
1032 }
1033
1034 private fun updateCellReceiveSlow(
1035 /* The working cell is specified by
1036 the segment and the index in it. */
1037 segment: ChannelSegment<E>,
1038 index: Int,
1039 /* The global index of the cell. */
1040 r: Long,
1041 /* The waiter to be stored in case of suspension. */
1042 waiter: Any?,
1043 ): Any? {
1044 // The cell state should be updated according to its state machine;
1045 // see the paper mentioned in the very beginning for the algorithm details.
1046 while (true) {
1047 // Read the current cell state.
1048 val state = segment.getState(index)
1049 when {
1050 // The cell is empty.
1051 state === null || state === IN_BUFFER -> {
1052 // If a rendezvous must happen, the operation does not wait
1053 // until the cell stores a buffered element or a suspended
1054 // sender, poisoning the cell and restarting instead.
1055 // Otherwise, try to store the specified waiter in the cell.
1056 val senders = sendersAndCloseStatus.value.sendersCounter
1057 if (r < senders) {
1058 // The cell is already covered by sender,
1059 // so a rendezvous must happen. Unfortunately,
1060 // the cell is empty, so the operation poisons it.
1061 if (segment.casState(index, state, POISONED)) {
1062 // When the cell becomes poisoned, it is essentially
1063 // the same as storing an already cancelled receiver.
1064 // Thus, the `expandBuffer()` procedure should be invoked.
1065 expandBuffer()
1066 return FAILED
1067 }
1068 } else {
1069 // This `receive()` operation should suspend.
1070 if (waiter === null) {
1071 // The waiter is not specified;
1072 // return the corresponding result.
1073 return SUSPEND_NO_WAITER
1074 }
1075 // Try to install the waiter.
1076 if (segment.casState(index, state, waiter)) {
1077 // The waiter has been successfully installed.
1078 // Invoke the `expandBuffer()` procedure and finish.
1079 expandBuffer()
1080 return SUSPEND
1081 }
1082 }
1083 }
1084 // The cell stores a buffered element.
1085 state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
1086 // Retrieve the element and expand the buffer.
1087 expandBuffer()
1088 return segment.retrieveElement(index)
1089 }
1090 // The cell stores an interrupted sender.
1091 state === INTERRUPTED_SEND -> return FAILED
1092 // The cell is already poisoned by a concurrent
1093 // `hasElements` call. Restart in this case.
1094 state === POISONED -> return FAILED
1095 // This channel is already closed.
1096 state === CHANNEL_CLOSED -> {
1097 // Although the channel is closed, it is still required
1098 // to call the `expandBuffer()` procedure to keep
1099 // `waitForExpandBufferCompletion()` correct.
1100 expandBuffer()
1101 return FAILED
1102 }
1103 // A concurrent `expandBuffer()` is resuming a
1104 // suspended sender. Wait in a spin-loop until
1105 // the resumption attempt completes: the cell
1106 // state must change to either `BUFFERED` or
1107 // `INTERRUPTED_SEND`.
1108 state === RESUMING_BY_EB -> continue
1109 // The cell stores a suspended sender; try to resume it.
1110 else -> {
1111 // To synchronize with expandBuffer(), the algorithm
1112 // first moves the cell to an intermediate `S_RESUMING_BY_RCV`
1113 // state, updating it to either `BUFFERED` (on success) or
1114 // `INTERRUPTED_SEND` (on failure).
1115 if (segment.casState(index, state, RESUMING_BY_RCV)) {
1116 // Has a concurrent `expandBuffer()` delegated its completion?
1117 val helpExpandBuffer = state is WaiterEB
1118 // Extract the sender if needed and try to resume it.
1119 val sender = if (state is WaiterEB) state.waiter else state
1120 return if (sender.tryResumeSender(segment, index)) {
1121 // The sender has been resumed successfully!
1122 // Update the cell state correspondingly,
1123 // expand the buffer, and return the element
1124 // stored in the cell.
1125 // In case a concurrent `expandBuffer()` has delegated
1126 // its completion, the procedure should finish, as the
1127 // sender is resumed. Thus, no further action is required.
1128 segment.setState(index, DONE_RCV)
1129 expandBuffer()
1130 segment.retrieveElement(index)
1131 } else {
1132 // The resumption has failed. Update the cell correspondingly.
1133 // In case a concurrent `expandBuffer()` has delegated
1134 // its completion, the procedure should skip this cell, so
1135 // `expandBuffer()` should be called once again.
1136 segment.setState(index, INTERRUPTED_SEND)
1137 segment.onCancelledRequest(index, false)
1138 if (helpExpandBuffer) expandBuffer()
1139 FAILED
1140 }
1141 }
1142 }
1143 }
1144 }
1145 }
1146
1147 private fun Any.tryResumeSender(segment: ChannelSegment<E>, index: Int): Boolean = when (this) {
1148 is CancellableContinuation<*> -> { // suspended `send(e)` operation
1149 @Suppress("UNCHECKED_CAST")
1150 this as CancellableContinuation<Unit>
1151 tryResume0(Unit)
1152 }
1153 is SelectInstance<*> -> {
1154 this as SelectImplementation<*>
1155 val trySelectResult = trySelectDetailed(clauseObject = this@BufferedChannel, result = Unit)
1156 // Clean the element slot to avoid memory leaks
1157 // if this `select` clause should be re-registered.
1158 if (trySelectResult === REREGISTER) segment.cleanElement(index)
1159 // Was the resumption successful?
1160 trySelectResult === SUCCESSFUL
1161 }
1162 is SendBroadcast -> cont.tryResume0(true) // // suspended `sendBroadcast(e)` operation
1163 else -> error("Unexpected waiter: $this")
1164 }
1165
1166 // ################################
1167 // # The expandBuffer() procedure #
1168 // ################################
1169
1170 private fun expandBuffer() {
1171 // Do not need to take any action if
1172 // this channel is rendezvous or unlimited.
1173 if (isRendezvousOrUnlimited) return
1174 // Read the current segment of
1175 // the `expandBuffer()` procedure.
1176 var segment = bufferEndSegment.value
1177 // Try to expand the buffer until succeed.
1178 try_again@ while (true) {
1179 // Increment the logical end of the buffer.
1180 // The `b`-th cell is going to be added to the buffer.
1181 val b = bufferEnd.getAndIncrement()
1182 val id = b / SEGMENT_SIZE
1183 // After that, read the current `senders` counter.
1184 // In case its value is lower than `b`, the `send(e)`
1185 // invocation that will work with this `b`-th cell
1186 // will detect that the cell is already a part of the
1187 // buffer when comparing with the `bufferEnd` counter.
1188 // However, `bufferEndSegment` may reference an outdated
1189 // segment, which should be updated to avoid memory leaks.
1190 val s = sendersCounter
1191 if (s <= b) {
1192 // Should `bufferEndSegment` be moved forward to avoid memory leaks?
1193 if (segment.id < id && segment.next != null)
1194 moveSegmentBufferEndToSpecifiedOrLast(id, segment)
1195 // Increment the number of completed `expandBuffer()`-s and finish.
1196 incCompletedExpandBufferAttempts()
1197 return
1198 }
1199 // Is `bufferEndSegment` outdated or is the segment with the required id already removed?
1200 // Find the required segment, creating new ones if needed.
1201 if (segment.id != id) {
1202 segment = findSegmentBufferEnd(id, segment, b)
1203 // Restart if the required segment is removed, or
1204 // the linked list of segments is already closed,
1205 // and the required one will never be created.
1206 // Please note that `findSegmentBuffer(..)` updates
1207 // the number of completed `expandBuffer()` attempt
1208 // in this case.
1209 ?: continue@try_again
1210 }
1211 // Try to add the cell to the logical buffer,
1212 // updating the cell state according to the state-machine.
1213 val i = (b % SEGMENT_SIZE).toInt()
1214 if (updateCellExpandBuffer(segment, i, b)) {
1215 // The cell has been added to the logical buffer!
1216 // Increment the number of completed `expandBuffer()`-s and finish.
1217 //
1218 // Note that it is possible to increment the number of
1219 // completed `expandBuffer()` attempts earlier, right
1220 // after the segment is obtained. We find this change
1221 // counter-intuitive and prefer to avoid it.
1222 incCompletedExpandBufferAttempts()
1223 return
1224 } else {
1225 // The cell has not been added to the buffer.
1226 // Increment the number of completed `expandBuffer()`
1227 // attempts and restart.
1228 incCompletedExpandBufferAttempts()
1229 continue@try_again
1230 }
1231 }
1232 }
1233
1234 private fun updateCellExpandBuffer(
1235 /* The working cell is specified by
1236 the segment and the index in it. */
1237 segment: ChannelSegment<E>,
1238 index: Int,
1239 /* The global index of the cell. */
1240 b: Long
1241 ): Boolean {
1242 // This is a fast-path of `updateCellExpandBufferSlow(..)`.
1243 //
1244 // Read the current cell state.
1245 val state = segment.getState(index)
1246 if (state is Waiter) {
1247 // Usually, a sender is stored in the cell.
1248 // However, it is possible for a concurrent
1249 // receiver to be already suspended there.
1250 // Try to distinguish whether the waiter is a
1251 // sender by comparing the global cell index with
1252 // the `receivers` counter. In case the cell is not
1253 // covered by a receiver, a sender is stored in the cell.
1254 if (b >= receivers.value) {
1255 // The cell stores a suspended sender. Try to resume it.
1256 // To synchronize with a concurrent `receive()`, the algorithm
1257 // first moves the cell state to an intermediate `RESUMING_BY_EB`
1258 // state, updating it to either `BUFFERED` (on successful resumption)
1259 // or `INTERRUPTED_SEND` (on failure).
1260 if (segment.casState(index, state, RESUMING_BY_EB)) {
1261 return if (state.tryResumeSender(segment, index)) {
1262 // The sender has been resumed successfully!
1263 // Move the cell to the logical buffer and finish.
1264 segment.setState(index, BUFFERED)
1265 true
1266 } else {
1267 // The resumption has failed.
1268 segment.setState(index, INTERRUPTED_SEND)
1269 segment.onCancelledRequest(index, false)
1270 false
1271 }
1272 }
1273 }
1274 }
1275 return updateCellExpandBufferSlow(segment, index, b)
1276 }
1277
1278 private fun updateCellExpandBufferSlow(
1279 /* The working cell is specified by
1280 the segment and the index in it. */
1281 segment: ChannelSegment<E>,
1282 index: Int,
1283 /* The global index of the cell. */
1284 b: Long
1285 ): Boolean {
1286 // Update the cell state according to its state machine.
1287 // See the paper mentioned in the very beginning for
1288 // the cell life-cycle and the algorithm details.
1289 while (true) {
1290 // Read the current cell state.
1291 val state = segment.getState(index)
1292 when {
1293 // A suspended waiter, sender or receiver.
1294 state is Waiter -> {
1295 // Usually, a sender is stored in the cell.
1296 // However, it is possible for a concurrent
1297 // receiver to be already suspended there.
1298 // Try to distinguish whether the waiter is a
1299 // sender by comparing the global cell index with
1300 // the `receivers` counter. In case the cell is not
1301 // covered by a receiver, a sender is stored in the cell.
1302 if (b < receivers.value) {
1303 // The algorithm cannot distinguish whether the
1304 // suspended in the cell operation is sender or receiver.
1305 // To make progress, `expandBuffer()` delegates its completion
1306 // to an upcoming pairwise request, atomically wrapping
1307 // the waiter in `WaiterEB`. In case a sender is stored
1308 // in the cell, the upcoming receiver will call `expandBuffer()`
1309 // if the sender resumption fails; thus, effectively, skipping
1310 // this cell. Otherwise, if a receiver is stored in the cell,
1311 // this `expandBuffer()` procedure must finish; therefore,
1312 // sender ignore the `WaiterEB` wrapper.
1313 if (segment.casState(index, state, WaiterEB(waiter = state)))
1314 return true
1315 } else {
1316 // The cell stores a suspended sender. Try to resume it.
1317 // To synchronize with a concurrent `receive()`, the algorithm
1318 // first moves the cell state to an intermediate `RESUMING_BY_EB`
1319 // state, updating it to either `BUFFERED` (on successful resumption)
1320 // or `INTERRUPTED_SEND` (on failure).
1321 if (segment.casState(index, state, RESUMING_BY_EB)) {
1322 return if (state.tryResumeSender(segment, index)) {
1323 // The sender has been resumed successfully!
1324 // Move the cell to the logical buffer and finish.
1325 segment.setState(index, BUFFERED)
1326 true
1327 } else {
1328 // The resumption has failed.
1329 segment.setState(index, INTERRUPTED_SEND)
1330 segment.onCancelledRequest(index, false)
1331 false
1332 }
1333 }
1334 }
1335 }
1336 // The cell stores an interrupted sender, skip it.
1337 state === INTERRUPTED_SEND -> return false
1338 // The cell is empty, a concurrent sender is coming.
1339 state === null -> {
1340 // To inform a concurrent sender that this cell is
1341 // already a part of the buffer, the algorithm moves
1342 // it to a special `IN_BUFFER` state.
1343 if (segment.casState(index, state, IN_BUFFER)) return true
1344 }
1345 // The cell is already a part of the buffer, finish.
1346 state === BUFFERED -> return true
1347 // The cell is already processed by a receiver, no further action is required.
1348 state === POISONED || state === DONE_RCV || state === INTERRUPTED_RCV -> return true
1349 // The channel is closed, all the following
1350 // cells are already in the same state, finish.
1351 state === CHANNEL_CLOSED -> return true
1352 // A concurrent receiver is resuming the suspended sender.
1353 // Wait in a spin-loop until it changes the cell state
1354 // to either `DONE_RCV` or `INTERRUPTED_SEND`.
1355 state === RESUMING_BY_RCV -> continue // spin wait
1356 else -> error("Unexpected cell state: $state")
1357 }
1358 }
1359 }
1360
1361 /**
1362 * Increments the counter of completed [expandBuffer] invocations.
1363 * To guarantee starvation-freedom for [waitExpandBufferCompletion],
1364 * which waits until the counters of started and completed [expandBuffer] calls
1365 * coincide and become greater or equal to the specified value,
1366 * [waitExpandBufferCompletion] may set a flag that pauses further progress.
1367 */
1368 private fun incCompletedExpandBufferAttempts(nAttempts: Long = 1) {
1369 // Increment the number of completed `expandBuffer()` calls.
1370 completedExpandBuffersAndPauseFlag.addAndGet(nAttempts).also {
1371 // Should further `expandBuffer()`-s be paused?
1372 // If so, this thread should wait in a spin-loop
1373 // until the flag is unset.
1374 if (it.ebPauseExpandBuffers) {
1375 @Suppress("ControlFlowWithEmptyBody")
1376 while (completedExpandBuffersAndPauseFlag.value.ebPauseExpandBuffers) {}
1377 }
1378 }
1379 }
1380
1381 /**
1382 * Waits in a spin-loop until the [expandBuffer] call that
1383 * should process the [globalIndex]-th cell is completed.
1384 * Essentially, it waits until the numbers of started ([bufferEnd])
1385 * and completed ([completedExpandBuffersAndPauseFlag]) [expandBuffer]
1386 * attempts coincide and become equal or greater than [globalIndex].
1387 * To avoid starvation, this function may set a flag
1388 * that pauses further progress.
1389 */
1390 internal fun waitExpandBufferCompletion(globalIndex: Long) {
1391 // Do nothing if this channel is rendezvous or unlimited;
1392 // `expandBuffer()` is not used in these cases.
1393 if (isRendezvousOrUnlimited) return
1394 // Wait in an infinite loop until the number of started
1395 // buffer expansion calls become not lower than the cell index.
1396 @Suppress("ControlFlowWithEmptyBody")
1397 while (bufferEndCounter <= globalIndex) {}
1398 // Now it is guaranteed that the `expandBuffer()` call that
1399 // should process the required cell has been started.
1400 // Wait in a fixed-size spin-loop until the numbers of
1401 // started and completed buffer expansion calls coincide.
1402 repeat(EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS) {
1403 // Read the number of started buffer expansion calls.
1404 val b = bufferEndCounter
1405 // Read the number of completed buffer expansion calls.
1406 val ebCompleted = completedExpandBuffersAndPauseFlag.value.ebCompletedCounter
1407 // Do the numbers of started and completed calls coincide?
1408 // Note that we need to re-read the number of started `expandBuffer()`
1409 // calls to obtain a correct snapshot.
1410 // Here we wait to a precise match in order to ensure that **our matching expandBuffer()**
1411 // completed. The only way to ensure that is to check that number of started expands == number of finished expands
1412 if (b == ebCompleted && b == bufferEndCounter) return
1413 }
1414 // To avoid starvation, pause further `expandBuffer()` calls.
1415 completedExpandBuffersAndPauseFlag.update {
1416 constructEBCompletedAndPauseFlag(it.ebCompletedCounter, true)
1417 }
1418 // Now wait in an infinite spin-loop until the counters coincide.
1419 while (true) {
1420 // Read the number of started buffer expansion calls.
1421 val b = bufferEndCounter
1422 // Read the number of completed buffer expansion calls
1423 // along with the flag that pauses further progress.
1424 val ebCompletedAndBit = completedExpandBuffersAndPauseFlag.value
1425 val ebCompleted = ebCompletedAndBit.ebCompletedCounter
1426 val pauseExpandBuffers = ebCompletedAndBit.ebPauseExpandBuffers
1427 // Do the numbers of started and completed calls coincide?
1428 // Note that we need to re-read the number of started `expandBuffer()`
1429 // calls to obtain a correct snapshot.
1430 if (b == ebCompleted && b == bufferEndCounter) {
1431 // Unset the flag, which pauses progress, and finish.
1432 completedExpandBuffersAndPauseFlag.update {
1433 constructEBCompletedAndPauseFlag(it.ebCompletedCounter, false)
1434 }
1435 return
1436 }
1437 // It is possible that a concurrent caller of this function
1438 // has unset the flag, which pauses further progress to avoid
1439 // starvation. In this case, set the flag back.
1440 if (!pauseExpandBuffers) {
1441 completedExpandBuffersAndPauseFlag.compareAndSet(
1442 ebCompletedAndBit,
1443 constructEBCompletedAndPauseFlag(ebCompleted, true)
1444 )
1445 }
1446 }
1447 }
1448
1449
1450 // #######################
1451 // ## Select Expression ##
1452 // #######################
1453
1454 @Suppress("UNCHECKED_CAST")
1455 override val onSend: SelectClause2<E, BufferedChannel<E>>
1456 get() = SelectClause2Impl(
1457 clauseObject = this@BufferedChannel,
1458 regFunc = BufferedChannel<*>::registerSelectForSend as RegistrationFunction,
1459 processResFunc = BufferedChannel<*>::processResultSelectSend as ProcessResultFunction
1460 )
1461
1462 @Suppress("UNCHECKED_CAST")
1463 protected open fun registerSelectForSend(select: SelectInstance<*>, element: Any?) =
1464 sendImpl( // <-- this is an inline function
1465 element = element as E,
1466 waiter = select,
1467 onRendezvousOrBuffered = { select.selectInRegistrationPhase(Unit) },
1468 onSuspend = { _, _ -> },
1469 onClosed = { onClosedSelectOnSend(element, select) }
1470 )
1471
1472
1473 private fun onClosedSelectOnSend(element: E, select: SelectInstance<*>) {
1474 onUndeliveredElement?.callUndeliveredElement(element, select.context)
1475 select.selectInRegistrationPhase(CHANNEL_CLOSED)
1476 }
1477
1478 @Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
1479 private fun processResultSelectSend(ignoredParam: Any?, selectResult: Any?): Any? =
1480 if (selectResult === CHANNEL_CLOSED) throw sendException
1481 else this
1482
1483 @Suppress("UNCHECKED_CAST")
1484 override val onReceive: SelectClause1<E>
1485 get() = SelectClause1Impl(
1486 clauseObject = this@BufferedChannel,
1487 regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1488 processResFunc = BufferedChannel<*>::processResultSelectReceive as ProcessResultFunction,
1489 onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1490 )
1491
1492 @Suppress("UNCHECKED_CAST")
1493 override val onReceiveCatching: SelectClause1<ChannelResult<E>>
1494 get() = SelectClause1Impl(
1495 clauseObject = this@BufferedChannel,
1496 regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1497 processResFunc = BufferedChannel<*>::processResultSelectReceiveCatching as ProcessResultFunction,
1498 onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1499 )
1500
1501 @Suppress("OVERRIDE_DEPRECATION", "UNCHECKED_CAST")
1502 override val onReceiveOrNull: SelectClause1<E?>
1503 get() = SelectClause1Impl(
1504 clauseObject = this@BufferedChannel,
1505 regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1506 processResFunc = BufferedChannel<*>::processResultSelectReceiveOrNull as ProcessResultFunction,
1507 onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1508 )
1509
1510 @Suppress("UNUSED_PARAMETER")
1511 private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
1512 receiveImpl( // <-- this is an inline function
1513 waiter = select,
1514 onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
1515 onSuspend = { _, _, _ -> },
1516 onClosed = { onClosedSelectOnReceive(select) }
1517 )
1518
1519 private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
1520 select.selectInRegistrationPhase(CHANNEL_CLOSED)
1521 }
1522
1523 @Suppress("UNUSED_PARAMETER")
1524 private fun processResultSelectReceive(ignoredParam: Any?, selectResult: Any?): Any? =
1525 if (selectResult === CHANNEL_CLOSED) throw receiveException
1526 else selectResult
1527
1528 @Suppress("UNUSED_PARAMETER")
1529 private fun processResultSelectReceiveOrNull(ignoredParam: Any?, selectResult: Any?): Any? =
1530 if (selectResult === CHANNEL_CLOSED) {
1531 if (closeCause == null) null
1532 else throw receiveException
1533 } else selectResult
1534
1535 @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER", "RedundantNullableReturnType")
1536 private fun processResultSelectReceiveCatching(ignoredParam: Any?, selectResult: Any?): Any? =
1537 if (selectResult === CHANNEL_CLOSED) closed(closeCause)
1538 else success(selectResult as E)
1539
1540 @Suppress("UNCHECKED_CAST")
1541 private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
1542 { select: SelectInstance<*>, _: Any?, element: Any? ->
1543 { if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context) }
1544 }
1545 }
1546
1547 // ######################
1548 // ## Iterator Support ##
1549 // ######################
1550
1551 override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
1552
1553 /**
1554 * The key idea is that an iterator is a special receiver type,
1555 * which should be resumed differently to [receive] and [onReceive]
1556 * operations, but can be served as a waiter in a way similar to
1557 * [CancellableContinuation] and [SelectInstance].
1558 *
1559 * Roughly, [hasNext] is a [receive] sibling, while [next] simply
1560 * returns the already retrieved element. From the implementation
1561 * side, [receiveResult] stores the element retrieved by [hasNext]
1562 * (or a special [CHANNEL_CLOSED] token if the channel is closed).
1563 *
1564 * The [invoke] function is a [CancelHandler] implementation,
1565 * which requires knowing the [segment] and the [index] in it
1566 * that specify the location of the stored iterator.
1567 *
1568 * To resume the suspended [hasNext] call, a special [tryResumeHasNext]
1569 * function should be used in a way similar to [CancellableContinuation.tryResume]
1570 * and [SelectInstance.trySelect]. When the channel becomes closed,
1571 * [tryResumeHasNextOnClosedChannel] should be used instead.
1572 */
1573 private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
1574 /**
1575 * Stores the element retrieved by [hasNext] or
1576 * a special [CHANNEL_CLOSED] token if this channel is closed.
1577 * If [hasNext] has not been invoked yet, [NO_RECEIVE_RESULT] is stored.
1578 */
1579 private var receiveResult: Any? = NO_RECEIVE_RESULT
1580
1581 /**
1582 * When [hasNext] suspends, this field stores the corresponding
1583 * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
1584 * function resume this continuation when the [hasNext] invocation should complete.
1585 */
1586 private var continuation: CancellableContinuationImpl<Boolean>? = null
1587
1588 // `hasNext()` is just a special receive operation.
1589 override suspend fun hasNext(): Boolean =
1590 receiveImpl( // <-- this is an inline function
1591 // Do not create a continuation until it is required;
1592 // it is created later via [onNoWaiterSuspend], if needed.
1593 waiter = null,
1594 // Store the received element in `receiveResult` on successful
1595 // retrieval from the buffer or rendezvous with a suspended sender.
1596 // Also, inform the `BufferedChannel` extensions that
1597 // the synchronization of this receive operation is completed.
1598 onElementRetrieved = { element ->
1599 this.receiveResult = element
1600 true
1601 },
1602 // As no waiter is provided, suspension is impossible.
1603 onSuspend = { _, _, _ -> error("unreachable") },
1604 // Return `false` or throw an exception if the channel is already closed.
1605 onClosed = { onClosedHasNext() },
1606 // If `hasNext()` decides to suspend, the corresponding
1607 // `suspend` function that creates a continuation is called.
1608 // The tail-call optimization is applied here.
1609 onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
1610 )
1611
1612 private fun onClosedHasNext(): Boolean {
1613 this.receiveResult = CHANNEL_CLOSED
1614 val cause = closeCause ?: return false
1615 throw recoverStackTrace(cause)
1616 }
1617
1618 private suspend fun hasNextOnNoWaiterSuspend(
1619 /* The working cell is specified by
1620 the segment and the index in it. */
1621 segment: ChannelSegment<E>,
1622 index: Int,
1623 /* The global index of the cell. */
1624 r: Long
1625 ): Boolean = suspendCancellableCoroutineReusable { cont ->
1626 this.continuation = cont
1627 receiveImplOnNoWaiter( // <-- this is an inline function
1628 segment = segment, index = index, r = r,
1629 waiter = this, // store this iterator as a waiter
1630 // In case of successful element retrieval, store
1631 // it in `receiveResult` and resume the continuation.
1632 // Importantly, the receiver coroutine may be cancelled
1633 // after it is successfully resumed but not dispatched yet.
1634 // In case `onUndeliveredElement` is present, we must
1635 // invoke it in the latter case.
1636 onElementRetrieved = { element ->
1637 this.receiveResult = element
1638 this.continuation = null
1639 cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1640 },
1641 onClosed = { onClosedHasNextNoWaiterSuspend() }
1642 )
1643 }
1644
1645 override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
1646 this.continuation?.invokeOnCancellation(segment, index)
1647 }
1648
1649 private fun onClosedHasNextNoWaiterSuspend() {
1650 // Read the current continuation and clean
1651 // the corresponding field to avoid memory leaks.
1652 val cont = this.continuation!!
1653 this.continuation = null
1654 // Update the `hasNext()` internal result.
1655 this.receiveResult = CHANNEL_CLOSED
1656 // If this channel was closed without exception,
1657 // `hasNext()` should return `false`; otherwise,
1658 // it throws the closing exception.
1659 val cause = closeCause
1660 if (cause == null) {
1661 cont.resume(false)
1662 } else {
1663 cont.resumeWithException(recoverStackTrace(cause, cont))
1664 }
1665 }
1666
1667 @Suppress("UNCHECKED_CAST")
1668 override fun next(): E {
1669 // Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
1670 val result = receiveResult
1671 check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
1672 receiveResult = NO_RECEIVE_RESULT
1673 // Is this channel closed?
1674 if (result === CHANNEL_CLOSED) throw recoverStackTrace(receiveException)
1675 // Return the element.
1676 return result as E
1677 }
1678
1679 fun tryResumeHasNext(element: E): Boolean {
1680 // Read the current continuation and clean
1681 // the corresponding field to avoid memory leaks.
1682 val cont = this.continuation!!
1683 this.continuation = null
1684 // Store the retrieved element in `receiveResult`.
1685 this.receiveResult = element
1686 // Try to resume this `hasNext()`. Importantly, the receiver coroutine
1687 // may be cancelled after it is successfully resumed but not dispatched yet.
1688 // In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
1689 return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1690 }
1691
1692 fun tryResumeHasNextOnClosedChannel() {
1693 // Read the current continuation and clean
1694 // the corresponding field to avoid memory leaks.
1695 val cont = this.continuation!!
1696 this.continuation = null
1697 // Update the `hasNext()` internal result and inform
1698 // `BufferedChannel` extensions that synchronization
1699 // of this receive operation is completed.
1700 this.receiveResult = CHANNEL_CLOSED
1701 // If this channel was closed without exception,
1702 // `hasNext()` should return `false`; otherwise,
1703 // it throws the closing exception.
1704 val cause = closeCause
1705 if (cause == null) {
1706 cont.resume(false)
1707 } else {
1708 cont.resumeWithException(recoverStackTrace(cause, cont))
1709 }
1710 }
1711 }
1712
1713 // ##############################
1714 // ## Closing and Cancellation ##
1715 // ##############################
1716
1717 /**
1718 * Store the cause of closing this channel, either via [close] or [cancel] call.
1719 * The closing cause can be set only once.
1720 */
1721 private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
1722 // Should be called only if this channel is closed or cancelled.
1723 protected val closeCause get() = _closeCause.value as Throwable?
1724
1725 /** Returns the closing cause if it is non-null, or [ClosedSendChannelException] otherwise. */
1726 protected val sendException get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
1727
1728 /** Returns the closing cause if it is non-null, or [ClosedReceiveChannelException] otherwise. */
1729 private val receiveException get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1730
1731 /**
1732 Stores the closed handler installed by [invokeOnClose].
1733 To synchronize [invokeOnClose] and [close], two additional
1734 marker states, [CLOSE_HANDLER_INVOKED] and [CLOSE_HANDLER_CLOSED]
1735 are used. The resulting state diagram is presented below.
1736
1737 +------+ install handler +---------+ close(..) +---------+
1738 | null |------------------>| handler |------------>| INVOKED |
1739 +------+ +---------+ +---------+
1740 |
1741 | close(..) +--------+
1742 +----------->| CLOSED |
1743 +--------+
1744 */
1745 private val closeHandler = atomic<Any?>(null)
1746
1747 /**
1748 * Invoked when channel is closed as the last action of [close] invocation.
1749 * This method should be idempotent and can be called multiple times.
1750 */
1751 protected open fun onClosedIdempotent() {}
1752
1753 override fun close(cause: Throwable?): Boolean =
1754 closeOrCancelImpl(cause, cancel = false)
1755
1756 @Suppress("OVERRIDE_DEPRECATION")
1757 final override fun cancel(cause: Throwable?): Boolean = cancelImpl(cause)
1758
1759 @Suppress("OVERRIDE_DEPRECATION")
1760 final override fun cancel() { cancelImpl(null) }
1761
1762 final override fun cancel(cause: CancellationException?) { cancelImpl(cause) }
1763
1764 internal open fun cancelImpl(cause: Throwable?): Boolean =
1765 closeOrCancelImpl(cause ?: CancellationException("Channel was cancelled"), cancel = true)
1766
1767 /**
1768 * This is a common implementation for [close] and [cancel]. It first tries
1769 * to install the specified cause; the invocation that successfully installs
1770 * the cause returns `true` as a results of this function, while all further
1771 * [close] and [cancel] calls return `false`.
1772 *
1773 * After the closing/cancellation cause is installed, the channel should be marked
1774 * as closed or cancelled, which bounds further `send(e)`-s to fails.
1775 *
1776 * Then, [completeCloseOrCancel] is called, which cancels waiting `receive()`
1777 * requests ([cancelSuspendedReceiveRequests]) and removes unprocessed elements
1778 * ([removeUnprocessedElements]) in case this channel is cancelled.
1779 *
1780 * Finally, if this [closeOrCancelImpl] has installed the cause, therefore,
1781 * has closed the channel, [closeHandler] and [onClosedIdempotent] should be invoked.
1782 */
1783 protected open fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
1784 // If this is a `cancel(..)` invocation, set a bit that the cancellation
1785 // has been started. This is crucial for ensuring linearizability,
1786 // when concurrent `close(..)` and `isClosedFor[Send,Receive]` operations
1787 // help this `cancel(..)`.
1788 if (cancel) markCancellationStarted()
1789 // Try to install the specified cause. On success, this invocation will
1790 // return `true` as a result; otherwise, it will complete with `false`.
1791 val closedByThisOperation = _closeCause.compareAndSet(NO_CLOSE_CAUSE, cause)
1792 // Mark this channel as closed or cancelled, depending on this operation type.
1793 if (cancel) markCancelled() else markClosed()
1794 // Complete the closing or cancellation procedure.
1795 completeCloseOrCancel()
1796 // Finally, if this operation has installed the cause,
1797 // it should invoke the close handlers.
1798 return closedByThisOperation.also {
1799 onClosedIdempotent()
1800 if (it) invokeCloseHandler()
1801 }
1802 }
1803
1804 /**
1805 * Invokes the installed close handler,
1806 * updating the [closeHandler] state correspondingly.
1807 */
1808 private fun invokeCloseHandler() {
1809 val closeHandler = closeHandler.getAndUpdate {
1810 if (it === null) {
1811 // Inform concurrent `invokeOnClose`
1812 // that this channel is already closed.
1813 CLOSE_HANDLER_CLOSED
1814 } else {
1815 // Replace the handler with a special
1816 // `INVOKED` marker to avoid memory leaks.
1817 CLOSE_HANDLER_INVOKED
1818 }
1819 } ?: return // no handler was installed, finish.
1820 // Invoke the handler.
1821 @Suppress("UNCHECKED_CAST")
1822 closeHandler as (cause: Throwable?) -> Unit
1823 closeHandler(closeCause)
1824 }
1825
1826 override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
1827 // Try to install the handler, finishing on success.
1828 if (closeHandler.compareAndSet(null, handler)) {
1829 // Handler has been successfully set, finish the operation.
1830 return
1831 }
1832 // Either another handler is already set, or this channel is closed.
1833 // In the latter case, the current handler should be invoked.
1834 // However, the implementation must ensure that at most one
1835 // handler is called, throwing an `IllegalStateException`
1836 // if another close handler has been invoked.
1837 closeHandler.loop { cur ->
1838 when {
1839 cur === CLOSE_HANDLER_CLOSED -> {
1840 // Try to update the state from `CLOSED` to `INVOKED`.
1841 // This is crucial to guarantee that at most one handler can be called.
1842 // On success, invoke the handler and finish.
1843 if (closeHandler.compareAndSet(CLOSE_HANDLER_CLOSED, CLOSE_HANDLER_INVOKED)) {
1844 handler(closeCause)
1845 return
1846 }
1847 }
1848 cur === CLOSE_HANDLER_INVOKED -> error("Another handler was already registered and successfully invoked")
1849 else -> error("Another handler is already registered: $cur")
1850 }
1851 }
1852 }
1853
1854 /**
1855 * Marks this channel as closed.
1856 * In case [cancelImpl] has already been invoked,
1857 * and this channel is marked with [CLOSE_STATUS_CANCELLATION_STARTED],
1858 * this function marks the channel as cancelled.
1859 *
1860 * All operation that notice this channel in the closed state,
1861 * must help to complete the closing via [completeCloseOrCancel].
1862 */
1863 private fun markClosed(): Unit =
1864 sendersAndCloseStatus.update { cur ->
1865 when (cur.sendersCloseStatus) {
1866 CLOSE_STATUS_ACTIVE -> // the channel is neither closed nor cancelled
1867 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CLOSED)
1868 CLOSE_STATUS_CANCELLATION_STARTED -> // the channel is going to be cancelled
1869 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1870 else -> return // the channel is already marked as closed or cancelled.
1871 }
1872 }
1873
1874 /**
1875 * Marks this channel as cancelled.
1876 *
1877 * All operation that notice this channel in the cancelled state,
1878 * must help to complete the cancellation via [completeCloseOrCancel].
1879 */
1880 private fun markCancelled(): Unit =
1881 sendersAndCloseStatus.update { cur ->
1882 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1883 }
1884
1885 /**
1886 * When the cancellation procedure starts, it is critical
1887 * to mark the closing status correspondingly. Thus, other
1888 * operations, which may help to complete the cancellation,
1889 * always correctly update the status to `CANCELLED`.
1890 */
1891 private fun markCancellationStarted(): Unit =
1892 sendersAndCloseStatus.update { cur ->
1893 if (cur.sendersCloseStatus == CLOSE_STATUS_ACTIVE)
1894 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLATION_STARTED)
1895 else return // this channel is already closed or cancelled
1896 }
1897
1898 /**
1899 * Completes the started [close] or [cancel] procedure.
1900 */
1901 private fun completeCloseOrCancel() {
1902 isClosedForSend // must finish the started close/cancel if one is detected.
1903 }
1904
1905 protected open val isConflatedDropOldest get() = false
1906
1907 /**
1908 * Completes the channel closing procedure.
1909 */
1910 private fun completeClose(sendersCur: Long): ChannelSegment<E> {
1911 // Close the linked list for further segment addition,
1912 // obtaining the last segment in the data structure.
1913 val lastSegment = closeLinkedList()
1914 // In the conflated channel implementation (with the DROP_OLDEST
1915 // elements conflation strategy), it is critical to mark all empty
1916 // cells as closed to prevent in-progress `send(e)`-s, which have not
1917 // put their elements yet, completions after this channel is closed.
1918 // Otherwise, it is possible for a `send(e)` to put an element when
1919 // the buffer is already full, while a concurrent receiver may extract
1920 // the oldest element. When the channel is not closed, we can linearize
1921 // this `receive()` before the `send(e)`, but after the channel is closed,
1922 // `send(e)` must fails. Marking all unprocessed cells as `CLOSED` solves the issue.
1923 if (isConflatedDropOldest) {
1924 val lastBufferedCellGlobalIndex = markAllEmptyCellsAsClosed(lastSegment)
1925 if (lastBufferedCellGlobalIndex != -1L)
1926 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBufferedCellGlobalIndex)
1927 }
1928 // Resume waiting `receive()` requests,
1929 // informing them that the channel is closed.
1930 cancelSuspendedReceiveRequests(lastSegment, sendersCur)
1931 // Return the last segment in the linked list as a result
1932 // of this function; we need it in `completeCancel(..)`.
1933 return lastSegment
1934 }
1935
1936 /**
1937 * Completes the channel cancellation procedure.
1938 */
1939 private fun completeCancel(sendersCur: Long) {
1940 // First, ensure that this channel is closed,
1941 // obtaining the last segment in the linked list.
1942 val lastSegment = completeClose(sendersCur)
1943 // Cancel suspended `send(e)` requests and
1944 // remove buffered elements in the reverse order.
1945 removeUnprocessedElements(lastSegment)
1946 }
1947
1948 /**
1949 * Closes the underlying linked list of segments for further segment addition.
1950 */
1951 private fun closeLinkedList(): ChannelSegment<E> {
1952 // Choose the last segment.
1953 var lastSegment = bufferEndSegment.value
1954 sendSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1955 receiveSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1956 // Close the linked list of segment for new segment addition
1957 // and return the last segment in the linked list.
1958 return lastSegment.close()
1959 }
1960
1961 /**
1962 * This function marks all empty cells, in the `null` and [IN_BUFFER] state,
1963 * as closed. Notably, it processes the cells from right to left, and finishes
1964 * immediately when the processing cell is already covered by `receive()` or
1965 * contains a buffered elements ([BUFFERED] state).
1966 *
1967 * This function returns the global index of the last buffered element,
1968 * or `-1` if this channel does not contain buffered elements.
1969 */
1970 private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
1971 // Process the cells in reverse order, from right to left.
1972 var segment = lastSegment
1973 while (true) {
1974 for (index in SEGMENT_SIZE - 1 downTo 0) {
1975 // Is this cell already covered by `receive()`?
1976 val globalIndex = segment.id * SEGMENT_SIZE + index
1977 if (globalIndex < receiversCounter) return -1
1978 // Process the cell `segment[index]`.
1979 cell_update@ while (true) {
1980 val state = segment.getState(index)
1981 when {
1982 // The cell is empty.
1983 state === null || state === IN_BUFFER -> {
1984 // Inform a possibly upcoming sender that this channel is already closed.
1985 if (segment.casState(index, state, CHANNEL_CLOSED)) {
1986 segment.onSlotCleaned()
1987 break@cell_update
1988 }
1989 }
1990 // The cell stores a buffered element.
1991 state === BUFFERED -> return globalIndex
1992 // Skip this cell if it is not empty and does not store a buffered element.
1993 else -> break@cell_update
1994 }
1995 }
1996 }
1997 // Process the next segment, finishing if the linked list ends.
1998 segment = segment.prev ?: return -1
1999 }
2000 }
2001
2002 /**
2003 * Cancels suspended `send(e)` requests and removes buffered elements
2004 * starting from the last cell in the specified [lastSegment] (it must
2005 * be the physical tail of the underlying linked list) and updating
2006 * the cells in reverse order.
2007 */
2008 private fun removeUnprocessedElements(lastSegment: ChannelSegment<E>) {
2009 // Read the `onUndeliveredElement` lambda at once. In case it
2010 // throws an exception, this exception is handled and stored in
2011 // the variable below. If multiple exceptions are thrown, the first
2012 // one is stored in the variable, while the others are suppressed.
2013 val onUndeliveredElement = onUndeliveredElement
2014 var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
2015 // To perform synchronization correctly, it is critical to
2016 // process the cells in reverse order, from right to left.
2017 // However, according to the API, suspended senders should
2018 // be cancelled in the order of their suspension. Therefore,
2019 // we need to collect all of them and cancel in the reverse
2020 // order after that.
2021 var suspendedSenders = InlineList<Waiter>()
2022 var segment = lastSegment
2023 process_segments@ while (true) {
2024 for (index in SEGMENT_SIZE - 1 downTo 0) {
2025 // Process the cell `segment[index]`.
2026 val globalIndex = segment.id * SEGMENT_SIZE + index
2027 // Update the cell state.
2028 update_cell@ while (true) {
2029 // Read the current state of the cell.
2030 val state = segment.getState(index)
2031 when {
2032 // The cell is already processed by a receiver.
2033 state === DONE_RCV -> break@process_segments
2034 // The cell stores a buffered element.
2035 state === BUFFERED -> {
2036 // Is the cell already covered by a receiver?
2037 if (globalIndex < receiversCounter) break@process_segments
2038 // Update the cell state to `CHANNEL_CLOSED`.
2039 if (segment.casState(index, state, CHANNEL_CLOSED)) {
2040 // If `onUndeliveredElement` lambda is non-null, call it.
2041 if (onUndeliveredElement != null) {
2042 val element = segment.getElement(index)
2043 undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2044 }
2045 // Clean the element field and inform the segment
2046 // that the slot is cleaned to avoid memory leaks.
2047 segment.cleanElement(index)
2048 segment.onSlotCleaned()
2049 break@update_cell
2050 }
2051 }
2052 // The cell is empty.
2053 state === IN_BUFFER || state === null -> {
2054 // Update the cell state to `CHANNEL_CLOSED`.
2055 if (segment.casState(index, state, CHANNEL_CLOSED)) {
2056 // Inform the segment that the slot is cleaned to avoid memory leaks.
2057 segment.onSlotCleaned()
2058 break@update_cell
2059 }
2060 }
2061 // The cell stores a suspended waiter.
2062 state is Waiter || state is WaiterEB -> {
2063 // Is the cell already covered by a receiver?
2064 if (globalIndex < receiversCounter) break@process_segments
2065 // Obtain the sender.
2066 val sender: Waiter = if (state is WaiterEB) state.waiter
2067 else state as Waiter
2068 // Update the cell state to `CHANNEL_CLOSED`.
2069 if (segment.casState(index, state, CHANNEL_CLOSED)) {
2070 // If `onUndeliveredElement` lambda is non-null, call it.
2071 if (onUndeliveredElement != null) {
2072 val element = segment.getElement(index)
2073 undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2074 }
2075 // Save the sender for further cancellation.
2076 suspendedSenders += sender
2077 // Clean the element field and inform the segment
2078 // that the slot is cleaned to avoid memory leaks.
2079 segment.cleanElement(index)
2080 segment.onSlotCleaned()
2081 break@update_cell
2082 }
2083 }
2084 // A concurrent receiver is resuming a suspended sender.
2085 // As the cell is covered by a receiver, finish immediately.
2086 state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
2087 // A concurrent `expandBuffer()` is resuming a suspended sender.
2088 // Wait in a spin-loop until the cell state changes.
2089 state === RESUMING_BY_EB -> continue@update_cell
2090 else -> break@update_cell
2091 }
2092 }
2093 }
2094 // Process the previous segment.
2095 segment = segment.prev ?: break
2096 }
2097 // Cancel suspended senders in their order of addition to this channel.
2098 suspendedSenders.forEachReversed { it.resumeSenderOnCancelledChannel() }
2099 // Throw `UndeliveredElementException` at the end if there was one.
2100 undeliveredElementException?.let { throw it }
2101 }
2102
2103 /**
2104 * Cancels suspended `receive` requests from the end to the beginning,
2105 * also moving empty cells to the `CHANNEL_CLOSED` state.
2106 */
2107 private fun cancelSuspendedReceiveRequests(lastSegment: ChannelSegment<E>, sendersCounter: Long) {
2108 // To perform synchronization correctly, it is critical to
2109 // extract suspended requests in the reverse order,
2110 // from the end to the beginning.
2111 // However, according to the API, they should be cancelled
2112 // in the order of their suspension. Therefore, we need to
2113 // collect the suspended requests first, cancelling them
2114 // in the reverse order after that.
2115 var suspendedReceivers = InlineList<Waiter>()
2116 var segment: ChannelSegment<E>? = lastSegment
2117 process_segments@ while (segment != null) {
2118 for (index in SEGMENT_SIZE - 1 downTo 0) {
2119 // Is the cell already covered by a sender? Finish immediately in this case.
2120 if (segment.id * SEGMENT_SIZE + index < sendersCounter) break@process_segments
2121 // Try to move the cell state to `CHANNEL_CLOSED`.
2122 cell_update@ while (true) {
2123 val state = segment.getState(index)
2124 when {
2125 state === null || state === IN_BUFFER -> {
2126 if (segment.casState(index, state, CHANNEL_CLOSED)) {
2127 segment.onSlotCleaned()
2128 break@cell_update
2129 }
2130 }
2131 state is WaiterEB -> {
2132 if (segment.casState(index, state, CHANNEL_CLOSED)) {
2133 suspendedReceivers += state.waiter // save for cancellation.
2134 segment.onCancelledRequest(index = index, receiver = true)
2135 break@cell_update
2136 }
2137 }
2138 state is Waiter -> {
2139 if (segment.casState(index, state, CHANNEL_CLOSED)) {
2140 suspendedReceivers += state // save for cancellation.
2141 segment.onCancelledRequest(index = index, receiver = true)
2142 break@cell_update
2143 }
2144 }
2145 else -> break@cell_update // nothing to cancel.
2146 }
2147 }
2148 }
2149 // Process the previous segment.
2150 segment = segment.prev
2151 }
2152 // Cancel the suspended requests in their order of addition to this channel.
2153 suspendedReceivers.forEachReversed { it.resumeReceiverOnClosedChannel() }
2154 }
2155
2156 /**
2157 * Resumes this receiver because this channel is closed.
2158 * This function does not take any effect if the operation has already been resumed or cancelled.
2159 */
2160 private fun Waiter.resumeReceiverOnClosedChannel() = resumeWaiterOnClosedChannel(receiver = true)
2161
2162 /**
2163 * Resumes this sender because this channel is cancelled.
2164 * This function does not take any effect if the operation has already been resumed or cancelled.
2165 */
2166 private fun Waiter.resumeSenderOnCancelledChannel() = resumeWaiterOnClosedChannel(receiver = false)
2167
2168 private fun Waiter.resumeWaiterOnClosedChannel(receiver: Boolean) {
2169 when (this) {
2170 is SendBroadcast -> cont.resume(false)
2171 is CancellableContinuation<*> -> resumeWithException(if (receiver) receiveException else sendException)
2172 is ReceiveCatching<*> -> cont.resume(closed(closeCause))
2173 is BufferedChannel<*>.BufferedChannelIterator -> tryResumeHasNextOnClosedChannel()
2174 is SelectInstance<*> -> trySelect(this@BufferedChannel, CHANNEL_CLOSED)
2175 else -> error("Unexpected waiter: $this")
2176 }
2177 }
2178
2179 @ExperimentalCoroutinesApi
2180 override val isClosedForSend: Boolean
2181 get() = sendersAndCloseStatus.value.isClosedForSend0
2182
2183 private val Long.isClosedForSend0 get() =
2184 isClosed(this, isClosedForReceive = false)
2185
2186 @ExperimentalCoroutinesApi
2187 override val isClosedForReceive: Boolean
2188 get() = sendersAndCloseStatus.value.isClosedForReceive0
2189
2190 private val Long.isClosedForReceive0 get() =
2191 isClosed(this, isClosedForReceive = true)
2192
2193 private fun isClosed(
2194 sendersAndCloseStatusCur: Long,
2195 isClosedForReceive: Boolean
2196 ) = when (sendersAndCloseStatusCur.sendersCloseStatus) {
2197 // This channel is active and has not been closed.
2198 CLOSE_STATUS_ACTIVE -> false
2199 // The cancellation procedure has been started but
2200 // not linearized yet, so this channel should be
2201 // considered as active.
2202 CLOSE_STATUS_CANCELLATION_STARTED -> false
2203 // This channel has been successfully closed.
2204 // Help to complete the closing procedure to
2205 // guarantee linearizability, and return `true`
2206 // for senders or the flag whether there still
2207 // exist elements to retrieve for receivers.
2208 CLOSE_STATUS_CLOSED -> {
2209 completeClose(sendersAndCloseStatusCur.sendersCounter)
2210 // When `isClosedForReceive` is `false`, always return `true`.
2211 // Otherwise, it is possible that the channel is closed but
2212 // still has elements to retrieve.
2213 if (isClosedForReceive) !hasElements() else true
2214 }
2215 // This channel has been successfully cancelled.
2216 // Help to complete the cancellation procedure to
2217 // guarantee linearizability and return `true`.
2218 CLOSE_STATUS_CANCELLED -> {
2219 completeCancel(sendersAndCloseStatusCur.sendersCounter)
2220 true
2221 }
2222 else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
2223 }
2224
2225 @ExperimentalCoroutinesApi
2226 override val isEmpty: Boolean get() {
2227 // This function should return `false` if
2228 // this channel is closed for `receive`.
2229 if (isClosedForReceive) return false
2230 // Does this channel has elements to retrieve?
2231 if (hasElements()) return false
2232 // This channel does not have elements to retrieve;
2233 // Check that it is still not closed for `receive`.
2234 return !isClosedForReceive
2235 }
2236
2237 /**
2238 * Checks whether this channel contains elements to retrieve.
2239 * Unfortunately, simply comparing the counters is insufficient,
2240 * as some cells can be in the `INTERRUPTED` state due to cancellation.
2241 * This function tries to find the first "alive" element,
2242 * updating the `receivers` counter to skip empty cells.
2243 *
2244 * The implementation is similar to `receive()`.
2245 */
2246 internal fun hasElements(): Boolean {
2247 while (true) {
2248 // Read the segment before obtaining the `receivers` counter value.
2249 var segment = receiveSegment.value
2250 // Obtains the `receivers` and `senders` counter values.
2251 val r = receiversCounter
2252 val s = sendersCounter
2253 // Is there a chance that this channel has elements?
2254 if (s <= r) return false // no elements
2255 // The `r`-th cell is covered by a sender; check whether it contains an element.
2256 // First, try to find the required segment if the initially
2257 // obtained segment (in the beginning of this function) has lower id.
2258 val id = r / SEGMENT_SIZE
2259 if (segment.id != id) {
2260 // Try to find the required segment.
2261 segment = findSegmentReceive(id, segment) ?:
2262 // The required segment has not been found. Either it has already
2263 // been removed, or the underlying linked list is already closed
2264 // for segment additions. In the latter case, the channel is closed
2265 // and does not contain elements, so this operation returns `false`.
2266 // Otherwise, if the required segment is removed, the operation restarts.
2267 if (receiveSegment.value.id < id) return false else continue
2268 }
2269 segment.cleanPrev() // all the previous segments are no longer needed.
2270 // Does the `r`-th cell contain waiting sender or buffered element?
2271 val i = (r % SEGMENT_SIZE).toInt()
2272 if (isCellNonEmpty(segment, i, r)) return true
2273 // The cell is empty. Update `receivers` counter and try again.
2274 receivers.compareAndSet(r, r + 1) // if this CAS fails, the counter has already been updated.
2275 }
2276 }
2277
2278 /**
2279 * Checks whether this cell contains a buffered element or a waiting sender,
2280 * returning `true` in this case. Otherwise, if this cell is empty
2281 * (due to waiter cancellation, cell poisoning, or channel closing),
2282 * this function returns `false`.
2283 *
2284 * Notably, this function must be called only if the cell is covered by a sender.
2285 */
2286 private fun isCellNonEmpty(
2287 segment: ChannelSegment<E>,
2288 index: Int,
2289 globalIndex: Long
2290 ): Boolean {
2291 // The logic is similar to `updateCellReceive` with the only difference
2292 // that this function neither changes the cell state nor retrieves the element.
2293 while (true) {
2294 // Read the current cell state.
2295 val state = segment.getState(index)
2296 when {
2297 // The cell is empty but a sender is coming.
2298 state === null || state === IN_BUFFER -> {
2299 // Poison the cell to ensure correctness.
2300 if (segment.casState(index, state, POISONED)) {
2301 // When the cell becomes poisoned, it is essentially
2302 // the same as storing an already cancelled receiver.
2303 // Thus, the `expandBuffer()` procedure should be invoked.
2304 expandBuffer()
2305 return false
2306 }
2307 }
2308 // The cell stores a buffered element.
2309 state === BUFFERED -> return true
2310 // The cell stores an interrupted sender.
2311 state === INTERRUPTED_SEND -> return false
2312 // This channel is already closed.
2313 state === CHANNEL_CLOSED -> return false
2314 // The cell is already processed
2315 // by a concurrent receiver.
2316 state === DONE_RCV -> return false
2317 // The cell is already poisoned
2318 // by a concurrent receiver.
2319 state === POISONED -> return false
2320 // A concurrent `expandBuffer()` is resuming
2321 // a suspended sender. This function is eligible
2322 // to linearize before the buffer expansion procedure.
2323 state === RESUMING_BY_EB -> return true
2324 // A concurrent receiver is resuming
2325 // a suspended sender. The element
2326 // is no longer available for retrieval.
2327 state === RESUMING_BY_RCV -> return false
2328 // The cell stores a suspended request.
2329 // However, it is possible that this request
2330 // is receiver if the cell is covered by both
2331 // send and receive operations.
2332 // In case the cell is already covered by
2333 // a receiver, the element is no longer
2334 // available for retrieval, and this function
2335 // return `false`. Otherwise, it is guaranteed
2336 // that the suspended request is sender, so
2337 // this function returns `true`.
2338 else -> return globalIndex == receiversCounter
2339 }
2340 }
2341 }
2342
2343 // #######################
2344 // # Segments Management #
2345 // #######################
2346
2347 /**
2348 * Finds the segment with the specified [id] starting by the [startFrom]
2349 * segment and following the [ChannelSegment.next] references. In case
2350 * the required segment has not been created yet, this function attempts
2351 * to add it to the underlying linked list. Finally, it updates [sendSegment]
2352 * to the found segment if its [ChannelSegment.id] is greater than the one
2353 * of the already stored segment.
2354 *
2355 * In case the requested segment is already removed, or if it should be allocated
2356 * but the linked list structure is closed for new segments addition, this function
2357 * returns `null`. The implementation also efficiently skips a sequence of removed
2358 * segments, updating the counter value in [sendersAndCloseStatus] correspondingly.
2359 */
2360 private fun findSegmentSend(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
2361 return sendSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2362 if (it.isClosed) {
2363 // The required segment has not been found and new segments
2364 // cannot be added, as the linked listed in already added.
2365 // This channel is already closed or cancelled; help to complete
2366 // the closing or cancellation procedure.
2367 completeCloseOrCancel()
2368 // Clean the `prev` reference of the provided segment
2369 // if all the previous cells are already covered by senders.
2370 // It is important to clean the `prev` reference only in
2371 // this case, as the closing/cancellation procedure may
2372 // need correct value to traverse the linked list from right to left.
2373 if (startFrom.id * SEGMENT_SIZE < receiversCounter) startFrom.cleanPrev()
2374 // As the required segment is not found and cannot be allocated, return `null`.
2375 null
2376 } else {
2377 // Get the found segment.
2378 val segment = it.segment
2379 // Is the required segment removed?
2380 if (segment.id > id) {
2381 // The required segment has been removed; `segment` is the first
2382 // segment with `id` not lower than the required one.
2383 // Skip the sequence of removed cells in O(1).
2384 updateSendersCounterIfLower(segment.id * SEGMENT_SIZE)
2385 // Clean the `prev` reference of the provided segment
2386 // if all the previous cells are already covered by senders.
2387 // It is important to clean the `prev` reference only in
2388 // this case, as the closing/cancellation procedure may
2389 // need correct value to traverse the linked list from right to left.
2390 if (segment.id * SEGMENT_SIZE < receiversCounter) segment.cleanPrev()
2391 // As the required segment is not found and cannot be allocated, return `null`.
2392 null
2393 } else {
2394 assert { segment.id == id }
2395 // The required segment has been found; return it!
2396 segment
2397 }
2398 }
2399 }
2400 }
2401
2402 /**
2403 * Finds the segment with the specified [id] starting by the [startFrom]
2404 * segment and following the [ChannelSegment.next] references. In case
2405 * the required segment has not been created yet, this function attempts
2406 * to add it to the underlying linked list. Finally, it updates [receiveSegment]
2407 * to the found segment if its [ChannelSegment.id] is greater than the one
2408 * of the already stored segment.
2409 *
2410 * In case the requested segment is already removed, or if it should be allocated
2411 * but the linked list structure is closed for new segments addition, this function
2412 * returns `null`. The implementation also efficiently skips a sequence of removed
2413 * segments, updating the [receivers] counter correspondingly.
2414 */
2415 private fun findSegmentReceive(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? =
2416 receiveSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2417 if (it.isClosed) {
2418 // The required segment has not been found and new segments
2419 // cannot be added, as the linked listed in already added.
2420 // This channel is already closed or cancelled; help to complete
2421 // the closing or cancellation procedure.
2422 completeCloseOrCancel()
2423 // Clean the `prev` reference of the provided segment
2424 // if all the previous cells are already covered by senders.
2425 // It is important to clean the `prev` reference only in
2426 // this case, as the closing/cancellation procedure may
2427 // need correct value to traverse the linked list from right to left.
2428 if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
2429 // As the required segment is not found and cannot be allocated, return `null`.
2430 null
2431 } else {
2432 // Get the found segment.
2433 val segment = it.segment
2434 // Advance the `bufferEnd` segment if required.
2435 if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) {
2436 bufferEndSegment.moveForward(segment)
2437 }
2438 // Is the required segment removed?
2439 if (segment.id > id) {
2440 // The required segment has been removed; `segment` is the first
2441 // segment with `id` not lower than the required one.
2442 // Skip the sequence of removed cells in O(1).
2443 updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE)
2444 // Clean the `prev` reference of the provided segment
2445 // if all the previous cells are already covered by senders.
2446 // It is important to clean the `prev` reference only in
2447 // this case, as the closing/cancellation procedure may
2448 // need correct value to traverse the linked list from right to left.
2449 if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev()
2450 // As the required segment is already removed, return `null`.
2451 null
2452 } else {
2453 assert { segment.id == id }
2454 // The required segment has been found; return it!
2455 segment
2456 }
2457 }
2458 }
2459
2460 /**
2461 * Importantly, when this function does not find the requested segment,
2462 * it always updates the number of completed `expandBuffer()` attempts.
2463 */
2464 private fun findSegmentBufferEnd(id: Long, startFrom: ChannelSegment<E>, currentBufferEndCounter: Long): ChannelSegment<E>? =
2465 bufferEndSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2466 if (it.isClosed) {
2467 // The required segment has not been found and new segments
2468 // cannot be added, as the linked listed in already added.
2469 // This channel is already closed or cancelled; help to complete
2470 // the closing or cancellation procedure.
2471 completeCloseOrCancel()
2472 // Update `bufferEndSegment` to the last segment
2473 // in the linked list to avoid memory leaks.
2474 moveSegmentBufferEndToSpecifiedOrLast(id, startFrom)
2475 // When this function does not find the requested segment,
2476 // it should update the number of completed `expandBuffer()` attempts.
2477 incCompletedExpandBufferAttempts()
2478 null
2479 } else {
2480 // Get the found segment.
2481 val segment = it.segment
2482 // Is the required segment removed?
2483 if (segment.id > id) {
2484 // The required segment has been removed; `segment` is the first segment
2485 // with `id` not lower than the required one.
2486 // Try to skip the sequence of removed cells in O(1) by increasing the `bufferEnd` counter.
2487 // Importantly, when this function does not find the requested segment,
2488 // it should update the number of completed `expandBuffer()` attempts.
2489 if (bufferEnd.compareAndSet(currentBufferEndCounter + 1, segment.id * SEGMENT_SIZE)) {
2490 incCompletedExpandBufferAttempts(segment.id * SEGMENT_SIZE - currentBufferEndCounter)
2491 } else {
2492 incCompletedExpandBufferAttempts()
2493 }
2494 // As the required segment is already removed, return `null`.
2495 null
2496 } else {
2497 assert { segment.id == id }
2498 // The required segment has been found; return it!
2499 segment
2500 }
2501 }
2502 }
2503
2504 /**
2505 * Updates [bufferEndSegment] to the one with the specified [id] or
2506 * to the last existing segment, if the required segment is not yet created.
2507 *
2508 * Unlike [findSegmentBufferEnd], this function does not allocate new segments.
2509 */
2510 private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment<E>) {
2511 // Start searching the required segment from the specified one.
2512 var segment: ChannelSegment<E> = startFrom
2513 while (segment.id < id) {
2514 segment = segment.next ?: break
2515 }
2516 // Skip all removed segments and try to update `bufferEndSegment`
2517 // to the first non-removed one. This part should succeed eventually,
2518 // as the tail segment is never removed.
2519 while (true) {
2520 while (segment.isRemoved) {
2521 segment = segment.next ?: break
2522 }
2523 // Try to update `bufferEndSegment`. On failure,
2524 // the found segment is already removed, so it
2525 // should be skipped.
2526 if (bufferEndSegment.moveForward(segment)) return
2527 }
2528 }
2529
2530 /**
2531 * Updates the `senders` counter if its value
2532 * is lower that the specified one.
2533 *
2534 * Senders use this function to efficiently skip
2535 * a sequence of cancelled receivers.
2536 */
2537 private fun updateSendersCounterIfLower(value: Long): Unit =
2538 sendersAndCloseStatus.loop { cur ->
2539 val curCounter = cur.sendersCounter
2540 if (curCounter >= value) return
2541 val update = constructSendersAndCloseStatus(curCounter, cur.sendersCloseStatus)
2542 if (sendersAndCloseStatus.compareAndSet(cur, update)) return
2543 }
2544
2545 /**
2546 * Updates the `receivers` counter if its value
2547 * is lower that the specified one.
2548 *
2549 * Receivers use this function to efficiently skip
2550 * a sequence of cancelled senders.
2551 */
2552 private fun updateReceiversCounterIfLower(value: Long): Unit =
2553 receivers.loop { cur ->
2554 if (cur >= value) return
2555 if (receivers.compareAndSet(cur, value)) return
2556 }
2557
2558 // ###################
2559 // # Debug Functions #
2560 // ###################
2561
2562 @Suppress("ConvertTwoComparisonsToRangeCheck")
2563 override fun toString(): String {
2564 val sb = StringBuilder()
2565 // Append the close status
2566 when (sendersAndCloseStatus.value.sendersCloseStatus) {
2567 CLOSE_STATUS_CLOSED -> sb.append("closed,")
2568 CLOSE_STATUS_CANCELLED -> sb.append("cancelled,")
2569 }
2570 // Append the buffer capacity
2571 sb.append("capacity=$capacity,")
2572 // Append the data
2573 sb.append("data=[")
2574 val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2575 .filter { it !== NULL_SEGMENT }
2576 .minBy { it.id }
2577 val r = receiversCounter
2578 val s = sendersCounter
2579 var segment = firstSegment
2580 append_elements@ while (true) {
2581 process_cell@ for (i in 0 until SEGMENT_SIZE) {
2582 val globalCellIndex = segment.id * SEGMENT_SIZE + i
2583 if (globalCellIndex >= s && globalCellIndex >= r) break@append_elements
2584 val cellState = segment.getState(i)
2585 val element = segment.getElement(i)
2586 val cellStateString = when (cellState) {
2587 is CancellableContinuation<*> -> {
2588 when {
2589 globalCellIndex < r && globalCellIndex >= s -> "receive"
2590 globalCellIndex < s && globalCellIndex >= r -> "send"
2591 else -> "cont"
2592 }
2593 }
2594 is SelectInstance<*> -> {
2595 when {
2596 globalCellIndex < r && globalCellIndex >= s -> "onReceive"
2597 globalCellIndex < s && globalCellIndex >= r -> "onSend"
2598 else -> "select"
2599 }
2600 }
2601 is ReceiveCatching<*> -> "receiveCatching"
2602 is SendBroadcast -> "sendBroadcast"
2603 is WaiterEB -> "EB($cellState)"
2604 RESUMING_BY_RCV, RESUMING_BY_EB -> "resuming_sender"
2605 null, IN_BUFFER, DONE_RCV, POISONED, INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> continue@process_cell
2606 else -> cellState.toString() // leave it just in case something is missed.
2607 }
2608 if (element != null) {
2609 sb.append("($cellStateString,$element),")
2610 } else {
2611 sb.append("$cellStateString,")
2612 }
2613 }
2614 // Process the next segment if exists.
2615 segment = segment.next ?: break
2616 }
2617 if (sb.last() == ',') sb.deleteAt(sb.length - 1)
2618 sb.append("]")
2619 // The string representation is constructed.
2620 return sb.toString()
2621 }
2622
2623 // Returns a debug representation of this channel,
2624 // which is actively used in Lincheck tests.
2625 internal fun toStringDebug(): String {
2626 val sb = StringBuilder()
2627 // Append the counter values and the close status
2628 sb.append("S=${sendersCounter},R=${receiversCounter},B=${bufferEndCounter},B'=${completedExpandBuffersAndPauseFlag.value},C=${sendersAndCloseStatus.value.sendersCloseStatus},")
2629 when (sendersAndCloseStatus.value.sendersCloseStatus) {
2630 CLOSE_STATUS_CANCELLATION_STARTED -> sb.append("CANCELLATION_STARTED,")
2631 CLOSE_STATUS_CLOSED -> sb.append("CLOSED,")
2632 CLOSE_STATUS_CANCELLED -> sb.append("CANCELLED,")
2633 }
2634 // Append the segment references
2635 sb.append("SEND_SEGM=${sendSegment.value.hexAddress},RCV_SEGM=${receiveSegment.value.hexAddress}")
2636 if (!isRendezvousOrUnlimited) sb.append(",EB_SEGM=${bufferEndSegment.value.hexAddress}")
2637 sb.append(" ") // add some space
2638 // Append the linked list of segments.
2639 val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2640 .filter { it !== NULL_SEGMENT }
2641 .minBy { it.id }
2642 var segment = firstSegment
2643 while (true) {
2644 sb.append("${segment.hexAddress}=[${if (segment.isRemoved) "*" else ""}${segment.id},prev=${segment.prev?.hexAddress},")
2645 repeat(SEGMENT_SIZE) { i ->
2646 val cellState = segment.getState(i)
2647 val element = segment.getElement(i)
2648 val cellStateString = when (cellState) {
2649 is CancellableContinuation<*> -> "cont"
2650 is SelectInstance<*> -> "select"
2651 is ReceiveCatching<*> -> "receiveCatching"
2652 is SendBroadcast -> "send(broadcast)"
2653 is WaiterEB -> "EB($cellState)"
2654 else -> cellState.toString()
2655 }
2656 sb.append("[$i]=($cellStateString,$element),")
2657 }
2658 sb.append("next=${segment.next?.hexAddress}] ")
2659 // Process the next segment if exists.
2660 segment = segment.next ?: break
2661 }
2662 // The string representation of this channel is now constructed!
2663 return sb.toString()
2664 }
2665
2666
2667 // This is an internal methods for tests.
2668 fun checkSegmentStructureInvariants() {
2669 if (isRendezvousOrUnlimited) {
2670 check(bufferEndSegment.value === NULL_SEGMENT) {
2671 "bufferEndSegment must be NULL_SEGMENT for rendezvous and unlimited channels; they do not manipulate it.\n" +
2672 "Channel state: $this"
2673 }
2674 } else {
2675 check(receiveSegment.value.id <= bufferEndSegment.value.id) {
2676 "bufferEndSegment should not have lower id than receiveSegment.\n" +
2677 "Channel state: $this"
2678 }
2679 }
2680 val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2681 .filter { it !== NULL_SEGMENT }
2682 .minBy { it.id }
2683 check(firstSegment.prev == null) {
2684 "All processed segments should be unreachable from the data structure, but the `prev` link of the leftmost segment is non-null.\n" +
2685 "Channel state: $this"
2686 }
2687 // Check that the doubly-linked list of segments does not
2688 // contain full-of-cancelled-cells segments.
2689 var segment = firstSegment
2690 while (segment.next != null) {
2691 // Note that the `prev` reference can be `null` if this channel is closed.
2692 check(segment.next!!.prev == null || segment.next!!.prev === segment) {
2693 "The `segment.next.prev === segment` invariant is violated.\n" +
2694 "Channel state: $this"
2695 }
2696 // Count the number of closed/interrupted cells
2697 // and check that all cells are in expected states.
2698 var interruptedOrClosedCells = 0
2699 for (i in 0 until SEGMENT_SIZE) {
2700 when (val state = segment.getState(i)) {
2701 BUFFERED -> {} // The cell stores a buffered element.
2702 is Waiter -> {} // The cell stores a suspended request.
2703 INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> {
2704 // The cell stored an interrupted request or indicates
2705 // that this channel is already closed.
2706 // Check that the element slot is cleaned and increment
2707 // the number of cells in closed/interrupted state.
2708 check(segment.getElement(i) == null)
2709 interruptedOrClosedCells++
2710 }
2711 POISONED, DONE_RCV -> {
2712 // The cell is successfully processed or poisoned.
2713 // Check that the element slot is cleaned.
2714 check(segment.getElement(i) == null)
2715 }
2716 // Other states are illegal after all running operations finish.
2717 else -> error("Unexpected segment cell state: $state.\nChannel state: $this")
2718 }
2719 }
2720 // Is this segment full of cancelled/closed cells?
2721 // If so, this segment should be removed from the
2722 // linked list if nether `receiveSegment`, nor
2723 // `sendSegment`, nor `bufferEndSegment` reference it.
2724 if (interruptedOrClosedCells == SEGMENT_SIZE) {
2725 check(segment === receiveSegment.value || segment === sendSegment.value || segment === bufferEndSegment.value) {
2726 "Logically removed segment is reachable.\nChannel state: $this"
2727 }
2728 }
2729 // Process the next segment.
2730 segment = segment.next!!
2731 }
2732 }
2733 }
2734
2735 /**
2736 * The channel is represented as a list of segments, which simulates an infinite array.
2737 * Each segment has its own [id], which increase from the beginning. These [id]s help
2738 * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment],
2739 * and [BufferedChannel.bufferEndSegment] correctly.
2740 */
2741 internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: BufferedChannel<E>?, pointers: Int) : Segment<ChannelSegment<E>>(id, prev, pointers) {
2742 private val _channel: BufferedChannel<E>? = channel
2743 val channel get() = _channel!! // always non-null except for `NULL_SEGMENT`
2744
2745 private val data = atomicArrayOfNulls<Any?>(SEGMENT_SIZE * 2) // 2 registers per slot: state + element
2746 override val numberOfSlots: Int get() = SEGMENT_SIZE
2747
2748 // ########################################
2749 // # Manipulation with the Element Fields #
2750 // ########################################
2751
storeElementnull2752 internal fun storeElement(index: Int, element: E) {
2753 setElementLazy(index, element)
2754 }
2755
2756 @Suppress("UNCHECKED_CAST")
getElementnull2757 internal fun getElement(index: Int) = data[index * 2].value as E
2758
2759 internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }
2760
cleanElementnull2761 internal fun cleanElement(index: Int) {
2762 setElementLazy(index, null)
2763 }
2764
setElementLazynull2765 private fun setElementLazy(index: Int, value: Any?) {
2766 data[index * 2].lazySet(value)
2767 }
2768
2769 // ######################################
2770 // # Manipulation with the State Fields #
2771 // ######################################
2772
getStatenull2773 internal fun getState(index: Int): Any? = data[index * 2 + 1].value
2774
2775 internal fun setState(index: Int, value: Any?) {
2776 data[index * 2 + 1].value = value
2777 }
2778
casStatenull2779 internal fun casState(index: Int, from: Any?, to: Any?) = data[index * 2 + 1].compareAndSet(from, to)
2780
2781 internal fun getAndSetState(index: Int, update: Any?) = data[index * 2 + 1].getAndSet(update)
2782
2783
2784 // ########################
2785 // # Cancellation Support #
2786 // ########################
2787
2788 override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
2789 // To distinguish cancelled senders and receivers, senders equip the index value with
2790 // an additional marker, adding `SEGMENT_SIZE` to the value.
2791 val isSender = index >= SEGMENT_SIZE
2792 // Unwrap the index.
2793 @Suppress("NAME_SHADOWING") val index = if (isSender) index - SEGMENT_SIZE else index
2794 // Read the element, which may be needed further to call `onUndeliveredElement`.
2795 val element = getElement(index)
2796 // Update the cell state.
2797 while (true) {
2798 // CAS-loop
2799 // Read the current state of the cell.
2800 val cur = getState(index)
2801 when {
2802 // The cell stores a waiter.
2803 cur is Waiter || cur is WaiterEB -> {
2804 // The cancelled request is either send or receive.
2805 // Update the cell state correspondingly.
2806 val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
2807 if (casState(index, cur, update)) {
2808 // The waiter has been successfully cancelled.
2809 // Clean the element slot and invoke `onSlotCleaned()`,
2810 // which may cause deleting the whole segment from the linked list.
2811 // In case the cancelled request is receiver, it is critical to ensure
2812 // that the `expandBuffer()` attempt that processes this cell is completed,
2813 // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
2814 cleanElement(index)
2815 onCancelledRequest(index, !isSender)
2816 // Call `onUndeliveredElement` if needed.
2817 if (isSender) {
2818 channel.onUndeliveredElement?.callUndeliveredElement(element, context)
2819 }
2820 return
2821 }
2822 }
2823 // The cell already indicates that the operation is cancelled.
2824 cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> {
2825 // Clean the element slot to avoid memory leaks,
2826 // invoke `onUndeliveredElement` if needed, and finish
2827 cleanElement(index)
2828 // Call `onUndeliveredElement` if needed.
2829 if (isSender) {
2830 channel.onUndeliveredElement?.callUndeliveredElement(element, context)
2831 }
2832 return
2833 }
2834 // An opposite operation is resuming this request;
2835 // wait until the cell state updates.
2836 // It is possible that an opposite operation has already
2837 // resumed this request, which will result in updating
2838 // the cell state to `DONE_RCV` or `BUFFERED`, while the
2839 // current cancellation is caused by prompt cancellation.
2840 cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue
2841 // This request was successfully resumed, so this cancellation
2842 // is caused by the prompt cancellation feature and should be ignored.
2843 cur === DONE_RCV || cur === BUFFERED -> return
2844 // The cell state indicates that the channel is closed;
2845 // this cancellation should be ignored.
2846 cur === CHANNEL_CLOSED -> return
2847 else -> error("unexpected state: $cur")
2848 }
2849 }
2850 }
2851
2852 /**
2853 * Invokes `onSlotCleaned()` preceded by a `waitExpandBufferCompletion(..)` call
2854 * in case the cancelled request is receiver.
2855 */
onCancelledRequestnull2856 fun onCancelledRequest(index: Int, receiver: Boolean) {
2857 if (receiver) channel.waitExpandBufferCompletion(id * SEGMENT_SIZE + index)
2858 onSlotCleaned()
2859 }
2860 }
2861
2862 // WA for atomicfu + JVM_IR compiler bug that lead to SMAP-related compiler crashes: KT-55983
createSegmentFunctionnull2863 internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, ChannelSegment<E>> = ::createSegment
2864
2865 private fun <E> createSegment(id: Long, prev: ChannelSegment<E>) = ChannelSegment(
2866 id = id,
2867 prev = prev,
2868 channel = prev.channel,
2869 pointers = 0
2870 )
2871 private val NULL_SEGMENT = ChannelSegment<Any?>(id = -1, prev = null, channel = null, pointers = 0)
2872
2873 /**
2874 * Number of cells in each segment.
2875 */
2876 @JvmField
2877 internal val SEGMENT_SIZE = systemProp("kotlinx.coroutines.bufferedChannel.segmentSize", 32)
2878
2879 /**
2880 * Number of iterations to wait in [BufferedChannel.waitExpandBufferCompletion] until the numbers of started and completed
2881 * [BufferedChannel.expandBuffer] calls coincide. When the limit is reached, [BufferedChannel.waitExpandBufferCompletion]
2882 * blocks further [BufferedChannel.expandBuffer]-s to avoid starvation.
2883 */
2884 private val EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS = systemProp("kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations", 10_000)
2885
2886 /**
2887 * Tries to resume this continuation with the specified
2888 * value. Returns `true` on success and `false` on failure.
2889 */
2890 private fun <T> CancellableContinuation<T>.tryResume0(
2891 value: T,
2892 onCancellation: ((cause: Throwable) -> Unit)? = null
2893 ): Boolean =
2894 tryResume(value, null, onCancellation).let { token ->
2895 if (token != null) {
2896 completeResume(token)
2897 true
2898 } else false
2899 }
2900
2901 /*
2902 If the channel is rendezvous or unlimited, the `bufferEnd` counter
2903 should be initialized with the corresponding value below and never change.
2904 In this case, the `expandBuffer(..)` operation does nothing.
2905 */
2906 private const val BUFFER_END_RENDEZVOUS = 0L // no buffer
2907 private const val BUFFER_END_UNLIMITED = Long.MAX_VALUE // infinite buffer
initialBufferEndnull2908 private fun initialBufferEnd(capacity: Int): Long = when (capacity) {
2909 Channel.RENDEZVOUS -> BUFFER_END_RENDEZVOUS
2910 Channel.UNLIMITED -> BUFFER_END_UNLIMITED
2911 else -> capacity.toLong()
2912 }
2913
2914 /*
2915 Cell states. The initial "empty" state is represented with `null`,
2916 and suspended operations are represented with [Waiter] instances.
2917 */
2918
2919 // The cell stores a buffered element.
2920 @JvmField
2921 internal val BUFFERED = Symbol("BUFFERED")
2922 // Concurrent `expandBuffer(..)` can inform the
2923 // upcoming sender that it should buffer the element.
2924 private val IN_BUFFER = Symbol("SHOULD_BUFFER")
2925 // Indicates that a receiver (RCV suffix) is resuming
2926 // the suspended sender; after that, it should update
2927 // the state to either `DONE_RCV` (on success) or
2928 // `INTERRUPTED_SEND` (on failure).
2929 private val RESUMING_BY_RCV = Symbol("S_RESUMING_BY_RCV")
2930 // Indicates that `expandBuffer(..)` (RCV suffix) is resuming
2931 // the suspended sender; after that, it should update
2932 // the state to either `BUFFERED` (on success) or
2933 // `INTERRUPTED_SEND` (on failure).
2934 private val RESUMING_BY_EB = Symbol("RESUMING_BY_EB")
2935 // When a receiver comes to the cell already covered by
2936 // a sender (according to the counters), but the cell
2937 // is still in `EMPTY` or `IN_BUFFER` state, it breaks
2938 // the cell by changing its state to `POISONED`.
2939 private val POISONED = Symbol("POISONED")
2940 // When the element is successfully transferred
2941 // to a receiver, the cell changes to `DONE_RCV`.
2942 private val DONE_RCV = Symbol("DONE_RCV")
2943 // Cancelled sender.
2944 private val INTERRUPTED_SEND = Symbol("INTERRUPTED_SEND")
2945 // Cancelled receiver.
2946 private val INTERRUPTED_RCV = Symbol("INTERRUPTED_RCV")
2947 // Indicates that the channel is closed.
2948 internal val CHANNEL_CLOSED = Symbol("CHANNEL_CLOSED")
2949 // When the cell is already covered by both sender and
2950 // receiver (`sender` and `receivers` counters are greater
2951 // than the cell number), the `expandBuffer(..)` procedure
2952 // cannot distinguish which kind of operation is stored
2953 // in the cell. Thus, it wraps the waiter with this descriptor,
2954 // informing the possibly upcoming receiver that it should
2955 // complete the `expandBuffer(..)` procedure if the waiter stored
2956 // in the cell is sender. In turn, senders ignore this information.
2957 private class WaiterEB(@JvmField val waiter: Waiter) {
toStringnull2958 override fun toString() = "WaiterEB($waiter)"
2959 }
2960
2961
2962
2963 /**
2964 * To distinguish suspended [BufferedChannel.receive] and
2965 * [BufferedChannel.receiveCatching] operations, the latter
2966 * uses this wrapper for its continuation.
2967 */
2968 private class ReceiveCatching<E>(
2969 @JvmField val cont: CancellableContinuationImpl<ChannelResult<E>>
2970 ) : Waiter by cont
2971
2972 /*
2973 Internal results for [BufferedChannel.updateCellReceive].
2974 On successful rendezvous with waiting sender or
2975 buffered element retrieval, the corresponding element
2976 is returned as result of [BufferedChannel.updateCellReceive].
2977 */
2978 private val SUSPEND = Symbol("SUSPEND")
2979 private val SUSPEND_NO_WAITER = Symbol("SUSPEND_NO_WAITER")
2980 private val FAILED = Symbol("FAILED")
2981
2982 /*
2983 Internal results for [BufferedChannel.updateCellSend]
2984 */
2985 private const val RESULT_RENDEZVOUS = 0
2986 private const val RESULT_BUFFERED = 1
2987 private const val RESULT_SUSPEND = 2
2988 private const val RESULT_SUSPEND_NO_WAITER = 3
2989 private const val RESULT_CLOSED = 4
2990 private const val RESULT_FAILED = 5
2991
2992 /**
2993 * Special value for [BufferedChannel.BufferedChannelIterator.receiveResult]
2994 * that indicates the absence of pre-received result.
2995 */
2996 private val NO_RECEIVE_RESULT = Symbol("NO_RECEIVE_RESULT")
2997
2998 /*
2999 As [BufferedChannel.invokeOnClose] can be invoked concurrently
3000 with channel closing, we have to synchronize them. These two
3001 markers help with the synchronization.
3002 */
3003 private val CLOSE_HANDLER_CLOSED = Symbol("CLOSE_HANDLER_CLOSED")
3004 private val CLOSE_HANDLER_INVOKED = Symbol("CLOSE_HANDLER_INVOKED")
3005
3006 /**
3007 * Specifies the absence of closing cause, stored in [BufferedChannel._closeCause].
3008 * When the channel is closed or cancelled without exception, this [NO_CLOSE_CAUSE]
3009 * marker should be replaced with `null`.
3010 */
3011 private val NO_CLOSE_CAUSE = Symbol("NO_CLOSE_CAUSE")
3012
3013 /*
3014 The channel close statuses. The transition scheme is the following:
3015 +--------+ +----------------------+ +-----------+
3016 | ACTIVE |-->| CANCELLATION_STARTED |-->| CANCELLED |
3017 +--------+ +----------------------+ +-----------+
3018 | ^
3019 | +--------+ |
3020 +------------>| CLOSED |------------------+
3021 +--------+
3022 We need `CANCELLATION_STARTED` to synchronize
3023 concurrent closing and cancellation.
3024 */
3025 private const val CLOSE_STATUS_ACTIVE = 0
3026 private const val CLOSE_STATUS_CANCELLATION_STARTED = 1
3027 private const val CLOSE_STATUS_CLOSED = 2
3028 private const val CLOSE_STATUS_CANCELLED = 3
3029
3030 /*
3031 The `senders` counter and the channel close status
3032 are stored in a single 64-bit register to save the space
3033 and reduce the number of reads in sending operations.
3034 The code below encapsulates the required bit arithmetics.
3035 */
3036 private const val SENDERS_CLOSE_STATUS_SHIFT = 60
3037 private const val SENDERS_COUNTER_MASK = (1L shl SENDERS_CLOSE_STATUS_SHIFT) - 1
3038 private inline val Long.sendersCounter get() = this and SENDERS_COUNTER_MASK
3039 private inline val Long.sendersCloseStatus: Int get() = (this shr SENDERS_CLOSE_STATUS_SHIFT).toInt()
3040 private fun constructSendersAndCloseStatus(counter: Long, closeStatus: Int): Long =
3041 (closeStatus.toLong() shl SENDERS_CLOSE_STATUS_SHIFT) + counter
3042
3043 /*
3044 The `completedExpandBuffersAndPauseFlag` 64-bit counter contains
3045 the number of completed `expandBuffer()` attempts along with a special
3046 flag that pauses progress to avoid starvation in `waitExpandBufferCompletion(..)`.
3047 The code below encapsulates the required bit arithmetics.
3048 */
3049 private const val EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT = 1L shl 62
3050 private const val EB_COMPLETED_COUNTER_MASK = EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT - 1
3051 private inline val Long.ebCompletedCounter get() = this and EB_COMPLETED_COUNTER_MASK
3052 private inline val Long.ebPauseExpandBuffers: Boolean get() = (this and EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT) != 0L
3053 private fun constructEBCompletedAndPauseFlag(counter: Long, pauseEB: Boolean): Long =
3054 (if (pauseEB) EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT else 0) + counter
3055