/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*

internal const val BUFFER_CAPACITY_BASE = 7
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE // 128 by default
internal const val MASK = BUFFER_CAPACITY - 1 // 127 by default

internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

/**
 * Tightly coupled with the [CoroutineScheduler] queue of pending tasks, but extracted to a separate file for simplicity.
 * At any moment the queue is used only by [CoroutineScheduler.Worker] threads: it has a single producer (the worker
 * owning this queue) and any number of consumers, namely other pool workers trying to steal work from it.
 *
 * ### Fairness
 *
 * [WorkQueue] provides semi-FIFO order with priority for the most recently submitted task, assuming that the
 * current task and the newly submitted one communicate and share state, which makes such communication extremely
 * fast. E.g. tasks submitted in [1, 2, 3, 4] order will be executed in [4, 1, 2, 3] order.
 *
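 * A minimal sketch of that ordering (here `task1`..`task4` stand for already-constructed [Task] instances):
 *
 * ```
 * val queue = WorkQueue()
 * queue.add(task1) // occupies the lastScheduledTask slot
 * queue.add(task2) // displaces task1 into the buffer
 * queue.add(task3) // displaces task2 into the buffer
 * queue.add(task4) // displaces task3 into the buffer
 * // poll() returns task4 first, then task1, task2, task3 in FIFO order
 * ```
 *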
 * ### Algorithm and implementation details
 *
 * This is a regular SPMC bounded queue with the additional property that tasks can be removed from the middle
 * of the queue (scheduler workers without a CPU permit steal blocking tasks via this mechanism). This property
 * forces us to use CAS to properly claim a value from the buffer. Moreover, [Task] objects are reusable, so it
 * may seem that this queue is prone to the ABA problem. It formally does have an ABA problem, but the whole
 * processing logic is written in such a way that this ABA is harmless. I have discovered a truly marvelous proof
 * of this, which this KDoc is too narrow to contain.
 */
internal class WorkQueue {

    /*
     * We read two independent counters here.
     * The producer index is incremented only by the owner,
     * while the consumer index is incremented both by the owner and by external threads.
     *
     * The only harmful race is:
     * [T1] readProducerIndex (1) preemption (2) readConsumerIndex (5)
     * [T2] changeProducerIndex (3)
     * [T3] changeConsumerIndex (4)
     *
     * It can lead to an observed size that is bigger than the actual size at that moment.
     * This is generally harmless because a steal attempt will be blocked by the timer anyway.
     */
    internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
    internal val size: Int get() = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize
    private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
    private val lastScheduledTask = atomic<Task?>(null)

    private val producerIndex = atomic(0)
    private val consumerIndex = atomic(0)
    // Shortcut to avoid scanning the queue when it contains no blocking tasks
    private val blockingTasksInBuffer = atomic(0)

    /**
     * Retrieves and removes a task from the head of the queue.
     * Invariant: this method is called only by the owner of the queue.
     */
    fun poll(): Task? = lastScheduledTask.getAndSet(null) ?: pollBuffer()

    /**
     * Invariant: this method is called only by the owner of the queue.
     * Returns `null` if the task was added, or the task that was not added otherwise.
     *
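     * A hedged caller sketch (`queue`, `task`, and the `globalQueue` fallback are hypothetical, not part of this class):
     * ```
     * val notAdded = queue.add(task)
     * if (notAdded != null) {
     *     // the buffer was full; spill the rejected task to a fallback queue
     *     globalQueue.addLast(notAdded)
     * }
     * ```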
     */
    fun add(task: Task, fair: Boolean = false): Task? {
        if (fair) return addLast(task)
        val previous = lastScheduledTask.getAndSet(task) ?: return null
        return addLast(previous)
    }

    /**
     * Invariant: this method is called only by the owner of the queue.
     * Returns `null` if the task was added, or the task that was not added otherwise.
     */
    private fun addLast(task: Task): Task? {
        if (task.isBlocking) blockingTasksInBuffer.incrementAndGet()
        if (bufferSize == BUFFER_CAPACITY - 1) return task
        val nextIndex = producerIndex.value and MASK
        /*
         * If the current element is not null, then we are racing with a really slow consumer that has committed
         * the consumer index but hasn't yet nulled out the slot, effectively preventing us from using it.
         * Such situations are very rare in practice (although possible), and we decided to give up the progress
         * guarantee in order to have the stronger invariant "an add to a queue with bufferSize == 0 is always successful".
         * This algorithm could still be wait-free for add, but if and only if tasks were not reusable; otherwise
         * nulling out the buffer slot would not be possible.
         */
        while (buffer[nextIndex] != null) {
            Thread.yield()
        }
        buffer.lazySet(nextIndex, task)
        producerIndex.incrementAndGet()
        return null
    }

    /**
     * Tries stealing from the [victim] queue into this queue.
     *
     * Returns [NOTHING_TO_STEAL] if the victim queue has nothing to steal, [TASK_STOLEN] if at least one task
     * was stolen, or a positive number of nanoseconds that should pass before the head of the victim queue
     * becomes available to steal.
     *
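     * A hedged caller sketch (the `thief` and `victim` queues and the helper calls are hypothetical):
     * ```
     * when (val result = thief.tryStealFrom(victim)) {
     *     TASK_STOLEN -> onSuccess()          // a task was moved into the thief's queue
     *     NOTHING_TO_STEAL -> tryNextVictim() // this victim has nothing to offer
     *     else -> parkNanos(result)           // retry no earlier than `result` nanoseconds from now
     * }
     * ```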
     */
    fun tryStealFrom(victim: WorkQueue): Long {
        assert { bufferSize == 0 }
        val task = victim.pollBuffer()
        if (task != null) {
            val notAdded = add(task)
            assert { notAdded == null }
            return TASK_STOLEN
        }
        return tryStealLastScheduled(victim, blockingOnly = false)
    }

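    /**
     * Tries stealing a blocking task from the [victim] queue into this queue, removing it from the middle
     * of the victim's buffer if necessary.
     * The contract on the return value is the same as for [tryStealFrom].
     */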
    fun tryStealBlockingFrom(victim: WorkQueue): Long {
        assert { bufferSize == 0 }
        var start = victim.consumerIndex.value
        val end = victim.producerIndex.value
        val buffer = victim.buffer

        while (start != end) {
            val index = start and MASK
            if (victim.blockingTasksInBuffer.value == 0) break
            val value = buffer[index]
            if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
                victim.blockingTasksInBuffer.decrementAndGet()
                add(value)
                return TASK_STOLEN
            } else {
                ++start
            }
        }
        return tryStealLastScheduled(victim, blockingOnly = true)
    }

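    /**
     * Offloads all remaining work from this queue into [globalQueue]: first the last scheduled task,
     * then the whole buffer.
     */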
    fun offloadAllWorkTo(globalQueue: GlobalQueue) {
        lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) }
        while (pollTo(globalQueue)) {
            // Steal everything
        }
    }

    /**
     * The contract on the return value is the same as for [tryStealFrom].
     */
    private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long {
        while (true) {
            val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
            if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL

            // TODO time wraparound ?
            val time = schedulerTimeSource.nanoTime()
            val staleness = time - lastScheduled.submissionTime
            if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
                return WORK_STEALING_TIME_RESOLUTION_NS - staleness
            }

            /*
             * If the CAS fails, then either someone else has stolen this task, or the owner has executed
             * this task and dispatched another one. In the latter case we should retry to avoid missing a task.
             */
            if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
                add(lastScheduled)
                return TASK_STOLEN
            }
            continue
        }
    }

    private fun pollTo(queue: GlobalQueue): Boolean {
        val task = pollBuffer() ?: return false
        queue.addLast(task)
        return true
    }

    private fun pollBuffer(): Task? {
        while (true) {
            val tailLocal = consumerIndex.value
            if (tailLocal - producerIndex.value == 0) return null
            val index = tailLocal and MASK
            if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
                // Nulls are allowed when blocking tasks are stolen from the middle of the queue.
                val value = buffer.getAndSet(index, null) ?: continue
                value.decrementIfBlocking()
                return value
            }
        }
    }

    private fun Task?.decrementIfBlocking() {
        if (this != null && isBlocking) {
            val value = blockingTasksInBuffer.decrementAndGet()
            assert { value >= 0 }
        }
    }
}