/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*

internal const val BUFFER_CAPACITY_BASE = 7
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE // 128 by default
internal const val MASK = BUFFER_CAPACITY - 1

internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

/**
 * Queue of pending tasks, tightly coupled with [CoroutineScheduler] but extracted to a separate file for simplicity.
 * At any moment the queue is used only by [CoroutineScheduler.Worker] threads: it has a single producer (the worker
 * that owns the queue) and any number of consumers (other pool workers trying to steal work from it).
 *
 * ### Fairness
 *
 * [WorkQueue] provides semi-FIFO order, but with priority for the most recently submitted task, on the assumption
 * that the currently running task and the freshly submitted one are communicating and sharing state, which makes such
 * communication extremely fast.
 * E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order (a small sketch of this ordering is given
 * next to [poll] below).
 *
 * ### Algorithm and implementation details
 *
 * This is a regular SPMC bounded queue with the additional property that tasks can be removed from the middle of the
 * queue (scheduler workers without a CPU permit steal blocking tasks via this mechanism). This property forces us to
 * use CAS in order to properly claim a value from the buffer.
 * Moreover, [Task] objects are reusable, so it may seem that this queue is prone to the ABA problem.
 * It formally does have an ABA problem, but the whole processing logic is written in such a way that this ABA is harmless.
 * I have discovered a truly marvelous proof of this, which this KDoc is too narrow to contain.
 */
internal class WorkQueue {

    /*
     * We read two independent counters here.
     * The producer index is incremented only by the owner,
     * the consumer index is incremented both by the owner and by external threads.
     *
     * The only harmful race is:
     * [T1] readProducerIndex (1) preemption (2) readConsumerIndex (5)
     * [T2] changeProducerIndex (3)
     * [T3] changeConsumerIndex (4)
     *
     * which can produce a size that is larger than the actual size at that moment.
     * This is in general harmless because a steal will be blocked by the timer.
     */
    internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
    internal val size: Int get() = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize
    private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
    private val lastScheduledTask = atomic<Task?>(null)

    private val producerIndex = atomic(0)
    private val consumerIndex = atomic(0)
    // Shortcut to avoid scanning the buffer when it contains no blocking tasks
    private val blockingTasksInBuffer = atomic(0)

    /**
     * Retrieves and removes a task from the head of the queue.
     * Invariant: this method is called only by the owner of the queue.
     */
    fun poll(): Task? = lastScheduledTask.getAndSet(null) ?: pollBuffer()
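    /*
     * A minimal sketch of the ordering described in the Fairness section of the class KDoc, assuming hypothetical
     * tasks t1..t4 created by the owning worker (illustration only, not part of the scheduler):
     *
     *     val queue = WorkQueue()
     *     listOf(t1, t2, t3, t4).forEach { queue.add(it) } // t4 stays in lastScheduledTask, t1..t3 go to the buffer
     *     queue.poll() // t4: the most recently submitted task is returned first
     *     queue.poll() // t1
     *     queue.poll() // t2
     *     queue.poll() // t3
     */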
    /**
     * Invariant: called only by the owner of the queue. Returns `null` if the task was added,
     * or the task that could not be added otherwise.
     */
    fun add(task: Task, fair: Boolean = false): Task? {
        if (fair) return addLast(task)
        val previous = lastScheduledTask.getAndSet(task) ?: return null
        return addLast(previous)
    }

    /**
     * Invariant: called only by the owner of the queue. Returns `null` if the task was added,
     * or the task that could not be added otherwise.
     */
    private fun addLast(task: Task): Task? {
        if (task.isBlocking) blockingTasksInBuffer.incrementAndGet()
        if (bufferSize == BUFFER_CAPACITY - 1) return task
        val nextIndex = producerIndex.value and MASK
        /*
         * If the current element is not null, we are racing with a really slow consumer that has committed the
         * consumer index but has not yet nulled out the slot, effectively preventing us from using it.
         * Such situations are very rare in practice (although possible), and we decided to give up the progress
         * guarantee in exchange for a stronger invariant: "adding to a queue with bufferSize == 0 always succeeds".
         * This algorithm could still be wait-free for add, but if and only if tasks were not reusable; otherwise
         * nulling out the buffer slot would not be possible.
         */
        while (buffer[nextIndex] != null) {
            Thread.yield()
        }
        buffer.lazySet(nextIndex, task)
        producerIndex.incrementAndGet()
        return null
    }

    /**
     * Tries stealing from the [victim] queue into this queue.
     *
     * Returns [NOTHING_TO_STEAL] if the victim has nothing to steal, [TASK_STOLEN] if at least one task was stolen,
     * or a positive value: the number of nanoseconds that should pass until the head of the victim's queue becomes
     * available to steal.
     */
    fun tryStealFrom(victim: WorkQueue): Long {
        assert { bufferSize == 0 }
        val task = victim.pollBuffer()
        if (task != null) {
            val notAdded = add(task)
            assert { notAdded == null }
            return TASK_STOLEN
        }
        return tryStealLastScheduled(victim, blockingOnly = false)
    }

    fun tryStealBlockingFrom(victim: WorkQueue): Long {
        assert { bufferSize == 0 }
        var start = victim.consumerIndex.value
        val end = victim.producerIndex.value
        val buffer = victim.buffer

        while (start != end) {
            val index = start and MASK
            if (victim.blockingTasksInBuffer.value == 0) break
            val value = buffer[index]
            if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
                victim.blockingTasksInBuffer.decrementAndGet()
                add(value)
                return TASK_STOLEN
            } else {
                ++start
            }
        }
        return tryStealLastScheduled(victim, blockingOnly = true)
    }

    fun offloadAllWorkTo(globalQueue: GlobalQueue) {
        lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) }
        while (pollTo(globalQueue)) {
            // Steal everything
        }
    }
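    /*
     * A hypothetical caller could interpret the stealing contract of [tryStealFrom] roughly as follows
     * (illustration only; the real dispatch loop lives in CoroutineScheduler.Worker, and executeStolenTask,
     * tryNextVictim and parkNanos are made-up names):
     *
     *     when (val result = myQueue.tryStealFrom(victim)) {
     *         TASK_STOLEN -> executeStolenTask()   // a task has landed in myQueue
     *         NOTHING_TO_STEAL -> tryNextVictim()  // nothing to take from this victim
     *         else -> parkNanos(result)            // wait hint: the head becomes stealable in `result` nanoseconds
     *     }
     */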
    /**
     * Contract on the return value is the same as for [tryStealFrom].
     */
    private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long {
        while (true) {
            val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
            if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL

            // TODO time wraparound ?
            val time = schedulerTimeSource.nanoTime()
            val staleness = time - lastScheduled.submissionTime
            if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
                return WORK_STEALING_TIME_RESOLUTION_NS - staleness
            }

            /*
             * If the CAS has failed, either someone else has stolen this task or the owner has executed it
             * and dispatched another one. In the latter case we should retry to avoid missing a task.
             */
            if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
                add(lastScheduled)
                return TASK_STOLEN
            }
            continue
        }
    }

    private fun pollTo(queue: GlobalQueue): Boolean {
        val task = pollBuffer() ?: return false
        queue.addLast(task)
        return true
    }

    private fun pollBuffer(): Task? {
        while (true) {
            val tailLocal = consumerIndex.value
            if (tailLocal - producerIndex.value == 0) return null
            val index = tailLocal and MASK
            if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
                // Nulls are allowed when blocking tasks are stolen from the middle of the queue.
                val value = buffer.getAndSet(index, null) ?: continue
                value.decrementIfBlocking()
                return value
            }
        }
    }

    private fun Task?.decrementIfBlocking() {
        if (this != null && isBlocking) {
            val value = blockingTasksInBuffer.decrementAndGet()
            assert { value >= 0 }
        }
    }
}
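/*
 * Worked example of the index arithmetic used by the buffer above (illustration only). The producer and consumer
 * indices grow monotonically and are never reset; a slot is selected by masking with MASK, so the indices wrap
 * around the 128-slot ring while their difference still yields the number of buffered tasks:
 *
 *     val producerIndex = 260
 *     val consumerIndex = 255
 *     val bufferSize = producerIndex - consumerIndex // 5 tasks currently in the buffer
 *     val nextSlot = producerIndex and MASK          // 260 and 127 == 4, the slot the next task is written to
 *     val headSlot = consumerIndex and MASK          // 255 and 127 == 127, the slot pollBuffer reads from
 */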