• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.jvm.*
10 
// Shorthand for the queue core type; both classes in this file refer to it as `Core<E>`.
private typealias Core<E> = LockFreeTaskQueueCore<E>
12 
/**
 * Lock-free multi-producer queue for task scheduling that works with either a single consumer
 * or multiple consumers.
 *
 * **Note 1: This queue is NOT linearizable; its operations are only quiescently consistent.**
 * That guarantee is nevertheless strong enough for task scheduling.
 * For example, the execution below is allowed for this queue, although a linearizable queue would forbid it:
 *
 * ```
 * Thread 1: addLast(1) = true, removeFirstOrNull() = null
 * Thread 2: addLast(2) = true // this operation is concurrent with both operations in the first thread
 * ```
 *
 * **Note 2: With multiple consumers (`singleConsumer == false`) this queue is NOT lock-free.**
 * When the queue is nearly empty, a consumer spins until the producer completes its operation.
 * The window is very short and shows up rarely, only under specific load conditions,
 * yet it is enough to cost the algorithm its lock-freedom.
 */
internal open class LockFreeTaskQueue<E : Any>(
    singleConsumer: Boolean // true when there is only a single consumer (slightly faster & lock-free)
) {
    // The core currently in use; replaced by `core.next()` whenever the current core reports it is frozen.
    private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY, singleConsumer))

    // Note: not atomic with respect to removal (a remove may transiently fail even though isEmpty is false)
    val isEmpty: Boolean
        get() = _cur.value.isEmpty

    val size: Int
        get() = _cur.value.size

    /** Closes the queue, following the chain of cores until one of them accepts the close. */
    fun close() {
        while (true) {
            val core = _cur.value
            if (core.close()) return // this copy is closed
            _cur.compareAndSet(core, core.next()) // frozen copy -- advance to its successor and retry
        }
    }

    /** Appends [element]; returns `true` when it was added and `false` when the queue is closed. */
    fun addLast(element: E): Boolean {
        while (true) {
            val core = _cur.value
            val outcome = core.addLast(element)
            if (outcome == Core.ADD_SUCCESS) return true
            if (outcome == Core.ADD_CLOSED) return false
            if (outcome == Core.ADD_FROZEN) _cur.compareAndSet(core, core.next()) // advance to successor
        }
    }

    /** Removes and returns the first element, or returns `null` when nothing can be removed right now. */
    @Suppress("UNCHECKED_CAST")
    fun removeFirstOrNull(): E? {
        return removeFirstOrNullIf { true }
    }

    /**
     * Removes and returns the first element provided that [predicate] accepts it;
     * returns `null` when nothing can be removed right now or the predicate rejects the element.
     */
    @Suppress("UNCHECKED_CAST")
    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): E? {
        while (true) {
            val core = _cur.value
            val outcome = core.removeFirstOrNullIf(predicate)
            if (outcome === Core.REMOVE_FROZEN) {
                // frozen copy -- advance to its successor and retry
                _cur.compareAndSet(core, core.next())
            } else {
                return outcome as E?
            }
        }
    }

    // Validation helper for tests only
    fun <R> map(transform: (E) -> R): List<R> {
        return _cur.value.map(transform)
    }

    // Validation helper for tests only
    fun isClosed(): Boolean {
        return _cur.value.isClosed()
    }
}
74 
/**
 * Lock-free multi-producer, single- or multi-consumer queue core: one fixed-capacity ring buffer.
 *
 * All control state is packed into a single atomic `Long` (`_state`): the `head` index, the `tail` index
 * and the "frozen" and "closed" flag bits (see the companion object for the exact layout).
 * A frozen core rejects both additions and removals; its live elements are copied into a successor core
 * of twice the capacity, which callers obtain through [next].
 *
 * @param capacity number of slots in the ring buffer; the `init` block checks that
 * `capacity and (capacity - 1) == 0` and that `capacity - 1` does not exceed [MAX_CAPACITY_MASK].
 * @param singleConsumer `true` when there is only a single consumer (slightly faster).
 * @see LockFreeTaskQueue
 */
internal class LockFreeTaskQueueCore<E : Any>(
    private val capacity: Int,
    private val singleConsumer: Boolean // true when there is only a single consumer (slightly faster)
) {
    private val mask = capacity - 1 // maps a logical head/tail index to an array slot
    private val _next = atomic<Core<E>?>(null) // successor core, allocated lazily by next()
    private val _state = atomic(0L) // packed head | tail | frozen | closed
    private val array = atomicArrayOfNulls<Any?>(capacity) // slots hold null, an element, or a Placeholder

    init {
        check(mask <= MAX_CAPACITY_MASK)
        check(capacity and mask == 0)
    }

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean get() = _state.value.withState { head, tail -> head == tail }
    // Number of slots between head and tail, computed modulo the index range from one read of the state
    val size: Int get() = _state.value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }

    /**
     * Tries to set the closed bit on this core.
     *
     * @return `true` when the closed bit is set (by this call or already), `false` when this core
     * is frozen and the caller has to retry on [next].
     */
    fun close(): Boolean {
        _state.update { state ->
            if (state and CLOSED_MASK != 0L) return true // ok - already closed
            if (state and FROZEN_MASK != 0L) return false // frozen -- try next
            state or CLOSED_MASK // try set closed bit
        }
        return true
    }

    /**
     * Tries to append [element] to this core.
     *
     * @return [ADD_SUCCESS] when the element was added; [ADD_CLOSED] when the closed bit is set;
     * [ADD_FROZEN] when this core is frozen or should be replaced by a bigger copy
     * (the caller is expected to move on to [next], which performs the actual freeze).
     */
    // ADD_CLOSED | ADD_FROZEN | ADD_SUCCESS
    fun addLast(element: E): Int {
        _state.loop { state ->
            if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
            state.withState { head, tail ->
                val mask = this.mask // manually move instance field to local for performance
                // If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
                // so we check for full queue with an extra margin of one element
                if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
                // If queue is Multi-Consumer then the consumer could still have not cleared element
                // despite the above check for one free slot.
                if (!singleConsumer && array[tail and mask].value != null) {
                    // There are two options in this situation
                    // 1. Spin-wait until consumer clears the slot
                    // 2. Freeze & resize to avoid spinning
                    // We use heuristic here to avoid memory-overallocation
                    // Freeze & reallocate when queue is small or more than half of the queue is used
                    if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
                        return ADD_FROZEN
                    }
                    // otherwise spin
                    return@loop
                }
                val newTail = (tail + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateTail(newTail))) {
                    // successfully added
                    array[tail and mask].value = element
                    // could have been frozen & copied before this item was set -- correct it by filling placeholder
                    var cur = this
                    while(true) {
                        if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
                        cur = cur.next().fillPlaceholder(tail, element) ?: break
                    }
                    return ADD_SUCCESS // added successfully
                }
            }
        }
    }

    /**
     * Replaces the placeholder that a copy left for logical slot [index] with the real [element].
     *
     * @return this core when a placeholder was replaced (the caller then checks whether this core
     * has been frozen and copied as well), or `null` when no correction was needed here.
     */
    private fun fillPlaceholder(index: Int, element: E): Core<E>? {
        val old = array[index and mask].value
        /*
         * addLast actions:
         * 1) Commit tail slot
         * 2) Write element to array slot
         * 3) Check for array copy
         *
         * If copy happened between 2 and 3 then the consumer might have consumed our element,
         * then another producer might have written its placeholder in our slot, so we should
         * perform *unique* check that current placeholder is ours to avoid overwriting another producer's placeholder
         */
        if (old is Placeholder && old.index == index) {
            array[index and mask].value = element
            // we've corrected missing element, should check if that propagated to further copies, just in case
            return this
        }
        // it is Ok, no need for further action
        return null
    }

    /** Same as [removeFirstOrNullIf] with a predicate that accepts every element. */
    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
    fun removeFirstOrNull(): Any? = removeFirstOrNullIf { true }

    /**
     * Tries to remove the first element of this core if [predicate] accepts it.
     *
     * @return [REMOVE_FROZEN] when this core is frozen (the caller has to retry on [next]);
     * `null` when the core is empty, the first element is not fully added yet, or [predicate] rejected it;
     * otherwise the removed element.
     */
    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): Any? {
        _state.loop { state ->
            if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
            state.withState { head, tail ->
                if ((tail and mask) == (head and mask)) return null // empty
                val element = array[head and mask].value
                if (element == null) {
                    // If queue is Single-Consumer, then element == null only when add has not finished yet
                    if (singleConsumer) return null // consider it not added yet
                    // retry (spin) until producer adds it
                    return@loop
                }
                // element == Placeholder can only be when add has not finished yet
                if (element is Placeholder) return null // consider it not added yet
                // now we tentatively know the element to remove -- check predicate
                @Suppress("UNCHECKED_CAST")
                if (!predicate(element as E)) return null
                // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
                val newHead = (head + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    // Array could have been copied by another thread and it is perfectly fine, since only elements
                    // between head and tail were copied and there are no extra steps we should take here
                    array[head and mask].value = null // now can safely put null (state was updated)
                    return element // successfully removed in fast-path
                }
                // Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
                if (!singleConsumer) return@loop
                // Single-consumer queue goes to slow-path for remove in case of interference
                var cur = this
                while (true) {
                    @Suppress("UNUSED_VALUE")
                    cur = cur.removeSlowPath(head, newHead) ?: return element
                }
            }
        }
    }

    /**
     * Single-consumer slow path used after the fast-path CAS in [removeFirstOrNullIf] failed:
     * keeps trying to move head from [oldHead] to [newHead] on this core.
     *
     * @return `null` once head was updated here, or the successor core when this core is frozen,
     * in which case the caller repeats the correction on that successor.
     */
    private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
        _state.loop { state ->
            state.withState { head, _ ->
                assert { head == oldHead } // "This queue can have only one consumer"
                if (state and FROZEN_MASK != 0L) {
                    // state was already frozen, so removed element was copied to next
                    return next() // continue to correct head in next
                }
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    array[head and mask].value = null // now can safely put null (state was updated)
                    return null
                }
            }
        }
    }

    /** Freezes this core (if it is not frozen yet) and returns its successor, allocating it when needed. */
    fun next(): LockFreeTaskQueueCore<E> = allocateOrGetNextCopy(markFrozen())

    // Sets the frozen bit and returns the resulting state; an already-frozen state is returned unchanged.
    private fun markFrozen(): Long =
        _state.updateAndGet { state ->
            if (state and FROZEN_MASK != 0L) return state // already marked
            state or FROZEN_MASK
        }

    // Returns the successor core, publishing a fresh copy via CAS when none was installed yet.
    // Several threads may each build a copy concurrently; only the one whose CAS succeeds is kept.
    private fun allocateOrGetNextCopy(state: Long): Core<E> {
        _next.loop { next ->
            if (next != null) return next // already allocated & copied
            _next.compareAndSet(null, allocateNextCopy(state))
        }
    }

    // Builds a core of twice the capacity that holds every slot between head and tail of the frozen [state].
    private fun allocateNextCopy(state: Long): Core<E> {
        val next = LockFreeTaskQueueCore<E>(capacity * 2, singleConsumer)
        state.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // replace nulls with placeholders on copy
                // `index` runs past MAX_CAPACITY_MASK when tail has wrapped around below head, while addLast
                // looks its placeholder up by the wrapped tail value -- so the placeholder must store
                // the wrapped logical index, otherwise fillPlaceholder would never recognize it
                val value = array[index and mask].value ?: Placeholder(index and MAX_CAPACITY_MASK)
                next.array[index and next.mask].value = value
                index++
            }
            next._state.value = state wo FROZEN_MASK
        }
        return next
    }

    // Used for validation in tests only
    fun <R> map(transform: (E) -> R): List<R> {
        val res = ArrayList<R>(capacity)
        _state.value.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // skip slots that are empty or still hold a placeholder (their add has not finished)
                val element = array[index and mask].value
                @Suppress("UNCHECKED_CAST")
                if (element != null && element !is Placeholder) res.add(transform(element as E))
                index++
            }
        }
        return res
    }

    // Used for validation in tests only
    fun isClosed(): Boolean = _state.value and CLOSED_MASK != 0L


    // Instance of this class is placed into array when we have to copy array, but addLast is in progress --
    // it had already reserved a slot in the array (with null) and has not yet put its value there.
    // Placeholder keeps the logical index (not masked by the array mask) to distinguish placeholders
    // on different wraparounds of array
    // Internal because of inlining
    internal class Placeholder(@JvmField val index: Int)

    @Suppress("PrivatePropertyName", "MemberVisibilityCanBePrivate")
    internal companion object {
        const val INITIAL_CAPACITY = 8

        // State layout, from the lowest bit: head (CAPACITY_BITS) | tail (CAPACITY_BITS) | frozen (1) | closed (1)
        const val CAPACITY_BITS = 30
        const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS) - 1
        const val HEAD_SHIFT = 0
        const val HEAD_MASK = MAX_CAPACITY_MASK.toLong() shl HEAD_SHIFT
        const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
        const val TAIL_MASK = MAX_CAPACITY_MASK.toLong() shl TAIL_SHIFT

        const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
        const val FROZEN_MASK = 1L shl FROZEN_SHIFT
        const val CLOSED_SHIFT = FROZEN_SHIFT + 1
        const val CLOSED_MASK = 1L shl CLOSED_SHIFT

        // addLast in a multi-consumer queue spin-waits for a busy slot only at or above this capacity
        const val MIN_ADD_SPIN_CAPACITY = 1024

        // Returned by removeFirstOrNull/removeFirstOrNullIf when the core is frozen
        @JvmField val REMOVE_FROZEN = Symbol("REMOVE_FROZEN")

        // Result codes of addLast
        const val ADD_SUCCESS = 0
        const val ADD_FROZEN = 1
        const val ADD_CLOSED = 2

        // "without": clears in the receiver every bit that is set in [other]
        infix fun Long.wo(other: Long) = this and other.inv()
        // Return a copy of the packed state with the head / tail field replaced
        fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)
        fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)

        // Unpacks head and tail from the packed state and passes them to [block]
        inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
            val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()
            val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()
            return block(head, tail)
        }

        // FROZEN | CLOSED -- closed takes priority when both bits are set
        fun Long.addFailReason(): Int = if (this and CLOSED_MASK != 0L) ADD_CLOSED else ADD_FROZEN
    }
}