/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.jvm.*

private typealias Core<E> = LockFreeTaskQueueCore<E>

/**
 * Lock-free Multi-Producer xxx-Consumer Queue for task scheduling purposes
 * (single- or multi-consumer, depending on the `singleConsumer` constructor flag).
 *
 * **Note 1: This queue is NOT linearizable. It provides only quiescent consistency for its operations.**
 * However, this guarantee is strong enough for task-scheduling purposes.
 * In particular, the following execution is permitted for this queue, but is not permitted for a linearizable queue:
 *
 * ```
 * Thread 1: addLast(1) = true, removeFirstOrNull() = null
 * Thread 2: addLast(2) = true // this operation is concurrent with both operations in the first thread
 * ```
 *
 * **Note 2: When this queue is used with multiple consumers (`singleConsumer == false`) it is NOT lock-free.**
 * In particular, consumer spins until producer finishes its operation in the case of near-empty queue.
 * It is a very short window that could manifest itself rarely and only under specific load conditions,
 * but it still deprives this algorithm of its lock-freedom.
 *
 * The queue itself only holds a reference to the current [LockFreeTaskQueueCore]; whenever the core reports
 * that it is frozen, the operation moves the reference to `cur.next()` (a copy of doubled capacity) and retries.
 */
internal open class LockFreeTaskQueue<E : Any>(
    singleConsumer: Boolean // true when there is only a single consumer (slightly faster & lock-free)
) {
    // The core that currently serves all operations; replaced via CAS with its next copy once it is frozen
    private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY, singleConsumer))

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean get() = _cur.value.isEmpty

    // Number of elements between head and tail of the current core
    val size: Int get() = _cur.value.size

    /**
     * Closes the queue: after the closed bit is set, [addLast] returns `false`.
     * If the current core is frozen, moves on to its next copy and retries there.
     */
    fun close() {
        _cur.loop { cur ->
            if (cur.close()) return // closed this copy
            _cur.compareAndSet(cur, cur.next()) // move to next
        }
    }

    /**
     * Adds [element] to the tail of the queue.
     *
     * @return `true` if the element was added, `false` if the queue is closed.
     */
    fun addLast(element: E): Boolean {
        _cur.loop { cur ->
            when (cur.addLast(element)) {
                Core.ADD_SUCCESS -> return true
                Core.ADD_CLOSED -> return false
                Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
            }
        }
    }

    /**
     * Removes and returns the element at the head of the queue,
     * or returns `null` when the core reports that nothing can be removed right now.
     */
    @Suppress("UNCHECKED_CAST")
    fun removeFirstOrNull(): E? = removeFirstOrNullIf { true }

    /**
     * Removes and returns the element at the head of the queue if [predicate] accepts it.
     *
     * @return the removed element, or `null` when the queue is empty, the head element is not
     * fully added yet, or [predicate] rejected the head element.
     */
    @Suppress("UNCHECKED_CAST")
    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): E? {
        _cur.loop { cur ->
            val result = cur.removeFirstOrNullIf(predicate)
            // Anything except the REMOVE_FROZEN marker is a final answer (either null or an element)
            if (result !== Core.REMOVE_FROZEN) return result as E?
            _cur.compareAndSet(cur, cur.next()) // move to next
        }
    }

    // Used for validation in tests only
    fun <R> map(transform: (E) -> R): List<R> = _cur.value.map(transform)

    // Used for validation in tests only
    fun isClosed(): Boolean = _cur.value.isClosed()
}

/**
 * Lock-free Multi-Producer xxx-Consumer Queue core.
 *
 * A core is a fixed-capacity ring buffer. Its whole state is packed into a single atomic `Long`:
 * `head` index in bits 0..29, `tail` index in bits 30..59, the FROZEN flag in bit 60 and the CLOSED flag in bit 61
 * (see the constants in the companion object). A frozen core is never modified again; its contents are
 * copied into a [next] core of doubled capacity.
 *
 * @see LockFreeTaskQueue
 */
internal class LockFreeTaskQueueCore<E : Any>(
    private val capacity: Int,
    private val singleConsumer: Boolean // true when there is only a single consumer (slightly faster)
) {
    private val mask = capacity - 1
    private val _next = atomic<Core<E>?>(null)
    private val _state = atomic(0L)
    private val array = atomicArrayOfNulls<Any?>(capacity)

    init {
        check(mask <= MAX_CAPACITY_MASK) { "capacity $capacity exceeds the maximum of ${MAX_CAPACITY_MASK + 1}" }
        check(capacity and mask == 0) { "capacity $capacity must be a power of two" }
    }

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean get() = _state.value.withState { head, tail -> head == tail }

    // Distance from head to tail, taken modulo the index range to account for index wraparound
    val size: Int get() = _state.value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }

    /**
     * Tries to set the CLOSED bit of this core.
     *
     * @return `true` if this core is closed (by this call or already), `false` if this core is frozen
     * and the caller has to retry on the [next] core.
     */
    fun close(): Boolean {
        _state.update { state ->
            if (state and CLOSED_MASK != 0L) return true // ok - already closed
            if (state and FROZEN_MASK != 0L) return false // frozen -- try next
            state or CLOSED_MASK // try set closed bit
        }
        return true
    }

    /**
     * Tries to add [element] to the tail of this core.
     *
     * @return one of ADD_CLOSED | ADD_FROZEN | ADD_SUCCESS
     */
    fun addLast(element: E): Int {
        _state.loop { state ->
            if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
            state.withState { head, tail ->
                val mask = this.mask // manually move instance field to local for performance
                // If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
                // so we check for full queue with an extra margin of one element
                if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
                // If queue is Multi-Consumer then the consumer could still have not cleared element
                // despite the above check for one free slot.
                if (!singleConsumer && array[tail and mask].value != null) {
                    // There are two options in this situation
                    // 1. Spin-wait until consumer clears the slot
                    // 2. Freeze & resize to avoid spinning
                    // We use heuristic here to avoid memory-overallocation
                    // Freeze & reallocate when queue is small or more than half of the queue is used
                    if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
                        return ADD_FROZEN
                    }
                    // otherwise spin
                    return@loop
                }
                val newTail = (tail + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateTail(newTail))) {
                    // successfully added
                    array[tail and mask].value = element
                    // could have been frozen & copied before this item was set -- correct it by filling placeholder
                    var cur = this
                    while (true) {
                        if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
                        cur = cur.next().fillPlaceholder(tail, element) ?: break
                    }
                    return ADD_SUCCESS // added successfully
                }
            }
        }
    }

    /**
     * Replaces the [Placeholder] that was written for [index] during a copy with the actual [element].
     *
     * @return this core if a placeholder was replaced (the caller then checks whether this core was
     * frozen and copied too), or `null` if the slot does not hold the placeholder for [index].
     */
    private fun fillPlaceholder(index: Int, element: E): Core<E>? {
        val old = array[index and mask].value
        /*
         * addLast actions:
         * 1) Commit tail slot
         * 2) Write element to array slot
         * 3) Check for array copy
         *
         * If copy happened between 2 and 3 then the consumer might have consumed our element,
         * then another producer might have written its placeholder in our slot, so we should
         * perform *unique* check that current placeholder is ours to avoid overwriting another producer placeholder
         */
        if (old is Placeholder && old.index == index) {
            array[index and mask].value = element
            // we've corrected missing element, should check if that propagated to further copies, just in case
            return this
        }
        // it is Ok, no need for further action
        return null
    }

    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
    fun removeFirstOrNull(): Any? = removeFirstOrNullIf { true }

    /**
     * Tries to remove the element at the head of this core if [predicate] accepts it.
     *
     * @return REMOVE_FROZEN | null (EMPTY, not added yet, or rejected by [predicate]) | E (SUCCESS)
     */
    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): Any? {
        _state.loop { state ->
            if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
            state.withState { head, tail ->
                if ((tail and mask) == (head and mask)) return null // empty
                val element = array[head and mask].value
                if (element == null) {
                    // If queue is Single-Consumer, then element == null only when add has not finished yet
                    if (singleConsumer) return null // consider it not added yet
                    // retry (spin) until producer adds it
                    return@loop
                }
                // element == Placeholder can only be when add has not finished yet
                if (element is Placeholder) return null // consider it not added yet
                // now we tentatively know element to remove -- check predicate
                @Suppress("UNCHECKED_CAST")
                if (!predicate(element as E)) return null
                // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
                val newHead = (head + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    // Array could have been copied by another thread and it is perfectly fine, since only elements
                    // between head and tail were copied and there are no extra steps we should take here
                    array[head and mask].value = null // now can safely put null (state was updated)
                    return element // successfully removed in fast-path
                }
                // Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
                if (!singleConsumer) return@loop
                // Single-consumer queue goes to slow-path for remove in case of interference
                var cur = this
                while (true) {
                    @Suppress("UNUSED_VALUE")
                    cur = cur.removeSlowPath(head, newHead) ?: return element
                }
            }
        }
    }

    /**
     * Single-consumer slow path: advances head from [oldHead] to [newHead] in this core after the fast-path CAS failed.
     *
     * @return the [next] core if this core is frozen (head has to be advanced there as well),
     * or `null` once head was successfully updated in this core.
     */
    private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
        _state.loop { state ->
            state.withState { head, _ ->
                assert { head == oldHead } // "This queue can have only one consumer"
                if (state and FROZEN_MASK != 0L) {
                    // state was already frozen, so removed element was copied to next
                    return next() // continue to correct head in next
                }
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    array[head and mask].value = null // now can safely put null (state was updated)
                    return null
                }
            }
        }
    }

    /**
     * Freezes this core (if it is not frozen yet) and returns the core that replaces it,
     * allocating and filling the copy if no other thread has installed one yet.
     */
    fun next(): LockFreeTaskQueueCore<E> = allocateOrGetNextCopy(markFrozen())

    // Sets the FROZEN bit (if it is not set yet) and returns the resulting state
    private fun markFrozen(): Long =
        _state.updateAndGet { state ->
            if (state and FROZEN_MASK != 0L) return state // already marked
            state or FROZEN_MASK
        }

    // Returns the already installed next core, or installs a fresh copy built from the given frozen state
    private fun allocateOrGetNextCopy(state: Long): Core<E> {
        _next.loop { next ->
            if (next != null) return next // already allocated & copied
            _next.compareAndSet(null, allocateNextCopy(state))
        }
    }

    // Creates a core of doubled capacity and copies the slots between head and tail of the given state into it
    private fun allocateNextCopy(state: Long): Core<E> {
        val next = LockFreeTaskQueueCore<E>(capacity * 2, singleConsumer)
        state.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // replace nulls with placeholders on copy
                val value = array[index and mask].value ?: Placeholder(index)
                next.array[index and next.mask].value = value
                index++
            }
            // the copy starts with the same head, tail and closed bit, but is not frozen
            next._state.value = state wo FROZEN_MASK
        }
        return next
    }

    // Used for validation in tests only
    fun <R> map(transform: (E) -> R): List<R> {
        val res = ArrayList<R>(capacity)
        _state.value.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // skip slots that are still empty or hold a placeholder
                val element = array[index and mask].value
                @Suppress("UNCHECKED_CAST")
                if (element != null && element !is Placeholder) res.add(transform(element as E))
                index++
            }
        }
        return res
    }

    // Used for validation in tests only
    fun isClosed(): Boolean = _state.value and CLOSED_MASK != 0L


    // Instance of this class is placed into array when we have to copy array, but addLast is in progress --
    // it had already reserved a slot in the array (with null) and has not yet put its value there.
    // Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
    // Internal because of inlining
    internal class Placeholder(@JvmField val index: Int)

    @Suppress("PrivatePropertyName", "MemberVisibilityCanBePrivate")
    internal companion object {
        const val INITIAL_CAPACITY = 8

        // State layout: [head: 30 bits][tail: 30 bits][frozen: 1 bit][closed: 1 bit], starting from the lowest bit
        const val CAPACITY_BITS = 30
        const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS) - 1
        const val HEAD_SHIFT = 0
        const val HEAD_MASK = MAX_CAPACITY_MASK.toLong() shl HEAD_SHIFT
        const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
        const val TAIL_MASK = MAX_CAPACITY_MASK.toLong() shl TAIL_SHIFT

        const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
        const val FROZEN_MASK = 1L shl FROZEN_SHIFT
        const val CLOSED_SHIFT = FROZEN_SHIFT + 1
        const val CLOSED_MASK = 1L shl CLOSED_SHIFT

        // Multi-consumer addLast spins on a not-yet-cleared slot only when capacity is at least this large
        const val MIN_ADD_SPIN_CAPACITY = 1024

        // Marker returned by remove operations when the core is frozen
        @JvmField val REMOVE_FROZEN = Symbol("REMOVE_FROZEN")

        // Results of addLast
        const val ADD_SUCCESS = 0
        const val ADD_FROZEN = 1
        const val ADD_CLOSED = 2

        // "without": clears in the receiver all the bits that are set in other
        infix fun Long.wo(other: Long) = this and other.inv()
        fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)
        fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)

        // Unpacks head and tail indices from the state and passes them to block
        inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
            val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()
            val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()
            return block(head, tail)
        }

        // FROZEN | CLOSED
        fun Long.addFailReason(): Int = if (this and CLOSED_MASK != 0L) ADD_CLOSED else ADD_FROZEN
    }
}