• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 import kotlin.jvm.*
7 
8 /**
9  * Returns the first segment `s` with `s.id >= id` or `CLOSED`
10  * if all the segments in this linked list have lower `id`, and the list is closed for further segment additions.
11  */
12 internal fun <S : Segment<S>> S.findSegmentInternal(
13     id: Long,
14     createNewSegment: (id: Long, prev: S) -> S
15 ): SegmentOrClosed<S> {
16     /*
17        Go through `next` references and add new segments if needed, similarly to the `push` in the Michael-Scott
18        queue algorithm. The only difference is that "CAS failure" means that the required segment has already been
19        added, so the algorithm just uses it. This way, only one segment with each id can be added.
20      */
21     var cur: S = this
22     while (cur.id < id || cur.isRemoved) {
23         val next = cur.nextOrIfClosed { return SegmentOrClosed(CLOSED) }
24         if (next != null) { // there is a next node -- move there
25             cur = next
26             continue
27         }
28         val newTail = createNewSegment(cur.id + 1, cur)
29         if (cur.trySetNext(newTail)) { // successfully added new node -- move there
30             if (cur.isRemoved) cur.remove()
31             cur = newTail
32         }
33     }
34     return SegmentOrClosed(cur)
35 }
36 
37 /**
38  * Returns `false` if the segment `to` is logically removed, `true` on a successful update.
39  */
40 @Suppress("NOTHING_TO_INLINE", "RedundantNullableReturnType") // Must be inline because it is an AtomicRef extension
curnull41 internal inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { cur ->
42     if (cur.id >= to.id) return true
43     if (!to.tryIncPointers()) return false
44     if (compareAndSet(cur, to)) { // the segment is moved
45         if (cur.decPointers()) cur.remove()
46         return true
47     }
48     if (to.decPointers()) to.remove() // undo tryIncPointers
49 }
50 
51 /**
52  * Tries to find a segment with the specified [id] following by next references from the
53  * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
54  * doing some synchronization, and invoking this function to find the required segment and update the pointer.
55  * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
56  * (e.g., queues should use it in dequeue operations).
57  *
58  * Since segments can be removed from the list, or it can be closed for further segment additions.
59  * Returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`,
60  * and the list is closed.
61  */
62 @Suppress("NOTHING_TO_INLINE")
findSegmentAndMoveForwardnull63 internal inline fun <S : Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(
64     id: Long,
65     startFrom: S,
66     noinline createNewSegment: (id: Long, prev: S) -> S
67 ): SegmentOrClosed<S> {
68     while (true) {
69         val s = startFrom.findSegmentInternal(id, createNewSegment)
70         if (s.isClosed || moveForward(s.segment)) return s
71     }
72 }
73 
74 /**
75  * Closes this linked list of nodes by forbidding adding new ones,
76  * returns the last node in the list.
77  */
closenull78 internal fun <N : ConcurrentLinkedListNode<N>> N.close(): N {
79     var cur: N = this
80     while (true) {
81         val next = cur.nextOrIfClosed { return cur }
82         if (next === null) {
83             if (cur.markAsClosed()) return cur
84         } else {
85             cur = next
86         }
87     }
88 }
89 
90 internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>>(prev: N?) {
91     // Pointer to the next node, updates similarly to the Michael-Scott queue algorithm.
92     private val _next = atomic<Any?>(null)
93     // Pointer to the previous node, updates in [remove] function.
94     private val _prev = atomic(prev)
95 
96     private val nextOrClosed get() = _next.value
97 
98     /**
99      * Returns the next segment or `null` of the one does not exist,
100      * and invokes [onClosedAction] if this segment is marked as closed.
101      */
102     @Suppress("UNCHECKED_CAST")
<lambda>null103     inline fun nextOrIfClosed(onClosedAction: () -> Nothing): N? = nextOrClosed.let {
104         if (it === CLOSED) {
105             onClosedAction()
106         } else {
107             it as N?
108         }
109     }
110 
<lambda>null111     val next: N? get() = nextOrIfClosed { return null }
112 
113     /**
114      * Tries to set the next segment if it is not specified and this segment is not marked as closed.
115      */
trySetNextnull116     fun trySetNext(value: N): Boolean = _next.compareAndSet(null, value)
117 
118     /**
119      * Checks whether this node is the physical tail of the current linked list.
120      */
121     val isTail: Boolean get() = next == null
122 
123     val prev: N? get() = _prev.value
124 
125     /**
126      * Cleans the pointer to the previous node.
127      */
128     fun cleanPrev() { _prev.lazySet(null) }
129 
130     /**
131      * Tries to mark the linked list as closed by forbidding adding new nodes after this one.
132      */
markAsClosednull133     fun markAsClosed() = _next.compareAndSet(null, CLOSED)
134 
135     /**
136      * This property indicates whether the current node is logically removed.
137      * The expected use-case is removing the node logically (so that [isRemoved] becomes true),
138      * and invoking [remove] after that. Note that this implementation relies on the contract
139      * that the physical tail cannot be logically removed. Please, do not break this contract;
140      * otherwise, memory leaks and unexpected behavior can occur.
141      */
142     abstract val isRemoved: Boolean
143 
144     /**
145      * Removes this node physically from this linked list. The node should be
146      * logically removed (so [isRemoved] returns `true`) at the point of invocation.
147      */
148     fun remove() {
149         assert { isRemoved || isTail } // The node should be logically removed at first.
150         // The physical tail cannot be removed. Instead, we remove it when
151         // a new segment is added and this segment is not the tail one anymore.
152         if (isTail) return
153         while (true) {
154             // Read `next` and `prev` pointers ignoring logically removed nodes.
155             val prev = aliveSegmentLeft
156             val next = aliveSegmentRight
157             // Link `next` and `prev`.
158             next._prev.update { if (it === null) null else prev }
159             if (prev !== null) prev._next.value = next
160             // Checks that prev and next are still alive.
161             if (next.isRemoved && !next.isTail) continue
162             if (prev !== null && prev.isRemoved) continue
163             // This node is removed.
164             return
165         }
166     }
167 
168     private val aliveSegmentLeft: N? get() {
169         var cur = prev
170         while (cur !== null && cur.isRemoved)
171             cur = cur._prev.value
172         return cur
173     }
174 
175     private val aliveSegmentRight: N get() {
<lambda>null176         assert { !isTail } // Should not be invoked on the tail node
177         var cur = next!!
178         while (cur.isRemoved)
179             cur = cur.next ?: return cur
180         return cur
181     }
182 }
183 
184 /**
185  * Each segment in the list has a unique id and is created by the provided to [findSegmentAndMoveForward] method.
186  * Essentially, this is a node in the Michael-Scott queue algorithm,
187  * but with maintaining [prev] pointer for efficient [remove] implementation.
188  *
189  * NB: this class cannot be public or leak into user's code as public type as [CancellableContinuationImpl]
190  * instance-check it and uses a separate code-path for that.
191  */
192 internal abstract class Segment<S : Segment<S>>(
193     @JvmField val id: Long, prev: S?, pointers: Int
194 ) : ConcurrentLinkedListNode<S>(prev),
195     // Segments typically store waiting continuations. Thus, on cancellation, the corresponding
196     // slot should be cleaned and the segment should be removed if it becomes full of cancelled cells.
197     // To install such a handler efficiently, without creating an extra object, we allow storing
198     // segments as cancellation handlers in [CancellableContinuationImpl] state, putting the slot
199     // index in another field. The details are here: https://github.com/Kotlin/kotlinx.coroutines/pull/3084.
200     // For that, we need segments to implement this internal marker interface.
201     NotCompleted
202 {
203     /**
204      * This property should return the number of slots in this segment,
205      * it is used to define whether the segment is logically removed.
206      */
207     abstract val numberOfSlots: Int
208 
209     /**
210      * Numbers of cleaned slots (the lowest bits) and AtomicRef pointers to this segment (the highest bits)
211      */
212     private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)
213 
214     /**
215      * The segment is considered as removed if all the slots are cleaned
216      * and there are no pointers to this segment from outside.
217      */
218     override val isRemoved get() = cleanedAndPointers.value == numberOfSlots && !isTail
219 
220     // increments the number of pointers if this segment is not logically removed.
<lambda>null221     internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != numberOfSlots || isTail }
222 
223     // returns `true` if this segment is logically removed after the decrement.
decPointersnull224     internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == numberOfSlots && !isTail
225 
226     /**
227      * This function is invoked on continuation cancellation when this segment
228      * with the specified [index] are installed as cancellation handler via
229      * `SegmentDisposable.disposeOnCancellation(Segment, Int)`.
230      *
231      * @param index the index under which the sement registered itself in the continuation.
232      *        Indicies are opaque and arithmetics or numeric intepretation is not allowed on them,
233      *        as they may encode additional metadata.
234      * @param cause the cause of the cancellation, with the same semantics as [CancellableContinuation.invokeOnCancellation]
235      * @param context the context of the cancellable continuation the segment was registered in
236      */
237     abstract fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext)
238 
239     /**
240      * Invoked on each slot clean-up; should not be invoked twice for the same slot.
241      */
242     fun onSlotCleaned() {
243         if (cleanedAndPointers.incrementAndGet() == numberOfSlots) remove()
244     }
245 }
246 
addConditionallynull247 private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
248     while (true) {
249         val cur = this.value
250         if (!condition(cur)) return false
251         if (this.compareAndSet(cur, cur + delta)) return true
252     }
253 }
254 
255 @JvmInline
256 internal value class SegmentOrClosed<S : Segment<S>>(private val value: Any?) {
257     val isClosed: Boolean get() = value === CLOSED
258     @Suppress("UNCHECKED_CAST")
259     val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S
260 }
261 
262 private const val POINTERS_SHIFT = 16
263 
264 private val CLOSED = Symbol("CLOSED")
265