<lambda>null1 @file:Suppress("PrivatePropertyName")
2
3 package kotlinx.coroutines.channels
4
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
8 import kotlinx.coroutines.channels.ChannelResult.Companion.failure
9 import kotlinx.coroutines.channels.ChannelResult.Companion.success
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.selects.*
12 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
13 import kotlin.coroutines.*
14 import kotlin.js.*
15 import kotlin.jvm.*
16 import kotlin.math.*
17 import kotlin.reflect.*
18
19 /**
20 * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
 * The high-level structure is based on a conceptually infinite array for storing elements and waiting requests,
22 * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
23 * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
24 * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
25 * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
26 * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
27 * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
28 * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
29 *
30 * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
31 * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
32 */
33 internal open class BufferedChannel<E>(
34 /**
35 * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
36 * and `Channel.UNLIMITED` for unlimited capacity.
37 */
38 private val capacity: Int,
39 @JvmField
40 internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
41 ) : Channel<E> {
init {
    // Reject negative capacities up front; the message reports the offending value.
    require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
    // This implementation has a second `init` block, located after the property declarations.
}
46
47 // Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
48
49 /*
50 The counters indicate the total numbers of send, receive, and buffer expansion calls
51 ever performed. The counters are incremented in the beginning of the corresponding
52 operation; thus, acquiring a unique (for the operation type) cell to process.
53 The segments reference to the last working one for each operation type.
54
55 Notably, the counter for send is combined with the channel closing status
56 for synchronization simplicity and performance reasons.
57
58 The logical end of the buffer is initialized with the channel capacity.
If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
or `BUFFER_END_UNLIMITED`, respectively, and never updates. The `bufferEndSegment`
points to a special `NULL_SEGMENT` in this case.
62 */
// Number of `send` invocations ever performed, combined with the channel close status.
private val sendersAndCloseStatus = atomic(0L)
// Number of `receive` invocations ever performed.
private val receivers = atomic(0L)
// Logical end of the buffer; the initial value is computed from the capacity by `initialBufferEnd`.
private val bufferEnd = atomic(initialBufferEnd(capacity))

// The senders counter part of the combined `sendersAndCloseStatus` value.
internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
// The current value of the `receivers` counter.
internal val receiversCounter: Long get() = receivers.value
// The current value of the `bufferEnd` counter.
private val bufferEndCounter: Long get() = bufferEnd.value
70
71 /*
72 Additionally to the counters above, we need an extra one that
73 tracks the number of cells processed by `expandBuffer()`.
74 When a receiver aborts, the corresponding cell might be
75 physically removed from the data structure to avoid memory
76 leaks, while it still can be unprocessed by `expandBuffer()`.
77 In this case, `expandBuffer()` cannot know whether the
78 removed cell contained sender or receiver and, therefore,
cannot proceed. To solve the race, we ensure that cells
corresponding to cancelled receivers cannot be physically
removed until the cell is processed.
This additional counter enables the synchronization.
83 */
// Starts from the current buffer end counter value.
// NOTE(review): the name suggests that a "pause" flag is packed into the same
// value -- the encoding is not visible in this part of the file, confirm there.
private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)

// `true` when the buffer end counter equals one of the two special markers,
// `BUFFER_END_RENDEZVOUS` or `BUFFER_END_UNLIMITED`.
private val isRendezvousOrUnlimited
    get() = bufferEndCounter.let { it == BUFFER_END_RENDEZVOUS || it == BUFFER_END_UNLIMITED }

// Segment references for the send, receive, and buffer expansion operations;
// all three are assigned in the `init` block that follows.
private val sendSegment: AtomicRef<ChannelSegment<E>>
private val receiveSegment: AtomicRef<ChannelSegment<E>>
private val bufferEndSegment: AtomicRef<ChannelSegment<E>>
92
init {
    // The very first segment; it is created with three pointers
    // and used to initialize the segment references below.
    @Suppress("LeakingThis")
    val headSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
    // A rendezvous or unlimited channel never invokes the buffer expansion
    // procedure, so its buffer-end reference is the special `NULL_SEGMENT`
    // and never updates; otherwise it starts from the first segment as well.
    @Suppress("UNCHECKED_CAST")
    val initialBufferEndSegment =
        if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else headSegment
    sendSegment = atomic(headSegment)
    receiveSegment = atomic(headSegment)
    bufferEndSegment = atomic(initialBufferEndSegment)
}
104
105 // #########################
106 // ## The send operations ##
107 // #########################
108
override suspend fun send(element: E): Unit =
    sendImpl( // <-- this is an inline function
        element = element,
        // The continuation is not allocated eagerly: when suspension turns
        // out to be necessary, [onNoWaiterSuspend] below creates it.
        waiter = null,
        // The element was buffered or handed over to a receiver; nothing else to do.
        onRendezvousOrBuffered = {},
        // No waiter is passed, so the suspension callback must never fire.
        onSuspend = { _, _ -> assert { false } },
        // On a closed channel, `send(e)` has to invoke the
        // `onUndeliveredElement(..)` handler and throw an exception.
        onClosed = { onClosedSend(element) },
        // The operation decided to suspend: delegate to the function that
        // creates a continuation. The tail-call optimization is applied here.
        onNoWaiterSuspend = { cellSegment, cellIndex, pendingElement, globalIndex ->
            sendOnNoWaiterSuspend(cellSegment, cellIndex, pendingElement, globalIndex)
        }
    )
129
// NB: `Nothing` would be a more precise return type, but it breaks TCO.
// Completes a `send(e)` on a closed channel: the element is reported to the
// `onUndeliveredElement` handler (if any), and the continuation is resumed
// with either the handler's failure or the send exception.
private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
    val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
    if (handlerFailure == null) {
        // No handler, or the handler completed normally.
        continuation.resumeWithStackTrace(sendException)
    } else {
        // The handler crashed: keep the send exception as suppressed for better diagnostics.
        handlerFailure.addSuppressed(sendException)
        continuation.resumeWithStackTrace(handlerFailure)
    }
}
140
// Suspension path of `send(e)`: creates the continuation and retries the
// update of the already reserved cell with that continuation as the waiter.
private suspend fun sendOnNoWaiterSuspend(
    /* The segment that contains the working cell. */
    segment: ChannelSegment<E>,
    /* The index of the working cell in the segment. */
    index: Int,
    /** The element to be inserted. */
    element: E,
    /** The global index of the cell. */
    s: Long
) = suspendCancellableCoroutineReusable { continuation ->
    sendImplOnNoWaiter( // <-- this is an inline function
        segment = segment,
        index = index,
        element = element,
        s = s,
        // The freshly created continuation plays the role of the waiter.
        waiter = continuation,
        // A rendezvous happened or the element has been buffered: resume
        // and finish. In case of prompt cancellation, it is guaranteed that
        // the element has been already buffered or passed to a receiver.
        onRendezvousOrBuffered = { continuation.resume(Unit) },
        // The channel is closed: call `onUndeliveredElement(..)` and
        // complete the continuation with the corresponding exception.
        onClosed = { onClosedSendOnNoWaiterSuspend(element, continuation) },
    )
}
165
// Registers the cancellation handler of a sender that is about to wait
// in the cell specified by the segment and the index in it.
private fun Waiter.prepareSenderForSuspension(
    segment: ChannelSegment<E>,
    index: Int
) {
    // Cancelled senders must be distinguishable from cancelled receivers,
    // so the sender's index is marked by adding `SEGMENT_SIZE` to it.
    val senderMarkedIndex = index + SEGMENT_SIZE
    invokeOnCancellation(segment, senderMarkedIndex)
}
177
// Finishes a suspending `send(e)` that found the channel closed: reports the
// element as undelivered and fails the continuation with the send exception.
private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
    onUndeliveredElement?.callUndeliveredElement(element, cont.context)
    val recovered = recoverStackTrace(sendException, cont)
    cont.resumeWithException(recovered)
}
182
override fun trySend(element: E): ChannelResult<Unit> {
    // Fail right away when the plain `send(e)` operation would suspend.
    val wouldSuspend = shouldSendSuspend(sendersAndCloseStatus.value)
    if (wouldSuspend) return failure()
    // At this point the channel either has waiting receivers or is closed,
    // so the element can be sent. The logic mirrors the plain `send(e)`;
    // the only difference is that `INTERRUPTED_SEND` is installed
    // when the operation decides to suspend.
    return sendImpl( // <-- this is an inline function
        element = element,
        // An already interrupted sender is stored in case of suspension.
        waiter = INTERRUPTED_SEND,
        // Success: a rendezvous happened or the element has been buffered.
        onRendezvousOrBuffered = { success(Unit) },
        // The `INTERRUPTED_SEND` token has been installed, so this `trySend(e)`
        // must fail. According to the contract, the [onUndeliveredElement]
        // handler does not need to be called.
        onSuspend = { cellSegment, _ ->
            cellSegment.onSlotCleaned()
            failure()
        },
        // The channel is closed; return the corresponding result.
        onClosed = { closed(sendException) }
    )
}
209
/**
 * A special `send(e)` implementation that resumes with `true` when the element
 * has been successfully sent, and with `false` when the channel is closed.
 *
 * If the coroutine is cancelled, the element may stay undelivered:
 * the [onUndeliveredElement] feature is unsupported in this implementation,
 * which is verified at the beginning of the operation.
 */
internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { continuation ->
    check(onUndeliveredElement == null) {
        "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
    }
    sendImpl(
        element = element,
        // The continuation is wrapped so that the waiter is recognizable as a broadcast sender.
        waiter = SendBroadcast(continuation),
        onRendezvousOrBuffered = { continuation.resume(true) },
        // The waiter has been installed; the continuation is resumed later.
        onSuspend = { _, _ -> },
        onClosed = { continuation.resume(false) }
    )
}
230
/**
 * Specifies a waiting [sendBroadcast] operation.
 *
 * Wraps the continuation of the suspended [sendBroadcast] call and implements
 * [Waiter] by delegating to that continuation, which is cast to
 * [CancellableContinuationImpl] for this purpose.
 */
private class SendBroadcast(
    val cont: CancellableContinuation<Boolean>
) : Waiter by cont as CancellableContinuationImpl<Boolean>
237
/**
 * Abstract send implementation shared by the send-like operations of this channel.
 *
 * Each iteration reserves a cell by incrementing the senders counter, locates the
 * segment that holds the cell, and updates the cell via [updateCellSend]. Depending
 * on the outcome, one of the provided callbacks produces the result of the operation,
 * or the loop restarts with a freshly reserved cell.
 */
private inline fun <R> sendImpl(
    /* The element to be sent. */
    element: E,
    /* The waiter to be stored in case of suspension,
       or `null` if the waiter is not created yet.
       In the latter case, when the algorithm decides
       to suspend, [onNoWaiterSuspend] is called. */
    waiter: Any?,
    /* This lambda is invoked when the element has been
       buffered or a rendezvous with a receiver happens. */
    onRendezvousOrBuffered: () -> R,
    /* This lambda is called when the operation suspends in the
       cell specified by the segment and the index in it. */
    onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
    /* This lambda is called when the channel
       is observed in the closed state. */
    onClosed: () -> R,
    /* This lambda is called when the operation decides
       to suspend, but the waiter is not provided (equals `null`).
       It should create a waiter and delegate to `sendImplOnNoWaiter`.
       The default implementation fails with an error. */
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        element: E,
        s: Long
    ) -> R = { _, _, _, _ -> error("unexpected") }
): R {
    // Read the segment reference before the counter increment;
    // it is crucial to be able to find the required segment later.
    var segment = sendSegment.value
    while (true) {
        // Atomically increment the `senders` counter and obtain the
        // value right before the increment along with the close status.
        val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
        val s = sendersAndCloseStatusCur.sendersCounter
        // Is this channel already closed? Keep the information.
        val closed = sendersAndCloseStatusCur.isClosedForSend0
        // Compute the required segment id and the cell index in it.
        val id = s / SEGMENT_SIZE
        val i = (s % SEGMENT_SIZE).toInt()
        // Try to find the required segment if the initially obtained
        // one (in the beginning of this function) has a different id.
        if (segment.id != id) {
            // Find the required segment.
            segment = findSegmentSend(id, segment) ?:
                // The required segment has not been found.
                // Finish immediately if this channel is closed,
                // restarting the operation otherwise.
                // In the latter case, the required segment was full
                // of interrupted waiters and, therefore, removed
                // physically to avoid memory leaks.
                if (closed) {
                    return onClosed()
                } else {
                    continue
                }
        }
        // Update the cell according to the algorithm. Importantly, when
        // the channel is already closed, storing a waiter is illegal, so
        // the algorithm stores the `INTERRUPTED_SEND` token in this case.
        when (updateCellSend(segment, i, element, s, waiter, closed)) {
            RESULT_RENDEZVOUS -> {
                // A rendezvous with a receiver has happened.
                // The previous segments are no longer needed
                // for the upcoming requests, so the algorithm
                // resets the link to the previous segment.
                segment.cleanPrev()
                return onRendezvousOrBuffered()
            }
            RESULT_BUFFERED -> {
                // The element has been buffered.
                return onRendezvousOrBuffered()
            }
            RESULT_SUSPEND -> {
                // The operation has decided to suspend and installed the
                // specified waiter. If the channel was already closed,
                // and the `INTERRUPTED_SEND` token has been installed as a waiter,
                // this request finishes with the `onClosed()` action.
                if (closed) {
                    segment.onSlotCleaned()
                    return onClosed()
                }
                // Only real waiters register a cancellation handler;
                // marker objects passed as `waiter` are skipped by the safe cast.
                (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
                return onSuspend(segment, i)
            }
            RESULT_CLOSED -> {
                // This channel is closed.
                // In case this segment is already or going to be
                // processed by a receiver, ensure that all the
                // previous segments are unreachable.
                if (s < receiversCounter) segment.cleanPrev()
                return onClosed()
            }
            RESULT_FAILED -> {
                // Either the cell stores an interrupted receiver,
                // or it was poisoned by a concurrent receiver.
                // In both cases, all the previous segments are already processed,
                // so the link to them is reset and the operation restarts.
                segment.cleanPrev()
                continue
            }
            RESULT_SUSPEND_NO_WAITER -> {
                // The operation has decided to suspend,
                // but no waiter has been provided.
                return onNoWaiterSuspend(segment, i, element, s)
            }
        }
    }
}
349
// Note: this function temporarily lives in BufferedChannel instead of ConflatedBufferedChannel because of KT-65554.
// For now, an inline function that invokes atomic operations may only be called within a parent class.
protected fun trySendDropOldest(element: E): ChannelResult<Unit> =
    sendImpl( // <-- this is an inline function
        element = element,
        // The element is put into the logical buffer even when the
        // channel is already full; the `onSuspend` callback below
        // then extracts the first (oldest) element.
        waiter = BUFFERED,
        // Success: a rendezvous has happened or the element has been buffered.
        onRendezvousOrBuffered = { return success(Unit) },
        // The algorithm decided to suspend, which means the element was added
        // to the buffer. The buffer is now overflowed, so the first (oldest)
        // element has to be extracted.
        onSuspend = { cellSegment, cellIndex ->
            val globalCellIndex = cellSegment.id * SEGMENT_SIZE + cellIndex
            dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex)
            return success(Unit)
        },
        // The channel is closed; return the corresponding result.
        onClosed = { return closed(sendException) }
    )
372
// Continues a send operation that decided to suspend before its waiter existed:
// the already reserved cell is updated once more, this time with a real waiter.
private inline fun sendImplOnNoWaiter(
    /* The segment that contains the working cell. */
    segment: ChannelSegment<E>,
    /* The index of the working cell in the segment. */
    index: Int,
    /* The element to be sent. */
    element: E,
    /* The global index of the cell. */
    s: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Waiter,
    /* Invoked when the element has been buffered
       or a rendezvous with a receiver happens. */
    onRendezvousOrBuffered: () -> Unit,
    /* Invoked when the channel is observed in the closed state. */
    onClosed: () -> Unit,
) {
    // See `sendImpl(..)` for the detailed comments on each outcome.
    val outcome = updateCellSend(segment, index, element, s, waiter, closed = false)
    when (outcome) {
        RESULT_RENDEZVOUS -> {
            segment.cleanPrev()
            onRendezvousOrBuffered()
        }
        RESULT_BUFFERED -> onRendezvousOrBuffered()
        RESULT_SUSPEND -> waiter.prepareSenderForSuspension(segment, index)
        RESULT_CLOSED -> {
            if (s < receiversCounter) segment.cleanPrev()
            onClosed()
        }
        RESULT_FAILED -> {
            // The reserved cell cannot be used; restart the whole
            // operation from the beginning with the same waiter.
            segment.cleanPrev()
            sendImpl(
                element = element,
                waiter = waiter,
                onRendezvousOrBuffered = onRendezvousOrBuffered,
                onSuspend = { _, _ -> },
                onClosed = onClosed,
            )
        }
        else -> error("unexpected")
    }
}
422
/**
 * Fast path of [updateCellSendSlow].
 *
 * Stores the element into the cell and handles two cell states inline: an empty cell
 * and a cell with a [Waiter]. Everything else -- an already closed channel, a lost
 * CAS race, or any other cell state -- is delegated to [updateCellSendSlow].
 *
 * Returns one of the `RESULT_*` codes.
 */
private fun updateCellSend(
    /* The working cell is specified by
    the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The element to be sent. */
    element: E,
    /* The global index of the cell. */
    s: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Any?,
    /* Whether the channel was observed closed by the caller. */
    closed: Boolean
): Int {
    // This is a fast-path of `updateCellSendSlow(..)`.
    //
    // First, the algorithm stores the element,
    // performing the synchronization after that.
    // This way, receivers safely retrieve the
    // element, following the safe publication pattern.
    segment.storeElement(index, element)
    // The closed case is handled entirely by the slow path.
    if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
    // Read the current cell state.
    val state = segment.getState(index)
    when {
        // The cell is empty.
        state === null -> {
            // If the element should be buffered, or a rendezvous should happen
            // while the receiver is still coming, try to buffer the element.
            // Otherwise, try to store the specified waiter in the cell.
            if (bufferOrRendezvousSend(s)) {
                // Move the cell state to `BUFFERED`.
                if (segment.casState(index, null, BUFFERED)) {
                    // The element has been successfully buffered, finish.
                    return RESULT_BUFFERED
                }
            } else {
                // This `send(e)` operation should suspend.
                if (waiter == null) {
                    // The waiter is not specified; return the corresponding result.
                    return RESULT_SUSPEND_NO_WAITER
                } else {
                    // Try to install the waiter.
                    if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                }
            }
        }
        // A waiting receiver is stored in the cell.
        state is Waiter -> {
            // As the element will be passed directly to the waiter,
            // the algorithm cleans the element slot in the cell.
            segment.cleanElement(index)
            // Try to make a rendezvous with the suspended receiver.
            return if (state.tryResumeReceiver(element)) {
                // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                segment.setState(index, DONE_RCV)
                onReceiveDequeued()
                RESULT_RENDEZVOUS
            } else {
                // The resumption has failed. Update the cell state correspondingly
                // and clean the element field. It is also possible for a concurrent
                // cancellation handler to update the cell state; we can safely
                // ignore these updates.
                if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                    segment.onCancelledRequest(index, true)
                }
                RESULT_FAILED
            }
        }
    }
    // A CAS above has failed or the cell is in another state: fall back to the slow path.
    return updateCellSendSlow(segment, index, element, s, waiter, closed)
}
497
/**
 * Updates the working cell of an abstract send operation.
 *
 * Re-reads the cell state in a loop until one of the transitions succeeds,
 * and returns the matching `RESULT_*` code. The element is expected to be
 * already stored in the cell by the caller, see [updateCellSend].
 */
private fun updateCellSendSlow(
    /* The working cell is specified by
    the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The element to be sent. */
    element: E,
    /* The global index of the cell. */
    s: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Any?,
    /* Whether the channel was observed closed by the caller. */
    closed: Boolean
): Int {
    // Then, the cell state should be updated according to
    // its state machine; see the paper mentioned in the very
    // beginning for the cell life-cycle and the algorithm details.
    while (true) {
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null -> {
                // If the element should be buffered, or a rendezvous should happen
                // while the receiver is still coming, try to buffer the element.
                // Otherwise, try to store the specified waiter in the cell.
                if (bufferOrRendezvousSend(s) && !closed) {
                    // Move the cell state to `BUFFERED`.
                    if (segment.casState(index, null, BUFFERED)) {
                        // The element has been successfully buffered, finish.
                        return RESULT_BUFFERED
                    }
                } else {
                    // This `send(e)` operation should suspend.
                    // However, in case the channel has already
                    // been observed closed, `INTERRUPTED_SEND`
                    // is installed instead.
                    when {
                        // The channel is closed.
                        closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
                            segment.onCancelledRequest(index, false)
                            return RESULT_CLOSED
                        }
                        // The waiter is not specified; return the corresponding result.
                        waiter == null -> return RESULT_SUSPEND_NO_WAITER
                        // Try to install the waiter.
                        else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                    }
                }
            }
            // This cell is in the logical buffer.
            state === IN_BUFFER -> {
                // Try to buffer the element.
                if (segment.casState(index, state, BUFFERED)) {
                    // The element has been successfully buffered, finish.
                    return RESULT_BUFFERED
                }
            }
            // The cell stores a cancelled receiver.
            state === INTERRUPTED_RCV -> {
                // Clean the element slot to avoid memory leaks and finish.
                segment.cleanElement(index)
                return RESULT_FAILED
            }
            // The cell is poisoned by a concurrent receive.
            state === POISONED -> {
                // Clean the element slot to avoid memory leaks and finish.
                segment.cleanElement(index)
                return RESULT_FAILED
            }
            // The channel is already closed.
            state === CHANNEL_CLOSED -> {
                // Clean the element slot to avoid memory leaks,
                // ensure that the closing/cancellation procedure
                // has been completed, and finish.
                segment.cleanElement(index)
                completeCloseOrCancel()
                return RESULT_CLOSED
            }
            // A waiting receiver is stored in the cell.
            else -> {
                assert { state is Waiter || state is WaiterEB }
                // As the element will be passed directly to the waiter,
                // the algorithm cleans the element slot in the cell.
                segment.cleanElement(index)
                // Unwrap the waiting receiver from `WaiterEB` if needed.
                // As a receiver is stored in the cell, the buffer expansion
                // procedure would finish, so senders simply ignore the "EB" marker.
                val receiver = if (state is WaiterEB) state.waiter else state
                // Try to make a rendezvous with the suspended receiver.
                return if (receiver.tryResumeReceiver(element)) {
                    // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                    segment.setState(index, DONE_RCV)
                    onReceiveDequeued()
                    RESULT_RENDEZVOUS
                } else {
                    // The resumption has failed. Update the cell state correspondingly
                    // and clean the element field. It is also possible for a concurrent
                    // `expandBuffer()` or the cancellation handler to update the cell state;
                    // we can safely ignore these updates as senders do not help `expandBuffer()`.
                    if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                        segment.onCancelledRequest(index, true)
                    }
                    RESULT_FAILED
                }
            }
        }
    }
}
609
/**
 * Checks whether a [send] invocation is bound to suspend if it is called
 * with the specified [sendersAndCloseStatus] value and the current
 * [receivers] and [bufferEnd] counters.
 *
 * A closed channel never suspends a sender, so `false` is returned for it.
 * Otherwise, [send] suspends exactly when [bufferOrRendezvousSend] reports
 * that the working cell of the potential invocation can be neither
 * buffered nor used for a rendezvous.
 */
@JsName("shouldSendSuspend0")
private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean =
    !curSendersAndCloseStatus.isClosedForSend0 &&
        !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
628
/**
 * Returns `true` when the [send] that works with the cell number [curSenders]
 * should place its element to that cell without suspension.
 */
private fun bufferOrRendezvousSend(curSenders: Long): Boolean {
    // The cell is located before the logical end of the buffer.
    if (curSenders < bufferEndCounter) return true
    // Otherwise, compare the cell number with the receivers counter shifted by the capacity.
    return curSenders < receiversCounter + capacity
}
635
/**
 * Checks whether a [send] invocation is bound to suspend if it is called with
 * the current counter and close status values; the decision itself is made by
 * the overload that takes the senders-and-close-status value.
 *
 * Note that this implementation is _false positive_ in case of rendezvous channels,
 * so it can return `false` when a [send] invocation is bound to suspend. Specifically,
 * the counter of `receive()` operations may indicate that there is a waiting receiver,
 * while it has already been cancelled, so the potential rendezvous is bound to fail.
 */
internal open fun shouldSendSuspend(): Boolean {
    val currentSendersAndCloseStatus = sendersAndCloseStatus.value
    return shouldSendSuspend(currentSendersAndCloseStatus)
}
646
/**
 * Tries to resume this receiver with the specified [element] as a result.
 * The receiver is dispatched by its runtime type; an unknown type is an error.
 * Returns `true` on success and `false` otherwise.
 */
@Suppress("UNCHECKED_CAST")
private fun Any.tryResumeReceiver(element: E): Boolean = when (this) {
    // `onReceiveXXX` select clause
    is SelectInstance<*> -> trySelect(this@BufferedChannel, element)
    // `receiveCatching()`
    is ReceiveCatching<*> ->
        (this as ReceiveCatching<E>).cont.tryResume0(
            success(element),
            onUndeliveredElement?.bindCancellationFunResult()
        )
    // channel iterator
    is BufferedChannel<*>.BufferedChannelIterator ->
        (this as BufferedChannel<E>.BufferedChannelIterator).tryResumeHasNext(element)
    // `receive()`
    is CancellableContinuation<*> ->
        (this as CancellableContinuation<E>).tryResume0(
            element,
            onUndeliveredElement?.bindCancellationFun()
        )
    else -> error("Unexpected receiver type: $this")
}
670
671 // ##########################
672 // # The receive operations #
673 // ##########################
674
/**
 * This function is invoked when a receiver is added as a waiter in this channel.
 *
 * The default implementation does nothing; the function is `open`,
 * so subclasses may override it to react to the event.
 */
protected open fun onReceiveEnqueued() {}
679
/**
 * This function is invoked when a waiting receiver is no longer stored in this channel,
 * regardless of whether it is caused by rendezvous, cancellation, or channel closing.
 *
 * The default implementation does nothing; the function is `open`,
 * so subclasses may override it to react to the event.
 */
protected open fun onReceiveDequeued() {}
685
override suspend fun receive(): E =
    receiveImpl( // <-- this is an inline function
        // The continuation is not allocated eagerly: when suspension turns
        // out to be necessary, [onNoWaiterSuspend] below creates it.
        waiter = null,
        // An element was retrieved from the buffer or from a suspended
        // sender via rendezvous; return it to the caller right away.
        onElementRetrieved = { received ->
            return received
        },
        // No waiter is passed, so the suspension callback must never fire.
        onSuspend = { _, _, _ -> error("unexpected") },
        // The channel is already closed; throw the corresponding exception.
        onClosed = { throw recoverStackTrace(receiveException) },
        // `receive()` decided to suspend: delegate to the `suspend` function
        // that creates a continuation. The tail-call optimization is applied here.
        onNoWaiterSuspend = { cellSegment, cellIndex, globalIndex ->
            receiveOnNoWaiterSuspend(cellSegment, cellIndex, globalIndex)
        }
    )
707
// Suspension path of `receive()`: creates the continuation and retries the
// update of the already reserved cell with that continuation as the waiter.
private suspend fun receiveOnNoWaiterSuspend(
    /* The segment that contains the working cell. */
    segment: ChannelSegment<E>,
    /* The index of the working cell in the segment. */
    index: Int,
    /* The global index of the cell. */
    r: Long
) = suspendCancellableCoroutineReusable { continuation ->
    receiveImplOnNoWaiter( // <-- this is an inline function
        segment = segment,
        index = index,
        r = r,
        // The freshly created continuation plays the role of the waiter.
        waiter = continuation,
        // An element has been retrieved: resume the continuation with it.
        // Importantly, the receiver coroutine may be cancelled after it is
        // successfully resumed but not dispatched yet; when `onUndeliveredElement`
        // is specified, it has to be invoked in that case, hence the handler.
        onElementRetrieved = { received ->
            continuation.resume(received, onUndeliveredElement?.bindCancellationFun())
        },
        onClosed = { onClosedReceiveOnNoWaiterSuspend(continuation) },
    )
}
734
// Prepares a receiver that is about to wait in the cell specified by the segment and the index in it.
private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
    // Notify the channel (and its extensions) that a receiver has been added as a waiter.
    onReceiveEnqueued()
    // Register the cancellation handler; unlike senders, receivers pass the index as is.
    invokeOnCancellation(segment, index)
}
739
// Finishes a suspending `receive()` that found the channel closed:
// the continuation is failed with the receive exception.
private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>): Unit =
    cont.resumeWithException(receiveException)
743
/*
The implementation mirrors the plain `receive()`; the difference is
that the outcome is wrapped into a `ChannelResult`, so no exception
is thrown explicitly when the channel is already closed for receiving.
See the plain `receive()` implementation for the detailed comments.
*/
override suspend fun receiveCatching(): ChannelResult<E> =
    receiveImpl( // <-- this is an inline function
        waiter = null,
        onElementRetrieved = { received ->
            success(received)
        },
        onSuspend = { _, _, _ -> error("unexpected") },
        onClosed = { closed(closeCause) },
        onNoWaiterSuspend = { cellSegment, cellIndex, globalIndex ->
            receiveCatchingOnNoWaiterSuspend(cellSegment, cellIndex, globalIndex)
        }
    )
761
// Suspension path of `receiveCatching()`: creates the continuation, wraps it
// into a `ReceiveCatching` waiter, and retries the update of the reserved cell.
private suspend fun receiveCatchingOnNoWaiterSuspend(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long
) = suspendCancellableCoroutineReusable { continuation ->
    val catchingWaiter = ReceiveCatching(continuation as CancellableContinuationImpl<ChannelResult<E>>)
    receiveImplOnNoWaiter(
        segment = segment,
        index = index,
        r = r,
        waiter = catchingWaiter,
        // An element has been retrieved: resume with a successful result.
        onElementRetrieved = { received ->
            continuation.resume(success(received), onUndeliveredElement?.bindCancellationFunResult())
        },
        onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(continuation) }
    )
}
777
// Finishes a suspending `receiveCatching()` that found the channel closed:
// the continuation is resumed with a "closed" result that carries the close cause.
private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>): Unit =
    cont.resume(closed(closeCause))
781
override fun tryReceive(): ChannelResult<E> {
    // The `receivers` counter is read first, the senders-and-close-status value second.
    val receiversSnapshot = receivers.value
    val sendersAndCloseStatusSnapshot = sendersAndCloseStatus.value
    // A channel that is closed for receive yields the "closed" result.
    if (sendersAndCloseStatusSnapshot.isClosedForReceive0) return closed(closeCause)
    // Fail right away when the plain `receive()` operation would suspend.
    if (receiversSnapshot >= sendersAndCloseStatusSnapshot.sendersCounter) return failure()
    // Try to retrieve an element. The logic mirrors the plain `receive()`;
    // the only difference is that `INTERRUPTED_RCV` is stored when the
    // operation decides to suspend, which makes it possible to leverage the
    // unconditional `Fetch-and-Add` instruction. Storing `INTERRUPTED_RCV`
    // instead of an actual waiter on suspension (a.k.a. "no elements to
    // retrieve") can be seen as a short-cut of "suspending and cancelling
    // immediately".
    return receiveImpl( // <-- this is an inline function
        // An already interrupted receiver is stored in case of suspension.
        waiter = INTERRUPTED_RCV,
        // An element has been successfully retrieved.
        onElementRetrieved = { received -> success(received) },
        // The `INTERRUPTED_RCV` token has been installed,
        // so this `tryReceive()` must fail.
        onSuspend = { cellSegment, _, globalIndex ->
            // Emulate a "cancelled" receive by invoking 'waitExpandBufferCompletion'
            // manually, because effectively there was no cancellation.
            waitExpandBufferCompletion(globalIndex)
            cellSegment.onSlotCleaned()
            failure()
        },
        // The channel is closed; return the corresponding result.
        onClosed = { closed(closeCause) }
    )
}
819
/**
 * Extracts the first element from this channel until the cell with the specified
 * index is moved to the logical buffer. This is a key procedure for the _conflated_
 * channel implementation, see [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
 * strategy on buffer overflowing.
 *
 * Every extracted element is handed to `onUndeliveredElement` (when it is set) via
 * `callUndeliveredElementCatchingException`; if that call returns a non-null exception,
 * the exception is thrown from this function.
 *
 * @param globalCellIndex the global index of the cell that must become a part of the logical buffer.
 */
protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
    assert { isConflatedDropOldest }
    // Read the segment reference before the counter increment;
    // it is crucial to be able to find the required segment later.
    var segment = receiveSegment.value
    while (true) {
        // Read the receivers counter to check whether the specified cell is already in the buffer
        // or should be moved to the buffer in a short time, due to the already started `receive()`.
        val r = this.receivers.value
        if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
        // The cell is outside the buffer. Try to extract the first element
        // if the `receivers` counter has not been changed.
        if (!this.receivers.compareAndSet(r, r + 1)) continue
        // Compute the required segment id and the cell index in it.
        val id = r / SEGMENT_SIZE
        val i = (r % SEGMENT_SIZE).toInt()
        // Try to find the required segment if the initially obtained
        // segment (in the beginning of this function) has a different id.
        if (segment.id != id) {
            // Find the required segment, restarting the operation if it has not been found.
            segment = findSegmentReceive(id, segment) ?:
                // The required segment has not been found. It is possible that the channel is already
                // closed for receiving, so the linked list of segments is closed as well.
                // In the latter case, the operation will finish eventually after incrementing
                // the `receivers` counter sufficient times. Note that it is impossible to check
                // whether this channel is closed for receiving here (as `receive` does),
                // because this function may be called when helping to complete closing the channel.
                continue
        }
        // Update the cell according to the cell life-cycle.
        // NOTE(review): with `waiter = null`, `updateCellReceive` may also return `SUSPEND_NO_WAITER`
        // when `r` is not covered by a sender; the `else` branch below treats every non-`FAILED`
        // result as an element, so that case is presumably unreachable here — confirm.
        val updCellResult = updateCellReceive(segment, i, r, null)
        when {
            updCellResult === FAILED -> {
                // The cell update has failed; restart from the beginning.
                // To avoid memory leaks, we also need to reset
                // the `prev` pointer of the working segment.
                if (r < sendersCounter) segment.cleanPrev()
            }
            else -> { // element
                // A buffered element was retrieved from the cell.
                // Clean the reference to the previous segment.
                segment.cleanPrev()
                // Report the dropped element and rethrow the exception returned by the handler call, if any.
                @Suppress("UNCHECKED_CAST")
                onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
            }
        }
    }
}
874
/**
 * Abstract receive implementation.
 *
 * In a loop: checks whether the channel is closed for receiving, reserves a cell by
 * incrementing the `receivers` counter, locates the segment of that cell, and updates
 * the cell via [updateCellReceive]. The outcome is dispatched to exactly one of the
 * lambdas described below; the loop restarts when the required segment is not found
 * or the cell update returns `FAILED`.
 */
private inline fun <R> receiveImpl(
    /* The waiter to be stored in case of suspension,
       or `null` if the waiter is not created yet.
       In the latter case, if the algorithm decides
       to suspend, [onNoWaiterSuspend] is called. */
    waiter: Any?,
    /* This lambda is invoked when an element has been
       successfully retrieved, either from the buffer or
       by making a rendezvous with a suspended sender. */
    onElementRetrieved: (element: E) -> R,
    /* This lambda is called when the operation suspends in the cell
       specified by the segment, the index in it, and the global index. */
    onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
    /* This lambda is called when the channel is observed
       in the closed state and no waiting sender is found,
       which means that it is closed for receiving. */
    onClosed: () -> R,
    /* This lambda is called when the operation decides
       to suspend, but the waiter is not provided (equals `null`).
       It should create a waiter and delegate to `receiveImplOnNoWaiter`. */
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        r: Long
    ) -> R = { _, _, _ -> error("unexpected") }
): R {
    // Read the segment reference before the counter increment;
    // it is crucial to be able to find the required segment later.
    var segment = receiveSegment.value
    while (true) {
        // Similar to the `send(e)` operation, `receive()` first checks
        // whether the channel is already closed for receiving.
        if (isClosedForReceive) return onClosed()
        // Atomically increment the `receivers` counter
        // and obtain the value right before the increment.
        val r = this.receivers.getAndIncrement()
        // Compute the required segment id and the cell index in it.
        val id = r / SEGMENT_SIZE
        val i = (r % SEGMENT_SIZE).toInt()
        // Try to find the required segment if the initially obtained
        // segment (in the beginning of this function) has a different id.
        if (segment.id != id) {
            // Find the required segment, restarting the operation if it has not been found.
            segment = findSegmentReceive(id, segment) ?:
                // The required segment is not found. It is possible that the channel is already
                // closed for receiving, so the linked list of segments is closed as well.
                // In the latter case, the operation fails with the corresponding check at the beginning.
                continue
        }
        // Update the cell according to the cell life-cycle.
        val updCellResult = updateCellReceive(segment, i, r, waiter)
        return when {
            updCellResult === SUSPEND -> {
                // The operation has decided to suspend and
                // stored the specified waiter in the cell.
                // Only `Waiter` instances need the suspension preparation;
                // for any other stored object the safe cast yields `null`.
                (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
                onSuspend(segment, i, r)
            }
            updCellResult === FAILED -> {
                // The operation has tried to make a rendezvous
                // but failed: either the opposite request has
                // already been cancelled or the cell is poisoned.
                // Restart from the beginning in this case.
                // To avoid memory leaks, we also need to reset
                // the `prev` pointer of the working segment.
                if (r < sendersCounter) segment.cleanPrev()
                continue
            }
            updCellResult === SUSPEND_NO_WAITER -> {
                // The operation has decided to suspend,
                // but no waiter has been provided.
                onNoWaiterSuspend(segment, i, r)
            }
            else -> { // element
                // Either a buffered element was retrieved from the cell
                // or a rendezvous with a waiting sender has happened.
                // Clean the reference to the previous segment before finishing.
                segment.cleanPrev()
                @Suppress("UNCHECKED_CAST")
                onElementRetrieved(updCellResult as E)
            }
        }
    }
}
962
/**
 * Continuation of [receiveImpl] for the case when it decided to suspend without
 * a waiter: retries the update of the same cell, now with a non-null [waiter].
 * If that update fails, the whole receive algorithm is restarted via [receiveImpl]
 * with the same waiter.
 */
private inline fun receiveImplOnNoWaiter(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    r: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Waiter,
    /* This lambda is invoked when an element has been
       successfully retrieved, either from the buffer or
       by making a rendezvous with a suspended sender. */
    onElementRetrieved: (element: E) -> Unit,
    /* This lambda is called when the channel is observed
       in the closed state and no waiting sender is found,
       which means that it is closed for receiving. */
    onClosed: () -> Unit
) {
    // Update the cell with the non-null waiter,
    // restarting from the beginning on failure.
    // Check the `receiveImpl(..)` function for the comments.
    val updCellResult = updateCellReceive(segment, index, r, waiter)
    when {
        updCellResult === SUSPEND -> {
            // The waiter has been stored in the cell; finish the suspension.
            waiter.prepareReceiverForSuspension(segment, index)
        }
        updCellResult === FAILED -> {
            // Reset the `prev` pointer of the working segment, then restart
            // the operation from scratch with the already created waiter.
            if (r < sendersCounter) segment.cleanPrev()
            receiveImpl(
                waiter = waiter,
                onElementRetrieved = onElementRetrieved,
                // Nothing to do here: `receiveImpl` itself prepares
                // the waiter for suspension before calling `onSuspend`.
                onSuspend = { _, _, _ -> },
                onClosed = onClosed
            )
        }
        else -> {
            // An element has been retrieved from the cell.
            segment.cleanPrev()
            @Suppress("UNCHECKED_CAST")
            onElementRetrieved(updCellResult as E)
        }
    }
}
1005
/**
 * Updates the working cell on behalf of a `receive()`-like operation.
 * This is the fast path: it handles an empty cell in which the operation should
 * suspend and a cell with a buffered element; every other situation, as well as
 * any failed CAS, is delegated to [updateCellReceiveSlow].
 *
 * Returns `SUSPEND` if [waiter] has been stored in the cell, `SUSPEND_NO_WAITER` if
 * the operation should suspend but [waiter] is `null`, the retrieved element, or
 * the result of [updateCellReceiveSlow].
 */
private fun updateCellReceive(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    r: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Any?,
): Any? {
    // This is a fast-path of `updateCellReceiveSlow(..)`.
    //
    // Read the current cell state.
    val state = segment.getState(index)
    when {
        // The cell is empty.
        state === null -> {
            // If the cell is not covered by a sender (`r >= senders`),
            // try to store the specified waiter in the cell.
            // Otherwise, a rendezvous must happen; that case is
            // left to the slow path below.
            val senders = sendersAndCloseStatus.value.sendersCounter
            if (r >= senders) {
                // This `receive()` operation should suspend.
                if (waiter === null) {
                    // The waiter is not specified;
                    // return the corresponding result.
                    return SUSPEND_NO_WAITER
                }
                // Try to install the waiter.
                if (segment.casState(index, state, waiter)) {
                    // The waiter has been successfully installed.
                    // Invoke the `expandBuffer()` procedure and finish.
                    expandBuffer()
                    return SUSPEND
                }
            }
        }
        // The cell stores a buffered element.
        state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
            // Retrieve the element and expand the buffer.
            expandBuffer()
            return segment.retrieveElement(index)
        }
    }
    // The fast path did not apply or lost a race; fall back to the full state machine.
    return updateCellReceiveSlow(segment, index, r, waiter)
}
1053
/**
 * The full cell state machine for `receive()`-like operations; see [updateCellReceive]
 * for the fast path. Loops until the cell reaches a state that determines the result.
 *
 * Returns `SUSPEND` if [waiter] has been stored in the cell, `SUSPEND_NO_WAITER` if the
 * operation should suspend but [waiter] is `null`, `FAILED` if the operation must be
 * restarted with another cell, or the retrieved element otherwise.
 */
private fun updateCellReceiveSlow(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    r: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Any?,
): Any? {
    // The cell state should be updated according to its state machine;
    // see the paper mentioned in the very beginning for the algorithm details.
    while (true) {
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null || state === IN_BUFFER -> {
                // If a rendezvous must happen, the operation does not wait
                // until the cell stores a buffered element or a suspended
                // sender, poisoning the cell and restarting instead.
                // Otherwise, try to store the specified waiter in the cell.
                val senders = sendersAndCloseStatus.value.sendersCounter
                if (r < senders) {
                    // The cell is already covered by a sender,
                    // so a rendezvous must happen. Unfortunately,
                    // the cell is empty, so the operation poisons it.
                    if (segment.casState(index, state, POISONED)) {
                        // When the cell becomes poisoned, it is essentially
                        // the same as storing an already cancelled receiver.
                        // Thus, the `expandBuffer()` procedure should be invoked.
                        expandBuffer()
                        return FAILED
                    }
                } else {
                    // This `receive()` operation should suspend.
                    if (waiter === null) {
                        // The waiter is not specified;
                        // return the corresponding result.
                        return SUSPEND_NO_WAITER
                    }
                    // Try to install the waiter.
                    if (segment.casState(index, state, waiter)) {
                        // The waiter has been successfully installed.
                        // Invoke the `expandBuffer()` procedure and finish.
                        expandBuffer()
                        return SUSPEND
                    }
                }
            }
            // The cell stores a buffered element.
            state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
                // Retrieve the element and expand the buffer.
                expandBuffer()
                return segment.retrieveElement(index)
            }
            // The cell stores an interrupted sender.
            state === INTERRUPTED_SEND -> return FAILED
            // The cell is already poisoned by a concurrent
            // `hasElements` call. Restart in this case.
            state === POISONED -> return FAILED
            // This channel is already closed.
            state === CHANNEL_CLOSED -> {
                // Although the channel is closed, it is still required
                // to call the `expandBuffer()` procedure to keep
                // `waitExpandBufferCompletion()` correct.
                expandBuffer()
                return FAILED
            }
            // A concurrent `expandBuffer()` is resuming a
            // suspended sender. Wait in a spin-loop until
            // the resumption attempt completes: the cell
            // state must change to either `BUFFERED` or
            // `INTERRUPTED_SEND`.
            state === RESUMING_BY_EB -> continue
            // The cell stores a suspended sender; try to resume it.
            else -> {
                // To synchronize with expandBuffer(), the algorithm
                // first moves the cell to an intermediate `RESUMING_BY_RCV`
                // state, updating it to either `DONE_RCV` (on success) or
                // `INTERRUPTED_SEND` (on failure).
                if (segment.casState(index, state, RESUMING_BY_RCV)) {
                    // Has a concurrent `expandBuffer()` delegated its completion?
                    val helpExpandBuffer = state is WaiterEB
                    // Extract the sender if needed and try to resume it.
                    val sender = if (state is WaiterEB) state.waiter else state
                    return if (sender.tryResumeSender(segment, index)) {
                        // The sender has been resumed successfully!
                        // Update the cell state correspondingly,
                        // expand the buffer, and return the element
                        // stored in the cell.
                        // In case a concurrent `expandBuffer()` has delegated
                        // its completion, the procedure should finish, as the
                        // sender is resumed. Thus, no further action is required.
                        segment.setState(index, DONE_RCV)
                        expandBuffer()
                        segment.retrieveElement(index)
                    } else {
                        // The resumption has failed. Update the cell correspondingly.
                        // In case a concurrent `expandBuffer()` has delegated
                        // its completion, the procedure should skip this cell, so
                        // `expandBuffer()` should be called once again.
                        segment.setState(index, INTERRUPTED_SEND)
                        segment.onCancelledRequest(index, false)
                        if (helpExpandBuffer) expandBuffer()
                        FAILED
                    }
                }
            }
        }
    }
}
1166
/**
 * Tries to resume the suspended sender represented by this waiter object and
 * returns `true` if the resumption succeeded.
 *
 * [segment] and [index] identify the cell of the waiter; they are used only to
 * clean the element slot when a `select` clause reports that it must be re-registered.
 * Fails with an error (see `error(..)`) if the waiter is of an unexpected type.
 */
private fun Any.tryResumeSender(segment: ChannelSegment<E>, index: Int): Boolean = when (this) {
    is CancellableContinuation<*> -> { // suspended `send(e)` operation
        // The cast statement below only refines the static type of `this`.
        @Suppress("UNCHECKED_CAST")
        this as CancellableContinuation<Unit>
        tryResume0(Unit)
    }
    is SelectInstance<*> -> { // `select` clause registered for sending
        this as SelectImplementation<*>
        val trySelectResult = trySelectDetailed(clauseObject = this@BufferedChannel, result = Unit)
        // Clean the element slot to avoid memory leaks
        // if this `select` clause should be re-registered.
        if (trySelectResult === REREGISTER) segment.cleanElement(index)
        // Was the resumption successful?
        trySelectResult === SUCCESSFUL
    }
    is SendBroadcast -> cont.tryResume0(true) // suspended `sendBroadcast(e)` operation
    else -> error("Unexpected waiter: $this")
}
1185
1186 // ################################
1187 // # The expandBuffer() procedure #
1188 // ################################
1189
/**
 * Moves the logical end of the buffer one cell forward.
 *
 * Each attempt increments `bufferEnd` and tries to add the corresponding cell to the
 * buffer via [updateCellExpandBuffer]; attempts are repeated until one succeeds or
 * the claimed cell is not yet covered by a sender. Every attempt is accounted for
 * in the counter of completed `expandBuffer()`-s, either here or, as the comments
 * below state, inside `findSegmentBufferEnd(..)`.
 * Does nothing for rendezvous and unlimited channels.
 */
private fun expandBuffer() {
    // Do not need to take any action if
    // this channel is rendezvous or unlimited.
    if (isRendezvousOrUnlimited) return
    // Read the current segment of
    // the `expandBuffer()` procedure.
    var segment = bufferEndSegment.value
    // Try to expand the buffer until succeed.
    try_again@ while (true) {
        // Increment the logical end of the buffer.
        // The `b`-th cell is going to be added to the buffer.
        val b = bufferEnd.getAndIncrement()
        val id = b / SEGMENT_SIZE
        // After that, read the current `senders` counter.
        // In case its value is not greater than `b`, the `send(e)`
        // invocation that will work with this `b`-th cell
        // will detect that the cell is already a part of the
        // buffer when comparing with the `bufferEnd` counter.
        // However, `bufferEndSegment` may reference an outdated
        // segment, which should be updated to avoid memory leaks.
        val s = sendersCounter
        if (s <= b) {
            // Should `bufferEndSegment` be moved forward to avoid memory leaks?
            if (segment.id < id && segment.next != null)
                moveSegmentBufferEndToSpecifiedOrLast(id, segment)
            // Increment the number of completed `expandBuffer()`-s and finish.
            incCompletedExpandBufferAttempts()
            return
        }
        // Is `bufferEndSegment` outdated or is the segment with the required id already removed?
        // Find the required segment, creating new ones if needed.
        if (segment.id != id) {
            segment = findSegmentBufferEnd(id, segment, b)
                // Restart if the required segment is removed, or
                // the linked list of segments is already closed,
                // and the required one will never be created.
                // Please note that `findSegmentBufferEnd(..)` updates
                // the number of completed `expandBuffer()` attempts
                // in this case.
                ?: continue@try_again
        }
        // Try to add the cell to the logical buffer,
        // updating the cell state according to the state-machine.
        val i = (b % SEGMENT_SIZE).toInt()
        if (updateCellExpandBuffer(segment, i, b)) {
            // The cell has been added to the logical buffer!
            // Increment the number of completed `expandBuffer()`-s and finish.
            //
            // Note that it is possible to increment the number of
            // completed `expandBuffer()` attempts earlier, right
            // after the segment is obtained. We find this change
            // counter-intuitive and prefer to avoid it.
            incCompletedExpandBufferAttempts()
            return
        } else {
            // The cell has not been added to the buffer.
            // Increment the number of completed `expandBuffer()`
            // attempts and restart.
            incCompletedExpandBufferAttempts()
            continue@try_again
        }
    }
}
1253
/**
 * Tries to add the working cell to the logical buffer on behalf of [expandBuffer].
 * This is the fast path: it only handles a cell that stores a suspended sender;
 * every other state, as well as a lost CAS, is delegated to [updateCellExpandBufferSlow].
 *
 * Returns `true` if the buffer expansion is complete for this cell, or `false` if
 * the cell must be skipped and [expandBuffer] should proceed with the next one.
 */
private fun updateCellExpandBuffer(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    b: Long
): Boolean {
    // This is a fast-path of `updateCellExpandBufferSlow(..)`.
    //
    // Read the current cell state.
    val state = segment.getState(index)
    if (state is Waiter) {
        // Usually, a sender is stored in the cell.
        // However, it is possible for a concurrent
        // receiver to be already suspended there.
        // Try to distinguish whether the waiter is a
        // sender by comparing the global cell index with
        // the `receivers` counter. In case the cell is not
        // covered by a receiver, a sender is stored in the cell.
        if (b >= receivers.value) {
            // The cell stores a suspended sender. Try to resume it.
            // To synchronize with a concurrent `receive()`, the algorithm
            // first moves the cell state to an intermediate `RESUMING_BY_EB`
            // state, updating it to either `BUFFERED` (on successful resumption)
            // or `INTERRUPTED_SEND` (on failure).
            if (segment.casState(index, state, RESUMING_BY_EB)) {
                return if (state.tryResumeSender(segment, index)) {
                    // The sender has been resumed successfully!
                    // Move the cell to the logical buffer and finish.
                    segment.setState(index, BUFFERED)
                    true
                } else {
                    // The resumption has failed.
                    segment.setState(index, INTERRUPTED_SEND)
                    segment.onCancelledRequest(index, false)
                    false
                }
            }
        }
    }
    // The fast path did not apply or lost a race; fall back to the full state machine.
    return updateCellExpandBufferSlow(segment, index, b)
}
1297
/**
 * The full cell state machine for [expandBuffer]; see [updateCellExpandBuffer] for
 * the fast path. Loops until the cell reaches a state that determines the result.
 *
 * Returns `true` if the buffer expansion is complete for this cell, or `false` if
 * the cell must be skipped (it stores an interrupted sender, or the sender
 * resumption has failed). Fails with an error on an unexpected cell state.
 */
private fun updateCellExpandBufferSlow(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    b: Long
): Boolean {
    // Update the cell state according to its state machine.
    // See the paper mentioned in the very beginning for
    // the cell life-cycle and the algorithm details.
    while (true) {
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // A suspended waiter, sender or receiver.
            state is Waiter -> {
                // Usually, a sender is stored in the cell.
                // However, it is possible for a concurrent
                // receiver to be already suspended there.
                // Try to distinguish whether the waiter is a
                // sender by comparing the global cell index with
                // the `receivers` counter. In case the cell is not
                // covered by a receiver, a sender is stored in the cell.
                if (b < receivers.value) {
                    // The algorithm cannot distinguish whether the operation
                    // suspended in the cell is a sender or a receiver.
                    // To make progress, `expandBuffer()` delegates its completion
                    // to an upcoming pairwise request, atomically wrapping
                    // the waiter in `WaiterEB`. In case a sender is stored
                    // in the cell, the upcoming receiver will call `expandBuffer()`
                    // if the sender resumption fails; thus, effectively, skipping
                    // this cell. Otherwise, if a receiver is stored in the cell,
                    // this `expandBuffer()` procedure must finish; therefore,
                    // senders ignore the `WaiterEB` wrapper.
                    if (segment.casState(index, state, WaiterEB(waiter = state)))
                        return true
                } else {
                    // The cell stores a suspended sender. Try to resume it.
                    // To synchronize with a concurrent `receive()`, the algorithm
                    // first moves the cell state to an intermediate `RESUMING_BY_EB`
                    // state, updating it to either `BUFFERED` (on successful resumption)
                    // or `INTERRUPTED_SEND` (on failure).
                    if (segment.casState(index, state, RESUMING_BY_EB)) {
                        return if (state.tryResumeSender(segment, index)) {
                            // The sender has been resumed successfully!
                            // Move the cell to the logical buffer and finish.
                            segment.setState(index, BUFFERED)
                            true
                        } else {
                            // The resumption has failed.
                            segment.setState(index, INTERRUPTED_SEND)
                            segment.onCancelledRequest(index, false)
                            false
                        }
                    }
                }
            }
            // The cell stores an interrupted sender, skip it.
            state === INTERRUPTED_SEND -> return false
            // The cell is empty, a concurrent sender is coming.
            state === null -> {
                // To inform a concurrent sender that this cell is
                // already a part of the buffer, the algorithm moves
                // it to a special `IN_BUFFER` state.
                if (segment.casState(index, state, IN_BUFFER)) return true
            }
            // The cell is already a part of the buffer, finish.
            state === BUFFERED -> return true
            // The cell is already processed by a receiver, no further action is required.
            state === POISONED || state === DONE_RCV || state === INTERRUPTED_RCV -> return true
            // The channel is closed, all the following
            // cells are already in the same state, finish.
            state === CHANNEL_CLOSED -> return true
            // A concurrent receiver is resuming the suspended sender.
            // Wait in a spin-loop until it changes the cell state
            // to either `DONE_RCV` or `INTERRUPTED_SEND`.
            state === RESUMING_BY_RCV -> continue // spin wait
            else -> error("Unexpected cell state: $state")
        }
    }
}
1380
/**
 * Adds [nAttempts] to the counter of completed [expandBuffer] invocations.
 *
 * [waitExpandBufferCompletion] waits until the numbers of started and completed
 * [expandBuffer] calls coincide; to make that wait starvation-free, it may raise
 * a flag that pauses further progress. When the updated value carries this flag,
 * the current thread spins here until the flag is cleared.
 */
private fun incCompletedExpandBufferAttempts(nAttempts: Long = 1) {
    // Publish the completed attempts and look at the resulting value.
    val updated = completedExpandBuffersAndPauseFlag.addAndGet(nAttempts)
    // Nothing else to do unless further `expandBuffer()`-s are paused.
    if (!updated.ebPauseExpandBuffers) return
    // Paused: wait in a spin-loop until the flag is unset.
    @Suppress("ControlFlowWithEmptyBody")
    while (completedExpandBuffersAndPauseFlag.value.ebPauseExpandBuffers) {}
}
1400
/**
 * Waits in a spin-loop until the [expandBuffer] call that
 * should process the [globalIndex]-th cell is completed.
 * Essentially, it first waits until the number of started ([bufferEnd])
 * [expandBuffer] attempts exceeds [globalIndex], and then until the numbers of
 * started and completed ([completedExpandBuffersAndPauseFlag]) attempts coincide.
 * To avoid starvation, this function may set a flag
 * that pauses further progress.
 * Returns immediately for rendezvous and unlimited channels.
 */
internal fun waitExpandBufferCompletion(globalIndex: Long) {
    // Do nothing if this channel is rendezvous or unlimited;
    // `expandBuffer()` is not used in these cases.
    if (isRendezvousOrUnlimited) return
    // Wait in an infinite loop until the number of started
    // buffer expansion calls becomes greater than the cell index.
    @Suppress("ControlFlowWithEmptyBody")
    while (bufferEndCounter <= globalIndex) {}
    // Now it is guaranteed that the `expandBuffer()` call that
    // should process the required cell has been started.
    // Wait in a fixed-size spin-loop until the numbers of
    // started and completed buffer expansion calls coincide.
    repeat(EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS) {
        // Read the number of started buffer expansion calls.
        val b = bufferEndCounter
        // Read the number of completed buffer expansion calls.
        val ebCompleted = completedExpandBuffersAndPauseFlag.value.ebCompletedCounter
        // Do the numbers of started and completed calls coincide?
        // Note that we need to re-read the number of started `expandBuffer()`
        // calls to obtain a correct snapshot.
        // Here we wait for a precise match in order to ensure that **our matching expandBuffer()**
        // has completed. The only way to ensure that is to check that
        // the number of started expands == the number of finished expands.
        if (b == ebCompleted && b == bufferEndCounter) return
    }
    // To avoid starvation, pause further `expandBuffer()` calls.
    completedExpandBuffersAndPauseFlag.update {
        constructEBCompletedAndPauseFlag(it.ebCompletedCounter, true)
    }
    // Now wait in an infinite spin-loop until the counters coincide.
    while (true) {
        // Read the number of started buffer expansion calls.
        val b = bufferEndCounter
        // Read the number of completed buffer expansion calls
        // along with the flag that pauses further progress.
        val ebCompletedAndBit = completedExpandBuffersAndPauseFlag.value
        val ebCompleted = ebCompletedAndBit.ebCompletedCounter
        val pauseExpandBuffers = ebCompletedAndBit.ebPauseExpandBuffers
        // Do the numbers of started and completed calls coincide?
        // Note that we need to re-read the number of started `expandBuffer()`
        // calls to obtain a correct snapshot.
        if (b == ebCompleted && b == bufferEndCounter) {
            // Unset the flag, which pauses progress, and finish.
            completedExpandBuffersAndPauseFlag.update {
                constructEBCompletedAndPauseFlag(it.ebCompletedCounter, false)
            }
            return
        }
        // It is possible that a concurrent caller of this function
        // has unset the flag, which pauses further progress to avoid
        // starvation. In this case, set the flag back.
        // A failed CAS is fine here: the loop re-reads the state and retries.
        if (!pauseExpandBuffers) {
            completedExpandBuffersAndPauseFlag.compareAndSet(
                ebCompletedAndBit,
                constructEBCompletedAndPauseFlag(ebCompleted, true)
            )
        }
    }
}
1468
1469
1470 // #######################
1471 // ## Select Expression ##
1472 // #######################
1473
@Suppress("UNCHECKED_CAST")
override val onSend: SelectClause2<E, BufferedChannel<E>>
    get() {
        // A new clause object is built on every access: registration goes through
        // `registerSelectForSend`, the result is mapped by `processResultSelectSend`.
        val register = BufferedChannel<*>::registerSelectForSend as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectSend as ProcessResultFunction
        return SelectClause2Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult
        )
    }
1481
/**
 * Registration function of the [onSend] clause: runs the send algorithm with the
 * [select] instance acting as the waiter.
 * When the element is sent without suspension, [select] is selected right in the
 * registration phase with `Unit`; when the channel is closed, [onClosedSelectOnSend]
 * is invoked.
 */
@Suppress("UNCHECKED_CAST")
protected open fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
    // The clause passes the element untyped; restore its type once.
    val typedElement = element as E
    sendImpl( // <-- this is an inline function
        element = typedElement,
        waiter = select,
        onRendezvousOrBuffered = { select.selectInRegistrationPhase(Unit) },
        // Nothing to do on suspension.
        onSuspend = { _, _ -> },
        onClosed = { onClosedSelectOnSend(typedElement, select) }
    )
}
1491
1492
/**
 * `onClosed` action of [registerSelectForSend]: reports [element] through
 * `onUndeliveredElement` (when it is set), and then selects [select] in the
 * registration phase with the `CHANNEL_CLOSED` marker as the result.
 */
private fun onClosedSelectOnSend(element: E, select: SelectInstance<*>) {
    onUndeliveredElement?.let { handler ->
        handler.callUndeliveredElement(element, select.context)
    }
    select.selectInRegistrationPhase(CHANNEL_CLOSED)
}
1497
/**
 * Result-processing function of the [onSend] clause: throws [sendException] when
 * the clause was selected with the `CHANNEL_CLOSED` marker, and returns this
 * channel otherwise. The first parameter is not used.
 */
@Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
private fun processResultSelectSend(ignoredParam: Any?, selectResult: Any?): Any? {
    if (selectResult === CHANNEL_CLOSED) throw sendException
    return this
}
1502
@Suppress("UNCHECKED_CAST")
override val onReceive: SelectClause1<E>
    get() {
        // A new clause object is built on every access: registration goes through
        // `registerSelectForReceive`, the result is mapped by `processResultSelectReceive`.
        val register = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectReceive as ProcessResultFunction
        return SelectClause1Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult,
            onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
        )
    }
1511
@Suppress("UNCHECKED_CAST")
override val onReceiveCatching: SelectClause1<ChannelResult<E>>
    get() {
        // Same registration as for `onReceive`; only the result mapping differs
        // (`processResultSelectReceiveCatching`).
        val register = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectReceiveCatching as ProcessResultFunction
        return SelectClause1Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult,
            onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
        )
    }
1520
@Suppress("OVERRIDE_DEPRECATION", "UNCHECKED_CAST")
override val onReceiveOrNull: SelectClause1<E?>
    get() {
        // Same registration as for `onReceive`; only the result mapping differs
        // (`processResultSelectReceiveOrNull`).
        val register = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectReceiveOrNull as ProcessResultFunction
        return SelectClause1Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult,
            onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
        )
    }
1529
/**
 * Registration function shared by the receive clauses: runs the receive algorithm
 * with the [select] instance acting as the waiter.
 * When an element is retrieved without suspension, [select] is selected right in the
 * registration phase with that element; when the channel is closed for receiving,
 * [onClosedSelectOnReceive] is invoked. The second parameter is not used.
 */
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) {
    receiveImpl( // <-- this is an inline function
        waiter = select,
        onElementRetrieved = { select.selectInRegistrationPhase(it) },
        // Nothing to do on suspension.
        onSuspend = { _, _, _ -> },
        onClosed = { onClosedSelectOnReceive(select) }
    )
}
1538
/**
 * Completes the specified [select] instance with the [CHANNEL_CLOSED] token while it is
 * still in the registration phase. The token is later interpreted by the
 * `processResultSelectReceive*` functions of the selected clause.
 */
private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
    select.selectInRegistrationPhase(CHANNEL_CLOSED)
}
1542
/**
 * Processes the select result for [onReceive]: throws [receiveException] when the
 * result is the [CHANNEL_CLOSED] token; otherwise, returns the result as is.
 */
@Suppress("UNUSED_PARAMETER")
private fun processResultSelectReceive(ignoredParam: Any?, selectResult: Any?): Any? {
    if (selectResult === CHANNEL_CLOSED) throw receiveException
    return selectResult
}
1547
/**
 * Processes the select result for [onReceiveOrNull]. A regular result is returned as is.
 * For the [CHANNEL_CLOSED] token, `null` is returned when the channel was closed without
 * a cause, and [receiveException] is thrown otherwise.
 */
@Suppress("UNUSED_PARAMETER")
private fun processResultSelectReceiveOrNull(ignoredParam: Any?, selectResult: Any?): Any? {
    // The common case: an element has been received.
    if (selectResult !== CHANNEL_CLOSED) return selectResult
    // The channel is closed; fail only if it was closed with a cause.
    if (closeCause != null) throw receiveException
    return null
}
1554
/**
 * Processes the select result for [onReceiveCatching]: the [CHANNEL_CLOSED] token is
 * converted into a `closed` [ChannelResult] carrying [closeCause]; any other result
 * is wrapped into a `success` [ChannelResult].
 */
@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER", "RedundantNullableReturnType")
private fun processResultSelectReceiveCatching(ignoredParam: Any?, selectResult: Any?): Any? =
    when {
        selectResult === CHANNEL_CLOSED -> closed(closeCause)
        else -> success(selectResult as E)
    }
1559
/**
 * The `onCancellationConstructor` passed to the receive select clauses.
 * It is `null` when no `onUndeliveredElement` handler is specified. Otherwise, for a given
 * select instance and internal result, it builds a cancellation callback that invokes the
 * handler on the result, unless the result is the [CHANNEL_CLOSED] token.
 */
@Suppress("UNCHECKED_CAST")
private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let { handler ->
    { select: SelectInstance<*>, _: Any?, element: Any? ->
        { _, _, _ ->
            // The `CHANNEL_CLOSED` token is not an element, so there is nothing to report for it.
            if (element !== CHANNEL_CLOSED) handler.callUndeliveredElement(element as E, select.context)
        }
    }
}
1568
1569 // ######################
1570 // ## Iterator Support ##
1571 // ######################
1572
/** Creates a new [BufferedChannelIterator] for this channel. */
override fun iterator(): ChannelIterator<E> {
    return BufferedChannelIterator()
}
1574
/**
 * The key idea is that an iterator is a special receiver type,
 * which should be resumed differently to [receive] and [onReceive]
 * operations, but can be served as a waiter in a way similar to
 * [CancellableContinuation] and [SelectInstance].
 *
 * Roughly, [hasNext] is a [receive] sibling, while [next] simply returns
 * the element already retrieved by [hasNext]. Calling [hasNext] again while
 * an element is pending does not retrieve a new one.
 * From the implementation side, [receiveResult] stores the element retrieved by [hasNext]
 * (or a special [CHANNEL_CLOSED] token if the channel is closed).
 *
 * As a [Waiter], the iterator implements [invokeOnCancellation], which forwards
 * the segment and the index that specify the location of the stored iterator
 * to the continuation of the suspended [hasNext] call.
 *
 * To resume the suspended [hasNext] call, a special [tryResumeHasNext]
 * function should be used in a way similar to [CancellableContinuation.tryResume]
 * and [SelectInstance.trySelect]. When the channel becomes closed,
 * [tryResumeHasNextOnClosedChannel] should be used instead.
 */
private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
    /**
     * Stores the element retrieved by [hasNext] or
     * a special [CHANNEL_CLOSED] token if this channel is closed.
     * If [hasNext] has not been invoked yet, [NO_RECEIVE_RESULT] is stored.
     * The field is reset to [NO_RECEIVE_RESULT] by [next].
     */
    private var receiveResult: Any? = NO_RECEIVE_RESULT

    /**
     * When [hasNext] suspends, this field stores the corresponding
     * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
     * functions resume this continuation when the [hasNext] invocation should complete.
     *
     * This property is subject to a benign data race:
     * it is nulled out on both the completion and cancellation paths,
     * which could happen concurrently.
     */
    @BenignDataRace
    private var continuation: CancellableContinuationImpl<Boolean>? = null

    /**
     * `hasNext()` is just a special receive operation.
     *
     * Returns `true` immediately if an element retrieved by a previous call is still pending.
     * Otherwise, performs a receive: returns `true` once an element is stored in [receiveResult],
     * returns `false` if the channel is closed without a cause, and throws the closing cause otherwise.
     */
    override suspend fun hasNext(): Boolean {
        return if (this.receiveResult !== NO_RECEIVE_RESULT && this.receiveResult !== CHANNEL_CLOSED) {
            // An element has already been retrieved and not yet consumed by `next()`.
            true
        } else receiveImpl( // <-- this is an inline function
            // Do not create a continuation until it is required;
            // it is created later via [onNoWaiterSuspend], if needed.
            waiter = null,
            // Store the received element in `receiveResult` on successful
            // retrieval from the buffer or rendezvous with a suspended sender.
            onElementRetrieved = { element ->
                this.receiveResult = element
                true
            },
            // As no waiter is provided, suspension is impossible.
            onSuspend = { _, _, _ -> error("unreachable") },
            // Return `false` or throw an exception if the channel is already closed.
            onClosed = { onClosedHasNext() },
            // If `hasNext()` decides to suspend, the corresponding
            // `suspend` function that creates a continuation is called.
            // The tail-call optimization is applied here.
            onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
        )
    }

    /**
     * Handles a closed channel on the non-suspending path of [hasNext]:
     * stores [CHANNEL_CLOSED] in [receiveResult], then returns `false` if the channel
     * was closed without a cause, or throws the cause (with a recovered stack trace) otherwise.
     */
    private fun onClosedHasNext(): Boolean {
        this.receiveResult = CHANNEL_CLOSED
        val cause = closeCause ?: return false
        throw recoverStackTrace(cause)
    }

    /**
     * The suspending path of [hasNext]: creates a continuation, stores it in [continuation],
     * and continues the receive operation with this iterator as the waiter.
     */
    private suspend fun hasNextOnNoWaiterSuspend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long
    ): Boolean = suspendCancellableCoroutineReusable { cont ->
        // Publish the continuation before this iterator can be stored as a waiter.
        this.continuation = cont
        receiveImplOnNoWaiter( // <-- this is an inline function
            segment = segment, index = index, r = r,
            waiter = this, // store this iterator as a waiter
            // In case of successful element retrieval, store
            // it in `receiveResult` and resume the continuation.
            // Importantly, the receiver coroutine may be cancelled
            // after it is successfully resumed but not dispatched yet.
            // In case `onUndeliveredElement` is present, we must
            // invoke it in the latter case.
            onElementRetrieved = { element ->
                this.receiveResult = element
                this.continuation = null
                cont.resume(true, onUndeliveredElement?.bindCancellationFun(element))
            },
            onClosed = { onClosedHasNextNoWaiterSuspend() }
        )
    }

    /**
     * Forwards the location of this waiter (the [segment] and the [index] in it)
     * to the stored continuation, if there is one.
     */
    override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
        this.continuation?.invokeOnCancellation(segment, index)
    }

    /**
     * Handles a closed channel on the suspending path of [hasNext], when the continuation
     * has already been created: resumes it with `false` or with the closing cause.
     */
    private fun onClosedHasNextNoWaiterSuspend() {
        // Read the current continuation and clean
        // the corresponding field to avoid memory leaks.
        val cont = this.continuation!!
        this.continuation = null
        // Update the `hasNext()` internal result.
        this.receiveResult = CHANNEL_CLOSED
        // If this channel was closed without exception,
        // `hasNext()` should return `false`; otherwise,
        // it throws the closing exception.
        val cause = closeCause
        if (cause == null) {
            cont.resume(false)
        } else {
            cont.resumeWithException(recoverStackTrace(cause, cont))
        }
    }

    /**
     * Returns the element retrieved by the preceding [hasNext] call and resets the stored result.
     * Throws [IllegalStateException] if [hasNext] has not been invoked, and the receive exception
     * (see [receiveException]) if [hasNext] observed the channel as closed.
     */
    @Suppress("UNCHECKED_CAST")
    override fun next(): E {
        // Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
        val result = receiveResult
        check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
        // Reset the result, so the next element requires a new `hasNext()` call.
        receiveResult = NO_RECEIVE_RESULT
        // Is this channel closed?
        if (result === CHANNEL_CLOSED) throw recoverStackTrace(receiveException)
        // Return the element.
        return result as E
    }

    /**
     * Tries to resume the suspended [hasNext] call with the specified [element],
     * returning the result of the resumption attempt.
     */
    fun tryResumeHasNext(element: E): Boolean {
        // Read the current continuation and clean
        // the corresponding field to avoid memory leaks.
        val cont = this.continuation!!
        this.continuation = null
        // Store the retrieved element in `receiveResult`.
        this.receiveResult = element
        // Try to resume this `hasNext()`. Importantly, the receiver coroutine
        // may be cancelled after it is successfully resumed but not dispatched yet.
        // In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
        return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element))
    }

    /**
     * Resumes the suspended [hasNext] call because the channel is closed:
     * with `false` if there is no closing cause, or with the cause as an exception otherwise.
     */
    fun tryResumeHasNextOnClosedChannel() {
        /*
         * Read the current continuation of the suspended `hasNext()` call and clean the corresponding field.
         * While this nulling out is not required for correctness, it eliminates memory leaks
         * (through the continuation) if the channel iterator accidentally remains GC-reachable
         * after the channel is closed.
         */
        val cont = this.continuation!!
        this.continuation = null
        // Update the `hasNext()` internal result.
        this.receiveResult = CHANNEL_CLOSED
        // If this channel was closed without exception,
        // `hasNext()` should return `false`; otherwise,
        // it throws the closing exception.
        val cause = closeCause
        if (cause == null) {
            cont.resume(false)
        } else {
            cont.resumeWithException(recoverStackTrace(cause, cont))
        }
    }
}
1745
1746 // ##############################
1747 // ## Closing and Cancellation ##
1748 // ##############################
1749
/**
 * Stores the cause of closing this channel, either via a [close] or a [cancel] call.
 * The initial value is the [NO_CLOSE_CAUSE] marker; the cause is installed with a single
 * compare-and-set from that marker, so it can be set only once.
 */
private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
// The closing cause, or `null` if this channel was closed without one.
// Should be read only if this channel is closed or cancelled: the stored
// value is cast to `Throwable?` unconditionally.
protected val closeCause: Throwable?
    get() = _closeCause.value as Throwable?
1757
/**
 * The exception to fail `send` operations with: the closing cause if it is non-null,
 * or a new [ClosedSendChannelException] otherwise.
 */
protected val sendException: Throwable
    get() {
        val cause = closeCause
        return cause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
    }
1760
/**
 * The exception to fail `receive` operations with: the closing cause if it is non-null,
 * or a new [ClosedReceiveChannelException] otherwise.
 */
private val receiveException: Throwable
    get() {
        val cause = closeCause
        return cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
    }
1763
/**
 * Stores the close handler installed by [invokeOnClose].
 * To synchronize [invokeOnClose] and [close], two additional
 * marker states, [CLOSE_HANDLER_INVOKED] and [CLOSE_HANDLER_CLOSED],
 * are used. The resulting state diagram is presented below.
 *
 * ```
 *  +------+ install handler +---------+   close(..)   +---------+
 *  | null |---------------->| handler |-------------->| INVOKED |
 *  +------+                 +---------+               +---------+
 *     |                                                    ^
 *     | close(..)   +--------+     install handler         |
 *     +------------>| CLOSED |-----------------------------+
 *                   +--------+
 * ```
 *
 * The `CLOSED -> INVOKED` transition is performed by [invokeOnClose] when the handler
 * is installed after the channel has been closed; the handler is invoked right away.
 */
private val closeHandler = atomic<Any?>(null)
1779
/**
 * Invoked at the end of every [closeOrCancelImpl] invocation, before the close handler
 * is called, regardless of whether that invocation is the one that closed the channel.
 * Therefore, this method should be idempotent, as it can be called multiple times.
 * The default implementation does nothing.
 */
protected open fun onClosedIdempotent() {}
1785
/**
 * Closes this channel with the specified [cause]. Returns `true` if this
 * invocation has installed the cause, and `false` otherwise.
 */
override fun close(cause: Throwable?): Boolean {
    return closeOrCancelImpl(cause, cancel = false)
}
1788
// All the `cancel` overloads delegate to `cancelImpl`.

@Suppress("OVERRIDE_DEPRECATION")
final override fun cancel(cause: Throwable?): Boolean {
    return cancelImpl(cause)
}

@Suppress("OVERRIDE_DEPRECATION")
final override fun cancel() {
    cancelImpl(null)
}

final override fun cancel(cause: CancellationException?) {
    cancelImpl(cause)
}
1796
/**
 * Cancels this channel with the specified [cause], substituting a new
 * [CancellationException] when the cause is `null`. Returns `true` if this
 * invocation has installed the cause, and `false` otherwise.
 */
internal open fun cancelImpl(cause: Throwable?): Boolean {
    val effectiveCause = cause ?: CancellationException("Channel was cancelled")
    return closeOrCancelImpl(effectiveCause, cancel = true)
}
1799
/**
 * The common implementation of [close] and [cancel]. It first tries to install
 * the specified cause; the invocation that successfully installs the cause returns
 * `true` as the result of this function, while all other [close] and [cancel]
 * calls return `false`.
 *
 * After the attempt to install the cause, the channel is marked as closed or
 * cancelled, which causes further `send(e)`-s to fail.
 *
 * Then, [completeCloseOrCancel] is called, which cancels waiting `receive()`
 * requests ([cancelSuspendedReceiveRequests]) and, in case this channel is cancelled,
 * removes unprocessed elements ([removeUnprocessedElements]).
 *
 * Finally, [onClosedIdempotent] is invoked, followed by the close handler
 * ([invokeCloseHandler]) if this invocation is the one that installed the cause.
 */
protected open fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
    // For `cancel(..)`, first set the bit indicating that the cancellation has
    // been started. This is crucial for linearizability when concurrent `close(..)`
    // and `isClosedFor[Send,Receive]` operations help this `cancel(..)`.
    if (cancel) markCancellationStarted()
    // Only one invocation can replace the `NO_CLOSE_CAUSE` marker with the cause.
    val installedByThisCall = _closeCause.compareAndSet(NO_CLOSE_CAUSE, cause)
    // Update the close status according to the operation type.
    when {
        cancel -> markCancelled()
        else -> markClosed()
    }
    // Finish the closing or cancellation procedure.
    completeCloseOrCancel()
    // Notify the extension point on every call, but invoke
    // the close handler only from the call that installed the cause.
    onClosedIdempotent()
    if (installedByThisCall) invokeCloseHandler()
    return installedByThisCall
}
1836
/**
 * Invokes the installed close handler, if any,
 * updating the [closeHandler] state correspondingly.
 */
private fun invokeCloseHandler() {
    // With no handler installed, move to `CLOSED` to inform a concurrent
    // `invokeOnClose` that this channel is already closed. Otherwise, replace
    // the handler with the `INVOKED` marker to avoid memory leaks.
    val previous = closeHandler.getAndUpdate { cur ->
        if (cur === null) CLOSE_HANDLER_CLOSED else CLOSE_HANDLER_INVOKED
    }
    // No handler was installed, finish.
    if (previous == null) return
    // Invoke the handler with the closing cause.
    @Suppress("UNCHECKED_CAST")
    val handler = previous as (cause: Throwable?) -> Unit
    handler(closeCause)
}
1858
/**
 * Installs the specified close [handler]. If this channel is already closed and no
 * handler has been invoked, the [handler] is invoked immediately with the closing cause.
 * Throws [IllegalStateException] if another handler is already registered or was invoked.
 */
override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
    // Fast path: no handler is installed and the channel is not closed yet.
    if (closeHandler.compareAndSet(null, handler)) return
    // Either another handler is already set, or this channel is closed.
    // In the latter case, this handler should be invoked. However, at most one
    // handler may ever be called, so the operation fails if another one
    // is registered or has already been invoked.
    closeHandler.loop { cur ->
        if (cur === CLOSE_HANDLER_INVOKED) error("Another handler was already registered and successfully invoked")
        if (cur !== CLOSE_HANDLER_CLOSED) error("Another handler is already registered: $cur")
        // The channel is closed and no handler has been called. Move the state from
        // `CLOSED` to `INVOKED`; only the invocation that succeeds calls its handler.
        // On failure, the loop re-reads the state and decides again.
        if (closeHandler.compareAndSet(CLOSE_HANDLER_CLOSED, CLOSE_HANDLER_INVOKED)) {
            handler(closeCause)
            return
        }
    }
}
1886
/**
 * Marks this channel as closed.
 * In case [cancelImpl] has already been invoked, and this channel is marked with
 * [CLOSE_STATUS_CANCELLATION_STARTED], this function marks the channel as cancelled.
 * If the channel is already marked as closed or cancelled, nothing is changed.
 *
 * All operations that notice this channel in the closed state
 * must help to complete the closing via [completeCloseOrCancel].
 */
private fun markClosed() {
    sendersAndCloseStatus.update { cur ->
        val newStatus = when (cur.sendersCloseStatus) {
            // The channel is neither closed nor cancelled.
            CLOSE_STATUS_ACTIVE -> CLOSE_STATUS_CLOSED
            // The channel is going to be cancelled.
            CLOSE_STATUS_CANCELLATION_STARTED -> CLOSE_STATUS_CANCELLED
            // The channel is already marked as closed or cancelled.
            else -> return
        }
        constructSendersAndCloseStatus(cur.sendersCounter, newStatus)
    }
}
1906
/**
 * Marks this channel as cancelled, keeping the senders counter intact.
 *
 * All operations that notice this channel in the cancelled state
 * must help to complete the cancellation via [completeCloseOrCancel].
 */
private fun markCancelled() {
    sendersAndCloseStatus.update { current ->
        constructSendersAndCloseStatus(current.sendersCounter, CLOSE_STATUS_CANCELLED)
    }
}
1917
/**
 * When the cancellation procedure starts, it is critical to mark the closing
 * status correspondingly. Thus, other operations, which may help to complete
 * the cancellation, always correctly update the status to `CANCELLED`.
 *
 * The status is changed only if the channel is still active.
 */
private fun markCancellationStarted() {
    sendersAndCloseStatus.update { current ->
        // This channel is already closed or cancelled (or the cancellation has started).
        if (current.sendersCloseStatus != CLOSE_STATUS_ACTIVE) return
        constructSendersAndCloseStatus(current.sendersCounter, CLOSE_STATUS_CANCELLATION_STARTED)
    }
}
1930
/**
 * Completes the started [close] or [cancel] procedure.
 *
 * The result of [isClosedForSend] is intentionally ignored: the property is read only
 * for its side effect. Its getter goes through [isClosed], which calls [completeClose]
 * or [completeCancel] when it observes the closed or cancelled status.
 */
private fun completeCloseOrCancel() {
    isClosedForSend // must finish the started close/cancel if one is detected.
}
1937
/**
 * Whether [completeClose] should additionally mark all empty cells as closed.
 * It is `false` by default; judging by the name, it is meant to be overridden by the
 * conflated channel implementation with the `DROP_OLDEST` strategy.
 */
protected open val isConflatedDropOldest: Boolean
    get() = false
1939
/**
 * Completes the channel closing procedure and returns the last segment
 * of the underlying linked list; the latter is needed in [completeCancel].
 */
private fun completeClose(sendersCur: Long): ChannelSegment<E> =
    // Close the linked list for further segment addition,
    // obtaining the last segment in the data structure.
    closeLinkedList().also { tail ->
        // In the conflated channel implementation (with the DROP_OLDEST
        // conflation strategy), it is critical to mark all empty cells as
        // closed, so that in-progress `send(e)`-s that have not put their
        // elements yet cannot complete after this channel is closed.
        // Otherwise, a `send(e)` could put an element when the buffer is
        // already full, while a concurrent receiver extracts the oldest
        // element. When the channel is not closed, this `receive()` can be
        // linearized before the `send(e)`, but after the channel is closed,
        // `send(e)` must fail. Marking all unprocessed cells as `CLOSED`
        // solves the issue.
        if (isConflatedDropOldest) {
            val lastBuffered = markAllEmptyCellsAsClosed(tail)
            if (lastBuffered != -1L) dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBuffered)
        }
        // Resume waiting `receive()` requests,
        // informing them that the channel is closed.
        cancelSuspendedReceiveRequests(tail, sendersCur)
    }
1968
/**
 * Completes the channel cancellation procedure: first ensures that this channel
 * is closed via [completeClose], and then, starting from the last segment returned by it,
 * cancels suspended `send(e)` requests and removes buffered elements in reverse order.
 */
private fun completeCancel(sendersCur: Long) {
    removeUnprocessedElements(completeClose(sendersCur))
}
1980
/**
 * Closes the underlying linked list of segments for further segment addition,
 * returning the result of closing the right-most known segment.
 */
private fun closeLinkedList(): ChannelSegment<E> {
    // Among the three segment references, pick the one with the greatest id.
    var tail = bufferEndSegment.value
    val sendSegm = sendSegment.value
    if (sendSegm.id > tail.id) tail = sendSegm
    val rcvSegm = receiveSegment.value
    if (rcvSegm.id > tail.id) tail = rcvSegm
    // Close the linked list for new segment addition
    // and return the last segment in it.
    return tail.close()
}
1993
/**
 * This function marks all empty cells, in the `null` and [IN_BUFFER] state,
 * as closed. Notably, it processes the cells from right to left, starting from
 * the last cell of [lastSegment], and finishes immediately when the processing
 * cell is already covered by `receive()` or contains a buffered element
 * ([BUFFERED] state).
 *
 * This function returns the global index of the last buffered element,
 * or `-1` if no buffered element has been found before reaching a cell covered
 * by `receive()` or the beginning of the linked list.
 */
private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
    // Process the cells in reverse order, from right to left.
    var segment = lastSegment
    while (true) {
        for (index in SEGMENT_SIZE - 1 downTo 0) {
            // Is this cell already covered by `receive()`?
            val globalIndex = segment.id * SEGMENT_SIZE + index
            if (globalIndex < receiversCounter) return -1
            // Process the cell `segment[index]`.
            // The loop repeats only when the CAS below fails.
            cell_update@ while (true) {
                val state = segment.getState(index)
                when {
                    // The cell is empty.
                    state === null || state === IN_BUFFER -> {
                        // Inform a possibly upcoming sender that this channel is already closed.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            segment.onSlotCleaned()
                            break@cell_update
                        }
                    }
                    // The cell stores a buffered element.
                    state === BUFFERED -> return globalIndex
                    // Skip this cell if it is not empty and does not store a buffered element.
                    else -> break@cell_update
                }
            }
        }
        // Move to the previous segment, finishing if the linked list ends.
        segment = segment.prev ?: return -1
    }
}
2034
/**
 * Cancels suspended `send(e)` requests and removes buffered elements
 * starting from the last cell in the specified [lastSegment] (it must
 * be the physical tail of the underlying linked list) and updating
 * the cells in reverse order.
 *
 * The traversal stops at the first cell that is already processed or covered
 * by a receiver. If `onUndeliveredElement` is specified, it is called for every removed
 * element; the resulting [UndeliveredElementException], if any, is thrown at the very end,
 * after all the collected senders have been resumed.
 */
private fun removeUnprocessedElements(lastSegment: ChannelSegment<E>) {
    // Read the `onUndeliveredElement` lambda at once. In case it
    // throws an exception, this exception is handled and stored in
    // the variable below. If multiple exceptions are thrown, the first
    // one is stored in the variable, while the others are suppressed.
    val onUndeliveredElement = onUndeliveredElement
    var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
    // To perform synchronization correctly, it is critical to
    // process the cells in reverse order, from right to left.
    // However, according to the API, suspended senders should
    // be cancelled in the order of their suspension. Therefore,
    // we need to collect all of them and cancel in the reverse
    // order after that.
    var suspendedSenders = InlineList<Waiter>()
    var segment = lastSegment
    process_segments@ while (true) {
        for (index in SEGMENT_SIZE - 1 downTo 0) {
            // Process the cell `segment[index]`.
            val globalIndex = segment.id * SEGMENT_SIZE + index
            // Update the cell state. The loop repeats only when a CAS below fails.
            update_cell@ while (true) {
                // Read the current state of the cell.
                val state = segment.getState(index)
                when {
                    // The cell is already processed by a receiver.
                    state === DONE_RCV -> break@process_segments
                    // The cell stores a buffered element.
                    state === BUFFERED -> {
                        // Is the cell already covered by a receiver?
                        if (globalIndex < receiversCounter) break@process_segments
                        // Update the cell state to `CHANNEL_CLOSED`.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            // If `onUndeliveredElement` lambda is non-null, call it.
                            if (onUndeliveredElement != null) {
                                val element = segment.getElement(index)
                                undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
                            }
                            // Clean the element field and inform the segment
                            // that the slot is cleaned to avoid memory leaks.
                            segment.cleanElement(index)
                            segment.onSlotCleaned()
                            break@update_cell
                        }
                    }
                    // The cell is empty.
                    state === IN_BUFFER || state === null -> {
                        // Update the cell state to `CHANNEL_CLOSED`.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            // Inform the segment that the slot is cleaned to avoid memory leaks.
                            segment.onSlotCleaned()
                            break@update_cell
                        }
                    }
                    // The cell stores a suspended waiter.
                    state is Waiter || state is WaiterEB -> {
                        // Is the cell already covered by a receiver?
                        if (globalIndex < receiversCounter) break@process_segments
                        // Obtain the sender.
                        val sender: Waiter = if (state is WaiterEB) state.waiter
                        else state as Waiter
                        // Update the cell state to `CHANNEL_CLOSED`.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            // If `onUndeliveredElement` lambda is non-null, call it.
                            if (onUndeliveredElement != null) {
                                val element = segment.getElement(index)
                                undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
                            }
                            // Save the sender for further cancellation.
                            suspendedSenders += sender
                            // Clean the element field and inform the segment
                            // that the slot is cleaned to avoid memory leaks.
                            segment.cleanElement(index)
                            segment.onSlotCleaned()
                            break@update_cell
                        }
                    }
                    // A concurrent operation is resuming a suspended sender; finish immediately.
                    // Note that this branch matches both the `RESUMING_BY_RCV` and
                    // the `RESUMING_BY_EB` states.
                    state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
                    // NOTE(review): this branch is unreachable, as `RESUMING_BY_EB` is already
                    // matched by the branch above and `when` selects the first matching branch.
                    // It looks like it was meant to spin-wait while a concurrent `expandBuffer()`
                    // is resuming a suspended sender -- confirm which behavior is intended.
                    state === RESUMING_BY_EB -> continue@update_cell
                    // Any other state: nothing to remove in this cell, move to the next one.
                    else -> break@update_cell
                }
            }
        }
        // Process the previous segment.
        segment = segment.prev ?: break
    }
    // Cancel suspended senders in their order of addition to this channel.
    suspendedSenders.forEachReversed { it.resumeSenderOnCancelledChannel() }
    // Throw `UndeliveredElementException` at the end if there was one.
    undeliveredElementException?.let { throw it }
}
2135
/**
 * Cancels suspended `receive` requests from the end to the beginning,
 * also moving empty cells to the `CHANNEL_CLOSED` state.
 *
 * The traversal starts from the last cell of [lastSegment] and stops at the first cell
 * whose global index is less than [sendersCounter], i.e., the cell is covered by a sender.
 */
private fun cancelSuspendedReceiveRequests(lastSegment: ChannelSegment<E>, sendersCounter: Long) {
    // To perform synchronization correctly, it is critical to
    // extract suspended requests in the reverse order,
    // from the end to the beginning.
    // However, according to the API, they should be cancelled
    // in the order of their suspension. Therefore, we need to
    // collect the suspended requests first, cancelling them
    // in the reverse order after that.
    var suspendedReceivers = InlineList<Waiter>()
    var segment: ChannelSegment<E>? = lastSegment
    process_segments@ while (segment != null) {
        for (index in SEGMENT_SIZE - 1 downTo 0) {
            // Is the cell already covered by a sender? Finish immediately in this case.
            if (segment.id * SEGMENT_SIZE + index < sendersCounter) break@process_segments
            // Try to move the cell state to `CHANNEL_CLOSED`.
            // The loop repeats only when a CAS below fails.
            cell_update@ while (true) {
                val state = segment.getState(index)
                when {
                    // The cell is empty.
                    state === null || state === IN_BUFFER -> {
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            segment.onSlotCleaned()
                            break@cell_update
                        }
                    }
                    // The cell stores a waiter wrapped into `WaiterEB`; unwrap it.
                    state is WaiterEB -> {
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            suspendedReceivers += state.waiter // save for cancellation.
                            segment.onCancelledRequest(index = index, receiver = true)
                            break@cell_update
                        }
                    }
                    // The cell stores a suspended waiter.
                    state is Waiter -> {
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            suspendedReceivers += state // save for cancellation.
                            segment.onCancelledRequest(index = index, receiver = true)
                            break@cell_update
                        }
                    }
                    else -> break@cell_update // nothing to cancel.
                }
            }
        }
        // Process the previous segment.
        segment = segment.prev
    }
    // Cancel the suspended requests in their order of addition to this channel.
    suspendedReceivers.forEachReversed { it.resumeReceiverOnClosedChannel() }
}
2188
/**
 * Resumes this receiver because this channel is closed.
 * This function does not take any effect if the operation has already been resumed or cancelled.
 */
private fun Waiter.resumeReceiverOnClosedChannel() {
    resumeWaiterOnClosedChannel(receiver = true)
}
2194
/**
 * Resumes this sender because this channel is cancelled.
 * This function does not take any effect if the operation has already been resumed or cancelled.
 */
private fun Waiter.resumeSenderOnCancelledChannel() {
    resumeWaiterOnClosedChannel(receiver = false)
}
2200
/**
 * Resumes this waiter because this channel is closed or cancelled, dispatching on
 * the waiter type. The [receiver] flag selects between [receiveException] and
 * [sendException] for waiters that are resumed with an exception.
 * Fails with [IllegalStateException] on an unknown waiter type.
 */
private fun Waiter.resumeWaiterOnClosedChannel(receiver: Boolean) {
    val waiter = this
    when (waiter) {
        is SendBroadcast -> waiter.cont.resume(false)
        is CancellableContinuation<*> -> {
            // Create the exception only for the waiters that are actually resumed with it.
            val exception = if (receiver) receiveException else sendException
            waiter.resumeWithException(exception)
        }
        is ReceiveCatching<*> -> waiter.cont.resume(closed(closeCause))
        is BufferedChannel<*>.BufferedChannelIterator -> waiter.tryResumeHasNextOnClosedChannel()
        is SelectInstance<*> -> waiter.trySelect(this@BufferedChannel, CHANNEL_CLOSED)
        else -> error("Unexpected waiter: $this")
    }
}
2211
/**
 * Whether this channel is closed for `send`, decided by the current close status.
 * Reading this property also helps to complete a started close or cancellation.
 */
@ExperimentalCoroutinesApi
override val isClosedForSend: Boolean
    get() {
        val current = sendersAndCloseStatus.value
        return current.isClosedForSend0
    }
2215
// Interprets this `sendersAndCloseStatus` value from the senders' point of view.
private val Long.isClosedForSend0: Boolean
    get() = isClosed(sendersAndCloseStatusCur = this, isClosedForReceive = false)
2218
/**
 * Whether this channel is closed for `receive`, decided by the current close status:
 * a closed (but not cancelled) channel counts only when it has no elements left to retrieve.
 * Reading this property also helps to complete a started close or cancellation.
 */
@ExperimentalCoroutinesApi
override val isClosedForReceive: Boolean
    get() {
        val current = sendersAndCloseStatus.value
        return current.isClosedForReceive0
    }
2222
// Interprets this `sendersAndCloseStatus` value from the receivers' point of view.
private val Long.isClosedForReceive0: Boolean
    get() = isClosed(sendersAndCloseStatusCur = this, isClosedForReceive = true)
2225
/**
 * Decides whether the channel is closed, based on the close status stored in
 * [sendersAndCloseStatusCur], helping to complete a detected close or cancellation.
 *
 * @param sendersAndCloseStatusCur the value of `sendersAndCloseStatus` to interpret.
 * @param isClosedForReceive `true` to answer for receivers, `false` to answer for senders.
 */
private fun isClosed(
    sendersAndCloseStatusCur: Long,
    isClosedForReceive: Boolean
): Boolean {
    val status = sendersAndCloseStatusCur.sendersCloseStatus
    return when (status) {
        // The channel is either active or its cancellation has been started
        // but not linearized yet; in both cases it is considered as active.
        CLOSE_STATUS_ACTIVE, CLOSE_STATUS_CANCELLATION_STARTED -> false
        // This channel has been successfully closed. Help to complete the closing
        // procedure to guarantee linearizability. Senders always observe the channel
        // as closed, while for receivers it is closed only when there are no
        // elements left to retrieve.
        CLOSE_STATUS_CLOSED -> {
            completeClose(sendersAndCloseStatusCur.sendersCounter)
            !isClosedForReceive || !hasElements()
        }
        // This channel has been successfully cancelled. Help to complete
        // the cancellation procedure to guarantee linearizability.
        CLOSE_STATUS_CANCELLED -> {
            completeCancel(sendersAndCloseStatusCur.sendersCounter)
            true
        }
        else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
    }
}
2257
/**
 * Whether this channel has no elements to retrieve while not being closed for `receive`.
 * A channel that is closed for `receive` is never reported as empty.
 */
@ExperimentalCoroutinesApi
override val isEmpty: Boolean
    get() = when {
        // A channel closed for `receive` is not considered empty.
        isClosedForReceive -> false
        // Does this channel have elements to retrieve?
        hasElements() -> false
        // No elements have been found; check once more
        // that the channel is still not closed for `receive`.
        else -> !isClosedForReceive
    }
2269
/**
 * Checks whether this channel contains elements to retrieve.
 * Unfortunately, simply comparing the counters is insufficient,
 * as some cells can be in the `INTERRUPTED` state due to cancellation.
 * This function tries to find the first "alive" element,
 * updating the `receivers` counter to skip empty cells.
 *
 * The implementation is similar to `receive()`.
 *
 * @return `true` if a non-empty cell covered by a sender has been found, `false` otherwise.
 */
internal fun hasElements(): Boolean {
    while (true) {
        // Read the segment before obtaining the `receivers` counter value.
        var segment = receiveSegment.value
        // Obtain the `receivers` and `senders` counter values.
        val r = receiversCounter
        val s = sendersCounter
        // Is there a chance that this channel has elements?
        if (s <= r) return false // no elements
        // The `r`-th cell is covered by a sender; check whether it contains an element.
        // First, try to find the required segment if the initially
        // obtained segment (in the beginning of this function) has a different id.
        val id = r / SEGMENT_SIZE
        if (segment.id != id) {
            // Try to find the required segment.
            // If it has not been found, either it has already been removed,
            // or the underlying linked list is already closed for segment
            // additions. In the latter case, the channel is closed and does
            // not contain elements, so this operation returns `false`.
            // Otherwise, if the required segment is removed, the operation restarts.
            segment = findSegmentReceive(id, segment) ?:
                if (receiveSegment.value.id < id) return false else continue
        }
        segment.cleanPrev() // all the previous segments are no longer needed.
        // Does the `r`-th cell contain a waiting sender or a buffered element?
        val i = (r % SEGMENT_SIZE).toInt()
        if (isCellNonEmpty(segment, i, r)) return true
        // The cell is empty. Update the `receivers` counter and try again.
        receivers.compareAndSet(r, r + 1) // if this CAS fails, the counter has already been updated.
    }
}
2310
/**
 * Returns `true` if the cell [index] of [segment] holds a buffered element or
 * a waiting sender, and `false` if there is nothing to retrieve from it
 * (cancelled waiter, poisoned cell, closed channel, or an element that has
 * already been taken).
 *
 * Must be called only for cells that are already covered by a sender.
 * [globalIndex] is the number of the cell in the conceptually infinite array.
 */
private fun isCellNonEmpty(
    segment: ChannelSegment<E>,
    index: Int,
    globalIndex: Long
): Boolean {
    // Follows the state machine of `updateCellReceive`, but never takes the
    // element; the only modification it makes is poisoning a still-empty cell.
    while (true) {
        val cellState = segment.getState(index)
        when {
            // The sender has not stored anything yet.
            cellState === null || cellState === IN_BUFFER -> {
                // Poison the cell; if the state changed concurrently, re-read it.
                if (!segment.casState(index, cellState, POISONED)) continue
                // A poisoned cell is equivalent to a cell with an already
                // cancelled receiver, so the buffer has to be expanded.
                expandBuffer()
                return false
            }
            // Either an element is buffered, or `expandBuffer()` is resuming
            // a suspended sender right now; in the latter case, this function
            // is allowed to linearize before the expansion completes.
            cellState === BUFFERED || cellState === RESUMING_BY_EB -> return true
            // Nothing to retrieve: the sender was interrupted, the channel is
            // closed, the cell was processed or poisoned by a concurrent
            // receiver, or a receiver is resuming the sender at the moment.
            cellState === INTERRUPTED_SEND ||
                cellState === CHANNEL_CLOSED ||
                cellState === DONE_RCV ||
                cellState === POISONED ||
                cellState === RESUMING_BY_RCV -> return false
            // A suspended request is stored. It is a sender only if no receiver
            // has covered this cell yet, i.e. the `receivers` counter still
            // points at it; otherwise the element is not available anymore.
            else -> return globalIndex == receiversCounter
        }
    }
}
2375
2376 // #######################
2377 // # Segments Management #
2378 // #######################
2379
/**
 * Looks up the segment with the given [id], walking the [ChannelSegment.next]
 * references from [startFrom] and creating missing segments via
 * [createSegmentFunction]. [sendSegment] is moved forward along the way by
 * `findSegmentAndMoveForward`.
 *
 * Returns `null` when the linked list is closed for new segments or when the
 * requested segment has been removed. In the latter case,
 * [updateSendersCounterIfLower] is invoked with the first cell of the segment
 * that was found instead, so that the removed cells can be skipped at once.
 */
private fun findSegmentSend(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
    val lookup = sendSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction())
    if (lookup.isClosed) {
        // No further segments can be added: the channel is closed or cancelled.
        // Help to finish that procedure first.
        completeCloseOrCancel()
        // Drop the `prev` link only if the `receivers` counter is already beyond
        // the first cell of `startFrom`; otherwise closing/cancellation may still
        // need the link to walk the list from right to left.
        if (startFrom.id * SEGMENT_SIZE < receiversCounter) startFrom.cleanPrev()
        return null
    }
    val found = lookup.segment
    if (found.id > id) {
        // The requested segment was removed; `found` is the first one with
        // an id that is not lower than the requested one.
        updateSendersCounterIfLower(found.id * SEGMENT_SIZE)
        // The same precaution regarding the `prev` link applies here.
        if (found.id * SEGMENT_SIZE < receiversCounter) found.cleanPrev()
        return null
    }
    assert { found.id == id }
    return found
}
2434
/**
 * Looks up the segment with the given [id], walking the [ChannelSegment.next]
 * references from [startFrom] and creating missing segments via
 * [createSegmentFunction]. [receiveSegment] is moved forward along the way by
 * `findSegmentAndMoveForward`.
 *
 * Returns `null` when the linked list is closed for new segments or when the
 * requested segment has been removed. In the latter case, the [receivers]
 * counter is raised to the first cell of the segment that was found instead,
 * skipping the removed cells at once.
 */
private fun findSegmentReceive(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
    val lookup = receiveSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction())
    if (lookup.isClosed) {
        // No further segments can be added: the channel is closed or cancelled.
        // Help to finish that procedure first.
        completeCloseOrCancel()
        // Drop the `prev` link only if the `senders` counter is already beyond
        // the first cell of `startFrom`; otherwise closing/cancellation may still
        // need the link to walk the list from right to left.
        if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
        return null
    }
    val found = lookup.segment
    // Keep `bufferEndSegment` from lagging behind when the requested segment
    // is not beyond the one referenced by the `bufferEnd` counter.
    if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) {
        bufferEndSegment.moveForward(found)
    }
    if (found.id > id) {
        // The requested segment was removed; `found` is the first one with
        // an id that is not lower than the requested one.
        updateReceiversCounterIfLower(found.id * SEGMENT_SIZE)
        // The same precaution regarding the `prev` link applies here.
        if (found.id * SEGMENT_SIZE < sendersCounter) found.cleanPrev()
        return null
    }
    assert { found.id == id }
    return found
}
2492
/**
 * Looks up the segment with the given [id] on behalf of `expandBuffer()`,
 * starting from [startFrom] and moving [bufferEndSegment] forward.
 *
 * Whenever the requested segment is not found (`null` is returned), the number
 * of completed `expandBuffer()` attempts is increased before returning.
 */
private fun findSegmentBufferEnd(id: Long, startFrom: ChannelSegment<E>, currentBufferEndCounter: Long): ChannelSegment<E>? {
    val lookup = bufferEndSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction())
    if (lookup.isClosed) {
        // No further segments can be added: the channel is closed or cancelled.
        // Help to finish that procedure first.
        completeCloseOrCancel()
        // Point `bufferEndSegment` at the requested or the last existing
        // segment to avoid memory leaks.
        moveSegmentBufferEndToSpecifiedOrLast(id, startFrom)
        // The segment was not found: account for the finished attempt.
        incCompletedExpandBufferAttempts()
        return null
    }
    val found = lookup.segment
    if (found.id > id) {
        // The requested segment was removed; `found` is the first one with
        // an id that is not lower than the requested one. Try to jump the
        // `bufferEnd` counter straight to its first cell.
        val firstCellOfFound = found.id * SEGMENT_SIZE
        if (bufferEnd.compareAndSet(currentBufferEndCounter + 1, firstCellOfFound)) {
            // The jump succeeded: account for all the skipped attempts.
            incCompletedExpandBufferAttempts(firstCellOfFound - currentBufferEndCounter)
        } else {
            // The counter was changed concurrently: account for this attempt only.
            incCompletedExpandBufferAttempts()
        }
        return null
    }
    assert { found.id == id }
    return found
}
2536
/**
 * Moves [bufferEndSegment] to the segment with the given [id] or, if that
 * segment does not exist yet, to the last segment of the list.
 *
 * In contrast to [findSegmentBufferEnd], no new segments are allocated here.
 */
private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment<E>) {
    // Walk right from `startFrom` until the requested id or the end of the list is reached.
    var candidate: ChannelSegment<E> = startFrom
    while (candidate.id < id) {
        val following = candidate.next ?: break
        candidate = following
    }
    // Move `bufferEndSegment` to the first non-removed segment starting from
    // the candidate. This is expected to succeed eventually, as the tail
    // segment is never removed.
    while (true) {
        while (candidate.isRemoved) {
            val following = candidate.next ?: break
            candidate = following
        }
        // A failed move means that the candidate has been removed in the
        // meantime, so the search goes on.
        if (bufferEndSegment.moveForward(candidate)) return
    }
}
2562
/**
 * Raises the `senders` counter to [value] if it is currently lower than
 * [value]; the close status stored in the same atomic word is preserved.
 *
 * Senders use this function to efficiently skip
 * a sequence of cancelled receivers.
 */
private fun updateSendersCounterIfLower(value: Long): Unit =
    sendersAndCloseStatus.loop { cur ->
        val curCounter = cur.sendersCounter
        // Nothing to do if the counter has already reached the requested value.
        if (curCounter >= value) return
        // Build the new word from the *requested* counter value and the current
        // close status. (Using `curCounter` here would make the CAS a no-op and
        // leave the counter unchanged.)
        val update = constructSendersAndCloseStatus(value, cur.sendersCloseStatus)
        // On CAS failure the word was changed concurrently; re-read and retry.
        if (sendersAndCloseStatus.compareAndSet(cur, update)) return
    }
2577
/**
 * Raises the `receivers` counter to [value] if it is currently lower than [value].
 *
 * Receivers use this function to efficiently skip
 * a sequence of cancelled senders.
 */
private fun updateReceiversCounterIfLower(value: Long) {
    while (true) {
        val observed = receivers.value
        // Finish if the counter is already high enough or the CAS succeeds;
        // otherwise, the counter was changed concurrently, so retry.
        if (observed >= value || receivers.compareAndSet(observed, value)) return
    }
}
2590
2591 // ###################
2592 // # Debug Functions #
2593 // ###################
2594
// Builds a compact description of this channel: the close status (if closed or
// cancelled), the capacity, and the waiters/elements stored in the cells that
// are covered by the `senders` or `receivers` counter.
@Suppress("ConvertTwoComparisonsToRangeCheck")
override fun toString(): String {
    val out = StringBuilder()
    // Close status; nothing is printed for other statuses.
    when (sendersAndCloseStatus.value.sendersCloseStatus) {
        CLOSE_STATUS_CLOSED -> out.append("closed,")
        CLOSE_STATUS_CANCELLED -> out.append("cancelled,")
    }
    // Buffer capacity, followed by the opening of the data section.
    out.append("capacity=$capacity,")
    out.append("data=[")
    // Start from the leftmost segment referenced by the channel.
    val leftmostSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
        .filter { it !== NULL_SEGMENT }
        .minBy { it.id }
    val receiversCur = receiversCounter
    val sendersCur = sendersCounter
    var curSegment = leftmostSegment
    segments@ while (true) {
        cells@ for (slot in 0 until SEGMENT_SIZE) {
            val cellNumber = curSegment.id * SEGMENT_SIZE + slot
            // Stop at the first cell covered by neither counter.
            if (cellNumber >= sendersCur && cellNumber >= receiversCur) break@segments
            val state = curSegment.getState(slot)
            val storedElement = curSegment.getElement(slot)
            // Which kind of operation covers the cell according to the counters.
            val coveredByReceiverOnly = cellNumber < receiversCur && cellNumber >= sendersCur
            val coveredBySenderOnly = cellNumber < sendersCur && cellNumber >= receiversCur
            val label = when (state) {
                is CancellableContinuation<*> -> when {
                    coveredByReceiverOnly -> "receive"
                    coveredBySenderOnly -> "send"
                    else -> "cont"
                }
                is SelectInstance<*> -> when {
                    coveredByReceiverOnly -> "onReceive"
                    coveredBySenderOnly -> "onSend"
                    else -> "select"
                }
                is ReceiveCatching<*> -> "receiveCatching"
                is SendBroadcast -> "sendBroadcast"
                is WaiterEB -> "EB($state)"
                RESUMING_BY_RCV, RESUMING_BY_EB -> "resuming_sender"
                // Cells in these states are not printed.
                null, IN_BUFFER, DONE_RCV, POISONED, INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> continue@cells
                // Fallback for any state that is not listed above.
                else -> state.toString()
            }
            out.append(if (storedElement != null) "($label,$storedElement)," else "$label,")
        }
        // Proceed with the next segment, if there is one.
        curSegment = curSegment.next ?: break
    }
    // Drop the trailing comma and close the data section.
    if (out.last() == ',') out.deleteAt(out.length - 1)
    out.append("]")
    return out.toString()
}
2655
// Returns a detailed debug representation of this channel (counters, close
// status, segment references, and the full content of every segment),
// which is actively used in Lincheck tests.
internal fun toStringDebug(): String {
    val out = StringBuilder()
    // Counters and the raw close status.
    out.append("S=${sendersCounter},R=${receiversCounter},B=${bufferEndCounter},B'=${completedExpandBuffersAndPauseFlag.value},C=${sendersAndCloseStatus.value.sendersCloseStatus},")
    // Symbolic close status; nothing is printed for other statuses.
    val closeStatusLabel = when (sendersAndCloseStatus.value.sendersCloseStatus) {
        CLOSE_STATUS_CANCELLATION_STARTED -> "CANCELLATION_STARTED,"
        CLOSE_STATUS_CLOSED -> "CLOSED,"
        CLOSE_STATUS_CANCELLED -> "CANCELLED,"
        else -> null
    }
    if (closeStatusLabel != null) out.append(closeStatusLabel)
    // Segment references.
    out.append("SEND_SEGM=${sendSegment.value.hexAddress},RCV_SEGM=${receiveSegment.value.hexAddress}")
    if (!isRendezvousOrUnlimited) out.append(",EB_SEGM=${bufferEndSegment.value.hexAddress}")
    out.append(" ") // add some space
    // Dump the linked list of segments, starting from the leftmost referenced one.
    val leftmostSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
        .filter { it !== NULL_SEGMENT }
        .minBy { it.id }
    var curSegment = leftmostSegment
    while (true) {
        // Segment header; removed segments are marked with `*`.
        out.append("${curSegment.hexAddress}=[${if (curSegment.isRemoved) "*" else ""}${curSegment.id},prev=${curSegment.prev?.hexAddress},")
        for (slot in 0 until SEGMENT_SIZE) {
            val state = curSegment.getState(slot)
            val storedElement = curSegment.getElement(slot)
            val stateLabel = when (state) {
                is CancellableContinuation<*> -> "cont"
                is SelectInstance<*> -> "select"
                is ReceiveCatching<*> -> "receiveCatching"
                is SendBroadcast -> "send(broadcast)"
                is WaiterEB -> "EB($state)"
                else -> state.toString()
            }
            out.append("[$slot]=($stateLabel,$storedElement),")
        }
        out.append("next=${curSegment.next?.hexAddress}] ")
        // Proceed with the next segment, if there is one.
        curSegment = curSegment.next ?: break
    }
    return out.toString()
}
2698
2699
// An internal method for tests: verifies the structural invariants of the
// segment list and fails with `IllegalStateException` if one is violated.
fun checkSegmentStructureInvariants() {
    if (isRendezvousOrUnlimited) {
        check(bufferEndSegment.value === NULL_SEGMENT) {
            "bufferEndSegment must be NULL_SEGMENT for rendezvous and unlimited channels; they do not manipulate it.\n" +
                "Channel state: $this"
        }
    } else {
        check(receiveSegment.value.id <= bufferEndSegment.value.id) {
            "bufferEndSegment should not have lower id than receiveSegment.\n" +
                "Channel state: $this"
        }
    }
    // The leftmost segment referenced by the channel must not keep a `prev` link.
    val leftmostSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
        .filter { it !== NULL_SEGMENT }
        .minBy { it.id }
    check(leftmostSegment.prev == null) {
        "All processed segments should be unreachable from the data structure, but the `prev` link of the leftmost segment is non-null.\n" +
            "Channel state: $this"
    }
    // Walk over every segment that has a successor and make sure that the list
    // contains no segment that is full of cancelled/closed cells.
    var current = leftmostSegment
    while (true) {
        val following = current.next ?: break
        // Note that the `prev` reference can be `null` if this channel is closed.
        val backLink = following.prev
        check(backLink == null || backLink === current) {
            "The `segment.next.prev === segment` invariant is violated.\n" +
                "Channel state: $this"
        }
        // Count the cells in a closed/interrupted state while checking that
        // every cell is in a state that is legal once all operations finish.
        val interruptedOrClosedCells = (0 until SEGMENT_SIZE).count { slot ->
            when (val state = current.getState(slot)) {
                // A buffered element or a suspended request.
                BUFFERED, is Waiter -> false
                // An interrupted request or a closed channel:
                // the element slot must have been cleaned.
                INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> {
                    check(current.getElement(slot) == null)
                    true
                }
                // A processed or poisoned cell: the element slot must have been cleaned.
                POISONED, DONE_RCV -> {
                    check(current.getElement(slot) == null)
                    false
                }
                // Other states are illegal after all running operations finish.
                else -> error("Unexpected segment cell state: $state.\nChannel state: $this")
            }
        }
        // A segment that is full of cancelled/closed cells may stay in the list
        // only while `receiveSegment`, `sendSegment`, or `bufferEndSegment` references it.
        if (interruptedOrClosedCells == SEGMENT_SIZE) {
            check(current === receiveSegment.value || current === sendSegment.value || current === bufferEndSegment.value) {
                "Logically removed segment is reachable.\nChannel state: $this"
            }
        }
        current = following
    }
}
2766
// Returns a reference to `onCancellationChannelResultImplDoNotCall`. Being an
// extension on the non-null callback type, it can be invoked only when the
// callback is present.
private fun OnUndeliveredElement<E>.bindCancellationFunResult(): KFunction3<Throwable, ChannelResult<E>, CoroutineContext, Unit> {
    return ::onCancellationChannelResultImplDoNotCall
}
2768
/**
 * Do not call directly. Go through [bindCancellationFunResult] to ensure the callback isn't null.
 * [bindCancellationFunResult] could have just returned a lambda as well, but there would be a risk of that
 * lambda capturing the environment.
 *
 * Passes the value wrapped in [element] to `onUndeliveredElement`. The value is unwrapped with
 * [ChannelResult.getOrThrow] rather than `getOrNull()!!`, so that a successful result holding `null`
 * (possible when `E` is a nullable type) is forwarded as is instead of failing with an NPE.
 * A failed or closed [element] still results in an exception.
 *
 * @param cause the cancellation cause; not used.
 * @param element the channel result whose value was not delivered.
 * @param context the context that is passed through to `callUndeliveredElement`.
 */
private fun onCancellationChannelResultImplDoNotCall(
    cause: Throwable, element: ChannelResult<E>, context: CoroutineContext
) {
    // `!!` is intentional: this function is reachable only via `bindCancellationFunResult`,
    // which is an extension on the non-null callback type.
    onUndeliveredElement!!.callUndeliveredElement(element.getOrThrow(), context)
}
2779
// Creates a cancellation handler that reports the given `element` as
// undelivered, ignoring the cancellation cause and the resumed value.
private fun OnUndeliveredElement<E>.bindCancellationFun(element: E): (Throwable, Any?, CoroutineContext) -> Unit {
    return { _, _, context -> callUndeliveredElement(element, context) }
}
2783
// Returns a reference to `onCancellationImplDoNotCall`. Being an extension on
// the non-null callback type, it can be invoked only when the callback is present.
private fun OnUndeliveredElement<E>.bindCancellationFun(): KFunction3<Throwable, E, CoroutineContext, Unit> {
    return ::onCancellationImplDoNotCall
}
2785
/**
 * Reports [element] as undelivered through `onUndeliveredElement`.
 *
 * Not meant to be called directly: obtain it via [bindCancellationFun], which guarantees that
 * the callback is non-null. [bindCancellationFun] could return a lambda instead, but such
 * a lambda might capture the environment.
 */
private fun onCancellationImplDoNotCall(cause: Throwable, element: E, context: CoroutineContext) {
    // Non-null whenever this function is reached via `bindCancellationFun`.
    val undeliveredHandler = onUndeliveredElement!!
    undeliveredHandler.callUndeliveredElement(element, context)
}
2794 }
2795
/**
 * A segment of the linked list that simulates the infinite array of channel cells.
 * Segments carry increasing [id]s, which help to update [BufferedChannel.sendSegment],
 * [BufferedChannel.receiveSegment], and [BufferedChannel.bufferEndSegment] correctly.
 *
 * Each of the [SEGMENT_SIZE] cells consists of an element slot and a state slot.
 */
internal class ChannelSegment<E>(
    id: Long,
    prev: ChannelSegment<E>?,
    channel: BufferedChannel<E>?,
    pointers: Int
) : Segment<ChannelSegment<E>>(id, prev, pointers) {
    private val _channel: BufferedChannel<E>? = channel

    // The owning channel; always non-null except for `NULL_SEGMENT`.
    val channel: BufferedChannel<E> get() = _channel!!

    // Two registers per cell: the element at `2 * index`, the state at `2 * index + 1`.
    private val data = atomicArrayOfNulls<Any?>(SEGMENT_SIZE * 2)

    override val numberOfSlots: Int get() = SEGMENT_SIZE

    // ########################################
    // # Manipulation with the Element Fields #
    // ########################################

    // Stores the element into the cell using a lazy write.
    internal fun storeElement(index: Int, element: E) = setElementLazy(index, element)

    // Reads the element slot of the cell.
    @Suppress("UNCHECKED_CAST")
    internal fun getElement(index: Int): E = data[index * 2].value as E

    // Reads the element of the cell and cleans the element slot afterwards.
    internal fun retrieveElement(index: Int): E {
        val element = getElement(index)
        cleanElement(index)
        return element
    }

    // Resets the element slot of the cell to `null`.
    internal fun cleanElement(index: Int) = setElementLazy(index, null)

    private fun setElementLazy(index: Int, value: Any?) {
        data[index * 2].lazySet(value)
    }

    // ######################################
    // # Manipulation with the State Fields #
    // ######################################

    internal fun getState(index: Int): Any? {
        return data[index * 2 + 1].value
    }

    internal fun setState(index: Int, value: Any?) {
        data[index * 2 + 1].value = value
    }

    internal fun casState(index: Int, from: Any?, to: Any?): Boolean {
        return data[index * 2 + 1].compareAndSet(from, to)
    }

    internal fun getAndSetState(index: Int, update: Any?): Any? {
        return data[index * 2 + 1].getAndSet(update)
    }

    // ########################
    // # Cancellation Support #
    // ########################

    override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
        // Senders mark themselves by adding `SEGMENT_SIZE` to the cell index,
        // which allows telling cancelled senders and receivers apart.
        val isSender = index >= SEGMENT_SIZE
        val cellIndex = if (isSender) index - SEGMENT_SIZE else index
        // Read the element up front; it may be needed for `onUndeliveredElement`.
        val element = getElement(cellIndex)
        // CAS-loop over the cell state.
        while (true) {
            val state = getState(cellIndex)
            when {
                // The cell stores a waiter.
                state is Waiter || state is WaiterEB -> {
                    val interrupted = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
                    // If the state changed concurrently, re-read it.
                    if (!casState(cellIndex, state, interrupted)) continue
                    // The waiter is cancelled. Clean the element slot and notify the
                    // segment that the slot is cleaned, which may remove the whole
                    // segment from the list. For a cancelled receiver,
                    // `onCancelledRequest(..)` first waits until the `expandBuffer()`
                    // attempt that processes this cell completes.
                    cleanElement(cellIndex)
                    onCancelledRequest(cellIndex, !isSender)
                    if (isSender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // The cell is already marked as cancelled: clean the element slot
                // to avoid memory leaks and report the element if needed.
                state === INTERRUPTED_SEND || state === INTERRUPTED_RCV -> {
                    cleanElement(cellIndex)
                    if (isSender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // An opposite operation is resuming this request; spin until the
                // state changes. The request may already be resumed (which leads
                // to `DONE_RCV` or `BUFFERED`), with this cancellation being
                // caused by prompt cancellation.
                state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> continue
                // The request was resumed successfully, so this is a prompt
                // cancellation that should be ignored.
                state === DONE_RCV || state === BUFFERED -> return
                // The channel is closed; this cancellation should be ignored.
                state === CHANNEL_CLOSED -> return
                else -> error("unexpected state: $state")
            }
        }
    }

    /**
     * Calls `onSlotCleaned()`; when the cancelled request is a [receiver],
     * `waitExpandBufferCompletion(..)` is invoked for the cell beforehand.
     */
    fun onCancelledRequest(index: Int, receiver: Boolean) {
        if (receiver) {
            val globalCellIndex = id * SEGMENT_SIZE + index
            channel.waitExpandBufferCompletion(globalCellIndex)
        }
        onSlotCleaned()
    }
}
2922
// Workaround for an atomicfu + JVM_IR compiler bug that led to SMAP-related
// compiler crashes (KT-55983): the reference to `createSegment` is obtained
// through this function.
internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, ChannelSegment<E>> {
    return ::createSegment
}
2925
// Creates a segment with the given id that follows `prev`, belongs to
// the same channel, and starts with no pointers.
private fun <E> createSegment(id: Long, prev: ChannelSegment<E>): ChannelSegment<E> {
    val owner = prev.channel
    return ChannelSegment(id, prev, owner, 0)
}
// Placeholder segment with id `-1`; it has neither a predecessor nor a channel.
private val NULL_SEGMENT: ChannelSegment<Any?> = ChannelSegment(id = -1, prev = null, channel = null, pointers = 0)

/**
 * The number of cells in a single segment; configurable via the
 * `kotlinx.coroutines.bufferedChannel.segmentSize` system property, 32 by default.
 */
@JvmField
internal val SEGMENT_SIZE: Int = systemProp("kotlinx.coroutines.bufferedChannel.segmentSize", 32)

/**
 * The number of iterations [BufferedChannel.waitExpandBufferCompletion] waits for the numbers of started
 * and completed [BufferedChannel.expandBuffer] calls to coincide. Once the limit is reached,
 * [BufferedChannel.waitExpandBufferCompletion] blocks further [BufferedChannel.expandBuffer]-s
 * to avoid starvation.
 */
private val EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS: Int =
    systemProp("kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations", 10_000)
2946
/**
 * Attempts to resume this continuation with [value], passing [onCancellation]
 * along. Returns `true` if the continuation has been resumed and `false` otherwise.
 */
private fun <T> CancellableContinuation<T>.tryResume0(
    value: T,
    onCancellation: ((cause: Throwable, value: T, context: CoroutineContext) -> Unit)? = null
): Boolean {
    // A `null` token means that the resumption attempt has failed.
    val token = tryResume(value, null, onCancellation) ?: return false
    completeResume(token)
    return true
}
2961
/*
  For rendezvous and unlimited channels, the `bufferEnd` counter is
  initialized with the corresponding constant below and never changes;
  the `expandBuffer(..)` operation does nothing for such channels.
 */
private const val BUFFER_END_RENDEZVOUS: Long = 0L // no buffer
private const val BUFFER_END_UNLIMITED: Long = Long.MAX_VALUE // infinite buffer
// Computes the initial value of the `bufferEnd` counter for the given capacity.
private fun initialBufferEnd(capacity: Int): Long {
    if (capacity == Channel.RENDEZVOUS) return BUFFER_END_RENDEZVOUS
    if (capacity == Channel.UNLIMITED) return BUFFER_END_UNLIMITED
    return capacity.toLong()
}
2974
2975 /*
2976 Cell states. The initial "empty" state is represented with `null`,
2977 and suspended operations are represented with [Waiter] instances.
2978 */
2979
// An element is buffered in the cell.
@JvmField
internal val BUFFERED: Symbol = Symbol("BUFFERED")

// Set by a concurrent `expandBuffer(..)` to tell the upcoming
// sender that it should buffer its element.
private val IN_BUFFER: Symbol = Symbol("SHOULD_BUFFER")

// A receiver (hence the RCV suffix) is resuming the suspended sender;
// afterwards, the state becomes either `DONE_RCV` (on success)
// or `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_RCV: Symbol = Symbol("S_RESUMING_BY_RCV")

// `expandBuffer(..)` (hence the EB suffix) is resuming the suspended sender;
// afterwards, the state becomes either `BUFFERED` (on success)
// or `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_EB: Symbol = Symbol("RESUMING_BY_EB")

// A receiver that comes to a cell already covered by a sender
// (according to the counters) while the cell is still in the
// `EMPTY` or `IN_BUFFER` state breaks the cell by moving it to this state.
private val POISONED: Symbol = Symbol("POISONED")

// The element has been successfully transferred to a receiver.
private val DONE_RCV: Symbol = Symbol("DONE_RCV")

// Cancelled sender.
private val INTERRUPTED_SEND: Symbol = Symbol("INTERRUPTED_SEND")

// Cancelled receiver.
private val INTERRUPTED_RCV: Symbol = Symbol("INTERRUPTED_RCV")

// The channel is closed.
internal val CHANNEL_CLOSED: Symbol = Symbol("CHANNEL_CLOSED")
// Wrapper that `expandBuffer(..)` puts around a waiter found in a cell
// that is already covered by both a sender and a receiver (both the
// `senders` and `receivers` counters are greater than the cell number).
// In that situation `expandBuffer(..)` cannot tell which kind of
// operation is stored in the cell, so it wraps the waiter with this
// descriptor. The wrapper informs a possibly upcoming receiver that it
// should complete the `expandBuffer(..)` procedure if the stored waiter
// turns out to be a sender. Senders, in turn, ignore this information.
private class WaiterEB(@JvmField val waiter: Waiter) {
    override fun toString(): String = "WaiterEB(" + waiter + ")"
}
3021
3022
3023
/**
 * Wrapper around the continuation of a suspended
 * [BufferedChannel.receiveCatching] call. It exists so that such a
 * waiter can be told apart from a suspended [BufferedChannel.receive];
 * all [Waiter] functionality is delegated to the wrapped [cont].
 */
private class ReceiveCatching<E>(@JvmField val cont: CancellableContinuationImpl<ChannelResult<E>>) :
    Waiter by cont
3032
/*
  Special (non-element) results of [BufferedChannel.updateCellReceive].
  When the receiver makes a rendezvous with a waiting sender or
  retrieves a buffered element, [BufferedChannel.updateCellReceive]
  returns that element itself instead of one of these markers.
 */
private val SUSPEND = Symbol("SUSPEND")
private val SUSPEND_NO_WAITER = Symbol("SUSPEND_NO_WAITER")
private val FAILED = Symbol("FAILED")
3042
/*
  Result codes returned by [BufferedChannel.updateCellSend].
 */
private const val RESULT_RENDEZVOUS: Int = 0
private const val RESULT_BUFFERED: Int = 1
private const val RESULT_SUSPEND: Int = 2
private const val RESULT_SUSPEND_NO_WAITER: Int = 3
private const val RESULT_CLOSED: Int = 4
private const val RESULT_FAILED: Int = 5
3052
/**
 * Marker stored in [BufferedChannel.BufferedChannelIterator.receiveResult]
 * to indicate that there is no pre-received result.
 */
private val NO_RECEIVE_RESULT = Symbol("NO_RECEIVE_RESULT")
3058
/*
  [BufferedChannel.invokeOnClose] may run concurrently with
  channel closing, so the two have to be synchronized.
  The markers below are used for that synchronization.
 */
private val CLOSE_HANDLER_CLOSED = Symbol("CLOSE_HANDLER_CLOSED")
private val CLOSE_HANDLER_INVOKED = Symbol("CLOSE_HANDLER_INVOKED")
3066
/**
 * Marker stored in [BufferedChannel._closeCause] while no closing cause is set.
 * If the channel is closed or cancelled without an exception, this
 * [NO_CLOSE_CAUSE] marker should be replaced with `null`.
 */
private val NO_CLOSE_CAUSE = Symbol("NO_CLOSE_CAUSE")
3073
/*
  Close statuses of the channel. The allowed transitions are:
    +--------+   +----------------------+   +-----------+
    | ACTIVE |-->| CANCELLATION_STARTED |-->| CANCELLED |
    +--------+   +----------------------+   +-----------+
        |                                         ^
        |             +--------+                  |
        +------------>| CLOSED |------------------+
                      +--------+
  The intermediate `CANCELLATION_STARTED` status is required
  to synchronize concurrent closing and cancellation.
 */
private const val CLOSE_STATUS_ACTIVE: Int = 0
private const val CLOSE_STATUS_CANCELLATION_STARTED: Int = 1
private const val CLOSE_STATUS_CLOSED: Int = 2
private const val CLOSE_STATUS_CANCELLED: Int = 3
3090
/*
  A single 64-bit register holds both the `senders` counter and the
  channel close status; this saves space and reduces the number of
  reads in sending operations. The counter occupies the low
  `SENDERS_CLOSE_STATUS_SHIFT` bits, and the close status is stored
  in the bits above them. The declarations below encapsulate the
  required bit arithmetics.
 */
private const val SENDERS_CLOSE_STATUS_SHIFT: Int = 60
private const val SENDERS_COUNTER_MASK: Long = (1L shl SENDERS_CLOSE_STATUS_SHIFT) - 1

// Extracts the `senders` counter from the packed value.
private inline val Long.sendersCounter: Long
    get() = this and SENDERS_COUNTER_MASK

// Extracts the close status from the packed value.
private inline val Long.sendersCloseStatus: Int
    get() = (this shr SENDERS_CLOSE_STATUS_SHIFT).toInt()

// Packs the counter and the close status into a single value.
private fun constructSendersAndCloseStatus(counter: Long, closeStatus: Int): Long {
    val statusBits = closeStatus.toLong() shl SENDERS_CLOSE_STATUS_SHIFT
    return statusBits + counter
}
3103
/*
  The 64-bit `completedExpandBuffersAndPauseFlag` counter stores the
  number of completed `expandBuffer()` attempts together with a special
  flag that pauses progress to avoid starvation in
  `waitExpandBufferCompletion(..)`. The declarations below encapsulate
  the required bit arithmetics.
 */
private const val EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT: Long = 1L shl 62
private const val EB_COMPLETED_COUNTER_MASK: Long = EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT - 1

// Extracts the number of completed `expandBuffer()` attempts.
private inline val Long.ebCompletedCounter: Long
    get() = this and EB_COMPLETED_COUNTER_MASK

// Tells whether the pause flag is set in the packed value.
private inline val Long.ebPauseExpandBuffers: Boolean
    get() = (this and EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT) != 0L

// Packs the counter and the pause flag into a single value.
private fun constructEBCompletedAndPauseFlag(counter: Long, pauseEB: Boolean): Long {
    val pauseBit = if (pauseEB) EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT else 0L
    return pauseBit + counter
}
3116