/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.jvm.internal.Ref.ObjectRef
import kotlin.math.*
import kotlin.random.*

/**
 * Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines
 * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner.
 *
 * The current scheduler implementation has two optimization targets:
 * * Efficiency in the face of communication patterns (e.g. actors communicating via a channel).
 * * Dynamic resizing to support blocking calls without re-dispatching a coroutine to a separate "blocking" thread pool.
 *
 * ### Structural overview
 *
 * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
 * Every worker has a local queue in addition to a global scheduler queue,
 * and the global queue has priority over the local queue to avoid starvation of externally submitted
 * tasks (e.g. from the Android UI thread).
 * Work stealing is implemented on top of these queues to provide
 * even load distribution and the illusion of a centralized run queue.
 *
 * ### Scheduling policy
 *
 * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of the worker's run queue.
 * If the head is not empty, the task from the head is moved to the tail. Though this is an unfair scheduling policy,
 * it effectively couples communicating coroutines into one and eliminates the scheduling latency
 * that arises from placing tasks at the end of the queue.
 * Moving the former head to the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates into a stack.
 * When a coroutine is dispatched from an external thread, it's put into the global queue.
 * The original idea of a single-slot LIFO buffer comes from the Golang runtime scheduler by D. Vyukov.
 * It has proven to be "fair enough", performant, and generally well accepted, and it was a significant initial
 * source of inspiration for the coroutine scheduler.
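 *
 * As a rough illustration of this policy, the following single-threaded sketch models a worker's run queue as
 * a one-task LIFO slot in front of a FIFO buffer. `LifoSlotQueue` is a hypothetical name. The real queue is
 * lock-free and supports stealing, and this sketch omits both:
 *
```kotlin
// Hypothetical single-threaded model: a one-task LIFO slot ("head") in front of a FIFO buffer.
class LifoSlotQueue<T : Any> {
    private var head: T? = null          // the single-slot LIFO head
    private val buffer = ArrayDeque<T>() // FIFO tail part (kotlin.collections.ArrayDeque)

    // A newly dispatched task takes the head; the former head (if any) moves to the tail.
    fun add(task: T) {
        val previous = head
        head = task
        if (previous != null) buffer.addLast(previous)
    }

    // The head is served first, then the buffer in FIFO order.
    fun poll(): T? {
        val task = head
        if (task != null) {
            head = null
            return task
        }
        return buffer.removeFirstOrNull()
    }
}
```
 *
 * Adding `a`, `b`, `c` and then polling yields `c`, `a`, `b`. The most recently dispatched task runs next,
 * while displaced tasks keep their FIFO order instead of degenerating into a stack.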
 *
 * ### Work stealing and affinity
 *
 * To distribute tasks evenly, a worker whose local queue is empty tries to steal tasks from other workers' queues
 * before parking.
 * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
 * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task is associated with its submission time.
 * This approach shows outstanding results when coroutines are cooperative,
 * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines. Tasks from the LIFO buffer can be stolen on a regular basis.
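 *
 * A minimal sketch of the staleness check follows. It assumes hypothetical `stealDelayNs` and `minStealDelayNs`
 * helpers and an illustrative default resolution of 100 microseconds, which is not necessarily the value of
 * [WORK_STEALING_TIME_RESOLUTION_NS]. A candidate task is either stealable now (delay `0`), or the thief learns
 * how long to wait:
 *
```kotlin
// Hypothetical helper: returns 0 if a task submitted at `submissionTimeNs` may be stolen at `nowNs`,
// otherwise the remaining delay (in ns) until it becomes stale enough to steal.
fun stealDelayNs(submissionTimeNs: Long, nowNs: Long, resolutionNs: Long = 100_000L): Long {
    val age = nowNs - submissionTimeNs
    return if (age >= resolutionNs) 0L else resolutionNs - age
}

// Hypothetical helper: the shortest delay across several candidate tasks, or null if there are none.
fun minStealDelayNs(submissionTimesNs: List<Long>, nowNs: Long, resolutionNs: Long = 100_000L): Long? =
    submissionTimesNs.minOfOrNull { stealDelayNs(it, nowNs, resolutionNs) }
```
 *
 * Taking the minimum over all candidates is similar in spirit to `minDelayUntilStealableTaskNs` in [Worker]:
 * the delay until at least one task in other workers' queues becomes stealable.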
 *
 * ### Thread management
 * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
 * similar to those of regular centralized executors.
 * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields.
 * The former incorporates the number of created threads, CPU tokens, and blocking tasks
 * that require thread compensation,
 * while the latter represents an intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queue to avoid missing signals, and then attempts to park
 * with an additional rendezvous against unnecessary parking.
 * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for that duration.
 *
 * When a new task arrives in the scheduler (whether in a local queue or the global queue),
 * either an idle worker is signalled or an attempt is made to create a new worker.
 * (Only [corePoolSize] workers can be created for regular CPU tasks.)
 *
 * ### Support for blocking tasks
 * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
 * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
 * addition to the core pool size, so at any given moment it has [corePoolSize] threads (potentially not yet created)
 * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
 * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
 * When a worker encounters a blocking task, it hands off its permit to another thread (though not directly) to
 * keep the invariant "the scheduler always has at least min(pending CPU tasks, core pool size)
 * and at most core pool size threads to execute CPU tasks".
 * To avoid overprovisioning, workers without a CPU permit are allowed to scan [globalBlockingQueue]
 * and steal **only** blocking tasks from other workers.
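 *
 * The permit accounting can be sketched with a plain `AtomicInteger`, assuming a hypothetical `CpuPermits`
 * class. The scheduler itself packs the permit count into [controlState] instead:
 *
```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of CPU permits: corePoolSize tokens that allow a worker to run CPU-bound tasks.
class CpuPermits(corePoolSize: Int) {
    private val available = AtomicInteger(corePoolSize)

    val availableCount: Int get() = available.get()

    // CAS loop: take a permit only if at least one is left.
    fun tryAcquire(): Boolean {
        while (true) {
            val current = available.get()
            if (current == 0) return false
            if (available.compareAndSet(current, current - 1)) return true
        }
    }

    // Returning a permit (e.g. before running a blocking task) lets another worker pick up CPU-bound work.
    fun release() {
        available.incrementAndGet()
    }
}
```
 *
 * With `corePoolSize = 2`, a third `tryAcquire()` fails until one holder calls `release()`, so at most two
 * workers run CPU-bound tasks at any moment.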
 *
 * The scheduler does not limit the number of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
 * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
 * [LimitingDispatcher], which controls the concurrency level with its own mechanism.
 */
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()
    @JvmField
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }

    /**
     * The stack of parked workers.
     * Every worker registers itself in the stack before parking (if it was not previously registered),
     * so it can be signalled when new tasks arrive.
     * This is a form of intrusive garbage-free Treiber stack where [Worker] is also a stack node.
     *
     * The stack is better than a queue (even with the contention on top) because it unparks threads
     * in most-recently-used order, improving both performance and locality.
     * Moreover, it decreases thread thrashing: if the pool has n threads when only n / 2 are required,
     * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
     *
     * This `Long` value consists of version bits under [PARKED_VERSION_MASK]
     * and the index of the top worker thread under [PARKED_INDEX_MASK].
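     *
     * A simplified, standalone sketch of such a versioned index stack follows. It uses hypothetical names and
     * the same 21-bit index field. It omits worker termination and the [NOT_IN_STACK] bookkeeping, so
     * callers must not push an index that is already in the stack:
     *
```kotlin
import java.util.concurrent.atomic.AtomicIntegerArray
import java.util.concurrent.atomic.AtomicLong

// Hypothetical versioned Treiber stack of 1-based node indices (0 means "empty").
// The low 21 bits of `top` hold the top index; the remaining bits hold a version that is bumped on
// every successful update, so a CAS based on a stale read fails even if the same index is back on top.
class VersionedIndexStack(capacity: Int) {
    private val indexBits = 21
    private val indexMask = (1L shl indexBits) - 1
    private val versionInc = 1L shl indexBits
    private val top = AtomicLong(0L)
    private val next = AtomicIntegerArray(capacity + 1) // next[i]: index below node i, slot 0 unused

    private fun bumpVersion(cur: Long): Long = (cur + versionInc) and indexMask.inv()

    fun push(index: Int) {
        require(index in 1 until next.length())
        while (true) {
            val cur = top.get()
            next.set(index, (cur and indexMask).toInt())
            if (top.compareAndSet(cur, bumpVersion(cur) or index.toLong())) return
        }
    }

    fun pop(): Int? {
        while (true) {
            val cur = top.get()
            val index = (cur and indexMask).toInt()
            if (index == 0) return null // stack is empty
            if (top.compareAndSet(cur, bumpVersion(cur) or next.get(index).toLong())) return index
        }
    }

    fun version(): Long = top.get() ushr indexBits
}
```
     *
     * Pushing 1 and 3 and then popping returns 3, then 1, and finally `null`. Each successful push or pop
     * increments the version, mirroring how [PARKED_VERSION_INC] is added on every update of this field.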
     */
    private val parkedWorkersStack = atomic(0L)

    /**
     * Updates the index of the worker at the top of [parkedWorkersStack].
     * It always updates the version so that a concurrent [parkedWorkersStackPop] operation
     * that might have already decided to put this index at the top fails its CAS and retries.
     *
     * Note that [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
     */
    fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = if (index == oldIndex) {
                if (newIndex == 0) {
                    parkedWorkersStackNextIndex(worker)
                } else {
                    newIndex
                }
            } else {
                index // no change to index, but update version
            }
            if (updIndex < 0) return@loop // retry
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
        }
    }

    /**
     * Pushes the worker onto [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if the worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The loop below can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while the loop executes.
         */
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, so it cannot be terminated
            worker.nextParkedWorker = workers[index]
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate, which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }

    /**
     * Pops a worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] the resulting worker.
     * See [tryUnpark].
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate, which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
                 * currently invoking parkedWorkersStackPush, it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }

    /**
     * Finds the next usable index for [parkedWorkersStack]. The problem is that workers can
     * be terminated, in which case their [Worker.indexInArray] becomes zero and they cannot be
     * put at the top of the stack. In that case, we look for the next one.
     *
     * Returns `index >= 0` or `-1` for retry.
     */
    private fun parkedWorkersStackNextIndex(worker: Worker): Int {
        var next = worker.nextParkedWorker
        findNext@ while (true) {
            when {
                next === NOT_IN_STACK -> return -1 // we are too late -- another thread popped this element, retry
                next === null -> return 0 // stack becomes empty
                else -> {
                    val nextWorker = next as Worker
                    val updIndex = nextWorker.indexInArray
                    if (updIndex != 0) return updIndex // found a good index for the next worker
                    // Otherwise, this worker was terminated and we cannot put it on top anymore, check the next one
                    next = nextWorker.nextParkedWorker
                }
            }
        }
    }

    /**
     * State of worker threads.
     * [workers] is a dynamic array of lazily created workers, up to [maxPoolSize] workers.
     * [createdWorkers] is the number of already created workers (a worker exists for every index from 1 to [createdWorkers]).
     * [blockingTasks] is the number of pending (either in the queue or being executed) blocking tasks.
     *
     * The workers array is also used as a lock for the workers' creation and termination sequence.
     *
     * **NOTE**: `workers[0]` is always `null` (never used, works as a sentinel value), so
     * workers are 1-indexed, the code path in [Worker.trySteal] is a bit faster, and the index swap during termination
     * works properly.
     *
     * The initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
     * applications.
     */
    @JvmField
    val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)

    /**
     * The `Long` value describing the state of workers in this pool.
     * Currently, it includes the numbers of available CPU permits, blocking tasks, and created workers,
     * each extracted with a [BLOCKING_SHIFT]-bit mask.
     *
     * State layout (highest to lowest bits):
     * | --- number of cpu permits, 22 bits ---  | --- number of blocking tasks, 21 bits ---  | --- number of created threads, 21 bits ---  |
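     *
     * The packing can be illustrated with a standalone decoder that reuses the same shifts and masks.
     * The top-level `pack` helper and the decoding functions below are hypothetical stand-ins for the
     * private members of this class:
     *
```kotlin
// Hypothetical standalone decoder for a state word with the layout above:
// bits 0..20 created workers, bits 21..41 blocking tasks, bits 42..62 CPU permits.
val BLOCKING_SHIFT = 21
val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
val CPU_PERMITS_MASK: Long = CREATED_MASK shl CPU_PERMITS_SHIFT

fun pack(cpuPermits: Int, blockingTasks: Int, createdWorkers: Int): Long {
    val permitsBits = cpuPermits.toLong() shl CPU_PERMITS_SHIFT
    val blockingBits = blockingTasks.toLong() shl BLOCKING_SHIFT
    return permitsBits or blockingBits or createdWorkers.toLong()
}

fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
fun blockingTasks(state: Long): Int = ((state and BLOCKING_MASK) shr BLOCKING_SHIFT).toInt()
fun cpuPermits(state: Long): Int = ((state and CPU_PERMITS_MASK) shr CPU_PERMITS_SHIFT).toInt()
```
     *
     * Each counter lives in its own bit range, so adding `1L shl BLOCKING_SHIFT` to the word changes
     * only the blocking-task count. This is what lets a single atomic `addAndGet` update one counter,
     * provided that no counter overflows its 21 bits.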
     */
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)

    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // Guarded by synchronization
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())

    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)

    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }

    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // This is used as a "stop signal" for the close and shutdown functions
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

    companion object {
        // A symbol to mark workers that are not in parkedWorkersStack
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")

        // Worker ctl states
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1

        // Masks of control state
        private const val BLOCKING_SHIFT = 21 // 2M threads max
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

        internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

        // Masks of parkedWorkersStack
        private const val PARKED_INDEX_MASK = CREATED_MASK
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }

    override fun execute(command: Runnable) = dispatch(command)

    override fun close() = shutdown(10_000L)

    // Shuts down the current scheduler and waits until all work is done and all threads are stopped.
    fun shutdown(timeout: Long) {
        // atomically set the termination flag, which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture the number of created workers, which cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shut down all workers except the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                while (worker.isAlive) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                val state = worker.state
                assert { state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from globalQueue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shut down the current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & clean up state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }

    /**
     * Dispatches execution of a runnable [block] with a hint to the scheduler whether
     * this [block] may execute blocking operations (IO, system calls, locking primitives, etc.).
     *
     * [taskContext] -- concurrency context of the given [block].
     * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
     * If `true`, the task will be dispatched in a FIFO manner and no additional workers will be requested,
     * but only if the current thread is a corresponding worker thread.
     * Note that the caller cannot be sure that it is being executed on a worker thread, for the following reasons:
     *   * [CoroutineStart.UNDISPATCHED]
     *   * A concurrent [close] that effectively shuts down the worker thread
     */
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        val isBlockingTask = task.isBlocking
        // Invariant: we increment the counter **before** publishing the task
        // so the executing thread can safely decrement the number of blocking tasks
        val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // The global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (isBlockingTask) {
            // Use the state snapshot to better estimate the number of running threads
            signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
        } else {
            if (skipUnpark) return
            signalCpuWork()
        }
    }

    fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return TaskImpl(block, nanoTime, taskContext)
    }

    // NB: should only be called from the 'dispatch' method due to the blocking tasks increment
    private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
        if (skipUnpark) return
        if (tryUnpark()) return
        // Use the state snapshot to avoid accidental thread overprovisioning
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try to unpark again in case there was a race between permit release and parking
    }

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        /*
         * We check how many threads there are to handle non-blocking work,
         * and create one more if there are not enough of them.
         */
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            // If we've created the first CPU worker and corePoolSize > 1, then create
            // one more (second) CPU worker, so that stealing between them is operational
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }

    private fun tryUnpark(): Boolean {
        while (true) {
            val worker = parkedWorkersStackPop() ?: return false
            if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
                LockSupport.unpark(worker)
                return true
            }
        }
    }

    /**
     * Returns the number of CPU workers after this function (including the new worker),
     * 0 if no worker was created, or -1 if the scheduler is already terminated.
     */
    private fun createNewWorker(): Int {
        val worker: Worker
        return synchronized(workers) {
            // Make sure we're not trying to resurrect a terminated scheduler
            if (isTerminated) return -1
            val state = controlState.value
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            // Double check for overprovisioning
            if (cpuWorkers >= corePoolSize) return 0
            if (created >= maxPoolSize) return 0
            // start & register the new worker, commit the index only after successful creation
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            /*
             * 1) Claim the slot (under a lock) by the newly created worker
             * 2) Make it observable by incrementing the created workers count
             * 3) Only then start the worker, otherwise it may miss its own creation
             */
            worker = Worker(newIndex)
            workers.setSynchronized(newIndex, worker)
            require(newIndex == incrementCreatedWorkers())
            cpuWorkers + 1
        }.also { worker.start() } // Start the worker when the lock is released to reduce contention, see #3652
    }

    /**
     * Returns `null` if the task was successfully added, or otherwise the
     * task that was not added or was replaced (and thus should be added to the global queue).
     */
    private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        if (this == null) return task
        /*
         * This worker could have already been terminated from this thread by close/shutdown, and it should not
         * accept any more tasks into its local queue.
         */
        if (state === WorkerState.TERMINATED) return task
        // Do not add CPU tasks to the local queue if we are not able to execute them
        if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = tailDispatch)
    }

    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    /**
     * Returns a string identifying the state of this scheduler for nicer debugging.
     * Note that this method is not atomic and represents only a rough state of the pool.
     *
     * State of the queues:
     * b for blocking, c for CPU, d for dormant (retiring).
     * E.g. [1b, 1b, 2c, 1d] means that the pool has
     * two blocking workers with queue size 1, one worker with a CPU permit and queue size 2,
     * and one dormant (executing its local queue before parking) worker with queue size 1.
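     *
     * The per-worker entries can be modelled by a small encoder. `WState` and `queueEntry` are hypothetical
     * stand-ins for [WorkerState] and the loop in the body below:
     *
```kotlin
// Hypothetical stand-in for the worker states that appear in the debug string.
enum class WState { CPU_ACQUIRED, BLOCKING, PARKING, DORMANT, TERMINATED }

// Returns the queue entry for one worker, or null if the worker is only counted, not listed.
fun queueEntry(state: WState, queueSize: Int): String? = when (state) {
    WState.BLOCKING -> "${queueSize}b"
    WState.CPU_ACQUIRED -> "${queueSize}c"
    WState.DORMANT -> if (queueSize > 0) "${queueSize}d" else null
    WState.PARKING, WState.TERMINATED -> null
}
```
     *
     * Suppose (BLOCKING, 1), (BLOCKING, 1), (CPU_ACQUIRED, 2), (DORMANT, 1), and (PARKING, 0) are mapped through
     * `queueEntry` and the `null`s are dropped. The result is `[1b, 1b, 2c, 1d]`, the example string above.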
     */
    override fun toString(): String {
        var parkedWorkers = 0
        var blockingWorkers = 0
        var cpuWorkers = 0
        var dormant = 0
        var terminated = 0
        val queueSizes = arrayListOf<String>()
        for (index in 1 until workers.currentLength()) {
            val worker = workers[index] ?: continue
            val queueSize = worker.localQueue.size
            when (worker.state) {
                WorkerState.PARKING -> ++parkedWorkers
                WorkerState.BLOCKING -> {
                    ++blockingWorkers
                    queueSizes += queueSize.toString() + "b" // Blocking
                }
                WorkerState.CPU_ACQUIRED -> {
                    ++cpuWorkers
                    queueSizes += queueSize.toString() + "c" // CPU
                }
                WorkerState.DORMANT -> {
                    ++dormant
                    if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Retiring
                }
                WorkerState.TERMINATED -> ++terminated
            }
        }
        val state = controlState.value
        return "$schedulerName@$hexAddress[" +
                "Pool Size {" +
                    "core = $corePoolSize, " +
                    "max = $maxPoolSize}, " +
                "Worker States {" +
                    "CPU = $cpuWorkers, " +
                    "blocking = $blockingWorkers, " +
                    "parked = $parkedWorkers, " +
                    "dormant = $dormant, " +
                    "terminated = $terminated}, " +
                "running workers queues = $queueSizes, " +
                "global CPU queue size = ${globalCpuQueue.size}, " +
                "global blocking queue size = ${globalBlockingQueue.size}, " +
                "Control State {" +
                    "created workers= ${createdWorkers(state)}, " +
                    "blocking tasks = ${blockingTasks(state)}, " +
                    "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
                "}]"
    }

    fun runSafely(task: Task) {
        try {
            task.run()
        } catch (e: Throwable) {
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, e)
        } finally {
            unTrackTask()
        }
    }

593     internal inner class Worker private constructor() : Thread() {
594         init {
595             isDaemon = true
596         }
597 
598         // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
599         @Volatile // volatile for push/pop operation into parkedWorkersStack
600         var indexInArray = 0
601             set(index) {
602                 name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
603                 field = index
604             }
605 
606         constructor(index: Int) : this() {
607             indexInArray = index
608         }
609 
610         inline val scheduler get() = this@CoroutineScheduler
611 
612         @JvmField
613         val localQueue: WorkQueue = WorkQueue()
614 
        /**
         * Slot into which a stolen task is placed, so that it does not have to be
         * re-added to the local queue. See [trySteal].
         */
        private val stolenTask: ObjectRef<Task?> = ObjectRef()

        /**
         * Worker state. **Updated only by this worker thread**.
         * A new worker starts in the DORMANT state, which covers the case where it was created
         * but all CPU tokens or tasks had already been taken.
         * Is used locally by the worker to maintain its own invariants.
         */
        @JvmField
        var state = WorkerState.DORMANT

        /**
         * Worker control state responsible for worker claiming, parking and termination.
         * List of states:
         * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
         * [CLAIMED] -- worker is claimed by an external submitter.
         * [TERMINATED] -- worker is terminated and no longer usable.
         */
        val workerCtl = atomic(CLAIMED)

        /**
         * Set to the termination deadline when the worker first enters [park], and reset
         * in [idleReset] when a task is found. It serves as protection against spurious wakeups of parkNanos.
         */
        private var terminationDeadline = 0L

        /**
         * Reference to the next worker in the [parkedWorkersStack].
         * It may be `null` if there is no next parked worker.
         * This reference is set to [NOT_IN_STACK] when worker is physically not in stack.
         */
        @Volatile
        var nextParkedWorker: Any? = NOT_IN_STACK

        /*
         * The delay until at least one task in other workers' queues becomes stealable.
         */
        private var minDelayUntilStealableTaskNs = 0L

        private var rngState = Random.nextInt()

        /**
         * Tries to acquire CPU token if worker doesn't have one
         * @return whether worker acquired (or already had) CPU token
         */
        private fun tryAcquireCpuPermit(): Boolean = when {
            state == WorkerState.CPU_ACQUIRED -> true
            this@CoroutineScheduler.tryAcquireCpuPermit() -> {
                state = WorkerState.CPU_ACQUIRED
                true
            }
            else -> false
        }

        /**
         * Releases CPU token if worker has any and changes state to [newState].
         * Returns `true` if CPU permit was returned to the pool
         */
        fun tryReleaseCpu(newState: WorkerState): Boolean {
            val previousState = state
            val hadCpu = previousState == WorkerState.CPU_ACQUIRED
            if (hadCpu) releaseCpuPermit()
            if (previousState != newState) state = newState
            return hadCpu
        }

        override fun run() = runWorker()

        @JvmField
        var mayHaveLocalTasks = false

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                /*
                 * No tasks were found:
                 * 1) Either at least one of the workers has a stealable task in its FIFO buffer with a stealing deadline.
                 *    Then its deadline is stored in [minDelayUntilStealableTaskNs]
                 * // '2)' can be found below
                 *
                 * Then re-scan once more and, if still nothing is found, park for that duration.
                 * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
                 * excess unparks and maintaining the "one unpark per signalling" invariant would become infeasible,
                 * so instead we resolve it with the "spinning via scans" mechanism.
                 * NB: this short potential parking does not interfere with `tryUnpark`
                 */
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                /*
                 * 2) Or no tasks are available: time to park and, potentially, shut down the thread.
                 * The worker adds itself to the stack of parked workers, re-scans all the queues
                 * to avoid missing a wake-up (requestCpuWorker), and either starts executing discovered tasks
                 * or parks itself awaiting new tasks.
                 */
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

        /**
         * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
         * This is a special-purpose method for a specific use case and is not expected to be used widely.
         */
        fun runSingleTask(): Long {
            val stateSnapshot = state
            val isCpuThread = state == WorkerState.CPU_ACQUIRED
            val task = if (isCpuThread) {
                findCpuTask()
            } else {
                findBlockingTask()
            }
            if (task == null) {
                if (minDelayUntilStealableTaskNs == 0L) return -1L
                return minDelayUntilStealableTaskNs
            }
            runSafely(task)
            if (!isCpuThread) decrementBlockingTasks()
            assert { state == stateSnapshot }
            return 0L
        }

        fun isIo() = state == WorkerState.BLOCKING

        // Counterpart to "tryUnpark"
        private fun tryPark() {
            if (!inStack()) {
                parkedWorkersStackPush(this)
                return
            }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
             * prevents the following race:
             *
             * - T2 scans the queue, adds itself to the stack, goes to rescan
             * - T2 suspends in 'workerCtl.value = PARKED' line
             * - T1 pops T2 from the stack, claims workerCtl, suspends
             * - T2 fails 'while (inStack())' check, goes to full rescan
             * - T2 adds itself to the stack, parks
             * - T1 unparks T2, bails out with success
             * - T2 unparks and loops in 'while (inStack())'
             */
            while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == WorkerState.TERMINATED) break
                tryReleaseCpu(WorkerState.PARKING)
                interrupted() // Cleanup interruptions
                park()
            }
        }

        private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

        private fun executeTask(task: Task) {
            val taskMode = task.mode
            idleReset(taskMode)
            beforeTask(taskMode)
            runSafely(task)
            afterTask(taskMode)
        }

        private fun beforeTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            // Always notify about new work when releasing CPU-permit to execute some blocking task
            if (tryReleaseCpu(WorkerState.BLOCKING)) {
                signalCpuWork()
            }
        }

        private fun afterTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            decrementBlockingTasks()
            val currentState = state
            // Shutdown sequence of blocking dispatcher
            if (currentState !== WorkerState.TERMINATED) {
                assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
                state = WorkerState.DORMANT
            }
        }

        /*
         * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
         * ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
         */
        fun nextInt(upperBound: Int): Int {
            var r = rngState
            r = r xor (r shl 13)
            r = r xor (r ushr 17)
            r = r xor (r shl 5)
            rngState = r
            val mask = upperBound - 1
            // Fast path for power of two bound
            if (mask and upperBound == 0) {
                return r and mask
            }
            return (r and Int.MAX_VALUE) % upperBound
        }

        private fun park() {
            // set termination deadline the first time we are here (it is reset in idleReset)
            if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
            // actually park
            LockSupport.parkNanos(idleWorkerKeepAliveNs)
            // try terminate when we are idle past termination deadline
            // note that comparison is written like this to protect against potential nanoTime wraparound
            if (System.nanoTime() - terminationDeadline >= 0) {
                terminationDeadline = 0L // if attempt to terminate worker fails we'd extend deadline again
                tryTerminateWorker()
            }
        }

        /**
         * Stops execution of current thread and removes it from [createdWorkers].
         */
        private fun tryTerminateWorker() {
            synchronized(workers) {
                // Make sure we're not trying to race with termination of scheduler
                if (isTerminated) return
                // Someone else terminated, bail out
                if (createdWorkers <= corePoolSize) return
                /*
                 * See tryUnpark for state reasoning.
                 * If this CAS fails, then we were successfully unparked by another worker and cannot terminate.
                 */
                if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
                /*
                 * At this point this thread is no longer considered usable for scheduling.
                 * We need a multi-step choreography to reindex workers.
                 *
                 * 1) Read current worker's index and reset it to zero.
                 */
                val oldIndex = indexInArray
                indexInArray = 0
                /*
                 * Now this worker cannot become the top of parkedWorkersStack, but it can
                 * still be at the stack top via oldIndex.
                 *
                 * 2) Update top of stack if it was pointing to oldIndex and make sure no
                 *    pending push/pop operation that might have already retrieved oldIndex could complete.
                 */
                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                /*
                 * 3) Move last worker into an index in array that was previously occupied by this worker,
                 *    if last worker was a different one (sic!).
                 */
                val lastIndex = decrementCreatedWorkers()
                if (lastIndex != oldIndex) {
                    val lastWorker = workers[lastIndex]!!
                    workers.setSynchronized(oldIndex, lastWorker)
                    lastWorker.indexInArray = oldIndex
                    /*
                     * Now lastWorker is available at both indices in the array, but it can
                     * still be at the stack top via its lastIndex
                     *
                     * 4) Update top of stack lastIndex -> oldIndex and make sure no
                     *    pending push/pop operation that might have already retrieved lastIndex could complete.
                     */
                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
                }
                /*
                 * 5) It is safe to clear reference from workers array now.
                 */
                workers.setSynchronized(lastIndex, null)
            }
            state = WorkerState.TERMINATED
        }

        // It is invoked by this worker when it finds a task
        private fun idleReset(mode: Int) {
            terminationDeadline = 0L // reset deadline for termination
            if (state == WorkerState.PARKING) {
                assert { mode == TASK_PROBABLY_BLOCKING }
                state = WorkerState.BLOCKING
            }
        }

        fun findTask(mayHaveLocalTasks: Boolean): Task? {
            if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            /*
             * If we can't acquire a CPU permit, attempt to find a blocking task:
             * * Check if our queue has one (maybe mixed in with CPU tasks)
             * * Poll the global blocking queue and try to steal
             */
            return findBlockingTask()
        }

        // NB: used by findTask when no CPU permit is available, and by runSingleTask
        private fun findBlockingTask(): Task? {
            return localQueue.pollBlocking()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: trySteal(STEAL_BLOCKING_ONLY)
        }

        // NB: ONLY for runSingleTask method
        private fun findCpuTask(): Task? {
            return localQueue.pollCpu()
                ?: globalCpuQueue.removeFirstOrNull()
                ?: trySteal(STEAL_CPU_ONLY)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
            /*
             * Anti-starvation mechanism: probabilistically poll either local
             * or global queue to ensure progress for both external and internal tasks.
             */
            if (scanLocalQueue) {
                val globalFirst = nextInt(2 * corePoolSize) == 0
                if (globalFirst) pollGlobalQueues()?.let { return it }
                localQueue.poll()?.let { return it }
                if (!globalFirst) pollGlobalQueues()?.let { return it }
            } else {
                pollGlobalQueues()?.let { return it }
            }
            return trySteal(STEAL_ANY)
        }

        private fun pollGlobalQueues(): Task? {
            if (nextInt(2) == 0) {
                globalCpuQueue.removeFirstOrNull()?.let { return it }
                return globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()?.let { return it }
                return globalCpuQueue.removeFirstOrNull()
            }
        }

        private fun trySteal(stealingMode: StealingMode): Task? {
            val created = createdWorkers
            // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
                return null
            }

            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
                ++currentIndex
                if (currentIndex > created) currentIndex = 1
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
                    if (stealResult == TASK_STOLEN) {
                        val result = stolenTask.element
                        stolenTask.element = null
                        return result
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }
    }

    enum class WorkerState {
        /**
         * Has CPU token and either executes [TASK_NON_BLOCKING] task or tries to find one.
         */
        CPU_ACQUIRED,

        /**
         * Executes a [TASK_PROBABLY_BLOCKING] task.
         */
        BLOCKING,

        /**
         * Currently parked.
         */
        PARKING,

        /**
         * Tries to execute its local work and then goes to indefinite sleep, as the worker is no longer needed.
         */
        DORMANT,

        /**
         * Terminal state, will no longer be used.
         */
        TERMINATED
    }
}

/**
 * Checks if the thread is part of a thread pool that supports coroutines.
 * This function is needed for integration with BlockHound.
 */
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker

/**
 * Checks if the thread is running a CPU-bound task.
 * This function is needed for integration with BlockHound.
 */
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
    thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED

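`runSafely` in `CoroutineScheduler` above never lets a task's exception escape into the worker loop. The failure is handed to the current thread's uncaught-exception handler, and the `finally` block still performs the bookkeeping (`unTrackTask`). The following is a minimal standalone sketch of that pattern, not the scheduler's code; the `inFlight` counter is a hypothetical stand-in for the scheduler's task tracking.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter standing in for the scheduler's task tracking (unTrackTask)
val inFlight = AtomicInteger(0)

fun runSafely(task: Runnable) {
    try {
        task.run()
    } catch (e: Throwable) {
        // Same routing as the scheduler: the failure goes to this thread's handler
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        // Runs whether the task succeeded or failed
        inFlight.decrementAndGet()
    }
}
```

Routing through the thread's handler keeps the worker thread alive after a failing task, while still reporting the exception wherever the thread (or its thread group) is configured to report it.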
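`tryAcquireCpuPermit` and `tryReleaseCpu` in `Worker` above make permit handling idempotent per worker: a worker that already holds a CPU permit does not take a second one, and releasing reports whether a permit actually went back to the pool. A minimal model of that bookkeeping follows. `PermitPool`, `ModelWorker` and `ModelState` are hypothetical names; the scheduler derives its available permits from a packed control state (see `availableCpuPermits(state)` in `toString`), whereas this sketch simply uses an `AtomicInteger`.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

enum class ModelState { CPU_ACQUIRED, BLOCKING, PARKING, DORMANT, TERMINATED }

// Hypothetical shared pool of CPU permits
class PermitPool(permits: Int) {
    private val counter = AtomicInteger(permits)

    // CAS loop: take one permit if any is left, never going below zero
    fun tryAcquire(): Boolean {
        while (true) {
            val current = counter.get()
            if (current == 0) return false
            if (counter.compareAndSet(current, current - 1)) return true
        }
    }

    fun release() {
        counter.incrementAndGet()
    }

    fun available(): Int = counter.get()
}

class ModelWorker(private val pool: PermitPool) {
    var state = ModelState.DORMANT
        private set

    // Holding a permit already counts as success; no second permit is taken
    fun tryAcquireCpuPermit(): Boolean = when {
        state == ModelState.CPU_ACQUIRED -> true
        pool.tryAcquire() -> {
            state = ModelState.CPU_ACQUIRED
            true
        }
        else -> false
    }

    // Returns the permit if one is held, moves to newState, reports whether a permit was returned
    fun tryReleaseCpu(newState: ModelState): Boolean {
        val hadCpu = state == ModelState.CPU_ACQUIRED
        if (hadCpu) pool.release()
        state = newState
        return hadCpu
    }
}
```

The boolean result of the release is what lets `beforeTask` above call `signalCpuWork()` only when a permit was really freed.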
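The `while` loop in `tryPark` above re-checks its condition after every return from `park()`, so a spurious wakeup only leads to another round of parking. The sketch below shows the same guard with `LockSupport` and an `AtomicBoolean` flag. It is a simplified, hypothetical model (`Parker`, `parkUntilWoken` and `wake` are illustrative names) that omits the parked-workers stack, CPU permits and the termination deadline.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport

class Parker {
    private val parked = AtomicBoolean(false)
    @Volatile private var waiter: Thread? = null

    fun parkUntilWoken() {
        waiter = Thread.currentThread()
        parked.set(true)
        // Leave only after a waker has flipped the flag; any other return from park() re-parks
        while (parked.get()) {
            Thread.interrupted() // clear interrupt status, otherwise park() returns immediately
            LockSupport.park(this)
        }
    }

    // Returns true only for the caller that actually claimed and woke the waiter
    fun wake(): Boolean {
        val thread = waiter ?: return false
        if (!parked.compareAndSet(true, false)) return false
        LockSupport.unpark(thread)
        return true
    }
}
```

The compare-and-set plays the role of claiming `workerCtl` in the scheduler: only one waker can succeed, and a stray `unpark` without the flag change does not release the parked thread.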
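`nextInt` in `Worker` above draws victim indices from a per-worker xorshift generator and, when the bound is a power of two, reduces the result with a bit mask instead of `%`. The standalone class below sketches that technique; `XorShiftRng` is a hypothetical name. It uses the logical shift `ushr` for the right shift, as in Marsaglia's xorshift32 with shift triple (13, 17, 5), and substitutes a non-zero seed because an all-zero state never changes.

```kotlin
class XorShiftRng(seed: Int) {
    // An all-zero state is a fixed point of xorshift, so a zero seed is replaced
    private var state: Int = if (seed != 0) seed else 1

    fun nextInt(upperBound: Int): Int {
        require(upperBound > 0) { "upperBound must be positive" }
        var r = state
        r = r xor (r shl 13)
        r = r xor (r ushr 17)
        r = r xor (r shl 5)
        state = r
        val mask = upperBound - 1
        // Power-of-two bound: keep only the low bits, avoiding a division
        if ((mask and upperBound) == 0) return r and mask
        // General bound: clear the sign bit so the remainder is non-negative
        return (r and Int.MAX_VALUE) % upperBound
    }
}
```

Keeping the state in a plain field of the worker avoids both `ThreadLocal` lookups and shared contention, which is the trade-off the comment on `nextInt` describes.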
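`park` above tests `System.nanoTime() - terminationDeadline >= 0` rather than `System.nanoTime() >= terminationDeadline`; the `System.nanoTime` documentation recommends the subtraction form because the counter may overflow. The two helper functions below (hypothetical names) demonstrate the difference with explicit values just before and after `Long` overflow.

```kotlin
// Overflow-tolerant check: correct as long as now and deadline are less than 2^63 ns apart
fun deadlinePassed(now: Long, deadline: Long): Boolean = now - deadline >= 0

// Naive check, shown only for contrast: wrong once the counter wraps around
fun deadlinePassedNaive(now: Long, deadline: Long): Boolean = now >= deadline
```

With `deadline = Long.MAX_VALUE - 10` and a clock that has advanced 20 ns past it (wrapping to a negative value), only `deadlinePassed` reports that the deadline has passed.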
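Steps 3 and 5 of `tryTerminateWorker` above keep the `workers` array dense: the last worker is moved into the slot vacated by the terminating one, and then the old last slot is cleared. The single-threaded model below shows only that index bookkeeping with 1-based slots. `DenseRegistry` is a hypothetical name, and the sketch deliberately leaves out the locking and the `parkedWorkersStackTopUpdate` steps that the real code needs for concurrent access.

```kotlin
class DenseRegistry(capacity: Int) {
    // Slot 0 stays unused, mirroring the convention that index 0 means "not in the array"
    private val slots = arrayOfNulls<String>(capacity + 1)
    private val positions = HashMap<String, Int>()
    var size = 0
        private set

    fun add(name: String): Int {
        check(size < slots.size - 1) { "registry is full" }
        size += 1
        slots[size] = name
        positions[name] = size
        return size
    }

    fun remove(name: String) {
        val oldIndex = positions.remove(name) ?: return // unknown name: nothing to do
        val lastIndex = size
        size -= 1
        if (lastIndex != oldIndex) {
            // Step 3: move the last entry into the vacated slot and record its new index
            val last = slots[lastIndex]!!
            slots[oldIndex] = last
            positions[last] = oldIndex
        }
        // Step 5: the old last slot is now stale (or held the removed entry); clear it
        slots[lastIndex] = null
    }

    operator fun get(index: Int): String? = slots[index]

    fun indexOf(name: String): Int? = positions[name]
}
```

Moving the last entry costs O(1) and keeps slots `1..size` fully occupied, which is what lets `trySteal` iterate over `1..created` without gaps.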
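In `findAnyTask` above, a worker that may have local tasks checks the global queues first only when `nextInt(2 * corePoolSize) == 0`, i.e. with probability 1 / (2 * corePoolSize); otherwise it prefers its local queue and falls back to the global one. The model below reproduces that ordering with two `ArrayDeque`s so the bias can be measured. `TwoQueuePoller` is a hypothetical name, and `kotlin.random.Random` stands in for the worker's own xorshift generator.

```kotlin
import kotlin.random.Random

class TwoQueuePoller(private val corePoolSize: Int, private val random: Random) {
    val local = ArrayDeque<String>()
    val global = ArrayDeque<String>()

    fun poll(): String? {
        // Occasionally look at the global queue first so external submissions are not starved
        val globalFirst = random.nextInt(2 * corePoolSize) == 0
        if (globalFirst) global.removeFirstOrNull()?.let { return it }
        local.removeFirstOrNull()?.let { return it }
        if (!globalFirst) global.removeFirstOrNull()?.let { return it }
        return null
    }
}
```

With `corePoolSize = 4` and both queues always non-empty, roughly one poll in eight should come from the global queue; when the local queue is empty, the global queue is drained regardless of the coin flip.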
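`trySteal` above visits potential victims starting from a random offset and wraps from `created` back to index 1 (slot 0 of `workers` is unused), so every slot is examined exactly once per scan. Among victims whose tasks are not yet stealable, it remembers the smallest positive delay. Both parts are sketched below as pure functions. `stealOrder` and `minPositiveDelay` are hypothetical names, and treating every non-positive result as "nothing to wait for" is a simplification of the `TASK_STOLEN` handling in the original.

```kotlin
// Order in which 1-based slots are visited, given the random start in 0 until created
fun stealOrder(created: Int, start: Int): List<Int> {
    require(created >= 1 && start in 0 until created)
    val order = ArrayList<Int>(created)
    var currentIndex = start
    repeat(created) {
        ++currentIndex
        if (currentIndex > created) currentIndex = 1 // wrap around, skipping slot 0
        order += currentIndex
    }
    return order
}

// Smallest positive delay among per-victim results, or 0 if none is positive
fun minPositiveDelay(results: List<Long>): Long {
    var minDelay = Long.MAX_VALUE
    for (r in results) if (r > 0) minDelay = minOf(minDelay, r)
    return if (minDelay != Long.MAX_VALUE) minDelay else 0
}
```

Randomizing only the starting point, rather than shuffling the whole order, keeps each scan O(created) with no allocation in the real code while still spreading thieves across victims.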