/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.internal
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11
/**
 * Starting from this segment, looks for the first segment `s` with `s.id >= id` that is not logically removed,
 * appending segments produced by [createNewSegment] while the end of the list is reached too early.
 *
 * Returns `CLOSED` if the list turns out to be closed for further segment additions
 * before such a segment is found.
 */
internal fun <S : Segment<S>> S.findSegmentInternal(
    id: Long,
    createNewSegment: (id: Long, prev: S) -> S
): SegmentOrClosed<S> {
    /*
       The traversal follows `next` references and appends segments on demand, much like `push` in the
       Michael-Scott queue. A failed CAS on `next` is not an error here: it means that another thread has
       already installed the segment with that id, so the next iteration simply picks it up.
       Consequently, at most one segment is ever linked for each id.
     */
    var current: S = this
    while (current.id < id || current.isRemoved) {
        val successor = current.nextOrIfClosed { return SegmentOrClosed(CLOSED) }
        if (successor == null) {
            // End of the list -- try to append a segment with the subsequent id.
            val candidate = createNewSegment(current.id + 1, current)
            if (current.trySetNext(candidate)) {
                // `current` is not the tail anymore, so a pending removal can be completed now.
                if (current.isRemoved) current.remove()
                current = candidate
            }
        } else {
            // There is a next segment -- step onto it.
            current = successor
        }
    }
    return SegmentOrClosed(current)
}
40
/**
 * Advances this reference to the segment [to], unless it already refers to a segment
 * with the same or a greater id.
 *
 * Returns `true` when the reference points to [to] or further on completion,
 * and `false` if [to] is logically removed.
 */
@Suppress("NOTHING_TO_INLINE", "RedundantNullableReturnType") // Must be inline because it is an AtomicRef extension
internal inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { current ->
    // Already at `to` or beyond it -- nothing to update.
    if (current.id >= to.id) return true
    // Register the new pointer on `to` up front; this is refused when `to` is logically removed.
    if (!to.tryIncPointers()) return false
    if (!compareAndSet(current, to)) {
        // The reference has changed concurrently: undo `tryIncPointers` and retry with the fresh value.
        if (to.decPointers()) to.remove()
        return@loop
    }
    // The reference is moved; drop the pointer that was held on the previous segment.
    if (current.decPointers()) current.remove()
    return true
}
54
/**
 * Finds the segment with the specified [id] by following `next` references from the [startFrom] segment,
 * creating missing segments with [createNewSegment], and moves this `AtomicRef` forward to the found segment.
 * The typical use-case is reading the value of this `AtomicRef`, doing some synchronization, and then invoking
 * this function to locate the required segment and update the pointer.
 * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
 * (e.g., queues should use it in dequeue operations).
 *
 * Segments can be removed from the list, and the list can be closed for further segment additions,
 * so the segment with exactly the requested [id] is not always available. Therefore, this function
 * returns the segment `s` with `s.id >= id`, or `CLOSED` if all the segments in this linked list
 * have lower `id` and the list is closed.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun <S : Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(
    id: Long,
    startFrom: S,
    noinline createNewSegment: (id: Long, prev: S) -> S
): SegmentOrClosed<S> {
    while (true) {
        val found = startFrom.findSegmentInternal(id, createNewSegment)
        // A closed list provides no segment to move to; report the result as is.
        if (found.isClosed) return found
        // The move is refused only for a logically removed segment; search again in that case.
        if (moveForward(found.segment)) return found
    }
}
77
/**
 * Closes the linked list this node belongs to, so that no new nodes can be added,
 * and returns the last node of the list.
 */
internal fun <N : ConcurrentLinkedListNode<N>> N.close(): N {
    var node: N = this
    while (true) {
        // The list is already closed at `node` -- it is the last one.
        val successor = node.nextOrIfClosed { return node }
        if (successor !== null) {
            node = successor
            continue
        }
        // `node` looked like the tail; if marking fails, a node was appended concurrently -- re-read `next`.
        if (node.markAsClosed()) return node
    }
}
93
/**
 * A node of a concurrent linked list that keeps atomic references to both of its neighbours.
 * New nodes are appended with [trySetNext], the list is sealed with [markAsClosed],
 * and logically removed nodes are unlinked with [remove].
 */
internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>>(prev: N?) {
    // Reference to the next node (`null` while there is none, or the `CLOSED` marker);
    // it is updated similarly to the Michael-Scott queue algorithm.
    private val _next = atomic<Any?>(null)
    // Reference to the previous node; it is re-linked in [remove] and dropped in [cleanPrev].
    private val _prev = atomic(prev)

    // Raw content of the `next` reference: a node, `null`, or the `CLOSED` marker.
    private val nextOrClosed get() = _next.value

    /**
     * Returns the next node, or `null` if it does not exist yet.
     * If this node is marked as closed, [onClosedAction] is invoked instead.
     */
    @Suppress("UNCHECKED_CAST")
    inline fun nextOrIfClosed(onClosedAction: () -> Nothing): N? {
        val successor = nextOrClosed
        if (successor === CLOSED) onClosedAction()
        return successor as N?
    }

    /**
     * The next node, or `null` if there is none or this node is marked as closed.
     */
    val next: N?
        get() = nextOrIfClosed { return null }

    /**
     * Tries to install [value] as the next node; succeeds only if the next node
     * is not specified yet and this node is not marked as closed.
     */
    fun trySetNext(value: N): Boolean {
        return _next.compareAndSet(null, value)
    }

    /**
     * Tells whether this node is the physical tail of the current linked list.
     */
    val isTail: Boolean
        get() = next === null

    /**
     * The previous node, or `null` if the reference is absent or has been cleaned.
     */
    val prev: N?
        get() = _prev.value

    /**
     * Cleans the pointer to the previous node.
     */
    fun cleanPrev() {
        _prev.lazySet(null)
    }

    /**
     * Tries to mark the linked list as closed by forbidding adding new nodes after this one.
     */
    fun markAsClosed(): Boolean = _next.compareAndSet(null, CLOSED)

    /**
     * Indicates whether this node is logically removed.
     * The expected use-case is removing the node logically first (so that [isRemoved] becomes `true`)
     * and invoking [remove] afterwards. Note that this implementation relies on the contract
     * that the physical tail cannot be logically removed. Please, do not break this contract;
     * otherwise, memory leaks and unexpected behavior can occur.
     */
    abstract val isRemoved: Boolean

    /**
     * Physically unlinks this node from the linked list. The node should be
     * logically removed (so [isRemoved] returns `true`) at the point of invocation.
     */
    fun remove() {
        assert { isRemoved || isTail } // The node should be logically removed at first.
        // The physical tail is never unlinked here; it gets removed later,
        // once a new node is appended and this one stops being the tail.
        if (isTail) return
        while (true) {
            // Find the closest neighbours on both sides, skipping logically removed nodes.
            val left = aliveSegmentLeft
            val right = aliveSegmentRight
            // Link the neighbours to each other; an already cleaned `prev` reference stays cleaned.
            right._prev.update { current -> if (current === null) null else left }
            if (left !== null) left._next.value = right
            // Retry if either neighbour got logically removed while the links were being updated.
            val rightIsGone = right.isRemoved && !right.isTail
            if (rightIsGone) continue
            val leftIsGone = left !== null && left.isRemoved
            if (!leftIsGone) return // This node is removed.
        }
    }

    // The closest node to the left that is not logically removed, or `null` if there is none.
    private val aliveSegmentLeft: N?
        get() {
            var candidate = prev
            while (candidate !== null && candidate.isRemoved) {
                candidate = candidate._prev.value
            }
            return candidate
        }

    // The closest node to the right that is not logically removed, or the last reachable node otherwise.
    private val aliveSegmentRight: N
        get() {
            assert { !isTail } // Should not be invoked on the tail node
            var candidate = next!!
            while (candidate.isRemoved) {
                val following = candidate.next ?: return candidate
                candidate = following
            }
            return candidate
        }
}
187
/**
 * Each segment in the list has a unique id and is created by the `createNewSegment` function
 * passed to [findSegmentAndMoveForward].
 * Essentially, this is a node in the Michael-Scott queue algorithm,
 * but with a [prev] pointer maintained for an efficient [remove] implementation.
 *
 * NB: this class cannot be public or leak into user's code as a public type, as [CancellableContinuationImpl]
 * instance-checks it and uses a separate code-path for that.
 */
internal abstract class Segment<S : Segment<S>>(
    @JvmField val id: Long, prev: S?, pointers: Int
) : ConcurrentLinkedListNode<S>(prev),
    // Segments typically store waiting continuations. Thus, on cancellation, the corresponding
    // slot should be cleaned, and the segment should be removed once it becomes full of cancelled cells.
    // To install such a handler efficiently, without creating an extra object, segments can be stored
    // as cancellation handlers in the [CancellableContinuationImpl] state, with the slot index kept
    // in another field. The details are here: https://github.com/Kotlin/kotlinx.coroutines/pull/3084.
    // For that, segments need to implement this internal marker interface.
    NotCompleted
{
    /**
     * The number of slots in this segment;
     * it is used to determine whether the segment is logically removed.
     */
    abstract val numberOfSlots: Int

    /**
     * The number of cleaned slots (the lowest bits) combined with
     * the number of AtomicRef pointers to this segment (the highest bits).
     */
    private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)

    /**
     * The segment is considered removed if all the slots are cleaned,
     * there are no pointers to this segment from outside, and it is not the tail.
     */
    override val isRemoved: Boolean
        get() = cleanedAndPointers.value == numberOfSlots && !isTail

    // Increments the number of pointers unless this segment is logically removed.
    internal fun tryIncPointers(): Boolean =
        cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { state -> state != numberOfSlots || isTail }

    // Decrements the number of pointers; returns `true` if this segment is logically removed afterwards.
    internal fun decPointers(): Boolean {
        val updated = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT))
        return updated == numberOfSlots && !isTail
    }

    /**
     * This function is invoked on continuation cancellation when this segment
     * with the specified [index] is installed as a cancellation handler via
     * `SegmentDisposable.disposeOnCancellation(Segment, Int)`.
     *
     * @param index the index under which the segment registered itself in the continuation.
     *        Indices are opaque, and arithmetic or numeric interpretation is not allowed on them,
     *        as they may encode additional metadata.
     * @param cause the cause of the cancellation, with the same semantics as [CancellableContinuation.invokeOnCancellation]
     * @param context the context of the cancellable continuation the segment was registered in
     */
    abstract fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext)

    /**
     * Invoked on each slot clean-up; should not be invoked twice for the same slot.
     */
    fun onSlotCleaned() {
        val state = cleanedAndPointers.incrementAndGet()
        // All the slots are cleaned and no pointers are left -- unlink the segment.
        if (state == numberOfSlots) remove()
    }
}
250
/**
 * Atomically adds [delta] to this counter provided that [condition] holds for its current value.
 * Returns `true` if the value was updated, and `false` if [condition] rejected the observed value.
 */
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
    do {
        val observed = value
        if (!condition(observed)) return false
        // Retry with a fresh value whenever the counter was changed concurrently.
    } while (!compareAndSet(observed, observed + delta))
    return true
}
258
/**
 * Holds either a segment or the `CLOSED` marker, which is produced by the segment search
 * when the list is closed.
 */
@JvmInline
internal value class SegmentOrClosed<S : Segment<S>>(private val value: Any?) {
    /** `true` if this instance holds the `CLOSED` marker rather than a segment. */
    val isClosed: Boolean
        get() = value === CLOSED

    /** The stored segment; fails with an error if this instance holds the `CLOSED` marker. */
    @Suppress("UNCHECKED_CAST")
    val segment: S
        get() {
            if (isClosed) error("Does not contain segment")
            return value as S
        }
}
265
// Bit offset of the pointers counter inside `Segment.cleanedAndPointers`;
// the bits below this offset hold the counter of cleaned slots.
private const val POINTERS_SHIFT = 16

// Marker stored in the `next` reference of a node by `markAsClosed`, and wrapped into `SegmentOrClosed`
// by the segment search, to indicate that the list is closed for further additions.
private val CLOSED = Symbol("CLOSED")
269